Skip to content

Commit 23d870e

Browse files
rbokka and ashvina
rbokka
authored and committed
Modified incremental safe check which does not depend on snapshots list order
Co-authored-by: Timothy Brown <tim@onehouse.ai>
1 parent 50e5181 commit 23d870e

File tree

3 files changed

+45
-33
lines changed

3 files changed

+45
-33
lines changed

xtable-core/src/main/java/org/apache/xtable/iceberg/IcebergConversionSource.java

+14-21
Original file line numberDiff line numberDiff line change
@@ -246,31 +246,24 @@ public CommitsBacklog<Snapshot> getCommitsBacklog(InstantsForIncrementalSync las
246246
public boolean isIncrementalSyncSafeFrom(Instant instant) {
247247
long timeInMillis = instant.toEpochMilli();
248248
Table iceTable = getSourceTable();
249-
boolean doesInstantOfAgeExists = false;
250-
Long targetSnapshotId = null;
251-
for (Snapshot snapshot : iceTable.snapshots()) {
252-
if (snapshot.timestampMillis() <= timeInMillis) {
253-
doesInstantOfAgeExists = true;
254-
targetSnapshotId = snapshot.snapshotId();
255-
} else {
256-
break;
249+
Snapshot currentSnapshot = iceTable.currentSnapshot();
250+
251+
while (currentSnapshot != null && currentSnapshot.timestampMillis() > timeInMillis) {
252+
Long parentSnapshotId = currentSnapshot.parentId();
253+
if (parentSnapshotId == null) {
254+
// no more snapshots in the chain and did not find targetSnapshot
255+
return false;
257256
}
258-
}
259-
if (!doesInstantOfAgeExists) {
260-
return false;
261-
}
262-
// Go from latest snapshot until targetSnapshotId through parent reference.
263-
// nothing has to be null in this chain to guarantee safety of incremental sync.
264-
Long currentSnapshotId = iceTable.currentSnapshot().snapshotId();
265-
while (currentSnapshotId != null && currentSnapshotId != targetSnapshotId) {
266-
Snapshot currentSnapshot = iceTable.snapshot(currentSnapshotId);
267-
if (currentSnapshot == null) {
268-
// The snapshot is expired.
257+
258+
Snapshot parentSnapshot = iceTable.snapshot(parentSnapshotId);
259+
if (parentSnapshot == null) {
260+
// chain is broken due to expired snapshot
261+
log.info("Expired snapshot id: {}", parentSnapshotId);
269262
return false;
270263
}
271-
currentSnapshotId = currentSnapshot.parentId();
264+
currentSnapshot = parentSnapshot;
272265
}
273-
return true;
266+
return currentSnapshot != null;
274267
}
275268

276269
@Override

xtable-core/src/test/java/org/apache/xtable/TestIcebergTable.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ public String getFilterQuery() {
307307
return String.format("%s > 'aaa'", icebergDataHelper.getRecordKeyField());
308308
}
309309

310-
public Long getLastCommitTimestamp() {
310+
public long getLastCommitTimestamp() {
311311
return getLatestSnapshot().timestampMillis();
312312
}
313313

xtable-core/src/test/java/org/apache/xtable/iceberg/ITIcebergConversionSource.java

+30-11
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public void testInsertsUpsertsAndDeletes(boolean isPartitioned) {
146146
List<TableChange> allTableChanges = new ArrayList<>();
147147

148148
testIcebergTable.insertRows(50);
149-
Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
149+
long timestamp1 = testIcebergTable.getLastCommitTimestamp();
150150
allActiveFiles.add(testIcebergTable.getAllActiveFiles());
151151

152152
List<Record> records1 = testIcebergTable.insertRows(50);
@@ -204,7 +204,7 @@ public void testDropPartition() {
204204
List<TableChange> allTableChanges = new ArrayList<>();
205205

206206
List<Record> records1 = testIcebergTable.insertRows(50);
207-
Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
207+
long timestamp1 = testIcebergTable.getLastCommitTimestamp();
208208
allActiveFiles.add(testIcebergTable.getAllActiveFiles());
209209

210210
List<Record> records2 = testIcebergTable.insertRows(50);
@@ -264,7 +264,7 @@ public void testDeleteAllRecordsInPartition() {
264264
List<TableChange> allTableChanges = new ArrayList<>();
265265

266266
List<Record> records1 = testIcebergTable.insertRows(50);
267-
Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
267+
long timestamp1 = testIcebergTable.getLastCommitTimestamp();
268268
allActiveFiles.add(testIcebergTable.getAllActiveFiles());
269269

270270
List<Record> records2 = testIcebergTable.insertRows(50);
@@ -325,7 +325,7 @@ public void testExpireSnapshots(boolean isPartitioned) throws InterruptedExcepti
325325
List<TableChange> allTableChanges = new ArrayList<>();
326326

327327
List<Record> records1 = testIcebergTable.insertRows(50);
328-
Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
328+
long timestamp1 = testIcebergTable.getLastCommitTimestamp();
329329

330330
testIcebergTable.upsertRows(records1.subList(0, 20));
331331
allActiveFiles.add(testIcebergTable.getAllActiveFiles());
@@ -383,8 +383,8 @@ public void testForIncrementalSyncSafetyCheck(boolean shouldExpireSnapshots) {
383383
TestIcebergTable.forStandardSchemaAndPartitioning(
384384
tableName, "level", tempDir, hadoopConf)) {
385385
// Insert 50 rows to INFO partition.
386-
List<Record> commit1Rows = testIcebergTable.insertRecordsForPartition(50, "INFO");
387-
Long timestamp1 = testIcebergTable.getLastCommitTimestamp();
386+
List<Record> firstCommitRows = testIcebergTable.insertRecordsForPartition(50, "INFO");
387+
long timestampAfterFirstCommit = testIcebergTable.getLastCommitTimestamp();
388388
SourceTable tableConfig =
389389
SourceTable.builder()
390390
.name(testIcebergTable.getTableName())
@@ -393,23 +393,42 @@ public void testForIncrementalSyncSafetyCheck(boolean shouldExpireSnapshots) {
393393
.build();
394394

395395
// Upsert all rows inserted before, so all files are replaced.
396-
testIcebergTable.upsertRows(commit1Rows.subList(0, 50));
397-
long snapshotIdAfterCommit2 = testIcebergTable.getLatestSnapshot().snapshotId();
396+
testIcebergTable.upsertRows(firstCommitRows.subList(0, 50));
397+
long timestampAfterSecondCommit = testIcebergTable.getLastCommitTimestamp();
398+
long snapshotIdAfterSecondCommit = testIcebergTable.getLatestSnapshot().snapshotId();
398399

399400
// Insert 50 rows to different (ERROR) partition.
400401
testIcebergTable.insertRecordsForPartition(50, "ERROR");
402+
long timestampAfterThirdCommit = testIcebergTable.getLastCommitTimestamp();
401403

402404
if (shouldExpireSnapshots) {
403405
// Expire snapshotAfterCommit2.
404-
testIcebergTable.expireSnapshot(snapshotIdAfterCommit2);
406+
testIcebergTable.expireSnapshot(snapshotIdAfterSecondCommit);
405407
}
406408
IcebergConversionSource conversionSource =
407409
sourceProvider.getConversionSourceInstance(tableConfig);
408410
if (shouldExpireSnapshots) {
409-
assertFalse(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
411+
// Since the second snapshot is expired, we cannot safely perform incremental sync from the
412+
// first two commits
413+
assertFalse(
414+
conversionSource.isIncrementalSyncSafeFrom(
415+
Instant.ofEpochMilli(timestampAfterFirstCommit)));
416+
assertFalse(
417+
conversionSource.isIncrementalSyncSafeFrom(
418+
Instant.ofEpochMilli(timestampAfterSecondCommit)));
410419
} else {
411-
assertTrue(conversionSource.isIncrementalSyncSafeFrom(Instant.ofEpochMilli(timestamp1)));
420+
// The full history is still present so incremental sync is safe from any of these commits
421+
assertTrue(
422+
conversionSource.isIncrementalSyncSafeFrom(
423+
Instant.ofEpochMilli(timestampAfterFirstCommit)));
424+
assertTrue(
425+
conversionSource.isIncrementalSyncSafeFrom(
426+
Instant.ofEpochMilli(timestampAfterSecondCommit)));
412427
}
428+
// Table always has the last commit so incremental sync is safe
429+
assertTrue(
430+
conversionSource.isIncrementalSyncSafeFrom(
431+
Instant.ofEpochMilli(timestampAfterThirdCommit)));
413432
// Table doesn't have instant of this older commit, hence it is not safe.
414433
Instant instantAsOfHourAgo = Instant.now().minus(1, ChronoUnit.HOURS);
415434
assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantAsOfHourAgo));

0 commit comments

Comments
 (0)