diff --git a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java index 70653097d2f8..d4e46d7cc3d5 100644 --- a/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java +++ b/airbyte-cdk/java/airbyte-cdk/typing-deduping/src/testFixtures/java/io/airbyte/integrations/base/destination/typing_deduping/BaseTypingDedupingTest.java @@ -35,6 +35,8 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; import java.util.stream.Stream; import org.apache.commons.lang3.RandomStringUtils; @@ -601,14 +603,12 @@ public void identicalNameSimultaneousSync() throws Exception { for (int i = 0; i < 100_000; i++) { pushMessages(messages2, sync2); } - // This will dump sync1's entire stdout to our stdout endSync(sync1); // Write some more messages to the second sync. It should not be affected by the first sync's // shutdown. for (int i = 0; i < 100_000; i++) { pushMessages(messages2, sync2); } - // And this will dump sync2's entire stdout to our stdout endSync(sync2); // For simplicity, don't verify the raw table. Assume that if the final table is correct, then @@ -825,6 +825,26 @@ protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog, destination.start(destinationConfig, jobRoot, Collections.emptyMap()); + // In the background, read messages from the destination until it terminates. We need to clear + // stdout in real time, to prevent the buffer from filling up and blocking the destination. 
+ // TODO Eventually we'll want to somehow extract the state messages while a sync is running, to + // verify checkpointing. + final ExecutorService messageHandler = Executors.newSingleThreadExecutor( + // Use a daemon thread so that a reader which never finishes (e.g. after an exception) cannot keep the JVM alive. + r -> { + final Thread t = Executors.defaultThreadFactory().newThread(r); + t.setDaemon(true); + return t; + }); + messageHandler.submit(() -> { + while (!destination.isFinished()) { + // attemptRead isn't thread-safe and this loop reads stdout fully, so it must not be called anywhere else. + // NOTE(review): the Future returned by submit() is discarded, so an exception thrown here would go unreported. + destination.attemptRead(); + } + }); + messageHandler.shutdown(); + return destination; } @@ -833,14 +853,8 @@ protected static void pushMessages(final List messages, final Ai message -> Exceptions.toRuntime(() -> destination.accept(convertProtocolObject(message, io.airbyte.protocol.models.AirbyteMessage.class)))); } - // TODO Eventually we'll want to somehow extract the state messages while a sync is running, to - // verify checkpointing. - // That's going to require some nontrivial changes to how attemptRead() works. protected static void endSync(final AirbyteDestination destination) throws Exception { destination.notifyEndOfInput(); - while (!destination.isFinished()) { - destination.attemptRead(); - } destination.close(); }