leveldb keeps in-memory data in a Memtable. To guard against the process dying before the Memtable has been written out to an SSTable file, leveldb first appends each record to a log file and only then writes it into the Memtable. That way, even after an abnormal crash the data can be recovered from the log file.
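As a rough sketch of this write-then-apply order (loosely modeled on leveldb's DBImpl::Write and heavily simplified; the member names log_, logfile_ and mem_ come from db_impl, everything else here is illustrative, not the actual implementation):

// Sketch only: the real DBImpl::Write also handles the writer queue,
// WriteBatch grouping and memtable switching.
Status WriteSketch(const WriteOptions& options, WriteBatch* batch) {
  // 1. Append the serialized batch to the log file first.
  Status s = log_->AddRecord(WriteBatchInternal::Contents(batch));
  if (s.ok() && options.sync) {
    s = logfile_->Sync();  // optionally force the OS buffers to disk
  }
  // 2. Only then apply the batch to the in-memory Memtable.
  if (s.ok()) {
    s = WriteBatchInternal::InsertInto(batch, mem_);
  }
  return s;
}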
The log-related code lives in:
- db/log_format.h
- db/log_reader.h
- db/log_reader.cc
- db/log_writer.h
- db/log_writer.cc
1. Log file format
The log_format.h header:
#ifndef STORAGE_LEVELDB_DB_LOG_FORMAT_H_
#define STORAGE_LEVELDB_DB_LOG_FORMAT_H_

namespace leveldb {
namespace log {

enum RecordType {
  // Zero is reserved for preallocated files
  kZeroType = 0,

  kFullType = 1,

  // For fragments
  kFirstType = 2,
  kMiddleType = 3,
  kLastType = 4
};
static const int kMaxRecordType = kLastType;

static const int kBlockSize = 32768;  // 32K

// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

}  // namespace log
}  // namespace leveldb

#endif  // STORAGE_LEVELDB_DB_LOG_FORMAT_H_
As the header shows, a log file is divided into blocks of 32768 bytes (32K) each. When a record is too large to fit in one block, it can be split into several fragments stored in different blocks; each fragment carries a 7-byte header (a 4-byte CRC checksum + a 2-byte data length + a 1-byte type).
Why use two bytes for the length? Because a fragment can never exceed 32K (2^16 = 65536 > 32768 > 2^8 = 256), two bytes are enough where one is not.
(The original post included a diagram of the block/record layout here, copied from the web.)
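For illustration, here is a hypothetical standalone snippet (not leveldb code) that decodes such a 7-byte header by hand, following the layout described above (4-byte CRC, 2-byte little-endian length, 1-byte type):

#include <cstdint>
#include <cstdio>

// Hypothetical helper, for illustration only: parse the 7-byte header
// (4-byte CRC, 2-byte little-endian length, 1-byte type) at `p`.
struct RecordHeader {
  uint32_t crc;
  uint16_t length;
  uint8_t type;
};

RecordHeader ParseHeader(const unsigned char* p) {
  RecordHeader h;
  h.crc = p[0] | (p[1] << 8) | (p[2] << 16) | (uint32_t(p[3]) << 24);
  h.length = p[4] | (p[5] << 8);  // two bytes suffice, since a fragment < 32K
  h.type = p[6];
  return h;
}

int main() {
  // A fabricated header: crc=0, length=258 (0x0102), type=1 (kFullType).
  const unsigned char header[7] = {0, 0, 0, 0, 0x02, 0x01, 1};
  RecordHeader h = ParseHeader(header);
  std::printf("length=%u type=%u\n", (unsigned)h.length, (unsigned)h.type);  // length=258 type=1
  return 0;
}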
2. Log write path
Status Writer::AddRecord(const Slice& slice) {
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it.  Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  Status s;
  bool begin = true;
  do {
    const int leftover = kBlockSize - block_offset_;
    assert(leftover >= 0);
    if (leftover < kHeaderSize) {
      // Switch to a new block
      if (leftover > 0) {
        // Fill the trailer (literal below relies on kHeaderSize being 7)
        // When the remaining bytes in the block cannot even hold a header,
        // pad them with zeros.
        assert(kHeaderSize == 7);
        dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
      }
      block_offset_ = 0;
    }

    // Invariant: we never leave < kHeaderSize bytes in a block.
    assert(kBlockSize - block_offset_ - kHeaderSize >= 0);

    // Decide the record type from the begin/end flags and the length:
    // kFullType   (the whole record fits in this block),
    // kFirstType  (the first fragment goes in this block),
    // kMiddleType / kLastType for the remaining fragments.
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;

    RecordType type;
    const bool end = (left == fragment_length);
    if (begin && end) {
      type = kFullType;
    } else if (begin) {
      type = kFirstType;
    } else if (end) {
      type = kLastType;
    } else {
      type = kMiddleType;
    }

    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
  } while (s.ok() && left > 0);
  return s;
}

Status Writer::EmitPhysicalRecord(RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // Must fit in two bytes
  assert(block_offset_ + kHeaderSize + n <= kBlockSize);

  // Format the header
  char buf[kHeaderSize];
  buf[4] = static_cast<char>(n & 0xff);
  buf[5] = static_cast<char>(n >> 8);
  buf[6] = static_cast<char>(t);

  // Compute the crc of the record type and the payload.
  uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
  crc = crc32c::Mask(crc);  // Adjust for storage
  EncodeFixed32(buf, crc);

  // Write the header and the payload
  Status s = dest_->Append(Slice(buf, kHeaderSize));
  if (s.ok()) {
    s = dest_->Append(Slice(ptr, n));
    if (s.ok()) {
      s = dest_->Flush();
    }
  }
  block_offset_ += kHeaderSize + n;
  return s;
}
In a loop it computes the fragment length, tags the fragment with a record type, computes the CRC, appends the result to a buffer, and then flushes it to the file (saying "flushes to the file" is not quite accurate, but let it stand for now; see section 4 for why).
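To make the fragmentation concrete, here is a small standalone sketch (my own example, reusing the constants from log_format.h) that replays the writer's decision for a hypothetical 100,000-byte record starting at block offset 0. It prints kFirstType: 32761, kMiddleType: 32761, kMiddleType: 32761, kLastType: 1717 (each block holds at most 32768 - 7 = 32761 payload bytes per fragment).

#include <cstdio>

// Sketch: replay the writer's fragmentation decision for a record of
// `left` bytes starting at block offset 0 (constants from log_format.h).
int main() {
  const int kBlockSize = 32768, kHeaderSize = 7;
  int left = 100000, block_offset = 0;
  bool begin = true;
  while (true) {
    if (kBlockSize - block_offset < kHeaderSize) block_offset = 0;  // pad trailer, switch block
    const int avail = kBlockSize - block_offset - kHeaderSize;
    const int frag = left < avail ? left : avail;
    const bool end = (left == frag);
    const char* type = (begin && end) ? "kFullType"
                       : begin        ? "kFirstType"
                       : end          ? "kLastType"
                                      : "kMiddleType";
    std::printf("%s: %d bytes\n", type, frag);
    block_offset += kHeaderSize + frag;
    left -= frag;
    begin = false;
    if (end) break;
  }
  return 0;
}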
3. The POSIX implementation of the WritableFile interface
env.h defines the cross-platform environment interface; here we only look at its POSIX implementation (env_posix.cc).
class PosixWritableFile : public WritableFile {
 private:
  // buf_[0, pos_-1] contains data to be written to fd_.
  std::string filename_;
  int fd_;
  char buf_[kBufSize];
  size_t pos_;

 public:
  PosixWritableFile(const std::string& fname, int fd)
      : filename_(fname), fd_(fd), pos_(0) { }

  ~PosixWritableFile() {
    if (fd_ >= 0) {
      // Ignoring any potential errors
      Close();
    }
  }

  virtual Status Append(const Slice& data) {
    size_t n = data.size();
    const char* p = data.data();

    // Fit as much as possible into buffer.
    size_t copy = std::min(n, kBufSize - pos_);
    memcpy(buf_ + pos_, p, copy);
    p += copy;
    n -= copy;
    pos_ += copy;
    if (n == 0) {
      return Status::OK();
    }

    // Can't fit in buffer, so need to do at least one write.
    Status s = FlushBuffered();
    if (!s.ok()) {
      return s;
    }

    // Small writes go to buffer, large writes are written directly.
    if (n < kBufSize) {
      memcpy(buf_, p, n);
      pos_ = n;
      return Status::OK();
    }
    return WriteRaw(p, n);
  }

  virtual Status Close() {
    Status result = FlushBuffered();
    const int r = close(fd_);
    if (r < 0 && result.ok()) {
      result = PosixError(filename_, errno);
    }
    fd_ = -1;
    return result;
  }

  virtual Status Flush() {
    return FlushBuffered();
  }

  Status SyncDirIfManifest() {
    const char* f = filename_.c_str();
    const char* sep = strrchr(f, '/');
    Slice basename;
    std::string dir;
    if (sep == nullptr) {
      dir = ".";
      basename = f;
    } else {
      dir = std::string(f, sep - f);
      basename = sep + 1;
    }
    Status s;
    if (basename.starts_with("MANIFEST")) {
      int fd = open(dir.c_str(), O_RDONLY);
      if (fd < 0) {
        s = PosixError(dir, errno);
      } else {
        if (fsync(fd) < 0) {
          s = PosixError(dir, errno);
        }
        close(fd);
      }
    }
    return s;
  }

  virtual Status Sync() {
    // Ensure new files referred to by the manifest are in the filesystem.
    Status s = SyncDirIfManifest();
    if (!s.ok()) {
      return s;
    }
    s = FlushBuffered();
    if (s.ok()) {
      if (fdatasync(fd_) != 0) {
        s = PosixError(filename_, errno);
      }
    }
    return s;
  }

 private:
  Status FlushBuffered() {
    Status s = WriteRaw(buf_, pos_);
    pos_ = 0;
    return s;
  }

  Status WriteRaw(const char* p, size_t n) {
    while (n > 0) {
      ssize_t r = write(fd_, p, n);
      if (r < 0) {
        if (errno == EINTR) {
          continue;  // Retry
        }
        return PosixError(filename_, errno);
      }
      p += r;
      n -= r;
    }
    return Status::OK();
  }
};
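As a usage note: application code normally obtains such a file through the Env interface rather than constructing PosixWritableFile directly. A minimal sketch (error handling mostly omitted; the file name is made up):

#include <cassert>
#include "leveldb/env.h"
#include "leveldb/slice.h"

int main() {
  leveldb::Env* env = leveldb::Env::Default();   // the POSIX Env on Linux
  leveldb::WritableFile* file = nullptr;
  leveldb::Status s = env->NewWritableFile("/tmp/demo.log", &file);
  assert(s.ok());
  file->Append(leveldb::Slice("hello"));  // lands in the in-memory buf_
  file->Flush();                          // write() into the OS page cache
  file->Sync();                           // fsync/fdatasync to the physical disk
  file->Close();
  delete file;
  return 0;
}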
4. fsync and fdatasync are worth a closer look
The previous section contains this function:
// HAVE_FDATASYNC is defined in the auto-generated port_config.h, which is
// included by port_stdcxx.h.
#if !HAVE_FDATASYNC
#define fdatasync fsync
#endif  // !HAVE_FDATASYNC

virtual Status Sync() {
  // Ensure new files referred to by the manifest are in the filesystem.
  Status s = SyncDirIfManifest();
  if (!s.ok()) {
    return s;
  }
  s = FlushBuffered();
  if (s.ok()) {
    if (fdatasync(fd_) != 0) {
      s = PosixError(filename_, errno);
    }
  }
  return s;
}
If you are familiar with standard C's fopen and fprintf, you know that fprintf is buffered; fflush flushes that buffer and makes sure the data reaches the OS, but that does not mean it is on disk, since the OS may buffer it as well. Similarly, although write here is a system call, the written data may still sit in OS buffers; fsync or fdatasync is what ensures the contents of the OS buffers reach the physical disk.
You may see this kind of pattern in other logging libraries:
fprintf(myFileHandle, "something\n");  // output it
fflush(myFileHandle);                  // flush to OS
fsync(fileno(myFileHandle));           // flush to disk
fileno is a function that returns the underlying file descriptor of a FILE* handle; calling fsync on that descriptor ensures the data is flushed to disk.
fsync is a relatively expensive operation, because disk writes are usually much slower than memory writes.
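If you want to feel the difference yourself, here is a rough, hypothetical measurement sketch (plain POSIX, nothing to do with leveldb; the absolute numbers depend entirely on the disk and filesystem):

#include <chrono>
#include <cstdio>
#include <fcntl.h>
#include <unistd.h>

// Append n small records, optionally calling fsync after every write.
static double RunAppends(const char* path, int n, bool do_fsync) {
  const int fd = open(path, O_WRONLY | O_CREAT | O_TRUNC, 0644);
  const auto start = std::chrono::steady_clock::now();
  for (int i = 0; i < n; i++) {
    if (write(fd, "0123456789ABCDEF", 16) < 0) break;
    if (do_fsync) fsync(fd);
  }
  const std::chrono::duration<double> elapsed =
      std::chrono::steady_clock::now() - start;
  close(fd);
  return elapsed.count();
}

int main() {
  std::printf("write only    : %.3f s\n", RunAppends("/tmp/no_sync.dat", 1000, false));
  std::printf("write + fsync : %.3f s\n", RunAppends("/tmp/do_sync.dat", 1000, true));
  return 0;
}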
Take a look at the comment in leveldb:
// Options that control write operations
struct LEVELDB_EXPORT WriteOptions {
  // If true, the write will be flushed from the operating system
  // buffer cache (by calling WritableFile::Sync()) before the write
  // is considered complete.  If this flag is true, writes will be
  // slower.
  //
  // If this flag is false, and the machine crashes, some recent
  // writes may be lost.  Note that if it is just the process that
  // crashes (i.e., the machine does not reboot), no writes will be
  // lost even if sync==false.
  //
  // In other words, a DB write with sync==false has similar
  // crash semantics as the "write()" system call.  A DB write
  // with sync==true has similar crash semantics to a "write()"
  // system call followed by "fsync()".
  //
  // Default: false
  bool sync;

  WriteOptions() : sync(false) { }
};
By default the sync flag is false, so some recent writes may be lost if the machine crashes; if it is set to true, writes become slower.
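In application code the trade-off looks like this (standard leveldb API; the database path and keys are made up):

#include <cassert>
#include "leveldb/db.h"

int main() {
  leveldb::DB* db = nullptr;
  leveldb::Options options;
  options.create_if_missing = true;
  leveldb::Status s = leveldb::DB::Open(options, "/tmp/testdb", &db);
  assert(s.ok());

  leveldb::WriteOptions fast;      // sync == false by default:
  db->Put(fast, "k1", "v1");       // fast, but may be lost if the machine crashes

  leveldb::WriteOptions durable;
  durable.sync = true;             // forces WritableFile::Sync() (fsync/fdatasync):
  db->Put(durable, "k2", "v2");    // slower, but survives a machine crash

  delete db;
  return 0;
}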
References:
https://stackoverflow.com/questions/10371017/fsync-vs-write-system-call
Linux synchronous I/O: sync, fsync and fdatasync
5. Log read path
bool Reader::ReadRecord(Slice* record, std::string* scratch) {
  if (last_record_offset_ < initial_offset_) {
    if (!SkipToInitialBlock()) {
      return false;
    }
  }

  scratch->clear();
  record->clear();
  bool in_fragmented_record = false;
  // Record offset of the logical record that we're reading
  // 0 is a dummy value to make compilers happy
  uint64_t prospective_record_offset = 0;

  Slice fragment;
  while (true) {
    const unsigned int record_type = ReadPhysicalRecord(&fragment);

    // ReadPhysicalRecord may have only had an empty trailer remaining in its
    // internal buffer. Calculate the offset of the next physical record now
    // that it has returned, properly accounting for its header size.
    uint64_t physical_record_offset =
        end_of_buffer_offset_ - buffer_.size() - kHeaderSize - fragment.size();

    if (resyncing_) {
      if (record_type == kMiddleType) {
        continue;
      } else if (record_type == kLastType) {
        resyncing_ = false;
        continue;
      } else {
        resyncing_ = false;
      }
    }

    switch (record_type) {
      case kFullType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (!scratch->empty()) {
            ReportCorruption(scratch->size(), "partial record without end(1)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->clear();
        *record = fragment;
        last_record_offset_ = prospective_record_offset;
        return true;

      case kFirstType:
        if (in_fragmented_record) {
          // Handle bug in earlier versions of log::Writer where
          // it could emit an empty kFirstType record at the tail end
          // of a block followed by a kFullType or kFirstType record
          // at the beginning of the next block.
          if (!scratch->empty()) {
            ReportCorruption(scratch->size(), "partial record without end(2)");
          }
        }
        prospective_record_offset = physical_record_offset;
        scratch->assign(fragment.data(), fragment.size());
        in_fragmented_record = true;
        break;

      case kMiddleType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(1)");
        } else {
          scratch->append(fragment.data(), fragment.size());
        }
        break;

      case kLastType:
        if (!in_fragmented_record) {
          ReportCorruption(fragment.size(),
                           "missing start of fragmented record(2)");
        } else {
          scratch->append(fragment.data(), fragment.size());
          *record = Slice(*scratch);
          last_record_offset_ = prospective_record_offset;
          return true;
        }
        break;

      case kEof:
        if (in_fragmented_record) {
          // This can be caused by the writer dying immediately after
          // writing a physical record but before completing the next; don't
          // treat it as a corruption, just ignore the entire logical record.
          scratch->clear();
        }
        return false;

      case kBadRecord:
        if (in_fragmented_record) {
          ReportCorruption(scratch->size(), "error in middle of record");
          in_fragmented_record = false;
          scratch->clear();
        }
        break;

      default: {
        char buf[40];
        snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
        ReportCorruption(
            (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
            buf);
        in_fragmented_record = false;
        scratch->clear();
        break;
      }
    }
  }
  return false;
}

unsigned int Reader::ReadPhysicalRecord(Slice* result) {
  while (true) {
    if (buffer_.size() < kHeaderSize) {
      if (!eof_) {
        // Last read was a full read, so this is a trailer to skip
        buffer_.clear();
        // Read one block from the file into backing_store_.
        // buffer_ only records the current read position and remaining length
        // inside backing_store_; the caller, ReadRecord, effectively pulls
        // records out of backing_store_ in a loop, and whenever
        // buffer_.size() < kHeaderSize (7) another block (32K) is read from
        // the physical file.
        Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
        end_of_buffer_offset_ += buffer_.size();
        if (!status.ok()) {
          buffer_.clear();
          ReportDrop(kBlockSize, status);
          eof_ = true;
          return kEof;
        } else if (buffer_.size() < kBlockSize) {
          eof_ = true;
        }
        continue;
      } else {
        // Note that if buffer_ is non-empty, we have a truncated header at the
        // end of the file, which can be caused by the writer crashing in the
        // middle of writing the header. Instead of considering this an error,
        // just report EOF.
        buffer_.clear();
        return kEof;
      }
    }

    // Parse the header
    const char* header = buffer_.data();
    const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
    const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
    const unsigned int type = header[6];
    const uint32_t length = a | (b << 8);
    if (kHeaderSize + length > buffer_.size()) {
      size_t drop_size = buffer_.size();
      buffer_.clear();
      if (!eof_) {
        ReportCorruption(drop_size, "bad record length");
        return kBadRecord;
      }
      // If the end of the file has been reached without reading |length| bytes
      // of payload, assume the writer died in the middle of writing the record.
      // Don't report a corruption.
      return kEof;
    }

    if (type == kZeroType && length == 0) {
      // Skip zero length record without reporting any drops since
      // such records are produced by the mmap based writing code in
      // env_posix.cc that preallocates file regions.
      buffer_.clear();
      return kBadRecord;
    }

    // Check crc
    if (checksum_) {
      uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
      uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
      if (actual_crc != expected_crc) {
        // Drop the rest of the buffer since "length" itself may have
        // been corrupted and if we trust it, we could find some
        // fragment of a real log record that just happens to look
        // like a valid log record.
        size_t drop_size = buffer_.size();
        buffer_.clear();
        ReportCorruption(drop_size, "checksum mismatch");
        return kBadRecord;
      }
    }

    buffer_.remove_prefix(kHeaderSize + length);

    // Skip physical record that started before initial_offset_
    if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
        initial_offset_) {
      result->clear();
      return kBadRecord;
    }

    *result = Slice(header + kHeaderSize, length);
    return type;
  }
}
ReadPhysicalRecord's caller, ReadRecord, effectively loops reading records from backing_store_ into scratch. buffer_ tracks the current read position and remaining length inside backing_store_; whenever buffer_.size() < kHeaderSize (7), another block (32K) is read from the physical file into backing_store_. Along the way there is also CRC verification, type dispatch, and error handling.
Note that buffer_ (a Slice) holds no data of its own, only a pointer into backing_store_ (a new char[kBlockSize]); whoever uses a Slice must guarantee that the external array stays valid for the Slice's lifetime. Why not just use std::string directly, instead of only appending into one at the end? Maybe because each read fetches a fixed 32K of data and all that needs tracking is the read position and remaining length, so the combination of a Slice and a plain char array fits this scenario better.
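For intuition, here is a tiny standalone example of this "view over an external buffer" idea, using the real leveldb::Slice (the data and sizes are made up): consuming a record just advances a pointer and shrinks a length, which is exactly what the reader does with remove_prefix after parsing each physical record.

#include <cstdio>
#include "leveldb/slice.h"

int main() {
  // backing_store stands in for the 32K block read from the log file.
  char backing_store[] = "HEADER1payload1HEADER2payload2";
  leveldb::Slice buffer(backing_store, sizeof(backing_store) - 1);

  // "Consume" a 7-byte header plus an 8-byte payload without copying:
  buffer.remove_prefix(7 + 8);
  std::printf("%zu bytes left, starting at: %.7s\n", buffer.size(), buffer.data());
  // buffer only moved its pointer and length; backing_store still owns the bytes.
  return 0;
}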
6. Summary
1. Once you understand the structure of the log file, the read and write logic is easy to follow.
2. A seemingly simple structure like Slice felt unnecessary to me at first, but thinking it through reveals the elegance of the author's design.
3. Whether or not to use fsync/fdatasync is a trade-off between reliability and performance.
The programmer's road still takes steady accumulation. ヾ(◍°∇°◍)ノ゙