Skip to content

Commit

Permalink
Differentiated configuration for RocksDB
Browse files Browse the repository at this point in the history
  • Loading branch information
qiujiayu committed Dec 24, 2022
1 parent 2bc70de commit 9bd6657
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import com.alipay.sofa.jraft.util.*;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
Expand Down Expand Up @@ -52,15 +53,6 @@
import com.alipay.sofa.jraft.option.LogStorageOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.storage.LogStorage;
import com.alipay.sofa.jraft.util.Bits;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.DebugStatistics;
import com.alipay.sofa.jraft.util.Describer;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.StorageOptionsFactory;
import com.alipay.sofa.jraft.util.ThreadPoolsFactory;
import com.alipay.sofa.jraft.util.Utils;

/**
* Log storage based on rocksdb.
Expand Down Expand Up @@ -137,7 +129,10 @@ protected static class EmptyWriteContext implements WriteContext {
static EmptyWriteContext INSTANCE = new EmptyWriteContext();
}

public static final String PART_ROCKSDB_OPTIONS_KEY = "jraft.log_storage.rocksdb_opts.per_group";

private String groupId;
private boolean partRocksDBOptions;
private final String path;
private final boolean sync;
private final boolean openStatistics;
Expand Down Expand Up @@ -167,14 +162,19 @@ public RocksDBLogStorage(final String path, final RaftOptions raftOptions) {
this.openStatistics = raftOptions.isOpenStatistics();
}

public static DBOptions createDBOptions() {
return StorageOptionsFactory.getRocksDBOptions(RocksDBLogStorage.class);
private String getRocksDBGroup() {
return partRocksDBOptions ? this.groupId : null;
}

private DBOptions createDBOptions() {
return StorageOptionsFactory.getRocksDBOptions(getRocksDBGroup(), RocksDBLogStorage.class);
}

public static ColumnFamilyOptions createColumnFamilyOptions() {
private ColumnFamilyOptions createColumnFamilyOptions() {
final BlockBasedTableConfig tConfig = StorageOptionsFactory
.getRocksDBTableFormatConfig(RocksDBLogStorage.class);
return StorageOptionsFactory.getRocksDBColumnFamilyOptions(RocksDBLogStorage.class) //
.getRocksDBTableFormatConfig(getRocksDBGroup(), RocksDBLogStorage.class);
return StorageOptionsFactory
.getRocksDBColumnFamilyOptions(getRocksDBGroup(), RocksDBLogStorage.class) //
.useFixedLengthPrefixExtractor(8) //
.setTableFormatConfig(tConfig) //
.setMergeOperator(new StringAppendOperator());
Expand All @@ -187,6 +187,7 @@ public boolean init(final LogStorageOptions opts) {
this.groupId = opts.getGroupId();
this.writeLock.lock();
try {
this.partRocksDBOptions = SystemPropertyUtil.getBoolean(PART_ROCKSDB_OPTIONS_KEY, false);
if (this.db != null) {
LOG.warn("RocksDBLogStorage init() in {} already.", this.path);
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.alipay.sofa.jraft.util;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

import org.rocksdb.BlockBasedTableConfig;
Expand All @@ -31,7 +32,6 @@
import org.rocksdb.util.SizeUnit;

/**
*
* @author jiachun.fjc
*/
public final class StorageOptionsFactory {
Expand Down Expand Up @@ -64,37 +64,59 @@ public static void releaseAllOptions() {
}
}

private static String buildKey(final String groupId, final Class<?> cls) {
Requires.requireNonNull(cls, "cls");
if (Objects.isNull(groupId)) {
return cls.getName();
}
return groupId + "-" + cls.getName();
}

