Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 151 additions & 0 deletions core/src/main/java/org/apache/iceberg/rest/CatalogHandlers.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import static org.apache.iceberg.TableProperties.COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.Collections;
Expand All @@ -33,10 +35,14 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import org.apache.iceberg.BaseMetadataTable;
Expand Down Expand Up @@ -66,6 +72,7 @@
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.exceptions.NoSuchViewException;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -108,8 +115,152 @@ public class CatalogHandlers {
InMemoryPlanningState.getInstance();
private static final ExecutorService ASYNC_PLANNING_POOL = Executors.newSingleThreadExecutor();

// Advanced idempotency store with TTL and in-flight coalescing.
//
// Note: This is a simple in-memory implementation meant for tests and lightweight usage.
// Production servers should provide a durable store.
private static final ConcurrentMap<String, IdempotencyEntry> IDEMPOTENCY_STORE =
Maps.newConcurrentMap();
private static volatile long idempotencyLifetimeMillis = TimeUnit.MINUTES.toMillis(30);

private CatalogHandlers() {}

@SuppressWarnings("unchecked")
static <T extends RESTResponse> T withIdempotency(HTTPRequest httpRequest, Supplier<T> action) {
return withIdempotencyInternal(httpRequest, action);
}

static void withIdempotency(HTTPRequest httpRequest, Runnable action) {
withIdempotencyInternal(
httpRequest,
() -> {
action.run();
return Boolean.TRUE;
});
}

@SuppressWarnings("unchecked")
private static <T> T withIdempotencyInternal(HTTPRequest httpRequest, Supplier<T> action) {
Optional<HTTPHeaders.HTTPHeader> keyHeader =
httpRequest.headers().firstEntry(RESTUtil.IDEMPOTENCY_KEY_HEADER);
if (keyHeader.isEmpty()) {
return action.get();
}

String key = keyHeader.get().value();

// The "first" request for this Idempotency-Key is the one that wins
// IDEMPOTENCY_STORE.compute(...)
// and creates (or replaces) the IN_PROGRESS entry. Only that request executes the action and
// finalizes the entry; concurrent requests for the same key wait on the latch and then replay
// the finalized result/error.
AtomicBoolean isFirst = new AtomicBoolean(false);
IdempotencyEntry entry =
IDEMPOTENCY_STORE.compute(
key,
(k, current) -> {
if (current == null || current.isExpired()) {
isFirst.set(true);
return IdempotencyEntry.inProgress();
}
return current;
});

// Fast-path: already finalized (another request completed earlier)
if (entry.status == IdempotencyEntry.Status.FINALIZED) {
if (entry.error != null) {
throw entry.error;
}
return (T) entry.responseBody;
}

if (!isFirst.get()) {
// In-flight coalescing: wait for the first request to finalize
entry.awaitFinalization();
if (entry.error != null) {
throw entry.error;
}
return (T) entry.responseBody;
}

// First request: execute the action and finalize the entry
try {
T res = action.get();
entry.finalizeSuccess(res);
return res;
} catch (RuntimeException e) {
entry.finalizeError(e);
throw e;
}
}

@VisibleForTesting
static void setIdempotencyLifetimeFromIso(String isoDuration) {
if (isoDuration == null) {
return;
}
try {
idempotencyLifetimeMillis = Duration.parse(isoDuration).toMillis();
} catch (Exception e) {
throw new IllegalArgumentException("Invalid idempotency lifetime: " + isoDuration, e);
}
}

private static final class IdempotencyEntry {
enum Status {
IN_PROGRESS,
FINALIZED
}

private final CountDownLatch latch;
private final long firstSeenMillis;
private volatile Status status;
private volatile Object responseBody;
private volatile RuntimeException error;

private IdempotencyEntry(Status status) {
this.status = status;
this.latch = new CountDownLatch(1);
this.firstSeenMillis = System.currentTimeMillis();
}

static IdempotencyEntry inProgress() {
return new IdempotencyEntry(Status.IN_PROGRESS);
}

void finalizeSuccess(Object body) {
this.responseBody = body;
this.status = Status.FINALIZED;
this.latch.countDown();
}

void finalizeError(RuntimeException cause) {
this.error = cause;
this.status = Status.FINALIZED;
this.latch.countDown();
}

void awaitFinalization() {
try {
this.latch.await();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
throw new RuntimeException(
"Interrupted while waiting for idempotent request to complete", ie);
}
}

boolean isExpired() {
if (this.status != Status.FINALIZED) {
return false;
}

Instant expiry =
Instant.ofEpochMilli(this.firstSeenMillis).plusMillis(idempotencyLifetimeMillis);
return Instant.now().isAfter(expiry);
}
}

/**
* Exception used to avoid retrying commits when assertions fail.
*
Expand Down
Loading