diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java index 1ef6dd997..f57c625d1 100644 --- a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java +++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java @@ -20,6 +20,9 @@ import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; import java.util.List; import lombok.extern.log4j.Log4j2; @@ -36,6 +39,7 @@ import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; import org.apache.xtable.spi.extractor.ConversionSource; @@ -97,8 +101,6 @@ public InternalSnapshot getCurrentSnapshot() { .table(internalTable) .version(Long.toString(snapshot.timeMillis())) .partitionedDataFiles(PartitionFileGroup.fromFiles(dataFiles)) - // TODO : Implement pending commits extraction, required for incremental sync - // https://github.com/apache/incubator-xtable/issues/754 .sourceIdentifier(getCommitIdentifier(snapshot)) .build(); } @@ -114,18 +116,106 @@ private Snapshot getLastSnapshot() { @Override public TableChange getTableChangeForCommit(Snapshot snapshot) { - throw new UnsupportedOperationException("Incremental Sync is not supported yet."); + InternalTable tableAtSnapshot = getTable(snapshot); + InternalSchema internalSchema = tableAtSnapshot.getReadSchema(); + + InternalFilesDiff filesDiff = + dataFileExtractor.extractFilesDiff(paimonTable, snapshot, internalSchema); + + return TableChange.builder() + .tableAsOfChange(tableAtSnapshot) + .filesDiff(filesDiff) + .sourceIdentifier(getCommitIdentifier(snapshot)) + .build(); } @Override public CommitsBacklog getCommitsBacklog( InstantsForIncrementalSync instantsForIncrementalSync) { - throw new UnsupportedOperationException("Incremental Sync is not supported yet."); + Instant lastSyncInstant = instantsForIncrementalSync.getLastSyncInstant(); + long lastSyncTimeMillis = lastSyncInstant.toEpochMilli(); + + log.info( + "Getting commits backlog for Paimon table {} from instant {}", + paimonTable.name(), + lastSyncInstant); + + Iterator snapshotIterator; + try { + snapshotIterator = snapshotManager.snapshots(); + } catch (IOException e) { + throw new ReadException("Could not iterate over the Paimon snapshot list", e); + } + + List snapshotsToProcess = new ArrayList<>(); + while (snapshotIterator.hasNext()) { + Snapshot snapshot = snapshotIterator.next(); + // Only include snapshots committed after the last sync + if (snapshot.timeMillis() > lastSyncTimeMillis) { + snapshotsToProcess.add(snapshot); + log.debug( + "Including snapshot {} (time={}, commitId={}) in backlog", + snapshot.id(), + snapshot.timeMillis(), + snapshot.commitIdentifier()); + } + } + + log.info("Found {} snapshots to process for incremental sync", snapshotsToProcess.size()); + + return CommitsBacklog.builder() + .commitsToProcess(snapshotsToProcess) + .inFlightInstants(Collections.emptyList()) + .build(); } @Override public boolean isIncrementalSyncSafeFrom(Instant instant) { - return false; // Incremental sync is not supported yet + long timeInMillis = instant.toEpochMilli(); + + Long earliestSnapshotId = snapshotManager.earliestSnapshotId(); + Long latestSnapshotId = snapshotManager.latestSnapshotId(); + if (earliestSnapshotId == null || latestSnapshotId == null) { + log.warn("No snapshots found in table {}", paimonTable.name()); + return false; + } + + Snapshot earliestSnapshot = snapshotManager.snapshot(earliestSnapshotId); + Snapshot latestSnapshot = snapshotManager.snapshot(latestSnapshotId); + + // Check 1: If instant is in the future (after latest snapshot), return false + if (timeInMillis > latestSnapshot.timeMillis()) { + log.warn( + "Instant {} is in the future. Latest snapshot {} has time {}", + instant, + latestSnapshot.id(), + latestSnapshot.timeMillis()); + return false; + } + + // Check 2: Has snapshot expiration affected this instant? + // If the earliest snapshot is after the requested instant, + // then snapshots have been expired and we can't do incremental sync + if (earliestSnapshot.timeMillis() > timeInMillis) { + log.warn( + "Incremental sync is not safe from instant {}. " + + "Earliest available snapshot {} (time={}) is newer than the requested instant. " + + "Snapshots may have been expired.", + instant, + earliestSnapshot.id(), + earliestSnapshot.timeMillis()); + return false; + } + + // Check 3: Verify a snapshot exists at or before the instant + if (earliestSnapshot.timeMillis() <= timeInMillis) { + log.info( + "Incremental sync is safe from instant {} for table {}", instant, paimonTable.name()); + return true; + } + + log.warn("No snapshot found at or before instant {} for table {}", instant, paimonTable.name()); + return false; } @Override diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java index 68ccfc3ee..4555b0cfc 100644 --- a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java +++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java @@ -18,18 +18,32 @@ package org.apache.xtable.paimon; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.Set; + +import lombok.extern.log4j.Log4j2; import org.apache.paimon.Snapshot; import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.manifest.FileKind; import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; +import org.apache.paimon.manifest.ManifestFileMeta; +import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.source.snapshot.SnapshotReader; import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.stat.ColumnStat; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; +@Log4j2 public class PaimonDataFileExtractor { private final PaimonPartitionExtractor partitionExtractor = @@ -52,7 +66,16 @@ public List toInternalDataFiles( return result; } - private InternalDataFile toInternalDataFile( + /** + * Converts a Paimon ManifestEntry to an InternalDataFile. This method is used for both full + * snapshot reads and incremental sync. + * + * @param table the Paimon table + * @param entry the manifest entry representing a data file + * @param internalSchema the internal schema for partition value extraction + * @return InternalDataFile representation + */ + public InternalDataFile toInternalDataFile( FileStoreTable table, ManifestEntry entry, InternalSchema internalSchema) { return InternalDataFile.builder() .physicalPath(toFullPhysicalPath(table, entry)) @@ -84,6 +107,60 @@ private List toColumnStats(DataFileMeta file) { return Collections.emptyList(); } + /** + * Extracts file changes (added and removed files) from delta manifests for a given snapshot. This + * method reads only the delta manifests which contain the changes introduced in this specific + * snapshot, making it efficient for incremental sync. + * + * @param table the Paimon table + * @param snapshot the snapshot to extract changes from + * @param internalSchema the internal schema for partition value extraction + * @return InternalFilesDiff containing added and removed files + */ + public InternalFilesDiff extractFilesDiff( + FileStoreTable table, Snapshot snapshot, InternalSchema internalSchema) { + + ManifestList manifestList = table.store().manifestListFactory().create(); + ManifestFile manifestFile = table.store().manifestFileFactory().create(); + + // Read delta manifests - these contain only the changes in this snapshot + List deltaManifests = manifestList.readDeltaManifests(snapshot); + log.debug("Found {} delta manifests for snapshot {}", deltaManifests.size(), snapshot.id()); + + Set addedFiles = new HashSet<>(); + Set removedFiles = new HashSet<>(); + + // For primary key tables, only consider top-level files (fully compacted) + int topLevel = table.coreOptions().numLevels() - 1; + boolean hasPrimaryKeys = !table.schema().primaryKeys().isEmpty(); + + for (ManifestFileMeta manifestMeta : deltaManifests) { + List entries = manifestFile.read(manifestMeta.fileName()); + log.debug("Processing {} manifest entries from {}", entries.size(), manifestMeta.fileName()); + + for (ManifestEntry entry : entries) { + if (hasPrimaryKeys && entry.file().level() != topLevel) { + continue; + } + + InternalDataFile dataFile = toInternalDataFile(table, entry, internalSchema); + if (entry.kind() == FileKind.ADD) { + addedFiles.add(dataFile); + } else if (entry.kind() == FileKind.DELETE) { + removedFiles.add(dataFile); + } + } + } + + log.info( + "Snapshot {} has {} files added and {} files removed", + snapshot.id(), + addedFiles.size(), + removedFiles.size()); + + return InternalFilesDiff.builder().filesAdded(addedFiles).filesRemoved(removedFiles).build(); + } + private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) { // If the table has primary keys, we read only the top level files // which means we can only consider fully compacted files. diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java index b8ea413bb..532b431e0 100644 --- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java +++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java @@ -149,9 +149,6 @@ private static Stream generateTestParametersForFormatsSyncModesAndPar List arguments = new ArrayList<>(); for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) { for (SyncMode syncMode : SyncMode.values()) { - if (sourceFormat.equals(PAIMON) && syncMode == SyncMode.INCREMENTAL) - continue; // Paimon does not support incremental sync yet - for (boolean isPartitioned : new boolean[] {true, false}) { arguments.add(Arguments.of(sourceFormat, syncMode, isPartitioned)); } diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java index 4d8f8c2bf..5e28e0106 100644 --- a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java +++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java @@ -34,9 +34,11 @@ import org.apache.xtable.GenericTable; import org.apache.xtable.TestPaimonTable; import org.apache.xtable.exception.ReadException; +import org.apache.xtable.model.CommitsBacklog; import org.apache.xtable.model.InstantsForIncrementalSync; import org.apache.xtable.model.InternalSnapshot; import org.apache.xtable.model.InternalTable; +import org.apache.xtable.model.TableChange; import org.apache.xtable.model.storage.DataLayoutStrategy; import org.apache.xtable.model.storage.PartitionFileGroup; import org.apache.xtable.model.storage.TableFormat; @@ -165,38 +167,123 @@ void testGetCurrentSnapshotThrowsExceptionWhenNoSnapshot() { } @Test - void testGetTableChangeForCommitThrowsUnsupportedOperationException() { + void testGetCommitsBacklogReturnsCommitsAfterLastSync() { + // Insert initial data to create first snapshot + testTable.insertRows(5); + Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(firstSnapshot); + + // Insert more data to create second snapshot testTable.insertRows(3); - Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot(); + Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(secondSnapshot); + assertNotEquals(firstSnapshot.id(), secondSnapshot.id()); - UnsupportedOperationException exception = - assertThrows( - UnsupportedOperationException.class, - () -> conversionSource.getTableChangeForCommit(snapshot)); + // Get commits backlog from first snapshot time + InstantsForIncrementalSync instantsForSync = + InstantsForIncrementalSync.builder() + .lastSyncInstant(Instant.ofEpochMilli(firstSnapshot.timeMillis())) + .build(); + + CommitsBacklog backlog = conversionSource.getCommitsBacklog(instantsForSync); + + // Verify we get at least the second snapshot (may get more if insertRows creates multiple) + assertNotNull(backlog); + assertTrue(backlog.getCommitsToProcess().size() >= 1); + + // Verify the last snapshot in the backlog is the second snapshot + assertEquals( + secondSnapshot.id(), + backlog.getCommitsToProcess().get(backlog.getCommitsToProcess().size() - 1).id()); + + // Verify the first snapshot is NOT in the list of commits to process + assertFalse( + backlog.getCommitsToProcess().stream() + .anyMatch(snapshot -> snapshot.id() == firstSnapshot.id()), + "First snapshot should not be in the backlog since we're syncing from that instant"); + assertTrue(backlog.getInFlightInstants().isEmpty()); + } - assertEquals("Incremental Sync is not supported yet.", exception.getMessage()); + @Test + void testGetCommitsBacklogReturnsEmptyForFutureInstant() { + testTable.insertRows(5); + + // Use a future instant + InstantsForIncrementalSync instantsForSync = + InstantsForIncrementalSync.builder() + .lastSyncInstant(Instant.now().plusSeconds(3600)) + .build(); + + CommitsBacklog backlog = conversionSource.getCommitsBacklog(instantsForSync); + + // Verify no snapshots are returned + assertNotNull(backlog); + assertTrue(backlog.getCommitsToProcess().isEmpty()); } @Test - void testGetCommitsBacklogThrowsUnsupportedOperationException() { - InstantsForIncrementalSync mockInstants = - InstantsForIncrementalSync.builder().lastSyncInstant(Instant.now()).build(); + void testGetTableChangeForCommitReturnsCorrectFilesDiff() { + // Insert initial data + testTable.insertRows(5); + Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(firstSnapshot); + + // Insert more data to create second snapshot + testTable.insertRows(3); + Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(secondSnapshot); - UnsupportedOperationException exception = - assertThrows( - UnsupportedOperationException.class, - () -> conversionSource.getCommitsBacklog(mockInstants)); + // Get table change for second snapshot + TableChange tableChange = conversionSource.getTableChangeForCommit(secondSnapshot); - assertEquals("Incremental Sync is not supported yet.", exception.getMessage()); + // Verify table change structure + assertNotNull(tableChange); + assertNotNull(tableChange.getFilesDiff()); + assertNotNull(tableChange.getTableAsOfChange()); + assertEquals( + Long.toString(secondSnapshot.commitIdentifier()), tableChange.getSourceIdentifier()); + + // For append-only table, we should have added files and no removed files + assertTrue(tableChange.getFilesDiff().getFilesAdded().size() > 0); + } + + @Test + void testIsIncrementalSyncSafeFromReturnsTrueForValidInstant() { + testTable.insertRows(5); + Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot(); + Instant snapshotTime = Instant.ofEpochMilli(snapshot.timeMillis()); + + assertTrue(conversionSource.isIncrementalSyncSafeFrom(snapshotTime)); + } + + @Test + void testIsIncrementalSyncSafeFromReturnsFalseForFutureInstant() { + testTable.insertRows(5); + Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot(); + + // Use an instant way in the future (well after the snapshot) + Instant futureInstant = Instant.ofEpochMilli(snapshot.timeMillis()).plusSeconds(3600); + + assertFalse(conversionSource.isIncrementalSyncSafeFrom(futureInstant)); + } + + @Test + void testIsIncrementalSyncSafeFromReturnsFalseForEmptyTable() { + // Don't insert any data + Instant someInstant = Instant.now(); + + assertFalse(conversionSource.isIncrementalSyncSafeFrom(someInstant)); } @Test - void testIsIncrementalSyncSafeFromReturnsFalse() { - Instant testInstant = Instant.now(); + void testIsIncrementalSyncSafeFromReturnsFalseForInstantBeforeFirstSnapshot() { + testTable.insertRows(5); + Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot(); - boolean result = conversionSource.isIncrementalSyncSafeFrom(testInstant); + Instant instantBeforeFirstSnapshot = + Instant.ofEpochMilli(snapshot.timeMillis()).minusSeconds(3600); - assertFalse(result); + assertFalse(conversionSource.isIncrementalSyncSafeFrom(instantBeforeFirstSnapshot)); } @Test diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java index 9f906516c..0f3ed30d7 100644 --- a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java +++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; +import org.apache.paimon.Snapshot; import org.apache.paimon.table.FileStoreTable; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -37,6 +38,7 @@ import org.apache.xtable.model.schema.InternalSchema; import org.apache.xtable.model.schema.InternalType; import org.apache.xtable.model.storage.InternalDataFile; +import org.apache.xtable.model.storage.InternalFilesDiff; public class TestPaimonDataFileExtractor { private static final PaimonDataFileExtractor extractor = PaimonDataFileExtractor.getInstance(); @@ -147,6 +149,102 @@ void testColumnStatsAreEmpty() { } } + @Test + void testExtractFilesDiffWithNewFiles() { + createUnpartitionedTable(); + + // Insert initial data + testTable.insertRows(5); + Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(firstSnapshot); + + // Insert more data to create a second snapshot + testTable.insertRows(3); + Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(secondSnapshot); + + InternalFilesDiff filesDiff = + extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema); + + // Verify we have replaced the single file on this setup + assertNotNull(filesDiff); + assertNotNull(filesDiff.getFilesAdded()); + assertEquals(1, filesDiff.getFilesAdded().size()); + // Note: Even for inserts, Paimon tables with primary keys (which all test tables have) + // may have removed files due to compaction. The compaction merges files, so old files are + // removed + // and new compacted files are added. This is expected behavior. + assertNotNull(filesDiff.getFilesRemoved()); + assertEquals(1, filesDiff.getFilesRemoved().size()); + } + + @Test + void testExtractFilesDiffWithPartitionedTable() { + createPartitionedTable(); + + // Insert initial data + testTable.insertRows(5); + Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(firstSnapshot); + + // Insert more data + testTable.insertRows(3); + Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(secondSnapshot); + + InternalFilesDiff filesDiff = + extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema); + + // Verify we have added files with partition values + assertNotNull(filesDiff); + assertTrue(filesDiff.getFilesAdded().size() > 0); + + for (InternalDataFile file : filesDiff.dataFilesAdded()) { + assertNotNull(file.getPartitionValues()); + } + } + + @Test + void testExtractFilesDiffWithTableWithPrimaryKeys() { + createTableWithPrimaryKeys(); + + // Insert initial data + testTable.insertRows(5); + Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(firstSnapshot); + + // Insert more data to create compaction + testTable.insertRows(3); + Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(secondSnapshot); + + InternalFilesDiff filesDiff = + extractor.extractFilesDiff(paimonTable, secondSnapshot, testSchema); + + // Verify the diff is returned (size may vary based on compaction) + assertNotNull(filesDiff); + assertNotNull(filesDiff.getFilesAdded()); + assertNotNull(filesDiff.getFilesRemoved()); + } + + @Test + void testExtractFilesDiffForFirstSnapshot() { + createUnpartitionedTable(); + + // Insert data to create first snapshot + testTable.insertRows(5); + Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot(); + assertNotNull(firstSnapshot); + + InternalFilesDiff filesDiff = + extractor.extractFilesDiff(paimonTable, firstSnapshot, testSchema); + + // First snapshot should only have added files + assertNotNull(filesDiff); + assertTrue(filesDiff.getFilesAdded().size() > 0); + assertEquals(0, filesDiff.getFilesRemoved().size()); + } + private void createUnpartitionedTable() { testTable = (TestPaimonTable)