leveldb varint 可变长整数

在读leveldb时看到Varint这个东西,瞬间想起了自己做协议分析、XYHY的时光了。GOOGLE的另一个开源项目protobuf也有varint这个东东。许多软件的私有通讯协议也有用到varint。

1.简介

Varint 是一种紧凑的表示数字的方法。它用一个或多个字节来表示一个数字,值越小的数字使用越少的字节数。这能减少用来表示数字的字节数。

比如对于 int32 类型的数字,一般需要 4 个 byte 来表示。但是采用 Varint,对于很小的 int32 类型的数字,则可以用 1 个 byte 来表示。当然凡事都有好的也有不好的一面,采用 Varint 表示法,大的数字则需要 5 个 byte 来表示。从统计的角度来说,一般不会所有的消息中的数字都是大数,因此大多数情况下,采用 Varint 后,可以用更少的字节数来表示数字信息。Varint 中的每个 byte 的最高位 bit 有特殊的含义,如果该位为 1,表示后续的 byte 也是该数字的一部分,如果该位为 0,则结束。

 

简单来说,可变长的整数,每个字节的最高位标识后续的byte是否还是该数字的一部分

因为所用到的数字(表示长度或者表示其他)往往比较小,用可变长整数信息的表示非常紧凑,自然需要更少的资源。比如(protobuf应用)网络上传输的字节数更少,需要的 IO 更少;(protobuf 或 leveldb)存盘的所用空间更少

2.leveldb实现

编码
暂时也没想清楚32位整数和64位整数编码,为什么写成两种风格while和if-if else if-else if?

char* EncodeVarint64(char* dst, uint64_t v) {
  static const int B = 128;
  unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
  while (v >= B) {
    *(ptr++) = (v & (B-1)) | B;
    v >>= 7;
  }
  *(ptr++) = static_cast<unsigned char>(v);
  return reinterpret_cast<char*>(ptr);
}

char* EncodeVarint32(char* dst, uint32_t v) {
  // Operate on characters as unsigneds
  unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
  static const int B = 128;
  if (v < (1<<7)) {
    *(ptr++) = v;
  } else if (v < (1<<14)) { *(ptr++) = v | B; *(ptr++) = v>>7;
  } else if (v < (1<<21)) { *(ptr++) = v | B; *(ptr++) = (v>>7) | B;
    *(ptr++) = v>>14;
  } else if (v < (1<<28)) { *(ptr++) = v | B; *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = v>>21;
  } else {
    *(ptr++) = v | B;
    *(ptr++) = (v>>7) | B;
    *(ptr++) = (v>>14) | B;
    *(ptr++) = (v>>21) | B;
    *(ptr++) = v>>28;
  }
  return reinterpret_cast<char*>(ptr);
}

解码

const char* GetVarint32PtrFallback(const char* p,
                                   const char* limit,
                                   uint32_t* value) {
  uint32_t result = 0;
  for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
    uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
    p++;
    if (byte & 128) {
      // More bytes are present
      result |= ((byte & 127) << shift);
    } else {
      result |= (byte << shift);
      *value = result;
      return reinterpret_cast<const char*>(p);
    }
  }
  return nullptr;
}

inline const char* GetVarint32Ptr(const char* p,
                                  const char* limit,
                                  uint32_t* value) {
  if (p < limit) {
    uint32_t result = *(reinterpret_cast<const unsigned char*>(p));
    if ((result & 128) == 0) {
      *value = result;
      return p + 1;
    }
  }
  return GetVarint32PtrFallback(p, limit, value);
}


bool GetVarint32(Slice* input, uint32_t* value) {
  const char* p = input->data();
  const char* limit = p + input->size();
  const char* q = GetVarint32Ptr(p, limit, value);
  if (q == nullptr) {
    return false;
  } else {
    *input = Slice(q, limit - q);
    return true;
  }
}

const char* GetVarint64Ptr(const char* p, const char* limit, uint64_t* value) {
  uint64_t result = 0;
  for (uint32_t shift = 0; shift <= 63 && p < limit; shift += 7) {
    uint64_t byte = *(reinterpret_cast<const unsigned char*>(p));
    p++;
    if (byte & 128) {
      // More bytes are present
      result |= ((byte & 127) << shift);
    } else {
      result |= (byte << shift);
      *value = result;
      return reinterpret_cast<const char*>(p);
    }
  }
  return nullptr;
}

bool GetVarint64(Slice* input, uint64_t* value) {
  const char* p = input->data();
  const char* limit = p + input->size();
  const char* q = GetVarint64Ptr(p, limit, value);
  if (q == nullptr) {
    return false;
  } else {
    *input = Slice(q, limit - q);
    return true;
  }
}

