Skip to content

Conversation

@mikedias
Copy link
Contributor

Implements #754

What is the purpose of the pull request

Implement Paimon Source Incremental Sync

Brief change log

• Implemented getTableChangeForCommit() to extract file changes (added/removed) from delta manifests for
incremental sync
• Implemented getCommitsBacklog() to identify snapshots that need to be processed since the last sync instant
• Implemented isIncrementalSyncSafeFrom() to validate if incremental sync is safe from a given instant by checking
snapshot availability

Verify this pull request

• Added tests in TestPaimonDataFileExtractor to cover the extractFilesDiff() logic.
• Added tests in TestPaimonConversionSource to cover the incremental sync methods.
• Existing integration tests in ITConversionController verify end-to-end incremental sync behavior

return false;
}

// Check 3: Verify a snapshot exists at or before the instant
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this check simply check that the earliestSnapshot.timeMillis <= timeInMillis?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, much better now!

Comment on lines 170 to 173
assertTrue(filesDiff.getFilesAdded().size() > 0);

// Verify removed files collection exists (size may vary based on compaction behavior)
assertNotNull(filesDiff.getFilesRemoved());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any way to configure the test so that the sizes are predictable and we can assert on them?

Also should we assert the files removed is non-zero?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On this current setup it will always be 1. I've changed to reflect that.


CommitsBacklog<Snapshot> backlog = conversionSource.getCommitsBacklog(instantsForSync);

// Verify we get at least the second snapshot (may get more if insertRows creates multiple)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with Paimon, what would cause a single round of inserts to create multiple snapshots?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Paimon can trigger compaction at different times depending on the settings. It wouldn't be the case for this particular test, but it would make it flaky if we change things and don't consider this scenario on the test.


// Verify we get at least the second snapshot (may get more if insertRows creates multiple)
assertNotNull(backlog);
assertTrue(backlog.getCommitsToProcess().size() >= 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we cannot know the size for certain upfront, we may want to assert that the first snapshot is not in the list of commits to process

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

@Test
void testIsIncrementalSyncSafeFromReturnsFalse() {
Instant testInstant = Instant.now();
void testIsIncrementalSyncSafeFromReturnsTrueForValidInstant() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test where the IsIncrementalSyncSafeFrom returns false since the instant is before the first snapshot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!


// Insert more data to create a second snapshot
testTable.insertRows(3);
org.apache.paimon.Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we import org.apache.paimon.Snapshot here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

@the-other-tim-brown the-other-tim-brown merged commit f9ee948 into apache:main Jan 6, 2026
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants