diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java
index 36f05be67751..6e948dd92e5f 100644
--- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java
+++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/general/DefaultReplicationWorker.java
@@ -38,7 +38,7 @@
import io.airbyte.workers.internal.AirbyteDestination;
import io.airbyte.workers.internal.AirbyteMapper;
import io.airbyte.workers.internal.AirbyteSource;
-import io.airbyte.workers.internal.MessageTracker;
+import io.airbyte.workers.internal.book_keeping.MessageTracker;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTracker.java
similarity index 91%
rename from airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java
rename to airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTracker.java
index a94c00927431..aa0fa8ec70a8 100644
--- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java
+++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/book_keeping/AirbyteMessageTracker.java
@@ -2,7 +2,7 @@
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
-package io.airbyte.workers.internal;
+package io.airbyte.workers.internal.book_keeping;
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
@@ -28,7 +28,7 @@
import io.airbyte.protocol.models.AirbyteStreamNameNamespacePair;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.helper.FailureHelper;
-import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerNoStateMatchException;
+import io.airbyte.workers.internal.book_keeping.StateMetricsTracker.StateMetricsTrackerNoStateMatchException;
import io.airbyte.workers.internal.state_aggregator.DefaultStateAggregator;
import io.airbyte.workers.internal.state_aggregator.StateAggregator;
import java.time.LocalDateTime;
@@ -42,6 +42,13 @@
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
+/**
+ * This class is responsible for stats and metadata tracking surrounding
+ * {@link AirbyteRecordMessage}.
+ * <p>
+ * It is not intended to perform meaningful operations (transforming, mutating, triggering
+ * downstream actions, etc.) on specific messages.
+ */
@Slf4j
public class AirbyteMessageTracker implements MessageTracker {
@@ -54,8 +61,6 @@ public class AirbyteMessageTracker implements MessageTracker {
private final HashFunction hashFunction;
private final BiMap nameNamespacePairToIndex;
private final Map nameNamespacePairToStreamStats;
- private final Map streamToTotalBytesEmitted;
- private final Map streamToTotalRecordsEmitted;
private final StateDeltaTracker stateDeltaTracker;
private final StateMetricsTracker stateMetricsTracker;
private final List destinationErrorTraceMessages;
@@ -86,11 +91,6 @@ private enum ConnectorType {
DESTINATION
}
- /**
- * POJO for all per-stream stats.
- */
- private record StreamStats(long estimatedBytes, long emittedBytes, long estimatedRecords, long emittedRecords) {}
-
public AirbyteMessageTracker() {
this(new StateDeltaTracker(STATE_DELTA_TRACKER_MEMORY_LIMIT_BYTES),
new DefaultStateAggregator(new EnvVariableFeatureFlags().useStreamCapableState()),
@@ -107,8 +107,6 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
this.nameNamespacePairToIndex = HashBiMap.create();
this.hashFunction = Hashing.murmur3_32_fixed();
this.nameNamespacePairToStreamStats = new HashMap<>();
- this.streamToTotalBytesEmitted = new HashMap<>();
- this.streamToTotalRecordsEmitted = new HashMap<>();
this.stateDeltaTracker = stateDeltaTracker;
this.stateMetricsTracker = stateMetricsTracker;
this.nextStreamIndex = 0;
@@ -155,17 +153,19 @@ private void handleSourceEmittedRecord(final AirbyteRecordMessage recordMessage)
stateMetricsTracker.setFirstRecordReceivedAt(LocalDateTime.now());
}
- final short streamIndex = getStreamIndex(AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage));
+ final var nameNamespace = AirbyteStreamNameNamespacePair.fromRecordMessage(recordMessage);
+ final short streamIndex = getStreamIndex(nameNamespace);
final long currentRunningCount = streamToRunningCount.getOrDefault(streamIndex, 0L);
streamToRunningCount.put(streamIndex, currentRunningCount + 1);
- final long currentTotalCount = streamToTotalRecordsEmitted.getOrDefault(streamIndex, 0L);
- streamToTotalRecordsEmitted.put(streamIndex, currentTotalCount + 1);
+ final var currStats = nameNamespacePairToStreamStats.getOrDefault(nameNamespace, new StreamStats());
+ currStats.emittedRecords++;
final int estimatedNumBytes = Jsons.getEstimatedByteSize(recordMessage.getData());
- final long currentTotalStreamBytes = streamToTotalBytesEmitted.getOrDefault(streamIndex, 0L);
- streamToTotalBytesEmitted.put(streamIndex, currentTotalStreamBytes + estimatedNumBytes);
+ currStats.emittedBytes += estimatedNumBytes;
+
+ nameNamespacePairToStreamStats.put(nameNamespace, currStats);
}
/**
@@ -296,9 +296,11 @@ private void handleEmittedEstimateTrace(final AirbyteEstimateTraceMessage estima
Preconditions.checkArgument(totalRecordsEstimatedSync == null, "STREAM and SYNC estimates should not be emitted in the same sync.");
log.debug("Saving stream estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName());
- nameNamespacePairToStreamStats.put(
- new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace()),
- new StreamStats(estimate.getByteEstimate(), 0L, estimate.getRowEstimate(), 0L));
+ final var nameNamespace = new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace());
+ final var currStats = nameNamespacePairToStreamStats.getOrDefault(nameNamespace, new StreamStats());
+ currStats.estimatedRecords = estimate.getRowEstimate();
+ currStats.estimatedBytes = estimate.getByteEstimate();
+ nameNamespacePairToStreamStats.put(nameNamespace, currStats);
}
case SYNC -> {
Preconditions.checkArgument(nameNamespacePairToStreamStats.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync.");
@@ -404,8 +406,8 @@ public Optional