Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 63 additions & 22 deletions core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public class RESTSessionCatalog extends BaseViewSessionCatalog
private Integer pageSize = null;
private CloseableGroup closeables = null;
private Set<Endpoint> endpoints;
private Supplier<Map<String, String>> mutationHeaders = Map::of;

public RESTSessionCatalog() {
this(
Expand Down Expand Up @@ -203,6 +204,11 @@ public void initialize(String name, Map<String, String> unresolved) {
// build the final configuration and set up the catalog's auth
Map<String, String> mergedProps = config.merge(props);

// Enable Idempotency-Key header for mutation endpoints if the server advertises support
if (config.idempotencyKeyLifetime() != null) {
this.mutationHeaders = RESTUtil::idempotencyHeaders;
}

if (config.endpoints().isEmpty()) {
this.endpoints =
PropertyUtil.propertyAsBoolean(
Expand Down Expand Up @@ -307,7 +313,8 @@ public boolean dropTable(SessionContext context, TableIdentifier identifier) {
AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
client
.withAuthSession(contextualSession)
.delete(paths.table(identifier), null, Map.of(), ErrorHandlers.tableErrorHandler());
.delete(
paths.table(identifier), null, mutationHeaders, ErrorHandlers.tableErrorHandler());
return true;
} catch (NoSuchTableException e) {
return false;
Expand All @@ -327,7 +334,7 @@ public boolean purgeTable(SessionContext context, TableIdentifier identifier) {
paths.table(identifier),
ImmutableMap.of("purgeRequested", "true"),
null,
Map.of(),
mutationHeaders,
ErrorHandlers.tableErrorHandler());
return true;
} catch (NoSuchTableException e) {
Expand All @@ -348,7 +355,7 @@ public void renameTable(SessionContext context, TableIdentifier from, TableIdent
AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
client
.withAuthSession(contextualSession)
.post(paths.rename(), request, null, Map.of(), ErrorHandlers.tableErrorHandler());
.post(paths.rename(), request, null, mutationHeaders, ErrorHandlers.tableErrorHandler());
}

@Override
Expand Down Expand Up @@ -455,6 +462,7 @@ public Table loadTable(SessionContext context, TableIdentifier identifier) {
tableClient,
paths.table(finalIdentifier),
Map::of,
mutationHeaders,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not adjusting new RESTMetricsReporter(restClient, metricsEndpoint, Map::of); is probably fine for now as well, since we didn't add the param to the Spec and because typically we don't care too much whether metrics actually arrive or not, since it's best-effort and we don't want to interfere with the read/write path and add additional latency

tableFileIO(context, tableConf, response.credentials()),
tableMetadata,
endpoints);
Expand Down Expand Up @@ -523,7 +531,7 @@ public Table registerTable(
paths.register(ident.namespace()),
request,
LoadTableResponse.class,
Map.of(),
mutationHeaders,
ErrorHandlers.tableErrorHandler());

Map<String, String> tableConf = response.config();
Expand All @@ -534,6 +542,7 @@ public Table registerTable(
tableClient,
paths.table(ident),
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
response.tableMetadata(),
endpoints);
Expand All @@ -559,7 +568,7 @@ public void createNamespace(
paths.namespaces(),
request,
CreateNamespaceResponse.class,
Map.of(),
mutationHeaders,
ErrorHandlers.namespaceErrorHandler());
}

Expand Down Expand Up @@ -645,7 +654,11 @@ public boolean dropNamespace(SessionContext context, Namespace ns) {
AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
client
.withAuthSession(contextualSession)
.delete(paths.namespace(ns), null, Map.of(), ErrorHandlers.dropNamespaceErrorHandler());
.delete(
paths.namespace(ns),
null,
mutationHeaders,
ErrorHandlers.dropNamespaceErrorHandler());
return true;
} catch (NoSuchNamespaceException e) {
return false;
Expand All @@ -669,7 +682,7 @@ public boolean updateNamespaceMetadata(
paths.namespaceProperties(ns),
request,
UpdateNamespacePropertiesResponse.class,
Map.of(),
mutationHeaders,
ErrorHandlers.namespaceErrorHandler());

return !response.updated().isEmpty();
Expand Down Expand Up @@ -782,7 +795,7 @@ public Table create() {
paths.tables(ident.namespace()),
request,
LoadTableResponse.class,
Map.of(),
mutationHeaders,
ErrorHandlers.tableErrorHandler());

Map<String, String> tableConf = response.config();
Expand All @@ -793,6 +806,7 @@ public Table create() {
tableClient,
paths.table(ident),
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
response.tableMetadata(),
endpoints);
Expand Down Expand Up @@ -820,6 +834,7 @@ public Transaction createTransaction() {
tableClient,
paths.table(ident),
Map::of,
mutationHeaders,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we missing the same change for L806?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed L806 to use mutationHeaders

tableFileIO(context, tableConf, response.credentials()),
RESTTableOperations.UpdateType.CREATE,
createChanges(meta),
Expand Down Expand Up @@ -883,6 +898,7 @@ public Transaction replaceTransaction() {
tableClient,
paths.table(ident),
Map::of,
mutationHeaders,
tableFileIO(context, tableConf, response.credentials()),
RESTTableOperations.UpdateType.REPLACE,
changes.build(),
Expand Down Expand Up @@ -932,7 +948,7 @@ private LoadTableResponse stageCreate() {
paths.tables(ident.namespace()),
request,
LoadTableResponse.class,
Map.of(),
mutationHeaders,
ErrorHandlers.tableErrorHandler());
}
}
Expand Down Expand Up @@ -1019,7 +1035,10 @@ private FileIO tableFileIO(
*
* @param restClient the REST client to use for communicating with the catalog server
* @param path the REST path for the table
* @param headers a supplier for additional HTTP headers to include in requests
* @param readHeaders a supplier for additional HTTP headers to include in read requests
* (GET/HEAD)
* @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation
* requests (POST/DELETE)
* @param fileIO the FileIO implementation for reading and writing table metadata and data files
* @param current the current table metadata
* @param supportedEndpoints the set of supported REST endpoints
Expand All @@ -1028,11 +1047,13 @@ private FileIO tableFileIO(
protected RESTTableOperations newTableOps(
RESTClient restClient,
String path,
Supplier<Map<String, String>> headers,
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaderSupplier,
FileIO fileIO,
TableMetadata current,
Set<Endpoint> supportedEndpoints) {
return new RESTTableOperations(restClient, path, headers, fileIO, current, supportedEndpoints);
return new RESTTableOperations(
restClient, path, readHeaders, mutationHeaderSupplier, fileIO, current, supportedEndpoints);
}

/**
Expand All @@ -1044,7 +1065,10 @@ protected RESTTableOperations newTableOps(
*
* @param restClient the REST client to use for communicating with the catalog server
* @param path the REST path for the table
* @param headers a supplier for additional HTTP headers to include in requests
* @param readHeaders a supplier for additional HTTP headers to include in read requests
* (GET/HEAD)
* @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation
* requests (POST/DELETE)
* @param fileIO the FileIO implementation for reading and writing table metadata and data files
* @param updateType the {@link RESTTableOperations.UpdateType} being performed
* @param createChanges the list of metadata updates to apply during table creation or replacement
Expand All @@ -1055,14 +1079,23 @@ protected RESTTableOperations newTableOps(
protected RESTTableOperations newTableOps(
RESTClient restClient,
String path,
Supplier<Map<String, String>> headers,
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaderSupplier,
FileIO fileIO,
RESTTableOperations.UpdateType updateType,
List<MetadataUpdate> createChanges,
TableMetadata current,
Set<Endpoint> supportedEndpoints) {
return new RESTTableOperations(
restClient, path, headers, fileIO, updateType, createChanges, current, supportedEndpoints);
restClient,
path,
readHeaders,
mutationHeaderSupplier,
fileIO,
updateType,
createChanges,
current,
supportedEndpoints);
}

/**
Expand All @@ -1073,18 +1106,23 @@ protected RESTTableOperations newTableOps(
*
* @param restClient the REST client to use for communicating with the catalog server
* @param path the REST path for the view
* @param headers a supplier for additional HTTP headers to include in requests
* @param readHeaders a supplier for additional HTTP headers to include in read requests
* (GET/HEAD)
* @param mutationHeaderSupplier a supplier for additional HTTP headers to include in mutation
* requests (POST/DELETE)
* @param current the current view metadata
* @param supportedEndpoints the set of supported REST endpoints
* @return a new RESTViewOperations instance
*/
protected RESTViewOperations newViewOps(
RESTClient restClient,
String path,
Supplier<Map<String, String>> headers,
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaderSupplier,
ViewMetadata current,
Set<Endpoint> supportedEndpoints) {
return new RESTViewOperations(restClient, path, headers, current, supportedEndpoints);
return new RESTViewOperations(
restClient, path, readHeaders, mutationHeaderSupplier, current, supportedEndpoints);
}

private static ConfigResponse fetchConfig(
Expand Down Expand Up @@ -1148,7 +1186,7 @@ public void commitTransaction(SessionContext context, List<TableCommit> commits)
paths.commitTransaction(),
new CommitTransactionRequest(tableChanges),
null,
Map.of(),
mutationHeaders,
ErrorHandlers.tableCommitHandler());
}

Expand Down Expand Up @@ -1235,6 +1273,7 @@ public View loadView(SessionContext context, TableIdentifier identifier) {
client.withAuthSession(tableSession),
paths.view(identifier),
Map::of,
mutationHeaders,
metadata,
endpoints);

Expand All @@ -1255,7 +1294,7 @@ public boolean dropView(SessionContext context, TableIdentifier identifier) {
AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
client
.withAuthSession(contextualSession)
.delete(paths.view(identifier), null, Map.of(), ErrorHandlers.viewErrorHandler());
.delete(paths.view(identifier), null, mutationHeaders, ErrorHandlers.viewErrorHandler());
return true;
} catch (NoSuchViewException e) {
return false;
Expand All @@ -1274,7 +1313,7 @@ public void renameView(SessionContext context, TableIdentifier from, TableIdenti
AuthSession contextualSession = authManager.contextualSession(context, catalogAuth);
client
.withAuthSession(contextualSession)
.post(paths.renameView(), request, null, Map.of(), ErrorHandlers.viewErrorHandler());
.post(paths.renameView(), request, null, mutationHeaders, ErrorHandlers.viewErrorHandler());
}

private class RESTViewBuilder implements ViewBuilder {
Expand Down Expand Up @@ -1404,7 +1443,7 @@ public View create() {
paths.views(identifier.namespace()),
request,
LoadViewResponse.class,
Map.of(),
mutationHeaders,
ErrorHandlers.viewErrorHandler());

Map<String, String> tableConf = response.config();
Expand All @@ -1414,6 +1453,7 @@ public View create() {
client.withAuthSession(tableSession),
paths.view(identifier),
Map::of,
mutationHeaders,
response.metadata(),
endpoints);

Expand Down Expand Up @@ -1505,6 +1545,7 @@ private View replace(LoadViewResponse response) {
client.withAuthSession(tableSession),
paths.view(identifier),
Map::of,
mutationHeaders,
metadata,
endpoints);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ enum UpdateType {

private final RESTClient client;
private final String path;
private final Supplier<Map<String, String>> headers;
private final Supplier<Map<String, String>> readHeaders;
private final Supplier<Map<String, String>> mutationHeaders;
private final FileIO io;
private final List<MetadataUpdate> createChanges;
private final TableMetadata replaceBase;
Expand All @@ -70,7 +71,16 @@ enum UpdateType {
FileIO io,
TableMetadata current,
Set<Endpoint> endpoints) {
this(client, path, headers, io, UpdateType.SIMPLE, Lists.newArrayList(), current, endpoints);
this(
client,
path,
headers,
headers,
io,
UpdateType.SIMPLE,
Lists.newArrayList(),
current,
endpoints);
}

RESTTableOperations(
Expand All @@ -82,9 +92,43 @@ enum UpdateType {
List<MetadataUpdate> createChanges,
TableMetadata current,
Set<Endpoint> endpoints) {
this(client, path, headers, headers, io, updateType, createChanges, current, endpoints);
}

RESTTableOperations(
RESTClient client,
String path,
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaders,
FileIO io,
TableMetadata current,
Set<Endpoint> endpoints) {
this(
client,
path,
readHeaders,
mutationHeaders,
io,
UpdateType.SIMPLE,
Lists.newArrayList(),
current,
endpoints);
}

RESTTableOperations(
RESTClient client,
String path,
Supplier<Map<String, String>> readHeaders,
Supplier<Map<String, String>> mutationHeaders,
FileIO io,
UpdateType updateType,
List<MetadataUpdate> createChanges,
TableMetadata current,
Set<Endpoint> endpoints) {
this.client = client;
this.path = path;
this.headers = headers;
this.readHeaders = readHeaders;
this.mutationHeaders = mutationHeaders;
this.io = io;
this.updateType = updateType;
this.createChanges = createChanges;
Expand All @@ -106,7 +150,7 @@ public TableMetadata current() {
public TableMetadata refresh() {
Endpoint.check(endpoints, Endpoint.V1_LOAD_TABLE);
return updateCurrentMetadata(
client.get(path, LoadTableResponse.class, headers, ErrorHandlers.tableErrorHandler()));
client.get(path, LoadTableResponse.class, readHeaders, ErrorHandlers.tableErrorHandler()));
}

@Override
Expand Down Expand Up @@ -159,7 +203,7 @@ public void commit(TableMetadata base, TableMetadata metadata) {
// TODO: ensure that the HTTP client lib passes HTTP client errors to the error handler
LoadTableResponse response;
try {
response = client.post(path, request, LoadTableResponse.class, headers, errorHandler);
response = client.post(path, request, LoadTableResponse.class, mutationHeaders, errorHandler);
} catch (CommitStateUnknownException e) {
// Lightweight reconciliation for snapshot-add-only updates on transient unknown commit state
if (updateType == UpdateType.SIMPLE && reconcileOnSimpleUpdate(updates, e)) {
Expand Down
Loading