diff --git a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py index a4b654310d00..190442f88469 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py @@ -81,6 +81,7 @@ class Config: class TraceType(Enum): ERROR = "ERROR" + ESTIMATE = "ESTIMATE" class FailureType(Enum): @@ -98,6 +99,28 @@ class Config: failure_type: Optional[FailureType] = Field(None, description="The type of error") +class EstimateType(Enum): + STREAM = "STREAM" + SYNC = "SYNC" + + +class AirbyteEstimateTraceMessage(BaseModel): + class Config: + extra = Extra.allow + + name: str = Field(..., description="The name of the stream") + type: EstimateType = Field(..., description="The type of estimate", title="estimate type") + namespace: Optional[str] = Field(None, description="The namespace of the stream") + row_estimate: Optional[int] = Field( + None, + description="The estimated number of rows to be emitted by this sync for this stream", + ) + byte_estimate: Optional[int] = Field( + None, + description="The estimated number of bytes to be emitted by this sync for this stream", + ) + + class OrchestratorType(Enum): CONNECTOR_CONFIG = "CONNECTOR_CONFIG" @@ -213,6 +236,10 @@ class Config: type: TraceType = Field(..., description="the type of trace message", title="trace type") emitted_at: float = Field(..., description="the time in ms that the message was emitted") error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object") + estimate: Optional[AirbyteEstimateTraceMessage] = Field( + None, + description="Estimate trace message: a guess at how much data will be produced in this sync", + ) class AirbyteControlMessage(BaseModel): diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index 9fdffcc703f4..181ddc096d99 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -5,7 +5,6 @@ import datetime import re from dataclasses import InitVar, dataclass, field -from dateutil.relativedelta import relativedelta from typing import Any, Iterable, Mapping, Optional, Union from airbyte_cdk.models import SyncMode @@ -17,6 +16,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState from dataclasses_jsonschema import JsonSchemaMixin +from dateutil.relativedelta import relativedelta @dataclass @@ -71,7 +71,9 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin): stream_state_field_end: Optional[str] = None lookback_window: Optional[Union[InterpolatedString, str]] = None - timedelta_regex = re.compile(r"((?P[\.\d]+?)y)?" r"((?P[\.\d]+?)m)?" r"((?P[\.\d]+?)w)?" r"((?P[\.\d]+?)d)?$") + timedelta_regex = re.compile( + r"((?P[\.\d]+?)y)?" r"((?P[\.\d]+?)m)?" r"((?P[\.\d]+?)w)?" r"((?P[\.\d]+?)d)?$" + ) def __post_init__(self, options: Mapping[str, Any]): if not isinstance(self.start_datetime, MinMaxDatetime): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index a187a4eb5d35..b0a9aa70010c 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -3,7 +3,6 @@ # import datetime -from dateutil.relativedelta import relativedelta from typing import List, Optional, Union import pytest @@ -41,6 +40,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition +from dateutil.relativedelta import relativedelta from jsonschema import ValidationError factory = DeclarativeComponentFactory() diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index aa4b348887ae..44539d0e23f9 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -250,6 +250,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec */ private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) { switch (traceMessage.getType()) { + case ESTIMATE -> handleEmittedEstimateTrace(traceMessage, connectorType); case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType); default -> log.warn("Invalid message type for trace message: {}", traceMessage); } @@ -263,6 +264,11 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage } } + @SuppressWarnings("PMD") // until method is implemented + private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) { + + } + private short getStreamIndex(final String streamName) { if (!streamNameToIndex.containsKey(streamName)) { streamNameToIndex.put(streamName, nextStreamIndex); diff --git a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index 9965bde95825..5c6baf92524f 100644 --- a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -4,7 +4,7 @@ title: AirbyteProtocol type: object description: AirbyteProtocol structs -version: 0.3.1 +version: 0.3.2 properties: airbyte_message: "$ref": "#/definitions/AirbyteMessage" @@ -174,12 +174,16 @@ definitions: type: string enum: - ERROR + - ESTIMATE emitted_at: description: "the time in ms that the message was emitted" type: number error: description: "error trace message: the error object" "$ref": "#/definitions/AirbyteErrorTraceMessage" + estimate: + description: "Estimate trace message: a guess at how much data will be produced in this sync" + "$ref": "#/definitions/AirbyteEstimateTraceMessage" AirbyteErrorTraceMessage: type: object additionalProperties: true @@ -201,6 +205,32 @@ definitions: enum: - system_error - config_error + AirbyteEstimateTraceMessage: + type: object + additionalProperties: true + required: + - name + - type + properties: + name: + description: The name of the stream + type: string + type: + title: "estimate type" # this title is required to avoid python codegen conflicts with the "type" parameter in AirbyteMessage. See https://github.com/airbytehq/airbyte/pull/12581 + description: The type of estimate + type: string + enum: + - STREAM + - SYNC + namespace: + description: The namespace of the stream + type: string + row_estimate: + description: The estimated number of rows to be emitted by this sync for this stream + type: integer + byte_estimate: + description: The estimated number of bytes to be emitted by this sync for this stream + type: integer AirbyteControlMessage: type: object additionalProperties: true diff --git a/docs/understanding-airbyte/airbyte-protocol.md b/docs/understanding-airbyte/airbyte-protocol.md index b54ca297cd80..d561b23ed07c 100644 --- a/docs/understanding-airbyte/airbyte-protocol.md +++ b/docs/understanding-airbyte/airbyte-protocol.md @@ -28,6 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the | Version | Date of Change | Pull Request(s) | Subject | | :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- | +| `v0.3.2` | 2022-10-128 | [18875](https://github.com/airbytehq/airbyte/pull/18875) | `AirbyteEstimateTraceMessage` added | | `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added | | `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added | | `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages | @@ -759,7 +760,7 @@ AirbyteLogMessage: ### AirbyteTraceMessage -The trace message allows an Actor to emit metadata about the runtime of the Actor. As currently implemented, it allows an Actor to surface information about errors. This message is designed to grow to handle other use cases, including progress and performance metrics. +The trace message allows an Actor to emit metadata about the runtime of the Actor, such as errors or estimates. This message is designed to grow to handle other use cases, including additonal performance metrics. ```yaml AirbyteTraceMessage: @@ -775,12 +776,16 @@ AirbyteTraceMessage: type: string enum: - ERROR + - ESTIMATE emitted_at: description: "the time in ms that the message was emitted" type: number error: description: "error trace message: the error object" "$ref": "#/definitions/AirbyteErrorTraceMessage" + estimate: + description: "Estimate trace message: a guess at how much data will be produced in this sync" + "$ref": "#/definitions/AirbyteEstimateTraceMessage" AirbyteErrorTraceMessage: type: object additionalProperties: true @@ -802,8 +807,47 @@ AirbyteErrorTraceMessage: enum: - system_error - config_error +AirbyteEstimateTraceMessage: + type: object + additionalProperties: true + required: + - name + - type + properties: + name: + description: The name of the stream + type: string + type: + description: The type of estimate + type: string + enum: + - STREAM + - SYNC + namespace: + description: The namespace of the stream + type: string + row_estimate: + description: The estimated number of rows to be emitted by this sync for this stream + type: integer + byte_estimate: + description: The estimated number of bytes to be emitted by this sync for this stream + type: integer ``` +#### AirbyteErrorTraceMessage + +Error Trace Messages are used when a sync is about to fail and the connector can provide meaningful information to the orhcestrator or user about what to do next. + +Of note, an `internal_message` might be an exception code, but an `external_message` is meant to be user-facing, e.g. "Your API Key is invalid". + +Syncs can fail for multiple reasons, and therefore multiple `AirbyteErrorTraceMessage` can be sent from a connector. + +#### AirbyteEstimateTraceMessage + +Estimate Trace Messages are used by connectors to inform the orchestrator about how much data they expect to move within the sync. This ise useful to present the user with estimates of the time remaining in the sync, or percentage complete. An example of this would be for every stream about to be synced from a databse to provde a `COUNT (*) from {table_name} where updated_at > {state}` to provide an estimate of the rows to be sent in this sync. + +`AirbyteEstimateTraceMessage` should be emitted early in the sync to provide an early estimate of the sync's duration. Multiple `AirbyteEstimateTraceMessage`s can be sent for the same stream, and an updated estimate will replace the previous value. + ### AirbyteControlMessage An `AirbyteControlMessage` is for connectors to signal to the Airbyte Platform or Orchestrator that an action with a side-effect should be taken. This means that the Orchestrator will likely be altering some stored data about the connector, connection, or sync.