3.protobuf实现

编码

inline uint8* CodedOutputStream::WriteVarint32ToArray(uint32 value,
                                                      uint8* target) {
  while (value >= 0x80) {
    *target = static_cast<uint8>(value | 0x80);
    value >>= 7;
    ++target;
  }
  *target = static_cast<uint8>(value);
  return target + 1;
}

inline uint8* CodedOutputStream::WriteVarint64ToArray(uint64 value,
                                                      uint8* target) {
  while (value >= 0x80) {
    *target = static_cast<uint8>(value | 0x80);
    value >>= 7;
    ++target;
  }
  *target = static_cast<uint8>(value);
  return target + 1;
}

解码

namespace {

// Decodes varint64 with known size, N, and returns next pointer. Knowing N at
// compile time, compiler can generate optimal code. For example, instead of
// subtracting 0x80 at each iteration, it subtracts properly shifted mask once.
template <size_t N>
const uint8* DecodeVarint64KnownSize(const uint8* buffer, uint64* value) {
  GOOGLE_DCHECK_GT(N, 0);
  uint64 result = static_cast<uint64>(buffer[N - 1]) << (7 * (N - 1));
  for (int i = 0, offset = 0; i < N - 1; i++, offset += 7) {
    result += static_cast<uint64>(buffer[i] - 0x80) << offset;
  }
  *value = result;
  return buffer + N;
}

// Read a varint from the given buffer, write it to *value, and return a pair.
// The first part of the pair is true iff the read was successful.  The second
// part is buffer + (number of bytes read).  This function is always inlined,
// so returning a pair is costless.
PROTOBUF_ALWAYS_INLINE
::std::pair<bool, const uint8*> ReadVarint32FromArray(
    uint32 first_byte, const uint8* buffer,
    uint32* value);
inline ::std::pair<bool, const uint8*> ReadVarint32FromArray(
    uint32 first_byte, const uint8* buffer, uint32* value) {
  // Fast path:  We have enough bytes left in the buffer to guarantee that
  // this read won't cross the end, so we can skip the checks.
  GOOGLE_DCHECK_EQ(*buffer, first_byte);
  GOOGLE_DCHECK_EQ(first_byte & 0x80, 0x80) << first_byte;
  const uint8* ptr = buffer;
  uint32 b;
  uint32 result = first_byte - 0x80;
  ++ptr;  // We just processed the first byte.  Move on to the second.
  b = *(ptr++); result += b <<  7; if (!(b & 0x80)) goto done;
  result -= 0x80 << 7;
  b = *(ptr++); result += b << 14; if (!(b & 0x80)) goto done;
  result -= 0x80 << 14;
  b = *(ptr++); result += b << 21; if (!(b & 0x80)) goto done;
  result -= 0x80 << 21;
  b = *(ptr++); result += b << 28; if (!(b & 0x80)) goto done;
  // "result -= 0x80 << 28" is irrevelant.

  // If the input is larger than 32 bits, we still need to read it all
  // and discard the high-order bits.
  for (int i = 0; i < kMaxVarintBytes - kMaxVarint32Bytes; i++) {
    b = *(ptr++); if (!(b & 0x80)) goto done;
  }

  // We have overrun the maximum size of a varint (10 bytes).  Assume
  // the data is corrupt.
  return std::make_pair(false, ptr);

 done:
  *value = result;
  return std::make_pair(true, ptr);
}

PROTOBUF_ALWAYS_INLINE::std::pair<bool, const uint8*> ReadVarint64FromArray(
    const uint8* buffer, uint64* value);
inline ::std::pair<bool, const uint8*> ReadVarint64FromArray(
    const uint8* buffer, uint64* value) {
  // Assumes varint64 is at least 2 bytes.
  GOOGLE_DCHECK_GE(buffer[0], 128);

  const uint8* next;
  if (buffer[1] < 128) {
    next = DecodeVarint64KnownSize<2>(buffer, value);
  } else if (buffer[2] < 128) {
    next = DecodeVarint64KnownSize<3>(buffer, value);
  } else if (buffer[3] < 128) {
    next = DecodeVarint64KnownSize<4>(buffer, value);
  } else if (buffer[4] < 128) {
    next = DecodeVarint64KnownSize<5>(buffer, value);
  } else if (buffer[5] < 128) {
    next = DecodeVarint64KnownSize<6>(buffer, value);
  } else if (buffer[6] < 128) {
    next = DecodeVarint64KnownSize<7>(buffer, value);
  } else if (buffer[7] < 128) {
    next = DecodeVarint64KnownSize<8>(buffer, value);
  } else if (buffer[8] < 128) {
    next = DecodeVarint64KnownSize<9>(buffer, value);
  } else if (buffer[9] < 128) {
    next = DecodeVarint64KnownSize<10>(buffer, value);
  } else {
    // We have overrun the maximum size of a varint (10 bytes). Assume
    // the data is corrupt.
    return std::make_pair(false, buffer + 11);
  }

  return std::make_pair(true, next);
}

}  // namespace

