Skip to content

Commit

Permalink
Fix config set compression type didn't take effect (apache#1576)
Browse files Browse the repository at this point in the history
Co-authored-by: git-hulk <hulk.website@gmail.com>
Co-authored-by: Twice <twice@apache.org>
  • Loading branch information
3 people committed Jul 30, 2023
1 parent 5890e61 commit 7a9c0a2
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 59 deletions.
76 changes: 44 additions & 32 deletions src/config/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,44 +45,25 @@ constexpr const char *errBlobDbNotEnabled = "Must set rocksdb.enable_blob_files
constexpr const char *errLevelCompactionDynamicLevelBytesNotSet =
"Must set rocksdb.level_compaction_dynamic_level_bytes yes first.";

ConfigEnum compression_types[] = {
{"no", rocksdb::CompressionType::kNoCompression}, {"snappy", rocksdb::CompressionType::kSnappyCompression},
{"lz4", rocksdb::CompressionType::kLZ4Compression}, {"zstd", rocksdb::CompressionType::kZSTD},
{"zlib", rocksdb::CompressionType::kZlibCompression}, {nullptr, 0}};

ConfigEnum supervised_modes[] = {{"no", kSupervisedNone},
{"auto", kSupervisedAutoDetect},
{"upstart", kSupervisedUpStart},
{"systemd", kSupervisedSystemd},
{nullptr, 0}};

ConfigEnum log_levels[] = {{"info", google::INFO},
{"warning", google::WARNING},
{"error", google::ERROR},
{"fatal", google::FATAL},
{nullptr, 0}};
std::vector<ConfigEnum> supervised_modes{
{"no", kSupervisedNone},
{"auto", kSupervisedAutoDetect},
{"upstart", kSupervisedUpStart},
{"systemd", kSupervisedSystemd},
};

std::vector<ConfigEnum> log_levels{
{"info", google::INFO},
{"warning", google::WARNING},
{"error", google::ERROR},
{"fatal", google::FATAL},
};

std::string TrimRocksDbPrefix(std::string s) {
if (strncasecmp(s.data(), "rocksdb.", 8) != 0) return s;
return s.substr(8, s.size() - 8);
}

int ConfigEnumGetValue(ConfigEnum *ce, const char *name) {
while (ce->name != nullptr) {
if (strcasecmp(ce->name, name) == 0) return ce->val;
ce++;
}
return INT_MIN;
}

const char *ConfigEnumGetName(ConfigEnum *ce, int val) {
while (ce->name != nullptr) {
if (ce->val == val) return ce->name;
ce++;
}
return nullptr;
}

