Skip to content

Commit

Permalink
DV2 TypingDedupingTest: read container stdout in real time (#34173)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored and jbfbell committed Jan 12, 2024
1 parent 1fc244c commit 4f20b6e
Showing 1 changed file with 22 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -601,14 +603,12 @@ public void identicalNameSimultaneousSync() throws Exception {
for (int i = 0; i < 100_000; i++) {
pushMessages(messages2, sync2);
}
// This will dump sync1's entire stdout to our stdout
endSync(sync1);
// Write some more messages to the second sync. It should not be affected by the first sync's
// shutdown.
for (int i = 0; i < 100_000; i++) {
pushMessages(messages2, sync2);
}
// And this will dump sync2's entire stdout to our stdout
endSync(sync2);

// For simplicity, don't verify the raw table. Assume that if the final table is correct, then
Expand Down Expand Up @@ -825,6 +825,26 @@ protected AirbyteDestination startSync(final ConfiguredAirbyteCatalog catalog,

destination.start(destinationConfig, jobRoot, Collections.emptyMap());

// In the background, read messages from the destination until it terminates. We need to clear
// stdout in real time, to prevent the buffer from filling up and blocking the destination.
// TODO Eventually we'll want to somehow extract the state messages while a sync is running, to
// verify checkpointing.
final ExecutorService messageHandler = Executors.newSingleThreadExecutor(
// run as a daemon thread just in case we run into an exception or something
r -> {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
return t;
});
messageHandler.submit(() -> {
while (!destination.isFinished()) {
// attemptRead isn't threadsafe, we read stdout fully here.
// i.e. we shouldn't call attemptRead anywhere else.
destination.attemptRead();
}
});
messageHandler.shutdown();

return destination;
}

Expand All @@ -833,14 +853,8 @@ protected static void pushMessages(final List<AirbyteMessage> messages, final Ai
message -> Exceptions.toRuntime(() -> destination.accept(convertProtocolObject(message, io.airbyte.protocol.models.AirbyteMessage.class))));
}

// TODO Eventually we'll want to somehow extract the state messages while a sync is running, to
// verify checkpointing.
// That's going to require some nontrivial changes to how attemptRead() works.
protected static void endSync(final AirbyteDestination destination) throws Exception {
destination.notifyEndOfInput();
while (!destination.isFinished()) {
destination.attemptRead();
}
destination.close();
}

Expand Down

0 comments on commit 4f20b6e

Please sign in to comment.