4.有符号整数的可变长表示

protobuf还用zigzag编码实现有符号整数可变长,先把有符号整数转换成无符号整数(zigzag),然后再用varint表示zigzag编码后的无符号整数。

注释及代码

// ZigZag Transform:  Encodes signed integers so that they can be
// effectively used with varint encoding.
//
// varint operates on unsigned integers, encoding smaller numbers into
// fewer bytes.  If you try to use it on a signed integer, it will treat
// this number as a very large unsigned integer, which means that even
// small signed numbers like -1 will take the maximum number of bytes
// (10) to encode.  ZigZagEncode() maps signed integers to unsigned
// in such a way that those with a small absolute value will have smaller
// encoded values, making them appropriate for encoding using varint.
//
//       int32 ->     uint32
// -------------------------
//           0 ->          0
//          -1 ->          1
//           1 ->          2
//          -2 ->          3
//         ... ->        ...
//  2147483647 -> 4294967294
// -2147483648 -> 4294967295
//
//        >> encode >>
//        << decode <<

inline uint32 WireFormatLite::ZigZagEncode32(int32 n) {
  // Note:  the right-shift must be arithmetic
  // Note:  left shift must be unsigned because of overflow
  //右移必须是有符号右移,左移必须是无符号左移,
  //很有意思值得思考,想通后豁然开朗,位运算用的简直出神入化
  return (static_cast<uint32>(n) << 1) ^ static_cast<uint32>(n >> 31);
}

inline int32 WireFormatLite::ZigZagDecode32(uint32 n) {
  // Note:  Using unsigned types prevent undefined behavior
  return static_cast<int32>((n >> 1) ^ (~(n & 1) + 1));
}

inline uint64 WireFormatLite::ZigZagEncode64(int64 n) {
  // Note:  the right-shift must be arithmetic
  // Note:  left shift must be unsigned because of overflow
  return (static_cast<uint64>(n) << 1) ^ static_cast<uint64>(n >> 63);
}

inline int64 WireFormatLite::ZigZagDecode64(uint64 n) {
  // Note:  Using unsigned types prevent undefined behavior
  return static_cast<int64>((n >> 1) ^ (~(n & 1) + 1));
}

5.协议实例(新浪微博私信)

私有协议中,一般都是LV(length-value)或者TLV(type-length-value)模式,在表示长度的时候一般是固定的字节(四字节或者二字节或者一字节),但随着可变长整数遇到的次数越来越多时,再做协议分析分析长度的时候我往往就留心这种方式了。

 

这里拿新浪微博私信(iPhone客户端或者Android客户端)协议做个例子,这种私有协议好多其他的都记不清了,微信好像也有这种用法。

 

我先后发了两条消息,一个较短,一个较长,全发的f

下面是抓包wireshark截图

第一条:

在这我们只关心表示消息长度的字段,其它字段不做分析,有兴趣可以自行抓包对比分析。

其实包的前四个字节0xca000000(小端)表示整个包的剩余长度,这就是常见的固定4字节整数,

而表示发送内容长度的这个整数只有一个字节0x3f  十进制63,表示消息长63字节。

别着急下定论,有时候XYHY不准确往往是因为分析不准确,何不把消息发的长一些?

 

第二条:

这时你会发现表示消息长度的这个整数是可变长的(小端),0xbb 02   用二进制表示10111011 00000010,去掉首位再转换大端拼接起来

变为0000010 0111011转换为十进制为315,消息的长度为315.    这样换算为了更好理解。

 

当时还不了解varint的时候,总结的算长度的算法:

例1:0xbb 02  =(0xbb – 0x80)+(0x02)*0x80

例2:0xbb bb 02 =(0xbb – 0x80)+(0xbb-0x80)*0x80 + (0x02)*0x80*0x80

简言之,如果大于0x7f就表示后边还有数字的一部分,也就是首位为1。减去0x80就表示这个字节的大小,再乘上0x80级数。

当时就是这样-+*根本没想到位运算。也是醉了。

程序员的的成长是个积累的过程,还是得多读多看。

参考:Google Protocol Buffer 的使用和原理

 

此条目发表在leveldb分类目录,贴了, , , , , 标签。将固定链接加入收藏夹。

发表评论

您的电子邮箱地址不会被公开。