From 865773a459fced9372b699721044d67e3eedc660 Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 5 Feb 2024 17:15:19 +0100 Subject: [PATCH 01/13] Source Zoom: Disable pypi (#34848) --- airbyte-integrations/connectors/source-zoom/metadata.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/airbyte-integrations/connectors/source-zoom/metadata.yaml b/airbyte-integrations/connectors/source-zoom/metadata.yaml index f3ecd8ab693a..7f8e9be8b7c3 100644 --- a/airbyte-integrations/connectors/source-zoom/metadata.yaml +++ b/airbyte-integrations/connectors/source-zoom/metadata.yaml @@ -10,7 +10,8 @@ data: name: Zoom remoteRegistries: pypi: - enabled: true + # TODO: Enable once build problems are fixed + enabled: false packageName: airbyte-source-zoom registries: cloud: From 529ff1ac34e8f7a6be9d10f259542d2f9de1db2a Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Mon, 5 Feb 2024 17:30:17 +0100 Subject: [PATCH 02/13] Source Gong: Adjust schemas (#34847) --- airbyte-integrations/connectors/source-gong/Dockerfile | 2 +- airbyte-integrations/connectors/source-gong/metadata.yaml | 2 +- .../connectors/source-gong/source_gong/schemas/calls.json | 3 +++ .../connectors/source-gong/source_gong/schemas/users.json | 3 +++ docs/integrations/sources/gong.md | 1 + 5 files changed, 9 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-gong/Dockerfile b/airbyte-integrations/connectors/source-gong/Dockerfile index 8c90d307427e..40e34b1cfb0d 100644 --- a/airbyte-integrations/connectors/source-gong/Dockerfile +++ b/airbyte-integrations/connectors/source-gong/Dockerfile @@ -34,5 +34,5 @@ COPY source_gong ./source_gong ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.0 +LABEL io.airbyte.version=0.1.1 LABEL io.airbyte.name=airbyte/source-gong diff --git a/airbyte-integrations/connectors/source-gong/metadata.yaml b/airbyte-integrations/connectors/source-gong/metadata.yaml index 42fb73e25f10..dd82f25883b3 100644 --- a/airbyte-integrations/connectors/source-gong/metadata.yaml +++ b/airbyte-integrations/connectors/source-gong/metadata.yaml @@ -2,7 +2,7 @@ data: connectorSubtype: api connectorType: source definitionId: 32382e40-3b49-4b99-9c5c-4076501914e7 - dockerImageTag: 0.1.0 + dockerImageTag: 0.1.1 dockerRepository: airbyte/source-gong githubIssueLabel: source-gong icon: gong.svg diff --git a/airbyte-integrations/connectors/source-gong/source_gong/schemas/calls.json b/airbyte-integrations/connectors/source-gong/source_gong/schemas/calls.json index d2488ac806f5..a8b5b7e3e7cb 100644 --- a/airbyte-integrations/connectors/source-gong/source_gong/schemas/calls.json +++ b/airbyte-integrations/connectors/source-gong/source_gong/schemas/calls.json @@ -60,6 +60,9 @@ }, "isPrivate": { "type": ["null", "boolean"] + }, + "calendarEventId": { + "type": ["null", "string"] } } } diff --git a/airbyte-integrations/connectors/source-gong/source_gong/schemas/users.json b/airbyte-integrations/connectors/source-gong/source_gong/schemas/users.json index f23814c77c14..726bec44117c 100644 --- a/airbyte-integrations/connectors/source-gong/source_gong/schemas/users.json +++ b/airbyte-integrations/connectors/source-gong/source_gong/schemas/users.json @@ -8,6 +8,9 @@ "emailAddress": { "type": ["null", "string"] }, + "trustedEmailAddress": { + "type": ["null", "string"] + }, "created": { "type": ["null", "string"], "format": "date-time" diff --git a/docs/integrations/sources/gong.md 
b/docs/integrations/sources/gong.md
index c01ca9482657..04aeb7da4ff2 100644
--- a/docs/integrations/sources/gong.md
+++ b/docs/integrations/sources/gong.md
@@ -36,4 +36,5 @@ By default Gong limits your company's access to the service to 3 API calls per s
 | Version | Date | Pull Request | Subject |
 | :------ | :--------- | :------------------------------------------------------- | :------------------------ |
+| 0.1.1 | 2024-02-05 | [34847](https://github.com/airbytehq/airbyte/pull/34847) | Adjust stream schemas and make ready for airbyte-lib |
 | 0.1.0 | 2022-10-27 | [18819](https://github.com/airbytehq/airbyte/pull/18819) | Add Gong Source Connector |

From 247bc17c4a9d2122c85d0f2a389b90325452a6d8 Mon Sep 17 00:00:00 2001
From: Marcos Marx
Date: Mon, 5 Feb 2024 13:52:56 -0300
Subject: [PATCH 03/13] Kubernetes docs: external logs with S3 (#34621)

---
 .../on-kubernetes-via-helm.md | 133 ++++++++++++++--
 1 file changed, 122 insertions(+), 11 deletions(-)

diff --git a/docs/deploying-airbyte/on-kubernetes-via-helm.md b/docs/deploying-airbyte/on-kubernetes-via-helm.md
index 162a2de4b9cf..02b1e7602174 100644
--- a/docs/deploying-airbyte/on-kubernetes-via-helm.md
+++ b/docs/deploying-airbyte/on-kubernetes-via-helm.md
@@ -167,28 +167,139 @@ Before upgrading the chart update values.yaml as stated above and then run:
 - Perform upgrade of chart by running `helm upgrade %release_name% airbyte/airbyte --set auth.rootPassword=$ROOT_PASSWORD`
 - If you get an error about setting the auth.rootPassword, then you forgot to update the `values.yaml` file

-### Custom logging and jobs configuration
+### External Logs

-Starting from `0.39.37-alpha` if you've configured logging yourself using `logging or jobs` section of `values.yaml` file, you need to update your configuration so you can continue to use your custom logging and jobs configuration.
+:::info
+This was tested using [Airbyte Helm Chart Version 0.50.13](https://artifacthub.io/packages/helm/airbyte/airbyte/0.50.13) and S3 logs only.
+Previous or newer versions can change how to set up the external logs.
+:::

-Simply declare global value in `values.yaml` file and move everything related to logging and jobs under that section like in the example bellow:
+Create a file called `airbyte-logs-secrets.yaml` to store the AWS keys and other information:
+```yaml
+apiVersion: v1
+kind: Secret
+metadata:
+  name: airbyte-logs-secrets
+type: Opaque
+stringData:
+  AWS_KEY:
+  AWS_SECRET_KEY:
+  S3_LOG_BUCKET:
+  S3_LOG_BUCKET_REGION:
+```
+Run `kubectl apply -f airbyte-logs-secrets.yaml -n ` to create the secret in the namespace where you're running Airbyte.
+This file contains more than just the keys, but all of it is needed for now. Future updates will make the configuration easier.

-```text
+Change the global section to use `S3` for external logs.
+```yaml
 global:
-  logging:
-    %your_logging_options_here%
-  jobs:
-    %your_jobs_options_here%
+  # <...>
+  state:
+    # -- Determines which state storage will be utilized; "MINIO", "S3", or "GCS"
+    storage:
+      type: "S3"
+  # <...>
+  logs:
+    accessKey:
+      password: ""
+      existingSecret: "airbyte-logs-secrets"
+      existingSecretKey: "AWS_KEY"
+    secretKey:
+      password: ""
+      existingSecret: "airbyte-logs-secrets"
+      existingSecretKey: "AWS_SECRET_KEY"
+  # <...>
+  storage:
+    type: "S3"
+
+minio:
+  # Change from true to false
+  enabled: false
+  nodeSelector: {}
+  tolerations: []
+  affinity: {}
 ```
+You can try to use `GCS` or an external Minio instance, but neither has been tested yet. Feel free to run tests and update the documentation.
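Before moving on, it is worth a quick sanity check that the secret above actually exists in the namespace Airbyte runs in. A couple of `kubectl` commands for that (the namespace `airbyte` here is just an example value, not something defined by this guide):

```bash
# Confirm the secret was created and lists all four expected keys
kubectl -n airbyte get secret airbyte-logs-secrets -o jsonpath='{.data}'

# Spot-check one value; secret data is stored base64-encoded
kubectl -n airbyte get secret airbyte-logs-secrets -o jsonpath='{.data.S3_LOG_BUCKET}' | base64 -d
```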
-After updating `values.yaml` simply upgrade your chart by running command: +Add extra env variables to the following blocks: +```yaml +worker: + extraEnv: + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: AWS_KEY + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: AWS_SECRET_KEY + - name: STATE_STORAGE_S3_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: AWS_KEY + - name: STATE_STORAGE_S3_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: AWS_SECRET_KEY + - name: STATE_STORAGE_S3_BUCKET_NAME + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: S3_LOG_BUCKET + - name: STATE_STORAGE_S3_REGION + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: S3_LOG_BUCKET_REGION +``` -```shell -helm upgrade -f path/to/values.yaml %release_name% airbyte/airbyte +and also edit the server block: + +```yaml +server: + extraEnv: + - name: AWS_ACCESS_KEY_ID + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: AWS_KEY + - name: AWS_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: AWS_SECRET_KEY + - name: STATE_STORAGE_S3_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: AWS_KEY + - name: STATE_STORAGE_S3_SECRET_ACCESS_KEY + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: AWS_SECRET_KEY + - name: STATE_STORAGE_S3_BUCKET_NAME + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: S3_LOG_BUCKET + - name: STATE_STORAGE_S3_REGION + valueFrom: + secretKeyRef: + name: airbyte-logs-secrets + key: S3_LOG_BUCKET_REGION ``` +Than run: +`helm upgrade --install %RELEASE_NAME% airbyte/airbyte -n --values /path/to/values.yaml --version 0.50.13` + ### External Airbyte Database + ::info This was tested using [Airbyte Helm Chart Version 0.50.13](https://artifacthub.io/packages/helm/airbyte/airbyte/0.50.13). Previous or newer version can change how the external database can be configured. 
From c7c51ea4e1df26acc382083920446e5d486ef2a2 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni <113392464+akashkulk@users.noreply.github.com> Date: Mon, 5 Feb 2024 09:15:03 -0800 Subject: [PATCH 04/13] [source-mongodb-v2] : Fail sync if initial snapshot for any stream fails (#34759) --- .../source-mongodb-v2/metadata.yaml | 2 +- .../source/mongodb/MongoDbStateIterator.java | 29 ++++++++++++++----- .../mongodb/MongoDbStateIteratorTest.java | 2 +- docs/integrations/sources/mongodb-v2.md | 1 + 4 files changed, 25 insertions(+), 9 deletions(-) diff --git a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml index 6d4aa4f311c1..14ba912480e3 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml +++ b/airbyte-integrations/connectors/source-mongodb-v2/metadata.yaml @@ -5,7 +5,7 @@ data: connectorSubtype: database connectorType: source definitionId: b2e713cd-cc36-4c0a-b5bd-b47cb8a0561e - dockerImageTag: 1.2.6 + dockerImageTag: 1.2.7 dockerRepository: airbyte/source-mongodb-v2 documentationUrl: https://docs.airbyte.com/integrations/sources/mongodb-v2 githubIssueLabel: source-mongodb-v2 diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbStateIterator.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbStateIterator.java index c11a84806038..f659c2c06af6 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbStateIterator.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io/airbyte/integrations/source/mongodb/MongoDbStateIterator.java @@ -69,10 +69,15 @@ public class MongoDbStateIterator implements Iterator { private boolean finalStateNext = false; /** - * Tracks if the underlying iterator threw an exception. This helps to determine the final state - * status emitted from the final next call. + * Tracks if the underlying iterator threw an exception, indicating that the snapshot for this + * stream failed. This helps to determine the final state status emitted from the final next call. */ - private boolean iterThrewException = false; + private boolean initialSnapshotFailed = false; + + /** + * Tracks the exception thrown if there initial snapshot has failed. + */ + private Exception initialSnapshotException; /** * Constructor. @@ -111,14 +116,24 @@ public MongoDbStateIterator(final MongoCursor iter, @Override public boolean hasNext() { LOGGER.debug("Checking hasNext() for stream {}...", getStream()); + if (initialSnapshotFailed) { + // If the initial snapshot is incomplete for this stream, throw an exception failing the sync. This + // will ensure the platform retry logic + // kicks in and keeps retrying the sync until the initial snapshot is complete. + throw new RuntimeException(initialSnapshotException); + } try { if (iter.hasNext()) { return true; } } catch (final MongoException e) { - // If hasNext throws an exception, log it and then treat it as if hasNext returned false. - iterThrewException = true; + // If hasNext throws an exception, log it and set the flag to indicate that the initial snapshot + // failed. This indicates to the main iterator + // to emit state associated with what has been processed so far. 
+ initialSnapshotFailed = true; + initialSnapshotException = e; LOGGER.info("hasNext threw an exception for stream {}: {}", getStream(), e.getMessage(), e); + return true; } // no more records in cursor + no record messages have been emitted => collection is empty @@ -145,9 +160,9 @@ public AirbyteMessage next() { // Should a state message be emitted based on then last time a state message was emitted? final var emitStateDueToDuration = count > 0 && Duration.between(lastCheckpoint, Instant.now()).compareTo(checkpointDuration) > 0; - if (finalStateNext) { + if (finalStateNext || initialSnapshotFailed) { LOGGER.debug("Emitting final state status for stream {}:{}...", stream.getStream().getNamespace(), stream.getStream().getName()); - final var finalStateStatus = iterThrewException ? InitialSnapshotStatus.IN_PROGRESS : InitialSnapshotStatus.COMPLETE; + final var finalStateStatus = initialSnapshotFailed ? InitialSnapshotStatus.IN_PROGRESS : InitialSnapshotStatus.COMPLETE; final var idType = IdType.findByJavaType(lastId.getClass().getSimpleName()) .orElseThrow(() -> new ConfigErrorException("Unsupported _id type " + lastId.getClass().getSimpleName())); final var state = new MongoDbStreamState(lastId.toString(), finalStateStatus, idType); diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbStateIteratorTest.java b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbStateIteratorTest.java index 7b2a35fabdfa..66bf277dddbc 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbStateIteratorTest.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/test/java/io/airbyte/integrations/source/mongodb/MongoDbStateIteratorTest.java @@ -187,7 +187,7 @@ void treatHasNextExceptionAsFalse() { message.getState().getGlobal().getStreamStates().get(0).getStreamState().get("status").asText(), "state status should be in_progress"); - assertFalse(iter.hasNext(), "should have no more records"); + assertThrows(RuntimeException.class, iter::hasNext, "next iteration should throw exception to fail the sync"); } @Test diff --git a/docs/integrations/sources/mongodb-v2.md b/docs/integrations/sources/mongodb-v2.md index a406039a1e2c..0edc105adaea 100644 --- a/docs/integrations/sources/mongodb-v2.md +++ b/docs/integrations/sources/mongodb-v2.md @@ -214,6 +214,7 @@ For more information regarding configuration parameters, please see [MongoDb Doc | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------------------------------------------------------------| +| 1.2.7 | 2024-02-01 | [34759](https://github.com/airbytehq/airbyte/pull/34759) | Fail sync if initial snapshot for any stream fails. | | 1.2.6 | 2024-01-31 | [34594](https://github.com/airbytehq/airbyte/pull/34594) | Scope initial resume token to streams of interest. | | 1.2.5 | 2024-01-29 | [34641](https://github.com/airbytehq/airbyte/pull/34641) | Allow resuming an initial snapshot when Id type is not of default ObjectId . | | 1.2.4 | 2024-01-26 | [34573](https://github.com/airbytehq/airbyte/pull/34573) | Adopt CDK v0.16.0. 
| From ee39d407d73386812598172ab060f58e1c7cb27a Mon Sep 17 00:00:00 2001 From: Marcos Marx Date: Mon, 5 Feb 2024 14:56:38 -0300 Subject: [PATCH 05/13] Docs: update pg13 requirement for external db (#34858) --- docs/enterprise-setup/implementation-guide.md | 4 ++++ docs/operator-guides/configuring-airbyte-db.md | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/docs/enterprise-setup/implementation-guide.md b/docs/enterprise-setup/implementation-guide.md index d23bf527896a..09bcbe673eec 100644 --- a/docs/enterprise-setup/implementation-guide.md +++ b/docs/enterprise-setup/implementation-guide.md @@ -105,6 +105,10 @@ To configure basic auth (deploy without SSO), remove the entire `auth:` section For Self-Managed Enterprise deployments, we recommend using a dedicated database instance for better reliability, and backups (such as AWS RDS or GCP Cloud SQL) instead of the default internal Postgres database (`airbyte/db`) that Airbyte spins up within the Kubernetes cluster. +:::info +Currently, Airbyte requires connection to a Postgres 13 instance. +::: + We assume in the following that you've already configured a Postgres instance:
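Since the note above pins the requirement to Postgres 13, it can save a failed deploy to confirm the target instance's version before pointing Airbyte at it. A minimal check, with placeholder connection details (the host, user, and database names here are examples, not values from this guide):

```bash
# Prints the server version string, e.g. "PostgreSQL 13.x ..."
psql "postgresql://airbyte_user@db.example.com:5432/airbyte_db" -c "SELECT version();"
```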
diff --git a/docs/operator-guides/configuring-airbyte-db.md b/docs/operator-guides/configuring-airbyte-db.md index 899e86b01fed..9adc4c881b56 100644 --- a/docs/operator-guides/configuring-airbyte-db.md +++ b/docs/operator-guides/configuring-airbyte-db.md @@ -25,6 +25,10 @@ If you need to interact with it, for example, to make back-ups or perform some c ## Connecting to an External Postgres database +:::info +Currently, Airbyte requires connection to a Postgres 13 instance. +::: + Let's walk through what is required to use a Postgres instance that is not managed by Airbyte. First, for the sake of the tutorial, we will run a new instance of Postgres in its own docker container with the command below. If you already have Postgres running elsewhere, you can skip this step and use the credentials for that in future steps. ```bash From 06d44f813825bc05812780c409504447b2ca3c1d Mon Sep 17 00:00:00 2001 From: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com> Date: Mon, 5 Feb 2024 13:05:41 -0500 Subject: [PATCH 06/13] Improve error messages for concurrent CDK (#34754) --- .../sources/concurrent_source/thread_pool_manager.py | 6 +++++- .../airbyte_cdk/sources/streams/concurrent/cursor.py | 12 +++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py b/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py index 04508b3ff1f1..560989af0a6c 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/concurrent_source/thread_pool_manager.py @@ -64,7 +64,11 @@ def _prune_futures(self, futures: List[Future[Any]]) -> None: if optional_exception: # Exception handling should be done in the main thread. Hence, we only store the exception and expect the main # thread to call raise_if_exception - self._most_recently_seen_exception = RuntimeError(f"Failed reading with error: {optional_exception}") + # We do not expect this error to happen. The futures created during concurrent syncs should catch the exception and + # push it to the queue. If this exception occurs, please review the futures and how they handle exceptions. + self._most_recently_seen_exception = RuntimeError( + f"Failed processing a future: {optional_exception}. Please contact the Airbyte team." + ) futures.pop(index) def shutdown(self) -> None: diff --git a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py index e63358b715d5..82d11318f5ea 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/streams/concurrent/cursor.py @@ -153,9 +153,19 @@ def _add_slice_to_state(self, partition: Partition) -> None: ) elif self._most_recent_record: if self._has_closed_at_least_one_slice: + # If we track state value using records cursor field, we can only do that if there is one partition. This is because we save + # the state every time we close a partition. We assume that if there are multiple slices, they need to be providing + # boundaries. There are cases where partitions could not have boundaries: + # * The cursor should be per-partition + # * The stream state is actually the parent stream state + # There might be other cases not listed above. Those are not supported today hence the stream should not use this cursor for + # state management. 
For the specific user that was affected with this issue, we need to: + # * Fix state tracking (which is currently broken) + # * Make the new version available + # * (Probably) ask the user to reset the stream to avoid data loss raise ValueError( "Given that slice_boundary_fields is not defined and that per-partition state is not supported, only one slice is " - "expected." + "expected. Please contact the Airbyte team." ) self.state["slices"].append( From f6afd80f02838a84000d63034a8c879ce5472241 Mon Sep 17 00:00:00 2001 From: Ryan Waskewich <156025126+rwask@users.noreply.github.com> Date: Mon, 5 Feb 2024 13:09:49 -0500 Subject: [PATCH 07/13] Update http-streams.md typo (#34861) --- docs/connector-development/cdk-python/http-streams.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connector-development/cdk-python/http-streams.md b/docs/connector-development/cdk-python/http-streams.md index 010fdeddc418..ac4af4efe632 100644 --- a/docs/connector-development/cdk-python/http-streams.md +++ b/docs/connector-development/cdk-python/http-streams.md @@ -35,7 +35,7 @@ Using either authenticator is as simple as passing the created authenticator int ## Pagination -Most APIs, when facing a large call, tend to return the results in pages. The CDK accommodates paging via the `next_page_token` function. This function is meant to extract the next page "token" from the latest response. The contents of a "token" are completely up to the developer: it can be an ID, a page number, a partial URL etc.. The CDK will continue making requests as long as the `next_page_token` function. The CDK will continue making requests as long as the `next_page_token` continues returning non-`None` results. This can then be used in the `request_params` and other methods in `HttpStream` to page through API responses. Here is an [example](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L34) from the Stripe API. +Most APIs, when facing a large call, tend to return the results in pages. The CDK accommodates paging via the `next_page_token` function. This function is meant to extract the next page "token" from the latest response. The contents of a "token" are completely up to the developer: it can be an ID, a page number, a partial URL etc.. The CDK will continue making requests as long as the `next_page_token` continues returning non-`None` results. This can then be used in the `request_params` and other methods in `HttpStream` to page through API responses. Here is an [example](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connectors/source-stripe/source_stripe/streams.py#L34) from the Stripe API. 
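To make the paging mechanics above concrete, here is a minimal sketch of a cursor-paged stream. The endpoint, the `next_page` field, and the `records` key are invented for illustration; the hooks (`next_page_token`, `request_params`, `parse_response`) are the `HttpStream` methods described in this section:

```python
from typing import Any, Iterable, Mapping, MutableMapping, Optional

import requests
from airbyte_cdk.sources.streams.http import HttpStream


class Customers(HttpStream):
    url_base = "https://api.example.com/v1/"  # hypothetical API
    primary_key = "id"

    def path(self, **kwargs) -> str:
        return "customers"

    def next_page_token(self, response: requests.Response) -> Optional[Mapping[str, Any]]:
        # Return None when the response carries no cursor; the CDK then stops paging.
        cursor = response.json().get("next_page")
        return {"page": cursor} if cursor else None

    def request_params(
        self,
        stream_state: Mapping[str, Any],
        stream_slice: Optional[Mapping[str, Any]] = None,
        next_page_token: Optional[Mapping[str, Any]] = None,
    ) -> MutableMapping[str, Any]:
        # Feed the token produced above back into the next request as a query param.
        return dict(next_page_token or {})

    def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapping]:
        yield from response.json().get("records", [])
```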
## Rate Limiting From c7a7b93dfa0a0660ac128d6675c758e82667d772 Mon Sep 17 00:00:00 2001 From: maxi297 Date: Mon, 5 Feb 2024 18:12:39 +0000 Subject: [PATCH 08/13] =?UTF-8?q?=F0=9F=A4=96=20Bump=20patch=20version=20o?= =?UTF-8?q?f=20Python=20CDK?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- airbyte-cdk/python/.bumpversion.cfg | 2 +- airbyte-cdk/python/CHANGELOG.md | 3 +++ airbyte-cdk/python/Dockerfile | 2 +- airbyte-cdk/python/setup.py | 2 +- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/airbyte-cdk/python/.bumpversion.cfg b/airbyte-cdk/python/.bumpversion.cfg index 984be1fe90a0..104ad2b37183 100644 --- a/airbyte-cdk/python/.bumpversion.cfg +++ b/airbyte-cdk/python/.bumpversion.cfg @@ -1,5 +1,5 @@ [bumpversion] -current_version = 0.60.1 +current_version = 0.60.2 commit = False [bumpversion:file:setup.py] diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index 822a7c75e2ea..60d04dda14dc 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.60.2 +Improve error messages for concurrent CDK + ## 0.60.1 Emit state when no partitions are generated for ccdk and update StateBuilder diff --git a/airbyte-cdk/python/Dockerfile b/airbyte-cdk/python/Dockerfile index 759d20f4c61a..16b8eb589d6d 100644 --- a/airbyte-cdk/python/Dockerfile +++ b/airbyte-cdk/python/Dockerfile @@ -32,5 +32,5 @@ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] # needs to be the same as CDK -LABEL io.airbyte.version=0.60.1 +LABEL io.airbyte.version=0.60.2 LABEL io.airbyte.name=airbyte/source-declarative-manifest diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index 9082b99b5a9b..9e6e124276d2 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -36,7 +36,7 @@ name="airbyte-cdk", # The version of the airbyte-cdk package is used at runtime to validate manifests. That validation must be # updated if our semver format changes such as using release candidate versions. - version="0.60.1", + version="0.60.2", description="A framework for writing Airbyte Connectors.", long_description=README, long_description_content_type="text/markdown", From 5727127c7215e3da3a89863dcdcaf778787c7994 Mon Sep 17 00:00:00 2001 From: "Pedro S. 
Lopez" Date: Mon, 5 Feb 2024 17:05:26 -0400 Subject: [PATCH 09/13] rollback source-github to 1.5.7 (#34870) --- airbyte-integrations/connectors/source-github/metadata.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/airbyte-integrations/connectors/source-github/metadata.yaml b/airbyte-integrations/connectors/source-github/metadata.yaml index 0976ec774043..a6ef24d551f4 100644 --- a/airbyte-integrations/connectors/source-github/metadata.yaml +++ b/airbyte-integrations/connectors/source-github/metadata.yaml @@ -24,6 +24,7 @@ data: packageName: airbyte-source-github registries: cloud: + dockerImageTag: 1.5.7 enabled: true oss: enabled: true From 22b63c7d70a64b296adff9993c36359fea682959 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Tue, 6 Feb 2024 00:25:10 -0800 Subject: [PATCH 10/13] AirbyteLib: Support write strategies: 'merge' and 'auto' (#34592) Co-authored-by: Joe Reuter --- airbyte-lib/airbyte_lib/_executor.py | 1 + airbyte-lib/airbyte_lib/_processors.py | 35 ++- airbyte-lib/airbyte_lib/caches/base.py | 200 +++++++++++++--- airbyte-lib/airbyte_lib/caches/duckdb.py | 100 ++++---- airbyte-lib/airbyte_lib/caches/postgres.py | 2 +- airbyte-lib/airbyte_lib/caches/snowflake.py | 1 + airbyte-lib/airbyte_lib/source.py | 90 +++++++- airbyte-lib/airbyte_lib/strategies.py | 35 +++ airbyte-lib/docs/generated/airbyte_lib.html | 19 +- .../docs/generated/airbyte_lib/caches.html | 10 +- airbyte-lib/poetry.lock | 105 ++++++--- airbyte-lib/pyproject.toml | 1 + airbyte-lib/tests/conftest.py | 63 +++++ .../test_source_faker_integration.py | 216 ++++++++++++++++++ ...gration.py => test_source_test_fixture.py} | 53 +++-- .../integration_tests/test_validation.py | 10 + 16 files changed, 766 insertions(+), 175 deletions(-) create mode 100644 airbyte-lib/airbyte_lib/strategies.py create mode 100644 airbyte-lib/tests/integration_tests/test_source_faker_integration.py rename airbyte-lib/tests/integration_tests/{test_integration.py => test_source_test_fixture.py} (96%) diff --git a/airbyte-lib/airbyte_lib/_executor.py b/airbyte-lib/airbyte_lib/_executor.py index d0a2a9d45cf6..86b361ae4c3c 100644 --- a/airbyte-lib/airbyte_lib/_executor.py +++ b/airbyte-lib/airbyte_lib/_executor.py @@ -278,6 +278,7 @@ def _get_installed_version( f"from importlib.metadata import version; print(version('{package_name}'))", ], universal_newlines=True, + stderr=subprocess.PIPE, # Don't print to stderr ).strip() except Exception: if raise_on_error: diff --git a/airbyte-lib/airbyte_lib/_processors.py b/airbyte-lib/airbyte_lib/_processors.py index a483b1345949..fb31a18f4abc 100644 --- a/airbyte-lib/airbyte_lib/_processors.py +++ b/airbyte-lib/airbyte_lib/_processors.py @@ -32,8 +32,9 @@ ) from airbyte_lib import exceptions as exc -from airbyte_lib._util import protocol_util # Internal utility functions +from airbyte_lib._util import protocol_util from airbyte_lib.progress import progress +from airbyte_lib.strategies import WriteStrategy if TYPE_CHECKING: @@ -114,6 +115,8 @@ def _streams_with_data(self) -> set[str]: @final def process_stdin( self, + write_strategy: WriteStrategy = WriteStrategy.AUTO, + *, max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: """Process the input stream from stdin. @@ -121,7 +124,9 @@ def process_stdin( Return a list of summaries for testing. 
""" input_stream = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8") - self.process_input_stream(input_stream, max_batch_size) + self.process_input_stream( + input_stream, write_strategy=write_strategy, max_batch_size=max_batch_size + ) @final def _airbyte_messages_from_buffer( @@ -135,6 +140,8 @@ def _airbyte_messages_from_buffer( def process_input_stream( self, input_stream: io.TextIOBase, + write_strategy: WriteStrategy = WriteStrategy.AUTO, + *, max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: """Parse the input stream and process data in batches. @@ -142,14 +149,27 @@ def process_input_stream( Return a list of summaries for testing. """ messages = self._airbyte_messages_from_buffer(input_stream) - self.process_airbyte_messages(messages, max_batch_size) + self.process_airbyte_messages( + messages, + write_strategy=write_strategy, + max_batch_size=max_batch_size, + ) @final def process_airbyte_messages( self, messages: Iterable[AirbyteMessage], + write_strategy: WriteStrategy, + *, max_batch_size: int = DEFAULT_BATCH_SIZE, ) -> None: + """Process a stream of Airbyte messages.""" + if not isinstance(write_strategy, WriteStrategy): + raise exc.AirbyteInternalError( + message="Invalid `write_strategy` argument. Expected instance of WriteStrategy.", + context={"write_strategy": write_strategy}, + ) + stream_batches: dict[str, list[dict]] = defaultdict(list, {}) # Process messages, writing to batches as we go @@ -189,7 +209,7 @@ def process_airbyte_messages( # Finalize any pending batches for stream_name in list(self._pending_batches.keys()): - self._finalize_batches(stream_name) + self._finalize_batches(stream_name, write_strategy=write_strategy) progress.log_stream_finalized(stream_name) @final @@ -260,13 +280,18 @@ def _get_batch_handle( batch_id = batch_id or self._new_batch_id() return f"{stream_name}_{batch_id}" - def _finalize_batches(self, stream_name: str) -> dict[str, BatchHandle]: + def _finalize_batches( + self, + stream_name: str, + write_strategy: WriteStrategy, + ) -> dict[str, BatchHandle]: """Finalize all uncommitted batches. Returns a mapping of batch IDs to batch handles, for processed batches. This is a generic implementation, which can be overridden. 
""" + _ = write_strategy # Unused with self._finalizing_batches(stream_name) as batches_to_finalize: if batches_to_finalize and not self.skip_finalize_step: raise NotImplementedError( diff --git a/airbyte-lib/airbyte_lib/caches/base.py b/airbyte-lib/airbyte_lib/caches/base.py index d58af67e55ab..d5850df904c6 100644 --- a/airbyte-lib/airbyte_lib/caches/base.py +++ b/airbyte-lib/airbyte_lib/caches/base.py @@ -14,7 +14,15 @@ import sqlalchemy import ulid from overrides import overrides -from sqlalchemy import create_engine, text +from sqlalchemy import ( + and_, + create_engine, + insert, + null, + select, + text, + update, +) from sqlalchemy.pool import StaticPool from sqlalchemy.sql.elements import TextClause @@ -24,6 +32,7 @@ from airbyte_lib.caches._catalog_manager import CatalogManager from airbyte_lib.config import CacheConfigBase from airbyte_lib.datasets._sql import CachedDataset +from airbyte_lib.strategies import WriteStrategy from airbyte_lib.types import SQLTypeConverter @@ -59,7 +68,6 @@ class SQLRuntimeError(Exception): class SQLCacheConfigBase(CacheConfigBase): """Same as a regular config except it exposes the 'get_sql_alchemy_url()' method.""" - dedupe_mode = RecordDedupeMode.REPLACE schema_name: str = "airbyte_raw" table_prefix: str | None = None @@ -126,6 +134,7 @@ def __init__( config, catalog_manager=self._catalog_manager ) self.type_converter = self.type_converter_class() + self._cached_table_definitions: dict[str, sqlalchemy.Table] = {} def __getitem__(self, stream: str) -> DatasetBase: return self.streams[stream] @@ -221,13 +230,28 @@ def get_sql_table( self, stream_name: str, ) -> sqlalchemy.Table: - """Return a temporary table name.""" - table_name = self.get_sql_table_name(stream_name) - return sqlalchemy.Table( - table_name, - sqlalchemy.MetaData(schema=self.config.schema_name), - autoload_with=self.get_sql_engine(), - ) + """Return the main table object for the stream.""" + return self._get_table_by_name(self.get_sql_table_name(stream_name)) + + def _get_table_by_name( + self, + table_name: str, + *, + force_refresh: bool = False, + ) -> sqlalchemy.Table: + """Return a table object from a table name. + + To prevent unnecessary round-trips to the database, the table is cached after the first + query. To ignore the cache and force a refresh, set 'force_refresh' to True. + """ + if force_refresh or table_name not in self._cached_table_definitions: + self._cached_table_definitions[table_name] = sqlalchemy.Table( + table_name, + sqlalchemy.MetaData(schema=self.config.schema_name), + autoload_with=self.get_sql_engine(), + ) + + return self._cached_table_definitions[table_name] @final @property @@ -400,10 +424,15 @@ def _create_table( self, table_name: str, column_definition_str: str, + primary_keys: list[str] | None = None, ) -> None: if DEBUG_MODE: assert table_name not in self._get_tables_list(), f"Table {table_name} already exists." + if primary_keys: + pk_str = ", ".join(primary_keys) + column_definition_str += f",\n PRIMARY KEY ({pk_str})" + cmd = f""" CREATE TABLE {self._fully_qualified(table_name)} ( {column_definition_str} @@ -476,7 +505,11 @@ def _cleanup_batch( @final @overrides - def _finalize_batches(self, stream_name: str) -> dict[str, BatchHandle]: + def _finalize_batches( + self, + stream_name: str, + write_strategy: WriteStrategy, + ) -> dict[str, BatchHandle]: """Finalize all uncommitted batches. This is a generic 'final' implementation, which should not be overridden. 
@@ -511,15 +544,16 @@ def _finalize_batches(self, stream_name: str) -> dict[str, BatchHandle]: ) temp_table_name = self._write_files_to_new_table( - files, - stream_name, - max_batch_id, + files=files, + stream_name=stream_name, + batch_id=max_batch_id, ) try: self._write_temp_table_to_final_table( - stream_name, - temp_table_name, - final_table_name, + stream_name=stream_name, + temp_table_name=temp_table_name, + final_table_name=final_table_name, + write_strategy=write_strategy, ) finally: self._drop_temp_table(temp_table_name, if_exists=True) @@ -600,33 +634,66 @@ def _write_temp_table_to_final_table( stream_name: str, temp_table_name: str, final_table_name: str, + write_strategy: WriteStrategy, ) -> None: - """Merge the temp table into the final table.""" - if self.config.dedupe_mode == RecordDedupeMode.REPLACE: - if not self.supports_merge_insert: - raise NotImplementedError( - "Deduping was requested but merge-insert is not yet supported.", - ) + """Write the temp table into the final table using the provided write strategy.""" + has_pks: bool = bool(self._get_primary_keys(stream_name)) + has_incremental_key: bool = bool(self._get_incremental_key(stream_name)) + if write_strategy == WriteStrategy.MERGE and not has_pks: + raise exc.AirbyteLibInputError( + message="Cannot use merge strategy on a stream with no primary keys.", + context={ + "stream_name": stream_name, + }, + ) - if not self._get_primary_keys(stream_name): - self._swap_temp_table_with_final_table( - stream_name, - temp_table_name, - final_table_name, - ) + if write_strategy == WriteStrategy.AUTO: + if has_pks: + write_strategy = WriteStrategy.MERGE + elif has_incremental_key: + write_strategy = WriteStrategy.APPEND else: - self._merge_temp_table_to_final_table( - stream_name, - temp_table_name, - final_table_name, - ) + write_strategy = WriteStrategy.REPLACE - else: + if write_strategy == WriteStrategy.REPLACE: + self._swap_temp_table_with_final_table( + stream_name=stream_name, + temp_table_name=temp_table_name, + final_table_name=final_table_name, + ) + return + + if write_strategy == WriteStrategy.APPEND: self._append_temp_table_to_final_table( stream_name=stream_name, temp_table_name=temp_table_name, final_table_name=final_table_name, ) + return + + if write_strategy == WriteStrategy.MERGE: + if not self.supports_merge_insert: + # Fallback to emulated merge if the database does not support merge natively. + self._emulated_merge_temp_table_to_final_table( + stream_name=stream_name, + temp_table_name=temp_table_name, + final_table_name=final_table_name, + ) + return + + self._merge_temp_table_to_final_table( + stream_name=stream_name, + temp_table_name=temp_table_name, + final_table_name=final_table_name, + ) + return + + raise exc.AirbyteLibInternalError( + message="Write strategy is not supported.", + context={ + "write_strategy": write_strategy, + }, + ) def _append_temp_table_to_final_table( self, @@ -663,6 +730,12 @@ def _get_primary_keys( return joined_pks + def _get_incremental_key( + self, + stream_name: str, + ) -> str | None: + return self._get_stream_config(stream_name).cursor_field + def _swap_temp_table_with_final_table( self, stream_name: str, @@ -727,6 +800,65 @@ def _merge_temp_table_to_final_table( """, ) + def _emulated_merge_temp_table_to_final_table( + self, + stream_name: str, + temp_table_name: str, + final_table_name: str, + ) -> None: + """Emulate the merge operation using a series of SQL commands. + + This is a fallback implementation for databases that do not support MERGE. 
+ """ + final_table = self._get_table_by_name(final_table_name) + temp_table = self._get_table_by_name(temp_table_name) + pk_columns = self._get_primary_keys(stream_name) + + columns_to_update: set[str] = self._get_sql_column_definitions( + stream_name=stream_name + ).keys() - set(pk_columns) + + # Create a dictionary mapping columns in users_final to users_stage for updating + update_values = { + getattr(final_table.c, column): getattr(temp_table.c, column) + for column in columns_to_update + } + + # Craft the WHERE clause for composite primary keys + join_conditions = [ + getattr(final_table.c, pk_column) == getattr(temp_table.c, pk_column) + for pk_column in pk_columns + ] + join_clause = and_(*join_conditions) + + # Craft the UPDATE statement + update_stmt = update(final_table).values(update_values).where(join_clause) + + # Define a join between temp_table and final_table + joined_table = temp_table.outerjoin(final_table, join_clause) + + # Define a condition that checks for records in temp_table that do not have a corresponding + # record in final_table + where_not_exists_clause = final_table.c.id == null() + + # Select records from temp_table that are not in final_table + select_new_records_stmt = ( + select([temp_table]).select_from(joined_table).where(where_not_exists_clause) + ) + + # Craft the INSERT statement using the select statement + insert_new_records_stmt = insert(final_table).from_select( + names=[column.name for column in temp_table.columns], select=select_new_records_stmt + ) + + if DEBUG_MODE: + print(str(update_stmt)) + print(str(insert_new_records_stmt)) + + with self.get_sql_connection() as conn: + conn.execute(update_stmt) + conn.execute(insert_new_records_stmt) + @final def _table_exists( self, diff --git a/airbyte-lib/airbyte_lib/caches/duckdb.py b/airbyte-lib/airbyte_lib/caches/duckdb.py index 76f79968a503..8a58e1d232a8 100644 --- a/airbyte-lib/airbyte_lib/caches/duckdb.py +++ b/airbyte-lib/airbyte_lib/caches/duckdb.py @@ -10,7 +10,6 @@ from overrides import overrides -from airbyte_lib import exceptions as exc from airbyte_lib._file_writers import ParquetWriter, ParquetWriterConfig from airbyte_lib.caches.base import SQLCacheBase, SQLCacheConfigBase from airbyte_lib.telemetry import CacheTelemetryInfo @@ -55,7 +54,7 @@ class DuckDBCacheBase(SQLCacheBase): """ config_class = DuckDBCacheConfig - supports_merge_insert = True + supports_merge_insert = False @overrides def get_telemetry_info(self) -> CacheTelemetryInfo: @@ -82,37 +81,38 @@ class DuckDBCache(DuckDBCacheBase): file_writer_class = ParquetWriter - @overrides - def _merge_temp_table_to_final_table( - self, - stream_name: str, - temp_table_name: str, - final_table_name: str, - ) -> None: - """Merge the temp table into the main one. - - This implementation requires MERGE support in the SQL DB. - Databases that do not support this syntax can override this method. - """ - if not self._get_primary_keys(stream_name): - raise exc.AirbyteLibInternalError( - message="Primary keys not found. Cannot run merge updates without primary keys.", - context={ - "stream_name": stream_name, - }, - ) - - _ = stream_name - final_table = self._fully_qualified(final_table_name) - staging_table = self._fully_qualified(temp_table_name) - self._execute_sql( - # https://duckdb.org/docs/sql/statements/insert.html - # NOTE: This depends on primary keys being set properly in the final table. 
- f""" - INSERT OR REPLACE INTO {final_table} BY NAME - (SELECT * FROM {staging_table}) - """ - ) + # TODO: Delete or rewrite this method after DuckDB adds support for primary key inspection. + # @overrides + # def _merge_temp_table_to_final_table( + # self, + # stream_name: str, + # temp_table_name: str, + # final_table_name: str, + # ) -> None: + # """Merge the temp table into the main one. + + # This implementation requires MERGE support in the SQL DB. + # Databases that do not support this syntax can override this method. + # """ + # if not self._get_primary_keys(stream_name): + # raise exc.AirbyteLibInternalError( + # message="Primary keys not found. Cannot run merge updates without primary keys.", + # context={ + # "stream_name": stream_name, + # }, + # ) + + # _ = stream_name + # final_table = self._fully_qualified(final_table_name) + # staging_table = self._fully_qualified(temp_table_name) + # self._execute_sql( + # # https://duckdb.org/docs/sql/statements/insert.html + # # NOTE: This depends on primary keys being set properly in the final table. + # f""" + # INSERT OR REPLACE INTO {final_table} BY NAME + # (SELECT * FROM {staging_table}) + # """ + # ) @overrides def _ensure_compatible_table_schema( @@ -132,22 +132,24 @@ def _ensure_compatible_table_schema( ): return False - pk_cols = self._get_primary_keys(stream_name) - table = self.get_sql_table(stream_name) - table_name = self.get_sql_table_name(stream_name) - table_pk_cols = table.primary_key.columns.keys() - if set(pk_cols) != set(table_pk_cols): - if raise_on_error: - raise exc.AirbyteLibCacheTableValidationError( - violation="Primary keys do not match.", - context={ - "stream_name": stream_name, - "table_name": table_name, - "expected": pk_cols, - "found": table_pk_cols, - }, - ) - return False + # TODO: Add validation for primary keys after DuckDB adds support for primary key + # inspection: https://github.com/Mause/duckdb_engine/issues/594 + # This is a problem because DuckDB implicitly joins on primary keys during MERGE. 
+ # pk_cols = self._get_primary_keys(stream_name) + # table = self.get_sql_table(table_name) + # table_pk_cols = table.primary_key.columns.keys() + # if set(pk_cols) != set(table_pk_cols): + # if raise_on_error: + # raise exc.AirbyteLibCacheTableValidationError( + # violation="Primary keys do not match.", + # context={ + # "stream_name": stream_name, + # "table_name": table_name, + # "expected": pk_cols, + # "found": table_pk_cols, + # }, + # ) + # return False return True diff --git a/airbyte-lib/airbyte_lib/caches/postgres.py b/airbyte-lib/airbyte_lib/caches/postgres.py index 5c7df3a898ad..324d29c2d58e 100644 --- a/airbyte-lib/airbyte_lib/caches/postgres.py +++ b/airbyte-lib/airbyte_lib/caches/postgres.py @@ -48,7 +48,7 @@ class PostgresCache(SQLCacheBase): config_class = PostgresCacheConfig file_writer_class = ParquetWriter - supports_merge_insert = True + supports_merge_insert = False # TODO: Add native implementation for merge insert @overrides def get_telemetry_info(self) -> CacheTelemetryInfo: diff --git a/airbyte-lib/airbyte_lib/caches/snowflake.py b/airbyte-lib/airbyte_lib/caches/snowflake.py index 86c30c995cf6..713285c72a9a 100644 --- a/airbyte-lib/airbyte_lib/caches/snowflake.py +++ b/airbyte-lib/airbyte_lib/caches/snowflake.py @@ -49,6 +49,7 @@ def get_sql_alchemy_url(self) -> str: password=self.password, database=self.database, warehouse=self.warehouse, + schema=self.schema_name, role=self.role, ) ) diff --git a/airbyte-lib/airbyte_lib/source.py b/airbyte-lib/airbyte_lib/source.py index 2fc3258ee179..ab76922c13eb 100644 --- a/airbyte-lib/airbyte_lib/source.py +++ b/airbyte-lib/airbyte_lib/source.py @@ -30,6 +30,7 @@ from airbyte_lib.datasets._lazy import LazyDataset from airbyte_lib.progress import progress from airbyte_lib.results import ReadResult +from airbyte_lib.strategies import WriteStrategy from airbyte_lib.telemetry import ( CacheTelemetryInfo, SyncState, @@ -46,10 +47,11 @@ @contextmanager -def as_temp_files(files: list[Any]) -> Generator[list[Any], Any, None]: +def as_temp_files(files_contents: list[dict]) -> Generator[list[str], Any, None]: + """Write the given contents to temporary files and yield the file paths as strings.""" temp_files: list[Any] = [] try: - for content in files: + for content in files_contents: temp_file = tempfile.NamedTemporaryFile(mode="w+t", delete=True) temp_file.write( json.dumps(content) if isinstance(content, dict) else content, @@ -250,13 +252,13 @@ def configured_catalog(self) -> ConfiguredAirbyteCatalog: streams=[ # TODO: Set sync modes and primary key to a sensible adaptive default ConfiguredAirbyteStream( - stream=s, + stream=stream, sync_mode=SyncMode.full_refresh, destination_sync_mode=DestinationSyncMode.overwrite, - primary_key=None, + primary_key=stream.source_defined_primary_key, ) - for s in self.discovered_catalog.streams - if streams_filter is None or s.name in streams_filter + for stream in self.discovered_catalog.streams + if streams_filter is None or stream.name in streams_filter ], ) @@ -305,7 +307,11 @@ def _with_missing_columns(records: Iterable[dict[str, Any]]) -> Iterator[dict[st iterator: Iterator[dict[str, Any]] = _with_missing_columns( protocol_util.airbyte_messages_to_record_dicts( - self._read_with_catalog(streaming_cache_info, configured_catalog), + self._read_with_catalog( + streaming_cache_info, + configured_catalog, + force_full_refresh=True, # Always full refresh when skipping the cache + ), ) ) return LazyDataset(iterator) @@ -352,7 +358,12 @@ def uninstall(self) -> None: """ self.executor.uninstall() - 
def _read(self, cache_info: CacheTelemetryInfo) -> Iterable[AirbyteRecordMessage]: + def _read( + self, + cache_info: CacheTelemetryInfo, + *, + force_full_refresh: bool, + ) -> Iterable[AirbyteRecordMessage]: """ Call read on the connector. @@ -366,12 +377,18 @@ def _read(self, cache_info: CacheTelemetryInfo) -> Iterable[AirbyteRecordMessage # Ensure discovered and configured catalog properties are cached before we start reading _ = self.discovered_catalog _ = self.configured_catalog - yield from self._read_with_catalog(cache_info, catalog=self.configured_catalog) + yield from self._read_with_catalog( + cache_info, + catalog=self.configured_catalog, + force_full_refresh=force_full_refresh, + ) def _read_with_catalog( self, cache_info: CacheTelemetryInfo, catalog: ConfiguredAirbyteCatalog, + *, + force_full_refresh: bool, ) -> Iterator[AirbyteMessage]: """Call read on the connector. @@ -381,7 +398,11 @@ def _read_with_catalog( * Listen to the messages and return the AirbyteRecordMessages that come along. * Send out telemetry on the performed sync (with information about which source was used and the type of the cache) + + TODO: When we add support for incremental syncs, we should only send `--state ` + if force_full_refresh is False. """ + _ = force_full_refresh # TODO: Use this decide whether to send `--state ` source_tracking_information = self.executor.get_telemetry_info() send_telemetry(source_tracking_information, cache_info, SyncState.STARTED) try: @@ -390,6 +411,7 @@ def _read_with_catalog( catalog_file, ]: yield from self._execute( + # TODO: Add support for incremental syncs by sending `--state ` ["read", "--config", config_file, "--catalog", catalog_file], ) except Exception: @@ -451,16 +473,62 @@ def _tally_records( yield message progress.log_records_read(self._processed_records) - def read(self, cache: SQLCacheBase | None = None) -> ReadResult: + def read( + self, + cache: SQLCacheBase | None = None, + *, + write_strategy: str | WriteStrategy = WriteStrategy.AUTO, + force_full_refresh: bool = False, + ) -> ReadResult: + """Read from the connector and write to the cache. + + Args: + cache: The cache to write to. If None, a default cache will be used. + write_strategy: The strategy to use when writing to the cache. If a string, it must be + one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one + of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or + WriteStrategy.AUTO. + force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, + streams will be read in incremental mode if supported by the connector. This option + must be True when using the "replace" strategy. 
+ """ + if write_strategy == WriteStrategy.REPLACE and not force_full_refresh: + raise exc.AirbyteLibInputError( + message="The replace strategy requires full refresh mode.", + context={ + "write_strategy": write_strategy, + "force_full_refresh": force_full_refresh, + }, + ) if cache is None: cache = get_default_cache() + if isinstance(write_strategy, str): + try: + write_strategy = WriteStrategy(write_strategy) + except ValueError: + raise exc.AirbyteLibInputError( + message="Invalid strategy", + context={ + "write_strategy": write_strategy, + "available_strategies": [s.value for s in WriteStrategy], + }, + ) from None + cache.register_source( source_name=self.name, incoming_source_catalog=self.configured_catalog, stream_names=set(self.get_selected_streams()), ) - cache.process_airbyte_messages(self._tally_records(self._read(cache.get_telemetry_info()))) + cache.process_airbyte_messages( + self._tally_records( + self._read( + cache.get_telemetry_info(), + force_full_refresh=force_full_refresh, + ), + ), + write_strategy=write_strategy, + ) return ReadResult( processed_records=self._processed_records, diff --git a/airbyte-lib/airbyte_lib/strategies.py b/airbyte-lib/airbyte_lib/strategies.py new file mode 100644 index 000000000000..4d0b75a06590 --- /dev/null +++ b/airbyte-lib/airbyte_lib/strategies.py @@ -0,0 +1,35 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +"""Read and write strategies for AirbyteLib.""" +from __future__ import annotations + +from enum import Enum + + +class WriteStrategy(str, Enum): + """Read strategies for AirbyteLib.""" + + MERGE = "merge" + """Merge new records with existing records. + + This requires a primary key to be set on the stream. + If no primary key is set, this will raise an exception. + + To apply this strategy in cases where some destination streams don't have a primary key, + please use the `auto` strategy instead. + """ + + APPEND = "append" + """Append new records to existing records.""" + + REPLACE = "replace" + """Replace existing records with new records.""" + + AUTO = "auto" + """Automatically determine the best strategy to use. + + This will use the following logic: + - If there's a primary key, use merge. + - Else, if there's an incremental key, use append. + - Else, use full replace (table swap). + """ diff --git a/airbyte-lib/docs/generated/airbyte_lib.html b/airbyte-lib/docs/generated/airbyte_lib.html index 609075fa1d62..73e8779d3fd7 100644 --- a/airbyte-lib/docs/generated/airbyte_lib.html +++ b/airbyte-lib/docs/generated/airbyte_lib.html @@ -236,8 +236,7 @@
Inherited Members
airbyte_lib.caches.base.SQLCacheConfigBase
-
dedupe_mode
-
table_prefix
+
table_prefix
table_suffix
@@ -687,13 +686,25 @@
Inherited Members
def - read( self, cache: airbyte_lib.caches.base.SQLCacheBase | None = None) -> ReadResult: + read( self, cache: airbyte_lib.caches.base.SQLCacheBase | None = None, *, write_strategy: str | airbyte_lib.strategies.WriteStrategy = <WriteStrategy.AUTO: 'auto'>, force_full_refresh: bool = False) -> ReadResult:
- +

Read from the connector and write to the cache.

+ +

Args: + cache: The cache to write to. If None, a default cache will be used. + write_strategy: The strategy to use when writing to the cache. If a string, it must be + one of "append", "upsert", "replace", or "auto". If a WriteStrategy, it must be one + of WriteStrategy.APPEND, WriteStrategy.UPSERT, WriteStrategy.REPLACE, or + WriteStrategy.AUTO. + force_full_refresh: If True, the source will operate in full refresh mode. Otherwise, + streams will be read in incremental mode if supported by the connector. This option + must be True when using the "replace" strategy.
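To make the options above concrete, a minimal usage sketch; `source` and `cache` are assumed to be an already-configured `Source` and `SQLCacheBase` instance:

```python
# "replace" rebuilds the cache tables and therefore requires a full refresh.
result = source.read(cache, write_strategy="replace", force_full_refresh=True)

# "merge" dedupes on primary keys; the default "auto" also picks merge when primary keys exist.
result = source.read(cache, write_strategy="merge")
```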

+
+ diff --git a/airbyte-lib/docs/generated/airbyte_lib/caches.html b/airbyte-lib/docs/generated/airbyte_lib/caches.html index 45ec953c3571..0e02c058ad5b 100644 --- a/airbyte-lib/docs/generated/airbyte_lib/caches.html +++ b/airbyte-lib/docs/generated/airbyte_lib/caches.html @@ -165,8 +165,7 @@
Inherited Members
airbyte_lib.caches.base.SQLCacheConfigBase
-
dedupe_mode
-
table_prefix
+
table_prefix
table_suffix
@@ -226,7 +225,7 @@
Inherited Members
supports_merge_insert = -True +False
@@ -404,8 +403,7 @@
Inherited Members
airbyte_lib.caches.base.SQLCacheConfigBase
-
dedupe_mode
-
schema_name
+
schema_name
table_prefix
table_suffix
@@ -613,7 +611,7 @@
Inherited Members
-

Return a temporary table name.

+

Return the main table object for the stream.

diff --git a/airbyte-lib/poetry.lock b/airbyte-lib/poetry.lock index 5ac62115d29d..41892f7f8d15 100644 --- a/airbyte-lib/poetry.lock +++ b/airbyte-lib/poetry.lock @@ -51,6 +51,24 @@ files = [ [package.dependencies] pydantic = ">=1.9.2,<2.0.0" +[[package]] +name = "airbyte-source-faker" +version = "6.0.0" +description = "Source implementation for fake but realistic looking data." +optional = false +python-versions = "*" +files = [ + {file = "airbyte-source-faker-6.0.0.tar.gz", hash = "sha256:60c68980527b66f1ffa31e4e13c406233b6d9cfcb2a6acc23ebd827cd2532181"}, + {file = "airbyte_source_faker-6.0.0-py3-none-any.whl", hash = "sha256:d92265c5be31a8d2ac66e7ed40cbe134d62acbb63adbc05789697cf64784e5b7"}, +] + +[package.dependencies] +airbyte-cdk = ">=0.2,<1.0" +mimesis = "6.1.1" + +[package.extras] +tests = ["pytest (>=6.2,<7.0)", "pytest-mock (>=3.6.1,<3.7.0)", "requests-mock (>=1.9.3,<1.10.0)"] + [[package]] name = "asn1crypto" version = "1.5.1" @@ -551,13 +569,13 @@ files = [ [[package]] name = "google-api-core" -version = "2.15.0" +version = "2.16.0" description = "Google API client core library" optional = false python-versions = ">=3.7" files = [ - {file = "google-api-core-2.15.0.tar.gz", hash = "sha256:abc978a72658f14a2df1e5e12532effe40f94f868f6e23d95133bd6abcca35ca"}, - {file = "google_api_core-2.15.0-py3-none-any.whl", hash = "sha256:2aa56d2be495551e66bbff7f729b790546f87d5c90e74781aa77233bcb395a8a"}, + {file = "google-api-core-2.16.0.tar.gz", hash = "sha256:d1fc42e52aa4042ad812827b7aad858394e2bf73da8329af95ad8efa30bc886b"}, + {file = "google_api_core-2.16.0-py3-none-any.whl", hash = "sha256:c424f9f271c7f55366254708e0d0383963a72376286018af0a04f322be843400"}, ] [package.dependencies] @@ -995,6 +1013,17 @@ files = [ {file = "mdurl-0.1.2.tar.gz", hash = "sha256:bb413d29f5eea38f31dd4754dd7377d4465116fb207585f97bf925588687c1ba"}, ] +[[package]] +name = "mimesis" +version = "6.1.1" +description = "Mimesis: Fake Data Generator." +optional = false +python-versions = ">=3.8,<4.0" +files = [ + {file = "mimesis-6.1.1-py3-none-any.whl", hash = "sha256:eabe41d7afa23b01dffb51ebd9e10837df6417fef02fa9841989ca886e479790"}, + {file = "mimesis-6.1.1.tar.gz", hash = "sha256:044ac378c61db0e06832ff722548fd6e604881d36bc938002e0bd5b85eeb6a98"}, +] + [[package]] name = "mypy" version = "1.8.0" @@ -1159,13 +1188,13 @@ files = [ [[package]] name = "overrides" -version = "7.6.0" +version = "7.7.0" description = "A decorator to automatically detect mismatch when overriding a method." 
optional = false python-versions = ">=3.6" files = [ - {file = "overrides-7.6.0-py3-none-any.whl", hash = "sha256:c36e6635519ea9c5b043b65c36d4b886aee8bd45b7d4681d2a6df0898df4b654"}, - {file = "overrides-7.6.0.tar.gz", hash = "sha256:01e15bbbf15b766f0675c275baa1878bd1c7dc9bc7b9ee13e677cdba93dc1bd9"}, + {file = "overrides-7.7.0-py3-none-any.whl", hash = "sha256:c7ed9d062f78b8e4c1a7b70bd8796b35ead4d9f510227ef9c5dc7626c60d7e49"}, + {file = "overrides-7.7.0.tar.gz", hash = "sha256:55158fa3d93b98cc75299b1e67078ad9003ca27945c76162c1c0766d6f91820a"}, ] [[package]] @@ -1739,13 +1768,13 @@ testing = ["argcomplete", "attrs (>=19.2.0)", "hypothesis (>=3.56)", "mock", "no [[package]] name = "pytest-docker" -version = "2.0.1" +version = "2.2.0" description = "Simple pytest fixtures for Docker and Docker Compose based tests" optional = false python-versions = ">=3.6" files = [ - {file = "pytest-docker-2.0.1.tar.gz", hash = "sha256:1c17e9202a566f85ed5ef269fe2815bd4899e90eb639622e5d14277372ca7524"}, - {file = "pytest_docker-2.0.1-py3-none-any.whl", hash = "sha256:7103f97b8c479c826b63d73cfb83383dc1970d35105ed1ce78a722c90c7fe650"}, + {file = "pytest-docker-2.2.0.tar.gz", hash = "sha256:b083fd2ae69212369390033c22228d3263555a5f3b4bef87b74160e07218f377"}, + {file = "pytest_docker-2.2.0-py3-none-any.whl", hash = "sha256:8ee9c9742d58ac079c81c03635bb830881f7f4d529f0f53f4ba2c89ffc9c7137"}, ] [package.dependencies] @@ -1806,13 +1835,13 @@ files = [ [[package]] name = "pytz" -version = "2023.3.post1" +version = "2023.4" description = "World timezone definitions, modern and historical" optional = false python-versions = "*" files = [ - {file = "pytz-2023.3.post1-py2.py3-none-any.whl", hash = "sha256:ce42d816b81b68506614c11e8937d3aa9e41007ceb50bfdcb0749b921bf646c7"}, - {file = "pytz-2023.3.post1.tar.gz", hash = "sha256:7b4fddbeb94a1eba4b557da24f19fdf9db575192544270a9101d8509f9f43d7b"}, + {file = "pytz-2023.4-py2.py3-none-any.whl", hash = "sha256:f90ef520d95e7c46951105338d918664ebfd6f1d995bd7d153127ce90efafa6a"}, + {file = "pytz-2023.4.tar.gz", hash = "sha256:31d4583c4ed539cd037956140d695e42c033a19e984bfce9964a3f7d59bc2b40"}, ] [[package]] @@ -1900,13 +1929,13 @@ files = [ [[package]] name = "referencing" -version = "0.32.1" +version = "0.33.0" description = "JSON Referencing + Python" optional = false python-versions = ">=3.8" files = [ - {file = "referencing-0.32.1-py3-none-any.whl", hash = "sha256:7e4dc12271d8e15612bfe35792f5ea1c40970dadf8624602e33db2758f7ee554"}, - {file = "referencing-0.32.1.tar.gz", hash = "sha256:3c57da0513e9563eb7e203ebe9bb3a1b509b042016433bd1e45a2853466c3dd3"}, + {file = "referencing-0.33.0-py3-none-any.whl", hash = "sha256:39240f2ecc770258f28b642dd47fd74bc8b02484de54e1882b74b35ebd779bd5"}, + {file = "referencing-0.33.0.tar.gz", hash = "sha256:c775fedf74bc0f9189c2a3be1c12fd03e8c23f4d371dce795df44e06c5b412f7"}, ] [package.dependencies] @@ -2106,28 +2135,28 @@ pyasn1 = ">=0.1.3" [[package]] name = "ruff" -version = "0.1.14" +version = "0.1.15" description = "An extremely fast Python linter and code formatter, written in Rust." 
optional = false python-versions = ">=3.7" files = [ - {file = "ruff-0.1.14-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:96f76536df9b26622755c12ed8680f159817be2f725c17ed9305b472a757cdbb"}, - {file = "ruff-0.1.14-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:ab3f71f64498c7241123bb5a768544cf42821d2a537f894b22457a543d3ca7a9"}, - {file = "ruff-0.1.14-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:7060156ecc572b8f984fd20fd8b0fcb692dd5d837b7606e968334ab7ff0090ab"}, - {file = "ruff-0.1.14-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:a53d8e35313d7b67eb3db15a66c08434809107659226a90dcd7acb2afa55faea"}, - {file = "ruff-0.1.14-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:bea9be712b8f5b4ebed40e1949379cfb2a7d907f42921cf9ab3aae07e6fba9eb"}, - {file = "ruff-0.1.14-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = "sha256:2270504d629a0b064247983cbc495bed277f372fb9eaba41e5cf51f7ba705a6a"}, - {file = "ruff-0.1.14-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:80258bb3b8909b1700610dfabef7876423eed1bc930fe177c71c414921898efa"}, - {file = "ruff-0.1.14-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:653230dd00aaf449eb5ff25d10a6e03bc3006813e2cb99799e568f55482e5cae"}, - {file = "ruff-0.1.14-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:87b3acc6c4e6928459ba9eb7459dd4f0c4bf266a053c863d72a44c33246bfdbf"}, - {file = "ruff-0.1.14-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:6b3dadc9522d0eccc060699a9816e8127b27addbb4697fc0c08611e4e6aeb8b5"}, - {file = "ruff-0.1.14-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:1c8eca1a47b4150dc0fbec7fe68fc91c695aed798532a18dbb1424e61e9b721f"}, - {file = "ruff-0.1.14-py3-none-musllinux_1_2_i686.whl", hash = "sha256:62ce2ae46303ee896fc6811f63d6dabf8d9c389da0f3e3f2bce8bc7f15ef5488"}, - {file = "ruff-0.1.14-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:b2027dde79d217b211d725fc833e8965dc90a16d0d3213f1298f97465956661b"}, - {file = "ruff-0.1.14-py3-none-win32.whl", hash = "sha256:722bafc299145575a63bbd6b5069cb643eaa62546a5b6398f82b3e4403329cab"}, - {file = "ruff-0.1.14-py3-none-win_amd64.whl", hash = "sha256:e3d241aa61f92b0805a7082bd89a9990826448e4d0398f0e2bc8f05c75c63d99"}, - {file = "ruff-0.1.14-py3-none-win_arm64.whl", hash = "sha256:269302b31ade4cde6cf6f9dd58ea593773a37ed3f7b97e793c8594b262466b67"}, - {file = "ruff-0.1.14.tar.gz", hash = "sha256:ad3f8088b2dfd884820289a06ab718cde7d38b94972212cc4ba90d5fbc9955f3"}, + {file = "ruff-0.1.15-py3-none-macosx_10_12_x86_64.macosx_11_0_arm64.macosx_10_12_universal2.whl", hash = "sha256:5fe8d54df166ecc24106db7dd6a68d44852d14eb0729ea4672bb4d96c320b7df"}, + {file = "ruff-0.1.15-py3-none-macosx_10_12_x86_64.whl", hash = "sha256:6f0bfbb53c4b4de117ac4d6ddfd33aa5fc31beeaa21d23c45c6dd249faf9126f"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e0d432aec35bfc0d800d4f70eba26e23a352386be3a6cf157083d18f6f5881c8"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:9405fa9ac0e97f35aaddf185a1be194a589424b8713e3b97b762336ec79ff807"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:c66ec24fe36841636e814b8f90f572a8c0cb0e54d8b5c2d0e300d28a0d7bffec"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_ppc64.manylinux2014_ppc64.whl", hash = 
"sha256:6f8ad828f01e8dd32cc58bc28375150171d198491fc901f6f98d2a39ba8e3ff5"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:86811954eec63e9ea162af0ffa9f8d09088bab51b7438e8b6488b9401863c25e"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:fd4025ac5e87d9b80e1f300207eb2fd099ff8200fa2320d7dc066a3f4622dc6b"}, + {file = "ruff-0.1.15-py3-none-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:b17b93c02cdb6aeb696effecea1095ac93f3884a49a554a9afa76bb125c114c1"}, + {file = "ruff-0.1.15-py3-none-musllinux_1_2_aarch64.whl", hash = "sha256:ddb87643be40f034e97e97f5bc2ef7ce39de20e34608f3f829db727a93fb82c5"}, + {file = "ruff-0.1.15-py3-none-musllinux_1_2_armv7l.whl", hash = "sha256:abf4822129ed3a5ce54383d5f0e964e7fef74a41e48eb1dfad404151efc130a2"}, + {file = "ruff-0.1.15-py3-none-musllinux_1_2_i686.whl", hash = "sha256:6c629cf64bacfd136c07c78ac10a54578ec9d1bd2a9d395efbee0935868bf852"}, + {file = "ruff-0.1.15-py3-none-musllinux_1_2_x86_64.whl", hash = "sha256:1bab866aafb53da39c2cadfb8e1c4550ac5340bb40300083eb8967ba25481447"}, + {file = "ruff-0.1.15-py3-none-win32.whl", hash = "sha256:2417e1cb6e2068389b07e6fa74c306b2810fe3ee3476d5b8a96616633f40d14f"}, + {file = "ruff-0.1.15-py3-none-win_amd64.whl", hash = "sha256:3837ac73d869efc4182d9036b1405ef4c73d9b1f88da2413875e34e0d6919587"}, + {file = "ruff-0.1.15-py3-none-win_arm64.whl", hash = "sha256:9a933dfb1c14ec7a33cceb1e49ec4a16b51ce3c20fd42663198746efc0427360"}, + {file = "ruff-0.1.15.tar.gz", hash = "sha256:f6dfa8c1b21c913c326919056c390966648b680966febcb796cc9d1aaab8564e"}, ] [[package]] @@ -2358,13 +2387,13 @@ referencing = "*" [[package]] name = "types-pytz" -version = "2023.3.1.1" +version = "2023.4.0.20240130" description = "Typing stubs for pytz" optional = false -python-versions = "*" +python-versions = ">=3.8" files = [ - {file = "types-pytz-2023.3.1.1.tar.gz", hash = "sha256:cc23d0192cd49c8f6bba44ee0c81e4586a8f30204970fc0894d209a6b08dab9a"}, - {file = "types_pytz-2023.3.1.1-py3-none-any.whl", hash = "sha256:1999a123a3dc0e39a2ef6d19f3f8584211de9e6a77fe7a0259f04a524e90a5cf"}, + {file = "types-pytz-2023.4.0.20240130.tar.gz", hash = "sha256:33676a90bf04b19f92c33eec8581136bea2f35ddd12759e579a624a006fd387a"}, + {file = "types_pytz-2023.4.0.20240130-py3-none-any.whl", hash = "sha256:6ce76a9f8fd22bd39b01a59c35bfa2db39b60d11a2f77145e97b730de7e64fe0"}, ] [[package]] @@ -2561,4 +2590,4 @@ files = [ [metadata] lock-version = "2.0" python-versions = "^3.9" -content-hash = "7ed61ca7eaed73dbd7e0800aa87fcd5b2583739048d53e9e564fe2a6defa483f" +content-hash = "10dc6d712fcf12981716373f5718eee65ba75ded52e0bed86f7c5c2c0ffec71f" diff --git a/airbyte-lib/pyproject.toml b/airbyte-lib/pyproject.toml index 4625e688dc41..51f43cb35a84 100644 --- a/airbyte-lib/pyproject.toml +++ b/airbyte-lib/pyproject.toml @@ -46,6 +46,7 @@ types-jsonschema = "^4.20.0.0" google-cloud-secret-manager = "^2.17.0" types-requests = "2.31.0.4" freezegun = "^1.4.0" +airbyte-source-faker = "^6.0.0" [build-system] requires = ["poetry-core"] diff --git a/airbyte-lib/tests/conftest.py b/airbyte-lib/tests/conftest.py index 17009133a89d..56ee07b48c0e 100644 --- a/airbyte-lib/tests/conftest.py +++ b/airbyte-lib/tests/conftest.py @@ -5,8 +5,12 @@ import json import logging import os +import shutil import socket +import subprocess import time + +import ulid from airbyte_lib.caches.snowflake import SnowflakeCacheConfig import docker @@ -25,6 +29,8 @@ PYTEST_POSTGRES_CONTAINER = "postgres_pytest_container" 
PYTEST_POSTGRES_PORT = 5432 +LOCAL_TEST_REGISTRY_URL = "./tests/integration_tests/fixtures/registry.json" + def pytest_collection_modifyitems(items: list[Item]) -> None: """Override default pytest behavior, sorting our tests in a sensible execution order. @@ -145,6 +151,10 @@ def pg_dsn(): @pytest.fixture def new_pg_cache_config(pg_dsn): + """Fixture to return a fresh cache. + + Each test that uses this fixture will get a unique table prefix. + """ config = PostgresCacheConfig( host=pg_dsn, port=PYTEST_POSTGRES_PORT, @@ -152,6 +162,9 @@ def new_pg_cache_config(pg_dsn): password="postgres", database="postgres", schema_name="public", + + # TODO: Move this to schema name when we support it (breaks as of 2024-01-31): + table_prefix=f"test{str(ulid.ULID())[-6:]}_", ) yield config @@ -175,6 +188,56 @@ def snowflake_config(): database=secret["database"], warehouse=secret["warehouse"], role=secret["role"], + schema_name=f"test{str(ulid.ULID()).lower()[-6:]}", ) yield config + + +@pytest.fixture(autouse=True) +def source_test_registry(monkeypatch): + """ + Set environment variables for the test source. + + These are applied to this test file only. + + This means the normal registry is not usable. Expect AirbyteConnectorNotRegisteredError for + other connectors. + """ + env_vars = { + "AIRBYTE_LOCAL_REGISTRY": LOCAL_TEST_REGISTRY_URL, + } + for key, value in env_vars.items(): + monkeypatch.setenv(key, value) + + +@pytest.fixture(autouse=True) +def do_not_track(monkeypatch): + """ + Set environment variables for the test source. + + These are applied to this test file only. + """ + env_vars = { + "DO_NOT_TRACK": "true" + } + for key, value in env_vars.items(): + monkeypatch.setenv(key, value) + + +@pytest.fixture(scope="package") +def source_test_installation(): + """ + Prepare test environment. This will pre-install the test source from the fixtures array and set + the environment variable to use the local json file as registry. + """ + venv_dir = ".venv-source-test" + if os.path.exists(venv_dir): + shutil.rmtree(venv_dir) + + subprocess.run(["python", "-m", "venv", venv_dir], check=True) + subprocess.run([f"{venv_dir}/bin/pip", "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True) + + yield + + shutil.rmtree(venv_dir) diff --git a/airbyte-lib/tests/integration_tests/test_source_faker_integration.py b/airbyte-lib/tests/integration_tests/test_source_faker_integration.py new file mode 100644 index 000000000000..c981d6c033b1 --- /dev/null +++ b/airbyte-lib/tests/integration_tests/test_source_faker_integration.py @@ -0,0 +1,216 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. + +"""Integration tests which leverage the source-faker connector to test the framework end-to-end. + +Since source-faker is included in dev dependencies, we can assume `source-faker` is installed +and available on PATH for the poetry-managed venv. +""" +from __future__ import annotations +from collections.abc import Generator +import os +import sys +import shutil +from unittest.mock import _patch_dict + +import pytest + +import airbyte_lib as ab +from airbyte_lib import caches +from airbyte_cdk.models import ConfiguredAirbyteCatalog + + +# Product count is always the same, regardless of faker scale. +NUM_PRODUCTS = 100 + +SEED_A = 1234 +SEED_B = 5678 + +# Number of records in each of the 'users' and 'purchases' streams. +FAKER_SCALE_A = 200 +# We want this to be different from FAKER_SCALE_A. +FAKER_SCALE_B = 300 + + +# Patch PATH to include the source-faker executable. 
+ +@pytest.fixture(autouse=True) +def add_venv_bin_to_path(monkeypatch): + # Get the path to the bin directory of the virtual environment + venv_bin_path = os.path.join(sys.prefix, 'bin') + + # Add the bin directory to the PATH + new_path = f"{venv_bin_path}:{os.environ['PATH']}" + monkeypatch.setenv('PATH', new_path) + + +def test_which_source_faker() -> None: + """Test that source-faker is available on PATH.""" + assert shutil.which("source-faker") is not None, \ + f"Can't find source-faker on PATH: {os.environ['PATH']}" + + +@pytest.fixture(scope="function") # Each test gets a fresh source-faker instance. +def source_faker_seed_a() -> ab.Source: + """Fixture to return a source-faker connector instance.""" + source = ab.get_connector( + "source-faker", + local_executable="source-faker", + config={ + "count": FAKER_SCALE_A, + "seed": SEED_A, + "parallelism": 16, # Otherwise defaults to 4. + }, + install_if_missing=False, # Should already be on PATH + ) + source.check() + # TODO: We can optionally add back 'users' once Postgres can handle complex object types. + source.set_streams([ + "products", + "purchases", + ]) + return source + + +@pytest.fixture(scope="function") # Each test gets a fresh source-faker instance. +def source_faker_seed_b() -> ab.Source: + """Fixture to return a source-faker connector instance.""" + source = ab.get_connector( + "source-faker", + local_executable="source-faker", + config={ + "count": FAKER_SCALE_B, + "seed": SEED_B, + "parallelism": 16, # Otherwise defaults to 4. + }, + install_if_missing=False, # Should already be on PATH + ) + source.check() + # TODO: We can optionally add back 'users' once Postgres can handle complex object types. + source.set_streams([ + "products", + "purchases", + ]) + return source + + +@pytest.fixture(scope="function") +def duckdb_cache() -> Generator[caches.DuckDBCache, None, None]: + """Fixture to return a fresh cache.""" + cache: caches.DuckDBCache = ab.new_local_cache() + yield cache + # TODO: Delete cache DB file after test is complete. + return + + +@pytest.fixture(scope="function") +def snowflake_cache(snowflake_config) -> Generator[caches.SnowflakeCache, None, None]: + """Fixture to return a fresh cache.""" + cache: caches.SnowflakeCache = caches.SnowflakeSQLCache(snowflake_config) + yield cache + # TODO: Delete cache DB file after test is complete. + return + + +@pytest.fixture(scope="function") +def postgres_cache(new_pg_cache_config) -> Generator[caches.PostgresCache, None, None]: + """Fixture to return a fresh cache.""" + cache: caches.PostgresCache = caches.PostgresCache(config=new_pg_cache_config) + yield cache + # TODO: Delete cache DB file after test is complete. + return + + +@pytest.fixture +def all_cache_types( + duckdb_cache: ab.DuckDBCache, + snowflake_cache: ab.SnowflakeCache, + postgres_cache: ab.PostgresCache, +): + _ = postgres_cache + return [ + duckdb_cache, + postgres_cache, + # snowflake_cache, # Snowflake works, but is slow and expensive to test. # TODO: Re-enable. 
+ ] + + +def test_faker_pks( + source_faker_seed_a: ab.Source, + duckdb_cache: ab.DuckDBCache, +) -> None: + """Test that the append strategy works as expected.""" + + catalog: ConfiguredAirbyteCatalog = source_faker_seed_a.configured_catalog + + assert len(catalog.streams) == 2 + assert catalog.streams[0].primary_key + assert catalog.streams[1].primary_key + + read_result = source_faker_seed_a.read(duckdb_cache, write_strategy="append") + assert read_result.cache._get_primary_keys("products") == ["id"] + assert read_result.cache._get_primary_keys("purchases") == ["id"] + + +def test_replace_strategy( + source_faker_seed_a: ab.Source, + all_cache_types: ab.DuckDBCache, +) -> None: + """Test that the append strategy works as expected.""" + for cache in all_cache_types: # Function-scoped fixtures can't be used in parametrized(). + for _ in range(2): + result = source_faker_seed_a.read( + cache, write_strategy="replace", force_full_refresh=True + ) + assert len(result.cache.streams) == 2 + assert len(list(result.cache.streams["products"])) == NUM_PRODUCTS + assert len(list(result.cache.streams["purchases"])) == FAKER_SCALE_A + + +def test_append_strategy( + source_faker_seed_a: ab.Source, + all_cache_types: ab.DuckDBCache, +) -> None: + """Test that the append strategy works as expected.""" + for cache in all_cache_types: # Function-scoped fixtures can't be used in parametrized(). + for iteration in range(1, 3): + result = source_faker_seed_a.read(cache, write_strategy="append") + assert len(result.cache.streams) == 2 + assert len(list(result.cache.streams["products"])) == NUM_PRODUCTS * iteration + assert len(list(result.cache.streams["purchases"])) == FAKER_SCALE_A * iteration + + +@pytest.mark.parametrize("strategy", ["merge", "auto"]) +def test_merge_strategy( + strategy: str, + source_faker_seed_a: ab.Source, + source_faker_seed_b: ab.Source, + all_cache_types: ab.DuckDBCache, +) -> None: + """Test that the merge strategy works as expected. + + Since all streams have primary keys, we should expect the auto strategy to be identical to the + merge strategy. + """ + for cache in all_cache_types: # Function-scoped fixtures can't be used in parametrized(). + # First run, seed A (counts should match the scale or the product count) + result = source_faker_seed_a.read(cache, write_strategy=strategy) + assert len(result.cache.streams) == 2 + assert len(list(result.cache.streams["products"])) == NUM_PRODUCTS + assert len(list(result.cache.streams["purchases"])) == FAKER_SCALE_A + + # Second run, also seed A (should have same exact data, no change in counts) + result = source_faker_seed_a.read(cache, write_strategy=strategy) + assert len(list(result.cache.streams["products"])) == NUM_PRODUCTS + assert len(list(result.cache.streams["purchases"])) == FAKER_SCALE_A + + # Third run, seed B - should increase record count to the scale of B, which is greater than A. + # TODO: See if we can reliably predict the exact number of records, since we use fixed seeds. + result = source_faker_seed_b.read(cache, write_strategy=strategy) + assert len(list(result.cache.streams["products"])) == NUM_PRODUCTS + assert len(list(result.cache.streams["purchases"])) == FAKER_SCALE_B + + # Third run, seed A again - count should stay at scale B, since A is smaller. + # TODO: See if we can reliably predict the exact number of records, since we use fixed seeds. 
+ result = source_faker_seed_a.read(cache, write_strategy=strategy) + assert len(list(result.cache.streams["products"])) == NUM_PRODUCTS + assert len(list(result.cache.streams["purchases"])) == FAKER_SCALE_B diff --git a/airbyte-lib/tests/integration_tests/test_integration.py b/airbyte-lib/tests/integration_tests/test_source_test_fixture.py similarity index 96% rename from airbyte-lib/tests/integration_tests/test_integration.py rename to airbyte-lib/tests/integration_tests/test_source_test_fixture.py index 8b45a37a3cc8..c9dc8a11fadb 100644 --- a/airbyte-lib/tests/integration_tests/test_integration.py +++ b/airbyte-lib/tests/integration_tests/test_source_test_fixture.py @@ -8,6 +8,7 @@ from unittest.mock import Mock, call, patch import tempfile from pathlib import Path +from urllib import request from airbyte_lib.caches.base import SQLCacheBase from sqlalchemy import column, text @@ -29,30 +30,19 @@ import ulid -LOCAL_TEST_REGISTRY_URL = "./tests/integration_tests/fixtures/registry.json" +@pytest.fixture(scope="module", autouse=True) +def autouse_source_test_installation(source_test_installation): + return -@pytest.fixture(scope="package", autouse=True) -def prepare_test_env(): - """ - Prepare test environment. This will pre-install the test source from the fixtures array and set the environment variable to use the local json file as registry. - """ - venv_dir = ".venv-source-test" - if os.path.exists(venv_dir): - shutil.rmtree(venv_dir) - - subprocess.run(["python", "-m", "venv", venv_dir], check=True) - subprocess.run([f"{venv_dir}/bin/pip", "install", "-e", "./tests/integration_tests/fixtures/source-test"], check=True) +@pytest.fixture(scope="function", autouse=True) +def autouse_source_test_registry(source_test_registry): + return - os.environ["AIRBYTE_LOCAL_REGISTRY"] = LOCAL_TEST_REGISTRY_URL - os.environ["DO_NOT_TRACK"] = "true" - # Force-refresh the registry cache - _ = registry._get_registry_cache(force_refresh=True) - - yield - - shutil.rmtree(".venv-source-test") +@pytest.fixture +def source_test(source_test_env) -> ab.Source: + return ab.get_connector("source-test", config={"apiKey": "test"}) @pytest.fixture @@ -68,8 +58,6 @@ def expected_test_stream_data() -> dict[str, list[dict[str, str | int]]]: } def test_registry_get(): - assert registry._get_registry_url() == LOCAL_TEST_REGISTRY_URL - metadata = registry.get_connector_metadata("source-test") assert metadata.name == "source-test" assert metadata.latest_available_version == "0.0.1" @@ -124,14 +112,18 @@ def test_non_enabled_connector(): @pytest.mark.parametrize( "latest_available_version, requested_version, raises", [ - ("0.0.1", None, False), - ("1.2.3", None, False), ("0.0.1", "latest", False), - ("1.2.3", "latest", True), ("0.0.1", "0.0.1", False), + ("0.0.1", None, False), + ("1.2.3", None, False), # Don't raise if a version is not requested + ("1.2.3", "latest", True), ("1.2.3", "1.2.3", True), ]) -def test_version_enforcement(raises, latest_available_version, requested_version): +def test_version_enforcement( + raises: bool, + latest_available_version, + requested_version, +): """" Ensures version enforcement works as expected: * If no version is specified, the current version is accepted @@ -143,6 +135,9 @@ def test_version_enforcement(raises, latest_available_version, requested_version patched_entry = registry.ConnectorMetadata( name="source-test", latest_available_version=latest_available_version, pypi_package_name="airbyte-source-test" ) + + # We need to initialize the cache before we can patch it. 
+ _ = registry._get_registry_cache() with patch.dict("airbyte_lib.registry.__cache", {"source-test": patched_entry}, clear=False): if raises: with pytest.raises(Exception): @@ -160,6 +155,10 @@ def test_version_enforcement(raises, latest_available_version, requested_version config={"apiKey": "abc"}, install_if_missing=False, ) + if requested_version: # Don't raise if a version is not requested + assert source.executor._get_installed_version(raise_on_error=True) == ( + requested_version or latest_available_version + ).replace("latest", latest_available_version) source.executor.ensure_installation(auto_fix=False) @@ -248,7 +247,7 @@ def test_dataset_list_and_len(expected_test_stream_data): def test_read_from_cache(expected_test_stream_data: dict[str, list[dict[str, str | int]]]): """ - Test that we can read from a cache that already has data (identigier by name) + Test that we can read from a cache that already has data (identifier by name) """ cache_name = str(ulid.ULID()) source = ab.get_connector("source-test", config={"apiKey": "test"}) diff --git a/airbyte-lib/tests/integration_tests/test_validation.py b/airbyte-lib/tests/integration_tests/test_validation.py index 69216232b28c..140a7d52023e 100644 --- a/airbyte-lib/tests/integration_tests/test_validation.py +++ b/airbyte-lib/tests/integration_tests/test_validation.py @@ -7,6 +7,16 @@ from airbyte_lib.validate import validate +@pytest.fixture(scope="module", autouse=True) +def autouse_source_test_installation(source_test_installation): + return + + +@pytest.fixture(scope="function", autouse=True) +def autouse_source_test_registry(source_test_registry): + return + + def test_validate_success(): validate("./tests/integration_tests/fixtures/source-test", "./tests/integration_tests/fixtures/valid_config.json", validate_install_only=False) From 6e8b87f6f3f9340e9f24ce420b6d7c63366ba455 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Tue, 6 Feb 2024 00:26:17 -0800 Subject: [PATCH 11/13] AirbyteLib: detect REPL and disable Rich.Live if so (#34782) --- airbyte-lib/airbyte_lib/progress.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte-lib/airbyte_lib/progress.py b/airbyte-lib/airbyte_lib/progress.py index d1b7c5355fe1..14e1044801a0 100644 --- a/airbyte-lib/airbyte_lib/progress.py +++ b/airbyte-lib/airbyte_lib/progress.py @@ -5,6 +5,7 @@ import datetime import math +import sys import time from contextlib import suppress from typing import cast @@ -14,6 +15,8 @@ from rich.markdown import Markdown as RichMarkdown +IS_REPL = hasattr(sys, "ps1") # True if we're in a Python REPL, in which case we can use Rich. + try: IS_NOTEBOOK = True from IPython import display as ipy_display @@ -93,7 +96,7 @@ def __init__(self) -> None: self.last_update_time: float | None = None self.rich_view: RichLive | None = None - if not IS_NOTEBOOK: + if not IS_NOTEBOOK and not IS_REPL: # If we're in a terminal, use a Rich view to display the progress updates. 
self.rich_view = RichLive() try: From 540b9d48b38cdbf8151ce17c29bc0e3a0396f668 Mon Sep 17 00:00:00 2001 From: "Aaron (\"AJ\") Steers" Date: Tue, 6 Feb 2024 00:36:52 -0800 Subject: [PATCH 12/13] AirbyteLib: Add basic secrets management (#34822) --- airbyte-lib/README.md | 47 +++++++--- airbyte-lib/airbyte_lib/__init__.py | 3 + airbyte-lib/airbyte_lib/exceptions.py | 13 +++ airbyte-lib/airbyte_lib/secrets.py | 86 ++++++++++++++++++ airbyte-lib/docs/generated/airbyte_lib.html | 96 +++++++++++++++++++++ airbyte-lib/examples/run_faker.py | 2 +- airbyte-lib/examples/run_github.py | 28 ++++++ 7 files changed, 264 insertions(+), 11 deletions(-) create mode 100644 airbyte-lib/airbyte_lib/secrets.py create mode 100644 airbyte-lib/examples/run_github.py diff --git a/airbyte-lib/README.md b/airbyte-lib/README.md index c65e5fba04fc..3fb34c0d517b 100644 --- a/airbyte-lib/README.md +++ b/airbyte-lib/README.md @@ -4,17 +4,44 @@ airbyte-lib is a library that allows to run Airbyte syncs embedded into any Pyth ## Development -* Make sure [Poetry is installed](https://python-poetry.org/docs/#). -* Run `poetry install` -* For examples, check out the `examples` folder. They can be run via `poetry run python examples/` -* Unit tests and type checks can be run via `poetry run pytest` +- Make sure [Poetry is installed](https://python-poetry.org/docs/#). +- Run `poetry install` +- For examples, check out the `examples` folder. They can be run via `poetry run python examples/` +- Unit tests and type checks can be run via `poetry run pytest` ## Release -* In your PR: - * Bump the version in `pyproject.toml` - * Add a changelog entry to the table below -* Once the PR is merged, go to Github and trigger the `Publish AirbyteLib Manually` workflow. This will publish the new version to PyPI. +- In your PR: + - Bump the version in `pyproject.toml` + - Add a changelog entry to the table below +- Once the PR is merged, go to Github and trigger the `Publish AirbyteLib Manually` workflow. This will publish the new version to PyPI. + +## Secrets Management + +AirbyteLib can auto-import secrets from the following sources: + +1. Environment variables. +2. [Google Colab secrets](https://medium.com/@parthdasawant/how-to-use-secrets-in-google-colab-450c38e3ec75). +3. Manual entry via [`getpass`](https://docs.python.org/3.9/library/getpass.html). + +_Note: Additional secret store options may be supported in the future. [More info here.](https://github.com/airbytehq/airbyte-lib-private-beta/discussions/5)_ + +### Retrieving Secrets + +```python +from airbyte_lib import get_secret, SecretSource + +source = get_connection("source-github") +source.set_config( + "credentials": { + "personal_access_token": get_secret("GITHUB_PERSONAL_ACCESS_TOKEN"), + } +) +``` + +The `get_secret()` function accepts an optional `source` argument of enum type `SecretSource`. If omitted or set to `SecretSource.ANY`, AirbyteLib will search all available secrets sources. If `source` is set to a specific source, then only that source will be checked. If a list of `SecretSource` entries is passed, then the sources will be checked using the provided ordering. + +By default, AirbyteLib will prompt the user for any requested secrets that are not provided via other secret managers. You can disable this prompt by passing `prompt=False` to `get_secret()`. ### Versioning @@ -24,13 +51,13 @@ Versioning follows [Semantic Versioning](https://semver.org/). For new features, Regular documentation lives in the `/docs` folder. 
Based on the doc strings of public methods, we generate API documentation using [pdoc](https://pdoc.dev). To generate the documentation, run `poetry run generate-docs`. The documentation will be generated in the `docs/generate` folder. This needs to be done manually when changing the public interface of the library. -A unit test validates the documentation is up to date. +A unit test validates the documentation is up to date. ## Validating source connectors To validate a source connector for compliance, the `airbyte-lib-validate-source` script can be used. It can be used like this: -``` +```bash airbyte-lib-validate-source —connector-dir . -—sample-config secrets/config.json ``` diff --git a/airbyte-lib/airbyte_lib/__init__.py b/airbyte-lib/airbyte_lib/__init__.py index 86ffb5e0fa40..63afc6ce99c6 100644 --- a/airbyte-lib/airbyte_lib/__init__.py +++ b/airbyte-lib/airbyte_lib/__init__.py @@ -6,6 +6,7 @@ from airbyte_lib.caches import DuckDBCache, DuckDBCacheConfig from airbyte_lib.datasets import CachedDataset from airbyte_lib.results import ReadResult +from airbyte_lib.secrets import SecretSource, get_secret from airbyte_lib.source import Source @@ -15,7 +16,9 @@ "DuckDBCacheConfig", "get_connector", "get_default_cache", + "get_secret", "new_local_cache", "ReadResult", + "SecretSource", "Source", ] diff --git a/airbyte-lib/airbyte_lib/exceptions.py b/airbyte-lib/airbyte_lib/exceptions.py index ac808ff50831..bcd5fb966fc0 100644 --- a/airbyte-lib/airbyte_lib/exceptions.py +++ b/airbyte-lib/airbyte_lib/exceptions.py @@ -250,3 +250,16 @@ class AirbyteStreamNotFoundError(AirbyteConnectorError): stream_name: str | None = None available_streams: list[str] | None = None + + +@dataclass +class AirbyteLibSecretNotFoundError(AirbyteError): + """Secret not found.""" + + guidance = "Please ensure that the secret is set." + help_url = ( + "https://docs.airbyte.com/using-airbyte/airbyte-lib/getting-started#secrets-management" + ) + + secret_name: str | None = None + sources: list[str] | None = None diff --git a/airbyte-lib/airbyte_lib/secrets.py b/airbyte-lib/airbyte_lib/secrets.py new file mode 100644 index 000000000000..90774e9ee9c2 --- /dev/null +++ b/airbyte-lib/airbyte_lib/secrets.py @@ -0,0 +1,86 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +"""Secrets management for AirbyteLib.""" +from __future__ import annotations + +import os +from enum import Enum, auto +from getpass import getpass + +from airbyte_lib import exceptions as exc + + +class SecretSource(Enum): + ENV = auto() + GOOGLE_COLAB = auto() + ANY = auto() + + PROMPT = auto() + + +ALL_SOURCES = [ + SecretSource.ENV, + SecretSource.GOOGLE_COLAB, +] + +try: + from google.colab import userdata as colab_userdata +except ImportError: + colab_userdata = None + + +def get_secret( + secret_name: str, + source: SecretSource | list[SecretSource] = SecretSource.ANY, + *, + prompt: bool = True, +) -> str: + """Get a secret from the environment. + + The optional `source` argument of enum type `SecretSource` or list of `SecretSource` options. + If left blank, the `source` arg will be `SecretSource.ANY`. If `source` is set to a specific + source, then only that source will be checked. If a list of `SecretSource` entries is passed, + then the sources will be checked using the provided ordering. + + If `prompt` to `True` or if SecretSource.PROMPT is declared in the `source` arg, then the + user will be prompted to enter the secret if it is not found in any of the other sources. 
+ """ + sources = [source] if not isinstance(source, list) else source + if SecretSource.ANY in sources: + sources += [s for s in ALL_SOURCES if s not in sources] + sources.remove(SecretSource.ANY) + + if prompt or SecretSource.PROMPT in sources: + if SecretSource.PROMPT in sources: + sources.remove(SecretSource.PROMPT) + + sources.append(SecretSource.PROMPT) # Always check prompt last + + for s in sources: + val = _get_secret_from_source(secret_name, s) + if val: + return val + + raise exc.AirbyteLibSecretNotFoundError( + secret_name=secret_name, + sources=[str(s) for s in sources], + ) + + +def _get_secret_from_source( + secret_name: str, + source: SecretSource, +) -> str | None: + if source in [SecretSource.ENV, SecretSource.ANY] and secret_name in os.environ: + return os.environ[secret_name] + + if ( + source in [SecretSource.GOOGLE_COLAB, SecretSource.ANY] + and colab_userdata is not None + and colab_userdata.get(secret_name, None) + ): + return colab_userdata.get(secret_name) + + if source == SecretSource.PROMPT: + return getpass(f"Enter the value for secret '{secret_name}': ") + + return None diff --git a/airbyte-lib/docs/generated/airbyte_lib.html b/airbyte-lib/docs/generated/airbyte_lib.html index 73e8779d3fd7..00b5d41acf68 100644 --- a/airbyte-lib/docs/generated/airbyte_lib.html +++ b/airbyte-lib/docs/generated/airbyte_lib.html @@ -295,6 +295,29 @@
Inherited Members
[Generated documentation diff: the regenerated pdoc output in `airbyte_lib.html` (HTML markup not reproduced) adds API reference entries for the new `get_secret()` function, whose rendered docstring covers the `source` and `prompt` arguments and the order in which secret sources are checked, and for the `SecretSource` enum with members `ENV`, `GOOGLE_COLAB`, `ANY`, and `PROMPT`.]
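A brief usage sketch of the secret-resolution behavior documented above; this is illustrative only and not part of the patch, and it reuses the `GITHUB_PERSONAL_ACCESS_TOKEN` name from the README example:

```python
from airbyte_lib import SecretSource, get_secret

# Check the environment first, then fall back to an interactive prompt.
# Explicit source lists are checked in the order given; the prompt is always tried last.
token = get_secret(
    "GITHUB_PERSONAL_ACCESS_TOKEN",
    source=[SecretSource.ENV, SecretSource.PROMPT],
)

# Restrict the lookup to Google Colab secrets and disable the prompt fallback;
# a missing secret then raises AirbyteLibSecretNotFoundError instead of prompting.
colab_token = get_secret(
    "GITHUB_PERSONAL_ACCESS_TOKEN",
    source=SecretSource.GOOGLE_COLAB,
    prompt=False,
)
```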
diff --git a/airbyte-lib/examples/run_faker.py b/airbyte-lib/examples/run_faker.py index 55d1017ed393..0418e5ec6fbb 100644 --- a/airbyte-lib/examples/run_faker.py +++ b/airbyte-lib/examples/run_faker.py @@ -26,4 +26,4 @@ result = source.read() for name, records in result.streams.items(): - print(f"Stream {name}: {len(list(records))} records") + print(f"Stream {name}: {len(records)} records") diff --git a/airbyte-lib/examples/run_github.py b/airbyte-lib/examples/run_github.py new file mode 100644 index 000000000000..e54154d21562 --- /dev/null +++ b/airbyte-lib/examples/run_github.py @@ -0,0 +1,28 @@ +# Copyright (c) 2023 Airbyte, Inc., all rights reserved. +"""A simple test of AirbyteLib, using the Faker source connector. + +Usage (from airbyte-lib root directory): +> poetry run python ./examples/run_faker.py + +No setup is needed, but you may need to delete the .venv-source-faker folder +if your installation gets interrupted or corrupted. +""" +from __future__ import annotations + +import airbyte_lib as ab + + +GITHUB_TOKEN = ab.get_secret("GITHUB_PERSONAL_ACCESS_TOKEN") + + +source = ab.get_connector("source-github") +source.set_config( + {"repositories": ["airbytehq/airbyte"], "credentials": {"personal_access_token": GITHUB_TOKEN}} +) +source.check() +source.set_streams(["products", "users", "purchases"]) + +result = source.read() + +for name, records in result.streams.items(): + print(f"Stream {name}: {len(records)} records") From 0b8496cb8f95d7c19a720ad6413223ee41d7e8bd Mon Sep 17 00:00:00 2001 From: Joe Reuter Date: Tue, 6 Feb 2024 10:25:55 +0100 Subject: [PATCH 13/13] airbyte-lib: Add testing to connectors (#34044) --- airbyte-ci/connectors/pipelines/README.md | 3 + .../pipelines/airbyte_ci/connectors/consts.py | 1 + .../test/steps/python_connectors.py | 52 +++++++++++++ airbyte-ci/connectors/pipelines/poetry.lock | 13 +++- .../connectors/pipelines/pyproject.toml | 3 +- .../test_tests/test_python_connectors.py | 75 ++++++++++++++++++- 6 files changed, 143 insertions(+), 4 deletions(-) diff --git a/airbyte-ci/connectors/pipelines/README.md b/airbyte-ci/connectors/pipelines/README.md index fd93396e4b86..59a98e692937 100644 --- a/airbyte-ci/connectors/pipelines/README.md +++ b/airbyte-ci/connectors/pipelines/README.md @@ -261,6 +261,7 @@ flowchart TD build[Build connector docker image] unit[Run unit tests] integration[Run integration tests] + airbyte_lib_validation[Run airbyte-lib validation tests] cat[Run connector acceptance tests] secret[Load connector configuration] @@ -268,6 +269,7 @@ flowchart TD unit-->build secret-->integration secret-->cat + secret-->airbyte_lib_validation build-->integration build-->cat end @@ -610,6 +612,7 @@ E.G.: running `pytest` on a specific test folder: | Version | PR | Description | | ------- | ---------------------------------------------------------- | -------------------------------------------------------------------------------------------------------------------------- | +| 3.10.2 | [#34044](https://github.com/airbytehq/airbyte/pull/34044) | Add pypi validation testing. | | 3.10.1 | [#34756](https://github.com/airbytehq/airbyte/pull/34756) | Enable connectors tests in draft PRs. | | 3.10.0 | [#34606](https://github.com/airbytehq/airbyte/pull/34606) | Allow configuration of separate check URL to check whether package exists already. | | 3.9.0 | [#34606](https://github.com/airbytehq/airbyte/pull/34606) | Allow configuration of python registry URL via environment variable. 
| diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/consts.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/consts.py index 10e00bc67dad..349b52cbe597 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/consts.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/consts.py @@ -14,6 +14,7 @@ class CONNECTOR_TEST_STEP_ID(str, Enum): BUILD = "build" CHECK_BASE_IMAGE = "check_base_image" INTEGRATION = "integration" + AIRBYTE_LIB_VALIDATION = "airbyte_lib_validation" METADATA_VALIDATION = "metadata_validation" QA_CHECKS = "qa_checks" UNIT = "unit" diff --git a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py index 5b2d71c465c6..352a404a2433 100644 --- a/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py +++ b/airbyte-ci/connectors/pipelines/pipelines/airbyte_ci/connectors/test/steps/python_connectors.py @@ -7,15 +7,18 @@ from abc import ABC, abstractmethod from typing import List, Sequence, Tuple +import dpath.util import pipelines.dagger.actions.python.common import pipelines.dagger.actions.system.docker from dagger import Container, File +from pipelines import hacks from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages from pipelines.airbyte_ci.connectors.consts import CONNECTOR_TEST_STEP_ID from pipelines.airbyte_ci.connectors.context import ConnectorContext from pipelines.airbyte_ci.connectors.test.steps.common import AcceptanceTests, CheckBaseImageIsUsed from pipelines.consts import LOCAL_BUILD_PLATFORM from pipelines.dagger.actions import secrets +from pipelines.dagger.actions.python.poetry import with_poetry from pipelines.helpers.execution.run_steps import STEP_TREE, StepToRun from pipelines.models.steps import STEP_PARAMS, Step, StepResult @@ -189,6 +192,49 @@ def default_params(self) -> STEP_PARAMS: return super().default_params | coverage_options +class AirbyteLibValidation(Step): + """A step to validate the connector will work with airbyte-lib, using the airbyte-lib validation helper.""" + + title = "AirbyteLib validation tests" + + context: ConnectorContext + + async def _run(self, connector_under_test: Container) -> StepResult: + """Run the airbyte-lib validation helper against the connector under test. + Args: + connector_under_test (Container): The connector under test container. + Returns: + StepResult: Failure or success of the validation run with stdout and stderr.
+ """ + if dpath.util.get(self.context.connector.metadata, "remoteRegistries/pypi/enabled", default=False) is False: + return self.skip("Connector is not published on pypi, skipping airbyte-lib validation.") + + test_environment = await self.install_testing_environment(with_poetry(self.context)) + test_execution = test_environment.with_( + hacks.never_fail_exec(["airbyte-lib-validate-source", "--connector-dir", ".", "--validate-install-only"]) + ) + + return await self.get_step_result(test_execution) + + async def install_testing_environment( + self, + built_connector_container: Container, + ) -> Container: + """Add airbyte-lib and secrets to the test environment.""" + context: ConnectorContext = self.context + + container_with_test_deps = await pipelines.dagger.actions.python.common.with_python_package( + self.context, built_connector_container.with_entrypoint([]), str(context.connector.code_directory) + ) + return container_with_test_deps.with_exec( + [ + "pip", + "install", + "airbyte-lib", + ] + ) + + class IntegrationTests(PytestStep): """A step to run the connector integration tests with Pytest.""" @@ -218,6 +264,12 @@ def get_test_steps(context: ConnectorContext) -> STEP_TREE: args=lambda results: {"connector_under_test": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM]}, depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], ), + StepToRun( + id=CONNECTOR_TEST_STEP_ID.AIRBYTE_LIB_VALIDATION, + step=AirbyteLibValidation(context), + args=lambda results: {"connector_under_test": results[CONNECTOR_TEST_STEP_ID.BUILD].output_artifact[LOCAL_BUILD_PLATFORM]}, + depends_on=[CONNECTOR_TEST_STEP_ID.BUILD], + ), StepToRun( id=CONNECTOR_TEST_STEP_ID.ACCEPTANCE, step=AcceptanceTests(context, context.concurrent_cat), diff --git a/airbyte-ci/connectors/pipelines/poetry.lock b/airbyte-ci/connectors/pipelines/poetry.lock index ded75cac60a8..606704d4f78f 100644 --- a/airbyte-ci/connectors/pipelines/poetry.lock +++ b/airbyte-ci/connectors/pipelines/poetry.lock @@ -647,6 +647,17 @@ websocket-client = ">=0.32.0" [package.extras] ssh = ["paramiko (>=2.4.3)"] +[[package]] +name = "dpath" +version = "2.1.6" +description = "Filesystem-like pathing and searching for dictionaries" +optional = false +python-versions = ">=3.7" +files = [ + {file = "dpath-2.1.6-py3-none-any.whl", hash = "sha256:31407395b177ab63ef72e2f6ae268c15e938f2990a8ecf6510f5686c02b6db73"}, + {file = "dpath-2.1.6.tar.gz", hash = "sha256:f1e07c72e8605c6a9e80b64bc8f42714de08a789c7de417e49c3f87a19692e47"}, +] + [[package]] name = "editor" version = "1.6.5" @@ -2576,4 +2587,4 @@ multidict = ">=4.0" [metadata] lock-version = "2.0" python-versions = "~3.10" -content-hash = "de011a00b912c9acd6d4f202c7cf54308010efc14a214ee25608712851306aa9" +content-hash = "68c67e470fdf10ce89ada657bbbaa94dfbad98324b19d866de9cbb924beab5f7" diff --git a/airbyte-ci/connectors/pipelines/pyproject.toml b/airbyte-ci/connectors/pipelines/pyproject.toml index ab09cd6bfca8..1a1fde77700d 100644 --- a/airbyte-ci/connectors/pipelines/pyproject.toml +++ b/airbyte-ci/connectors/pipelines/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "pipelines" -version = "3.10.1" +version = "3.10.2" description = "Packaged maintained by the connector operations team to perform CI for connectors' pipelines" authors = ["Airbyte "] @@ -30,6 +30,7 @@ certifi = "^2023.11.17" tomli = "^2.0.1" tomli-w = "^1.0.0" types-requests = "2.28.2" +dpath = "^2.1.6" [tool.poetry.group.dev.dependencies] freezegun = "^1.2.2" diff --git 
a/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py b/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py index 2d89af9ec94d..4b36571fb7fa 100644 --- a/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py +++ b/airbyte-ci/connectors/pipelines/tests/test_tests/test_python_connectors.py @@ -2,12 +2,14 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +from unittest.mock import patch + import pytest from connector_ops.utils import Connector, ConnectorLanguage from pipelines.airbyte_ci.connectors.build_image.steps.python_connectors import BuildConnectorImages from pipelines.airbyte_ci.connectors.context import ConnectorContext -from pipelines.airbyte_ci.connectors.test.steps.python_connectors import UnitTests -from pipelines.models.steps import StepResult +from pipelines.airbyte_ci.connectors.test.steps.python_connectors import AirbyteLibValidation, UnitTests +from pipelines.models.steps import StepResult, StepStatus pytestmark = [ pytest.mark.anyio, @@ -105,3 +107,72 @@ def test_params(self, context_for_certified_connector_with_setup): f"--cov={context_for_certified_connector_with_setup.connector.technical_name.replace('-', '_')}", f"--cov-fail-under={step.MINIMUM_COVERAGE_FOR_CERTIFIED_CONNECTORS}", ] + + +class TestAirbyteLibValidationTests: + @pytest.fixture + def compatible_connector(self): + return Connector("source-faker") + + @pytest.fixture + def incompatible_connector(self): + return Connector("source-postgres") + + @pytest.fixture + def context_for_valid_connector(self, compatible_connector, dagger_client, current_platform): + context = ConnectorContext( + pipeline_name="test airbyte-lib validation", + connector=compatible_connector, + git_branch="test", + git_revision="test", + report_output_prefix="test", + is_local=True, + use_remote_secrets=True, + targeted_platforms=[current_platform], + ) + context.dagger_client = dagger_client + return context + + @pytest.fixture + def context_for_invalid_connector(self, incompatible_connector, dagger_client, current_platform): + context = ConnectorContext( + pipeline_name="test airbyte-lib validation", + connector=incompatible_connector, + git_branch="test", + git_revision="test", + report_output_prefix="test", + is_local=True, + use_remote_secrets=True, + targeted_platforms=[current_platform], + ) + context.dagger_client = dagger_client + return context + + async def test__run_validation_success(self, mocker, context_for_valid_connector: ConnectorContext): + result = await AirbyteLibValidation(context_for_valid_connector)._run(mocker.MagicMock()) + assert isinstance(result, StepResult) + assert result.status == StepStatus.SUCCESS + assert "Creating source and validating spec is returned successfully..." 
in result.stdout + + async def test__run_validation_skip_unpublished_connector( + self, + mocker, + context_for_invalid_connector: ConnectorContext, + ): + result = await AirbyteLibValidation(context_for_invalid_connector)._run(mocker.MagicMock()) + assert isinstance(result, StepResult) + assert result.status == StepStatus.SKIPPED + + async def test__run_validation_fail( + self, + mocker, + context_for_invalid_connector: ConnectorContext, + ): + metadata = context_for_invalid_connector.connector.metadata + metadata["remoteRegistries"] = {"pypi": {"enabled": True, "packageName": "airbyte-source-postgres"}} + metadata_mock = mocker.PropertyMock(return_value=metadata) + with patch.object(Connector, "metadata", metadata_mock): + result = await AirbyteLibValidation(context_for_invalid_connector)._run(mocker.MagicMock()) + assert isinstance(result, StepResult) + assert result.status == StepStatus.FAILURE + assert "does not appear to be a Python project" in result.stderr
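The new `AirbyteLibValidation` step and its tests hinge on the `remoteRegistries/pypi/enabled` metadata flag read via `dpath`; a minimal sketch of that lookup (illustrative only, using a metadata dict shaped like the one patched into the test above):

```python
import dpath.util

# Metadata dict shaped like the `remoteRegistries` block used in the tests above.
metadata = {
    "remoteRegistries": {"pypi": {"enabled": True, "packageName": "airbyte-source-postgres"}},
}

# The step is skipped for connectors that are not published on pypi.
if dpath.util.get(metadata, "remoteRegistries/pypi/enabled", default=False):
    print("run: airbyte-lib-validate-source --connector-dir . --validate-install-only")
else:
    print("skip: connector is not published on pypi")
```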