From 10aaa8c8eea53f36e041567376fde98f4dbe7ef0 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Sat, 26 Nov 2022 09:51:10 -0800 Subject: [PATCH 01/12] Add all the required interface methods for message tracking. --- .../internal/AirbyteMessageTracker.java | 39 +++++++++++++++++-- .../workers/internal/MessageTracker.java | 8 ++++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index f7d8caa4be69..fe7d5018e80c 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -53,6 +53,8 @@ public class AirbyteMessageTracker implements MessageTracker { private final BiMap nameNamespacePairToIndex; private final Map streamToTotalBytesEmitted; private final Map streamToTotalRecordsEmitted; + private final Map streamToTotalBytesEstimated; + private final Map streamToTotalRecordsEstimated; private final StateDeltaTracker stateDeltaTracker; private final StateMetricsTracker stateMetricsTracker; private final List destinationErrorTraceMessages; @@ -95,6 +97,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, this.hashFunction = Hashing.murmur3_32_fixed(); this.streamToTotalBytesEmitted = new HashMap<>(); this.streamToTotalRecordsEmitted = new HashMap<>(); + this.streamToTotalBytesEstimated = new HashMap<>(); + this.streamToTotalRecordsEstimated = new HashMap<>(); this.stateDeltaTracker = stateDeltaTracker; this.stateMetricsTracker = stateMetricsTracker; this.nextStreamIndex = 0; @@ -252,7 +256,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec */ private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) { switch 
(traceMessage.getType()) { - case ESTIMATE -> handleEmittedEstimateTrace(traceMessage, connectorType); + case ESTIMATE -> handleEmittedEstimateTrace(traceMessage); case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType); default -> log.warn("Invalid message type for trace message: {}", traceMessage); } @@ -266,9 +270,16 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage } } - @SuppressWarnings("PMD") // until method is implemented - private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) { + private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage) { + // Assume the estimate is a whole number and not a sum i.e. each estimate replaces the previous + // estimate. + + final var estimate = estimateTraceMessage.getEstimate(); + log.info("Saving records estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName()); + final var index = getStreamIndex(new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace())); + streamToTotalRecordsEstimated.put(index, estimate.getRowEstimate()); + streamToTotalBytesEstimated.put(index, estimate.getByteEstimate()); } private short getStreamIndex(final AirbyteStreamNameNamespacePair pair) { @@ -368,6 +379,12 @@ public Map getStreamToEmittedRecords() { entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)); } + @Override + public Map getStreamToEstimatedRecords() { + return streamToTotalRecordsEstimated.entrySet().stream().collect(Collectors.toMap( + entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)); + } + /** * Swap out stream indices for stream names and return total bytes emitted by stream. 
*/ @@ -377,6 +394,12 @@ public Map getStreamToEmittedBytes() { entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)); } + @Override + public Map getStreamToEstimatedBytes() { + return streamToTotalBytesEstimated.entrySet().stream().collect(Collectors.toMap( + entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)); + } + /** * Compute sum of emitted record counts across all streams. */ @@ -385,6 +408,11 @@ public long getTotalRecordsEmitted() { return streamToTotalRecordsEmitted.values().stream().reduce(0L, Long::sum); } + @Override + public long getTotalRecordsEstimated() { + return streamToTotalRecordsEstimated.values().stream().reduce(0L, Long::sum); + } + /** * Compute sum of emitted bytes across all streams. */ @@ -393,6 +421,11 @@ public long getTotalBytesEmitted() { return streamToTotalBytesEmitted.values().stream().reduce(0L, Long::sum); } + @Override + public long getTotalBytesEstimated() { + return streamToTotalBytesEstimated.values().stream().reduce(0L, Long::sum); + } + /** * Compute sum of committed record counts across all streams. If the delta tracker has exceeded its * capacity, return empty because committed record counts cannot be reliably computed. diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java index 09507ec7a374..3bb1ed81a529 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java @@ -66,6 +66,8 @@ public interface MessageTracker { */ Map getStreamToEmittedRecords(); + Map getStreamToEstimatedRecords(); + /** * Get the per-stream emitted byte count. This includes messages that were emitted by the source, * but never committed by the destination. 
@@ -74,6 +76,8 @@ public interface MessageTracker { */ Map getStreamToEmittedBytes(); + Map getStreamToEstimatedBytes(); + /** * Get the overall emitted record count. This includes messages that were emitted by the source, but * never committed by the destination. @@ -82,6 +86,8 @@ public interface MessageTracker { */ long getTotalRecordsEmitted(); + long getTotalRecordsEstimated(); + /** * Get the overall emitted bytes. This includes messages that were emitted by the source, but never * committed by the destination. @@ -90,6 +96,8 @@ public interface MessageTracker { */ long getTotalBytesEmitted(); + long getTotalBytesEstimated(); + /** * Get the overall committed record count. * From 63436da0757d4138266f21abbd3d62237d44e089 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Sat, 26 Nov 2022 09:59:48 -0800 Subject: [PATCH 02/12] Slight reformat. --- .../internal/AirbyteMessageTrackerTest.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java index 5123b299453c..8a69df7c87c2 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java @@ -5,6 +5,7 @@ package io.airbyte.workers.internal; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import io.airbyte.commons.json.Jsons; @@ -19,6 +20,7 @@ import java.util.HashMap; import java.util.Map; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -293,8 +295,8 @@ void 
testGetFirstDestinationAndSourceMessages() throws Exception { @Test void testGetFirstDestinationAndSourceMessagesWithNulls() throws Exception { - assertEquals(messageTracker.getFirstDestinationErrorTraceMessage(), null); - assertEquals(messageTracker.getFirstSourceErrorTraceMessage(), null); + assertNull(messageTracker.getFirstDestinationErrorTraceMessage()); + assertNull(messageTracker.getFirstSourceErrorTraceMessage()); } @Test @@ -309,7 +311,7 @@ void testErrorTraceMessageFailureWithMultipleTraceErrors() throws Exception { messageTracker.acceptFromDestination(destMessage2); final FailureReason failureReason = FailureHelper.sourceFailure(sourceMessage1.getTrace(), Long.valueOf(123), 1); - assertEquals(messageTracker.errorTraceMessageFailure(Long.valueOf(123), 1), + assertEquals(messageTracker.errorTraceMessageFailure(123L, 1), failureReason); } @@ -319,12 +321,22 @@ void testErrorTraceMessageFailureWithOneTraceError() throws Exception { messageTracker.acceptFromDestination(destMessage); final FailureReason failureReason = FailureHelper.destinationFailure(destMessage.getTrace(), Long.valueOf(123), 1); - assertEquals(messageTracker.errorTraceMessageFailure(Long.valueOf(123), 1), failureReason); + assertEquals(messageTracker.errorTraceMessageFailure(123L, 1), failureReason); } @Test void testErrorTraceMessageFailureWithNoTraceErrors() throws Exception { - assertEquals(messageTracker.errorTraceMessageFailure(Long.valueOf(123), 1), null); + assertEquals(messageTracker.errorTraceMessageFailure(123L, 1), null); + } + + @Nested + class Estimates { + // receiving an estimate for two streams should save + @Test + void shouldSaveAndReturnCorrectly() { + final AirbyteMessage estimate = AirbyteMessageUtils.createTraceMessage("dest trace 1", Double.valueOf(125)); + } + } } From b86fdf33b2cb455e57db7e3acfb6981547bed6b1 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Sat, 26 Nov 2022 10:34:35 -0800 Subject: [PATCH 03/12] Minor refactor of the AirbyteMessageUtils and the 
AirbyteMessageTrackerTest classes. --- .../test_utils/AirbyteMessageUtils.java | 45 ++++++++++++------- .../general/DefaultReplicationWorkerTest.java | 2 +- .../internal/AirbyteMessageTrackerTest.java | 37 +++++++-------- .../DefaultCheckConnectionWorkerTest.java | 2 +- .../DefaultDiscoverCatalogWorkerTest.java | 2 +- .../general/DefaultGetSpecWorkerTest.java | 2 +- 6 files changed, 53 insertions(+), 37 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java index 2aede7159739..f40f4094787f 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java @@ -8,6 +8,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.AirbyteErrorTraceMessage; +import io.airbyte.protocol.models.AirbyteEstimateTraceMessage; import io.airbyte.protocol.models.AirbyteGlobalState; import io.airbyte.protocol.models.AirbyteLogMessage; import io.airbyte.protocol.models.AirbyteMessage; @@ -102,29 +103,43 @@ public static AirbyteStreamState createStreamState(final String streamName) { return new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName(streamName)); } + public static AirbyteMessage createEstimateMessage(final String namespace, final String name, final long byteEst, final long rowEst) { + final var est = new AirbyteEstimateTraceMessage() + .withType(AirbyteEstimateTraceMessage.Type.STREAM) + .withNamespace(namespace) + .withName(name) + .withByteEstimate(byteEst) + .withRowEstimate(rowEst); + + return new AirbyteMessage() + .withType(Type.TRACE) + .withTrace(new AirbyteTraceMessage().withType(AirbyteTraceMessage.Type.ESTIMATE) + .withEstimate(est)); + } + + public static AirbyteMessage 
createErrorMessage(final String message, final Double emittedAt) { + return new AirbyteMessage() + .withType(AirbyteMessage.Type.TRACE) + .withTrace(createErrorTraceMessage(message, emittedAt)); + } + public static AirbyteTraceMessage createErrorTraceMessage(final String message, final Double emittedAt) { - return new AirbyteTraceMessage() - .withType(io.airbyte.protocol.models.AirbyteTraceMessage.Type.ERROR) - .withEmittedAt(emittedAt) - .withError(new AirbyteErrorTraceMessage().withMessage(message)); + return createErrorTraceMessage(message, emittedAt, null); } public static AirbyteTraceMessage createErrorTraceMessage(final String message, final Double emittedAt, final AirbyteErrorTraceMessage.FailureType failureType) { - return new AirbyteTraceMessage() + final var msg = new AirbyteTraceMessage() .withType(io.airbyte.protocol.models.AirbyteTraceMessage.Type.ERROR) - .withEmittedAt(emittedAt) - .withError(new AirbyteErrorTraceMessage().withMessage(message).withFailureType(failureType)); - } + .withError(new AirbyteErrorTraceMessage().withMessage(message)) + .withEmittedAt(emittedAt); - public static AirbyteMessage createTraceMessage(final String message, final Double emittedAt) { - return new AirbyteMessage() - .withType(AirbyteMessage.Type.TRACE) - .withTrace(new AirbyteTraceMessage() - .withType(AirbyteTraceMessage.Type.ERROR) - .withEmittedAt(emittedAt) - .withError(new AirbyteErrorTraceMessage().withMessage(message))); + if (failureType != null) { + msg.getError().withFailureType(failureType); + } + + return msg; } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java index 7b915ed4943c..e13f44edb6b4 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java +++ 
b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/DefaultReplicationWorkerTest.java @@ -307,7 +307,7 @@ void testReplicationRunnableWorkerFailure() throws Exception { @Test void testOnlyStateAndRecordMessagesDeliveredToDestination() throws Exception { final AirbyteMessage LOG_MESSAGE = AirbyteMessageUtils.createLogMessage(Level.INFO, "a log message"); - final AirbyteMessage TRACE_MESSAGE = AirbyteMessageUtils.createTraceMessage("a trace message", 123456.0); + final AirbyteMessage TRACE_MESSAGE = AirbyteMessageUtils.createErrorMessage("a trace message", 123456.0); when(mapper.mapMessage(LOG_MESSAGE)).thenReturn(LOG_MESSAGE); when(mapper.mapMessage(TRACE_MESSAGE)).thenReturn(TRACE_MESSAGE); when(source.isFinished()).thenReturn(false, false, false, false, true); diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java index 8a69df7c87c2..b42b60d5ddd0 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java @@ -279,11 +279,11 @@ void testGetTotalRecordsCommitted_emptyWhenCommitStateHashThrowsException() thro } @Test - void testGetFirstDestinationAndSourceMessages() throws Exception { - final AirbyteMessage sourceMessage1 = AirbyteMessageUtils.createTraceMessage("source trace 1", Double.valueOf(123)); - final AirbyteMessage sourceMessage2 = AirbyteMessageUtils.createTraceMessage("source trace 2", Double.valueOf(124)); - final AirbyteMessage destMessage1 = AirbyteMessageUtils.createTraceMessage("dest trace 1", Double.valueOf(125)); - final AirbyteMessage destMessage2 = AirbyteMessageUtils.createTraceMessage("dest trace 2", Double.valueOf(126)); + void testGetFirstDestinationAndSourceMessages() { + final AirbyteMessage sourceMessage1 = 
AirbyteMessageUtils.createErrorMessage("source trace 1", 123.0); + final AirbyteMessage sourceMessage2 = AirbyteMessageUtils.createErrorMessage("source trace 2", 124.0); + final AirbyteMessage destMessage1 = AirbyteMessageUtils.createErrorMessage("dest trace 1", 125.0); + final AirbyteMessage destMessage2 = AirbyteMessageUtils.createErrorMessage("dest trace 2", 126.0); messageTracker.acceptFromSource(sourceMessage1); messageTracker.acceptFromSource(sourceMessage2); messageTracker.acceptFromDestination(destMessage1); @@ -294,17 +294,17 @@ void testGetFirstDestinationAndSourceMessages() throws Exception { } @Test - void testGetFirstDestinationAndSourceMessagesWithNulls() throws Exception { + void testGetFirstDestinationAndSourceMessagesWithNulls() { assertNull(messageTracker.getFirstDestinationErrorTraceMessage()); assertNull(messageTracker.getFirstSourceErrorTraceMessage()); } @Test - void testErrorTraceMessageFailureWithMultipleTraceErrors() throws Exception { - final AirbyteMessage sourceMessage1 = AirbyteMessageUtils.createTraceMessage("source trace 1", Double.valueOf(123)); - final AirbyteMessage sourceMessage2 = AirbyteMessageUtils.createTraceMessage("source trace 2", Double.valueOf(124)); - final AirbyteMessage destMessage1 = AirbyteMessageUtils.createTraceMessage("dest trace 1", Double.valueOf(125)); - final AirbyteMessage destMessage2 = AirbyteMessageUtils.createTraceMessage("dest trace 2", Double.valueOf(126)); + void testErrorTraceMessageFailureWithMultipleTraceErrors() { + final AirbyteMessage sourceMessage1 = AirbyteMessageUtils.createErrorMessage("source trace 1", 123.0); + final AirbyteMessage sourceMessage2 = AirbyteMessageUtils.createErrorMessage("source trace 2", 124.0); + final AirbyteMessage destMessage1 = AirbyteMessageUtils.createErrorMessage("dest trace 1", 125.0); + final AirbyteMessage destMessage2 = AirbyteMessageUtils.createErrorMessage("dest trace 2", 126.0); messageTracker.acceptFromSource(sourceMessage1); 
messageTracker.acceptFromSource(sourceMessage2); messageTracker.acceptFromDestination(destMessage1); @@ -316,8 +316,8 @@ void testErrorTraceMessageFailureWithMultipleTraceErrors() throws Exception { } @Test - void testErrorTraceMessageFailureWithOneTraceError() throws Exception { - final AirbyteMessage destMessage = AirbyteMessageUtils.createTraceMessage("dest trace 1", Double.valueOf(125)); + void testErrorTraceMessageFailureWithOneTraceError() { + final AirbyteMessage destMessage = AirbyteMessageUtils.createErrorMessage("dest trace 1", 125.0); messageTracker.acceptFromDestination(destMessage); final FailureReason failureReason = FailureHelper.destinationFailure(destMessage.getTrace(), Long.valueOf(123), 1); @@ -325,17 +325,18 @@ void testErrorTraceMessageFailureWithOneTraceError() throws Exception { } @Test - void testErrorTraceMessageFailureWithNoTraceErrors() throws Exception { + void testErrorTraceMessageFailureWithNoTraceErrors() { assertEquals(messageTracker.errorTraceMessageFailure(123L, 1), null); } @Nested class Estimates { + // receiving an estimate for two streams should save - @Test - void shouldSaveAndReturnCorrectly() { - final AirbyteMessage estimate = AirbyteMessageUtils.createTraceMessage("dest trace 1", Double.valueOf(125)); - } + // @Test + // void shouldSaveAndReturnCorrectly() { + // final AirbyteMessage estimate = AirbyteMessageUtils.createErrorMessage("dest trace 1", 125.0); + // } } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java index e1443fbbb91a..f5685c245e44 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultCheckConnectionWorkerTest.java @@ -77,7 +77,7 @@ void setup() throws IOException, WorkerException { .withConnectionStatus(new 
AirbyteConnectionStatus().withStatus(AirbyteConnectionStatus.Status.FAILED).withMessage("failed to connect")); failureStreamFactory = noop -> Lists.newArrayList(failureMessage).stream(); - final AirbyteMessage traceMessage = AirbyteMessageUtils.createTraceMessage("some error from the connector", 123.0); + final AirbyteMessage traceMessage = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); traceMessageStreamFactory = noop -> Lists.newArrayList(traceMessage).stream(); } diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java index ff7534073108..b9fe4417657d 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultDiscoverCatalogWorkerTest.java @@ -141,7 +141,7 @@ void testDiscoverSchemaProcessFail() throws Exception { @Test void testDiscoverSchemaProcessFailWithTraceMessage() throws Exception { final AirbyteStreamFactory traceStreamFactory = noop -> Lists.newArrayList( - AirbyteMessageUtils.createTraceMessage("some error from the connector", 123.0)).stream(); + AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0)).stream(); when(process.exitValue()).thenReturn(1); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java index 787641266f72..7ddbdfc17d2c 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/general/DefaultGetSpecWorkerTest.java @@ -114,7 +114,7 @@ void testFailureOnNonzeroExitCode() throws InterruptedException, IOException { @Test void testFailureOnNonzeroExitCodeWithTraceMessage() throws 
WorkerException, InterruptedException { - final AirbyteMessage message = AirbyteMessageUtils.createTraceMessage("some error from the connector", 123.0); + final AirbyteMessage message = AirbyteMessageUtils.createErrorMessage("some error from the connector", 123.0); when(process.getInputStream()).thenReturn(new ByteArrayInputStream(Jsons.serialize(message).getBytes(Charsets.UTF_8))); when(process.waitFor(anyLong(), any())).thenReturn(true); From 6dee0fdd7d252c32b2b002ee3011dade4d8f921c Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Sat, 26 Nov 2022 10:56:26 -0800 Subject: [PATCH 04/12] Make test literals variables. --- .../test_utils/AirbyteMessageUtils.java | 2 +- .../internal/AirbyteMessageTrackerTest.java | 46 ++++++++++++++++--- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java index f40f4094787f..bb8332c4b4ac 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java @@ -103,7 +103,7 @@ public static AirbyteStreamState createStreamState(final String streamName) { return new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName(streamName)); } - public static AirbyteMessage createEstimateMessage(final String namespace, final String name, final long byteEst, final long rowEst) { + public static AirbyteMessage createEstimateMessage(final String name, final String namespace, final long byteEst, final long rowEst) { final var est = new AirbyteEstimateTraceMessage() .withType(AirbyteEstimateTraceMessage.Type.STREAM) .withNamespace(namespace) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java 
b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java index b42b60d5ddd0..b0c50ed1e015 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java @@ -30,9 +30,10 @@ @ExtendWith(MockitoExtension.class) class AirbyteMessageTrackerTest { - private static final String STREAM_1 = "stream1"; - private static final String STREAM_2 = "stream2"; - private static final String STREAM_3 = "stream3"; + private static final String NAMESPACE_1 = "avengers"; + private static final String STREAM_1 = "iron man"; + private static final String STREAM_2 = "black widow"; + private static final String STREAM_3 = "hulk"; private static final String INDUCED_EXCEPTION = "induced exception"; private AirbyteMessageTracker messageTracker; @@ -333,10 +334,41 @@ void testErrorTraceMessageFailureWithNoTraceErrors() { class Estimates { // receiving an estimate for two streams should save - // @Test - // void shouldSaveAndReturnCorrectly() { - // final AirbyteMessage estimate = AirbyteMessageUtils.createErrorMessage("dest trace 1", 125.0); - // } + @Test + void shouldSaveAndReturnIndividualStreamCountsCorrectly() { + final var est1 = AirbyteMessageUtils.createEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); + final var est2 = AirbyteMessageUtils.createEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); + + messageTracker.acceptFromSource(est1); + messageTracker.acceptFromSource(est2); + + final var streamToEstBytes = messageTracker.getStreamToEstimatedBytes(); + final var expStreamToEstBytes = Map.of( + new AirbyteStreamNameNamespacePair(STREAM_1, NAMESPACE_1), 100L, + new AirbyteStreamNameNamespacePair(STREAM_2, NAMESPACE_1), 200L); + assertEquals(expStreamToEstBytes, streamToEstBytes); + + final var streamToEstRecs = messageTracker.getStreamToEstimatedRecords(); + final var expStreamToEstRecs = Map.of( 
+ new AirbyteStreamNameNamespacePair(STREAM_1, NAMESPACE_1), 10L, + new AirbyteStreamNameNamespacePair(STREAM_2, NAMESPACE_1), 10L); + assertEquals(expStreamToEstRecs, streamToEstRecs); + } + + @Test + void shouldSaveAndReturnTotalCountsCorrectly() { + final var est1 = AirbyteMessageUtils.createEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); + final var est2 = AirbyteMessageUtils.createEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); + + messageTracker.acceptFromSource(est1); + messageTracker.acceptFromSource(est2); + + final var totalEstBytes = messageTracker.getTotalBytesEstimated(); + assertEquals(300L, totalEstBytes); + + final var totalEstRecs = messageTracker.getTotalRecordsEstimated(); + assertEquals(20L, totalEstRecs); + } } From ea43a2ddece27de479b78084682962a9862fb07a Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Sat, 26 Nov 2022 11:08:22 -0800 Subject: [PATCH 05/12] Add javadocs. --- .../internal/AirbyteMessageTracker.java | 12 ++++++++++ .../workers/internal/MessageTracker.java | 22 +++++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index fe7d5018e80c..f62a9c994803 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -379,6 +379,9 @@ public Map getStreamToEmittedRecords() { entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)); } + /** + * Swap out stream indices for stream names and return total records estimated by stream. 
+ */ @Override public Map getStreamToEstimatedRecords() { return streamToTotalRecordsEstimated.entrySet().stream().collect(Collectors.toMap( @@ -394,6 +397,9 @@ public Map getStreamToEmittedBytes() { entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)); } + /** + * Swap out stream indices for stream names and return total bytes estimated by stream. + */ @Override public Map getStreamToEstimatedBytes() { return streamToTotalBytesEstimated.entrySet().stream().collect(Collectors.toMap( @@ -408,6 +414,9 @@ public long getTotalRecordsEmitted() { return streamToTotalRecordsEmitted.values().stream().reduce(0L, Long::sum); } + /** + * Compute sum of estimated record counts across all streams. + */ @Override public long getTotalRecordsEstimated() { return streamToTotalRecordsEstimated.values().stream().reduce(0L, Long::sum); @@ -421,6 +430,9 @@ public long getTotalBytesEmitted() { return streamToTotalBytesEmitted.values().stream().reduce(0L, Long::sum); } + /** + * Compute sum of estimated bytes across all streams. + */ @Override public long getTotalBytesEstimated() { return streamToTotalBytesEstimated.values().stream().reduce(0L, Long::sum); diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java index 3bb1ed81a529..a2f31bf250d8 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/MessageTracker.java @@ -66,6 +66,12 @@ public interface MessageTracker { */ Map getStreamToEmittedRecords(); + /** + * Get the per-stream estimated record count provided by + * {@link io.airbyte.protocol.models.AirbyteEstimateTraceMessage}. + * + * @return returns a map of estimated record count by stream name. 
+ */ Map getStreamToEstimatedRecords(); /** @@ -76,6 +82,12 @@ public interface MessageTracker { */ Map getStreamToEmittedBytes(); + /** + * Get the per-stream estimated byte count provided by + * {@link io.airbyte.protocol.models.AirbyteEstimateTraceMessage}. + * + * @return returns a map of estimated bytes by stream name. + */ Map getStreamToEstimatedBytes(); /** @@ -86,6 +98,11 @@ public interface MessageTracker { */ long getTotalRecordsEmitted(); + /** + * Get the overall estimated record count. + * + * @return returns the total count of estimated records across all streams. + */ long getTotalRecordsEstimated(); /** @@ -96,6 +113,11 @@ public interface MessageTracker { */ long getTotalBytesEmitted(); + /** + * Get the overall estimated bytes. + * + * @return returns the total count of estimated bytes across all streams. + */ long getTotalBytesEstimated(); /** From 2effea79fe6b075f0ab0b1134b9d4f8348b48a67 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 28 Nov 2022 11:54:04 -0800 Subject: [PATCH 06/12] Add logic to distinguish between SYNC and STREAM messages.
--- .../internal/AirbyteMessageTracker.java | 45 +++++++++++++++---- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index f62a9c994803..accf7bbe7d17 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -8,6 +8,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Charsets; +import com.google.common.base.Preconditions; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; import com.google.common.hash.HashFunction; @@ -19,6 +20,7 @@ import io.airbyte.config.State; import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage; import io.airbyte.protocol.models.AirbyteControlMessage; +import io.airbyte.protocol.models.AirbyteEstimateTraceMessage; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; @@ -62,6 +64,11 @@ public class AirbyteMessageTracker implements MessageTracker { private final StateAggregator stateAggregator; private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages(); + // These variables support SYNC level estimates and are meant for sources where stream level + // estimates are not possible e.g. CDC sources. 
+ private Long totalRecordsEstimatedSync; + private Long totalBytesEstimatedSync; + private short nextStreamIndex; /** @@ -256,7 +263,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec */ private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) { switch (traceMessage.getType()) { - case ESTIMATE -> handleEmittedEstimateTrace(traceMessage); + case ESTIMATE -> handleEmittedEstimateTrace(traceMessage.getEstimate()); case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType); default -> log.warn("Invalid message type for trace message: {}", traceMessage); } @@ -270,16 +277,36 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage } } - private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage) { - // Assume the estimate is a whole number and not a sum i.e. each estimate replaces the previous - // estimate. + /** + * There are several assumptions here: + *

+ * - Assume the estimate is a whole number and not a sum i.e. each estimate replaces the previous + * estimate. + *

+ * - Sources cannot emit both STREAM and SYNC estimates in a same sync. Error out if this happens. + */ + @SuppressWarnings("PMD.AvoidDuplicateLiterals") + private void handleEmittedEstimateTrace(final AirbyteEstimateTraceMessage estimate) { + switch (estimate.getType()) { + case STREAM -> { + Preconditions.checkArgument(totalBytesEstimatedSync == null, "STREAM and SYNC estimates should not be emitted in the same sync."); + Preconditions.checkArgument(totalRecordsEstimatedSync == null, "STREAM and SYNC estimates should not be emitted in the same sync."); + + log.info("Saving records estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName()); + final var index = getStreamIndex(new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace())); + + streamToTotalRecordsEstimated.put(index, estimate.getRowEstimate()); + streamToTotalBytesEstimated.put(index, estimate.getByteEstimate()); + } + case SYNC -> { + Preconditions.checkArgument(streamToTotalBytesEstimated.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync."); + Preconditions.checkArgument(streamToTotalRecordsEstimated.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync."); - final var estimate = estimateTraceMessage.getEstimate(); - log.info("Saving records estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName()); - final var index = getStreamIndex(new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace())); + totalBytesEstimatedSync = estimate.getByteEstimate(); + totalRecordsEstimatedSync = estimate.getRowEstimate(); + } + } - streamToTotalRecordsEstimated.put(index, estimate.getRowEstimate()); - streamToTotalBytesEstimated.put(index, estimate.getByteEstimate()); } private short getStreamIndex(final AirbyteStreamNameNamespacePair pair) { From 3dfefc73cf55587892b93dfa5bb2e0e8b0c6a6b7 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 28 Nov 2022 12:03:46 -0800 
Subject: [PATCH 07/12] Update message utils to reflect STREAM vs SYNC estimates. --- .../internal/AirbyteMessageTracker.java | 12 ++++++++-- .../test_utils/AirbyteMessageUtils.java | 21 ++++++++++++---- .../internal/AirbyteMessageTrackerTest.java | 24 +++++++++++++++---- 3 files changed, 47 insertions(+), 10 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index accf7bbe7d17..290d243e3431 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -446,7 +446,11 @@ public long getTotalRecordsEmitted() { */ @Override public long getTotalRecordsEstimated() { - return streamToTotalRecordsEstimated.values().stream().reduce(0L, Long::sum); + if (!streamToTotalRecordsEstimated.isEmpty()) { + return streamToTotalRecordsEstimated.values().stream().reduce(0L, Long::sum); + } + + return totalRecordsEstimatedSync; } /** @@ -462,7 +466,11 @@ public long getTotalBytesEmitted() { */ @Override public long getTotalBytesEstimated() { - return streamToTotalBytesEstimated.values().stream().reduce(0L, Long::sum); + if (!streamToTotalBytesEstimated.isEmpty()) { + return streamToTotalBytesEstimated.values().stream().reduce(0L, Long::sum); + } + + return totalBytesEstimatedSync; } /** diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java index bb8332c4b4ac..5435dea8aac7 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java @@ -103,14 +103,27 @@ public static AirbyteStreamState 
createStreamState(final String streamName) { return new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName(streamName)); } - public static AirbyteMessage createEstimateMessage(final String name, final String namespace, final long byteEst, final long rowEst) { + public static AirbyteMessage createStreamEstimateMessage(final String name, final String namespace, final long byteEst, final long rowEst) { + return createEstimateMessage(AirbyteEstimateTraceMessage.Type.STREAM, name, namespace, byteEst, rowEst); + } + + public static AirbyteMessage createSyncEstimateMessage(final long byteEst, final long rowEst) { + return createEstimateMessage(AirbyteEstimateTraceMessage.Type.SYNC, null, null, byteEst, rowEst); + } + + public static AirbyteMessage createEstimateMessage(AirbyteEstimateTraceMessage.Type type, final String name, final String namespace, final long byteEst, final long rowEst) { final var est = new AirbyteEstimateTraceMessage() - .withType(AirbyteEstimateTraceMessage.Type.STREAM) - .withNamespace(namespace) - .withName(name) + .withType(type) .withByteEstimate(byteEst) .withRowEstimate(rowEst); + if (name != null) { + est.withName(name); + } + if (namespace != null) { + est.withNamespace(namespace); + } + return new AirbyteMessage() .withType(Type.TRACE) .withTrace(new AirbyteTraceMessage().withType(AirbyteTraceMessage.Type.ESTIMATE) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java index b0c50ed1e015..1c00577abc9f 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java @@ -336,8 +336,8 @@ class Estimates { // receiving an estimate for two streams should save @Test void shouldSaveAndReturnIndividualStreamCountsCorrectly() { - final 
var est1 = AirbyteMessageUtils.createEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); - final var est2 = AirbyteMessageUtils.createEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); + final var est1 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); + final var est2 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); messageTracker.acceptFromSource(est1); messageTracker.acceptFromSource(est2); @@ -357,8 +357,8 @@ void shouldSaveAndReturnIndividualStreamCountsCorrectly() { @Test void shouldSaveAndReturnTotalCountsCorrectly() { - final var est1 = AirbyteMessageUtils.createEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); - final var est2 = AirbyteMessageUtils.createEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); + final var est1 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); + final var est2 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); messageTracker.acceptFromSource(est1); messageTracker.acceptFromSource(est2); @@ -370,6 +370,22 @@ void shouldSaveAndReturnTotalCountsCorrectly() { assertEquals(20L, totalEstRecs); } + @Test + void shouldErrorOnBothStreamAndSyncEstimates() { + final var est1 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); + final var est2 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); + + messageTracker.acceptFromSource(est1); + messageTracker.acceptFromSource(est2); + + final var totalEstBytes = messageTracker.getTotalBytesEstimated(); + assertEquals(300L, totalEstBytes); + + final var totalEstRecs = messageTracker.getTotalRecordsEstimated(); + assertEquals(20L, totalEstRecs); + } + + } } From 8cf5dde1e9c2210091035113e7f627a64bfc317d Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 28 Nov 2022 12:54:58 -0800 Subject: [PATCH 08/12] Add tests to clarify stream and sync estimate behavior. 
--- .../internal/AirbyteMessageTrackerTest.java | 35 +++++++++++++++---- 1 file changed, 29 insertions(+), 6 deletions(-) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java index 1c00577abc9f..3aefce85de51 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import io.airbyte.commons.json.Jsons; @@ -20,6 +21,7 @@ import java.util.HashMap; import java.util.Map; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Nested; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -335,7 +337,8 @@ class Estimates { // receiving an estimate for two streams should save @Test - void shouldSaveAndReturnIndividualStreamCountsCorrectly() { + @DisplayName("when given stream estimates, should return correct per-stream estimates") + void streamShouldSaveAndReturnIndividualStreamCountsCorrectly() { final var est1 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); final var est2 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); @@ -356,7 +359,8 @@ void shouldSaveAndReturnIndividualStreamCountsCorrectly() { } @Test - void shouldSaveAndReturnTotalCountsCorrectly() { + @DisplayName("when given stream estimates, should return correct total estimates") + void streamShouldSaveAndReturnTotalCountsCorrectly() { final var est1 = 
AirbyteMessageUtils.createStreamEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); final var est2 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); @@ -371,18 +375,37 @@ void shouldSaveAndReturnTotalCountsCorrectly() { } @Test + @DisplayName("should error when given both Stream and Sync estimates") void shouldErrorOnBothStreamAndSyncEstimates() { final var est1 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); - final var est2 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_2, NAMESPACE_1, 200L, 10L); + final var est2 = AirbyteMessageUtils.createSyncEstimateMessage( 200L, 10L); messageTracker.acceptFromSource(est1); - messageTracker.acceptFromSource(est2); + assertThrows(IllegalArgumentException.class, () -> messageTracker.acceptFromSource(est2)); + } + @Test + @DisplayName("when given sync estimates, should return correct total estimates") + void syncShouldSaveAndReturnTotalCountsCorrectly() { + final var est = AirbyteMessageUtils.createSyncEstimateMessage( 200L, 10L); + messageTracker.acceptFromSource(est); final var totalEstBytes = messageTracker.getTotalBytesEstimated(); - assertEquals(300L, totalEstBytes); + assertEquals(200L, totalEstBytes); final var totalEstRecs = messageTracker.getTotalRecordsEstimated(); - assertEquals(20L, totalEstRecs); + assertEquals(10L, totalEstRecs); + } + + @Test + @DisplayName("when given sync estimates, should not return any per-stream estimates") + void syncShouldNotHaveStreamEstimates() { + final var est = AirbyteMessageUtils.createSyncEstimateMessage( 200L, 10L); + messageTracker.acceptFromSource(est); + + final var streamToEstBytes = messageTracker.getStreamToEstimatedBytes(); + assertTrue(streamToEstBytes.isEmpty()); + final var streamToEstRecs = messageTracker.getStreamToEstimatedRecords(); + assertTrue(streamToEstRecs.isEmpty()); } From 8117afa568267fb81477db471f091d319276d498 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 28 Nov 2022 
13:15:18 -0800 Subject: [PATCH 09/12] Format and add more tests. --- .../airbyte/workers/test_utils/AirbyteMessageUtils.java | 6 +++++- .../workers/internal/AirbyteMessageTrackerTest.java | 8 ++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java index 5435dea8aac7..244e85303c8c 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/AirbyteMessageUtils.java @@ -111,7 +111,11 @@ public static AirbyteMessage createSyncEstimateMessage(final long byteEst, final return createEstimateMessage(AirbyteEstimateTraceMessage.Type.SYNC, null, null, byteEst, rowEst); } - public static AirbyteMessage createEstimateMessage(AirbyteEstimateTraceMessage.Type type, final String name, final String namespace, final long byteEst, final long rowEst) { + public static AirbyteMessage createEstimateMessage(AirbyteEstimateTraceMessage.Type type, + final String name, + final String namespace, + final long byteEst, + final long rowEst) { final var est = new AirbyteEstimateTraceMessage() .withType(type) .withByteEstimate(byteEst) diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java index 3aefce85de51..aed444225cf9 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/AirbyteMessageTrackerTest.java @@ -378,15 +378,16 @@ void streamShouldSaveAndReturnTotalCountsCorrectly() { @DisplayName("should error when given both Stream and Sync estimates") void shouldErrorOnBothStreamAndSyncEstimates() { final var 
est1 = AirbyteMessageUtils.createStreamEstimateMessage(STREAM_1, NAMESPACE_1, 100L, 10L); - final var est2 = AirbyteMessageUtils.createSyncEstimateMessage( 200L, 10L); + final var est2 = AirbyteMessageUtils.createSyncEstimateMessage(200L, 10L); messageTracker.acceptFromSource(est1); assertThrows(IllegalArgumentException.class, () -> messageTracker.acceptFromSource(est2)); } + @Test @DisplayName("when given sync estimates, should return correct total estimates") void syncShouldSaveAndReturnTotalCountsCorrectly() { - final var est = AirbyteMessageUtils.createSyncEstimateMessage( 200L, 10L); + final var est = AirbyteMessageUtils.createSyncEstimateMessage(200L, 10L); messageTracker.acceptFromSource(est); final var totalEstBytes = messageTracker.getTotalBytesEstimated(); @@ -399,7 +400,7 @@ void syncShouldSaveAndReturnTotalCountsCorrectly() { @Test @DisplayName("when given sync estimates, should not return any per-stream estimates") void syncShouldNotHaveStreamEstimates() { - final var est = AirbyteMessageUtils.createSyncEstimateMessage( 200L, 10L); + final var est = AirbyteMessageUtils.createSyncEstimateMessage(200L, 10L); messageTracker.acceptFromSource(est); final var streamToEstBytes = messageTracker.getStreamToEstimatedBytes(); @@ -408,7 +409,6 @@ void syncShouldNotHaveStreamEstimates() { assertTrue(streamToEstRecs.isEmpty()); } - } } From 225f541fbd062786af8d77ff88e9da61d5df1e83 Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 28 Nov 2022 15:37:16 -0800 Subject: [PATCH 10/12] Respond to PR feedback. 
--- .../io/airbyte/workers/internal/AirbyteMessageTracker.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index 290d243e3431..890541adf761 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -292,7 +292,7 @@ private void handleEmittedEstimateTrace(final AirbyteEstimateTraceMessage estima Preconditions.checkArgument(totalBytesEstimatedSync == null, "STREAM and SYNC estimates should not be emitted in the same sync."); Preconditions.checkArgument(totalRecordsEstimatedSync == null, "STREAM and SYNC estimates should not be emitted in the same sync."); - log.info("Saving records estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName()); + log.debug("Saving stream estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName()); final var index = getStreamIndex(new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace())); streamToTotalRecordsEstimated.put(index, estimate.getRowEstimate()); @@ -302,6 +302,7 @@ private void handleEmittedEstimateTrace(final AirbyteEstimateTraceMessage estima Preconditions.checkArgument(streamToTotalBytesEstimated.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync."); Preconditions.checkArgument(streamToTotalRecordsEstimated.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync."); + log.debug("Saving sync estimates"); totalBytesEstimatedSync = estimate.getByteEstimate(); totalRecordsEstimatedSync = estimate.getRowEstimate(); } From ace577f404619afbdaccb4d679ffa0a67c404e2a Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 28 Nov 2022 16:09:17 -0800 Subject: [PATCH 
11/12] Use the StreamStats POJO instead of creating new maps. --- .../internal/AirbyteMessageTracker.java | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index 890541adf761..6c8387de5733 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -53,10 +53,9 @@ public class AirbyteMessageTracker implements MessageTracker { private final Map streamToRunningCount; private final HashFunction hashFunction; private final BiMap nameNamespacePairToIndex; + private final Map nameNamespacePairToStreamStats; private final Map streamToTotalBytesEmitted; private final Map streamToTotalRecordsEmitted; - private final Map streamToTotalBytesEstimated; - private final Map streamToTotalRecordsEstimated; private final StateDeltaTracker stateDeltaTracker; private final StateMetricsTracker stateMetricsTracker; private final List destinationErrorTraceMessages; @@ -87,6 +86,11 @@ private enum ConnectorType { DESTINATION } + /** + * POJO for all per-stream stats. 
+ */ + private record StreamStats(long estimatedBytes, long emittedBytes, long estimatedRecords, long emittedRecords) {} + public AirbyteMessageTracker() { this(new StateDeltaTracker(STATE_DELTA_TRACKER_MEMORY_LIMIT_BYTES), new DefaultStateAggregator(new EnvVariableFeatureFlags().useStreamCapableState()), @@ -102,10 +106,9 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker, this.streamToRunningCount = new HashMap<>(); this.nameNamespacePairToIndex = HashBiMap.create(); this.hashFunction = Hashing.murmur3_32_fixed(); + this.nameNamespacePairToStreamStats = new HashMap<>(); this.streamToTotalBytesEmitted = new HashMap<>(); this.streamToTotalRecordsEmitted = new HashMap<>(); - this.streamToTotalBytesEstimated = new HashMap<>(); - this.streamToTotalRecordsEstimated = new HashMap<>(); this.stateDeltaTracker = stateDeltaTracker; this.stateMetricsTracker = stateMetricsTracker; this.nextStreamIndex = 0; @@ -293,14 +296,13 @@ private void handleEmittedEstimateTrace(final AirbyteEstimateTraceMessage estima Preconditions.checkArgument(totalRecordsEstimatedSync == null, "STREAM and SYNC estimates should not be emitted in the same sync."); log.debug("Saving stream estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName()); - final var index = getStreamIndex(new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace())); - - streamToTotalRecordsEstimated.put(index, estimate.getRowEstimate()); - streamToTotalBytesEstimated.put(index, estimate.getByteEstimate()); + nameNamespacePairToStreamStats.put( + new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace()), + new StreamStats(estimate.getByteEstimate(), 0L, estimate.getRowEstimate(), 0L) + ); } case SYNC -> { - Preconditions.checkArgument(streamToTotalBytesEstimated.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync."); - Preconditions.checkArgument(streamToTotalRecordsEstimated.isEmpty(), "STREAM and SYNC 
estimates should not be emitted in the same sync."); + Preconditions.checkArgument(nameNamespacePairToStreamStats.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync."); log.debug("Saving sync estimates"); totalBytesEstimatedSync = estimate.getByteEstimate(); @@ -412,8 +414,10 @@ public Map getStreamToEmittedRecords() { */ @Override public Map getStreamToEstimatedRecords() { - return streamToTotalRecordsEstimated.entrySet().stream().collect(Collectors.toMap( - entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)); + return nameNamespacePairToStreamStats.entrySet().stream().collect( + Collectors.toMap( + Entry::getKey, + entry -> entry.getValue().estimatedRecords())); } /** @@ -430,8 +434,10 @@ public Map getStreamToEmittedBytes() { */ @Override public Map getStreamToEstimatedBytes() { - return streamToTotalBytesEstimated.entrySet().stream().collect(Collectors.toMap( - entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue)); + return nameNamespacePairToStreamStats.entrySet().stream().collect( + Collectors.toMap( + Entry::getKey, + entry -> entry.getValue().estimatedBytes())); } /** @@ -447,8 +453,10 @@ public long getTotalRecordsEmitted() { */ @Override public long getTotalRecordsEstimated() { - if (!streamToTotalRecordsEstimated.isEmpty()) { - return streamToTotalRecordsEstimated.values().stream().reduce(0L, Long::sum); + if (!nameNamespacePairToStreamStats.isEmpty()) { + return nameNamespacePairToStreamStats.values().stream() + .map(e -> e.estimatedRecords) + .reduce(0L,Long::sum); } return totalRecordsEstimatedSync; @@ -467,8 +475,10 @@ public long getTotalBytesEmitted() { */ @Override public long getTotalBytesEstimated() { - if (!streamToTotalBytesEstimated.isEmpty()) { - return streamToTotalBytesEstimated.values().stream().reduce(0L, Long::sum); + if (!nameNamespacePairToStreamStats.isEmpty()) { + return nameNamespacePairToStreamStats.values().stream() + .map(e -> e.estimatedBytes) 
+ .reduce(0L,Long::sum); } return totalBytesEstimatedSync; From ca082fe76b04fb4b7dea79961399e1c82ebdad9a Mon Sep 17 00:00:00 2001 From: Davin Chia Date: Mon, 28 Nov 2022 16:28:04 -0800 Subject: [PATCH 12/12] Use the StreamStats POJO instead of creating new maps. --- .../io/airbyte/workers/internal/AirbyteMessageTracker.java | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index 6c8387de5733..a94c00927431 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -298,8 +298,7 @@ private void handleEmittedEstimateTrace(final AirbyteEstimateTraceMessage estima log.debug("Saving stream estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName()); nameNamespacePairToStreamStats.put( new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace()), - new StreamStats(estimate.getByteEstimate(), 0L, estimate.getRowEstimate(), 0L) - ); + new StreamStats(estimate.getByteEstimate(), 0L, estimate.getRowEstimate(), 0L)); } case SYNC -> { Preconditions.checkArgument(nameNamespacePairToStreamStats.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync."); @@ -456,7 +455,7 @@ public long getTotalRecordsEstimated() { if (!nameNamespacePairToStreamStats.isEmpty()) { return nameNamespacePairToStreamStats.values().stream() .map(e -> e.estimatedRecords) - .reduce(0L,Long::sum); + .reduce(0L, Long::sum); } return totalRecordsEstimatedSync; @@ -478,7 +477,7 @@ public long getTotalBytesEstimated() { if (!nameNamespacePairToStreamStats.isEmpty()) { return nameNamespacePairToStreamStats.values().stream() .map(e -> e.estimatedBytes) - .reduce(0L,Long::sum); + .reduce(0L, 
Long::sum); } return totalBytesEstimatedSync;