Config::Config() {
struct FieldWrapper {
std::string name;
Expand All @@ -92,6 +73,12 @@ Config::Config() {
FieldWrapper(std::string name, bool readonly, ConfigField *field)
: name(std::move(name)), readonly(readonly), field(field) {}
};

std::vector<ConfigEnum> compression_types;
compression_types.reserve(engine::CompressionOptions.size());
for (const auto &e : engine::CompressionOptions) {
compression_types.push_back({e.name, e.type});
}
FieldWrapper fields[] = {
{"daemonize", true, new YesNoField(&daemonize, false)},
{"bind", true, new StringField(&binds_str_, "")},
Expand Down Expand Up @@ -327,6 +314,30 @@ void Config::initFieldCallback() {
if (!srv) return Status::OK(); // srv is nullptr when load config from file
return srv->storage->SetOptionForAllColumnFamilies(TrimRocksDbPrefix(k), v);
};
auto set_compression_type_cb = [](Server *srv, const std::string &k, const std::string &v) -> Status {
if (!srv) return Status::OK();

std::string compression_option;
for (auto &option : engine::CompressionOptions) {
if (option.name == v) {
compression_option = option.val;
break;
}
}
if (compression_option.empty()) {
return {Status::NotOK, "Invalid compression type"};
}

// For the first two levels, it may contain the frequently accessed data,
// so it'd be better to use uncompressed data to save the CPU.
std::string compression_levels = "kNoCompression:kNoCompression";
auto db = srv->storage->GetDB();
for (size_t i = 2; i < db->GetOptions().compression_per_level.size(); i++) {
compression_levels += ":";
compression_levels += compression_option;
}
return srv->storage->SetOptionForAllColumnFamilies("compression_per_level", compression_levels);
};
#ifdef ENABLE_OPENSSL
auto set_tls_option = [](Server *srv, const std::string &k, const std::string &v) {
if (!srv) return Status::OK(); // srv is nullptr when load config from file
Expand Down Expand Up @@ -600,6 +611,7 @@ void Config::initFieldCallback() {
{"rocksdb.level0_slowdown_writes_trigger", set_cf_option_cb},
{"rocksdb.level0_stop_writes_trigger", set_cf_option_cb},
{"rocksdb.level0_file_num_compaction_trigger", set_cf_option_cb},
{"rocksdb.compression", set_compression_type_cb},
#ifdef ENABLE_OPENSSL
{"tls-cert-file", set_tls_option},
{"tls-key-file", set_tls_option},
Expand Down
32 changes: 20 additions & 12 deletions src/config/config_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,12 @@ using UInt32Field = IntegerField<uint32_t>;
using Int64Field = IntegerField<int64_t>;

struct ConfigEnum {
const char *name;
const std::string name;
const int val;
};

enum ConfigType { SingleConfig, MultiConfig };

int ConfigEnumGetValue(ConfigEnum *ce, const char *name);
const char *ConfigEnumGetName(ConfigEnum *ce, int val);

class ConfigField {
public:
ConfigField() = default;
Expand Down Expand Up @@ -186,23 +183,34 @@ class YesNoField : public ConfigField {

class EnumField : public ConfigField {
public:
EnumField(int *receiver, ConfigEnum *enums, int e) : receiver_(receiver), enums_(enums) { *receiver_ = e; }
EnumField(int *receiver, std::vector<ConfigEnum> enums, int e) : receiver_(receiver), enums_(std::move(enums)) {
*receiver_ = e;
}
~EnumField() override = default;
std::string ToString() override { return ConfigEnumGetName(enums_, *receiver_); }

std::string ToString() override {
for (const auto &e : enums_) {
if (e.val == *receiver_) return e.name;
}
return {};
}

Status ToNumber(int64_t *n) override {
*n = *receiver_;
return Status::OK();
}

Status Set(const std::string &v) override {
int e = ConfigEnumGetValue(enums_, v.c_str());
if (e == INT_MIN) {
return {Status::NotOK, "invalid enum option"};
for (const auto &e : enums_) {
if (strcasecmp(e.name.c_str(), v.c_str()) == 0) {
*receiver_ = e.val;
return Status::OK();
}
}
*receiver_ = e;
return Status::OK();
return {Status::NotOK, "invalid enum option"};
}

private:
int *receiver_;
ConfigEnum *enums_ = nullptr;
std::vector<ConfigEnum> enums_;
};
20 changes: 5 additions & 15 deletions src/storage/event_listener.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,12 @@ std::string StallConditionType2String(const rocksdb::WriteStallCondition type) {
}

std::string CompressType2String(const rocksdb::CompressionType type) {
std::map<rocksdb::CompressionType, std::string> compression_type_string_map = {
{rocksdb::kNoCompression, "no"},
{rocksdb::kSnappyCompression, "snappy"},
{rocksdb::kZlibCompression, "zlib"},
{rocksdb::kBZip2Compression, "zip2"},
{rocksdb::kLZ4Compression, "lz4"},
{rocksdb::kLZ4HCCompression, "lz4hc"},
{rocksdb::kXpressCompression, "xpress"},
{rocksdb::kZSTD, "zstd"},
{rocksdb::kZSTDNotFinalCompression, "zstd_not_final"},
{rocksdb::kDisableCompressionOption, "disable"}};
auto iter = compression_type_string_map.find(type);
if (iter == compression_type_string_map.end()) {
return "unknown";
for (const auto &option : engine::CompressionOptions) {
if (option.type == type) {
return option.name;
}
}
return iter->second;
return "unknown";
}

bool IsDiskQuotaExceeded(const rocksdb::Status &bg_error) {
Expand Down
14 changes: 14 additions & 0 deletions src/storage/storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ constexpr const char *kPropagateScriptCommand = "script";

constexpr const char *kLuaFunctionPrefix = "lua_f_";

struct CompressionOption {
rocksdb::CompressionType type;
const std::string name;
const std::string val;
};

inline const std::vector<CompressionOption> CompressionOptions = {
{rocksdb::kNoCompression, "no", "kNoCompression"},
{rocksdb::kSnappyCompression, "snappy", "kSnappyCompression"},
{rocksdb::kZlibCompression, "zlib", "kZlibCompression"},
{rocksdb::kLZ4Compression, "lz4", "kLZ4Compression"},
{rocksdb::kZSTD, "zstd", "kZSTD"},
};

class Storage {
public:
explicit Storage(Config *config);
Expand Down
21 changes: 21 additions & 0 deletions tests/gocase/unit/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,24 @@ func TestSetConfigBackupDir(t *testing.T) {
require.True(t, hasCompactionFiles(newBackupDir))
require.True(t, hasCompactionFiles(originBackupDir))
}

func TestConfigSetCompression(t *testing.T) {
configs := map[string]string{}
srv := util.StartServer(t, configs)
defer srv.Close()

ctx := context.Background()
rdb := srv.NewClient()
defer func() { require.NoError(t, rdb.Close()) }()
require.NoError(t, rdb.Do(ctx, "SET", "foo", "bar").Err())

configKey := "rocksdb.compression"
supportedCompressions := []string{"no", "snappy", "zlib", "lz4", "zstd"}
for _, compression := range supportedCompressions {
require.NoError(t, rdb.ConfigSet(ctx, configKey, compression).Err())
vals, err := rdb.ConfigGet(ctx, configKey).Result()
require.NoError(t, err)
require.EqualValues(t, compression, vals[configKey])
}
require.ErrorContains(t, rdb.ConfigSet(ctx, configKey, "unsupported").Err(), "invalid enum option")
}

0 comments on commit 7a9c0a2

Please sign in to comment.