diff -ruN redis-rdb-tools-0.1.15/rdbtools/parser.py redis-rdb-tools-master/rdbtools/parser.py --- redis-rdb-tools-0.1.15/rdbtools/parser.py 2024-06-03 11:08:18.192245124 -0400 +++ redis-rdb-tools-master/rdbtools/parser.py 2023-04-03 06:50:37.000000000 -0400 @@ -7,6 +7,8 @@ from .compat import range, str2regexp from .iowrapper import IOWrapper +from ctypes import c_int16 + try: try: from cStringIO import StringIO as BytesIO @@ -52,6 +54,12 @@ REDIS_RDB_TYPE_HASH_ZIPLIST = 13 REDIS_RDB_TYPE_LIST_QUICKLIST = 14 REDIS_RDB_TYPE_STREAM_LISTPACKS = 15 +REDIS_RDB_TYPE_HASH_LISTPACK = 16 +REDIS_RDB_TYPE_ZSET_LISTPACK = 17 +REDIS_RDB_TYPE_LIST_QUICKLIST_2 = 18 +REDIS_RDB_TYPE_STREAM_LISTPACKS_2 = 19 +REDIS_RDB_TYPE_SET_LISTPACK = 20 +REDIS_RDB_TYPE_STREAM_LISTPACKS_3 = 21 REDIS_RDB_ENC_INT8 = 0 REDIS_RDB_ENC_INT16 = 1 @@ -67,7 +75,8 @@ DATA_TYPE_MAPPING = { 0 : "string", 1 : "list", 2 : "set", 3 : "sortedset", 4 : "hash", 5 : "sortedset", 6 : "module", 7: "module", - 9 : "hash", 10 : "list", 11 : "set", 12 : "sortedset", 13 : "hash", 14 : "list", 15 : "stream"} + 9 : "hash", 10 : "list", 11 : "set", 12 : "sortedset", 13 : "hash", 14 : "list", 15 : "stream", 16 : "hash", + 17: "sortedset", 18: "list", 19: "stream", 20: "set", 21: "stream"} class RdbCallback(object): """ @@ -586,8 +595,18 @@ raise Exception('read_object', 'Unable to read Redis Modules RDB objects (key %s)' % self._key) elif enc_type == REDIS_RDB_TYPE_MODULE_2: self.read_module(f) - elif enc_type == REDIS_RDB_TYPE_STREAM_LISTPACKS: - self.read_stream(f) + elif enc_type == REDIS_RDB_TYPE_STREAM_LISTPACKS or \ + enc_type == REDIS_RDB_TYPE_STREAM_LISTPACKS_2 or \ + enc_type == REDIS_RDB_TYPE_STREAM_LISTPACKS_3: + self.read_stream(f, enc_type) + elif enc_type == REDIS_RDB_TYPE_HASH_LISTPACK: + self.read_hash_from_listpack(f) + elif enc_type == REDIS_RDB_TYPE_ZSET_LISTPACK: + self.read_zset_from_listpack(f) + elif enc_type == REDIS_RDB_TYPE_LIST_QUICKLIST_2: + self.read_list_from_quicklist2(f) + elif enc_type == REDIS_RDB_TYPE_SET_LISTPACK: + self.read_set_from_listpack(f) else: raise Exception('read_object', 'Invalid object type %d for key %s' % (enc_type, self._key)) @@ -655,8 +674,18 @@ raise Exception('skip_object', 'Unable to skip Redis Modules RDB objects (key %s)' % self._key) elif enc_type == REDIS_RDB_TYPE_MODULE_2: self.skip_module(f) - elif enc_type == REDIS_RDB_TYPE_STREAM_LISTPACKS: - self.skip_stream(f) + elif enc_type == REDIS_RDB_TYPE_STREAM_LISTPACKS or \ + enc_type == REDIS_RDB_TYPE_STREAM_LISTPACKS_2 or \ + enc_type == REDIS_RDB_TYPE_STREAM_LISTPACKS_3: + self.skip_stream(f, enc_type) + elif enc_type == REDIS_RDB_TYPE_HASH_LISTPACK: + skip_strings = 1 + elif enc_type == REDIS_RDB_TYPE_ZSET_LISTPACK: + skip_strings = 1 + elif enc_type == REDIS_RDB_TYPE_LIST_QUICKLIST_2: + self.skip_list_from_quicklist2(f) + elif enc_type == REDIS_RDB_TYPE_SET_LISTPACK: + skip_strings = 1 else: raise Exception('skip_object', 'Invalid object type %d for key %s' % (enc_type, self._key)) for x in range(0, skip_strings): @@ -871,7 +900,7 @@ iowrapper.stop_recording() self._callback.end_module(self._key, buffer_size=iowrapper.get_recorded_size(), buffer=buffer) - def skip_stream(self, f): + def skip_stream(self, f, rdb_type): listpacks = self.read_length(f) for _lp in range(listpacks): self.skip_string(f) @@ -879,6 +908,12 @@ self.read_length(f) self.read_length(f) self.read_length(f) + if rdb_type == REDIS_RDB_TYPE_STREAM_LISTPACKS_2: + self.read_length(f) + self.read_length(f) + self.read_length(f) + self.read_length(f) + self.read_length(f) cgroups = self.read_length(f) for _cg in range(cgroups): self.skip_string(f) @@ -893,10 +928,18 @@ for _c in range(consumers): self.skip_string(f) f.read(8) + if rdb_type == REDIS_RDB_TYPE_STREAM_LISTPACKS_3: + f.read(8) pending = self.read_length(f) f.read(pending*16) - def read_stream(self, f): + def skip_list_from_quicklist2(self, f): + count = self.read_length(f) + for i in range(0, count): + self.read_length(f) + self.read_string(f) + + def read_stream(self, f, rdb_type): listpacks = self.read_length(f) self._callback.start_stream(self._key, listpacks, self._expiry, info={'encoding': 'listpack', 'idle': self._idle, 'freq': self._freq}) @@ -904,6 +947,10 @@ self._callback.stream_listpack(self._key, self.read_string(f), self.read_string(f)) items = self.read_length(f) last_entry_id = "%s-%s" % (self.read_length(f), self.read_length(f)) + if rdb_type == REDIS_RDB_TYPE_STREAM_LISTPACKS_2: + first_entry_id = "%s-%s" % (self.read_length(f), self.read_length(f)) + max_deleted_entry_id = "%s-%s" % (self.read_length(f), self.read_length(f)) + entries_added = self.read_length(f) cgroups = self.read_length(f) cgroups_data = [] for _cg in range(cgroups): @@ -923,13 +970,17 @@ for _c in range(consumers): cname = self.read_string(f) seen_time = read_milliseconds_time(f) + active_time = seen_time + if rdb_type == REDIS_RDB_TYPE_STREAM_LISTPACKS_3: + active_time = read_milliseconds_time(f) pending = self.read_length(f) consumer_pending_entries = [] - for _pel in range( pending): + for _pel in range(pending): eid = f.read(16) consumer_pending_entries.append({'id': eid}) consumers_data.append({'name': cname, 'seen_time': seen_time, + 'active_time': active_time, 'pending': consumer_pending_entries}) cgroups_data.append({'name': cgname, 'last_entry_id': last_cg_entry_id, @@ -937,6 +988,116 @@ 'consumers': consumers_data}) self._callback.end_stream(self._key, items, last_entry_id, cgroups_data) + + def read_hash_from_listpack(self, f): + raw_string = self.read_string(f) + buff = BytesIO(raw_string) + lpbytes = read_unsigned_int(buff) + num_entries = read_unsigned_short(buff) + if (num_entries % 2) : + raise Exception('read_hash_from_listpack', "Expected even number of elements, but found %d for key %s" % (num_entries, self._key)) + num_entries = num_entries // 2 + self._callback.start_hash(self._key, num_entries, self._expiry, info={'encoding':'listpack', 'sizeof_value':len(raw_string),'idle':self._idle,'freq':self._freq}) + for x in range(0, num_entries) : + field = self.read_listpack_entry(buff) + value = self.read_listpack_entry(buff) + self._callback.hset(self._key, field, value) + listpack_end = read_unsigned_char(buff) + if listpack_end != 255: + raise Exception('read_hash_from_listpack', "Invalid listpack end - %d for key %s" % (listpack_end, self._key)) + self._callback.end_hash(self._key) + + def read_listpack_entry(self, f): + length = 0 + value = None + bytes = [] + bytes.append(read_unsigned_char(f)) + encoding_type = bytes[0] + if (encoding_type >> 7) == 0: + value = encoding_type & 0x7F + elif (encoding_type >> 6) == 2: + length = encoding_type & 0x3F + value = f.read(length) + elif (encoding_type >> 5) == 6: + bytes.append(read_unsigned_char(f)) + value = (c_int16(((bytes[0] & 0x1F) << 11) | (bytes[1]) << 3).value) >> 3 + elif (encoding_type >> 4) == 14: + length = ((encoding_type & 0xF) << 8) | read_unsigned_char(f) + value = f.read(length) + elif encoding_type == 240: + length = read_unsigned_int(f) + value = f.read(length) + elif encoding_type == 241: + value = read_signed_short(f) + elif encoding_type == 242: + value = read_24bit_signed_number(f) + elif encoding_type == 243: + value = read_signed_int(f) + elif encoding_type == 244: + value = read_signed_long(f) + else: + raise Exception('read_listpack_entry', 'Invalid encoding_type %d for key %s' % (encoding_type, self._key)) + + assert 0 <= length <= 34359738367 + while True: + read_unsigned_char(f) + length >>= 7 + if length == 0: + break + return value + + def read_zset_from_listpack(self, f): + raw_string = self.read_string(f) + buff = BytesIO(raw_string) + lpbytes = read_unsigned_int(buff) + num_entries = read_unsigned_short(buff) + if (num_entries % 2) : + raise Exception('read_zset_from_listpack', "Expected even number of elements, but found %d for key %s" % (num_entries, self._key)) + num_entries = num_entries // 2 + self._callback.start_sorted_set(self._key, num_entries, self._expiry, info={'encoding':'listpack', 'idle':self._idle,'freq':self._freq}) + for x in range(0, num_entries) : + member = self.read_listpack_entry(buff) + score = self.read_listpack_entry(buff) + if isinstance(score, bytes) : + score = float(score) + self._callback.zadd(self._key, score, member) + listpack_end = read_unsigned_char(buff) + if listpack_end != 255 : + raise Exception('read_zset_from_listpack', "Invalid listpack end - %d for key %s" % (listpack_end, self._key)) + self._callback.end_sorted_set(self._key) + + def read_list_from_quicklist2(self, f): + count = self.read_length(f) + total_size = 0 + self._callback.start_list(self._key, self._expiry, info={'encoding': 'quicklist2', 'listpacks': count,'idle':self._idle,'freq':self._freq}) + for i in range(0, count): + container = self.read_length(f) + raw_string = self.read_string(f) + buff = BytesIO(raw_string) + lpbytes = read_unsigned_int(buff) + num_entries = read_unsigned_short(buff) + for x in range(0, num_entries): + self._callback.rpush(self._key, self.read_listpack_entry(buff)) + listpack_end = read_unsigned_char(buff) + if listpack_end != 255: + raise Exception('read_quicklist2', "Invalid listpack end - %d for key %s" % (listpack_end, self._key)) + self._callback.end_list(self._key, info={'encoding': 'quicklist2', 'quicklist2': count, 'sizeof_value': total_size}) + + def read_set_from_listpack(self, f): + raw_string = self.read_string(f) + buff = BytesIO(raw_string) + lpbytes = read_unsigned_int(buff) + num_entries = read_unsigned_short(buff) + self._callback.start_set(self._key, num_entries, self._expiry, info={'encoding':'listpack', 'idle':self._idle,'freq':self._freq}) + for x in range(0, num_entries) : + member = self.read_listpack_entry(buff) + self._callback.sadd(self._key, member) + listpack_end = read_unsigned_char(buff) + if listpack_end != 255 : + raise Exception('read_set_from_listpack', "Invalid listpack end - %d for key %s" % (listpack_end, self._key)) + self._callback.end_set(self._key) + + charset = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_' def _decode_module_id(self, module_id): @@ -959,7 +1120,7 @@ def verify_version(self, version_str) : version = int(version_str) - if version < 1 or version > 9: + if version < 1 or version > 11: raise Exception('verify_version', 'Invalid RDB version number %d' % version) self._rdb_version = version