Skip to content

Flink: FLIP-27 source enumerator help classes#4329

Merged
rdblue merged 11 commits intoapache:masterfrom
stevenzwu:SplitEnumerator
Jun 6, 2022
Merged

Flink: FLIP-27 source enumerator help classes#4329
rdblue merged 11 commits intoapache:masterfrom
stevenzwu:SplitEnumerator

Conversation

@stevenzwu
Copy link
Contributor

@stevenzwu stevenzwu commented Mar 15, 2022

It mainly contains classes for streaming read (continuous split discovery) along with config and enumeration position classes.

*/
class ScanContext implements Serializable {
@Internal
public class ScanContext implements Serializable {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is made public because it is accessed by ContinuousSplitPlannerImpl class in this PR.

Copy link
Contributor

@yittg yittg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @stevenzwu, i have some question about this change, hoping you can help

Comment on lines 58 to 66
private void validate() {
Preconditions.checkArgument(scanContext.snapshotId() == null,
"Can't set snapshotId in ScanContext for continuous enumerator");
Preconditions.checkArgument(scanContext.asOfTimestamp() == null,
"Can't set asOfTimestamp in ScanContext for continuous enumerator");
Preconditions.checkArgument(scanContext.startSnapshotId() == null,
"Can't set startSnapshotId in ScanContext for continuous enumerator");
Preconditions.checkArgument(scanContext.endSnapshotId() == null,
"Can't set endSnapshotId in ScanContext for continuous enumerator");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does that mean scanContext#monitorInterval should be null, because it is configured in IcebergEnumeratorConfig?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that is a good question. I am also pondering maybe we should merge the IcebergEnumeratorConfig with ScanContext. As you pointed out, there are some overlapped configs, like monitorInterval, startSnapshotId. Please let me know your preference. Will also get some input from Ryan.

Right now, I am leaning toward merging them.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

merged the IcebergEnumeratorConfig into ScanContext.

Comment on lines 122 to 125
public Builder startingStrategy(StartingStrategy strategy) {
this.startingStrategy = strategy;
return this;
}
Copy link
Contributor

@yittg yittg Mar 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to use named strategy method accepting required options? like

Suggested change
public Builder startingStrategy(StartingStrategy strategy) {
this.startingStrategy = strategy;
return this;
}
public Builder startingWithSpecificSnapshot(long startId) {
this.startingStrategy = StartingStrategy.SPECIFIC_START_SNAPSHOT_ID;
this.startSnapshotId = startId;
return this;
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agree, I like your suggestion.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually while working on merging the configs to ScanContext, I think we need to stay in the simple POJO style for ScanContext.Builder.fromProperties

}

@VisibleForTesting
static HistoryEntry getStartSnapshot(Table table, IcebergEnumeratorConfig enumeratorConfig) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's a little confused whether the start snapshot is counted or not? it is exclusive based on TableScan#appendsBetween. However, it does not match the intuition for strategy EARLIEST_SNAPSHOT, even SPECIFIC_START_SNAPSHOT_ID and SPECIFIC_START_SNAPSHOT_TIMESTAMP

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start snapshot is exclusive. I am thinking that we can document the behavior better for StartingStrategy enum class.

Copy link
Contributor

@yittg yittg Mar 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, these strategies mean semantic with AFTER?

EARLIEST_SNAPSHOT actually means START_AFTER_ EARLIEST_SNAPSHOT;
SPECIFIC_START_SNAPSHOT_ID actually means START_AFTER_SNAPSHOT_ID;
SPECIFIC_START_SNAPSHOT_TIMESTAMP actually means START_AFTER_SNAPSHOT_TIMESTAMP;

@stevenzwu stevenzwu force-pushed the SplitEnumerator branch 3 times, most recently from c70e105 to 651affb Compare March 17, 2022 16:06
if (isStreaming) {
if (startingStrategy == StreamingStartingStrategy.SPECIFIC_START_SNAPSHOT_ID) {
Preconditions.checkArgument(startSnapshotId != null,
"startSnapshotId cannot be null for SPECIFIC_START_SNAPSHOT_ID starting strategy");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to remove the specific variable names from the error messages. And we also phrase error messages more directly: (Problem): (context)

This should probably be: Invalid starting snapshot for SPECIFIC_START_SNAPSHOT_ID strategy: null

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update

* Start incremental mode from a specific startTimestamp.
* Starting snapshot has a timestamp lower than or equal to the specified timestamp.
*/
SPECIFIC_START_SNAPSHOT_TIMESTAMP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens if table history doesn't go back that far? It should probably fail because the user's request can't be satisfied.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switched to SnapshotUtil#snapshotIdAsOfTime, which handles this scenario internally (throwing exception)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

INCREMENTAL_AFTER_TIMESTAMP?

/**
* Start incremental mode from a specific startSnapshotId
*/
SPECIFIC_START_SNAPSHOT_ID,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Inclusive of the changes in this snapshot?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mentioned the exclusive behave at the class-level Javadoc

this.scanContext = scanContext;
// Within a JVM, table name should be unique across sources.
// Hence it is used as worker pool thread name prefix.
this.workerPool = ThreadPools.newWorkerPool("iceberg-worker-pool-" + table.name(), scanContext.planParallelism());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it possible to process the table twice in the same JVM? I agree operator ID would be better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make the thread pool name a constructor argument.

Because FLIP-27 source interface doesn't expose the operator ID, for now tableName-UUID is used to guarantee uniqueness. comments will be updated.

public ContinuousEnumerationResult planSplits(IcebergEnumeratorPosition lastPosition) {
table.refresh();
if (lastPosition != null) {
return discoverDeltaSplits(lastPosition);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would avoid using "Delta" because that has multiple meanings. Incremental is better.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update

}

@VisibleForTesting
static HistoryEntry getStartSnapshot(Table table, ScanContext scanContext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you probably want to use SnapshotUtil, which handles most of these cases using the current table state's ancestors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will switch

"Snapshot id not found in history: {}" + scanContext.startSnapshotId());
}
break;
case SPECIFIC_START_SNAPSHOT_TIMESTAMP:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will switch

case LATEST_SNAPSHOT:
startEntry = historyEntries.get(historyEntries.size() - 1);
break;
case EARLIEST_SNAPSHOT:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will switch

switch (scanContext.startingStrategy()) {
case TABLE_SCAN_THEN_INCREMENTAL:
case LATEST_SNAPSHOT:
startEntry = historyEntries.get(historyEntries.size() - 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table.currentSnapshot()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will switch

startEntry = historyEntries.get(0);
break;
case SPECIFIC_START_SNAPSHOT_ID:
Optional<HistoryEntry> matchedEntry = historyEntries.stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

table.snapshot(snapshotId)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will switch

*/
private ContinuousEnumerationResult discoverInitialSplits() {
HistoryEntry startSnapshotEntry = getStartSnapshot(table, scanContext);
LOG.info("get startSnapshotId {} based on starting strategy {}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Clean up log message?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will update

}
if (startingStrategy == StreamingStartingStrategy.SPECIFIC_START_SNAPSHOT_TIMESTAMP) {
Preconditions.checkArgument(startSnapshotTimestamp != null,
"startSnapshotTimestamp cannot be null for SPECIFIC_START_SNAPSHOT_TIMESTAMP starting strategy");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we validate that startSnapshotId isn't supplied if the user has set the startingStrategy to be by timestamp.

As well as making sure there's not a provided startSnapshotTimestamp when using snapshot id starting strategy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure. will add them

this.table = table;
this.scanContext = scanContext;
this.workerPool = ThreadPools.newWorkerPool(
"iceberg-enumerator-pool-" + threadPoolName, scanContext.planParallelism());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Would it be more informative to mention split-planner-pool-** or something that correlates more to the class name?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we use the current naming convention in FlinkInputFormat, we can probably change it to iceberg-plan-worker-pool-<threadPoolName>.

}

@VisibleForTesting
static Snapshot getStartSnapshot(Table table, ScanContext scanContext) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would you deal with an empty table, i.e. no snapshots are produced?

Guess this method should return an Optional<Snapshot> ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point. it is a corner case. will switch to Optional and also add a unit test

if (matchedSnapshotById != null) {
return Optional.of(matchedSnapshotById);
} else {
throw new IllegalArgumentException(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We usually prefer Preconditions.checkArgument instead of an extra if statement.

case INCREMENTAL_FROM_SNAPSHOT_TIMESTAMP:
long snapshotIdAsOfTime = SnapshotUtil.snapshotIdAsOfTime(table, scanContext.startSnapshotTimestamp());
Snapshot matchedSnapshotByTimestamp = table.snapshot(snapshotIdAsOfTime);
if (matchedSnapshotByTimestamp != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are guaranteed that the snapshot from snapshotIdAsOfTime is known, then we can avoid this check.

LOG.info("Skip incremental scan because table is empty");
return new ContinuousEnumerationResult(Collections.emptyList(), lastPosition, lastPosition);
} else {
if (lastPosition.snapshotId() != null && currentSnapshot.snapshotId() == lastPosition.snapshotId()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this could be an else if

import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;

public class TestContinuousSplitPlannerImplStartStrategy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good!

Assert.assertEquals(1, result.splits().size());
IcebergSourceSplit split = result.splits().iterator().next();
Assert.assertEquals(1, split.task().files().size());
Assert.assertEquals(dataFile.path().toString(), split.task().files().iterator().next().file().path().toString());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: you can use Iterables.getOnlyElement to avoid calling iterator().next() without validation.

public ContinuousSplitPlannerImpl(Table table, ScanContext scanContext, String threadPoolName) {
this.table = table;
this.scanContext = scanContext;
this.workerPool = ThreadPools.newWorkerPool(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: for testing, could we set this to null so that we don't spawn a worker pool?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, the pool is never closed. Should this be closeable? If not, should we pass in the worker pool so that its lifecycle is attached to a closeable instance?

ContinuousEnumerationResult emptyTableInitialDiscoveryResult = splitPlanner.planSplits(null);
Assert.assertTrue(emptyTableInitialDiscoveryResult.splits().isEmpty());
Assert.assertNull(emptyTableInitialDiscoveryResult.fromPosition());
Assert.assertNull(emptyTableInitialDiscoveryResult.toPosition().snapshotId());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to add an isEmpty method to check this rather than checking snapshotId() is null.

@github-actions github-actions bot added the build label Jun 6, 2022
@rdblue rdblue merged commit 31dafee into apache:master Jun 6, 2022
@rdblue
Copy link
Contributor

rdblue commented Jun 6, 2022

Thanks, @stevenzwu!

namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
namrathamyske pushed a commit to namrathamyske/iceberg that referenced this pull request Jul 10, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants