Skip to content

Commit

Permalink
Upgrade rocksdb version to 7.7.3
Browse files Browse the repository at this point in the history
  • Loading branch information
qiujiayu committed Nov 5, 2022
1 parent a9c5b70 commit e776a5c
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.apache.commons.io.FileUtils;
import org.rocksdb.BackupEngine;
import org.rocksdb.BackupInfo;
import org.rocksdb.BackupableDBOptions;
import org.rocksdb.BackupEngineOptions;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Checkpoint;
import org.rocksdb.ColumnFamilyDescriptor;
Expand Down Expand Up @@ -262,17 +262,28 @@ public void get(final byte[] key, @SuppressWarnings("unused") final boolean read
}
}

private Map<byte[], byte[]> multiGetAsMap(List<byte[]> keys) throws RocksDBException {
final List<byte[]> rawList = this.db.multiGetAsList(keys);
final Map<byte[], byte[]> resultMap = Maps.newHashMapWithExpectedSize(rawList.size());
int index = 0;
for (final byte[] value : rawList) {
resultMap.put(keys.get(index), value);
}
return resultMap;
}

@Override
public void multiGet(final List<byte[]> keys, @SuppressWarnings("unused") final boolean readOnlySafe,
final KVStoreClosure closure) {
final Timer.Context timeCtx = getTimeContext("MULTI_GET");
final Lock readLock = this.readWriteLock.readLock();
readLock.lock();
try {
final Map<byte[], byte[]> rawMap = this.db.multiGet(keys);
final Map<ByteArray, byte[]> resultMap = Maps.newHashMapWithExpectedSize(rawMap.size());
for (final Map.Entry<byte[], byte[]> entry : rawMap.entrySet()) {
resultMap.put(ByteArray.wrap(entry.getKey()), entry.getValue());
final List<byte[]> rawList = this.db.multiGetAsList(keys);
final Map<ByteArray, byte[]> resultMap = Maps.newHashMapWithExpectedSize(rawList.size());
int index = 0;
for (final byte[] value : rawList) {
resultMap.put(ByteArray.wrap(keys.get(index)), value);
}
setSuccess(closure, resultMap);
} catch (final Exception e) {
Expand Down Expand Up @@ -558,7 +569,7 @@ public void batchGetAndPut(final KVStateOutputList kvStates) {
batch.put(key, op.getValue());
}
// first, get prev values
final Map<byte[], byte[]> prevValMap = this.db.multiGet(keys);
final Map<byte[], byte[]> prevValMap = multiGetAsMap(keys);
this.db.write(this.writeOptions, batch);
for (final KVState kvState : segment) {
setSuccess(kvState.getDone(), prevValMap.get(kvState.getOp().getKey()));
Expand Down Expand Up @@ -623,7 +634,7 @@ public void batchCompareAndPut(final KVStateOutputList kvStates) {
expects.put(key, expect);
updates.put(key, update);
}
final Map<byte[], byte[]> prevValMap = this.db.multiGet(Lists.newArrayList(expects.keySet()));
final Map<byte[], byte[]> prevValMap = multiGetAsMap(Lists.newArrayList(expects.keySet()));
for (final KVState kvState : segment) {
final byte[] key = kvState.getOp().getKey();
if (Arrays.equals(expects.get(key), prevValMap.get(key))) {
Expand Down Expand Up @@ -734,7 +745,7 @@ public void compareAndPutAll(final List<CASEntry> entries, final KVStoreClosure
for (final CASEntry entry : entries) {
keys.add(entry.getKey());
}
final Map<byte[], byte[]> prevValMap = this.db.multiGet(keys);
final Map<byte[], byte[]> prevValMap = this.multiGetAsMap(keys);
for (final CASEntry entry : entries) {
if (!Arrays.equals(entry.getExpect(), prevValMap.get(entry.getKey()))) {
setSuccess(closure, Boolean.FALSE);
Expand Down Expand Up @@ -802,7 +813,7 @@ public void batchPutIfAbsent(final KVStateOutputList kvStates) {
keys.add(key);
values.put(key, value);
}
final Map<byte[], byte[]> prevValMap = this.db.multiGet(keys);
final Map<byte[], byte[]> prevValMap = this.multiGetAsMap(keys);
for (final KVState kvState : segment) {
final byte[] key = kvState.getOp().getKey();
final byte[] prevVal = prevValMap.get(key);
Expand Down Expand Up @@ -1423,7 +1434,7 @@ RocksDBBackupInfo backupDB(final String backupDBPath) throws IOException {
FileUtils.forceMkdir(new File(backupDBPath));
final Lock writeLock = this.readWriteLock.writeLock();
writeLock.lock();
try (final BackupableDBOptions backupOpts = createBackupDBOptions(backupDBPath);
try (final BackupEngineOptions backupOpts = createBackupDBOptions(backupDBPath);
final BackupEngine backupEngine = BackupEngine.open(this.options.getEnv(), backupOpts)) {
backupEngine.createNewBackup(this.db, true);
final List<BackupInfo> backupInfoList = backupEngine.getBackupInfo();
Expand All @@ -1449,7 +1460,7 @@ void restoreBackup(final String backupDBPath, final RocksDBBackupInfo rocksBacku
final Lock writeLock = this.readWriteLock.writeLock();
writeLock.lock();
closeRocksDB();
try (final BackupableDBOptions backupOpts = createBackupDBOptions(backupDBPath);
try (final BackupEngineOptions backupOpts = createBackupDBOptions(backupDBPath);
final BackupEngine backupEngine = BackupEngine.open(this.options.getEnv(), backupOpts);
final RestoreOptions restoreOpts = new RestoreOptions(false)) {
final String dbPath = this.opts.getDbPath();
Expand Down Expand Up @@ -1640,8 +1651,8 @@ private static ColumnFamilyOptions createColumnFamilyOptions() {

// Creates the backupable db options to control the behavior of
// a backupable database.
private static BackupableDBOptions createBackupDBOptions(final String backupDBPath) {
return new BackupableDBOptions(backupDBPath) //
private static BackupEngineOptions createBackupDBOptions(final String backupDBPath) {
return new BackupEngineOptions(backupDBPath) //
.setSync(true) //
.setShareTableFiles(false); // don't share data between backups
}
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@
<project.encoding>UTF-8</project.encoding>
<protobuf.version>3.5.1</protobuf.version>
<protostuff.version>1.6.0</protostuff.version>
<rocksdb.version>6.22.1.1</rocksdb.version>
<rocksdb.version>7.7.3</rocksdb.version>
<slf4j.version>1.7.21</slf4j.version>
</properties>

Expand Down

0 comments on commit e776a5c

Please sign in to comment.