[#1276] improvement(core): Optimize logic about dropping old version of data in KvGcCollector (#2918)

### What changes were proposed in this pull request?

Introduce a variable that records the last collected transaction ID so that the next GC run can
start from that ID, implementing `incremental GC`.
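
A minimal sketch of the bookkeeping idea, simplified to plain transaction ids (the real collector persists the last collected commit key in the KV backend; all names below are illustrative, not Gravitino's API):

```java
import java.util.TreeMap;

// Toy model of incremental GC: a sorted map keyed by transaction id stands in for
// the KV backend's commit marks. Names are illustrative, not the actual Gravitino API.
public class IncrementalGcSketch {
  private final TreeMap<Long, byte[]> commitMarks = new TreeMap<>();
  private long lastCollectedTxId = 1L; // persisted as the "last collect commit id"

  /** Collect everything up to {@code collectUpToTxId}, skipping already-collected ids. */
  void gcRun(long collectUpToTxId) {
    // Only the slice (lastCollectedTxId, collectUpToTxId] is visited, not the whole history.
    commitMarks.subMap(lastCollectedTxId, false, collectUpToTxId, true).clear();
    lastCollectedTxId = collectUpToTxId; // the next run resumes from here
  }
}
```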

### Why are the changes needed?

A full GC over old versions of the data takes a lot of time, so we should avoid that approach and
collect incrementally instead.

Fix: #1276 

### Does this PR introduce _any_ user-facing change?

N/A.

### How was this patch tested?

Existing tests and local testing.
yuqi1129 authored Apr 16, 2024
1 parent 6a8cf80 commit 6b9a47b
Showing 3 changed files with 197 additions and 13 deletions.
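
The diff below repeatedly shifts by 18 bits to convert between a millisecond timestamp and a transaction id. A tiny illustration of that relationship, assuming the encoding implied by the code (transaction id packs the creation time in its high bits; the sequence value 42 is a stand-in):

```java
// Illustration of the timestamp <-> transaction id relationship used in the diff below,
// assuming id = (timestampMs << 18) | perMillisecondSequence.
public class TxIdTimestampSketch {
  public static void main(String[] args) {
    long timestampMs = System.currentTimeMillis();
    long txId = (timestampMs << 18) | 42;            // 42 stands in for a per-ms sequence
    long recoveredMs = txId >> 18;                   // what the GC uses for createTime logging
    System.out.println(recoveredMs == timestampMs);  // true
  }
}
```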
@@ -21,8 +21,8 @@
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -44,6 +44,15 @@ public final class KvGarbageCollector implements Closeable {
private final KvBackend kvBackend;
private final Config config;
private final EntityKeyEncoder<byte[]> entityKeyEncoder;
private static final byte[] LAST_COLLECT_COMMIT_ID_KEY =
Bytes.concat(
new byte[] {0x1D, 0x00, 0x03}, "last_collect_commit_id".getBytes(StandardCharsets.UTF_8));

// Keep the last collected commit id so that we do not collect the same data multiple times. The
// first time, the commit id is 1 (the minimum). Assuming we have collected data with transaction
// ids in (1, 100] and the current tx_id is 200 when we collect again, the range for that run is
// (100, 200], and so on.
byte[] commitIdHasBeenCollected;
private long frequencyInMinutes;

private static final String TIME_STAMP_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
@@ -139,24 +148,30 @@ private void collectAndRemoveOldVersionData() throws IOException {
long transactionIdToDelete = deleteTimeLine << 18;
LOG.info("Start to remove data which is older than {}", transactionIdToDelete);
byte[] startKey = TransactionalKvBackendImpl.generateCommitKey(transactionIdToDelete);
byte[] endKey = endOfTransactionId();
commitIdHasBeenCollected = kvBackend.get(LAST_COLLECT_COMMIT_ID_KEY);
if (commitIdHasBeenCollected == null) {
commitIdHasBeenCollected = endOfTransactionId();
}

long lastGCId = getTransactionId(getBinaryTransactionId(commitIdHasBeenCollected));
LOG.info(
"Start to collect data which is modified between '{}({})' (exclusive) and '{}({})' (inclusive)",
lastGCId,
lastGCId == 1 ? lastGCId : DateFormatUtils.format(lastGCId >> 18, TIME_STAMP_FORMAT),
transactionIdToDelete,
DateFormatUtils.format(deleteTimeLine, TIME_STAMP_FORMAT));

// Get all commit marks
// TODO(yuqi), Use multiple threads to scan the data in case it is too large.
List<Pair<byte[], byte[]>> kvs =
kvBackend.scan(
new KvRange.KvRangeBuilder()
.start(startKey)
.end(endKey)
.end(commitIdHasBeenCollected)
.startInclusive(true)
.endInclusive(false)
.build());

// Why should we reverse the order? Because we need to delete data from the oldest version to the
// newest one. The scan result is sorted by key in ascending order, and keys with a bigger
// transaction id compare smaller than keys with a smaller transaction id, so the scan yields the
// newest data first; reversing it lets us process the oldest data first.
Collections.sort(kvs, (o1, o2) -> Bytes.wrap(o2.getKey()).compareTo(o1.getKey()));
for (Pair<byte[], byte[]> kv : kvs) {
List<byte[]> keysInTheTransaction = SerializationUtils.deserialize(kv.getValue());
byte[] transactionId = getBinaryTransactionId(kv.getKey());
@@ -174,15 +189,19 @@

// Value has deleted mark, we can remove it.
if (null == TransactionalKvBackendImpl.getRealValue(rawValue)) {
// Delete all versions of the key.
removeAllVersionsOfKey(rawKey, key, false);

LogHelper logHelper = decodeKey(key, transactionId);
kvBackend.delete(rawKey);
LOG.info(
"Physically delete key that has marked deleted: name identifier: '{}', entity type: '{}', createTime: '{}({})', key: '{}'",
"Physically delete key that has marked deleted: name identifier: '{}', entity type: '{}',"
+ " createTime: '{}({})', key: '{}'",
logHelper.identifier,
logHelper.type,
logHelper.createTimeAsString,
logHelper.createTimeInMs,
Bytes.wrap(key));
kvBackend.delete(rawKey);
keysDeletedCount++;
continue;
}
@@ -200,12 +219,17 @@ private void collectAndRemoveOldVersionData() throws IOException {
.limit(1)
.build());
if (!newVersionOfKey.isEmpty()) {
// Have a new version, we can safely remove all old versions.
removeAllVersionsOfKey(rawKey, key, false);

// Has a newer version, we can remove it.
LogHelper logHelper = decodeKey(key, transactionId);
byte[] newVersionKey = newVersionOfKey.get(0).getKey();
LogHelper newVersionLogHelper = decodeKey(newVersionKey);
kvBackend.delete(rawKey);
LOG.info(
"Physically delete key that has newer version: name identifier: '{}', entity type: '{}', createTime: '{}({})', newVersion createTime: '{}({})',"
"Physically delete key that has newer version: name identifier: '{}', entity type: '{}',"
+ " createTime: '{}({})', newVersion createTime: '{}({})',"
+ " key: '{}', newVersion key: '{}'",
logHelper.identifier,
logHelper.type,
@@ -215,21 +239,86 @@
newVersionLogHelper.createTimeInMs,
Bytes.wrap(rawKey),
Bytes.wrap(newVersionKey));
kvBackend.delete(rawKey);
keysDeletedCount++;
}
}

// All keys in this transaction have been deleted, so we can remove the commit mark.
if (keysDeletedCount == keysInTheTransaction.size()) {
kvBackend.delete(kv.getKey());
long timestamp = getTransactionId(transactionId) >> 18;
LOG.info(
"Physically delete commit mark: {}, createTime: '{}({})', key: '{}'",
Bytes.wrap(kv.getKey()),
DateFormatUtils.format(timestamp, TIME_STAMP_FORMAT),
timestamp,
Bytes.wrap(kv.getKey()));
kvBackend.delete(kv.getKey());
}
}

commitIdHasBeenCollected = kvs.isEmpty() ? startKey : kvs.get(0).getKey();
kvBackend.put(LAST_COLLECT_COMMIT_ID_KEY, commitIdHasBeenCollected, true);
}

/**
* Remove all versions of the key.
*
* @param rawKey the raw key; it contains the transaction id.
* @param key the real key; it does not contain the transaction id.
* @param includeStart whether to include the start key.
* @throws IOException if an I/O exception occurs during deletion.
*/
private void removeAllVersionsOfKey(byte[] rawKey, byte[] key, boolean includeStart)
throws IOException {
List<Pair<byte[], byte[]>> kvs =
kvBackend.scan(
new KvRange.KvRangeBuilder()
.start(rawKey)
.end(generateKey(key, 1))
.startInclusive(includeStart)
.endInclusive(false)
.build());

for (Pair<byte[], byte[]> kv : kvs) {
// Delete real data.
kvBackend.delete(kv.getKey());

LogHelper logHelper = decodeKey(kv.getKey());
LOG.info(
"Physically delete key that has marked deleted: name identifier: '{}', entity type: '{}',"
+ " createTime: '{}({})', key: '{}'",
logHelper.identifier,
logHelper.type,
logHelper.createTimeAsString,
logHelper.createTimeInMs,
Bytes.wrap(key));

// Try to delete the commit mark if all keys in the transaction have been dropped.
byte[] transactionId = getBinaryTransactionId(kv.getKey());
byte[] transactionKey = generateCommitKey(transactionId);
byte[] transactionValue = kvBackend.get(transactionKey);

List<byte[]> keysInTheTransaction = SerializationUtils.deserialize(transactionValue);

boolean allDropped = true;
for (byte[] keyInTheTransaction : keysInTheTransaction) {
if (kvBackend.get(generateKey(keyInTheTransaction, transactionId)) != null) {
// There is still a key in the transaction, we cannot delete the commit mark.
allDropped = false;
break;
}
}

// Try to delete the commit mark.
if (allDropped) {
kvBackend.delete(transactionKey);
long timestamp = TransactionalKvBackendImpl.getTransactionId(transactionId) >> 18;
LOG.info(
"Physically delete commit mark: {}, createTime: '{}({})', key: '{}'",
Bytes.wrap(kv.getKey()),
DateFormatUtils.format(timestamp, TIME_STAMP_FORMAT),
timestamp,
Bytes.wrap(kv.getKey()));
}
}
}
@@ -16,6 +16,8 @@
import static com.datastrato.gravitino.storage.kv.TestKvEntityStorage.createFilesetEntity;
import static com.datastrato.gravitino.storage.kv.TestKvEntityStorage.createSchemaEntity;
import static com.datastrato.gravitino.storage.kv.TestKvEntityStorage.createTableEntity;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.getBinaryTransactionId;
import static com.datastrato.gravitino.storage.kv.TransactionalKvBackendImpl.getTransactionId;

import com.datastrato.gravitino.Config;
import com.datastrato.gravitino.Configs;
@@ -409,4 +411,96 @@ void testRemoveWithGCCollector2() throws IOException, InterruptedException {
TableEntity.class));
}
}

@Test
void testIncrementalGC() throws Exception {
Config config = getConfig();
Mockito.when(config.get(STORE_TRANSACTION_MAX_SKEW_TIME)).thenReturn(1000L);

try (EntityStore store = EntityStoreFactory.createEntityStore(config)) {
store.initialize(config);

if (!(store instanceof KvEntityStore)) {
return;
}
KvEntityStore kvEntityStore = (KvEntityStore) store;

store.setSerDe(EntitySerDeFactory.createEntitySerDe(config.get(Configs.ENTITY_SERDE)));
AuditInfo auditInfo =
AuditInfo.builder().withCreator("creator").withCreateTime(Instant.now()).build();

BaseMetalake metalake1 = createBaseMakeLake(1L, "metalake1", auditInfo);
BaseMetalake metalake2 = createBaseMakeLake(2L, "metalake2", auditInfo);
BaseMetalake metalake3 = createBaseMakeLake(3L, "metalake3", auditInfo);

for (int i = 0; i < 10; i++) {
store.put(metalake1);
store.put(metalake2);
store.put(metalake3);

store.delete(NameIdentifier.of("metalake1"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake2"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake3"), Entity.EntityType.METALAKE);

Thread.sleep(10);
}

store.put(metalake1);
store.put(metalake2);
store.put(metalake3);

Mockito.when(config.get(STORE_DELETE_AFTER_TIME)).thenReturn(1000L);
Thread.sleep(1500);

// Scan raw key-value data from storage to confirm the data is deleted
kvEntityStore.kvGarbageCollector.collectAndClean();
List<Pair<byte[], byte[]>> allData =
kvEntityStore.backend.scan(
new KvRange.KvRangeBuilder()
.start("_".getBytes())
.end("z".getBytes())
.startInclusive(false)
.endInclusive(false)
.build());

Assertions.assertEquals(3, allData.size());

long transactionId =
getTransactionId(
getBinaryTransactionId(kvEntityStore.kvGarbageCollector.commitIdHasBeenCollected));
Assertions.assertNotEquals(1, transactionId);

for (int i = 0; i < 10; i++) {
store.delete(NameIdentifier.of("metalake1"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake2"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake3"), Entity.EntityType.METALAKE);
store.put(metalake1);
store.put(metalake2);
store.put(metalake3);
Thread.sleep(10);
}
store.delete(NameIdentifier.of("metalake1"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake2"), Entity.EntityType.METALAKE);
store.delete(NameIdentifier.of("metalake3"), Entity.EntityType.METALAKE);

Thread.sleep(1500);
kvEntityStore.kvGarbageCollector.collectAndClean();

allData =
kvEntityStore.backend.scan(
new KvRange.KvRangeBuilder()
.start("_".getBytes())
.end("z".getBytes())
.startInclusive(false)
.endInclusive(false)
.build());

Assertions.assertTrue(allData.isEmpty());

long transactionIdV2 =
getTransactionId(
getBinaryTransactionId(kvEntityStore.kvGarbageCollector.commitIdHasBeenCollected));
Assertions.assertTrue(transactionIdV2 > transactionId);
}
}
}
1 change: 1 addition & 0 deletions rfc/rfc-3/Transaction-implementation-on-kv.md
@@ -123,5 +123,6 @@ Scan and range query are almost the same as that of read process, for more detai
- Keys that start with 0x'1D0000' store the contents of the id-name mapping. For more, please refer to class `KvNameMappingService`.
- Keys that start with 0x'1D0001' store the current timestamp that is used for generating transaction ids. For more, please refer to class `TransactionIdGeneratorImpl`.
- Keys that start with 0x'1D0002' store the information of the storage layout version. For more, please refer to `KvEntityStore#initStorageVersionInfo`.
- Keys that start with 0x'1D0003' store the transaction id that was used by `KvGarbageCollector` last time (see the key-construction sketch after this list).
- Keys that start with 0x'1E' store transaction marks which mark whether a transaction is committed or not.
- Other key spaces are used to store Gravitino entities like `metalakes`, `catalogs`, `schemas`, `tables` and so on. They usually range from 0x'20' (space) to 0x'7F' (delete). For more, please refer to class `KvEntityStoreImpl`.
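
For illustration, a minimal sketch of how the 0x'1D0003' marker key is built, mirroring `LAST_COLLECT_COMMIT_ID_KEY` from the diff above (assumes Guava's `Bytes.concat` is the concatenation helper used):

```java
import com.google.common.primitives.Bytes;
import java.nio.charset.StandardCharsets;

// Sketch of the 0x'1D0003' key that stores the last-collected commit id:
// a fixed 3-byte prefix followed by a human-readable suffix.
public class GcMarkerKey {
  static final byte[] LAST_COLLECT_COMMIT_ID_KEY =
      Bytes.concat(
          new byte[] {0x1D, 0x00, 0x03},
          "last_collect_commit_id".getBytes(StandardCharsets.UTF_8));
}
```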
