Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion core/src/main/java/org/apache/iceberg/rest/RESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,28 @@ public RESTCatalog(Function<Map<String, String>, RESTClient> clientBuilder) {
/**
 * Creates a catalog whose table, namespace and view delegates are all derived from a single
 * {@link RESTSessionCatalog} bound to the given session context.
 *
 * @param context the session context passed to the session catalog when creating the delegates
 * @param clientBuilder a function to build REST clients
 */
public RESTCatalog(
SessionCatalog.SessionContext context,
Function<Map<String, String>, RESTClient> clientBuilder) {
this.sessionCatalog = new RESTSessionCatalog(clientBuilder, null);
// NOTE(review): newSessionCatalog is a protected, overridable method, so a subclass override
// executes here, i.e. while this constructor is still running.
this.sessionCatalog = newSessionCatalog(clientBuilder);
// The catalog view of the session catalog is also used for namespace operations (cast below).
this.delegate = sessionCatalog.asCatalog(context);
this.nsDelegate = (SupportsNamespaces) delegate;
this.context = context;
this.viewSessionCatalog = sessionCatalog.asViewCatalog(context);
}

/**
 * Builds the {@link RESTSessionCatalog} that backs this catalog.
 *
 * <p>Subclasses may override this hook to return their own session catalog implementation. Such
 * an implementation can in turn supply custom table and view operations by overriding the
 * protected factory methods of {@link RESTSessionCatalog}.
 *
 * <p>This hook is invoked from the {@code RESTCatalog} constructor, so an override must not rely
 * on subclass state that is only assigned after the superclass constructor returns.
 *
 * @param clientBuilder a function to build REST clients
 * @return a new RESTSessionCatalog instance
 */
protected RESTSessionCatalog newSessionCatalog(
    Function<Map<String, String>, RESTClient> clientBuilder) {
  RESTSessionCatalog restSessionCatalog = new RESTSessionCatalog(clientBuilder, null);
  return restSessionCatalog;
}

@Override
public void initialize(String name, Map<String, String> props) {
Preconditions.checkArgument(props != null, "Invalid configuration: null");
Expand Down
93 changes: 85 additions & 8 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.Set;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.CatalogProperties;
Expand Down Expand Up @@ -450,7 +451,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {

RESTClient tableClient = client.withAuthSession(tableSession);
RESTTableOperations ops =
new RESTTableOperations(
newTableOps(
tableClient,
paths.table(finalIdentifier),
Map::of,
Expand Down Expand Up @@ -529,7 +530,7 @@ public Table registerTable(
AuthSession tableSession = authManager.tableSession(ident, tableConf, contextualSession);
RESTClient tableClient = client.withAuthSession(tableSession);
RESTTableOperations ops =
new RESTTableOperations(
newTableOps(
tableClient,
paths.table(ident),
Map::of,
Expand Down Expand Up @@ -788,7 +789,7 @@ public Table create() {
AuthSession tableSession = authManager.tableSession(ident, tableConf, contextualSession);
RESTClient tableClient = client.withAuthSession(tableSession);
RESTTableOperations ops =
new RESTTableOperations(
newTableOps(
tableClient,
paths.table(ident),
Map::of,
Expand All @@ -815,7 +816,7 @@ public Transaction createTransaction() {

RESTClient tableClient = client.withAuthSession(tableSession);
RESTTableOperations ops =
new RESTTableOperations(
newTableOpsForTransaction(
tableClient,
paths.table(ident),
Map::of,
Expand Down Expand Up @@ -878,7 +879,7 @@ public Transaction replaceTransaction() {

RESTClient tableClient = client.withAuthSession(tableSession);
RESTTableOperations ops =
new RESTTableOperations(
newTableOpsForTransaction(
tableClient,
paths.table(ident),
Map::of,
Expand Down Expand Up @@ -1010,6 +1011,82 @@ private FileIO tableFileIO(
return newFileIO(context, fullConf, storageCredentials);
}

/**
 * Factory hook that builds the {@link RESTTableOperations} used for simple table operations.
 *
 * <p>Subclasses may override this method to supply their own table operations implementation.
 * The default implementation forwards every argument, unchanged, to the {@link
 * RESTTableOperations} constructor.
 *
 * @param restClient the REST client to use for communicating with the catalog server
 * @param path the REST path for the table
 * @param headers a supplier for additional HTTP headers to include in requests
 * @param fileIO the FileIO implementation for reading and writing table metadata and data files
 * @param current the current table metadata
 * @param supportedEndpoints the set of supported REST endpoints
 * @return a new RESTTableOperations instance
 */
protected RESTTableOperations newTableOps(
    RESTClient restClient,
    String path,
    Supplier<Map<String, String>> headers,
    FileIO fileIO,
    TableMetadata current,
    Set<Endpoint> supportedEndpoints) {
  RESTTableOperations tableOps =
      new RESTTableOperations(restClient, path, headers, fileIO, current, supportedEndpoints);
  return tableOps;
}

/**
 * Factory hook that builds the {@link RESTTableOperations} used by transaction-based operations
 * (create or replace).
 *
 * <p>Subclasses may override this method to supply their own table operations implementation
 * for transactions. The default implementation forwards every argument, unchanged, to the
 * {@link RESTTableOperations} constructor.
 *
 * @param restClient the REST client to use for communicating with the catalog server
 * @param path the REST path for the table
 * @param headers a supplier for additional HTTP headers to include in requests
 * @param fileIO the FileIO implementation for reading and writing table metadata and data files
 * @param updateType the type of update being performed (CREATE, REPLACE, or SIMPLE)
 * @param createChanges the list of metadata updates to apply during table creation or replacement
 * @param current the current table metadata (may be null for CREATE operations)
 * @param supportedEndpoints the set of supported REST endpoints
 * @return a new RESTTableOperations instance
 */
protected RESTTableOperations newTableOpsForTransaction(
    RESTClient restClient,
    String path,
    Supplier<Map<String, String>> headers,
    FileIO fileIO,
    RESTTableOperations.UpdateType updateType,
    List<MetadataUpdate> createChanges,
    TableMetadata current,
    Set<Endpoint> supportedEndpoints) {
  RESTTableOperations transactionOps =
      new RESTTableOperations(
          restClient,
          path,
          headers,
          fileIO,
          updateType,
          createChanges,
          current,
          supportedEndpoints);
  return transactionOps;
}

/**
 * Factory hook that builds the {@link RESTViewOperations} used for view operations.
 *
 * <p>Subclasses may override this method to supply their own view operations implementation.
 * The default implementation forwards every argument, unchanged, to the {@link
 * RESTViewOperations} constructor.
 *
 * @param restClient the REST client to use for communicating with the catalog server
 * @param path the REST path for the view
 * @param headers a supplier for additional HTTP headers to include in requests
 * @param current the current view metadata
 * @param supportedEndpoints the set of supported REST endpoints
 * @return a new RESTViewOperations instance
 */
protected RESTViewOperations newViewOps(
    RESTClient restClient,
    String path,
    Supplier<Map<String, String>> headers,
    ViewMetadata current,
    Set<Endpoint> supportedEndpoints) {
  RESTViewOperations viewOps =
      new RESTViewOperations(restClient, path, headers, current, supportedEndpoints);
  return viewOps;
}

private static ConfigResponse fetchConfig(
RESTClient client, AuthSession initialAuth, Map<String, String> properties) {
// send the client's warehouse location to the service to keep in sync
Expand Down Expand Up @@ -1154,7 +1231,7 @@ public View loadView(SessionContext context, TableIdentifier identifier) {
ViewMetadata metadata = response.metadata();

RESTViewOperations ops =
new RESTViewOperations(
newViewOps(
client.withAuthSession(tableSession),
paths.view(identifier),
Map::of,
Expand Down Expand Up @@ -1333,7 +1410,7 @@ public View create() {
Map<String, String> tableConf = response.config();
AuthSession tableSession = authManager.tableSession(identifier, tableConf, contextualSession);
RESTViewOperations ops =
new RESTViewOperations(
newViewOps(
client.withAuthSession(tableSession),
paths.view(identifier),
Map::of,
Expand Down Expand Up @@ -1424,7 +1501,7 @@ private View replace(LoadViewResponse response) {
AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
AuthSession tableSession = authManager.tableSession(identifier, tableConf, contextualSession);
RESTViewOperations ops =
new RESTViewOperations(
newViewOps(
client.withAuthSession(tableSession),
paths.view(identifier),
Map::of,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.util.LocationUtil;

class RESTTableOperations implements TableOperations {
public class RESTTableOperations implements TableOperations {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this scope change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If users only want to make small adjustments to RESTTableOperations (for example, injecting a custom header), they can simply provide a custom implementation that extends RESTTableOperations, without having to copy the entire class.

This makes it much easier for them to upgrade to newer Iceberg SDK versions without dealing with merge conflicts or duplicated code.

I'm okay with either approach here and don't have a strong preference. WDYT?

private static final String METADATA_FOLDER_NAME = "metadata";

enum UpdateType {
public enum UpdateType {
CREATE,
REPLACE,
SIMPLE
Expand All @@ -63,7 +63,7 @@ enum UpdateType {
private UpdateType updateType;
private TableMetadata current;

RESTTableOperations(
protected RESTTableOperations(
RESTClient client,
String path,
Supplier<Map<String, String>> headers,
Expand All @@ -73,7 +73,7 @@ enum UpdateType {
this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current, endpoints);
}

RESTTableOperations(
protected RESTTableOperations(
RESTClient client,
String path,
Supplier<Map<String, String>> headers,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import org.apache.iceberg.view.ViewMetadata;
import org.apache.iceberg.view.ViewOperations;

class RESTViewOperations implements ViewOperations {
public class RESTViewOperations implements ViewOperations {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this scope change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

private final RESTClient client;
private final String path;
private final Supplier<Map<String, String>> headers;
private final Set<Endpoint> endpoints;
private ViewMetadata current;

RESTViewOperations(
protected RESTViewOperations(
RESTClient client,
String path,
Supplier<Map<String, String>> headers,
Expand Down
115 changes: 115 additions & 0 deletions core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,14 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.http.HttpHeaders;
import org.apache.iceberg.BaseTable;
Expand All @@ -56,6 +61,7 @@
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.Transaction;
import org.apache.iceberg.UpdatePartitionSpec;
import org.apache.iceberg.UpdateSchema;
Expand All @@ -71,6 +77,7 @@
import org.apache.iceberg.exceptions.ServiceFailureException;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.inmemory.InMemoryCatalog;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand All @@ -89,6 +96,7 @@
import org.apache.iceberg.rest.responses.LoadTableResponse;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.view.ViewMetadata;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.awaitility.Awaitility;
import org.eclipse.jetty.server.Server;
Expand Down Expand Up @@ -3066,6 +3074,113 @@ public void testCommitStateUnknownNotReconciled() {
.satisfies(ex -> assertThat(((CommitStateUnknownException) ex).getSuppressed()).isEmpty());
}

/**
 * Verifies that the protected factory hooks on {@link RESTCatalog} and {@link
 * RESTSessionCatalog} are actually used: a subclass overriding them must see its overrides
 * invoked when a table is created, when a create transaction is committed, and when a view is
 * created.
 */
@Test
public void testCustomTableOperationsInjection() throws IOException {
  // Each flag is flipped by the matching overridden hook so the test can observe the call
  AtomicBoolean tableOpsHookCalled = new AtomicBoolean(false);
  AtomicBoolean txnOpsHookCalled = new AtomicBoolean(false);
  AtomicBoolean viewOpsHookCalled = new AtomicBoolean(false);

  // Session catalog whose hooks record the invocation and then defer to the default behavior
  class TrackingSessionCatalog extends RESTSessionCatalog {
    TrackingSessionCatalog(
        Function<Map<String, String>, RESTClient> clientBuilder,
        BiFunction<SessionCatalog.SessionContext, Map<String, String>, FileIO> ioBuilder) {
      super(clientBuilder, ioBuilder);
    }

    @Override
    protected RESTTableOperations newTableOps(
        RESTClient client,
        String tablePath,
        Supplier<Map<String, String>> headerSupplier,
        FileIO io,
        TableMetadata metadata,
        Set<Endpoint> endpoints) {
      tableOpsHookCalled.set(true);
      return super.newTableOps(client, tablePath, headerSupplier, io, metadata, endpoints);
    }

    @Override
    protected RESTTableOperations newTableOpsForTransaction(
        RESTClient client,
        String tablePath,
        Supplier<Map<String, String>> headerSupplier,
        FileIO io,
        RESTTableOperations.UpdateType type,
        List<MetadataUpdate> changes,
        TableMetadata metadata,
        Set<Endpoint> endpoints) {
      txnOpsHookCalled.set(true);
      return super.newTableOpsForTransaction(
          client, tablePath, headerSupplier, io, type, changes, metadata, endpoints);
    }

    @Override
    protected RESTViewOperations newViewOps(
        RESTClient client,
        String viewPath,
        Supplier<Map<String, String>> headerSupplier,
        ViewMetadata metadata,
        Set<Endpoint> endpoints) {
      viewOpsHookCalled.set(true);
      return super.newViewOps(client, viewPath, headerSupplier, metadata, endpoints);
    }
  }

  // Catalog that plugs the tracking session catalog in through the newSessionCatalog hook
  class TrackingCatalog extends RESTCatalog {
    TrackingCatalog(
        SessionCatalog.SessionContext sessionContext,
        Function<Map<String, String>, RESTClient> clientBuilder) {
      super(sessionContext, clientBuilder);
    }

    @Override
    protected RESTSessionCatalog newSessionCatalog(
        Function<Map<String, String>, RESTClient> clientBuilder) {
      return new TrackingSessionCatalog(clientBuilder, null);
    }
  }

  try (TrackingCatalog trackingCatalog =
      new TrackingCatalog(
          SessionCatalog.SessionContext.createEmpty(),
          config -> new RESTCatalogAdapter(backendCatalog))) {
    trackingCatalog.setConf(new Configuration());
    Map<String, String> catalogProps =
        ImmutableMap.of(
            CatalogProperties.FILE_IO_IMPL, "org.apache.iceberg.inmemory.InMemoryFileIO");
    trackingCatalog.initialize("test", catalogProps);

    Namespace namespace = Namespace.of("test_custom_ops");
    trackingCatalog.createNamespace(namespace);

    // Plain table creation must go through newTableOps
    TableIdentifier firstTable = TableIdentifier.of(namespace, "table1");
    trackingCatalog.createTable(firstTable, SCHEMA);
    assertThat(tableOpsHookCalled).isTrue();

    // A create transaction must go through newTableOpsForTransaction
    TableIdentifier secondTable = TableIdentifier.of(namespace, "table2");
    Transaction createTxn = trackingCatalog.buildTable(secondTable, SCHEMA).createTransaction();
    createTxn.commitTransaction();
    assertThat(txnOpsHookCalled).isTrue();

    // View creation must go through newViewOps
    TableIdentifier viewIdentifier = TableIdentifier.of(namespace, "view1");
    trackingCatalog
        .buildView(viewIdentifier)
        .withSchema(SCHEMA)
        .withDefaultNamespace(namespace)
        .withQuery("spark", "select * from ns.table")
        .create();
    assertThat(viewOpsHookCalled).isTrue();
  }
}

private RESTCatalog catalog(RESTCatalogAdapter adapter) {
RESTCatalog catalog =
new RESTCatalog(SessionCatalog.SessionContext.createEmpty(), (config) -> adapter);
Expand Down