目录
在读leveldb时看到Varint这个东西,瞬间想起了自己做协议分析、XYHY的时光了。GOOGLE的另一个开源项目protobuf也有varint这个东东。许多软件的私有通讯协议也有用到varint。
1.简介
Varint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。
比如对于 int32 类型的数字,一般需要 4 个 byte 来表示。但是采用 Varint,对于很小的 int32 类型的数字,则可以用 1 个 byte 来表示。当然凡事都有好的也有不好的一面,采用 Varint 表示法,大的数字则需要 5 个 byte 来表示。从统计的角度来说,一般不会所有的消息中的数字都是大数,因此大多数情况下,采用 Varint 后,可以用更少的字节数来表示数字信息。Varint 中的每个 byte 的最高位 bit 有特殊的含义,如果该位为 1,表示后续的 byte 也是该数字的一部分,如果该位为 0,则结束。
【简单来说,可变长的整数,每个字节的最高位标识后续的byte是否还是该数字的一部分】
【因为所用到的数字(表示长度或者表示其他)往往比较小,用可变长整数信息的表示非常紧凑,自然需要更少的资源。比如(protobuf应用)网络上传输的字节数更少,需要的 IO 更少;(protobuf 或 leveldb)存盘的所用空间更少】
2.leveldb实现
编码
暂时也没想清楚32位整数和64位整数编码,为什么写成两种风格while和if-if else if-else if?
char* EncodeVarint64(char* dst, uint64_t v) { static const int B = 128; unsigned char* ptr = reinterpret_cast<unsigned char*>(dst); while (v >= B) { *(ptr++) = (v & (B-1)) | B; v >>= 7; } *(ptr++) = static_cast<unsigned char>(v); return reinterpret_cast<char*>(ptr); } char* EncodeVarint32(char* dst, uint32_t v) { // Operate on characters as unsigneds unsigned char* ptr = reinterpret_cast<unsigned char*>(dst); static const int B = 128; if (v < (1<<7)) { *(ptr++) = v; } else if (v < (1<<14)) { *(ptr++) = v | B; *(ptr++) = v>>7; } else if (v < (1<<21)) { *(ptr++) = v | B; *(ptr++) = (v>>7) | B; *(ptr++) = v>>14; } else if (v < (1<<28)) { *(ptr++) = v | B; *(ptr++) = (v>>7) | B; *(ptr++) = (v>>14) | B; *(ptr++) = v>>21; } else { *(ptr++) = v | B; *(ptr++) = (v>>7) | B; *(ptr++) = (v>>14) | B; *(ptr++) = (v>>21) | B; *(ptr++) = v>>28; } return reinterpret_cast<char*>(ptr); }
解码
const char* GetVarint32PtrFallback(const char* p, const char* limit, uint32_t* value) { uint32_t result = 0; for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) { uint32_t byte = *(reinterpret_cast<const unsigned char*>(p)); p++; if (byte & 128) { // More bytes are present result |= ((byte & 127) << shift); } else { result |= (byte << shift); *value = result; return reinterpret_cast<const char*>(p); } } return nullptr; } inline const char* GetVarint32Ptr(const char* p, const char* limit, uint32_t* value) { if (p < limit) { uint32_t result = *(reinterpret_cast<const unsigned char*>(p)); if ((result & 128) == 0) { *value = result; return p + 1; } } return GetVarint32PtrFallback(p, limit, value); } bool GetVarint32(Slice* input, uint32_t* value) { const char* p = input->data(); const char* limit = p + input->size(); const char* q = GetVarint32Ptr(p, limit, value); if (q == nullptr) { return false; } else { *input = Slice(q, limit - q); return true; } } const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* value) { uint64_t result = 0; for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) { uint64_t byte = *(reinterpret_cast<const unsigned char*>(p)); p++; if (byte & 128) { // More bytes are present result |= ((byte & 127) << shift); } else { result |= (byte << shift); *value = result; return reinterpret_cast<const char*>(p); } } return nullptr; } bool GetVarint64(Slice* input, uint64_t* value) { const char* p = input->data(); const char* limit = p + input->size(); const char* q = GetVarint64Ptr(p, limit, value); if (q == nullptr) { return false; } else { *input = Slice(q, limit - q); return true; } }
3.protobuf实现
编码
inline uint8* CodedOutputStream::WriteVarint32ToArray(uint32 value, uint8* target) { while (value >= 0x80) { *target = static_cast<uint8>(value | 0x80); value >>= 7; ++target; } *target = static_cast<uint8>(value); return target + 1; } inline uint8* CodedOutputStream::WriteVarint64ToArray(uint64 value, uint8* target) { while (value >= 0x80) { *target = static_cast<uint8>(value | 0x80); value >>= 7; ++target; } *target = static_cast<uint8>(value); return target + 1; }
解码
namespace { // Decodes varint64 with known size, N, and returns next pointer. Knowing N at // compile time, compiler can generate optimal code. For example, instead of // subtracting 0x80 at each iteration, it subtracts properly shifted mask once. template <size_t N> const uint8* DecodeVarint64KnownSize(const uint8* buffer, uint64* value) { GOOGLE_DCHECK_GT(N, 0); uint64 result = static_cast<uint64>(buffer[N - 1]) << (7 * (N - 1)); for (int i = 0, offset = 0; i < N - 1; i++, offset += 7) { result += static_cast<uint64>(buffer[i] - 0x80) << offset; } *value = result; return buffer + N; } // Read a varint from the given buffer, write it to *value, and return a pair. // The first part of the pair is true iff the read was successful. The second // part is buffer + (number of bytes read). This function is always inlined, // so returning a pair is costless. PROTOBUF_ALWAYS_INLINE ::std::pair<bool, const uint8*> ReadVarint32FromArray( uint32 first_byte, const uint8* buffer, uint32* value); inline ::std::pair<bool, const uint8*> ReadVarint32FromArray( uint32 first_byte, const uint8* buffer, uint32* value) { // Fast path: We have enough bytes left in the buffer to guarantee that // this read won't cross the end, so we can skip the checks. GOOGLE_DCHECK_EQ(*buffer, first_byte); GOOGLE_DCHECK_EQ(first_byte & 0x80, 0x80) << first_byte; const uint8* ptr = buffer; uint32 b; uint32 result = first_byte - 0x80; ++ptr; // We just processed the first byte. Move on to the second. b = *(ptr++); result += b << 7; if (!(b & 0x80)) goto done; result -= 0x80 << 7; b = *(ptr++); result += b << 14; if (!(b & 0x80)) goto done; result -= 0x80 << 14; b = *(ptr++); result += b << 21; if (!(b & 0x80)) goto done; result -= 0x80 << 21; b = *(ptr++); result += b << 28; if (!(b & 0x80)) goto done; // "result -= 0x80 << 28" is irrevelant. // If the input is larger than 32 bits, we still need to read it all // and discard the high-order bits. for (int i = 0; i < kMaxVarintBytes - kMaxVarint32Bytes; i++) { b = *(ptr++); if (!(b & 0x80)) goto done; } // We have overrun the maximum size of a varint (10 bytes). Assume // the data is corrupt. return std::make_pair(false, ptr); done: *value = result; return std::make_pair(true, ptr); } PROTOBUF_ALWAYS_INLINE::std::pair<bool, const uint8*> ReadVarint64FromArray( const uint8* buffer, uint64* value); inline ::std::pair<bool, const uint8*> ReadVarint64FromArray( const uint8* buffer, uint64* value) { // Assumes varint64 is at least 2 bytes. GOOGLE_DCHECK_GE(buffer[0], 128); const uint8* next; if (buffer[1] < 128) { next = DecodeVarint64KnownSize<2>(buffer, value); } else if (buffer[2] < 128) { next = DecodeVarint64KnownSize<3>(buffer, value); } else if (buffer[3] < 128) { next = DecodeVarint64KnownSize<4>(buffer, value); } else if (buffer[4] < 128) { next = DecodeVarint64KnownSize<5>(buffer, value); } else if (buffer[5] < 128) { next = DecodeVarint64KnownSize<6>(buffer, value); } else if (buffer[6] < 128) { next = DecodeVarint64KnownSize<7>(buffer, value); } else if (buffer[7] < 128) { next = DecodeVarint64KnownSize<8>(buffer, value); } else if (buffer[8] < 128) { next = DecodeVarint64KnownSize<9>(buffer, value); } else if (buffer[9] < 128) { next = DecodeVarint64KnownSize<10>(buffer, value); } else { // We have overrun the maximum size of a varint (10 bytes). Assume // the data is corrupt. return std::make_pair(false, buffer + 11); } return std::make_pair(true, next); } } // namespace
4.有符号整数的可变长表示
protobuf还用zigzag编码实现有符号整数可变长,先把有符号整数转换成无符号整数(zigzag),然后再用varint表示zigzag编码后的无符号整数。
注释及代码
// ZigZag Transform: Encodes signed integers so that they can be // effectively used with varint encoding. // // varint operates on unsigned integers, encoding smaller numbers into // fewer bytes. If you try to use it on a signed integer, it will treat // this number as a very large unsigned integer, which means that even // small signed numbers like -1 will take the maximum number of bytes // (10) to encode. ZigZagEncode() maps signed integers to unsigned // in such a way that those with a small absolute value will have smaller // encoded values, making them appropriate for encoding using varint. // // int32 -> uint32 // ------------------------- // 0 -> 0 // -1 -> 1 // 1 -> 2 // -2 -> 3 // ... -> ... // 2147483647 -> 4294967294 // -2147483648 -> 4294967295 // // >> encode >> // << decode << inline uint32 WireFormatLite::ZigZagEncode32(int32 n) { // Note: the right-shift must be arithmetic // Note: left shift must be unsigned because of overflow //右移必须是有符号右移,左移必须是无符号左移, //很有意思值得思考,想通后豁然开朗,位运算用的简直出神入化 return (static_cast<uint32>(n) << 1) ^ static_cast<uint32>(n >> 31); } inline int32 WireFormatLite::ZigZagDecode32(uint32 n) { // Note: Using unsigned types prevent undefined behavior return static_cast<int32>((n >> 1) ^ (~(n & 1) + 1)); } inline uint64 WireFormatLite::ZigZagEncode64(int64 n) { // Note: the right-shift must be arithmetic // Note: left shift must be unsigned because of overflow return (static_cast<uint64>(n) << 1) ^ static_cast<uint64>(n >> 63); } inline int64 WireFormatLite::ZigZagDecode64(uint64 n) { // Note: Using unsigned types prevent undefined behavior return static_cast<int64>((n >> 1) ^ (~(n & 1) + 1)); }
5.协议实例(新浪微博私信)
私有协议中,一般都是LV(length-value)或者TLV(type-length-value)模式,在表示长度的时候一般是固定的字节(四字节或者二字节或者一字节),但随着可变长整数遇到的次数越来越多时,再做协议分析分析长度的时候我往往就留心这种方式了。
这里拿新浪微博私信(iPhone客户端或者Android客户端)协议做个例子,这种私有协议好多其他的都记不清了,微信好像也有这种用法。
我先后发了两条消息,一个较短,一个较长,全发的f
下面是抓包wireshark截图
第一条:
在这我们只关心表示消息长度的字段,其它字段不做分析,有兴趣可以自行抓包对比分析。
其实包的前四个字节0xca000000(小端)表示整个包的剩余长度,这就是常见的固定4字节整数,
而表示发送内容长度的这个整数只有一个字节0x3f 十进制63,表示消息长63字节。
别着急下定论,有时候XYHY不准确往往是因为分析不准确,何不把消息发的长一些?
第二条:
这时你会发现表示消息长度的这个整数是可变长的(小端),0xbb 02 用二进制表示10111011 00000010,去掉首位再转换大端拼接起来
变为0000010 0111011转换为十进制为315,消息的长度为315. 这样换算为了更好理解。
当时还不了解varint的时候,总结的算长度的算法:
例1:0xbb 02 =(0xbb – 0x80)+(0x02)*0x80
例2:0xbb bb 02 =(0xbb – 0x80)+(0xbb-0x80)*0x80 + (0x02)*0x80*0x80
简言之,如果大于0x7f就表示后边还有数字的一部分,也就是首位为1。减去0x80就表示这个字节的大小,再乘上0x80级数。
当时就是这样-+*根本没想到位运算。也是醉了。
程序员的的成长是个积累的过程,还是得多读多看。
参考:Google Protocol Buffer 的使用和原理