Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Progress Bar Estimate #19814

Merged
merged 13 commits into from
Nov 29, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.hash.HashFunction;
Expand All @@ -19,6 +20,7 @@
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteControlConnectorConfigMessage;
import io.airbyte.protocol.models.AirbyteControlMessage;
import io.airbyte.protocol.models.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
Expand Down Expand Up @@ -53,13 +55,20 @@ public class AirbyteMessageTracker implements MessageTracker {
private final BiMap<AirbyteStreamNameNamespacePair, Short> nameNamespacePairToIndex;
private final Map<Short, Long> streamToTotalBytesEmitted;
private final Map<Short, Long> streamToTotalRecordsEmitted;
private final Map<Short, Long> streamToTotalBytesEstimated;
private final Map<Short, Long> streamToTotalRecordsEstimated;
private final StateDeltaTracker stateDeltaTracker;
private final StateMetricsTracker stateMetricsTracker;
private final List<AirbyteTraceMessage> destinationErrorTraceMessages;
private final List<AirbyteTraceMessage> sourceErrorTraceMessages;
private final StateAggregator stateAggregator;
private final boolean logConnectorMessages = new EnvVariableFeatureFlags().logConnectorMessages();

// These variables support SYNC level estimates and are meant for sources where stream level
// estimates are not possible e.g. CDC sources.
private Long totalRecordsEstimatedSync;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are adding a new concept of Sync vs Stream, instead, would it make sense to use STREAM vs GLOBAL?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm following from the AirbyteTraceEstimateMessage type. Slight preference to keep the names consistent though happy to change the name within this file to Global.

With the AirbyteTraceEstiamteMessage, I do think STREAM vs SYNC are decent names for now.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am thinking about consistency for the near future, Sync feels very coupled to how we current run the replication. For STREAM, I think we want to modify our current replication model to be able to replicate data at stream level rather than connection. If we are running per stream replication in parallel, Sync mode for progress looks outdated.

It probably feels more like future-proofing, I don't have reasons to feel strongly either way at this moment.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with your thoughts. Honestly, not 100% how the stream-as-first-class citizen refactor will play with CDC w.r.t the stats here.

In that case, I'm guessing we will be able to do away with the SYNC field. I'm inclined to keep this as is for now and revisit when we cross that. How does that sound?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good to me, conceptually, I think we are tracking the right way, we can address the naming later on.

private Long totalBytesEstimatedSync;

private short nextStreamIndex;

/**
Expand Down Expand Up @@ -95,6 +104,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
this.hashFunction = Hashing.murmur3_32_fixed();
this.streamToTotalBytesEmitted = new HashMap<>();
this.streamToTotalRecordsEmitted = new HashMap<>();
this.streamToTotalBytesEstimated = new HashMap<>();
this.streamToTotalRecordsEstimated = new HashMap<>();
this.stateDeltaTracker = stateDeltaTracker;
this.stateMetricsTracker = stateMetricsTracker;
this.nextStreamIndex = 0;
Expand Down Expand Up @@ -252,7 +263,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec
*/
private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) {
switch (traceMessage.getType()) {
case ESTIMATE -> handleEmittedEstimateTrace(traceMessage, connectorType);
case ESTIMATE -> handleEmittedEstimateTrace(traceMessage.getEstimate());
case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType);
default -> log.warn("Invalid message type for trace message: {}", traceMessage);
}
Expand All @@ -266,8 +277,35 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage
}
}

@SuppressWarnings("PMD") // until method is implemented
private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) {
/**
* There are several assumptions here:
* <p>
* - Assume the estimate is a whole number and not a sum i.e. each estimate replaces the previous
* estimate.
* <p>
* - Sources cannot emit both STREAM and SYNC estimates in a same sync. Error out if this happens.
*/
@SuppressWarnings("PMD.AvoidDuplicateLiterals")
private void handleEmittedEstimateTrace(final AirbyteEstimateTraceMessage estimate) {
switch (estimate.getType()) {
case STREAM -> {
Preconditions.checkArgument(totalBytesEstimatedSync == null, "STREAM and SYNC estimates should not be emitted in the same sync.");
Preconditions.checkArgument(totalRecordsEstimatedSync == null, "STREAM and SYNC estimates should not be emitted in the same sync.");

log.info("Saving records estimates for namespace: {}, stream: {}", estimate.getNamespace(), estimate.getName());
final var index = getStreamIndex(new AirbyteStreamNameNamespacePair(estimate.getName(), estimate.getNamespace()));

streamToTotalRecordsEstimated.put(index, estimate.getRowEstimate());
streamToTotalBytesEstimated.put(index, estimate.getByteEstimate());
}
case SYNC -> {
Preconditions.checkArgument(streamToTotalBytesEstimated.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync.");
Preconditions.checkArgument(streamToTotalRecordsEstimated.isEmpty(), "STREAM and SYNC estimates should not be emitted in the same sync.");

totalBytesEstimatedSync = estimate.getByteEstimate();
totalRecordsEstimatedSync = estimate.getRowEstimate();
}
}

}

Expand Down Expand Up @@ -368,6 +406,15 @@ public Map<AirbyteStreamNameNamespacePair, Long> getStreamToEmittedRecords() {
entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue));
}

