Skip to content

Commit

Permalink
feat(cluster): add support of the JSON type in cluster migration
Browse files Browse the repository at this point in the history
This closes apache#2583.
  • Loading branch information
git-hulk committed Nov 11, 2024
1 parent 01ce0e1 commit ca3029b
Show file tree
Hide file tree
Showing 11 changed files with 79 additions and 27 deletions.
34 changes: 27 additions & 7 deletions src/cluster/slot_migrate.cc
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,8 @@ StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &k

// Construct command according to type of the key
switch (metadata.Type()) {
case kRedisString: {
case kRedisString:
case kRedisJson: {
auto s = migrateSimpleKey(key, metadata, bytes, restore_cmds);
if (!s.IsOK()) {
return s.Prefixed("failed to migrate simple key");
Expand Down Expand Up @@ -738,13 +739,32 @@ StatusOr<KeyMigrationResult> SlotMigrator::migrateOneKey(const rocksdb::Slice &k

Status SlotMigrator::migrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes,
std::string *restore_cmds) {
std::vector<std::string> command = {"SET", key.ToString(), bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))};
if (metadata.expire > 0) {
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
if (metadata.Type() == kRedisString) {
std::vector<std::string> command = {"SET", key.ToString(), bytes.substr(Metadata::GetOffsetAfterExpire(bytes[0]))};
if (metadata.expire > 0) {
command.emplace_back("PXAT");
command.emplace_back(std::to_string(metadata.expire));
}
*restore_cmds += redis::ArrayOfBulkStrings(command);
current_pipeline_size_++;
} else if (metadata.Type() == kRedisJson) {
// kRedisJson
JsonValue json_value;
if (auto s = redis::Json::FromString(bytes, &json_value); !s.ok()) {
return {Status::NotOK, s.ToString()};
}
auto json_bytes = GET_OR_RET(json_value.Dump());
std::vector<std::string> command = {"JSON.SET", key.ToString(), "$", std::move(json_bytes)};
*restore_cmds += redis::ArrayOfBulkStrings(command);
current_pipeline_size_++;

if (metadata.expire > 0) {
*restore_cmds += redis::ArrayOfBulkStrings({"PEXPIREAT", key.ToString(), std::to_string(metadata.expire)});
current_pipeline_size_++;
}
} else {
return {Status::NotOK, "unsupported simple key type"};
}
*restore_cmds += redis::ArrayOfBulkStrings(command);
current_pipeline_size_++;

// Check whether pipeline needs to be sent
// TODO(chrisZMF): Resend data if failed to send data
Expand Down
12 changes: 12 additions & 0 deletions src/storage/batch_extractor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,18 @@ rocksdb::Status WriteBatchExtractor::PutCF(uint32_t column_family_id, const Slic
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
} else if (metadata.Type() == kRedisJson) {
JsonValue json_value;
s = redis::Json::FromString(value.ToString(), &json_value);
if (!s.ok()) return s;
auto json_bytes = json_value.Dump();
if (!json_bytes) return rocksdb::Status::Corruption(json_bytes.Msg());
command_args = {"JSON.SET", user_key, "$", json_bytes.GetValue()};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
if (metadata.expire > 0) {
command_args = {"PEXPIREAT", user_key, std::to_string(metadata.expire)};
resp_commands_[ns].emplace_back(redis::ArrayOfBulkStrings(command_args));
}
} else if (metadata.expire > 0) {
auto args = log_data_.GetArguments();
if (args->size() > 0) {
Expand Down
22 changes: 9 additions & 13 deletions src/storage/redis_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,10 @@ Database::Database(engine::Storage *storage, std::string ns)
metadata_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Metadata)),
namespace_(std::move(ns)) {}

// Some data types may support reading multiple types of metadata.
// For example, bitmap supports reading string metadata and bitmap metadata.
rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata) {
std::string old_metadata;
metadata->Encode(&old_metadata);

bool is_keyspace_hit = false;
ScopeExit se([this, &is_keyspace_hit] {
if (is_keyspace_hit) {
storage_->RecordStat(engine::StatType::KeyspaceHits, 1);
} else {
storage_->RecordStat(engine::StatType::KeyspaceMisses, 1);
}
});

auto s = metadata->Decode(bytes);
// delay InvalidArgument error check after type match check
if (!s.ok() && !s.IsInvalidArgument()) return s;
Expand All @@ -85,7 +74,14 @@ rocksdb::Status Database::ParseMetadata(RedisTypes types, Slice *bytes, Metadata
auto _ [[maybe_unused]] = metadata->Decode(old_metadata);
return rocksdb::Status::NotFound("no element found");
}
is_keyspace_hit = true;
return s;
}

// Some data types may support reading multiple types of metadata.
// For example, bitmap supports reading string metadata and bitmap metadata.
rocksdb::Status Database::ParseMetadataWithStats(RedisTypes types, Slice *bytes, Metadata *metadata) {
auto s = ParseMetadata(types, bytes, metadata);
storage_->RecordStat(s.ok() ? engine::StatType::KeyspaceHits : engine::StatType::KeyspaceHits, 1);
return s;
}

Expand All @@ -100,7 +96,7 @@ rocksdb::Status Database::GetMetadata(engine::Context &ctx, RedisTypes types, co
auto s = GetRawMetadata(ctx, ns_key, raw_value);
*rest = *raw_value;
if (!s.ok()) return s;
return ParseMetadata(types, rest, metadata);
return ParseMetadataWithStats(types, rest, metadata);
}

rocksdb::Status Database::GetRawMetadata(engine::Context &ctx, const Slice &ns_key, std::string *bytes) {
Expand Down
4 changes: 3 additions & 1 deletion src/storage/redis_db.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ class Database {
explicit Database(engine::Storage *storage, std::string ns = "");
/// Parsing metadata with type of `types` from bytes, the metadata is a base class of all metadata.
/// When parsing, the bytes will be consumed.
[[nodiscard]] rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata);
[[nodiscard]] rocksdb::Status ParseMetadataWithStats(RedisTypes types, Slice *bytes, Metadata *metadata);
// ParseMetadata behaves the same as ParseMetadataWithStats, but without recording stats.
[[nodiscard]] static rocksdb::Status ParseMetadata(RedisTypes types, Slice *bytes, Metadata *metadata);
/// GetMetadata is a helper function to get metadata from the database. It will read the "raw metadata"
/// from underlying storage, and then parse the raw metadata to the specified metadata type.
///
Expand Down
2 changes: 1 addition & 1 deletion src/types/redis_bitmap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ rocksdb::Status Bitmap::GetMetadata(engine::Context &ctx, const Slice &ns_key, B
if (!s.ok()) return s;

Slice slice = *raw_value;
return ParseMetadata({kRedisBitmap, kRedisString}, &slice, metadata);
return ParseMetadataWithStats({kRedisBitmap, kRedisString}, &slice, metadata);
}

rocksdb::Status Bitmap::GetBit(engine::Context &ctx, const Slice &user_key, uint32_t bit_offset, bool *bit) {
Expand Down
10 changes: 9 additions & 1 deletion src/types/redis_json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -630,7 +630,7 @@ std::vector<rocksdb::Status> Json::readMulti(engine::Context &ctx, const std::ve
if (!statuses[i].ok()) continue;
Slice rest(pin_values[i].data(), pin_values[i].size());
JsonMetadata metadata;
statuses[i] = ParseMetadata({kRedisJson}, &rest, &metadata);
statuses[i] = ParseMetadataWithStats({kRedisJson}, &rest, &metadata);
if (!statuses[i].ok()) continue;

statuses[i] = parse(metadata, rest, &values[i]);
Expand Down Expand Up @@ -674,4 +674,12 @@ rocksdb::Status Json::Resp(engine::Context &ctx, const std::string &user_key, co
return rocksdb::Status::OK();
}

rocksdb::Status Json::FromString(const std::string &value, JsonValue *result) {
Slice rest = value;
JsonMetadata metadata;
auto s = ParseMetadata({kRedisJson}, &rest, &metadata);
if (!s.ok()) return s;
return parse(metadata, rest, result);
}

} // namespace redis
1 change: 1 addition & 0 deletions src/types/redis_json.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ class Json : public Database {

rocksdb::Status Resp(engine::Context &ctx, const std::string &user_key, const std::string &path,
std::vector<std::string> *results, RESP resp);
static rocksdb::Status FromString(const std::string &value, JsonValue *result);

private:
rocksdb::Status write(engine::Context &ctx, Slice ns_key, JsonMetadata *metadata, const JsonValue &json_val);
Expand Down
4 changes: 2 additions & 2 deletions src/types/redis_string.cc
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ std::vector<rocksdb::Status> String::getRawValues(engine::Context &ctx, const st
(*raw_values)[i].assign(pin_values[i].data(), pin_values[i].size());
Metadata metadata(kRedisNone, false);
Slice slice = (*raw_values)[i];
auto s = ParseMetadata({kRedisString}, &slice, &metadata);
auto s = ParseMetadataWithStats({kRedisString}, &slice, &metadata);
if (!s.ok()) {
statuses[i] = s;
(*raw_values)[i].clear();
Expand All @@ -65,7 +65,7 @@ rocksdb::Status String::getRawValue(engine::Context &ctx, const std::string &ns_

Metadata metadata(kRedisNone, false);
Slice slice = *raw_value;
return ParseMetadata({kRedisString}, &slice, &metadata);
return ParseMetadataWithStats({kRedisString}, &slice, &metadata);
}

rocksdb::Status String::getValueAndExpire(engine::Context &ctx, const std::string &ns_key, std::string *value,
Expand Down
7 changes: 5 additions & 2 deletions tests/gocase/integration/slotmigrate/slotmigrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ func TestSlotMigrateDataType(t *testing.T) {

testSlot += 1
keys := make(map[string]string, 0)
for _, typ := range []string{"string", "expired_string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} {
for _, typ := range []string{"string", "expired_string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream", "json"} {
keys[typ] = fmt.Sprintf("%s_{%s}", typ, util.SlotTable[testSlot])
require.NoError(t, rdb0.Del(ctx, keys[typ]).Err())
}
Expand Down Expand Up @@ -590,8 +590,9 @@ func TestSlotMigrateDataType(t *testing.T) {
}).Err())
}
require.NoError(t, rdb0.Expire(ctx, keys["stream"], 10*time.Second).Err())
require.NoError(t, rdb0.JSONSet(ctx, keys["json"], "$", `{"a": 1, "b": "hello"}`).Err())
// check source data existence
for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} {
for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream", "json"} {
require.EqualValues(t, 1, rdb0.Exists(ctx, keys[typ]).Val())
}
// get source data
Expand Down Expand Up @@ -653,6 +654,8 @@ func TestSlotMigrateDataType(t *testing.T) {
require.EqualValues(t, 19, streamInfo.EntriesAdded)
require.EqualValues(t, "0-0", streamInfo.MaxDeletedEntryID)
require.EqualValues(t, 19, streamInfo.Length)
// type json
require.Equal(t, `{"a":1,"b":"hello"}`, rdb1.JSONGet(ctx, keys["json"]).Val())
// topology is changed on source server
for _, typ := range []string{"string", "list", "hash", "set", "zset", "bitmap", "sortint", "stream"} {
require.ErrorContains(t, rdb0.Exists(ctx, keys[typ]).Err(), "MOVED")
Expand Down
7 changes: 7 additions & 0 deletions utils/kvrocks2redis/tests/check_consistency.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ def _compare_bitmap_data(self, key, pos):
dst_data = self.dst_cli.getbit(key, pos)
return src_data, dst_data

def _compare_json_data(self, key):
src_data = self.src_cli.json_get(key, '$')
dst_data = self.dst_cli.json_get(key, '$')
return src_data, dst_data

def _compare_data(self, keys : list, data_type):
if data_type == "string":
return self._compare_string_data(keys[0])
Expand All @@ -73,6 +78,8 @@ def _compare_data(self, keys : list, data_type):
return self._compare_zset_data(keys[0])
elif data_type == 'bitmap':
return self._compare_bitmap_data(keys[0], keys[1])
elif data_type == 'json':
return self._compare_json_data(keys[0])
elif data_type == 'none':
return self.src_cli.type(keys[0]), 'none'
else:
Expand Down
3 changes: 3 additions & 0 deletions utils/kvrocks2redis/tests/populate-kvrocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
[('set', 'foo', 1), True],
[('setex', 'foo_ex', 3600, 1), True],
]),
('json', [
[('JSON.SET', 'jfoo', '$', '{"a":1,"b":2}'), True],
]),
('zset', [
[('zadd', 'zfoo', 1, 'a', 2, 'b', 3, 'c'), 3]
]),
Expand Down

0 comments on commit ca3029b

Please sign in to comment.