diff --git a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java index c669ab7..76511a8 100644 --- a/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java +++ b/src/main/java/com/uid2/optout/partner/OptOutPartnerEndpoint.java @@ -15,6 +15,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.http.HttpRequest; +import java.time.Duration; import java.util.Set; import java.util.regex.Pattern; @@ -77,7 +78,7 @@ public Future send(OptOutEntry entry) { LOGGER.info("replaying optout " + config.url() + " - advertising_id: " + Utils.maskPii(entry.advertisingId) + ", epoch: " + entry.timestamp); - return builder.build(); + return builder.timeout(Duration.ofSeconds(30)).build(); }, resp -> { if (resp == null) { diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSender.java b/src/main/java/com/uid2/optout/vertx/OptOutSender.java index 81e08c8..846648a 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSender.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSender.java @@ -29,6 +29,7 @@ import java.time.temporal.ChronoUnit; import java.util.*; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; @@ -62,6 +63,14 @@ public void error(String message, Object... args) { } } + // When the partner config changes, Verticles are undeployed and new ones + // are created. These newly created Verticles register Micrometer gauges. + // However, you can't "re-register" a gauge with a new number. Therefore, + // we need to re-use the numbers that the gauges track across different + // Verticle instances. + private static final ConcurrentHashMap lastEntrySentMap = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap pendingFilesCountMap = new ConcurrentHashMap<>(); + private final OptOutSenderLogger logger; private final HealthComponent healthComponent; private final String deltaConsumerDir; @@ -72,8 +81,8 @@ public void error(String message, Object... args) { private final IOptOutPartnerEndpoint remotePartner; private final String eventCloudSyncDownloaded; private final Map, Counter> entryReplayStatusCounters = new HashMap<>(); - private final AtomicInteger pendingFilesCount = new AtomicInteger(0); - private final AtomicLong lastEntrySent = new AtomicLong(0); + private final AtomicInteger pendingFilesCount; + private final AtomicLong lastEntrySent; private LinkedList pendingFiles = new LinkedList<>(); private AtomicBoolean isReplaying = new AtomicBoolean(false); private CompletableFuture pendingAsyncOp = null; @@ -107,6 +116,9 @@ public OptOutSender(JsonObject jsonConfig, IOptOutPartnerEndpoint optOutPartner, this.timestampFile = Paths.get(jsonConfig.getString(Const.Config.OptOutDataDirProp), "remote_replicate", this.remotePartner.name() + "_timestamp.txt"); this.processedDeltasFile = Paths.get(jsonConfig.getString(Const.Config.OptOutDataDirProp), "remote_replicate", this.remotePartner.name() + "_processed.txt"); + this.pendingFilesCount = pendingFilesCountMap.computeIfAbsent(remotePartner.name(), s -> new AtomicInteger(0)); + this.lastEntrySent = lastEntrySentMap.computeIfAbsent(remotePartner.name(), s -> new AtomicLong(0)); + Gauge.builder("uid2.optout.last_entry_sent", () -> this.lastEntrySent.get()) .description("gauge for last entry send epoch seconds, per each remote partner") .tag("remote_partner", remotePartner.name()) diff --git a/src/main/java/com/uid2/optout/web/RetryingWebClient.java b/src/main/java/com/uid2/optout/web/RetryingWebClient.java index b69a3cf..632fc82 100644 --- a/src/main/java/com/uid2/optout/web/RetryingWebClient.java +++ b/src/main/java/com/uid2/optout/web/RetryingWebClient.java @@ -1,8 +1,8 @@ package com.uid2.optout.web; +import com.google.common.base.Stopwatch; import io.netty.handler.codec.http.HttpMethod; import io.vertx.core.Future; -import io.vertx.core.Promise; import io.vertx.core.Vertx; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,7 +11,9 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.function.BiFunction; @@ -35,51 +37,46 @@ public RetryingWebClient(Vertx vertx, String uri, HttpMethod method, int retryCo } public Future send(BiFunction requestCreator, Function responseValidator) { - return this.send(requestCreator, responseValidator, 0); + final UUID requestId = UUID.randomUUID(); + return this.send(requestCreator, responseValidator, 0, requestId) + .onFailure(ex -> LOGGER.error("requestId={} Request to {} failed", requestId, uri, ex)); } - public Future send(BiFunction requestCreator, Function responseValidator, int currentRetries) { - Promise promise = Promise.promise(); - + public Future send(BiFunction requestCreator, Function responseValidator, int currentRetries, UUID requestId) { HttpRequest request = requestCreator.apply(this.uri, this.method); - CompletableFuture> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.ofString()); - asyncResponse.thenAccept(response -> { - try { - Boolean responseOK = responseValidator.apply(response); - if (responseOK == null) { - throw new RuntimeException("Response validator returned null"); - } + LOGGER.info("requestId={} Sending request to {}, currentRetries={}", requestId, uri, currentRetries); + + final Stopwatch sw = Stopwatch.createStarted(); + + final CompletableFuture> asyncResponse = this.httpClient.sendAsync(request, HttpResponse.BodyHandlers.discarding()); + + return Future.fromCompletionStage(asyncResponse, vertx.getOrCreateContext()).compose(response -> { + sw.stop(); + + LOGGER.info("requestId={} Request to {} completed in {}ms, currentRetries={}, status={}, version={}", requestId, uri, sw.elapsed(TimeUnit.MILLISECONDS), currentRetries, response.statusCode(), response.version()); + + Boolean responseOK = responseValidator.apply(response); + if (responseOK == null) { + return Future.failedFuture(new RuntimeException("Response validator returned null")); + } + + if (responseOK) { + return Future.succeededFuture(); + } - if (responseOK) { - promise.complete(); - } else if (currentRetries < this.retryCount) { - LOGGER.error("failed sending to " + uri + ", currentRetries: " + currentRetries + ", backing off before retrying"); - if (this.retryBackoffMs > 0) { - vertx.setTimer(this.retryBackoffMs, i -> { - send(requestCreator, responseValidator, currentRetries + 1) - .onComplete(ar2 -> promise.handle(ar2)); - }); - } else { - send(requestCreator, responseValidator, currentRetries + 1) - .onComplete(ar2 -> promise.handle(ar2)); - } + if (currentRetries < this.retryCount) { + LOGGER.error("requestId={} failed sending to {}, currentRetries={}, backing off for {}ms before retrying", requestId, uri, currentRetries, this.retryBackoffMs); + if (this.retryBackoffMs > 0) { + return vertx.timer(this.retryBackoffMs) + .compose(v -> send(requestCreator, responseValidator, currentRetries + 1, requestId)); } else { - LOGGER.error("retry count exceeded for sending to " + this.uri); - throw new TooManyRetriesException(currentRetries); + return send(requestCreator, responseValidator, currentRetries + 1, requestId); } } - catch (Throwable ex) { - promise.fail(ex); - } - }); - asyncResponse.exceptionally(ex -> { - promise.fail(ex); - return null; + LOGGER.error("requestId={} retry count exceeded for sending to {}", requestId, this.uri); + return Future.failedFuture(new TooManyRetriesException(currentRetries)); }); - - - return promise.future(); } } diff --git a/src/main/resources/localstack/s3/optout/optout-v2/delta/2023-01-01/optout-delta-000_2023-01-01T00.00.00Z_418810dc.dat b/src/main/resources/localstack/s3/optout/optout-v2/delta/2024-04-01/optout-delta-000_2024-04-01T00.00.00Z_418810dc.dat similarity index 100% rename from src/main/resources/localstack/s3/optout/optout-v2/delta/2023-01-01/optout-delta-000_2023-01-01T00.00.00Z_418810dc.dat rename to src/main/resources/localstack/s3/optout/optout-v2/delta/2024-04-01/optout-delta-000_2024-04-01T00.00.00Z_418810dc.dat