/**
* Swap out stream indices for stream names and return total records estimated by stream.
*/
@Override
public Map<AirbyteStreamNameNamespacePair, Long> getStreamToEstimatedRecords() {
return streamToTotalRecordsEstimated.entrySet().stream().collect(Collectors.toMap(
entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue));
}

/**
* Swap out stream indices for stream names and return total bytes emitted by stream.
*/
Expand All @@ -377,6 +424,15 @@ public Map<AirbyteStreamNameNamespacePair, Long> getStreamToEmittedBytes() {
entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue));
}

/**
* Swap out stream indices for stream names and return total bytes estimated by stream.
*/
@Override
public Map<AirbyteStreamNameNamespacePair, Long> getStreamToEstimatedBytes() {
return streamToTotalBytesEstimated.entrySet().stream().collect(Collectors.toMap(
entry -> nameNamespacePairToIndex.inverse().get(entry.getKey()), Entry::getValue));
}

/**
* Compute sum of emitted record counts across all streams.
*/
Expand All @@ -385,6 +441,18 @@ public long getTotalRecordsEmitted() {
return streamToTotalRecordsEmitted.values().stream().reduce(0L, Long::sum);
}

/**
* Compute sum of estimated record counts across all streams.
*/
@Override
public long getTotalRecordsEstimated() {
if (!streamToTotalRecordsEstimated.isEmpty()) {
return streamToTotalRecordsEstimated.values().stream().reduce(0L, Long::sum);
}

return totalRecordsEstimatedSync;
}

/**
* Compute sum of emitted bytes across all streams.
*/
Expand All @@ -393,6 +461,18 @@ public long getTotalBytesEmitted() {
return streamToTotalBytesEmitted.values().stream().reduce(0L, Long::sum);
}

/**
* Compute sum of estimated bytes across all streams.
*/
@Override
public long getTotalBytesEstimated() {
if (!streamToTotalBytesEstimated.isEmpty()) {
return streamToTotalBytesEstimated.values().stream().reduce(0L, Long::sum);
}

return totalBytesEstimatedSync;
}

/**
* Compute sum of committed record counts across all streams. If the delta tracker has exceeded its
* capacity, return empty because committed record counts cannot be reliably computed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,14 @@ public interface MessageTracker {
*/
Map<AirbyteStreamNameNamespacePair, Long> getStreamToEmittedRecords();

/**
* Get the per-stream estimated record count provided by
* {@link io.airbyte.protocol.models.AirbyteEstimateTraceMessage}.
*
* @return returns a map of estimated record count by stream name.
*/
Map<AirbyteStreamNameNamespacePair, Long> getStreamToEstimatedRecords();

/**
* Get the per-stream emitted byte count. This includes messages that were emitted by the source,
* but never committed by the destination.
Expand All @@ -74,6 +82,14 @@ public interface MessageTracker {
*/
Map<AirbyteStreamNameNamespacePair, Long> getStreamToEmittedBytes();

/**
* Get the per-stream estimated byte count provided by
* {@link io.airbyte.protocol.models.AirbyteEstimateTraceMessage}.
*
* @return returns a map of estimated bytes by stream name.
*/
Map<AirbyteStreamNameNamespacePair, Long> getStreamToEstimatedBytes();

/**
* Get the overall emitted record count. This includes messages that were emitted by the source, but
* never committed by the destination.
Expand All @@ -82,6 +98,13 @@ public interface MessageTracker {
*/
long getTotalRecordsEmitted();

/**
* Get the overall estimated record count.
*
* @return returns the total count of estimated records across all streams.
*/
long getTotalRecordsEstimated();

