Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AirbyteEstimateTraceMessage #18875

Merged
merged 10 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ class Config:

class TraceType(Enum):
    # Kind of trace message; used as the `type` field of AirbyteTraceMessage.
    # Each member's value is the literal string that appears on the wire.
    ERROR = "ERROR"
    # Added together with the optional `estimate` payload on AirbyteTraceMessage.
    ESTIMATE = "ESTIMATE"


class FailureType(Enum):
Expand All @@ -98,6 +99,28 @@ class Config:
failure_type: Optional[FailureType] = Field(None, description="The type of error")


class EstimateType(Enum):
    # Kind of estimate; used as the `type` field of an estimate trace message.
    # Each member's value is the literal string that appears on the wire.
    STREAM = "STREAM"
    SYNC = "SYNC"


class AirbyteEstimateTraceMessage(BaseModel):
    # Payload describing how much data is expected to be emitted.
    # `name` and `type` are required; every other field is optional and
    # defaults to None.
    class Config:
        # Fields not declared below are accepted rather than rejected.
        extra = Extra.allow

    name: str = Field(..., description="The name of the stream")
    # The explicit title keeps this field distinct from other `type` fields
    # in the protocol models.
    type: EstimateType = Field(..., description="The type of estimate", title="estimate type")
    namespace: Optional[str] = Field(None, description="The namespace of the stream")
    row_estimate: Optional[int] = Field(
        None,
        description="The estimated number of rows to be emitted by this sync for this stream",
    )
    byte_estimate: Optional[int] = Field(
        None,
        description="The estimated number of bytes to be emitted by this sync for this stream",
    )


class OrchestratorType(Enum):
CONNECTOR_CONFIG = "CONNECTOR_CONFIG"

Expand Down Expand Up @@ -213,6 +236,10 @@ class Config:
type: TraceType = Field(..., description="the type of trace message", title="trace type")
emitted_at: float = Field(..., description="the time in ms that the message was emitted")
error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object")
estimate: Optional[AirbyteEstimateTraceMessage] = Field(
None,
description="Estimate trace message: a guess at how much data will be produced in this sync",
)


class AirbyteControlMessage(BaseModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import datetime
import re
from dataclasses import InitVar, dataclass, field
from dateutil.relativedelta import relativedelta
from typing import Any, Iterable, Mapping, Optional, Union

from airbyte_cdk.models import SyncMode
Expand All @@ -17,6 +16,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer
from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState
from dataclasses_jsonschema import JsonSchemaMixin
from dateutil.relativedelta import relativedelta


@dataclass
Expand Down Expand Up @@ -71,7 +71,9 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin):
stream_state_field_end: Optional[str] = None
lookback_window: Optional[Union[InterpolatedString, str]] = None

timedelta_regex = re.compile(r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$")
timedelta_regex = re.compile(
r"((?P<years>[\.\d]+?)y)?" r"((?P<months>[\.\d]+?)m)?" r"((?P<weeks>[\.\d]+?)w)?" r"((?P<days>[\.\d]+?)d)?$"
)

def __post_init__(self, options: Mapping[str, Any]):
if not isinstance(self.start_datetime, MinMaxDatetime):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#

import datetime
from dateutil.relativedelta import relativedelta
from typing import List, Optional, Union

import pytest
Expand Down Expand Up @@ -41,6 +40,7 @@
from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition
from dateutil.relativedelta import relativedelta
from jsonschema import ValidationError

factory = DeclarativeComponentFactory()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec
*/
private void handleEmittedTrace(final AirbyteTraceMessage traceMessage, final ConnectorType connectorType) {
switch (traceMessage.getType()) {
case ESTIMATE -> handleEmittedEstimateTrace(traceMessage, connectorType);
case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType);
default -> log.warn("Invalid message type for trace message: {}", traceMessage);
}
Expand All @@ -263,6 +264,11 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage
}
}

/**
 * Handles a trace message whose type is ESTIMATE (dispatched from handleEmittedTrace).
 *
 * Currently an intentionally empty placeholder: the message is accepted and ignored.
 *
 * @param estimateTraceMessage the trace message carrying the estimate; not read yet
 * @param connectorType the connector type forwarded by handleEmittedTrace; not read yet
 */
@SuppressWarnings("PMD") // until method is implemented
private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) {

}

