From 7de79572f4056e61dd1afd3c3a6ebd9205eaf0c4 Mon Sep 17 00:00:00 2001 From: evantahler Date: Wed, 2 Nov 2022 13:46:35 -0700 Subject: [PATCH 1/9] `AirbyteEstimateTraceMessage` --- .../airbyte_cdk/models/airbyte_protocol.py | 29 +++++++++++- .../internal/AirbyteMessageTracker.java | 7 +++ .../airbyte_protocol/airbyte_protocol.yaml | 35 ++++++++++++-- .../understanding-airbyte/airbyte-protocol.md | 46 ++++++++++++++++++- 4 files changed, 112 insertions(+), 5 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py index a4b654310d00..8283b6787e3c 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py @@ -81,6 +81,7 @@ class Config: class TraceType(Enum): ERROR = "ERROR" + ESTIMATE = "ESTIMATE" class FailureType(Enum): @@ -98,6 +99,28 @@ class Config: failure_type: Optional[FailureType] = Field(None, description="The type of error") +class Type1(Enum): + STREAM = "STREAM" + SYNC = "SYNC" + + +class AirbyteEstimateTraceMessage(BaseModel): + class Config: + extra = Extra.allow + + name: str = Field(..., description="The name of the stream") + type: Type1 = Field(..., description="The type of estimate") + namespace: Optional[str] = Field(None, description="The namespace of the stream") + row_estimate: Optional[int] = Field( + None, + description="The estimated number of rows to be emitted by this sync for this stream", + ) + byte_estimate: Optional[int] = Field( + None, + description="The estimated number of bytes to be emitted by this sync for this stream", + ) + + class OrchestratorType(Enum): CONNECTOR_CONFIG = "CONNECTOR_CONFIG" @@ -186,7 +209,7 @@ class Config: ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( None, - description="OAuth specific blob. 
This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations that\nalso need to be merged back into the connector configuration at runtime.\nThis is a subset configuration of `complete_oauth_server_input_specification` that filters fields out to retain only the ones that\nare necessary for the connector to function with OAuth. (some fields could be used during oauth flows but not needed afterwards, therefore\nthey would be listed in the `complete_oauth_server_input_specification` but not `complete_oauth_server_output_specification`)\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nconnector when using OAuth flow APIs.\nThese fields are to be merged back to `ConnectorSpecification.connectionSpecification`.\nFor each field, a special annotation `path_in_connector_config` can be specified to determine where to merge it,\n\nExamples:\n\n complete_oauth_server_output_specification={\n client_id: {\n type: string,\n path_in_connector_config: ['credentials', 'client_id']\n },\n client_secret: {\n type: string,\n path_in_connector_config: ['credentials', 'client_secret']\n }\n }", + description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations that\nalso need to be merged back into the connector configuration at runtime.\nThis is a subset configuration of `complete_oauth_server_input_specification` that filters fields out to retain only the ones that\nare necessary for the connector to function with OAuth. 
(some fields could be used during oauth flows but not needed afterwards, therefore\nthey would be listed in the `complete_oauth_server_input_specification` but not `complete_oauth_server_output_specification`)\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Worksdefaultrepace Admins to be used by the\nconnector when using OAuth flow APIs.\nThese fields are to be merged back to `ConnectorSpecification.connectionSpecification`.\nFor each field, a special annotation `path_in_connector_config` can be specified to determine where to merge it,\n\nExamples:\n\n complete_oauth_server_output_specification={\n client_id: {\n type: string,\n path_in_connector_config: ['credentials', 'client_id']\n },\n client_secret: {\n type: string,\n path_in_connector_config: ['credentials', 'client_secret']\n }\n }", ) @@ -213,6 +236,10 @@ class Config: type: TraceType = Field(..., description="the type of trace message", title="trace type") emitted_at: float = Field(..., description="the time in ms that the message was emitted") error: Optional[AirbyteErrorTraceMessage] = Field(None, description="error trace message: the error object") + estimate: Optional[AirbyteEstimateTraceMessage] = Field( + None, + description="Estimate trace message: a guess at how much data will be produced in this sync", + ) class AirbyteControlMessage(BaseModel): diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index aa4b348887ae..8a99dd9ad347 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -250,6 +250,7 @@ private void handleEmittedOrchestratorConnectorConfig(final AirbyteControlConnec */ private void handleEmittedTrace(final AirbyteTraceMessage 
traceMessage, final ConnectorType connectorType) { switch (traceMessage.getType()) { + case ESTIMATE -> handleEmittedEstimateTrace(traceMessage, connectorType); case ERROR -> handleEmittedErrorTrace(traceMessage, connectorType); default -> log.warn("Invalid message type for trace message: {}", traceMessage); } @@ -263,6 +264,12 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage } } + + @SuppressWarnings("PMD") // until method is implemented + private void handleEmittedEstimateTraceTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) { + + } + private short getStreamIndex(final String streamName) { if (!streamNameToIndex.containsKey(streamName)) { streamNameToIndex.put(streamName, nextStreamIndex); diff --git a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index 9965bde95825..26819940e4e9 100644 --- a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -4,7 +4,7 @@ title: AirbyteProtocol type: object description: AirbyteProtocol structs -version: 0.3.1 +version: 0.3.2 properties: airbyte_message: "$ref": "#/definitions/AirbyteMessage" @@ -174,12 +174,16 @@ definitions: type: string enum: - ERROR + - ESTIMATE emitted_at: description: "the time in ms that the message was emitted" type: number error: description: "error trace message: the error object" "$ref": "#/definitions/AirbyteErrorTraceMessage" + estimate: + description: "Estimate trace message: a guess at how much data will be produced in this sync" + "$ref": "#/definitions/AirbyteEstimateTraceMessage" AirbyteErrorTraceMessage: type: object additionalProperties: true @@ -201,6 +205,31 @@ definitions: enum: - system_error - config_error + AirbyteEstimateTraceMessage: + 
type: object + additionalProperties: true + required: + - name + - type + properties: + name: + description: The name of the stream + type: string + type: + description: The type of estimate + type: string + enum: + - STREAM + - SYNC + namespace: + description: The namespace of the stream + type: string + row_estimate: + description: The estimated number of rows to be emitted by this sync for this stream + type: integer + byte_estimate: + description: The estimated number of bytes to be emitted by this sync for this stream + type: integer AirbyteControlMessage: type: object additionalProperties: true @@ -541,7 +570,7 @@ definitions: This is a subset configuration of `complete_oauth_server_input_specification` that filters fields out to retain only the ones that are necessary for the connector to function with OAuth. (some fields could be used during oauth flows but not needed afterwards, therefore they would be listed in the `complete_oauth_server_input_specification` but not `complete_oauth_server_output_specification`) - Must be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the + Must be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Worksdefaultrepace Admins to be used by the connector when using OAuth flow APIs. These fields are to be merged back to `ConnectorSpecification.connectionSpecification`. 
For each field, a special annotation `path_in_connector_config` can be specified to determine where to merge it, @@ -559,4 +588,4 @@ definitions: } } type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode + existingJavaType: com.fasterxml.jackson.databind.JsonNode \ No newline at end of file diff --git a/docs/understanding-airbyte/airbyte-protocol.md b/docs/understanding-airbyte/airbyte-protocol.md index 381e03f05aa1..447aebfe48a1 100644 --- a/docs/understanding-airbyte/airbyte-protocol.md +++ b/docs/understanding-airbyte/airbyte-protocol.md @@ -28,6 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the | Version | Date of Change | Pull Request(s) | Subject | | :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- | +| `v0.3.2` | 2022-10-128 | [xxx](https://github.com/airbytehq/airbyte/pull/xxx) | `AirbyteEstimateTraceMessage` added | | `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added | | `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added | | `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages | @@ -759,7 +760,7 @@ AirbyteLogMessage: ## AirbyteTraceMessage -The trace message allows an Actor to emit metadata about the runtime of the Actor. As currently implemented, it allows an Actor to surface information about errors. This message is designed to grow to handle other use cases, including progress and performance metrics. +The trace message allows an Actor to emit metadata about the runtime of the Actor, such as errors or estimates. 
This message is designed to grow to handle other use cases, including additional performance metrics. ```yaml AirbyteTraceMessage: @@ -775,12 +776,16 @@ AirbyteTraceMessage: type: string enum: - ERROR + - ESTIMATE emitted_at: description: "the time in ms that the message was emitted" type: number error: description: "error trace message: the error object" "$ref": "#/definitions/AirbyteErrorTraceMessage" + estimate: + description: "Estimate trace message: a guess at how much data will be produced in this sync" + "$ref": "#/definitions/AirbyteEstimateTraceMessage" AirbyteErrorTraceMessage: type: object additionalProperties: true @@ -802,8 +807,47 @@ AirbyteErrorTraceMessage: enum: - system_error - config_error +AirbyteEstimateTraceMessage: + type: object + additionalProperties: true + required: + - name + - type + properties: + name: + description: The name of the stream + type: string + type: + description: The type of estimate + type: string + enum: + - STREAM + - SYNC + namespace: + description: The namespace of the stream + type: string + row_estimate: + description: The estimated number of rows to be emitted by this sync for this stream + type: integer + byte_estimate: + description: The estimated number of bytes to be emitted by this sync for this stream + type: integer ``` +### AirbyteErrorTraceMessage + +Error Trace Messages are used when a sync is about to fail and the connector can provide meaningful information to the orchestrator or user about what to do next. + +Of note, an `internal_message` might be an exception code, but an `external_message` is meant to be user-facing, e.g. "Your API Key is invalid". + +Syncs can fail for multiple reasons, and therefore multiple `AirbyteErrorTraceMessage` can be sent from a connector. + +### AirbyteEstimateTraceMessage + +Estimate Trace Messages are used by connectors to inform the orchestrator about how much data they expect to move within the sync.
This ise useful to present the user with estimates of the time remaining in the sync, or percentage complete. An example of this would be for every stream about to be synced from a databse to provde a `COUNT (*) from {table_name} where updated_at > {state}` to provide an estimate of the rows to be sent in this sync. + +`AirbyteEstimateTraceMessage` should be emitted early in th sync to provide an early estimate of the sync's duration. Multiple `AirbyteEstimateTraceMessage`s can be sent for the same stream, and an updated estimate will replace the previous value. + ## AirbyteControlMessage An `AirbyteControlMessage` is for connectors to signal to the Airbyte Platform or Orchestrator that an action with a side-effect should be taken. This means that the Orchestrator will likely be altering some stored data about the connector, connection, or sync. From f1c92127bbff6be15afab69b6345df5a60df95e2 Mon Sep 17 00:00:00 2001 From: evantahler Date: Wed, 2 Nov 2022 13:50:47 -0700 Subject: [PATCH 2/9] Add PR number --- docs/understanding-airbyte/airbyte-protocol.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/understanding-airbyte/airbyte-protocol.md b/docs/understanding-airbyte/airbyte-protocol.md index 447aebfe48a1..d06e4069a861 100644 --- a/docs/understanding-airbyte/airbyte-protocol.md +++ b/docs/understanding-airbyte/airbyte-protocol.md @@ -28,7 +28,7 @@ The Airbyte Protocol is versioned independently of the Airbyte Platform, and the | Version | Date of Change | Pull Request(s) | Subject | | :------- | :------------- | :------------------------------------------------------------------------------------------------------------------ | :------------------------------------------------------------------------------- | -| `v0.3.2` | 2022-10-128 | [xxx](https://github.com/airbytehq/airbyte/pull/xxx) | `AirbyteEstimateTraceMessage` added | +| `v0.3.2` | 2022-10-128 | [18875](https://github.com/airbytehq/airbyte/pull/18875) | 
`AirbyteEstimateTraceMessage` added | | `v0.3.1` | 2022-10-12 | [17907](https://github.com/airbytehq/airbyte/pull/17907) | `AirbyteControlMessage.ConnectorConfig` added | | `v0.3.0` | 2022-09-09 | [16479](https://github.com/airbytehq/airbyte/pull/16479) | `AirbyteLogMessage.stack_trace` added | | `v0.2.0` | 2022-06-10 | [13573](https://github.com/airbytehq/airbyte/pull/13573) & [12586](https://github.com/airbytehq/airbyte/pull/12586) | `STREAM` and `GLOBAL` STATE messages | From b7db48619bc991b3d1de6182a9f5151df570a359 Mon Sep 17 00:00:00 2001 From: evantahler Date: Wed, 2 Nov 2022 13:54:34 -0700 Subject: [PATCH 3/9] fix method name --- .../java/io/airbyte/workers/internal/AirbyteMessageTracker.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index 8a99dd9ad347..fa29b6c5e30b 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -266,7 +266,7 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage @SuppressWarnings("PMD") // until method is implemented - private void handleEmittedEstimateTraceTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) { + private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) { } From 51a3536617920c383792e812ea5c39158addfd1b Mon Sep 17 00:00:00 2001 From: evantahler Date: Wed, 2 Nov 2022 14:11:06 -0700 Subject: [PATCH 4/9] Lint --- .../java/io/airbyte/workers/internal/AirbyteMessageTracker.java | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java 
b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java index fa29b6c5e30b..44539d0e23f9 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/internal/AirbyteMessageTracker.java @@ -264,7 +264,6 @@ private void handleEmittedErrorTrace(final AirbyteTraceMessage errorTraceMessage } } - @SuppressWarnings("PMD") // until method is implemented private void handleEmittedEstimateTrace(final AirbyteTraceMessage estimateTraceMessage, final ConnectorType connectorType) { From 607fa7068f4c046b227981025204cfa01d338dda Mon Sep 17 00:00:00 2001 From: evantahler Date: Wed, 2 Nov 2022 14:11:17 -0700 Subject: [PATCH 5/9] Lint --- .../src/main/resources/airbyte_protocol/airbyte_protocol.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index 26819940e4e9..cace30d2437b 100644 --- a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -588,4 +588,4 @@ definitions: } } type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode \ No newline at end of file + existingJavaType: com.fasterxml.jackson.databind.JsonNode From 7793f867d998064b17ac87893c990896086ad28e Mon Sep 17 00:00:00 2001 From: evantahler Date: Wed, 2 Nov 2022 14:14:47 -0700 Subject: [PATCH 6/9] fix merge --- airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py | 2 +- .../src/main/resources/airbyte_protocol/airbyte_protocol.yaml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py index 
8283b6787e3c..e81c14e866d7 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py @@ -209,7 +209,7 @@ class Config: ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations that\nalso need to be merged back into the connector configuration at runtime.\nThis is a subset configuration of `complete_oauth_server_input_specification` that filters fields out to retain only the ones that\nare necessary for the connector to function with OAuth. (some fields could be used during oauth flows but not needed afterwards, therefore\nthey would be listed in the `complete_oauth_server_input_specification` but not `complete_oauth_server_output_specification`)\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Worksdefaultrepace Admins to be used by the\nconnector when using OAuth flow APIs.\nThese fields are to be merged back to `ConnectorSpecification.connectionSpecification`.\nFor each field, a special annotation `path_in_connector_config` can be specified to determine where to merge it,\n\nExamples:\n\n complete_oauth_server_output_specification={\n client_id: {\n type: string,\n path_in_connector_config: ['credentials', 'client_id']\n },\n client_secret: {\n type: string,\n path_in_connector_config: ['credentials', 'client_secret']\n }\n }", + description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations that\nalso need to be merged back into the connector configuration at runtime.\nThis is a subset configuration of `complete_oauth_server_input_specification` that filters fields out to retain only the ones that\nare necessary for the connector to function with OAuth. 
(some fields could be used during oauth flows but not needed afterwards, therefore\nthey would be listed in the `complete_oauth_server_input_specification` but not `complete_oauth_server_output_specification`)\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nconnector when using OAuth flow APIs.\nThese fields are to be merged back to `ConnectorSpecification.connectionSpecification`.\nFor each field, a special annotation `path_in_connector_config` can be specified to determine where to merge it,\n\nExamples:\n\n complete_oauth_server_output_specification={\n client_id: {\n type: string,\n path_in_connector_config: ['credentials', 'client_id']\n },\n client_secret: {\n type: string,\n path_in_connector_config: ['credentials', 'client_secret']\n }\n }", ) diff --git a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index cace30d2437b..8c60e9b6e0a6 100644 --- a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -570,7 +570,7 @@ definitions: This is a subset configuration of `complete_oauth_server_input_specification` that filters fields out to retain only the ones that are necessary for the connector to function with OAuth. 
(some fields could be used during oauth flows but not needed afterwards, therefore they would be listed in the `complete_oauth_server_input_specification` but not `complete_oauth_server_output_specification`) - Must be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Worksdefaultrepace Admins to be used by the + Must be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the connector when using OAuth flow APIs. These fields are to be merged back to `ConnectorSpecification.connectionSpecification`. For each field, a special annotation `path_in_connector_config` can be specified to determine where to merge it, From af825e2a94ccf979f40a9c00ca7519703675ba55 Mon Sep 17 00:00:00 2001 From: Evan Tahler Date: Thu, 3 Nov 2022 08:31:56 -0700 Subject: [PATCH 7/9] Update docs/understanding-airbyte/airbyte-protocol.md Co-authored-by: Davin Chia --- docs/understanding-airbyte/airbyte-protocol.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/understanding-airbyte/airbyte-protocol.md b/docs/understanding-airbyte/airbyte-protocol.md index d06e4069a861..8c555ac0fd40 100644 --- a/docs/understanding-airbyte/airbyte-protocol.md +++ b/docs/understanding-airbyte/airbyte-protocol.md @@ -846,7 +846,7 @@ Syncs can fail for multiple reasons, and therefore multiple `AirbyteErrorTraceMe Estimate Trace Messages are used by connectors to inform the orchestrator about how much data they expect to move within the sync. This ise useful to present the user with estimates of the time remaining in the sync, or percentage complete. An example of this would be for every stream about to be synced from a databse to provde a `COUNT (*) from {table_name} where updated_at > {state}` to provide an estimate of the rows to be sent in this sync. -`AirbyteEstimateTraceMessage` should be emitted early in th sync to provide an early estimate of the sync's duration. 
Multiple `AirbyteEstimateTraceMessage`s can be sent for the same stream, and an updated estimate will replace the previous value. +`AirbyteEstimateTraceMessage` should be emitted early in the sync to provide an early estimate of the sync's duration. Multiple `AirbyteEstimateTraceMessage`s can be sent for the same stream, and an updated estimate will replace the previous value. ## AirbyteControlMessage From b6d20696705be2434e1bcb3079c5eef76fb7a304 Mon Sep 17 00:00:00 2001 From: evantahler Date: Mon, 7 Nov 2022 07:44:31 -0500 Subject: [PATCH 8/9] `EstimateType` sub type in python --- airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py | 4 ++-- .../src/main/resources/airbyte_protocol/airbyte_protocol.yaml | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py index e81c14e866d7..190442f88469 100644 --- a/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py +++ b/airbyte-cdk/python/airbyte_cdk/models/airbyte_protocol.py @@ -99,7 +99,7 @@ class Config: failure_type: Optional[FailureType] = Field(None, description="The type of error") -class Type1(Enum): +class EstimateType(Enum): STREAM = "STREAM" SYNC = "SYNC" @@ -109,7 +109,7 @@ class Config: extra = Extra.allow name: str = Field(..., description="The name of the stream") - type: Type1 = Field(..., description="The type of estimate") + type: EstimateType = Field(..., description="The type of estimate", title="estimate type") namespace: Optional[str] = Field(None, description="The namespace of the stream") row_estimate: Optional[int] = Field( None, diff --git a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index 8c60e9b6e0a6..5c6baf92524f 100644 --- a/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ 
b/airbyte-protocol/protocol-models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -216,6 +216,7 @@ definitions: description: The name of the stream type: string type: + title: "estimate type" # this title is required to avoid python codegen conflicts with the "type" parameter in AirbyteMessage. See https://github.com/airbytehq/airbyte/pull/12581 description: The type of estimate type: string enum: From 66dd108eab7d73042bb5d530410f86840888fc82 Mon Sep 17 00:00:00 2001 From: evantahler Date: Mon, 7 Nov 2022 11:53:00 -0800 Subject: [PATCH 9/9] lint --- .../declarative/stream_slicers/datetime_stream_slicer.py | 6 ++++-- .../python/unit_tests/sources/declarative/test_factory.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py index 9fdffcc703f4..181ddc096d99 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/stream_slicers/datetime_stream_slicer.py @@ -5,7 +5,6 @@ import datetime import re from dataclasses import InitVar, dataclass, field -from dateutil.relativedelta import relativedelta from typing import Any, Iterable, Mapping, Optional, Union from airbyte_cdk.models import SyncMode @@ -17,6 +16,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer import StreamSlicer from airbyte_cdk.sources.declarative.types import Config, Record, StreamSlice, StreamState from dataclasses_jsonschema import JsonSchemaMixin +from dateutil.relativedelta import relativedelta @dataclass @@ -71,7 +71,9 @@ class DatetimeStreamSlicer(StreamSlicer, JsonSchemaMixin): stream_state_field_end: Optional[str] = None lookback_window: Optional[Union[InterpolatedString, str]] = None - timedelta_regex = re.compile(r"((?P[\.\d]+?)y)?" r"((?P[\.\d]+?)m)?" 
r"((?P[\.\d]+?)w)?" r"((?P[\.\d]+?)d)?$") + timedelta_regex = re.compile( + r"((?P[\.\d]+?)y)?" r"((?P[\.\d]+?)m)?" r"((?P[\.\d]+?)w)?" r"((?P[\.\d]+?)d)?$" + ) def __post_init__(self, options: Mapping[str, Any]): if not isinstance(self.start_datetime, MinMaxDatetime): diff --git a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py index a187a4eb5d35..b0a9aa70010c 100644 --- a/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py +++ b/airbyte-cdk/python/unit_tests/sources/declarative/test_factory.py @@ -3,7 +3,6 @@ # import datetime -from dateutil.relativedelta import relativedelta from typing import List, Optional, Union import pytest @@ -41,6 +40,7 @@ from airbyte_cdk.sources.declarative.stream_slicers.list_stream_slicer import ListStreamSlicer from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields from airbyte_cdk.sources.declarative.transformations.add_fields import AddedFieldDefinition +from dateutil.relativedelta import relativedelta from jsonschema import ValidationError factory = DeclarativeComponentFactory()