Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import lombok.extern.log4j.Log4j2;
Expand All @@ -36,6 +39,7 @@
import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.storage.DataLayoutStrategy;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFilesDiff;
import org.apache.xtable.model.storage.PartitionFileGroup;
import org.apache.xtable.model.storage.TableFormat;
import org.apache.xtable.spi.extractor.ConversionSource;
Expand Down Expand Up @@ -97,8 +101,6 @@ public InternalSnapshot getCurrentSnapshot() {
.table(internalTable)
.version(Long.toString(snapshot.timeMillis()))
.partitionedDataFiles(PartitionFileGroup.fromFiles(dataFiles))
// TODO : Implement pending commits extraction, required for incremental sync
// https://github.com/apache/incubator-xtable/issues/754
.sourceIdentifier(getCommitIdentifier(snapshot))
.build();
}
Expand All @@ -114,18 +116,106 @@ private Snapshot getLastSnapshot() {

/**
 * Builds the {@link TableChange} introduced by a single Paimon snapshot.
 *
 * <p>The table state is resolved through {@code getTable(snapshot)}; its read schema is then
 * handed to the data file extractor, which computes the files diff for that snapshot.
 *
 * @param snapshot the Paimon snapshot (commit) to convert
 * @return a table change carrying the table as of the snapshot, the files diff and the commit
 *     identifier of the snapshot
 */
@Override
public TableChange getTableChangeForCommit(Snapshot snapshot) {
  InternalTable tableAtSnapshot = getTable(snapshot);
  // The read schema is needed by the extractor to convert manifest entries into data files.
  InternalSchema internalSchema = tableAtSnapshot.getReadSchema();

  InternalFilesDiff filesDiff =
      dataFileExtractor.extractFilesDiff(paimonTable, snapshot, internalSchema);

  return TableChange.builder()
      .tableAsOfChange(tableAtSnapshot)
      .filesDiff(filesDiff)
      .sourceIdentifier(getCommitIdentifier(snapshot))
      .build();
}

/**
 * Collects the Paimon snapshots that were committed after the last successful sync.
 *
 * <p>Every snapshot returned by the snapshot manager is inspected and kept when its commit time
 * is strictly greater than the last sync instant. No in-flight instants are reported.
 *
 * @param instantsForIncrementalSync holder of the last sync instant
 * @return the backlog of snapshots to process, with an empty list of in-flight instants
 * @throws ReadException if the snapshot list cannot be read
 */
@Override
public CommitsBacklog<Snapshot> getCommitsBacklog(
    InstantsForIncrementalSync instantsForIncrementalSync) {
  Instant lastSyncInstant = instantsForIncrementalSync.getLastSyncInstant();
  long lastSyncTimeMillis = lastSyncInstant.toEpochMilli();

  log.info(
      "Getting commits backlog for Paimon table {} from instant {}",
      paimonTable.name(),
      lastSyncInstant);

  Iterator<Snapshot> snapshotIterator;
  try {
    snapshotIterator = snapshotManager.snapshots();
  } catch (IOException e) {
    throw new ReadException("Could not iterate over the Paimon snapshot list", e);
  }

  // NOTE(review): the backlog keeps the iteration order of snapshotManager.snapshots();
  // presumably that is ascending by snapshot id - confirm against the Paimon API.
  List<Snapshot> snapshotsToProcess = new ArrayList<>();
  while (snapshotIterator.hasNext()) {
    Snapshot snapshot = snapshotIterator.next();
    // Only include snapshots committed after the last sync
    if (snapshot.timeMillis() > lastSyncTimeMillis) {
      snapshotsToProcess.add(snapshot);
      log.debug(
          "Including snapshot {} (time={}, commitId={}) in backlog",
          snapshot.id(),
          snapshot.timeMillis(),
          snapshot.commitIdentifier());
    }
  }

  log.info("Found {} snapshots to process for incremental sync", snapshotsToProcess.size());

  return CommitsBacklog.<Snapshot>builder()
      .commitsToProcess(snapshotsToProcess)
      .inFlightInstants(Collections.emptyList())
      .build();
}

@Override
public boolean isIncrementalSyncSafeFrom(Instant instant) {
return false; // Incremental sync is not supported yet
long timeInMillis = instant.toEpochMilli();

Long earliestSnapshotId = snapshotManager.earliestSnapshotId();
Long latestSnapshotId = snapshotManager.latestSnapshotId();
if (earliestSnapshotId == null || latestSnapshotId == null) {
log.warn("No snapshots found in table {}", paimonTable.name());
return false;
}

Snapshot earliestSnapshot = snapshotManager.snapshot(earliestSnapshotId);
Snapshot latestSnapshot = snapshotManager.snapshot(latestSnapshotId);

// Check 1: If instant is in the future (after latest snapshot), return false
if (timeInMillis > latestSnapshot.timeMillis()) {
log.warn(
"Instant {} is in the future. Latest snapshot {} has time {}",
instant,
latestSnapshot.id(),
latestSnapshot.timeMillis());
return false;
}

// Check 2: Has snapshot expiration affected this instant?
// If the earliest snapshot is after the requested instant,
// then snapshots have been expired and we can't do incremental sync
if (earliestSnapshot.timeMillis() > timeInMillis) {
log.warn(
"Incremental sync is not safe from instant {}. "
+ "Earliest available snapshot {} (time={}) is newer than the requested instant. "
+ "Snapshots may have been expired.",
instant,
earliestSnapshot.id(),
earliestSnapshot.timeMillis());
return false;
}

// Check 3: Verify a snapshot exists at or before the instant
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this check simply check that the earliestSnapshot.timeMillis <= timeInMillis?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, much better now!

if (earliestSnapshot.timeMillis() <= timeInMillis) {
log.info(
"Incremental sync is safe from instant {} for table {}", instant, paimonTable.name());
return true;
}

log.warn("No snapshot found at or before instant {} for table {}", instant, paimonTable.name());
return false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,32 @@

package org.apache.xtable.paimon;

import java.util.*;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;

import lombok.extern.log4j.Log4j2;

import org.apache.paimon.Snapshot;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.snapshot.SnapshotReader;

import org.apache.xtable.model.schema.InternalSchema;
import org.apache.xtable.model.stat.ColumnStat;
import org.apache.xtable.model.storage.InternalDataFile;
import org.apache.xtable.model.storage.InternalFilesDiff;

@Log4j2
public class PaimonDataFileExtractor {

private final PaimonPartitionExtractor partitionExtractor =
Expand All @@ -52,7 +66,16 @@ public List<InternalDataFile> toInternalDataFiles(
return result;
}

private InternalDataFile toInternalDataFile(
/**
* Converts a Paimon ManifestEntry to an InternalDataFile. This method is used for both full
* snapshot reads and incremental sync.
*
* @param table the Paimon table
* @param entry the manifest entry representing a data file
* @param internalSchema the internal schema for partition value extraction
* @return InternalDataFile representation
*/
public InternalDataFile toInternalDataFile(
FileStoreTable table, ManifestEntry entry, InternalSchema internalSchema) {
return InternalDataFile.builder()
.physicalPath(toFullPhysicalPath(table, entry))
Expand Down Expand Up @@ -84,6 +107,60 @@ private List<ColumnStat> toColumnStats(DataFileMeta file) {
return Collections.emptyList();
}

/**
 * Computes the data files added and removed by one snapshot.
 *
 * <p>Only the snapshot's delta manifests are read. Each manifest entry is converted with {@link
 * #toInternalDataFile} and placed in the added set for {@code ADD} entries or in the removed set
 * for {@code DELETE} entries. When the table declares primary keys, entries whose file level is
 * not the top level ({@code numLevels - 1}) are ignored.
 *
 * @param table the Paimon table
 * @param snapshot the snapshot whose changes are extracted
 * @param internalSchema the internal schema used when converting manifest entries
 * @return an {@link InternalFilesDiff} holding the added and removed files
 */
public InternalFilesDiff extractFilesDiff(
    FileStoreTable table, Snapshot snapshot, InternalSchema internalSchema) {
  ManifestList manifestListReader = table.store().manifestListFactory().create();
  ManifestFile manifestFileReader = table.store().manifestFileFactory().create();

  // Delta manifests hold just the entries written by this snapshot.
  List<ManifestFileMeta> deltaMetas = manifestListReader.readDeltaManifests(snapshot);
  log.debug("Found {} delta manifests for snapshot {}", deltaMetas.size(), snapshot.id());

  Set<InternalDataFile> added = new HashSet<>();
  Set<InternalDataFile> removed = new HashSet<>();

  // Primary key tables: restrict to files sitting on the highest level.
  int highestLevel = table.coreOptions().numLevels() - 1;
  boolean primaryKeyTable = !table.schema().primaryKeys().isEmpty();

  for (ManifestFileMeta deltaMeta : deltaMetas) {
    List<ManifestEntry> manifestEntries = manifestFileReader.read(deltaMeta.fileName());
    log.debug(
        "Processing {} manifest entries from {}", manifestEntries.size(), deltaMeta.fileName());

    for (ManifestEntry manifestEntry : manifestEntries) {
      boolean eligible = !primaryKeyTable || manifestEntry.file().level() == highestLevel;
      if (eligible) {
        InternalDataFile converted = toInternalDataFile(table, manifestEntry, internalSchema);
        FileKind kind = manifestEntry.kind();
        if (kind == FileKind.ADD) {
          added.add(converted);
        } else if (kind == FileKind.DELETE) {
          removed.add(converted);
        }
      }
    }
  }

  log.info(
      "Snapshot {} has {} files added and {} files removed",
      snapshot.id(),
      added.size(),
      removed.size());

  return InternalFilesDiff.builder().filesAdded(added).filesRemoved(removed).build();
}

private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
// If the table has primary keys, we read only the top level files
// which means we can only consider fully compacted files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,6 @@ private static Stream<Arguments> generateTestParametersForFormatsSyncModesAndPar
List<Arguments> arguments = new ArrayList<>();
for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
for (SyncMode syncMode : SyncMode.values()) {
if (sourceFormat.equals(PAIMON) && syncMode == SyncMode.INCREMENTAL)
continue; // Paimon does not support incremental sync yet

for (boolean isPartitioned : new boolean[] {true, false}) {
arguments.add(Arguments.of(sourceFormat, syncMode, isPartitioned));
}
Expand Down
Loading