private short getStreamIndex(final String streamName) {
if (!streamNameToIndex.containsKey(streamName)) {
streamNameToIndex.put(streamName, nextStreamIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
title: AirbyteProtocol
type: object
description: AirbyteProtocol structs
version: 0.3.1
version: 0.3.2
properties:
airbyte_message:
"$ref": "#/definitions/AirbyteMessage"
Expand Down Expand Up @@ -174,12 +174,16 @@ definitions:
type: string
enum:
- ERROR
- ESTIMATE
emitted_at:
description: "the time in ms that the message was emitted"
type: number
error:
description: "error trace message: the error object"
"$ref": "#/definitions/AirbyteErrorTraceMessage"
estimate:
description: "Estimate trace message: a guess at how much data will be produced in this sync"
"$ref": "#/definitions/AirbyteEstimateTraceMessage"
AirbyteErrorTraceMessage:
type: object
additionalProperties: true
Expand All @@ -201,6 +205,32 @@ definitions:
enum:
- system_error
- config_error
AirbyteEstimateTraceMessage:
type: object
additionalProperties: true
required:
- name
- type
properties:
name:
description: The name of the stream
type: string
type:
title: "estimate type" # this title is required to avoid python codegen conflicts with the "type" parameter in AirbyteMessage. See https://github.com/airbytehq/airbyte/pull/12581
description: The type of estimate
type: string
enum:
- STREAM
- SYNC
cgardens marked this conversation as resolved.
Show resolved Hide resolved
namespace:
description: The namespace of the stream
type: string
row_estimate:
description: The estimated number of rows to be emitted by this sync for this stream
type: integer
byte_estimate:
description: The estimated number of bytes to be emitted by this sync for this stream
type: integer
AirbyteControlMessage:
type: object
additionalProperties: true
Expand Down
46 changes: 45 additions & 1 deletion docs/understanding-airbyte/airbyte-protocol.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the

| Version | Date of Change | Pull Request(s) | Subject |
| :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- |
| `v0.3.2` | 2022-10-28 | [18875](https://github.com/airbytehq/airbyte/pull/18875) | `AirbyteEstimateTraceMessage` added |
| `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added |
| `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added |
| `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages |
Expand Down Expand Up @@ -759,7 +760,7 @@ AirbyteLogMessage:

### AirbyteTraceMessage

The trace message allows an Actor to emit metadata about the runtime of the Actor. As currently implemented, it allows an Actor to surface information about errors. This message is designed to grow to handle other use cases, including progress and performance metrics.
The trace message allows an Actor to emit metadata about the runtime of the Actor, such as errors or estimates. This message is designed to grow to handle other use cases, including additional performance metrics.

```yaml
AirbyteTraceMessage:
Expand All @@ -775,12 +776,16 @@ AirbyteTraceMessage:
type: string
enum:
- ERROR
- ESTIMATE
emitted_at:
description: "the time in ms that the message was emitted"
type: number
error:
description: "error trace message: the error object"
"$ref": "#/definitions/AirbyteErrorTraceMessage"
estimate:
description: "Estimate trace message: a guess at how much data will be produced in this sync"
"$ref": "#/definitions/AirbyteEstimateTraceMessage"
AirbyteErrorTraceMessage:
type: object
additionalProperties: true
Expand All @@ -802,8 +807,47 @@ AirbyteErrorTraceMessage:
enum:
- system_error
- config_error
AirbyteEstimateTraceMessage:
type: object
additionalProperties: true
required:
- name
- type
properties:
name:
description: The name of the stream
type: string
type:
description: The type of estimate
type: string
enum:
- STREAM
- SYNC
namespace:
description: The namespace of the stream
type: string
row_estimate:
description: The estimated number of rows to be emitted by this sync for this stream
type: integer
byte_estimate:
description: The estimated number of bytes to be emitted by this sync for this stream
type: integer
```

#### AirbyteErrorTraceMessage

Error Trace Messages are used when a sync is about to fail and the connector can provide meaningful information to the orchestrator or user about what to do next.

Of note, an `internal_message` might be an exception code, but an `external_message` is meant to be user-facing, e.g. "Your API Key is invalid".

Syncs can fail for multiple reasons, and therefore multiple `AirbyteErrorTraceMessage` can be sent from a connector.

#### AirbyteEstimateTraceMessage

Estimate Trace Messages are used by connectors to inform the orchestrator about how much data they expect to move within the sync. This is useful to present the user with estimates of the time remaining in the sync, or percentage complete. An example of this would be for every stream about to be synced from a database to run a `COUNT (*) from {table_name} where updated_at > {state}` query to provide an estimate of the rows to be sent in this sync.

`AirbyteEstimateTraceMessage` should be emitted early in the sync to provide an early estimate of the sync's duration. Multiple `AirbyteEstimateTraceMessage`s can be sent for the same stream, and an updated estimate will replace the previous value.

### AirbyteControlMessage

An `AirbyteControlMessage` is for connectors to signal to the Airbyte Platform or Orchestrator that an action with a side-effect should be taken. This means that the Orchestrator will likely be altering some stored data about the connector, connection, or sync.
Expand Down