/**
* Get the overall emitted bytes. This includes messages that were emitted by the source, but never
* committed by the destination.
Expand All @@ -90,6 +113,13 @@ public interface MessageTracker {
*/
long getTotalBytesEmitted();

/**
* Get the overall estimated bytes.
*
* @return returns the total count of estimated bytes across all streams.
*/
long getTotalBytesEstimated();

/**
* Get the overall committed record count.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteErrorTraceMessage;
import io.airbyte.protocol.models.AirbyteEstimateTraceMessage;
import io.airbyte.protocol.models.AirbyteGlobalState;
import io.airbyte.protocol.models.AirbyteLogMessage;
import io.airbyte.protocol.models.AirbyteMessage;
Expand Down Expand Up @@ -102,29 +103,60 @@ public static AirbyteStreamState createStreamState(final String streamName) {
return new AirbyteStreamState().withStreamDescriptor(new StreamDescriptor().withName(streamName));
}

public static AirbyteMessage createStreamEstimateMessage(final String name, final String namespace, final long byteEst, final long rowEst) {
return createEstimateMessage(AirbyteEstimateTraceMessage.Type.STREAM, name, namespace, byteEst, rowEst);
}

public static AirbyteMessage createSyncEstimateMessage(final long byteEst, final long rowEst) {
return createEstimateMessage(AirbyteEstimateTraceMessage.Type.SYNC, null, null, byteEst, rowEst);
}

public static AirbyteMessage createEstimateMessage(AirbyteEstimateTraceMessage.Type type,
final String name,
final String namespace,
final long byteEst,
final long rowEst) {
final var est = new AirbyteEstimateTraceMessage()
.withType(type)
.withByteEstimate(byteEst)
.withRowEstimate(rowEst);

if (name != null) {
est.withName(name);
}
if (namespace != null) {
est.withNamespace(namespace);
}

return new AirbyteMessage()
.withType(Type.TRACE)
.withTrace(new AirbyteTraceMessage().withType(AirbyteTraceMessage.Type.ESTIMATE)
.withEstimate(est));
}

public static AirbyteMessage createErrorMessage(final String message, final Double emittedAt) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.TRACE)
.withTrace(createErrorTraceMessage(message, emittedAt));
}

public static AirbyteTraceMessage createErrorTraceMessage(final String message, final Double emittedAt) {
return new AirbyteTraceMessage()
.withType(io.airbyte.protocol.models.AirbyteTraceMessage.Type.ERROR)
.withEmittedAt(emittedAt)
.withError(new AirbyteErrorTraceMessage().withMessage(message));
return createErrorTraceMessage(message, emittedAt, null);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consolidate the various trace message creation into one function to avoid duplication.

}

public static AirbyteTraceMessage createErrorTraceMessage(final String message,
final Double emittedAt,
final AirbyteErrorTraceMessage.FailureType failureType) {
return new AirbyteTraceMessage()
final var msg = new AirbyteTraceMessage()
.withType(io.airbyte.protocol.models.AirbyteTraceMessage.Type.ERROR)
.withEmittedAt(emittedAt)
.withError(new AirbyteErrorTraceMessage().withMessage(message).withFailureType(failureType));
}
.withError(new AirbyteErrorTraceMessage().withMessage(message))
.withEmittedAt(emittedAt);

public static AirbyteMessage createTraceMessage(final String message, final Double emittedAt) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.TRACE)
.withTrace(new AirbyteTraceMessage()
.withType(AirbyteTraceMessage.Type.ERROR)
.withEmittedAt(emittedAt)
.withError(new AirbyteErrorTraceMessage().withMessage(message)));
if (failureType != null) {
msg.getError().withFailureType(failureType);
}

return msg;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ void testReplicationRunnableWorkerFailure() throws Exception {
@Test
void testOnlyStateAndRecordMessagesDeliveredToDestination() throws Exception {
final AirbyteMessage LOG_MESSAGE = AirbyteMessageUtils.createLogMessage(Level.INFO, "a log message");
final AirbyteMessage TRACE_MESSAGE = AirbyteMessageUtils.createTraceMessage("a trace message", 123456.0);
final AirbyteMessage TRACE_MESSAGE = AirbyteMessageUtils.createErrorMessage("a trace message", 123456.0);
when(mapper.mapMessage(LOG_MESSAGE)).thenReturn(LOG_MESSAGE);
when(mapper.mapMessage(TRACE_MESSAGE)).thenReturn(TRACE_MESSAGE);
when(source.isFinished()).thenReturn(false, false, false, false, true);
Expand Down
Loading