diff --git a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java index 229497576af9..18de8493f49d 100644 --- a/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java +++ b/core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java @@ -24,6 +24,8 @@ import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT; import java.io.IOException; +import java.time.Duration; +import java.time.Instant; import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.util.Collections; @@ -33,10 +35,14 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Predicate; +import java.util.function.Supplier; import java.util.function.ToIntFunction; import java.util.stream.Collectors; import org.apache.iceberg.BaseMetadataTable; @@ -66,6 +72,7 @@ import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.exceptions.NoSuchViewException; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -108,8 +115,152 @@ public class CatalogHandlers { InMemoryPlanningState.getInstance(); private static final ExecutorService ASYNC_PLANNING_POOL = Executors.newSingleThreadExecutor(); + // Advanced idempotency store with TTL and in-flight coalescing. + // + // Note: This is a simple in-memory implementation meant for tests and lightweight usage. + // Production servers should provide a durable store. + private static final ConcurrentMap IDEMPOTENCY_STORE = + Maps.newConcurrentMap(); + private static volatile long idempotencyLifetimeMillis = TimeUnit.MINUTES.toMillis(30); + private CatalogHandlers() {} + @SuppressWarnings("unchecked") + static T withIdempotency(HTTPRequest httpRequest, Supplier action) { + return withIdempotencyInternal(httpRequest, action); + } + + static void withIdempotency(HTTPRequest httpRequest, Runnable action) { + withIdempotencyInternal( + httpRequest, + () -> { + action.run(); + return Boolean.TRUE; + }); + } + + @SuppressWarnings("unchecked") + private static T withIdempotencyInternal(HTTPRequest httpRequest, Supplier action) { + Optional keyHeader = + httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER); + if (keyHeader.isEmpty()) { + return action.get(); + } + + String key = keyHeader.get().value(); + + // The "first" request for this Idempotency-Key is the one that wins + // IDEMPOTENCY_STORE.compute(...) + // and creates (or replaces) the IN_PROGRESS entry. Only that request executes the action and + // finalizes the entry; concurrent requests for the same key wait on the latch and then replay + // the finalized result/error. + AtomicBoolean isFirst = new AtomicBoolean(false); + IdempotencyEntry entry = + IDEMPOTENCY_STORE.compute( + key, + (k, current) -> { + if (current == null || current.isExpired()) { + isFirst.set(true); + return IdempotencyEntry.inProgress(); + } + return current; + }); + + // Fast-path: already finalized (another request completed earlier) + if (entry.status == IdempotencyEntry.Status.FINALIZED) { + if (entry.error != null) { + throw entry.error; + } + return (T) entry.responseBody; + } + + if (!isFirst.get()) { + // In-flight coalescing: wait for the first request to finalize + entry.awaitFinalization(); + if (entry.error != null) { + throw entry.error; + } + return (T) entry.responseBody; + } + + // First request: execute the action and finalize the entry + try { + T res = action.get(); + entry.finalizeSuccess(res); + return res; + } catch (RuntimeException e) { + entry.finalizeError(e); + throw e; + } + } + + @VisibleForTesting + static void setIdempotencyLifetimeFromIso(String isoDuration) { + if (isoDuration == null) { + return; + } + try { + idempotencyLifetimeMillis = Duration.parse(isoDuration).toMillis(); + } catch (Exception e) { + throw new IllegalArgumentException("Invalid idempotency lifetime: " + isoDuration, e); + } + } + + private static final class IdempotencyEntry { + enum Status { + IN_PROGRESS, + FINALIZED + } + + private final CountDownLatch latch; + private final long firstSeenMillis; + private volatile Status status; + private volatile Object responseBody; + private volatile RuntimeException error; + + private IdempotencyEntry(Status status) { + this.status = status; + this.latch = new CountDownLatch(1); + this.firstSeenMillis = System.currentTimeMillis(); + } + + static IdempotencyEntry inProgress() { + return new IdempotencyEntry(Status.IN_PROGRESS); + } + + void finalizeSuccess(Object body) { + this.responseBody = body; + this.status = Status.FINALIZED; + this.latch.countDown(); + } + + void finalizeError(RuntimeException cause) { + this.error = cause; + this.status = Status.FINALIZED; + this.latch.countDown(); + } + + void awaitFinalization() { + try { + this.latch.await(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException( + "Interrupted while waiting for idempotent request to complete", ie); + } + } + + boolean isExpired() { + if (this.status != Status.FINALIZED) { + return false; + } + + Instant expiry = + Instant.ofEpochMilli(this.firstSeenMillis).plusMillis(idempotencyLifetimeMillis); + return Instant.now().isAfter(expiry); + } + } + /** * Exception used to avoid retrying commits when assertions fail. * diff --git a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java index e62937b6df6e..0600ef551582 100644 --- a/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java +++ b/core/src/test/java/org/apache/iceberg/rest/RESTCatalogAdapter.java @@ -207,8 +207,11 @@ public T handleRequest( case CREATE_NAMESPACE: if (asNamespaceCatalog != null) { CreateNamespaceRequest request = castRequest(CreateNamespaceRequest.class, body); - return castResponse( - responseType, CatalogHandlers.createNamespace(asNamespaceCatalog, request)); + return CatalogHandlers.withIdempotency( + httpRequest, + () -> + castResponse( + responseType, CatalogHandlers.createNamespace(asNamespaceCatalog, request))); } break; @@ -229,7 +232,9 @@ public T handleRequest( case DROP_NAMESPACE: if (asNamespaceCatalog != null) { - CatalogHandlers.dropNamespace(asNamespaceCatalog, namespaceFromPathVars(vars)); + CatalogHandlers.withIdempotency( + httpRequest, + () -> CatalogHandlers.dropNamespace(asNamespaceCatalog, namespaceFromPathVars(vars))); return null; } break; @@ -239,9 +244,13 @@ public T handleRequest( Namespace namespace = namespaceFromPathVars(vars); UpdateNamespacePropertiesRequest request = castRequest(UpdateNamespacePropertiesRequest.class, body); - return castResponse( - responseType, - CatalogHandlers.updateNamespaceProperties(asNamespaceCatalog, namespace, request)); + return CatalogHandlers.withIdempotency( + httpRequest, + () -> + castResponse( + responseType, + CatalogHandlers.updateNamespaceProperties( + asNamespaceCatalog, namespace, request))); } break; @@ -268,19 +277,29 @@ public T handleRequest( return castResponse( responseType, CatalogHandlers.stageTableCreate(catalog, namespace, request)); } else { - LoadTableResponse response = CatalogHandlers.createTable(catalog, namespace, request); - responseHeaders.accept( - ImmutableMap.of(HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation()))); - return castResponse(responseType, response); + return CatalogHandlers.withIdempotency( + httpRequest, + () -> { + LoadTableResponse response = + CatalogHandlers.createTable(catalog, namespace, request); + responseHeaders.accept( + ImmutableMap.of( + HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation()))); + return castResponse(responseType, response); + }); } } case DROP_TABLE: { if (PropertyUtil.propertyAsBoolean(vars, "purgeRequested", false)) { - CatalogHandlers.purgeTable(catalog, tableIdentFromPathVars(vars)); + CatalogHandlers.withIdempotency( + httpRequest, + () -> CatalogHandlers.purgeTable(catalog, tableIdentFromPathVars(vars))); } else { - CatalogHandlers.dropTable(catalog, tableIdentFromPathVars(vars)); + CatalogHandlers.withIdempotency( + httpRequest, + () -> CatalogHandlers.dropTable(catalog, tableIdentFromPathVars(vars))); } return null; } @@ -352,36 +371,47 @@ public T handleRequest( case REGISTER_TABLE: { - LoadTableResponse response = - CatalogHandlers.registerTable( - catalog, - namespaceFromPathVars(vars), - castRequest(RegisterTableRequest.class, body)); - - responseHeaders.accept( - ImmutableMap.of(HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation()))); - - return castResponse(responseType, response); + return CatalogHandlers.withIdempotency( + httpRequest, + () -> { + LoadTableResponse response = + CatalogHandlers.registerTable( + catalog, + namespaceFromPathVars(vars), + castRequest(RegisterTableRequest.class, body)); + + responseHeaders.accept( + ImmutableMap.of( + HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation()))); + + return castResponse(responseType, response); + }); } case UPDATE_TABLE: { - LoadTableResponse response = - CatalogHandlers.updateTable( - catalog, - tableIdentFromPathVars(vars), - castRequest(UpdateTableRequest.class, body)); - - responseHeaders.accept( - ImmutableMap.of(HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation()))); - - return castResponse(responseType, response); + return CatalogHandlers.withIdempotency( + httpRequest, + () -> { + LoadTableResponse response = + CatalogHandlers.updateTable( + catalog, + tableIdentFromPathVars(vars), + castRequest(UpdateTableRequest.class, body)); + + responseHeaders.accept( + ImmutableMap.of( + HttpHeaders.ETAG, ETagProvider.of(response.metadataLocation()))); + + return castResponse(responseType, response); + }); } case RENAME_TABLE: { RenameTableRequest request = castRequest(RenameTableRequest.class, body); - CatalogHandlers.renameTable(catalog, request); + CatalogHandlers.withIdempotency( + httpRequest, () -> CatalogHandlers.renameTable(catalog, request)); return null; } @@ -395,7 +425,7 @@ public T handleRequest( case COMMIT_TRANSACTION: { CommitTransactionRequest request = castRequest(CommitTransactionRequest.class, body); - commitTransaction(catalog, request); + CatalogHandlers.withIdempotency(httpRequest, () -> commitTransaction(catalog, request)); return null; } @@ -422,8 +452,12 @@ public T handleRequest( if (null != asViewCatalog) { Namespace namespace = namespaceFromPathVars(vars); CreateViewRequest request = castRequest(CreateViewRequest.class, body); - return castResponse( - responseType, CatalogHandlers.createView(asViewCatalog, namespace, request)); + return CatalogHandlers.withIdempotency( + httpRequest, + () -> + castResponse( + responseType, + CatalogHandlers.createView(asViewCatalog, namespace, request))); } break; } @@ -451,8 +485,11 @@ public T handleRequest( if (null != asViewCatalog) { TableIdentifier ident = viewIdentFromPathVars(vars); UpdateTableRequest request = castRequest(UpdateTableRequest.class, body); - return castResponse( - responseType, CatalogHandlers.updateView(asViewCatalog, ident, request)); + return CatalogHandlers.withIdempotency( + httpRequest, + () -> + castResponse( + responseType, CatalogHandlers.updateView(asViewCatalog, ident, request))); } break; } @@ -461,7 +498,8 @@ public T handleRequest( { if (null != asViewCatalog) { RenameTableRequest request = castRequest(RenameTableRequest.class, body); - CatalogHandlers.renameView(asViewCatalog, request); + CatalogHandlers.withIdempotency( + httpRequest, () -> CatalogHandlers.renameView(asViewCatalog, request)); return null; } break; @@ -470,7 +508,9 @@ public T handleRequest( case DROP_VIEW: { if (null != asViewCatalog) { - CatalogHandlers.dropView(asViewCatalog, viewIdentFromPathVars(vars)); + CatalogHandlers.withIdempotency( + httpRequest, + () -> CatalogHandlers.dropView(asViewCatalog, viewIdentFromPathVars(vars))); return null; } break; @@ -568,8 +608,10 @@ protected T execute( vars.putAll(request.queryParameters()); vars.putAll(routeAndVars.second()); - return handleRequest( - routeAndVars.first(), vars.build(), request, responseType, responseHeaders); + T resp = + handleRequest( + routeAndVars.first(), vars.build(), request, responseType, responseHeaders); + return resp; } catch (RuntimeException e) { configureResponseFromException(e, errorBuilder); } diff --git a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java index d202680e5626..40dc050311fe 100644 --- a/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java +++ b/core/src/test/java/org/apache/iceberg/rest/TestRESTCatalog.java @@ -21,6 +21,7 @@ import static org.apache.iceberg.types.Types.NestedField.required; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode; import static org.assertj.core.api.InstanceOfAssertFactories.map; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyMap; @@ -69,28 +70,36 @@ import org.apache.iceberg.Transaction; import org.apache.iceberg.UpdatePartitionSpec; import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.CatalogTests; import org.apache.iceberg.catalog.Namespace; import org.apache.iceberg.catalog.SessionCatalog; import org.apache.iceberg.catalog.TableCommit; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.exceptions.AlreadyExistsException; import org.apache.iceberg.exceptions.CommitFailedException; import org.apache.iceberg.exceptions.CommitStateUnknownException; import org.apache.iceberg.exceptions.NotAuthorizedException; import org.apache.iceberg.exceptions.NotFoundException; +import org.apache.iceberg.exceptions.RESTException; import org.apache.iceberg.exceptions.ServiceFailureException; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.inmemory.InMemoryCatalog; import org.apache.iceberg.io.FileIO; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.rest.HTTPRequest.HTTPMethod; import org.apache.iceberg.rest.RESTCatalogProperties.SnapshotMode; +import org.apache.iceberg.rest.auth.AuthManager; +import org.apache.iceberg.rest.auth.AuthManagers; +import org.apache.iceberg.rest.auth.AuthSession; import org.apache.iceberg.rest.auth.AuthSessionUtil; import org.apache.iceberg.rest.auth.OAuth2Properties; import org.apache.iceberg.rest.auth.OAuth2Util; +import org.apache.iceberg.rest.requests.CreateTableRequest; import org.apache.iceberg.rest.requests.UpdateTableRequest; import org.apache.iceberg.rest.responses.ConfigResponse; import org.apache.iceberg.rest.responses.CreateNamespaceResponse; @@ -100,6 +109,7 @@ import org.apache.iceberg.rest.responses.LoadTableResponse; import org.apache.iceberg.rest.responses.OAuthTokenResponse; import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.Pair; import org.assertj.core.api.InstanceOfAssertFactories; import org.awaitility.Awaitility; import org.eclipse.jetty.server.Server; @@ -124,12 +134,134 @@ public class TestRESTCatalog extends CatalogTests { RESTCatalogProperties.NAMESPACE_SEPARATOR, RESTCatalogAdapter.NAMESPACE_SEPARATOR_URLENCODED_UTF_8)); + private static final class IdempotentEnv { + private final TableIdentifier ident; + private final RESTClient http; + private final Map headers; + + private IdempotentEnv(TableIdentifier ident, RESTClient http, Map headers) { + this.ident = ident; + this.http = http; + this.headers = headers; + } + } + + /** + * Test-only adapter that keeps request/response round-trip serialization and header validation + * from the base test setup, while also allowing specific tests to inject transient failures. + */ + private static class HeaderValidatingAdapter extends RESTCatalogAdapter { + private final HTTPHeaders catalogHeaders; + private final HTTPHeaders contextHeaders; + private final java.util.concurrent.ConcurrentMap + simulateFailureOnFirstSuccessByKey = new java.util.concurrent.ConcurrentHashMap<>(); + + HeaderValidatingAdapter( + Catalog catalog, HTTPHeaders catalogHeaders, HTTPHeaders contextHeaders) { + super(catalog); + this.catalogHeaders = catalogHeaders; + this.contextHeaders = contextHeaders; + } + + /** + * Test helper to simulate a transient failure after the first successful mutation for a key. + * + *

Useful to validate that idempotency correctly replays a finalized result when the client + * retries after a post-success transient failure. + */ + public void simulateFailureOnFirstSuccessForKey(String key, RuntimeException failure) { + Preconditions.checkArgument(key != null, "Invalid idempotency key: null"); + Preconditions.checkArgument(failure != null, "Invalid failure: null"); + simulateFailureOnFirstSuccessByKey.put(key, failure); + } + + /** Test helper to simulate a transient 503 after the first successful mutation for a key. */ + public void simulate503OnFirstSuccessForKey(String key) { + simulateFailureOnFirstSuccessForKey( + key, + new CommitStateUnknownException( + new RuntimeException("simulated transient 503 after success"))); + } + + @Override + public T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders) { + if (!ResourcePaths.tokens().equals(request.path())) { + if (ResourcePaths.config().equals(request.path())) { + assertThat(request.headers().entries()).containsAll(catalogHeaders.entries()); + } else { + assertThat(request.headers().entries()).containsAll(contextHeaders.entries()); + } + } + + Object body = roundTripSerialize(request.body(), "request"); + HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); + T response = super.execute(req, responseType, errorHandler, responseHeaders); + return roundTripSerialize(response, "response"); + } + + @Override + protected T execute( + HTTPRequest request, + Class responseType, + Consumer errorHandler, + Consumer> responseHeaders, + ParserContext parserContext) { + ErrorResponse.Builder errorBuilder = ErrorResponse.builder(); + Pair> routeAndVars = Route.from(request.method(), request.path()); + if (routeAndVars != null) { + try { + ImmutableMap.Builder vars = ImmutableMap.builder(); + vars.putAll(request.queryParameters()); + vars.putAll(routeAndVars.second()); + + T resp = + handleRequest( + routeAndVars.first(), vars.build(), request, responseType, responseHeaders); + + // For tests: simulate a transient 503 after the first successful mutation for a key. + Optional keyHeader = + request.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER); + boolean isMutation = + request.method() == HTTPMethod.POST || request.method() == HTTPMethod.DELETE; + if (isMutation && keyHeader.isPresent()) { + String key = keyHeader.get().value(); + RuntimeException failure = simulateFailureOnFirstSuccessByKey.remove(key); + if (failure != null) { + throw failure; + } + } + + return resp; + } catch (RuntimeException e) { + configureResponseFromException(e, errorBuilder); + } + + } else { + errorBuilder + .responseCode(400) + .withType("BadRequestException") + .withMessage( + String.format("No route for request: %s %s", request.method(), request.path())); + } + + ErrorResponse error = errorBuilder.build(); + errorHandler.accept(error); + + // if the error handler doesn't throw an exception, throw a generic one + throw new RESTException("Unhandled error: %s", error); + } + } + @TempDir public Path temp; private RESTCatalog restCatalog; private InMemoryCatalog backendCatalog; private Server httpServer; - private RESTCatalogAdapter adapterForRESTServer; + private HeaderValidatingAdapter adapterForRESTServer; @BeforeEach public void createCatalog() throws Exception { @@ -156,31 +288,7 @@ public void createCatalog() throws Exception { "test-value")); adapterForRESTServer = - Mockito.spy( - new RESTCatalogAdapter(backendCatalog) { - @Override - public T execute( - HTTPRequest request, - Class responseType, - Consumer errorHandler, - Consumer> responseHeaders) { - // this doesn't use a Mockito spy because this is used for catalog tests, which have - // different method calls - if (!ResourcePaths.tokens().equals(request.path())) { - if (ResourcePaths.config().equals(request.path())) { - assertThat(request.headers().entries()).containsAll(catalogHeaders.entries()); - } else { - assertThat(request.headers().entries()).containsAll(contextHeaders.entries()); - } - } - - Object body = roundTripSerialize(request.body(), "request"); - HTTPRequest req = ImmutableHTTPRequest.builder().from(request).body(body).build(); - T response = super.execute(req, responseType, errorHandler, responseHeaders); - T responseAfterSerialization = roundTripSerialize(response, "response"); - return responseAfterSerialization; - } - }); + Mockito.spy(new HeaderValidatingAdapter(backendCatalog, catalogHeaders, contextHeaders)); ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS); @@ -3313,6 +3421,136 @@ public void testClientDoesNotSendIdempotencyWhenServerNotAdvertising() { local.dropTable(ident); } + @Test + public void testIdempotentDuplicateCreateReturnsCached() { + String key = "dup-create-key"; + Namespace ns = Namespace.of("ns_dup"); + IdempotentEnv env = idempotentEnv(key, ns, "t_dup"); + CreateTableRequest req = createReq(env.ident); + + // First create succeeds + LoadTableResponse first = + env.http.post( + ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns), + req, + LoadTableResponse.class, + env.headers, + ErrorHandlers.tableErrorHandler()); + assertThat(first).isNotNull(); + + // Verify request shape (method, path, headers including Idempotency-Key) + verifyCreatePost(ns, env.headers); + + // Duplicate with same key returns cached 200 OK + LoadTableResponse second = + env.http.post( + ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns), + req, + LoadTableResponse.class, + env.headers, + ErrorHandlers.tableErrorHandler()); + assertThat(second).isNotNull(); + } + + @Test + public void testIdempotencyKeyLifetimeExpiredTreatsAsNew() { + // Set TTL to 0 so cached success expires immediately + CatalogHandlers.setIdempotencyLifetimeFromIso("PT0S"); + try { + String key = "expired-create-key"; + Namespace ns = Namespace.of("ns_exp"); + IdempotentEnv env = idempotentEnv(key, ns, "t_exp"); + CreateTableRequest req = createReq(env.ident); + + // First create succeeds + LoadTableResponse created = + env.http.post( + ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns), + req, + LoadTableResponse.class, + env.headers, + ErrorHandlers.tableErrorHandler()); + assertThat(created).isNotNull(); + + // Verify request shape (method, path, headers including Idempotency-Key) + verifyCreatePost(ns, env.headers); + + // TTL expired -> duplicate with same key should be treated as new and fail with AlreadyExists + assertThatThrownBy( + () -> + env.http.post( + ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns), + req, + LoadTableResponse.class, + env.headers, + ErrorHandlers.tableErrorHandler())) + .isInstanceOf(AlreadyExistsException.class) + .hasMessageContaining(env.ident.toString()); + } finally { + // Restore default TTL for other tests + CatalogHandlers.setIdempotencyLifetimeFromIso("PT30M"); + } + } + + @Test + public void testIdempotentCreateReplayAfterSimulated503() { + // Use a fixed key and simulate 503 after first success for that key + String key = "idemp-create-503"; + adapterForRESTServer.simulate503OnFirstSuccessForKey(key); + Namespace ns = Namespace.of("ns_idemp"); + IdempotentEnv env = idempotentEnv(key, ns, "t_idemp"); + CreateTableRequest req = createReq(env.ident); + + // First attempt: server finalizes success but responds 503 + assertThatThrownBy( + () -> + env.http.post( + ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns), + req, + LoadTableResponse.class, + env.headers, + ErrorHandlers.tableErrorHandler())) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("simulated transient 503"); + + // Verify request shape (method, path, headers including Idempotency-Key) + verifyCreatePost(ns, env.headers); + + // Retry with same key: server should replay 200 OK + LoadTableResponse replay = + env.http.post( + ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns), + req, + LoadTableResponse.class, + env.headers, + ErrorHandlers.tableErrorHandler()); + assertThat(replay).isNotNull(); + } + + @Test + public void testIdempotentDropDuplicateNoop() { + String key = "idemp-drop-void"; + Namespace ns = Namespace.of("ns_void"); + IdempotentEnv env = idempotentEnv(key, ns, "t_void"); + + // Create a table to drop + restCatalog.createTable( + env.ident, + new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get())), + PartitionSpec.unpartitioned()); + + String path = ResourcePaths.forCatalogProperties(ImmutableMap.of()).table(env.ident); + + // First drop: table exists -> drop succeeds + env.http.delete(path, null, env.headers, ErrorHandlers.tableErrorHandler()); + assertThat(restCatalog.tableExists(env.ident)).isFalse(); + + // Second drop with the same key: should be a no-op (no exception) + assertThatCode( + () -> env.http.delete(path, null, env.headers, ErrorHandlers.tableErrorHandler())) + .doesNotThrowAnyException(); + } + @Test public void nestedNamespaceWithLegacySeparator() { RESTCatalogAdapter adapter = Mockito.spy(new RESTCatalogAdapter(backendCatalog)); @@ -3454,6 +3692,71 @@ public T execute( return local; } + private Pair> httpAndHeaders(String idempotencyKey) { + Map headers = + ImmutableMap.of( + RESTUtil.IDEMPOTENCY_KEY_HEADER, + idempotencyKey, + "Authorization", + "Bearer client-credentials-token:sub=user", + "test-header", + "test-value"); + + Map conf = + ImmutableMap.of( + CatalogProperties.URI, + httpServer.getURI().toString(), + HTTPClient.REST_SOCKET_TIMEOUT_MS, + "600000", + HTTPClient.REST_CONNECTION_TIMEOUT_MS, + "600000", + "header.test-header", + "test-value"); + RESTClient httpBase = + HTTPClient.builder(conf) + .uri(conf.get(CatalogProperties.URI)) + .withHeaders(RESTUtil.configHeaders(conf)) + .build(); + AuthManager am = AuthManagers.loadAuthManager("test", conf); + AuthSession httpSession = am.initSession(httpBase, conf); + RESTClient http = httpBase.withAuthSession(httpSession); + return Pair.of(http, headers); + } + + private Pair>> prepareIdempotentEnv( + String key, Namespace ns, String tableName) { + TableIdentifier ident = TableIdentifier.of(ns, tableName); + restCatalog.createNamespace(ns, ImmutableMap.of()); + return Pair.of(ident, httpAndHeaders(key)); + } + + private IdempotentEnv idempotentEnv(String key, Namespace ns, String tableName) { + Pair>> env = + prepareIdempotentEnv(key, ns, tableName); + Pair> httpAndHeaders = env.second(); + return new IdempotentEnv(env.first(), httpAndHeaders.first(), httpAndHeaders.second()); + } + + private static CreateTableRequest createReq(TableIdentifier ident) { + return CreateTableRequest.builder() + .withName(ident.name()) + .withSchema(new Schema(Types.NestedField.required(1, "id", Types.IntegerType.get()))) + .withPartitionSpec(PartitionSpec.unpartitioned()) + .build(); + } + + private void verifyCreatePost(Namespace ns, Map headers) { + verify(adapterForRESTServer, atLeastOnce()) + .execute( + reqMatcherContainsHeaders( + HTTPMethod.POST, + ResourcePaths.forCatalogProperties(ImmutableMap.of()).tables(ns), + headers), + eq(LoadTableResponse.class), + any(), + any()); + } + @Test @Override public void testLoadTableWithMissingMetadataFile(@TempDir Path tempDir) { @@ -3545,6 +3848,15 @@ static HTTPRequest reqMatcher( && Objects.equals(req.body(), body)); } + static HTTPRequest reqMatcherContainsHeaders( + HTTPMethod method, String path, Map headers) { + return argThat( + req -> + req.method() == method + && req.path().equals(path) + && req.headers().entries().containsAll(HTTPHeaders.of(headers).entries())); + } + private static List allRequests(RESTCatalogAdapter adapter) { ArgumentCaptor captor = ArgumentCaptor.forClass(HTTPRequest.class); verify(adapter, atLeastOnce()).execute(captor.capture(), any(), any(), any());