In memory, leveldb stores data in a Memtable. To guard against the process crashing before the Memtable has been written out to an SSTable file, leveldb first appends each record to a log file and only then applies it to the Memtable. That way, even after an abnormal exit the data can be recovered from the log file.
The log-related code lives in:
- db/log_format.h
- db/log_reader.h
- db/log_reader.cc
- db/log_writer.h
- db/log_writer.cc
1. The log file format
The log_format.h header:
#ifndef STORAGE_LEVELDB_DB_LOG_FORMAT_H_
#define STORAGE_LEVELDB_DB_LOG_FORMAT_H_
namespace leveldb {
namespace log {
enum RecordType {
// Zero is reserved for preallocated files
kZeroType = 0,
kFullType = 1,
// For fragments
kFirstType = 2,
kMiddleType = 3,
kLastType = 4
};
static const int kMaxRecordType = kLastType;
static const int kBlockSize = 32768; //32K
// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;
} // namespace log
} // namespace leveldb
#endif // STORAGE_LEVELDB_DB_LOG_FORMAT_H_

As this header shows, a log file is divided into blocks of 32768 bytes (32 KB). When a record is too large to fit into one block, it is split into several fragments stored in different blocks, and each fragment carries a 7-byte header (a 4-byte CRC checksum, a 2-byte data length, and a 1-byte type).
Why use two bytes for the length? Because a fragment can never exceed 32 KB (2^16 > 32 KB > 2^8), two bytes are enough.
(Figure: layout of records inside a block, copied from the web.)
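To make the byte layout concrete, here is a minimal decoding sketch. It is illustrative code, not leveldb source; the RecordHeader and DecodeHeader names are made up.

// Illustrative sketch (not leveldb code): decode the 7-byte header of one
// physical record, laid out as
//   [ crc (4 bytes, little-endian) | length (2 bytes, little-endian) | type (1 byte) | payload... ]
#include <cstdint>

struct RecordHeader {
  uint32_t crc;     // masked crc32c over the type byte and the payload
  uint16_t length;  // payload length; always < 32768, so two bytes suffice
  uint8_t type;     // kFullType / kFirstType / kMiddleType / kLastType
};

inline RecordHeader DecodeHeader(const unsigned char* p) {
  RecordHeader h;
  h.crc = p[0] | (p[1] << 8) | (p[2] << 16) | (static_cast<uint32_t>(p[3]) << 24);
  h.length = static_cast<uint16_t>(p[4] | (p[5] << 8));
  h.type = p[6];
  return h;
}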

2. The log write logic
Status Writer::AddRecord(const Slice& slice) {
const char* ptr = slice.data();
size_t left = slice.size();
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do {
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize) {
// Switch to a new block
if (leftover > 0) {
// Fill the trailer (literal below relies on kHeaderSize being 7)
// When the remaining bytes in the block cannot even hold a header, pad them with zeros
assert(kHeaderSize == 7);
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}
// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
// Determine the record type from the begin/end flags and the length:
// kFullType (the whole record fits in this block),
// kFirstType (the first fragment goes in this block), kMiddleType..., kLastType...
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length);
if (begin && end) {
type = kFullType;
} else if (begin) {
type = kFirstType;
} else if (end) {
type = kLastType;
} else {
type = kMiddleType;
}
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}
Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
assert(n <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + n <= kBlockSize);
// Format the header
char buf[kHeaderSize];
buf[4] = static_cast<char>(n & 0xff);
buf[5] = static_cast<char>(n >> 8);
buf[6] = static_cast<char>(t);
// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok()) {
s = dest_->Append(Slice(ptr, n));
if (s.ok()) {
s = dest_->Flush();
}
}
block_offset_ += kHeaderSize + n;
return s;
}
The loop computes each fragment's length, tags it with a type, computes the CRC, appends the header and payload to the buffer, and then flushes to the file (saying "flushes to the file" is not quite accurate; see Section 4 for why).
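As a concrete example of the fragmentation rules, the small standalone program below simulates the splitting done by AddRecord, assuming the block already holds 32760 bytes and the record is 50000 bytes long. It is a sketch for illustration, not leveldb code.

// Illustrative sketch: simulate how a record is split into fragments,
// mirroring the kFull/kFirst/kMiddle/kLast rules shown above.
#include <cstdio>
#include <cstddef>

int main() {
  const size_t kBlockSize = 32768, kHeaderSize = 7;
  size_t block_offset = 32760;   // assumed current offset within the block
  size_t left = 50000;           // assumed record size in bytes
  bool begin = true;
  do {
    if (kBlockSize - block_offset < kHeaderSize) {
      block_offset = 0;          // writer pads the tail with zeros and starts a new block
    }
    size_t avail = kBlockSize - block_offset - kHeaderSize;
    size_t frag = left < avail ? left : avail;
    bool end = (left == frag);
    const char* type = (begin && end) ? "kFullType"
                       : begin        ? "kFirstType"
                       : end          ? "kLastType"
                                      : "kMiddleType";
    std::printf("%-12s payload=%zu\n", type, frag);
    block_offset += kHeaderSize + frag;
    left -= frag;
    begin = false;
  } while (left > 0);
  // Prints: kFirstType payload=1, kMiddleType payload=32761, kLastType payload=17238
  return 0;
}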
3. The POSIX implementation of the WritableFile interface
env.h defines the Env interface for cross-platform support; here we only look at its POSIX implementation (env_posix.cc).
class PosixWritableFile : public WritableFile {
private:
// buf_[0, pos_-1] contains data to be written to fd_.
std::string filename_;
int fd_;
char buf_[kBufSize];
size_t pos_;
public:
PosixWritableFile(const std::string& fname, int fd)
: filename_(fname), fd_(fd), pos_(0) { }
~PosixWritableFile() {
if (fd_ >= 0) {
// Ignoring any potential errors
Close();
}
}
virtual Status Append(const Slice& data) {
size_t n = data.size();
const char* p = data.data();
// Fit as much as possible into buffer.
size_t copy = std::min(n, kBufSize - pos_);
memcpy(buf_ + pos_, p, copy);
p += copy;
n -= copy;
pos_ += copy;
if (n == 0) {
return Status::OK();
}
// Can't fit in buffer, so need to do at least one write.
Status s = FlushBuffered();
if (!s.ok()) {
return s;
}
// Small writes go to buffer, large writes are written directly.
if (n < kBufSize) {
memcpy(buf_, p, n);
pos_ = n;
return Status::OK();
}
return WriteRaw(p, n);
}
virtual Status Close() {
Status result = FlushBuffered();
const int r = close(fd_);
if (r < 0 && result.ok()) {
result = PosixError(filename_, errno);
}
fd_ = -1;
return result;
}
virtual Status Flush() {
return FlushBuffered();
}
Status SyncDirIfManifest() {
const char* f = filename_.c_str();
const char* sep = strrchr(f, '/');
Slice basename;
std::string dir;
if (sep == nullptr) {
dir = ".";
basename = f;
} else {
dir = std::string(f, sep - f);
basename = sep + 1;
}
Status s;
if (basename.starts_with("MANIFEST")) {
int fd = open(dir.c_str(), O_RDONLY);
if (fd < 0) {
s = PosixError(dir, errno);
} else {
if (fsync(fd) < 0) {
s = PosixError(dir, errno);
}
close(fd);
}
}
return s;
}
virtual Status Sync() {
// Ensure new files referred to by the manifest are in the filesystem.
Status s = SyncDirIfManifest();
if (!s.ok()) {
return s;
}
s = FlushBuffered();
if (s.ok()) {
if (fdatasync(fd_) != 0) {
s = PosixError(filename_, errno);
}
}
return s;
}
private:
Status FlushBuffered() {
Status s = WriteRaw(buf_, pos_);
pos_ = 0;
return s;
}
Status WriteRaw(const char* p, size_t n) {
while (n > 0) {
ssize_t r = write(fd_, p, n);
if (r < 0) {
if (errno == EINTR) {
continue; // Retry
}
return PosixError(filename_, errno);
}
p += r;
n -= r;
}
return Status::OK();
}
};
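To see how the two layers fit together, here is a minimal usage sketch, assuming the code is built inside the leveldb source tree (db/log_writer.h is an internal header); the WriteOneRecord helper is made up for illustration.

// A minimal usage sketch (assumed wiring, based on the interfaces shown above):
// log::Writer formats records, the WritableFile buffers and writes them.
#include <string>
#include "leveldb/env.h"
#include "leveldb/slice.h"
#include "leveldb/status.h"
#include "db/log_writer.h"   // internal header, available inside the leveldb tree

leveldb::Status WriteOneRecord(const std::string& fname, const leveldb::Slice& record) {
  leveldb::Env* env = leveldb::Env::Default();      // PosixEnv on Linux
  leveldb::WritableFile* file = nullptr;
  leveldb::Status s = env->NewWritableFile(fname, &file);
  if (!s.ok()) return s;
  leveldb::log::Writer writer(file);                // Writer does not own the file
  s = writer.AddRecord(record);                     // Append header+payload, then Flush (user buffer -> write())
  if (s.ok()) s = file->Sync();                     // fdatasync: OS buffers -> physical disk
  if (s.ok()) s = file->Close();
  delete file;
  return s;
}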
4. A note on fsync and fdatasync
The previous section contained this function:
// HAVE_FDATASYNC is defined in the auto-generated port_config.h, which is
// included by port_stdcxx.h.
#if !HAVE_FDATASYNC
#define fdatasync fsync
#endif // !HAVE_FDATASYNC
virtual Status Sync() {
// Ensure new files referred to by the manifest are in the filesystem.
Status s = SyncDirIfManifest();
if (!s.ok()) {
return s;
}
s = FlushBuffered();
if (s.ok()) {
if (fdatasync(fd_) != 0) {
s = PosixError(filename_, errno);
}
}
return s;
}
If you are familiar with standard C's fopen and fprintf, you know that fprintf is buffered; fflush flushes that buffer and makes sure the data reaches the OS, but that does not mean it is on disk, because the OS can buffer it as well. Although write here is a system call, the written data may still sit in OS buffers after write returns; fsync or fdatasync ensures the contents of the OS buffers are written to the physical disk.
You may see a pattern like this in other logging libraries:
fprintf (myFileHandle, "something\n"); // output it
fflush (myFileHandle); // flush to OS
fsync (fileno (myFileHandle)); // flush to disk
fileno is a function that returns the underlying file descriptor of a FILE* handle; calling fsync on that descriptor ensures the data is flushed to disk.
fsync is a relatively expensive operation, because disk writes are usually much slower than memory writes.
Take a look at leveldb's own comment:
// Options that control write operations
struct LEVELDB_EXPORT WriteOptions {
// If true, the write will be flushed from the operating system
// buffer cache (by calling WritableFile::Sync()) before the write
// is considered complete. If this flag is true, writes will be
// slower.
//
// If this flag is false, and the machine crashes, some recent
// writes may be lost. Note that if it is just the process that
// crashes (i.e., the machine does not reboot), no writes will be
// lost even if sync==false.
//
// In other words, a DB write with sync==false has similar
// crash semantics as the "write()" system call. A DB write
// with sync==true has similar crash semantics to a "write()"
// system call followed by "fsync()".
//
// Default: false
bool sync;
WriteOptions()
: sync(false) {
}
};
By default the sync flag is false, which means some recent writes may be lost if the machine crashes; setting it to true makes writes slower.
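A short sketch of what that choice looks like through the public API (the PutBothWays helper is made up; WriteOptions and DB::Put are leveldb's real interfaces):

#include "leveldb/db.h"

void PutBothWays(leveldb::DB* db) {
  leveldb::WriteOptions fast;            // sync == false (the default)
  db->Put(fast, "k1", "v1");             // fast, but recent writes may be lost if the machine crashes

  leveldb::WriteOptions durable;
  durable.sync = true;                   // roughly write() followed by fsync()
  db->Put(durable, "k2", "v2");          // slower, but survives an OS crash or power loss
}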
References:
https://stackoverflow.com/questions/10371017/fsync-vs-write-system-call
Linux synchronous I/O: sync, fsync, and fdatasync
5. The log read logic
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
if (last_record_offset_ < initial_offset_) {
if (!SkipToInitialBlock()) {
return false;
}
}
scratch->clear();
record->clear();
bool in_fragmented_record = false;
// Record offset of the logical record that we're reading
// 0 is a dummy value to make compilers happy
uint64_t prospective_record_offset = 0;
Slice fragment;
while (true) {
const unsigned int record_type = ReadPhysicalRecord(&fragment);
// ReadPhysicalRecord may have only had an empty trailer remaining in its
// internal buffer. Calculate the offset of the next physical record now
// that it has returned, properly accounting for its header size.
uint64_t physical_record_offset =
end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();
if (resyncing_) {
if (record_type == kMiddleType) {
continue;
} else if (record_type == kLastType) {
resyncing_ = false;
continue;
} else {
resyncing_ = false;
}
}
switch (record_type) {
case kFullType:
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (!scratch->empty()) {
ReportCorruption(scratch->size(), "partial record without end(1)");
}
}
prospective_record_offset = physical_record_offset;
scratch->clear();
*record = fragment;
last_record_offset_ = prospective_record_offset;
return true;
case kFirstType:
if (in_fragmented_record) {
// Handle bug in earlier versions of log::Writer where
// it could emit an empty kFirstType record at the tail end
// of a block followed by a kFullType or kFirstType record
// at the beginning of the next block.
if (!scratch->empty()) {
ReportCorruption(scratch->size(), "partial record without end(2)");
}
}
prospective_record_offset = physical_record_offset;
scratch->assign(fragment.data(), fragment.size());
in_fragmented_record = true;
break;
case kMiddleType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(1)");
} else {
scratch->append(fragment.data(), fragment.size());
}
break;
case kLastType:
if (!in_fragmented_record) {
ReportCorruption(fragment.size(),
"missing start of fragmented record(2)");
} else {
scratch->append(fragment.data(), fragment.size());
*record = Slice(*scratch);
last_record_offset_ = prospective_record_offset;
return true;
}
break;
case kEof:
if (in_fragmented_record) {
// This can be caused by the writer dying immediately after
// writing a physical record but before completing the next; don't
// treat it as a corruption, just ignore the entire logical record.
scratch->clear();
}
return false;
case kBadRecord:
if (in_fragmented_record) {
ReportCorruption(scratch->size(), "error in middle of record");
in_fragmented_record = false;
scratch->clear();
}
break;
default: {
char buf[40];
snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
ReportCorruption(
(fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
buf);
in_fragmented_record = false;
scratch->clear();
break;
}
}
}
return false;
}
unsigned int Reader::ReadPhysicalRecord(Slice* result) {
while (true) {
if (buffer_.size() < kHeaderSize) {
if (!eof_) {
// Last read was a full read, so this is a trailer to skip
buffer_.clear();
// Read one block from the file into backing_store_.
// buffer_ only records the current read position and remaining length within backing_store_;
// ReadRecord, the caller of ReadPhysicalRecord, effectively reads records from backing_store_ in a loop,
// and whenever buffer_.size() < kHeaderSize (7) another 32 KB block is read from the actual file.
Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
end_of_buffer_offset_ += buffer_.size();
if (!status.ok()) {
buffer_.clear();
ReportDrop(kBlockSize, status);
eof_ = true;
return kEof;
} else if (buffer_.size() < kBlockSize) {
eof_ = true;
}
continue;
} else {
// Note that if buffer_ is non-empty, we have a truncated header at the
// end of the file, which can be caused by the writer crashing in the
// middle of writing the header. Instead of considering this an error,
// just report EOF.
buffer_.clear();
return kEof;
}
}
// Parse the header
const char* header = buffer_.data();
const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
const unsigned int type = header[6];
const uint32_t length = a | (b << 8);
if (kHeaderSize + length > buffer_.size()) {
size_t drop_size = buffer_.size();
buffer_.clear();
if (!eof_) {
ReportCorruption(drop_size, "bad record length");
return kBadRecord;
}
// If the end of the file has been reached without reading |length| bytes
// of payload, assume the writer died in the middle of writing the record.
// Don't report a corruption.
return kEof;
}
if (type == kZeroType && length == 0) {
// Skip zero length record without reporting any drops since
// such records are produced by the mmap based writing code in
// env_posix.cc that preallocates file regions.
buffer_.clear();
return kBadRecord;
}
// Check crc
if (checksum_) {
uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
if (actual_crc != expected_crc) {
// Drop the rest of the buffer since "length" itself may have
// been corrupted and if we trust it, we could find some
// fragment of a real log record that just happens to look
// like a valid log record.
size_t drop_size = buffer_.size();
buffer_.clear();
ReportCorruption(drop_size, "checksum mismatch");
return kBadRecord;
}
}
buffer_.remove_prefix(kHeaderSize + length);
// Skip physical record that started before initial_offset_
if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
initial_offset_) {
result->clear();
return kBadRecord;
}
*result = Slice(header + kHeaderSize, length);
return type;
}
}
ReadRecord, the caller of ReadPhysicalRecord, effectively loops reading records from backing_store_ into scratch. buffer_ tracks the current read position and the remaining length within backing_store_; whenever buffer_.size() < kHeaderSize (7), another 32 KB block is read from the file into backing_store_. Along the way there is CRC verification, type handling, and error handling.
Note that buffer_ (a Slice) holds no data of its own; it only points into backing_store_ (a new char[kBlockSize]), and users of a Slice must guarantee that the underlying array stays valid for the Slice's lifetime. Why not just use std::string directly instead of appending into std::string only at the end? Probably because each read fetches a fixed 32 KB block and all that needs tracking is the read position and the remaining length; the combination of a Slice and a plain char array fits this pattern well.
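To make the pairing concrete, here is a stripped-down illustration; the View type is hypothetical and far simpler than leveldb's real Slice, but it shows why a non-owning pointer-plus-length view is enough for walking a fixed 32 KB buffer.

// Illustrative sketch (hypothetical type, not leveldb's Slice): a non-owning
// view that tracks only a pointer and the remaining length over an external buffer.
#include <cassert>
#include <cstddef>

struct View {
  const char* data = nullptr;
  size_t size = 0;
  void remove_prefix(size_t n) { assert(n <= size); data += n; size -= n; }
};

// Usage pattern mirroring the reader:
//   char backing_store[32768];                   // owns the bytes and outlives the view
//   View buffer{backing_store, bytes_read};      // no copy, just a window over the array
//   buffer.remove_prefix(kHeaderSize + length);  // consume one physical record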
6. Summary
1. Once you understand the structure of the log file, the read and write logic is easy to follow.
2. Slice looks so simple that at first it seems unnecessary, but on closer thought you can appreciate how well it is designed.
3. Whether or not to use fsync/fdatasync is a trade-off between durability and performance.
The programmer's road still takes steady accumulation. ヾ(◍°∇°◍)ノ゙