Track per-stream record counts and records committed, and other sync summary metadata #9327

Merged
merged 25 commits into from
Jan 11, 2022
4f35e3e
StateDeltaTracker class and tests
pmossman Jan 4, 2022
27175b9
working prototype implementation of per-stream record tracking
pmossman Jan 3, 2022
da2058f
misc stuff to get build working
pmossman Jan 3, 2022
4233967
add new fields to replicationAttemptSummary
pmossman Jan 5, 2022
408af15
update AirbyteMessageTracker to use StateDeltaTracker, and new interf…
pmossman Jan 5, 2022
02ecedb
finish implementation and tests for stateDeltaTracker and all new Rep…
pmossman Jan 6, 2022
b24c45b
undo temporary changes to files that I accidentally committed
pmossman Jan 6, 2022
c95b738
simplify interactions with byte buffers (#9331)
cgardens Jan 6, 2022
a8e995f
define a map instead of generic object for counts by stream
pmossman Jan 6, 2022
a1af3d3
follow convention of keyToValue instead of valueByKey for maps
pmossman Jan 6, 2022
1cc645e
use synchronized blocks instead of synchronized methods
pmossman Jan 6, 2022
e3d916f
add totalBytesEmitted field to eventually replace bytesSynced
pmossman Jan 6, 2022
79bb9c8
misc PR feedback nits
pmossman Jan 6, 2022
edefe7e
additionalProperties probably should still be false
pmossman Jan 6, 2022
b374baa
javadoc formatting
pmossman Jan 7, 2022
1316847
define syncStats and use it for total and per-stream stats
pmossman Jan 7, 2022
3ca65f2
change per-stream stats map to a list, and set stats in standardSyncS…
pmossman Jan 10, 2022
02910ca
wrap entire method bodies in synchronized block
pmossman Jan 10, 2022
954906d
use a long instead of a Long for required fields
pmossman Jan 10, 2022
8d0006e
remove extraneous 'this'
pmossman Jan 10, 2022
1d814db
set committed records to emitted records if sync has success status
pmossman Jan 10, 2022
ebd1e84
throw checked exception if commit state before add state, simplify ex…
pmossman Jan 10, 2022
8c1c1db
set delta tracker memory limit to 20MiB
pmossman Jan 10, 2022
0fc68b1
log error message that was thrown instead of assumed cause
pmossman Jan 10, 2022
0776303
StreamSyncStats wrapper, add test case for populating stats on failur…
pmossman Jan 10, 2022
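
Several commits above concern a StateDeltaTracker, which is not itself shown in this excerpt. As I read the commit titles, the idea is to buffer per-stream record counts against each state message and count them as committed only once that state comes back from the destination, throwing a checked exception if a commit arrives for a state that was never added. The following is a minimal sketch under those assumptions. Every name in it (`StateDeltaSketch`, `addState`, `commitState`, `StateDeltaSketchException`) is hypothetical, and it ignores the byte buffers and the 20MiB memory limit mentioned in the commit list.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: class, method, and exception names are assumptions,
// not the StateDeltaTracker added in this PR.
class StateDeltaSketch {

  /** Checked exception thrown when a commit arrives for a state that was never added. */
  static class StateDeltaSketchException extends Exception {
    StateDeltaSketchException(final String message) {
      super(message);
    }
  }

  /** Per-stream record counts emitted between the previous state message and this one. */
  private static final class Delta {
    final int stateHash;
    final Map<String, Long> streamToCount;

    Delta(final int stateHash, final Map<String, Long> streamToCount) {
      this.stateHash = stateHash;
      this.streamToCount = streamToCount;
    }
  }

  private final Deque<Delta> pending = new ArrayDeque<>();
  private final Map<String, Long> streamToCommittedRecords = new HashMap<>();

  /** Called when the source emits a state message. */
  void addState(final int stateHash, final Map<String, Long> streamToCountSinceLastState) {
    synchronized (this) {
      pending.addLast(new Delta(stateHash, Map.copyOf(streamToCountSinceLastState)));
    }
  }

  /** Called when the destination reports a state message as committed. */
  void commitState(final int stateHash) throws StateDeltaSketchException {
    synchronized (this) {
      final boolean known = pending.stream().anyMatch(d -> d.stateHash == stateHash);
      if (!known) {
        throw new StateDeltaSketchException("Commit received for a state that was never added: " + stateHash);
      }
      // Assumption: committing a state also commits every state added before it.
      while (!pending.isEmpty()) {
        final Delta delta = pending.removeFirst();
        delta.streamToCount.forEach((stream, count) -> streamToCommittedRecords.merge(stream, count, Long::sum));
        if (delta.stateHash == stateHash) {
          break;
        }
      }
    }
  }

  /** Snapshot of the committed record count per stream. */
  Map<String, Long> getStreamToCommittedRecords() {
    synchronized (this) {
      return Map.copyOf(streamToCommittedRecords);
    }
  }
}
```

Using synchronized blocks rather than synchronized methods mirrors the convention named in the commit list; the two forms behave the same here.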
@@ -9,17 +9,25 @@ required:
- bytesSynced
- startTime
- endTime
- totalStats
- streamStats
additionalProperties: false
properties:
status:
"$ref": ReplicationStatus.yaml
recordsSynced:
recordsSynced: # TODO (parker) remove in favor of totalRecordsEmitted
type: integer
minValue: 0
bytesSynced:
bytesSynced: # TODO (parker) remove in favor of totalBytesEmitted
type: integer
minValue: 0
startTime:
type: integer
endTime:
type: integer
totalStats:
"$ref": SyncStats.yaml
streamStats:
type: array
items:
"$ref": StreamSyncStats.yaml
@@ -12,17 +12,25 @@ required:
- bytesSynced
- startTime
- endTime
- totalStats
- streamStats
additionalProperties: false
properties:
status:
"$ref": ReplicationStatus.yaml
recordsSynced:
recordsSynced: # TODO (parker) remove in favor of totalRecordsEmitted
type: integer
minValue: 0
bytesSynced:
bytesSynced: # TODO (parker) remove in favor of totalBytesEmitted
type: integer
minValue: 0
startTime:
type: integer
endTime:
type: integer
totalStats:
"$ref": SyncStats.yaml
streamStats:
type: array
items:
"$ref": StreamSyncStats.yaml
@@ -0,0 +1,15 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/StreamSyncStats.yaml
title: StreamSyncStats
description: Sync stats for a particular stream.
type: object
required:
- streamName
- stats
additionalProperties: false
properties:
streamName:
type: string
stats:
"$ref": SyncStats.yaml
19 changes: 19 additions & 0 deletions airbyte-config/models/src/main/resources/types/SyncStats.yaml
@@ -0,0 +1,19 @@
---
"$schema": http://json-schema.org/draft-07/schema#
"$id": https://github.com/airbytehq/airbyte/blob/master/airbyte-config/models/src/main/resources/types/SyncStats.yaml
title: SyncStats
description: sync stats.
type: object
required:
- recordsEmitted
- bytesEmitted
additionalProperties: false
properties:
recordsEmitted:
type: integer
bytesEmitted:
type: integer
stateMessagesEmitted: # TODO make required once per-stream state messages are supported in V2
type: integer
recordsCommitted:
type: integer # if unset, committed records could not be computed
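
To make the required/optional split in the two new schemas concrete, here is a small sketch of how the resulting objects might be consumed. The classes below are hand-written stand-ins with hypothetical names (`SyncStatsSketch`, `StreamSyncStatsSketch`, `describe`), not the classes generated from the YAML. The one point taken from the schema is that `recordsCommitted` may be unset, meaning the count could not be computed.

```java
import java.util.Optional;

// Hypothetical stand-in for the class generated from SyncStats.yaml.
class SyncStatsSketch {

  final long recordsEmitted;       // required, so a primitive long
  final long bytesEmitted;         // required, so a primitive long
  final Long stateMessagesEmitted; // optional, may be null
  final Long recordsCommitted;     // optional, null when it could not be computed

  SyncStatsSketch(final long recordsEmitted,
                  final long bytesEmitted,
                  final Long stateMessagesEmitted,
                  final Long recordsCommitted) {
    this.recordsEmitted = recordsEmitted;
    this.bytesEmitted = bytesEmitted;
    this.stateMessagesEmitted = stateMessagesEmitted;
    this.recordsCommitted = recordsCommitted;
  }

  /** Wraps the nullable field so callers have to handle the unknown case. */
  Optional<Long> getRecordsCommitted() {
    return Optional.ofNullable(recordsCommitted);
  }
}

// Hypothetical stand-in for the class generated from StreamSyncStats.yaml.
class StreamSyncStatsSketch {

  final String streamName;
  final SyncStatsSketch stats;

  StreamSyncStatsSketch(final String streamName, final SyncStatsSketch stats) {
    this.streamName = streamName;
    this.stats = stats;
  }

  /** One-line summary that distinguishes a known committed count from an unknown one. */
  String describe() {
    return stats.getRecordsCommitted()
        .map(committed -> streamName + ": " + committed + "/" + stats.recordsEmitted + " records committed")
        .orElse(streamName + ": " + stats.recordsEmitted + " records emitted, committed count unknown");
  }
}
```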
@@ -91,7 +91,6 @@ public void runJob() throws Exception {
airbyteSource,
new NamespacingMapper(syncInput.getNamespaceDefinition(), syncInput.getNamespaceFormat(), syncInput.getPrefix()),
new DefaultAirbyteDestination(workerConfigs, destinationLauncher),
new AirbyteMessageTracker(),
new AirbyteMessageTracker());

log.info("Running replication worker...");
@@ -9,6 +9,8 @@
import io.airbyte.config.StandardSyncInput;
import io.airbyte.config.StandardSyncSummary.ReplicationStatus;
import io.airbyte.config.State;
import io.airbyte.config.StreamSyncStats;
import io.airbyte.config.SyncStats;
import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.config.WorkerSourceConfig;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -17,6 +19,7 @@
import io.airbyte.workers.protocols.airbyte.AirbyteSource;
import io.airbyte.workers.protocols.airbyte.MessageTracker;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -55,8 +58,7 @@ public class DefaultReplicationWorker implements ReplicationWorker {
private final AirbyteSource source;
private final AirbyteMapper mapper;
private final AirbyteDestination destination;
private final MessageTracker sourceMessageTracker;
private final MessageTracker destinationMessageTracker;
private final MessageTracker messageTracker;

private final ExecutorService executors;
private final AtomicBoolean cancelled;
@@ -67,15 +69,13 @@ public DefaultReplicationWorker(final String jobId,
final AirbyteSource source,
final AirbyteMapper mapper,
final AirbyteDestination destination,
final MessageTracker sourceMessageTracker,
final MessageTracker destinationMessageTracker) {
final MessageTracker messageTracker) {
this.jobId = jobId;
this.attempt = attempt;
this.source = source;
this.mapper = mapper;
this.destination = destination;
this.sourceMessageTracker = sourceMessageTracker;
this.destinationMessageTracker = destinationMessageTracker;
this.messageTracker = messageTracker;
this.executors = Executors.newFixedThreadPool(2);

this.cancelled = new AtomicBoolean(false);
@@ -120,11 +120,11 @@ public ReplicationOutput run(final StandardSyncInput syncInput, final Path jobRo
source.start(sourceConfig, jobRoot);

final CompletableFuture<?> destinationOutputThreadFuture = CompletableFuture.runAsync(
getDestinationOutputRunnable(destination, cancelled, destinationMessageTracker, mdc),
getDestinationOutputRunnable(destination, cancelled, messageTracker, mdc),
executors);

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, sourceMessageTracker, mdc),
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc),
executors);

LOGGER.info("Waiting for source and destination threads to complete.");
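
The hunk above hands the same `messageTracker` to both the replication thread and the destination-output thread, so the tracker now has to tolerate concurrent calls. A minimal sketch of that arrangement follows, assuming a hypothetical `SharedTrackerSketch` that takes stream names instead of `AirbyteMessage` objects. Only `acceptFromSource`, `acceptFromDestination`, and the two-thread pool correspond to names visible in the diff.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of one tracker shared by a source thread and a destination thread.
class SharedTrackerSketch {

  private final Map<String, Long> streamToEmittedRecords = new HashMap<>();
  private long destinationStateMessages = 0L;

  /** Called from the replication thread for every record read from the source. */
  void acceptFromSource(final String streamName) {
    synchronized (this) {
      streamToEmittedRecords.merge(streamName, 1L, Long::sum);
    }
  }

  /** Called from the destination-output thread for every state message read back. */
  void acceptFromDestination() {
    synchronized (this) {
      destinationStateMessages++;
    }
  }

  long getTotalRecordsEmitted() {
    synchronized (this) {
      return streamToEmittedRecords.values().stream().mapToLong(Long::longValue).sum();
    }
  }

  long getDestinationStateMessages() {
    synchronized (this) {
      return destinationStateMessages;
    }
  }

  /** Runs both producers on a two-thread pool and returns the tracker once both finish. */
  static SharedTrackerSketch runDemo(final int records, final int stateMessages) {
    final SharedTrackerSketch tracker = new SharedTrackerSketch();
    final ExecutorService executors = Executors.newFixedThreadPool(2);
    try {
      final CompletableFuture<?> replicationThread = CompletableFuture.runAsync(() -> {
        for (int i = 0; i < records; i++) {
          tracker.acceptFromSource(i % 2 == 0 ? "users" : "orders");
        }
      }, executors);
      final CompletableFuture<?> destinationOutputThread = CompletableFuture.runAsync(() -> {
        for (int i = 0; i < stateMessages; i++) {
          tracker.acceptFromDestination();
        }
      }, executors);
      CompletableFuture.allOf(replicationThread, destinationOutputThread).join();
    } finally {
      executors.shutdown();
    }
    return tracker;
  }
}
```

Both mutating methods lock on the same monitor, so the per-stream map and the state counter are never updated concurrently.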
@@ -155,10 +155,45 @@ else if (hasFailed.get()) {
outputStatus = ReplicationStatus.COMPLETED;
}

final SyncStats totalSyncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getTotalRecordsEmitted())
.withBytesEmitted(messageTracker.getTotalBytesEmitted())
.withStateMessagesEmitted(messageTracker.getTotalStateMessagesEmitted());

if (outputStatus == ReplicationStatus.COMPLETED) {
totalSyncStats.setRecordsCommitted(totalSyncStats.getRecordsEmitted());
} else if (messageTracker.getTotalRecordsCommitted().isPresent()) {
totalSyncStats.setRecordsCommitted(messageTracker.getTotalRecordsCommitted().get());
} else {
LOGGER.warn("Could not reliably determine committed record counts, committed record stats will be set to null");
totalSyncStats.setRecordsCommitted(null);
}

// assume every stream with stats is in streamToEmittedRecords map
final List<StreamSyncStats> streamSyncStats = messageTracker.getStreamToEmittedRecords().keySet().stream().map(stream -> {
final SyncStats syncStats = new SyncStats()
.withRecordsEmitted(messageTracker.getStreamToEmittedRecords().get(stream))
.withBytesEmitted(messageTracker.getStreamToEmittedBytes().get(stream))
.withStateMessagesEmitted(null); // TODO (parker) populate per-stream state messages emitted once supported in V2

if (outputStatus == ReplicationStatus.COMPLETED) {
syncStats.setRecordsCommitted(messageTracker.getStreamToEmittedRecords().get(stream));
} else if (messageTracker.getStreamToCommittedRecords().isPresent()) {
syncStats.setRecordsCommitted(messageTracker.getStreamToCommittedRecords().get().get(stream));
} else {
syncStats.setRecordsCommitted(null);
}
return new StreamSyncStats()
.withStreamName(stream)
.withStats(syncStats);
}).collect(Collectors.toList());

final ReplicationAttemptSummary summary = new ReplicationAttemptSummary()
.withStatus(outputStatus)
.withRecordsSynced(sourceMessageTracker.getRecordCount())
.withBytesSynced(sourceMessageTracker.getBytesCount())
.withRecordsSynced(messageTracker.getTotalRecordsEmitted()) // TODO (parker) remove in favor of totalRecordsEmitted
.withBytesSynced(messageTracker.getTotalBytesEmitted()) // TODO (parker) remove in favor of totalBytesEmitted
.withTotalStats(totalSyncStats)
.withStreamStats(streamSyncStats)
.withStartTime(startTime)
.withEndTime(System.currentTimeMillis());
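
The committed-record rule in this hunk is applied twice, once for the totals and once per stream: a COMPLETED sync reports committed equal to emitted, otherwise the tracker's value is used if it has one, otherwise the field is left null. A sketch that factors the rule into one helper is below. `CommittedRecordsSketch`, `Status`, and both method names are hypothetical, and `Status.FAILED` is an assumed stand-in for any non-completed `ReplicationStatus`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; it restates the rule from the hunk above, it is not code from the PR.
class CommittedRecordsSketch {

  // Stand-in for ReplicationStatus. Only COMPLETED appears in the diff; FAILED is assumed.
  enum Status {
    COMPLETED,
    FAILED
  }

  /** Returns the committed count, or null when it cannot be determined reliably. */
  static Long resolveCommitted(final Status status, final long emitted, final Optional<Long> trackedCommitted) {
    if (status == Status.COMPLETED) {
      // A successful sync committed everything it emitted.
      return emitted;
    }
    return trackedCommitted.orElse(null);
  }

  /** Applies the same rule to every stream that has an emitted-record count. */
  static Map<String, Long> resolveCommittedByStream(final Status status,
                                                    final Map<String, Long> streamToEmitted,
                                                    final Optional<Map<String, Long>> streamToCommitted) {
    // HashMap is used because the result may contain null values.
    final Map<String, Long> result = new HashMap<>();
    for (final Map.Entry<String, Long> entry : streamToEmitted.entrySet()) {
      // Optional.map yields empty when the stream has no entry in the committed map.
      final Optional<Long> tracked = streamToCommitted.map(committed -> committed.get(entry.getKey()));
      result.put(entry.getKey(), resolveCommitted(status, entry.getValue(), tracked));
    }
    return result;
  }
}
```

A plain loop is used instead of `Collectors.toMap`, which rejects null values.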

@@ -168,15 +203,15 @@ else if (hasFailed.get()) {
.withReplicationAttemptSummary(summary)
.withOutputCatalog(destinationConfig.getCatalog());

if (sourceMessageTracker.getOutputState().isPresent()) {
if (messageTracker.getSourceOutputState().isPresent()) {
LOGGER.info("Source output at least one state message");
} else {
LOGGER.info("Source did not output any state messages");
}

if (destinationMessageTracker.getOutputState().isPresent()) {
LOGGER.info("State capture: Updated state to: {}", destinationMessageTracker.getOutputState());
final State state = destinationMessageTracker.getOutputState().get();
if (messageTracker.getDestinationOutputState().isPresent()) {
LOGGER.info("State capture: Updated state to: {}", messageTracker.getDestinationOutputState());
final State state = messageTracker.getDestinationOutputState().get();
output.withState(state);
} else if (syncInput.getState() != null) {
LOGGER.warn("State capture: No new state, falling back on input state: {}", syncInput.getState());
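
The state-capture branch above prefers the state reported by the destination and falls back on the input state only when the destination produced none. A minimal sketch of that selection is below, assuming strings in place of `State` objects and a hypothetical `chooseOutputState` helper. The case where neither state exists is cut off in this hunk, so returning an empty `Optional` there is an assumption.

```java
import java.util.Optional;

// Hypothetical helper illustrating the state fallback; strings stand in for State objects.
class StateFallbackSketch {

  static Optional<String> chooseOutputState(final Optional<String> destinationOutputState, final String inputState) {
    if (destinationOutputState.isPresent()) {
      // The destination confirmed a new state, so it wins.
      return destinationOutputState;
    }
    // No new state: fall back on the input state, which may itself be null.
    return Optional.ofNullable(inputState);
  }
}
```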
@@ -196,7 +231,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final AirbyteDestination destination,
final AtomicBoolean cancelled,
final AirbyteMapper mapper,
final MessageTracker sourceMessageTracker,
final MessageTracker messageTracker,
final Map<String, String> mdc) {
return () -> {
MDC.setContextMap(mdc);
@@ -208,7 +243,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
if (messageOptional.isPresent()) {
final AirbyteMessage message = mapper.mapMessage(messageOptional.get());

sourceMessageTracker.accept(message);
messageTracker.acceptFromSource(message);
destination.accept(message);
recordsRead += 1;

@@ -235,7 +270,7 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,

private static Runnable getDestinationOutputRunnable(final AirbyteDestination destination,
final AtomicBoolean cancelled,
final MessageTracker destinationMessageTracker,
final MessageTracker messageTracker,
final Map<String, String> mdc) {
return () -> {
MDC.setContextMap(mdc);
@@ -245,7 +280,7 @@ private static Runnable getDestinationOutputRunnable(final AirbyteDestination de
final Optional<AirbyteMessage> messageOptional = destination.attemptRead();
if (messageOptional.isPresent()) {
LOGGER.info("state in DefaultReplicationWorker from Destination: {}", messageOptional.get());
destinationMessageTracker.accept(messageOptional.get());
messageTracker.acceptFromDestination(messageOptional.get());
}
}
if (!cancelled.get() && destination.getExitValue() != 0) {