Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AirbyteEstimateTraceMessage and UI sync Estimates - Progress Bars #18630

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
2edfb70
`AirbyteEstimateTraceMessage`
evantahler Oct 28, 2022
970c799
upper-case enum options
evantahler Oct 28, 2022
6cf24c5
Faker emits TraceEstimateMessages
evantahler Oct 28, 2022
b759a72
move emitted_at
evantahler Oct 28, 2022
5f3466e
Mock out job history handler so we have running throughput.
davinchia Oct 28, 2022
d0355ca
Also mock the jobs/list api.
davinchia Oct 29, 2022
8c176f5
[wip] top-level progress bar
evantahler Oct 29, 2022
92f1f26
Merge branch 'master' into evan-davin/hack-day-progress
davinchia Oct 31, 2022
5a1c0b7
UI Basics
evantahler Oct 31, 2022
848ee72
time estimates
evantahler Oct 31, 2022
d422544
fix test
evantahler Oct 31, 2022
eb10300
more localization
evantahler Oct 31, 2022
f9d510d
estimate throughput
evantahler Nov 1, 2022
15538ef
Include byte estimates
evantahler Nov 1, 2022
05297da
Merge branch 'master' into evan-davin/hack-day-progress
evantahler Nov 1, 2022
8976ef8
update display when now estimate is present
evantahler Nov 1, 2022
6867085
Backend changes to support saving state mid progress. (#18723)
davinchia Nov 2, 2022
109411c
support for just displaying totals
evantahler Nov 2, 2022
e19f727
display progressive enhancement
evantahler Nov 2, 2022
1f492ca
Save and return per stream stats. (#18834)
davinchia Nov 2, 2022
4438dd0
formatting fixes
evantahler Nov 2, 2022
7b090dd
Merge branch 'evan-davin/hack-day-progress' of github.com:airbytehq/a…
evantahler Nov 2, 2022
b7ed66f
better source estimate
evantahler Nov 2, 2022
1445e7f
show/hide streams
evantahler Nov 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ JOB_ERROR_REPORTING_STRATEGY=logging
# Although not present as an env var, expected by Log4J configuration.
LOG_LEVEL=INFO


### APPLICATIONS ###
# Worker #
WORKERS_MICRONAUT_ENVIRONMENTS=control-plane
Expand Down
43 changes: 43 additions & 0 deletions airbyte-api/src/main/openapi/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2264,6 +2264,26 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"
/v1/attempt/save_stats:
post:
tags:
- attempt
- internal
summary: For worker to set running attempt stats.
operationId: saveStats
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SaveStatsRequestBody"
required: true
responses:
"200":
description: Successful Operation
content:
application/json:
schema:
$ref: "#/components/schemas/InternalOperationResult"

components:
securitySchemes:
Expand Down Expand Up @@ -4038,6 +4058,12 @@ components:
recordsCommitted:
type: integer
format: int64
estimatedRecords:
type: integer
format: int64
estimatedBytes:
type: integer
format: int64
AttemptStreamStats:
type: object
required:
Expand Down Expand Up @@ -4881,6 +4907,23 @@ components:
processingTaskQueue:
type: string
default: ""
SaveStatsRequestBody:
type: object
required:
- jobId
- attemptNumber
- stats
properties:
jobId:
$ref: "#/components/schemas/JobId"
attemptNumber:
$ref: "#/components/schemas/AttemptNumber"
stats:
$ref: "#/components/schemas/AttemptStats"
streamStats:
type: array
items:
$ref: "#/components/schemas/AttemptStreamStats"
InternalOperationResult:
type: object
required:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ void testBootloaderAppBlankDb() throws Exception {
bootloader.load();

val jobsMigrator = new JobsDatabaseMigrator(jobDatabase, jobsFlyway);
assertEquals("0.40.14.001", jobsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.40.17.001", jobsMigrator.getLatestMigration().getVersion().getVersion());

val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, configsFlyway);
// this line should change with every new migration
Expand Down
27 changes: 27 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Config:

class TraceType(Enum):
ERROR = "ERROR"
ESTIMATE = "ESTIMATE"


class FailureType(Enum):
Expand All @@ -98,6 +99,28 @@ class Config:
failure_type: Optional[FailureType] = Field(None, description="The type of error")


# Allowed values for AirbyteEstimateTraceMessage.type ("The type of estimate").
# NOTE(review): presumably STREAM marks an estimate that covers a single stream and
# SYNC one that covers the sync as a whole - confirm against the protocol schema.
class Type1(Enum):
STREAM = "STREAM"
SYNC = "SYNC"


# Trace payload in which a source reports its guess at how much data a sync will emit.
# `name` and `type` are required; `namespace` and both estimates are optional.
class AirbyteEstimateTraceMessage(BaseModel):
    class Config:
        # Unknown extra fields are accepted rather than rejected.
        extra = Extra.allow

    name: str = Field(..., description="The name of the stream")
    type: Type1 = Field(..., description="The type of estimate")
    namespace: Optional[str] = Field(None, description="The namespace of the stream")
    # Row and byte estimates are counts, so they are modelled as integers. The platform
    # API exposes them as int64 (estimatedRecords / estimatedBytes) and the worker stores
    # them as Long values; a float type here would serialize whole counts as e.g. 100.0.
    # NOTE(review): this module looks generated from the protocol schema - make the same
    # change (number -> integer) in the schema so regeneration keeps it.
    row_estimate: Optional[int] = Field(
        None,
        description="The estimated number of rows to be emitted by this sync for this stream",
    )
    byte_estimate: Optional[int] = Field(
        None,
        description="The estimated number of bytes to be emitted by this sync for this stream",
    )


class OrchestratorType(Enum):
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"

Expand Down Expand Up @@ -213,6 +236,10 @@ class Config:
type: TraceType = Field(..., description="the type of trace message", title="trace type")
emitted_at: float = Field(..., description="the time in ms that the message was emitted")
error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")
estimate: Optional[AirbyteEstimateTraceMessage] = Field(
None,
description="Estimate trace message: a guess at how much data will be produced in this sync",
)


class AirbyteControlMessage(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class OrchestratorConstants {
EnvConfigs.DD_AGENT_HOST,
EnvConfigs.DD_DOGSTATSD_PORT,
EnvConfigs.METRIC_CLIENT,
EnvConfigs.INTERNAL_API_HOST,
LOG_LEVEL,
LogClientSingleton.GCS_LOG_BUCKET,
LogClientSingleton.GOOGLE_APPLICATION_CREDENTIALS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,16 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import datadog.trace.api.Trace;
import io.airbyte.api.client.AirbyteApiClient;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.AttemptStats;
import io.airbyte.api.client.model.generated.AttemptStreamStats;
import io.airbyte.api.client.model.generated.SaveStatsRequestBody;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.config.Configs;
import io.airbyte.config.Configs.WorkerEnvironment;
import io.airbyte.config.EnvConfigs;
import io.airbyte.config.FailureReason;
import io.airbyte.config.ReplicationAttemptSummary;
import io.airbyte.config.ReplicationOutput;
Expand Down Expand Up @@ -80,6 +89,27 @@ public class DefaultReplicationWorker implements ReplicationWorker {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultReplicationWorker.class);

private static final Configs CONFIGS = new EnvConfigs();
private static final AirbyteApiClient CLIENT = getAirbyteApiClient();

/**
 * Builds the API client that the replication worker uses to save running attempt stats.
 *
 * Passing env vars to the container orchestrator isn't working properly, so this is a hack for
 * now: only the Docker worker environment reads the API host and port from the environment
 * configs; every other environment falls back to a fixed service name and port.
 *
 * TODO(Davin): This doesn't work for Kube. Need to figure it out.
 *
 * @return a client that talks plain http to the chosen host and port under the /api base path
 */
private static AirbyteApiClient getAirbyteApiClient() {
  final String apiHost;
  final int apiPort;
  if (CONFIGS.getWorkerEnvironment() == WorkerEnvironment.DOCKER) {
    apiHost = CONFIGS.getAirbyteApiHost();
    apiPort = CONFIGS.getAirbyteApiPort();
  } else {
    apiHost = "airbyte-server-svc";
    apiPort = 8001;
  }

  final ApiClient apiClient = new ApiClient()
      .setScheme("http")
      .setHost(apiHost)
      .setPort(apiPort)
      .setBasePath("/api");
  return new AirbyteApiClient(apiClient);
}

private final String jobId;
private final int attempt;
private final AirbyteSource source;
Expand Down Expand Up @@ -180,7 +210,8 @@ public final ReplicationOutput run(final StandardSyncInput syncInput, final Path
});

final CompletableFuture<?> replicationThreadFuture = CompletableFuture.runAsync(
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeTracker),
getReplicationRunnable(source, destination, cancelled, mapper, messageTracker, mdc, recordSchemaValidator, metricReporter, timeTracker,
Long.parseLong(jobId), attempt),
executors)
.whenComplete((msg, ex) -> {
if (ex != null) {
Expand Down Expand Up @@ -347,7 +378,9 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
final Map<String, String> mdc,
final RecordSchemaValidator recordSchemaValidator,
final WorkerMetricReporter metricReporter,
final ThreadedTimeTracker timeHolder) {
final ThreadedTimeTracker timeHolder,
final Long jobId,
final Integer attemptNumber) {
return () -> {
MDC.setContextMap(mdc);
LOGGER.info("Replication thread started.");
Expand All @@ -367,8 +400,15 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
validateSchema(recordSchemaValidator, validationErrors, airbyteMessage);
final AirbyteMessage message = mapper.mapMessage(airbyteMessage);

// metrics block
messageTracker.acceptFromSource(message);

// config/mutating platform state block
if (message.getType() == Type.STATE || message.getType() == Type.TRACE) {
saveStats(messageTracker, jobId, attemptNumber);
}

// continue processing
try {
if (message.getType() == Type.RECORD || message.getType() == Type.STATE) {
destination.accept(message);
Expand Down Expand Up @@ -427,6 +467,37 @@ private static Runnable getReplicationRunnable(final AirbyteSource source,
};
}

/**
 * Sends the tracker's current totals and per-stream stats to the server for the given attempt.
 *
 * This is best effort: an {@link ApiException} is logged and swallowed so that a failure to save
 * progress stats does not interrupt the replication loop that calls this method.
 *
 * @param messageTracker source of the emitted and estimated record/byte counts
 * @param jobId id of the job the stats belong to
 * @param attemptNumber attempt of that job the stats belong to
 */
private static void saveStats(final MessageTracker messageTracker, final Long jobId, final Integer attemptNumber) {
  final AttemptStats totalStats = new AttemptStats()
      .bytesEmitted(messageTracker.getTotalBytesEmitted())
      .recordsEmitted(messageTracker.getTotalRecordsEmitted())
      .estimatedBytes(messageTracker.getTotalBytesEstimated())
      .estimatedRecords(messageTracker.getTotalRecordsEstimated());

  // Take one snapshot of each per-stream map up front. AirbyteMessageTracker builds a fresh map on
  // every getter call, so asking again for every stream would redo that work once per stream.
  final Map<String, Long> emittedRecords = messageTracker.getStreamToEmittedRecords();
  final Map<String, Long> emittedBytes = messageTracker.getStreamToEmittedBytes();
  final Map<String, Long> estimatedRecords = messageTracker.getStreamToEstimatedRecords();
  final Map<String, Long> estimatedBytes = messageTracker.getStreamToEstimatedBytes();

  // Report every stream that appears in any of the maps. Keying off the byte estimates alone would
  // drop streams that have emitted data, or have only a row estimate, but no byte estimate.
  final List<AttemptStreamStats> streamStats = List.of(emittedRecords, emittedBytes, estimatedRecords, estimatedBytes).stream()
      .flatMap(perStream -> perStream.keySet().stream())
      .distinct()
      .map(stream -> new AttemptStreamStats()
          .streamName(stream)
          .stats(new AttemptStats()
              // A stream known only through an estimate has emitted nothing yet: report 0, not null.
              .recordsEmitted(emittedRecords.getOrDefault(stream, 0L))
              .bytesEmitted(emittedBytes.getOrDefault(stream, 0L))
              // A missing estimate stays null, meaning "no estimate available".
              .estimatedBytes(estimatedBytes.get(stream))
              .estimatedRecords(estimatedRecords.get(stream))))
      .collect(Collectors.toList());

  final SaveStatsRequestBody saveStatsRequestBody = new SaveStatsRequestBody()
      .jobId(jobId)
      .attemptNumber(attemptNumber)
      .stats(totalStats)
      .streamStats(streamStats);
  LOGGER.info("saving stats");
  try {
    CLIENT.getAttemptApi().saveStats(saveStatsRequestBody);
  } catch (ApiException e) {
    LOGGER.warn("error trying to save stats: ", e);
  }
}

private static void validateSchema(final RecordSchemaValidator recordSchemaValidator,
final Map<String, ImmutablePair<Set<String>, Integer>> validationErrors,
final AirbyteMessage message) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
import io.airbyte.protocol.models.AirbyteStateMessage.AirbyteStateType;
import io.airbyte.protocol.models.AirbyteTraceMessage;
import io.airbyte.workers.helper.FailureHelper;
import io.airbyte.workers.internal.StateDeltaTracker.StateDeltaTrackerException;
import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerNoStateMatchException;
import io.airbyte.workers.internal.StateMetricsTracker.StateMetricsTrackerOomException;
import io.airbyte.workers.internal.state_aggregator.DefaultStateAggregator;
import io.airbyte.workers.internal.state_aggregator.StateAggregator;
import java.time.LocalDateTime;
Expand All @@ -51,6 +53,10 @@ public class AirbyteMessageTracker implements MessageTracker {
private final BiMap<String, Short> streamNameToIndex;
private final Map<Short, Long> streamToTotalBytesEmitted;
private final Map<Short, Long> streamToTotalRecordsEmitted;

private final Map<Short, Long> streamToTotalBytesEstimated;

private final Map<Short, Long> streamToTotalRecordsEstimated;
private final StateDeltaTracker stateDeltaTracker;
private final StateMetricsTracker stateMetricsTracker;
private final List<AirbyteTraceMessage> destinationErrorTraceMessages;
Expand Down Expand Up @@ -93,6 +99,8 @@ protected AirbyteMessageTracker(final StateDeltaTracker stateDeltaTracker,
this.hashFunction = Hashing.murmur3_32_fixed();
this.streamToTotalBytesEmitted = new HashMap<>();
this.streamToTotalRecordsEmitted = new HashMap<>();
this.streamToTotalBytesEstimated = new HashMap<>();
this.streamToTotalRecordsEstimated = new HashMap<>();
this.stateDeltaTracker = stateDeltaTracker;
this.stateMetricsTracker = stateMetricsTracker;
this.nextStreamIndex = 0;
Expand Down Expand Up @@ -173,12 +181,12 @@ private void handleSourceEmittedState(final AirbyteStateMessage stateMessage) {
if (!unreliableStateTimingMetrics) {
stateMetricsTracker.addState(stateMessage, stateHash, timeEmittedStateMessage);
}
} catch (final StateDeltaTracker.StateDeltaTrackerException e) {
} catch (final StateDeltaTrackerException e) {
log.warn("The message tracker encountered an issue that prevents committed record counts from being reliably computed.");
log.warn("This only impacts metadata and does not indicate a problem with actual sync data.");
log.warn(e.getMessage(), e);
unreliableCommittedCounts = true;
} catch (final StateMetricsTracker.StateMetricsTrackerOomException e) {
} catch (final StateMetricsTrackerOomException e) {
log.warn("The StateMetricsTracker encountered an out of memory error that prevents new state metrics from being recorded");
log.warn("This only affects metrics and does not indicate a problem with actual sync data.");
unreliableStateTimingMetrics = true;
Expand Down Expand Up @@ -251,6 +259,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec
/**
 * Routes a trace message to the handler for its trace type. ERROR traces go to the error handler
 * together with the connector type, ESTIMATE traces go to the estimate handler, and any other
 * type is logged as a warning and otherwise ignored.
 *
 * @param traceMessage the trace message to dispatch
 * @param connectorType which connector emitted the message; only forwarded for ERROR traces
 */
private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) {
  final var traceType = traceMessage.getType();
  switch (traceType) {
    case ERROR -> {
      handleEmittedErrorTrace(traceMessage, connectorType);
    }
    case ESTIMATE -> {
      handleEmittedEstimateTrace(traceMessage);
    }
    default -> {
      log.warn("Invalid message type for trace message: {}", traceMessage);
    }
  }
}
Expand All @@ -263,6 +272,19 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage
}
}

/**
 * Records the row and byte estimates that a source reported for a stream.
 *
 * An estimate is assumed to be a whole number and not a sum, i.e. each estimate replaces the
 * previous estimate for that stream. Both figures are optional in the protocol. An absent figure
 * leaves the stored value untouched instead of storing null, because the estimate getters and
 * totals of this class collect with Collectors.toMap and sum with Long::sum, both of which throw
 * a NullPointerException on a null value.
 *
 * NOTE(review): estimates are keyed by stream name only; the namespace and the estimate type are
 * not consulted here. Confirm that is intended.
 *
 * @param estimateTraceMessage trace message of type ESTIMATE
 */
private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage) {
  final var estimate = estimateTraceMessage.getEstimate();
  if (estimate == null) {
    // The trace type says ESTIMATE but the payload is missing; nothing can be recorded.
    log.warn("Estimate trace message has no estimate payload: {}", estimateTraceMessage);
    return;
  }

  final Long rowEstimate = estimate.getRowEstimate();
  final Long byteEstimate = estimate.getByteEstimate();
  log.debug("Saving trace estimate for stream {}: rows={}, bytes={}", estimate.getName(), rowEstimate, byteEstimate);

  final short index = getStreamIndex(estimate.getName());
  if (rowEstimate != null) {
    streamToTotalRecordsEstimated.put(index, rowEstimate);
  }
  if (byteEstimate != null) {
    streamToTotalBytesEstimated.put(index, byteEstimate);
  }
}

private short getStreamIndex(final String streamName) {
if (!streamNameToIndex.containsKey(streamName)) {
streamNameToIndex.put(streamName, nextStreamIndex);
Expand Down Expand Up @@ -363,6 +385,13 @@ public Map<String, Long> getStreamToEmittedRecords() {
Map.Entry::getValue));
}

/**
 * Swap out stream indices for stream names and return the estimated record count by stream.
 */
@Override
public Map<String, Long> getStreamToEstimatedRecords() {
  final Map<Short, String> indexToStreamName = streamNameToIndex.inverse();
  return streamToTotalRecordsEstimated.entrySet().stream()
      .collect(Collectors.toMap(
          recordEstimate -> indexToStreamName.get(recordEstimate.getKey()),
          recordEstimate -> recordEstimate.getValue()));
}

/**
* Swap out stream indices for stream names and return total bytes emitted by stream.
*/
Expand All @@ -373,6 +402,13 @@ public Map<String, Long> getStreamToEmittedBytes() {
Map.Entry::getValue));
}

/**
 * Swap out stream indices for stream names and return the estimated byte count by stream.
 */
@Override
public Map<String, Long> getStreamToEstimatedBytes() {
  final Map<Short, String> indexToStreamName = streamNameToIndex.inverse();
  return streamToTotalBytesEstimated.entrySet().stream()
      .collect(Collectors.toMap(
          byteEstimate -> indexToStreamName.get(byteEstimate.getKey()),
          byteEstimate -> byteEstimate.getValue()));
}

/**
* Compute sum of emitted record counts across all streams.
*/
Expand All @@ -381,6 +417,11 @@ public long getTotalRecordsEmitted() {
return streamToTotalRecordsEmitted.values().stream().reduce(0L, Long::sum);
}

/**
 * Compute sum of estimated record counts across all streams.
 */
@Override
public long getTotalRecordsEstimated() {
  return streamToTotalRecordsEstimated.values().stream()
      .mapToLong(Long::longValue)
      .sum();
}

/**
* Compute sum of emitted bytes across all streams.
*/
Expand All @@ -389,6 +430,11 @@ public long getTotalBytesEmitted() {
return streamToTotalBytesEmitted.values().stream().reduce(0L, Long::sum);
}

/**
 * Compute sum of estimated bytes across all streams.
 */
@Override
public long getTotalBytesEstimated() {
  return streamToTotalBytesEstimated.values().stream()
      .mapToLong(Long::longValue)
      .sum();
}

/**
* Compute sum of committed record counts across all streams. If the delta tracker has exceeded its
* capacity, return empty because committed record counts cannot be reliably computed.
Expand Down
Loading