feat: support refreshing Iceberg tables #5707

Open: wants to merge 59 commits into base: main
Showing changes from 2 commits
Commits (59)
470b09c
Initial commit of refreshing Iceberg.
lbooker42 Jul 2, 2024
a8d957a
Rebased to main.
lbooker42 Jul 2, 2024
264fdb1
Change IcebergInstructions refreshing indicator to enum instead of bo…
lbooker42 Jul 2, 2024
58d0a73
WIP, for review
lbooker42 Jul 3, 2024
e090474
Manual and auto-refreshing working, better documentation.
lbooker42 Jul 23, 2024
57021ad
Addressed more PR comments, some remaining.
lbooker42 Jul 23, 2024
fb882e8
WIP, some PR comments addressed.
lbooker42 Jul 26, 2024
5bbdeb2
WIP, even more PR comments addressed.
lbooker42 Jul 27, 2024
3da205c
Nearly all PR comments addressed.
lbooker42 Jul 27, 2024
91acf9b
merged with main
lbooker42 Jul 27, 2024
08dd329
Adjustment to IcebergInstructions update mode.
lbooker42 Jul 29, 2024
7af0d1d
Added python wrapper for Iceberg refreshing tables.
lbooker42 Jul 29, 2024
2d79c38
Changes to mocked tests for ColumnSourceManager and PartitionAwareSou…
lbooker42 Jul 29, 2024
3809f21
Added DHError handler and add'l documentation to python `snapshots()`…
lbooker42 Jul 30, 2024
5273a15
Fixed typo in JavaDoc
lbooker42 Jul 30, 2024
b9e2c6e
WIP
lbooker42 Jul 31, 2024
9937f79
Suggestion from review
lbooker42 Jul 31, 2024
cd08038
WIP, changes to revert some transaction token code.
lbooker42 Jul 31, 2024
f28325f
Correct logic across multiple transactions.
lbooker42 Aug 21, 2024
2d92b3f
Merged with main
lbooker42 Aug 21, 2024
cd31d82
Moved transaction accumulation to AbstractTableLocationProvider
lbooker42 Aug 23, 2024
d680c0c
Moved transaction accumulation to AbstractTableLocationProvider
lbooker42 Aug 26, 2024
6607fc3
PR comments addressed.
lbooker42 Aug 28, 2024
893336f
Updated to use IcebergTableAdapter and exposed in python. Addressed P…
lbooker42 Aug 30, 2024
68e4546
Incorporated external PR to update PartitioningColumnDataIndex for re…
lbooker42 Aug 30, 2024
273f5c1
Added additional snapshots with removes to IcebergToolsTest resources.
lbooker42 Sep 3, 2024
1e92a19
Merge branch 'main' into lab-iceberg-refreshing
lbooker42 Sep 3, 2024
f72c1b7
Manual and auto refreshing tests for Iceberg.
lbooker42 Sep 4, 2024
5c7ff12
Manual and auto refreshing tests for Iceberg, not passing.
lbooker42 Sep 4, 2024
92eec61
PR comments addressed.
lbooker42 Sep 4, 2024
95194b7
Implemented improved location reference counting in AbstractTableLoca…
lbooker42 Sep 5, 2024
09e2b6e
Fixing doc problem.
lbooker42 Sep 5, 2024
30910dd
For review only, does not compile :(
lbooker42 Sep 12, 2024
dd12240
Compiles now, still many problems
lbooker42 Sep 13, 2024
944dac6
Working through problems.
lbooker42 Sep 13, 2024
912b9f2
Cleanup and minor changes
lbooker42 Sep 13, 2024
e2b6fd0
Refactored ATLP
lbooker42 Sep 18, 2024
72b03be
Merged with main.
lbooker42 Sep 18, 2024
01e50fe
Updated but RCSM still not referenced properly.
lbooker42 Sep 18, 2024
81e88d8
Refreshing tests still need work.
lbooker42 Sep 20, 2024
b37a04e
Better tests and improved liveness management for the TableLocation k…
lbooker42 Sep 25, 2024
dead9c4
Added TLP state (add, append, static, refreshing)
lbooker42 Sep 25, 2024
e5d10e7
Added TLP state (add, append, static, refreshing)
lbooker42 Sep 27, 2024
b30e240
Addressed PR comments, some TODO remaining to address.
lbooker42 Sep 30, 2024
fa9d154
Improved table location management in SourcePartitionedTable
lbooker42 Oct 1, 2024
d807a94
Merge with main
lbooker42 Oct 2, 2024
db2b031
Post-merge cleanup
lbooker42 Oct 2, 2024
746b343
Post-merge cleanup and test updating.
lbooker42 Oct 2, 2024
d69ddcd
Addressed PR comments and test failures.
lbooker42 Oct 4, 2024
06a3bd5
More test failure fixes.
lbooker42 Oct 4, 2024
91ba92c
Liveness management re-ordering in SourcePartitionedTable
lbooker42 Oct 7, 2024
73e8824
Addressing open PR comments.
lbooker42 Oct 9, 2024
be689ef
Many PR comments addressed.
lbooker42 Oct 15, 2024
b15c78a
IcebergTableAdapter synchronization changes and cleanup.
lbooker42 Oct 16, 2024
de9f6ae
PR comments addressed.
lbooker42 Oct 18, 2024
d5c13e6
Fix for iceberg.py to use table adapter
lbooker42 Oct 18, 2024
851260c
Merged with main.
lbooker42 Oct 18, 2024
d1557c5
Addressed PR comments.
lbooker42 Oct 18, 2024
975a013
Close to merging.
lbooker42 Oct 19, 2024

@@ -15,7 +15,7 @@ public enum TableUpdateMode {
STATIC, APPEND_ONLY, ADD_ONLY, ADD_REMOVE;

/**
- * Returns true if the addition is allowed.
+ * Returns true if addition is allowed.
*/
public boolean addAllowed() {
switch (this) {
@@ -30,7 +30,7 @@ public boolean addAllowed() {
}

/**
- * Returns true if the removal is allowed.
+ * Returns true if removal is allowed.
*/
public boolean removeAllowed() {
switch (this) {
@@ -56,9 +56,6 @@ public boolean removeAllowed() {
* @return the most permissive mode encountered in the stream
*/
public static TableUpdateMode mostPermissiveMode(Stream<TableUpdateMode> modes) {
rcaudy marked this conversation as resolved.
- // Analyze the location update modes of the input providers to determine the location update mode
- // of the composite provider. The resultant mode is the most permissive mode of the input provider
- // locations.
final MutableBoolean anyRemoves = new MutableBoolean(false);
final MutableBoolean anyAdditions = new MutableBoolean(false);
final MutableBoolean anyAppends = new MutableBoolean(false);
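
The hunk above shows only the flag declarations of `mostPermissiveMode`, so its full body is not visible here. As a minimal sketch of the same idea, and not the PR's implementation, the enum below assumes that declaration order doubles as permissiveness order (STATIC, then APPEND_ONLY, then ADD_ONLY, then ADD_REMOVE) and that an empty stream falls back to STATIC. `UpdateModeSketch` and `mostPermissive` are hypothetical names.

```java
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical stand-in for the PR's TableUpdateMode; only the four constants
// and their declaration order are taken from the diff.
enum UpdateModeSketch {
    STATIC, APPEND_ONLY, ADD_ONLY, ADD_REMOVE;

    // Assumption: declaration order is also permissiveness order, so the
    // largest element in the stream is the most permissive mode.
    // Assumption: an empty stream is treated as STATIC.
    static UpdateModeSketch mostPermissive(Stream<UpdateModeSketch> modes) {
        return modes.max(Comparator.naturalOrder()).orElse(STATIC);
    }
}
```

Reducing with `max` gives the same result as tracking separate "any removes / any additions / any appends" flags only if the ordering assumption holds; the flag-based approach in the PR does not depend on declaration order.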

@@ -5,7 +5,6 @@

import io.deephaven.api.SortColumn;
import io.deephaven.base.verify.Assert;
- import io.deephaven.configuration.Configuration;
import io.deephaven.engine.liveness.LiveSupplier;
import io.deephaven.engine.liveness.StandaloneLivenessManager;
import io.deephaven.engine.table.BasicDataIndex;
@@ -14,9 +13,7 @@
import io.deephaven.engine.table.impl.locations.impl.AbstractTableLocationProvider;
import io.deephaven.engine.table.impl.locations.local.URITableLocationKey;
import io.deephaven.engine.testutil.testcase.RefreshingTableTestCase;
- import io.deephaven.io.logger.StreamLoggerImpl;
import io.deephaven.test.types.OutOfBandTest;
- import io.deephaven.util.process.ProcessEnvironment;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
@@ -92,17 +89,17 @@ protected void destroy() {

@Override
protected @NotNull ColumnLocation makeColumnLocation(@NotNull String name) {
- return null;
+ throw new UnsupportedOperationException();
}

@Override
public @Nullable BasicDataIndex loadDataIndex(@NotNull String... columns) {
- return null;
+ throw new UnsupportedOperationException();
}

@Override
public void refresh() {
- Assert.statementNeverExecuted();
+ throw new UnsupportedOperationException();
}

@Override
@@ -123,19 +120,10 @@ public boolean hasDataIndex(@NotNull String... columns) {

@Override
public void setUp() throws Exception {
- if (null == ProcessEnvironment.tryGet()) {
- ProcessEnvironment.basicServerInitialization(Configuration.getInstance(),
- "AbstractTableLocationProviderTest", new StreamLoggerImpl());
- }
super.setUp();
setExpectError(false);
}

- @Override
- public void tearDown() throws Exception {
- super.tearDown();
- }
-
private List<TableLocationKey> createKeys(final int count) {
final List<TableLocationKey> keys = new ArrayList<>();
for (int i = 0; i < count; i++) {

@@ -14,9 +14,6 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.jetbrains.annotations.NotNull;

- import java.util.HashSet;
- import java.util.Set;
-
/**
* <p>
* Automatically refreshing {@link TableLocationProvider} implementation that delegates {@link TableLocationKey location
@@ -68,44 +65,24 @@ public String getImplementationName() {

@Override
public synchronized void refresh() {
adapter.refresh();
- final Snapshot latestSnapshot = adapter.currentSnapshot();
- if (latestSnapshot.sequenceNumber() > locationKeyFinder.snapshot.sequenceNumber()) {
- locationKeyFinder.snapshot = latestSnapshot;
- refreshSnapshot();
+ if (locationKeyFinder.maybeUpdateSnapshot()) {
+ refreshLocations();
}
}

@Override
public void update() {
- throw new IllegalStateException("An automatically refreshing Iceberg table cannot be manually updated");
+ throw new UnsupportedOperationException("Automatically refreshing Iceberg tables cannot be manually updated");
}

@Override
public void update(long snapshotId) {
- throw new IllegalStateException("An automatically refreshing Iceberg table cannot be manually updated");
+ throw new UnsupportedOperationException("Automatically refreshing Iceberg tables cannot be manually updated");
}

@Override
public void update(Snapshot snapshot) {
- throw new IllegalStateException("An automatically refreshing Iceberg table cannot be manually updated");
- }
-
- /**
- * Refresh the table location provider with the latest snapshot from the catalog. This method will identify new
- * locations and removed locations.
- */
- private void refreshSnapshot() {
- beginTransaction(this);
- final Set<ImmutableTableLocationKey> missedKeys = new HashSet<>();
- getTableLocationKeys(ttlk -> missedKeys.add(ttlk.get()));
- locationKeyFinder.findKeys(tableLocationKey -> {
- missedKeys.remove(tableLocationKey);
- handleTableLocationKeyAdded(tableLocationKey, this);
- });
- missedKeys.forEach(tlk -> handleTableLocationKeyRemoved(tlk, this));
- endTransaction(this);
- setInitialized();
+ throw new UnsupportedOperationException("Automatically refreshing Iceberg tables cannot be manually updated");
}

// ------------------------------------------------------------------------------------------------------------------
@@ -114,7 +91,7 @@ private void refreshSnapshot() {

@Override
protected final void activateUnderlyingDataSource() {
- refreshSnapshot();
+ refresh();
subscriptionToken = refreshService.scheduleTableLocationProviderRefresh(this, refreshIntervalMs);
}

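
The `refreshSnapshot()` helper removed from this file found added and removed locations by starting from the set of known keys and crossing off each key the new snapshot still reports. The following is a minimal sketch of that set-difference step only, with plain `String` keys standing in for table location keys. `LocationDiffSketch` and `diff` are hypothetical names, and the transaction and liveness handling done by the real provider is deliberately left out.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical helper; String keys stand in for table location keys.
final class LocationDiffSketch {
    final Set<String> added;
    final Set<String> removed;

    private LocationDiffSketch(Set<String> added, Set<String> removed) {
        this.added = added;
        this.removed = removed;
    }

    static LocationDiffSketch diff(Set<String> known, Collection<String> discovered) {
        // Assume every known key is gone until the new snapshot reports it again.
        final Set<String> missed = new HashSet<>(known);
        final Set<String> added = new LinkedHashSet<>();
        for (String key : discovered) {
            missed.remove(key);
            if (!known.contains(key)) {
                added.add(key); // key was not present before this refresh
            }
        }
        // Whatever is left in `missed` was known before but not rediscovered.
        return new LocationDiffSketch(added, missed);
    }
}
```

The removed code reported every discovered key as added and presumably left de-duplication to the provider; the sketch filters explicitly so that it stands on its own.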

@@ -109,7 +109,7 @@ protected IcebergTableLocationKey locationKey(
*/
public IcebergBaseLayout(
@NotNull final IcebergTableAdapter tableAdapter,
- @NotNull final Snapshot tableSnapshot,
+ @Nullable final Snapshot tableSnapshot,
@NotNull final IcebergInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
this.tableAdapter = tableAdapter;
@@ -166,4 +166,62 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
String.format("%s:%d - error finding Iceberg locations", tableAdapter, snapshot.snapshotId()), e);
}
}

+ /**
+ * Update the snapshot to the latest snapshot from the catalog if it is newer (higher in sequence number) than the
+ * current snapshot.
+ */
+ protected boolean maybeUpdateSnapshot() {
lbooker42 marked this conversation as resolved.
+ final Snapshot latestSnapshot = tableAdapter.currentSnapshot();
+ if (latestSnapshot == null) {
+ return false;
+ }
+ if (snapshot == null || latestSnapshot.sequenceNumber() > snapshot.sequenceNumber()) {
+ snapshot = latestSnapshot;
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Update the snapshot to the user specified snapshot. This will fail with an {@link IllegalArgumentException} if
+ * the input snapshot is not newer (higher in sequence number) than the current snapshot or if the snapshot cannot
+ * be found.
+ *
+ * @param snapshotId the id of the snapshot to load
+ */
+ protected void updateSnapshot(long snapshotId) {
+ final List<Snapshot> snapshots = tableAdapter.listSnapshots();
+
+ final Snapshot snapshot = snapshots.stream()
+ .filter(s -> s.snapshotId() == snapshotId).findFirst()
+ .orElse(null);
+
+ if (snapshot == null) {
+ throw new IllegalArgumentException(
+ "Snapshot " + snapshotId + " was not found in the list of snapshots for table " + tableAdapter
+ + ". Snapshots: " + snapshots);
+ }
+ updateSnapshot(snapshot);
+ }
+
+ /**
+ * Update the snapshot to the user specified snapshot. This will fail with an {@link IllegalArgumentException} if
+ * the input snapshot is not newer (higher in sequence number) than the current snapshot.
+ *
+ * @param snapshot the snapshot to load
+ */
+ protected void updateSnapshot(Snapshot snapshot) {
lbooker42 marked this conversation as resolved.
+ // Validate that we are not trying to update to an older snapshot.
+ if (snapshot == null) {
+ throw new IllegalArgumentException("Input snapshot cannot be null");
+ }
+ if (this.snapshot != null && snapshot.sequenceNumber() <= this.snapshot.sequenceNumber()) {
+ throw new IllegalArgumentException(
+ "Update snapshot sequence number (" + snapshot.sequenceNumber()
+ + ") must be higher than the current snapshot sequence number ("
+ + this.snapshot.sequenceNumber() + ") for table " + tableAdapter);
+ }
+
+ this.snapshot = snapshot;
+ }
}
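
The methods added in this hunk share one rule: the tracked snapshot only ever moves to a strictly higher sequence number. `maybeUpdateSnapshot()` ignores anything that is not newer, while `updateSnapshot(...)` rejects it with an exception. The sketch below isolates that rule without any Iceberg dependency. `SnapshotInfo` is a hypothetical stand-in for `org.apache.iceberg.Snapshot`, and `SnapshotTrackerSketch` is an illustrative name rather than a class from the PR.

```java
// Hypothetical stand-in for an Iceberg snapshot: just an id and a sequence number.
final class SnapshotInfo {
    final long snapshotId;
    final long sequenceNumber;

    SnapshotInfo(long snapshotId, long sequenceNumber) {
        this.snapshotId = snapshotId;
        this.sequenceNumber = sequenceNumber;
    }
}

// Illustrative tracker; mirrors the null handling and comparisons shown in the hunk.
final class SnapshotTrackerSketch {
    private SnapshotInfo current; // null until a first snapshot is accepted

    // Lenient path: advance only when `latest` is strictly newer; report whether we advanced.
    synchronized boolean maybeUpdate(SnapshotInfo latest) {
        if (latest == null) {
            return false;
        }
        if (current == null || latest.sequenceNumber > current.sequenceNumber) {
            current = latest;
            return true;
        }
        return false;
    }

    // Strict path: a null, older, or equal snapshot is a caller error.
    synchronized void update(SnapshotInfo requested) {
        if (requested == null) {
            throw new IllegalArgumentException("Input snapshot cannot be null");
        }
        if (current != null && requested.sequenceNumber <= current.sequenceNumber) {
            throw new IllegalArgumentException("Requested sequence number " + requested.sequenceNumber
                    + " must be higher than current " + current.sequenceNumber);
        }
        current = requested;
    }

    synchronized SnapshotInfo current() {
        return current;
    }
}
```

Keeping both paths next to the snapshot state, as the PR does by moving them into the layout class, means the auto-refreshing and manually refreshing providers cannot drift apart in how they compare sequence numbers.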

@@ -10,6 +10,7 @@
import io.deephaven.iceberg.util.IcebergTableAdapter;
import org.apache.iceberg.*;
import org.jetbrains.annotations.NotNull;
+ import org.jetbrains.annotations.Nullable;

import java.net.URI;

@@ -25,7 +26,7 @@ public final class IcebergFlatLayout extends IcebergBaseLayout {
*/
public IcebergFlatLayout(
@NotNull final IcebergTableAdapter tableAdapter,
- @NotNull final Snapshot tableSnapshot,
+ @Nullable final Snapshot tableSnapshot,
@NotNull final IcebergInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
super(tableAdapter, tableSnapshot, instructions, dataInstructionsProvider);

@@ -14,6 +14,7 @@
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.iceberg.*;
import org.jetbrains.annotations.NotNull;
+ import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.*;
@@ -46,7 +47,7 @@ public ColumnData(String name, Class<?> type, int index) {
*/
public IcebergKeyValuePartitionedLayout(
@NotNull final IcebergTableAdapter tableAdapter,
- @NotNull final org.apache.iceberg.Snapshot tableSnapshot,
+ @Nullable final Snapshot tableSnapshot,
@NotNull final PartitionSpec partitionSpec,
@NotNull final IcebergInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {

@@ -12,10 +12,6 @@
import org.apache.iceberg.catalog.TableIdentifier;
import org.jetbrains.annotations.NotNull;

- import java.util.HashSet;
- import java.util.List;
- import java.util.Set;
-
/**
* <p>
* Manually refreshing {@link TableLocationProvider} implementation that delegates {@link TableLocationKey location key}
@@ -28,8 +24,6 @@ public class IcebergManualRefreshTableLocationProvider<TK extends TableKey, TLK

private static final String IMPLEMENTATION_NAME = IcebergManualRefreshTableLocationProvider.class.getSimpleName();

- private boolean initialized = false;
-
public IcebergManualRefreshTableLocationProvider(
@NotNull final TK tableKey,
@NotNull final IcebergBaseLayout locationKeyFinder,
@@ -58,62 +52,29 @@ public String getImplementationName() {

@Override
public synchronized void refresh() {
Member

This should not be a UOE. If a user creates a static table using this TLP implementation, that should work.

I would actually expect this method to be where we load partitions from the initial snapshot. If that's not so, it should just be a no-op.

Contributor Author

How would the user create a static table using this? We would use IcebergStaticTableLocationProvider

Member

That goes back to "if we make a TableDataService around this" we have to support static tables using ticking TLPs. Same reason we need to move the snapshot atomicity up to ATLP, rather than down in the subscription buffer, although we could argue we don't need it in Iceberg at this point.

Member

Maybe this is too forward-looking. I guess we're unlikely to use a manual update strategy if we were ever to put these in a TLS anyway. That said, I think having this method call refreshSnapshot() would be a harmless way to future-proof it. See other comment regarding making sure refreshSnapshot() avoids redundant work, since you call it from activateUnderlyingDataSource().

Member

This should be a no-op, or it should make sure we've delivered whatever initial snapshot the user has provided.

- // There should be no refresh service for this provider.
- throw new UnsupportedOperationException();
+ // There is no refresh service for this provider, but this is called as part of the initialization process.
+ refreshLocations();
lbooker42 marked this conversation as resolved.
}

@Override
public synchronized void update() {
adapter.refresh();
- update(adapter.currentSnapshot());
+ if (locationKeyFinder.maybeUpdateSnapshot()) {
+ refreshLocations();
+ }
}

@Override
public synchronized void update(final long snapshotId) {
adapter.refresh();
- final List<Snapshot> snapshots = adapter.listSnapshots();
-
- final Snapshot snapshot = snapshots.stream()
- .filter(s -> s.snapshotId() == snapshotId).findFirst()
- .orElse(null);
-
- if (snapshot == null) {
- throw new IllegalArgumentException(
- "Snapshot " + snapshotId + " was not found in the list of snapshots for table " + tableIdentifier
- + ". Snapshots: " + snapshots);
- }
- update(snapshot);
+ // delegate to the locationKeyFinder to update the snapshot
+ locationKeyFinder.updateSnapshot(snapshotId);
+ refreshLocations();
}

@Override
public synchronized void update(final Snapshot snapshot) {
- // Verify that the input snapshot is newer (higher in sequence number) than the current snapshot.
- if (snapshot.sequenceNumber() <= locationKeyFinder.snapshot.sequenceNumber()) {
- throw new IllegalArgumentException(
- "Update snapshot sequence number (" + snapshot.sequenceNumber()
- + ") must be higher than the current snapshot sequence number ("
- + locationKeyFinder.snapshot.sequenceNumber() + ") for table " + tableIdentifier);
- }
- // Update the snapshot.
- locationKeyFinder.snapshot = snapshot;
- refreshSnapshot();
- }
-
- /**
- * Refresh the table location provider with the latest snapshot from the catalog. This method will identify new
- * locations and removed locations.
- */
- private void refreshSnapshot() {
- beginTransaction(this);
- final Set<ImmutableTableLocationKey> missedKeys = new HashSet<>();
- getTableLocationKeys(ttlk -> missedKeys.add(ttlk.get()));
- locationKeyFinder.findKeys(tlk -> {
- missedKeys.remove(tlk);
- handleTableLocationKeyAdded(tlk, this);
- });
- missedKeys.forEach(tlk -> handleTableLocationKeyRemoved(tlk, this));
- endTransaction(this);
- setInitialized();
+ locationKeyFinder.updateSnapshot(snapshot);
+ refreshLocations();
}

// ------------------------------------------------------------------------------------------------------------------
@@ -122,11 +83,8 @@ private void refreshSnapshot() {

@Override
protected void activateUnderlyingDataSource() {
- if (!initialized) {
- refreshSnapshot();
- activationSuccessful(this);
- initialized = true;
- }
+ ensureInitialized();
+ activationSuccessful(this);
}

@Override
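
The review thread in this file asks that `refresh()` deliver the initial snapshot rather than throw, and that activation avoid redundant work. The body of `ensureInitialized()` is not shown in this diff, so the following is only a sketch of one way such a guard could behave: the first call triggers a load, later calls are no-ops, and an explicit `refresh()` still reloads. `ManualRefreshSketch` and `loadCount` are hypothetical names used for illustration.

```java
// Hypothetical provider skeleton; not the PR's AbstractTableLocationProvider.
final class ManualRefreshSketch {
    private boolean initialized = false;
    private int loadCount = 0; // counts how often locations were (re)loaded

    // Stand-in for loading locations from the current snapshot.
    synchronized void refresh() {
        loadCount++;
        initialized = true;
    }

    // Assumption: initialization simply runs refresh() once if nothing has loaded yet.
    synchronized void ensureInitialized() {
        if (!initialized) {
            refresh();
        }
    }

    synchronized int loadCount() {
        return loadCount;
    }
}
```

With this shape, calling `ensureInitialized()` from activation is safe even when a static table has already called `refresh()` once, which is the situation the reviewers were concerned about.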