/**
* Users can register a custom rocksdb dboptions, then the related
* classes will get their options by the key of their own class
* name. If the user does not register an options, a default options
* will be provided.
*
* @param groupId raft group id
* @param cls the key of DBOptions
* @param opts the DBOptions
*/
public static void registerRocksDBOptions(final Class<?> cls, final DBOptions opts) {
public static void registerRocksDBOptions(final String groupId, final Class<?> cls, final DBOptions opts) {
Requires.requireNonNull(cls, "cls");
Requires.requireNonNull(opts, "opts");
if (rocksDBOptionsTable.putIfAbsent(cls.getName(), opts) != null) {
throw new IllegalStateException("DBOptions with class key [" + cls.getName()
+ "] has already been registered");
String key = buildKey(groupId, cls);
if (rocksDBOptionsTable.putIfAbsent(key, opts) != null) {
throw new IllegalStateException("DBOptions with class key [" + key + "] has already been registered");
}
}

/**
* Users can register a custom rocksdb dboptions, then the related
* classes will get their options by the key of their own class
* name. If the user does not register an options, a default options
* will be provided.
* @param cls the key of DBOptions
* @param opts the DBOptions
*/
public static void registerRocksDBOptions(final Class<?> cls, final DBOptions opts) {
registerRocksDBOptions(null, cls, opts);
}

/**
* Get a new default DBOptions or a copy of the exist DBOptions.
* Users should call DBOptions#close() to release resources themselves.
*
* @param groupId raft group id
* @param cls the key of DBOptions
* @return new default DBOptions or a copy of the exist DBOptions
*/
public static DBOptions getRocksDBOptions(final Class<?> cls) {
public static DBOptions getRocksDBOptions(final String groupId, final Class<?> cls) {
Requires.requireNonNull(cls, "cls");
DBOptions opts = rocksDBOptionsTable.get(cls.getName());
String key = buildKey(groupId, cls);
DBOptions opts = rocksDBOptionsTable.get(key);
if (opts == null) {
final DBOptions newOpts = getDefaultRocksDBOptions();
opts = rocksDBOptionsTable.putIfAbsent(cls.getName(), newOpts);
opts = rocksDBOptionsTable.putIfAbsent(key, newOpts);
if (opts == null) {
opts = newOpts;
} else {
Expand Down Expand Up @@ -149,33 +171,51 @@ public static DBOptions getDefaultRocksDBOptions() {
* name. If the user does not register an options, a default options
* will be provided.
*
* @param groupId raft group id
* @param cls the key of ColumnFamilyOptions
* @param opts the ColumnFamilyOptions
*/
public static void registerRocksDBColumnFamilyOptions(final Class<?> cls, final ColumnFamilyOptions opts) {
public static void registerRocksDBColumnFamilyOptions(final String groupId, final Class<?> cls,
final ColumnFamilyOptions opts) {
Requires.requireNonNull(cls, "cls");
Requires.requireNonNull(opts, "opts");
if (columnFamilyOptionsTable.putIfAbsent(cls.getName(), opts) != null) {
throw new IllegalStateException("ColumnFamilyOptions with class key [" + cls.getName()
String key = buildKey(groupId, cls);
if (columnFamilyOptionsTable.putIfAbsent(key, opts) != null) {
throw new IllegalStateException("ColumnFamilyOptions with class key [" + key
+ "] has already been registered");
}
}

/**
* Users can register a custom rocksdb ColumnFamilyOptions, then the
* related classes will get their options by the key of their own class
* name. If the user does not register an options, a default options
* will be provided.
*
* @param cls the key of ColumnFamilyOptions
* @param opts the ColumnFamilyOptions
*/
public static void registerRocksDBColumnFamilyOptions(final Class<?> cls, final ColumnFamilyOptions opts) {
registerRocksDBColumnFamilyOptions(null, cls, opts);
}

/**
* Get a new default ColumnFamilyOptions or a copy of the exist
* ColumnFamilyOptions. Users should call ColumnFamilyOptions#close()
* to release resources themselves.
*
* @param groupId raft group id
* @param cls the key of ColumnFamilyOptions
* @return new default ColumnFamilyOptions or a copy of the exist
* ColumnFamilyOptions
*/
public static ColumnFamilyOptions getRocksDBColumnFamilyOptions(final Class<?> cls) {
public static ColumnFamilyOptions getRocksDBColumnFamilyOptions(final String groupId, final Class<?> cls) {
Requires.requireNonNull(cls, "cls");
ColumnFamilyOptions opts = columnFamilyOptionsTable.get(cls.getName());
String key = buildKey(groupId, cls);
ColumnFamilyOptions opts = columnFamilyOptionsTable.get(key);
if (opts == null) {
final ColumnFamilyOptions newOpts = getDefaultRocksDBColumnFamilyOptions();
opts = columnFamilyOptionsTable.putIfAbsent(cls.getName(), newOpts);
opts = columnFamilyOptionsTable.putIfAbsent(key, newOpts);
if (opts == null) {
opts = newOpts;
} else {
Expand Down Expand Up @@ -273,30 +313,47 @@ public static ColumnFamilyOptions getDefaultRocksDBColumnFamilyOptions() {
* classes will get their options by the key of their own class name. If
* the user does not register a config, a default config will be provided.
*
* @param groupId raft group id
* @param cls the key of BlockBasedTableConfig
* @param cfg the BlockBasedTableConfig
*/
public static void registerRocksDBTableFormatConfig(final Class<?> cls, final BlockBasedTableConfig cfg) {
public static void registerRocksDBTableFormatConfig(final String groupId, final Class<?> cls,
final BlockBasedTableConfig cfg) {
Requires.requireNonNull(cls, "cls");
Requires.requireNonNull(cfg, "cfg");
if (tableFormatConfigTable.putIfAbsent(cls.getName(), cfg) != null) {
throw new IllegalStateException("TableFormatConfig with class key [" + cls.getName()
String key = buildKey(groupId, cls);
if (tableFormatConfigTable.putIfAbsent(key, cfg) != null) {
throw new IllegalStateException("TableFormatConfig with class key [" + key
+ "] has already been registered");
}
}

/**
* Users can register a custom rocksdb BlockBasedTableConfig, then the related
* classes will get their options by the key of their own class name. If
* the user does not register a config, a default config will be provided.
*
* @param cls the key of BlockBasedTableConfig
* @param cfg the BlockBasedTableConfig
*/
public static void registerRocksDBTableFormatConfig(final Class<?> cls, final BlockBasedTableConfig cfg) {
registerRocksDBTableFormatConfig(null, cls, cfg);
}

/**
* Get a new default TableFormatConfig or a copy of the exist ableFormatConfig.
*
* @param groupId raft group id
* @param cls the key of TableFormatConfig
* @return new default TableFormatConfig or a copy of the exist TableFormatConfig
*/
public static BlockBasedTableConfig getRocksDBTableFormatConfig(final Class<?> cls) {
public static BlockBasedTableConfig getRocksDBTableFormatConfig(final String groupId, final Class<?> cls) {
Requires.requireNonNull(cls, "cls");
BlockBasedTableConfig cfg = tableFormatConfigTable.get(cls.getName());
String key = buildKey(groupId, cls);
BlockBasedTableConfig cfg = tableFormatConfigTable.get(key);
if (cfg == null) {
final BlockBasedTableConfig newCfg = getDefaultRocksDBTableConfig();
cfg = tableFormatConfigTable.putIfAbsent(cls.getName(), newCfg);
cfg = tableFormatConfigTable.putIfAbsent(key, newCfg);
if (cfg == null) {
cfg = newCfg;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ public void run() {

@BeforeClass
public static void setupNodeTest() {
StorageOptionsFactory.registerRocksDBTableFormatConfig(RocksDBLogStorage.class, StorageOptionsFactory
StorageOptionsFactory.registerRocksDBTableFormatConfig(GROUP_ID, RocksDBLogStorage.class, StorageOptionsFactory
.getDefaultRocksDBTableConfig().setBlockCacheSize(256 * SizeUnit.MB));
dumpThread = new DumpThread();
dumpThread.setName("NodeTest-DumpThread");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void run() {

@BeforeClass
public static void setupNodeTest() {
StorageOptionsFactory.registerRocksDBTableFormatConfig(RocksDBLogStorage.class, StorageOptionsFactory
StorageOptionsFactory.registerRocksDBTableFormatConfig(GROUP_ID, RocksDBLogStorage.class, StorageOptionsFactory
.getDefaultRocksDBTableConfig().setBlockCacheSize(256 * SizeUnit.MB));
dumpThread = new DumpThread();
dumpThread.setName("NodeTest-DumpThread");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import com.alipay.sofa.jraft.util.*;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -66,13 +67,6 @@
import com.alipay.sofa.jraft.rhea.util.Strings;
import com.alipay.sofa.jraft.rpc.RaftRpcServerFactory;
import com.alipay.sofa.jraft.rpc.RpcServer;
import com.alipay.sofa.jraft.util.BytesUtil;
import com.alipay.sofa.jraft.util.Describer;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.ExecutorServiceHelper;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.ThreadPoolMetricRegistry;
import com.alipay.sofa.jraft.util.Utils;
import com.codahale.metrics.ScheduledReporter;
import com.codahale.metrics.Slf4jReporter;

Expand All @@ -84,23 +78,27 @@
*/
public class StoreEngine implements Lifecycle<StoreEngineOptions>, Describer {

private static final Logger LOG = LoggerFactory
.getLogger(StoreEngine.class);
private static final Logger LOG = LoggerFactory
.getLogger(StoreEngine.class);

public static final String PART_ROCKSDB_OPTIONS_KEY = "rhea.rocksdb_opts.per_group";

static {
ExtSerializerSupports.init();
}

private final ConcurrentMap<Long, RegionKVService> regionKVServiceTable = Maps.newConcurrentMapLong();
private final ConcurrentMap<Long, RegionEngine> regionEngineTable = Maps.newConcurrentMapLong();
private final ConcurrentMap<Long, RegionKVService> regionKVServiceTable = Maps.newConcurrentMapLong();
private final ConcurrentMap<Long, RegionEngine> regionEngineTable = Maps.newConcurrentMapLong();
private final StateListenerContainer<Long> stateListenerContainer;
private final PlacementDriverClient pdClient;
private final long clusterId;

private Long storeId;
private final AtomicBoolean splitting = new AtomicBoolean(false);

private boolean partRocksDBOptions;
private final AtomicBoolean splitting = new AtomicBoolean(false);
// When the store is started (unix timestamp in milliseconds)
private long startTime = System.currentTimeMillis();
private long startTime = System.currentTimeMillis();
private File dbPath;
private RpcServer rpcServer;
private BatchRawKVStore<?> rawKVStore;
Expand Down Expand Up @@ -180,6 +178,7 @@ public synchronized boolean init(final StoreEngineOptions opts) {
return false;
}
this.storeId = store.getId();
this.partRocksDBOptions = SystemPropertyUtil.getBoolean(PART_ROCKSDB_OPTIONS_KEY, false);
// init executors
if (this.readIndexExecutor == null) {
this.readIndexExecutor = StoreEngineHelper.createReadIndexExecutor(opts.getReadIndexCoreThreads());
Expand Down Expand Up @@ -607,6 +606,10 @@ private boolean initRawKVStore(final StoreEngineOptions opts) {
}
}

private String getRocksDBGroup() {
return partRocksDBOptions ? String.valueOf(this.storeId) : null;
}

private boolean initRocksDB(final StoreEngineOptions opts) {
RocksDBOptions rocksOpts = opts.getRocksDBOptions();
if (rocksOpts == null) {
Expand All @@ -627,7 +630,7 @@ private boolean initRocksDB(final StoreEngineOptions opts) {
final String childPath = "db_" + this.storeId + "_" + opts.getServerAddress().getPort();
rocksOpts.setDbPath(Paths.get(dbPath, childPath).toString());
this.dbPath = new File(rocksOpts.getDbPath());
final RocksRawKVStore rocksRawKVStore = new RocksRawKVStore();
final RocksRawKVStore rocksRawKVStore = new RocksRawKVStore(getRocksDBGroup());
if (!rocksRawKVStore.init(rocksOpts)) {
LOG.error("Fail to init [RocksRawKVStore].");
return false;
Expand Down
Loading

0 comments on commit 9bd6657

Please sign in to comment.