diff --git a/airbyte-bootloader/build.gradle b/airbyte-bootloader/build.gradle index 132c7972d95b..f704ec6ccfaf 100644 --- a/airbyte-bootloader/build.gradle +++ b/airbyte-bootloader/build.gradle @@ -15,7 +15,7 @@ dependencies { implementation libs.flyway.core testImplementation libs.platform.testcontainers.postgresql - testImplementation 'uk.org.webcompere:system-stubs-jupiter:1.2.0' + testImplementation 'uk.org.webcompere:system-stubs-jupiter:2.0.1' } application { diff --git a/airbyte-cdk/python/CHANGELOG.md b/airbyte-cdk/python/CHANGELOG.md index b3cebc199db6..f69a6b8f5046 100644 --- a/airbyte-cdk/python/CHANGELOG.md +++ b/airbyte-cdk/python/CHANGELOG.md @@ -1,5 +1,8 @@ # Changelog +## 0.11.2 +Low-code: Include the HTTP method used by the request in logging output of the `airbyte-cdk` + ## 0.11.1 Low-code: Fix the component manifest schema to and validate check instead of checker diff --git a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py index 389ffcc19d66..d1ac63e76e82 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/abstract_source.py @@ -226,7 +226,7 @@ def _read_incremental( sync_mode=SyncMode.incremental, stream_state=stream_state, ) - logger.debug(f"Processing stream slices for {stream_name}", extra={"stream_slices": slices}) + logger.debug(f"Processing stream slices for {stream_name} (sync_mode: incremental)", extra={"stream_slices": slices}) total_records_counter = 0 has_slices = False @@ -276,7 +276,9 @@ def _read_full_refresh( internal_config: InternalConfig, ) -> Iterator[AirbyteMessage]: slices = stream_instance.stream_slices(sync_mode=SyncMode.full_refresh, cursor_field=configured_stream.cursor_field) - logger.debug(f"Processing stream slices for {configured_stream.stream.name}", extra={"stream_slices": slices}) + logger.debug( + f"Processing stream slices for {configured_stream.stream.name} (sync_mode: full_refresh)", extra={"stream_slices": slices} + ) total_records_counter = 0 for _slice in slices: logger.debug("Processing stream slice", extra={"slice": _slice}) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index 867eb8dd837f..f25eaf49d028 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -413,7 +413,7 @@ def parse_records_and_emit_request_and_responses(self, request, response, stream def _create_trace_message_from_request(self, request: requests.PreparedRequest): # FIXME: this should return some sort of trace message - request_dict = {"url": request.url, "headers": dict(request.headers), "body": request.body} + request_dict = {"url": request.url, "http_method": request.method, "headers": dict(request.headers), "body": request.body} log_message = filter_secrets(f"request:{json.dumps(request_dict)}") return AirbyteMessage(type=MessageType.LOG, log=AirbyteLogMessage(level=Level.INFO, message=log_message)) diff --git a/airbyte-cdk/python/setup.py b/airbyte-cdk/python/setup.py index e6e8dfd43b24..1ed8be21dbb1 100644 --- a/airbyte-cdk/python/setup.py +++ b/airbyte-cdk/python/setup.py @@ -15,7 +15,7 @@ setup( name="airbyte-cdk", - version="0.11.1", + version="0.11.2", description="A framework for writing Airbyte Connectors.", long_description=README, 
long_description_content_type="text/markdown", diff --git a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java index 760e15c634d7..051d9597a5c6 100644 --- a/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java +++ b/airbyte-config/config-persistence/src/main/java/io/airbyte/config/persistence/ConfigRepository.java @@ -132,7 +132,7 @@ public boolean healthCheck() { public StandardWorkspace getStandardWorkspaceNoSecrets(final UUID workspaceId, final boolean includeTombstone) throws JsonValidationException, IOException, ConfigNotFoundException { - return listWorkspaceQuery(includeTombstone) + return listWorkspaceQuery(Optional.of(workspaceId), includeTombstone) .findFirst() .orElseThrow(() -> new ConfigNotFoundException(ConfigSchema.STANDARD_WORKSPACE, workspaceId)); } @@ -158,13 +158,14 @@ public StandardWorkspace getWorkspaceBySlug(final String slug, final boolean inc } public List<StandardWorkspace> listStandardWorkspaces(final boolean includeTombstone) throws IOException { - return listWorkspaceQuery(includeTombstone).toList(); + return listWorkspaceQuery(Optional.empty(), includeTombstone).toList(); } - private Stream<StandardWorkspace> listWorkspaceQuery(final boolean includeTombstone) throws IOException { + private Stream<StandardWorkspace> listWorkspaceQuery(final Optional<UUID> workspaceId, final boolean includeTombstone) throws IOException { return database.query(ctx -> ctx.select(WORKSPACE.asterisk()) .from(WORKSPACE) .where(includeTombstone ? noCondition() : WORKSPACE.TOMBSTONE.notEqual(true)) + .and(workspaceId.map(WORKSPACE.ID::eq).orElse(noCondition())) .fetch()) .stream() .map(DbConverter::buildStandardWorkspace); @@ -907,7 +908,7 @@ private static StandardSyncOperation buildStandardSyncOperation(final Record rec } public StandardSyncOperation getStandardSyncOperation(final UUID operationId) throws JsonValidationException, IOException, ConfigNotFoundException { - return listStandardSyncOperationQuery(Optional.empty()) + return listStandardSyncOperationQuery(Optional.of(operationId)) .findFirst() .orElseThrow(() -> new ConfigNotFoundException(ConfigSchema.STANDARD_SYNC_OPERATION, operationId)); } diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java index e33c8adc65ad..332429c88f59 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java @@ -341,7 +341,7 @@ private SourceConnection createSourceConnection(final UUID workspaceId, final St } private DestinationConnection createDestinationConnection(final UUID workspaceId, final StandardDestinationDefinition destDef) - throws JsonValidationException, IOException { + throws IOException { final UUID destinationId = UUID.randomUUID(); final DestinationConnection dest = new DestinationConnection() .withName("source-" + destinationId) diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java index c6d4d10aa3f6..f18d20f4b7f3 100644 ---
a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StatePersistenceTest.java @@ -39,7 +39,6 @@ class StatePersistenceTest extends BaseConfigDatabaseTest { - private ConfigRepository configRepository; private StatePersistence statePersistence; private UUID connectionId; private static final String STATE_ONE = "\"state1\""; @@ -58,7 +57,7 @@ void beforeEach() throws DatabaseInitializationException, IOException, JsonValid } private void setupTestData() throws JsonValidationException, IOException { - configRepository = new ConfigRepository( + final ConfigRepository configRepository = new ConfigRepository( database, new ActorDefinitionMigrator(new ExceptionWrappingDatabase(database)), new StandardSyncPersistence(database)); @@ -68,15 +67,14 @@ private void setupTestData() throws JsonValidationException, IOException { final SourceConnection sourceConnection = MockData.sourceConnections().get(0); final StandardDestinationDefinition destinationDefinition = MockData.publicDestinationDefinition(); final DestinationConnection destinationConnection = MockData.destinationConnections().get(0); - final StandardSync sync = MockData.standardSyncs().get(0); + // we don't need sync operations in this test suite, zero them out. + final StandardSync sync = Jsons.clone(MockData.standardSyncs().get(0)).withOperationIds(Collections.emptyList()); configRepository.writeStandardWorkspaceNoSecrets(workspace); configRepository.writeStandardSourceDefinition(sourceDefinition); configRepository.writeSourceConnectionNoSecrets(sourceConnection); configRepository.writeStandardDestinationDefinition(destinationDefinition); configRepository.writeDestinationConnectionNoSecrets(destinationConnection); - configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(0)); - configRepository.writeStandardSyncOperation(MockData.standardSyncOperations().get(1)); configRepository.writeStandardSync(sync); connectionId = sync.getConnectionId(); @@ -239,7 +237,7 @@ void testGlobalPartialReset() throws IOException { .withType(AirbyteStateType.GLOBAL) .withGlobal(new AirbyteGlobalState() .withSharedState(Jsons.deserialize(GLOBAL_STATE)) - .withStreamStates(Arrays.asList( + .withStreamStates(List.of( new AirbyteStreamState() .withStreamDescriptor(new StreamDescriptor().withName("s1")) .withStreamState(Jsons.deserialize(STATE_TWO)))))); @@ -424,7 +422,7 @@ void testStreamPartialUpdates() throws IOException { assertEquals( new StateWrapper() .withStateType(StateType.STREAM) - .withStateMessages(Arrays.asList( + .withStateMessages(List.of( new AirbyteStateMessage() .withType(AirbyteStateType.STREAM) .withStream(new AirbyteStreamState() diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/SyncOperationPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/SyncOperationPersistenceTest.java new file mode 100644 index 000000000000..8e293f3a52d6 --- /dev/null +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/SyncOperationPersistenceTest.java @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+ */ + +package io.airbyte.config.persistence; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import io.airbyte.config.Geography; +import io.airbyte.config.OperatorDbt; +import io.airbyte.config.OperatorNormalization; +import io.airbyte.config.OperatorNormalization.Option; +import io.airbyte.config.OperatorWebhook; +import io.airbyte.config.StandardSyncOperation; +import io.airbyte.config.StandardSyncOperation.OperatorType; +import io.airbyte.config.StandardWorkspace; +import io.airbyte.validation.json.JsonValidationException; +import java.io.IOException; +import java.util.List; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class SyncOperationPersistenceTest extends BaseConfigDatabaseTest { + + private static final UUID WORKSPACE_ID = UUID.randomUUID(); + private static final UUID WEBHOOK_CONFIG_ID = UUID.randomUUID(); + private static final String WEBHOOK_OPERATION_EXECUTION_URL = "test-webhook-url"; + private static final String WEBHOOK_OPERATION_EXECUTION_BODY = "test-webhook-body"; + + private ConfigRepository configRepository; + + private static final StandardSyncOperation DBT_OP = new StandardSyncOperation() + .withName("operation-1") + .withTombstone(false) + .withOperationId(UUID.randomUUID()) + .withWorkspaceId(WORKSPACE_ID) + .withOperatorDbt(new OperatorDbt() + .withDbtArguments("dbt-arguments") + .withDockerImage("image-tag") + .withGitRepoBranch("git-repo-branch") + .withGitRepoUrl("git-repo-url")) + .withOperatorNormalization(null) + .withOperatorType(OperatorType.DBT); + private static final StandardSyncOperation NORMALIZATION_OP = new StandardSyncOperation() + .withName("operation-1") + .withTombstone(false) + .withOperationId(UUID.randomUUID()) + .withWorkspaceId(WORKSPACE_ID) + .withOperatorDbt(null) + .withOperatorNormalization(new OperatorNormalization().withOption(Option.BASIC)) + .withOperatorType(OperatorType.NORMALIZATION); + private static final StandardSyncOperation WEBHOOK_OP = new StandardSyncOperation() + .withName("webhook-operation") + .withTombstone(false) + .withOperationId(UUID.randomUUID()) + .withWorkspaceId(WORKSPACE_ID) + .withOperatorType(OperatorType.WEBHOOK) + .withOperatorDbt(null) + .withOperatorNormalization(null) + .withOperatorWebhook( + new OperatorWebhook() + .withWebhookConfigId(WEBHOOK_CONFIG_ID) + .withExecutionUrl(WEBHOOK_OPERATION_EXECUTION_URL) + .withExecutionBody(WEBHOOK_OPERATION_EXECUTION_BODY)); + private static final List<StandardSyncOperation> OPS = List.of(DBT_OP, NORMALIZATION_OP, WEBHOOK_OP); + + @BeforeEach + void beforeEach() throws Exception { + truncateAllTables(); + + configRepository = new ConfigRepository(database); + createWorkspace(); + + for (final StandardSyncOperation op : OPS) { + configRepository.writeStandardSyncOperation(op); + } + } + + @Test + void testReadWrite() throws IOException, ConfigNotFoundException, JsonValidationException { + for (final StandardSyncOperation op : OPS) { + assertEquals(op, configRepository.getStandardSyncOperation(op.getOperationId())); + } + } + + @Test + void testReadNotExists() { + assertThrows(ConfigNotFoundException.class, () -> configRepository.getStandardSyncOperation(UUID.randomUUID())); + } + + @Test + void testList() throws IOException, JsonValidationException { + assertEquals(OPS, configRepository.listStandardSyncOperations()); + } + + @Test + void testDelete() throws IOException, ConfigNotFoundException, JsonValidationException { + for (final
StandardSyncOperation op : OPS) { + assertEquals(op, configRepository.getStandardSyncOperation(op.getOperationId())); + configRepository.deleteStandardSyncOperation(op.getOperationId()); + assertThrows(ConfigNotFoundException.class, () -> configRepository.getStandardSyncOperation(UUID.randomUUID())); + + } + } + + private void createWorkspace() throws IOException, JsonValidationException { + final StandardWorkspace workspace = new StandardWorkspace() + .withWorkspaceId(WORKSPACE_ID) + .withName("Another Workspace") + .withSlug("another-workspace") + .withInitialSetupComplete(true) + .withTombstone(false) + .withDefaultGeography(Geography.AUTO); + configRepository.writeStandardWorkspaceNoSecrets(workspace); + } + +} diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspacePersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspacePersistenceTest.java index d41e2edcc05b..8077d3fdcbc6 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspacePersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/WorkspacePersistenceTest.java @@ -41,6 +41,12 @@ void setup() { null)); } + @Test + void testGetWorkspace() throws ConfigNotFoundException, IOException, JsonValidationException { + configRepository.writeStandardWorkspaceNoSecrets(createBaseStandardWorkspace().withWorkspaceId(UUID.randomUUID())); + assertReturnsWorkspace(createBaseStandardWorkspace()); + } + @Test void testWorkspaceWithNullTombstone() throws ConfigNotFoundException, IOException, JsonValidationException { assertReturnsWorkspace(createBaseStandardWorkspace()); diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 4cade59107a4..99b81111eabe 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -457,10 +457,10 @@ icon: facebook.svg sourceType: api releaseStage: alpha -- name: "Sample Data (faker)" +- name: Sample Data (Faker) sourceDefinitionId: dfd88b22-b603-4c3d-aad7-3701784586b1 dockerRepository: airbyte/source-faker - dockerImageTag: 0.2.1 + dockerImageTag: 1.0.0 documentationUrl: https://docs.airbyte.com/integrations/sources/faker sourceType: api releaseStage: alpha @@ -554,7 +554,7 @@ - name: Gitlab sourceDefinitionId: 5e6175e5-68e1-4c17-bff9-56103bbb0d80 dockerRepository: airbyte/source-gitlab - dockerImageTag: 0.1.6 + dockerImageTag: 0.1.7 documentationUrl: https://docs.airbyte.com/integrations/sources/gitlab icon: gitlab.svg sourceType: api @@ -753,7 +753,7 @@ - name: Iterable sourceDefinitionId: 2e875208-0c0b-4ee4-9e92-1cb3156ea799 dockerRepository: airbyte/source-iterable - dockerImageTag: 0.1.21 + dockerImageTag: 0.1.22 documentationUrl: https://docs.airbyte.com/integrations/sources/iterable icon: iterable.svg sourceType: api @@ -1484,7 +1484,7 @@ - name: Slack sourceDefinitionId: c2281cee-86f9-4a86-bb48-d23286b4c7bd dockerRepository: airbyte/source-slack - dockerImageTag: 0.1.18 + dockerImageTag: 0.1.19 documentationUrl: https://docs.airbyte.com/integrations/sources/slack icon: slack.svg sourceType: api diff --git a/airbyte-config/init/src/main/resources/seed/source_specs.yaml b/airbyte-config/init/src/main/resources/seed/source_specs.yaml index 4c387ba4eea5..b6d1b111e9b4 100644 --- 
a/airbyte-config/init/src/main/resources/seed/source_specs.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_specs.yaml @@ -3832,7 +3832,7 @@ oauthFlowInitParameters: [] oauthFlowOutputParameters: - - "access_token" -- dockerImage: "airbyte/source-faker:0.2.1" +- dockerImage: "airbyte/source-faker:1.0.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/faker" connectionSpecification: @@ -4638,7 +4638,7 @@ path_in_connector_config: - "credentials" - "client_secret" -- dockerImage: "airbyte/source-gitlab:0.1.6" +- dockerImage: "airbyte/source-gitlab:0.1.7" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/gitlab" connectionSpecification: @@ -4649,7 +4649,7 @@ - "api_url" - "private_token" - "start_date" - additionalProperties: false + additionalProperties: true properties: api_url: type: "string" @@ -6499,7 +6499,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-iterable:0.1.21" +- dockerImage: "airbyte/source-iterable:0.1.22" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/iterable" connectionSpecification: @@ -13331,7 +13331,7 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] -- dockerImage: "airbyte/source-slack:0.1.18" +- dockerImage: "airbyte/source-slack:0.1.19" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/slack" connectionSpecification: @@ -13386,52 +13386,37 @@ - type: "object" title: "Sign in via Slack (OAuth)" required: - - "access_token" + - "option_title" - "client_id" - "client_secret" - - "option_title" + - "access_token" properties: option_title: type: "string" const: "Default OAuth2.0 authorization" client_id: + type: "string" title: "Client ID" description: "Slack client_id. See our docs if you need help finding this id." - type: "string" - examples: - - "slack-client-id-example" - airbyte_secret: true client_secret: + type: "string" title: "Client Secret" description: "Slack client_secret. See our docs if you need help finding this secret." - type: "string" - examples: - - "slack-client-secret-example" airbyte_secret: true access_token: + type: "string" title: "Access token" description: "Slack access_token. See our docs if you need help generating the token." - type: "string" - examples: - - "slack-access-token-example" - airbyte_secret: true - refresh_token: - title: "Refresh token" - description: "Slack refresh_token. See our docs if you need help generating the token." 
- type: "string" - examples: - - "slack-refresh-token-example" airbyte_secret: true order: 0 - type: "object" title: "API Token" required: - - "api_token" - "option_title" + - "api_token" properties: option_title: type: "string" @@ -13446,18 +13431,44 @@ supportsNormalization: false supportsDBT: false supported_destination_sync_modes: [] - authSpecification: - auth_type: "oauth2.0" - oauth2Specification: - rootObject: - - "credentials" - - "0" - oauthFlowInitParameters: - - - "client_id" - - - "client_secret" - oauthFlowOutputParameters: - - - "access_token" - - - "refresh_token" + advanced_auth: + auth_flow_type: "oauth2.0" + predicate_key: + - "credentials" + - "option_title" + predicate_value: "Default OAuth2.0 authorization" + oauth_config_specification: + complete_oauth_output_specification: + type: "object" + additionalProperties: false + properties: + access_token: + type: "string" + path_in_connector_config: + - "credentials" + - "access_token" + complete_oauth_server_input_specification: + type: "object" + additionalProperties: false + properties: + client_id: + type: "string" + client_secret: + type: "string" + complete_oauth_server_output_specification: + type: "object" + additionalProperties: false + properties: + client_id: + type: "string" + path_in_connector_config: + - "credentials" + - "client_id" + client_secret: + type: "string" + path_in_connector_config: + - "credentials" + - "client_secret" - dockerImage: "airbyte/source-smaily:0.1.0" spec: documentationUrl: "https://docs.airbyte.com/integrations/sources/smaily" diff --git a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py index a729c202aed7..1520ba893df2 100644 --- a/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py +++ b/airbyte-connector-builder-server/unit_tests/connector_builder/impl/test_default_api.py @@ -8,8 +8,6 @@ import pytest from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteRecordMessage, Level, Type -from fastapi import HTTPException - from connector_builder.generated.models.http_request import HttpRequest from connector_builder.generated.models.http_response import HttpResponse from connector_builder.generated.models.stream_read import StreamRead @@ -19,6 +17,7 @@ from connector_builder.generated.models.streams_list_read_streams import StreamsListReadStreams from connector_builder.generated.models.streams_list_request_body import StreamsListRequestBody from connector_builder.impl.default_api import DefaultApiImpl +from fastapi import HTTPException MANIFEST = { "version": "0.1.0", @@ -343,17 +342,13 @@ def test_invalid_manifest(): } expected_status_code = 400 - expected_detail = "Invalid connector manifest with error: 'streams' is a required property" api = DefaultApiImpl() loop = asyncio.get_event_loop() with pytest.raises(HTTPException) as actual_exception: - loop.run_until_complete( - api.read_stream(StreamReadRequestBody(manifest=invalid_manifest, config={}, stream="hashiras")) - ) + loop.run_until_complete(api.read_stream(StreamReadRequestBody(manifest=invalid_manifest, config={}, stream="hashiras"))) assert actual_exception.value.status_code == expected_status_code - assert expected_detail in actual_exception.value.detail def test_read_stream_invalid_group_format(): @@ -371,27 +366,20 @@ def test_read_stream_invalid_group_format(): loop = asyncio.get_event_loop() with pytest.raises(HTTPException) as 
actual_exception: - loop.run_until_complete( - api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras")) - ) + loop.run_until_complete(api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config=CONFIG, stream="hashiras"))) assert actual_exception.value.status_code == 400 - assert actual_exception.value.detail == "Could not perform read with with error: Every message grouping should have at least one request and response" def test_read_stream_returns_error_if_stream_does_not_exist(): expected_status_code = 400 - expected_detail = "Could not perform read with with error: \"The requested stream not_in_manifest was not found in the source. Available streams: dict_keys(['hashiras', 'breathing-techniques'])\"" api = DefaultApiImpl() loop = asyncio.get_event_loop() with pytest.raises(HTTPException) as actual_exception: - loop.run_until_complete( - api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config={}, stream="not_in_manifest")) - ) + loop.run_until_complete(api.read_stream(StreamReadRequestBody(manifest=MANIFEST, config={}, stream="not_in_manifest"))) assert actual_exception.value.status_code == expected_status_code - assert expected_detail in actual_exception.value.detail @pytest.mark.parametrize( diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java index faf328623007..01d396356921 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/DefaultJdbcDatabase.java @@ -5,6 +5,7 @@ package io.airbyte.db.jdbc; import com.google.errorprone.annotations.MustBeClosed; +import io.airbyte.commons.exceptions.ConfigErrorException; import io.airbyte.commons.exceptions.ConnectionErrorException; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.functional.CheckedFunction; @@ -14,6 +15,7 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; +import java.sql.SQLTransientException; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; @@ -76,8 +78,12 @@ public Stream unsafeResultSetQuery(final CheckedFunction AirbyteConnectionStatus: - """ - Tests if the input configuration can be used to successfully connect to the integration - e.g: if a provided Stripe API token can be used to connect to the Stripe API. +from .streams import Products, Purchases, Users - :param logger: Logging object to display debug/info/error to the logs - (logs will not be accessible via airbyte UI if they are not passed to this logger) - :param config: Json object containing the configuration of this source, content of this json is as specified in - the properties of the spec.json file - :return: AirbyteConnectionStatus indicating a Success or Failure - """ - - # As this is an in-memory source, it always succeeds +class SourceFaker(AbstractSource): + def check_connection(self, logger: AirbyteLogger, config: Mapping[str, Any]) -> Tuple[bool, Any]: if type(config["count"]) == int or type(config["count"]) == float: - return AirbyteConnectionStatus(status=Status.SUCCEEDED) + return True, None else: - return AirbyteConnectionStatus(status=Status.FAILED) - - def discover(self, logger: AirbyteLogger, config: Dict[str, any]) -> AirbyteCatalog: - """ - Returns an AirbyteCatalog representing the available streams and fields in this integration. 
- For example, given valid credentials to a Postgres database, - returns an Airbyte catalog where each postgres table is a stream, and each table column is a field. - - :param logger: Logging object to display debug/info/error to the logs - (logs will not be accessible via airbyte UI if they are not passed to this logger) - :param config: Json object containing the configuration of this source, content of this json is as specified in - the properties of the spec.json file - - :return: AirbyteCatalog is an object describing a list of all available streams in this source. - A stream is an AirbyteStream object that includes: - - its stream name (or table name in the case of Postgres) - - json_schema providing the specifications of expected schema for this stream (a list of columns described - by their names and types) - """ - streams = [] - dirname = os.path.dirname(os.path.realpath(__file__)) - - # Fake Users - spec_path = os.path.join(dirname, "users_catalog.json") - catalog = read_json(spec_path) - streams.append(AirbyteStream(name="Users", json_schema=catalog, supported_sync_modes=["full_refresh", "incremental"])) + return False, "Count option is missing" - # Fake Products - spec_path = os.path.join(dirname, "products_catalog.json") - catalog = read_json(spec_path) - streams.append(AirbyteStream(name="Products", json_schema=catalog, supported_sync_modes=["full_refresh"])) - - # Fake Purchases - spec_path = os.path.join(dirname, "purchases_catalog.json") - catalog = read_json(spec_path) - streams.append(AirbyteStream(name="Purchases", json_schema=catalog, supported_sync_modes=["full_refresh", "incremental"])) - - return AirbyteCatalog(streams=streams) - - def read( - self, logger: AirbyteLogger, config: Dict[str, any], catalog: ConfiguredAirbyteCatalog, state: Dict[str, any] - ) -> Generator[AirbyteMessage, None, None]: - """ - Returns a generator of the AirbyteMessages generated by reading the source with the given configuration, - catalog, and state. - - :param logger: Logging object to display debug/info/error to the logs - (logs will not be accessible via airbyte UI if they are not passed to this logger) - :param config: Json object containing the configuration of this source, content of this json is as specified in - the properties of the spec.json file - :param catalog: The input catalog is a ConfiguredAirbyteCatalog which is almost the same as AirbyteCatalog - returned by discover(), but - in addition, it's been configured in the UI! For each particular stream and field, there may have been provided - with extra modifications such as: filtering streams and/or columns out, renaming some entities, etc - :param state: When a Airbyte reads data from a source, it might need to keep a checkpoint cursor to resume - replication in the future from that saved checkpoint. - This is the object that is provided with state from previous runs and avoid replicating the entire set of - data everytime. - - :return: A generator that produces a stream of AirbyteRecordMessage contained in AirbyteMessage object. 
- """ + def streams(self, config: Mapping[str, Any]) -> List[Stream]: count: int = config["count"] if "count" in config else 0 seed: int = config["seed"] if "seed" in config else None records_per_sync: int = config["records_per_sync"] if "records_per_sync" in config else 500 records_per_slice: int = config["records_per_slice"] if "records_per_slice" in config else 100 - person = Person(locale=Locale.EN, seed=seed) - dt = Datetime(seed=seed) - - to_generate_users = False - to_generate_purchases = False - purchases_stream = None - purchases_count = state["Purchases"]["purchases_count"] if "Purchases" in state else 0 - for stream in catalog.streams: - if stream.stream.name == "Users": - to_generate_users = True - for stream in catalog.streams: - if stream.stream.name == "Purchases": - purchases_stream = stream - to_generate_purchases = True - - if to_generate_purchases and not to_generate_users: - raise ValueError("Purchases stream cannot be enabled without Users stream") - - for stream in catalog.streams: - yield log_stream(stream.stream.name) - - if stream.stream.name == "Users": - cursor = get_stream_cursor(state, stream.stream.name) - total_records = cursor - records_in_sync = 0 - records_in_page = 0 - - users_estimate = count - cursor - yield generate_estimate(stream.stream.name, users_estimate, 450) - yield generate_estimate("Purchases", users_estimate * 1.5, 230) # a fuzzy guess, some users have purchases, some don't - - for i in range(cursor, count): - user = generate_user(person, dt, i) - yield generate_record(stream, user) - total_records += 1 - records_in_sync += 1 - records_in_page += 1 - - if to_generate_purchases: - purchases = generate_purchases(user, purchases_count) - for p in purchases: - yield generate_record(purchases_stream, p) - purchases_count += 1 - - if records_in_page == records_per_slice: - yield generate_state(state, stream, {"cursor": total_records, "seed": seed}) - records_in_page = 0 - - if records_in_sync == records_per_sync: - break - - yield generate_state(state, stream, {"cursor": total_records, "seed": seed}) - if purchases_stream is not None: - yield generate_state(state, purchases_stream, {"purchases_count": purchases_count}) - - elif stream.stream.name == "Products": - products = generate_products() - yield generate_estimate(stream.stream.name, len(products), 180) - for p in products: - yield generate_record(stream, p) - yield generate_state(state, stream, {"product_count": len(products)}) - - elif stream.stream.name == "Purchases": - # Purchases are generated as part of Users stream - True - - else: - raise ValueError(stream.stream.name) - - -def get_stream_cursor(state: Dict[str, any], stream: str) -> int: - cursor = (state[stream]["cursor"] or 0) if stream in state else 0 - return cursor - - -def generate_record(stream: any, data: any): - dict = data.copy() - - # timestamps need to be emitted in ISO format - for key in dict: - if isinstance(dict[key], datetime.datetime): - dict[key] = format_airbyte_time(dict[key]) - - return AirbyteMessage( - type=Type.RECORD, - record=AirbyteRecordMessage(stream=stream.stream.name, data=dict, emitted_at=int(datetime.datetime.now().timestamp()) * 1000), - ) - - -def log_stream(stream_name: str): - return AirbyteMessage( - type=Type.LOG, - log=AirbyteLogMessage( - message="Sending data for stream: " + stream_name, - level="INFO", - emitted_at=int(datetime.datetime.now().timestamp()) * 1000, - ), - ) - - -def generate_estimate(stream_name: str, total: int, bytes_per_row: int): - emitted_at = 
int(datetime.datetime.now().timestamp() * 1000) - estimate = AirbyteEstimateTraceMessage( - type=EstimateType.STREAM, name=stream_name, row_estimate=round(total), byte_estimate=round(total * bytes_per_row) - ) - return AirbyteMessage(type=Type.TRACE, trace=AirbyteTraceMessage(type=TraceType.ESTIMATE, emitted_at=emitted_at, estimate=estimate)) - - -def generate_state(state: Dict[str, any], stream: any, data: any): - state[ - stream.stream.name - ] = data # since we have multiple streams, we need to build up the "combined state" for all streams and emit that each time until the platform has support for per-stream state - return AirbyteMessage(type=Type.STATE, state=AirbyteStateMessage(data=state)) - - -def generate_user(person: Person, dt: Datetime, user_id: int): - time_a = dt.datetime() - time_b = dt.datetime() - - profile = { - "id": user_id + 1, - "created_at": time_a if time_a <= time_b else time_b, - "updated_at": time_a if time_a > time_b else time_b, - "name": person.name(), - "title": person.title(), - "age": person.age(), - "email": person.email(), - "telephone": person.telephone(), - "gender": person.gender(), - "language": person.language(), - "academic_degree": person.academic_degree(), - "nationality": person.nationality(), - "occupation": person.occupation(), - "height": person.height(), - "blood_type": person.blood_type(), - "weight": person.weight(), - } - - while not profile["created_at"]: - profile["created_at"] = dt.datetime() - - if not profile["updated_at"]: - profile["updated_at"] = profile["created_at"] + 1 - - return profile - - -def generate_purchases(user: any, purchases_count: int) -> list[Dict]: - purchases: list[Dict] = [] - purchase_percent_remaining = 80 # ~ 20% of people will have no purchases - total_products = len(generate_products()) - purchase_percent_remaining = purchase_percent_remaining - random.randrange(1, 100) - i = 0 - while purchase_percent_remaining > 0: - id = purchases_count + i + 1 - product_id = random.randrange(1, total_products) - added_to_cart_at = random_date_in_range(user["created_at"]) - purchased_at = ( - random_date_in_range(added_to_cart_at) if added_to_cart_at is not None and random.randrange(1, 100) <= 70 else None - ) # 70% likely to purchase the item in the cart - returned_at = ( - random_date_in_range(purchased_at) if purchased_at is not None and random.randrange(1, 100) <= 15 else None - ) # 15% likely to return the item - purchase = { - "id": id, - "product_id": product_id, - "user_id": user["id"], - "added_to_cart_at": added_to_cart_at, - "purchased_at": purchased_at, - "returned_at": returned_at, - } - purchases.append(purchase) - - purchase_percent_remaining = purchase_percent_remaining - random.randrange(1, 100) - i += 1 - return purchases - - -def generate_products() -> list[Dict]: - dirname = os.path.dirname(os.path.realpath(__file__)) - return read_json(os.path.join(dirname, "products.json")) - - -def read_json(filepath): - with open(filepath, "r") as f: - return json.loads(f.read()) - - -def random_date_in_range(start_date: datetime.datetime, end_date: datetime.datetime = datetime.datetime.now()) -> datetime.datetime: - time_between_dates = end_date - start_date - days_between_dates = time_between_dates.days - if days_between_dates < 2: - days_between_dates = 2 - random_number_of_days = random.randrange(days_between_dates) - random_date = start_date + datetime.timedelta(days=random_number_of_days) - return random_date - - -def format_airbyte_time(d: datetime): - s = f"{d}" - s = s.split(".")[0] - s = s.replace(" ", 
"T") - s += "+00:00" - return s + return [ + Products(count, seed, records_per_sync, records_per_slice), + Users(count, seed, records_per_sync, records_per_slice), + Purchases(seed, records_per_sync, records_per_slice), + ] diff --git a/airbyte-integrations/connectors/source-faker/source_faker/streams.py b/airbyte-integrations/connectors/source-faker/source_faker/streams.py new file mode 100644 index 000000000000..352dfb2411db --- /dev/null +++ b/airbyte-integrations/connectors/source-faker/source_faker/streams.py @@ -0,0 +1,257 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import datetime +import os +from typing import Any, Dict, Iterable, Mapping, Optional + +from airbyte_cdk.models import AirbyteEstimateTraceMessage, AirbyteTraceMessage, EstimateType, TraceType +from airbyte_cdk.sources.streams import IncrementalMixin, Stream +from mimesis import Datetime, Numeric, Person +from mimesis.locales import Locale + +from .utils import format_airbyte_time, read_json + + +class Products(Stream, IncrementalMixin): + primary_key = None + cursor_field = "id" + + def __init__(self, count: int, seed: int, records_per_sync: int, records_per_slice: int, **kwargs): + super().__init__(**kwargs) + self.seed = seed + self.records_per_sync = records_per_sync + self.records_per_slice = records_per_slice + + @property + def state_checkpoint_interval(self) -> Optional[int]: + return self.records_per_slice + + @property + def state(self) -> Mapping[str, Any]: + if hasattr(self, "_state"): + return self._state + else: + return {self.cursor_field: 0} + + @state.setter + def state(self, value: Mapping[str, Any]): + self._state = value + + def load_products(self) -> list[Dict]: + dirname = os.path.dirname(os.path.realpath(__file__)) + return read_json(os.path.join(dirname, "record_data", "products.json")) + + def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: + total_records = self.state[self.cursor_field] if self.cursor_field in self.state else 0 + products = self.load_products() + + median_record_byte_size = 180 + rows_to_emit = len(products) - total_records + if rows_to_emit > 0: + yield generate_estimate(self.name, rows_to_emit, median_record_byte_size) + + for product in products: + if product["id"] > total_records: + yield product + total_records = product["id"] + + self.state = {self.cursor_field: total_records, "seed": self.seed} + + +class Users(Stream, IncrementalMixin): + primary_key = None + cursor_field = "id" + + def __init__(self, count: int, seed: int, records_per_sync: int, records_per_slice: int, **kwargs): + super().__init__(**kwargs) + self.count = count + self.seed = seed + self.records_per_sync = records_per_sync + self.records_per_slice = records_per_slice + self.person = Person(locale=Locale.EN, seed=self.seed) + self.dt = Datetime(seed=self.seed) + + @property + def state_checkpoint_interval(self) -> Optional[int]: + return self.records_per_slice + + @property + def state(self) -> Mapping[str, Any]: + if hasattr(self, "_state"): + return self._state + else: + return {self.cursor_field: 0} + + @state.setter + def state(self, value: Mapping[str, Any]): + self._state = value + + def generate_user(self, user_id: int): + time_a = self.dt.datetime() + time_b = self.dt.datetime() + + profile = { + "id": user_id + 1, + "created_at": format_airbyte_time(time_a if time_a <= time_b else time_b), + "updated_at": format_airbyte_time(time_a if time_a > time_b else time_b), + "name": self.person.name(), + "title": self.person.title(), + "age": self.person.age(), + 
"email": self.person.email(), + "telephone": self.person.telephone(), + "gender": self.person.gender(), + "language": self.person.language(), + "academic_degree": self.person.academic_degree(), + "nationality": self.person.nationality(), + "occupation": self.person.occupation(), + "height": self.person.height(), + "blood_type": self.person.blood_type(), + "weight": self.person.weight(), + } + + while not profile["created_at"]: + profile["created_at"] = format_airbyte_time(self.dt.datetime()) + + if not profile["updated_at"]: + profile["updated_at"] = profile["created_at"] + 1 + + return profile + + def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: + total_records = self.state[self.cursor_field] if self.cursor_field in self.state else 0 + records_in_sync = 0 + records_in_slice = 0 + + median_record_byte_size = 450 + yield generate_estimate(self.name, self.count - total_records, median_record_byte_size) + + for i in range(total_records, self.count): + user = self.generate_user(i) + yield user + total_records += 1 + records_in_sync += 1 + records_in_slice += 1 + + if records_in_slice >= self.records_per_slice: + self.state = {self.cursor_field: total_records, "seed": self.seed} + records_in_slice = 0 + + if records_in_sync == self.records_per_sync: + break + + self.state = {self.cursor_field: total_records, "seed": self.seed} + set_total_user_records(total_records) + + +class Purchases(Stream, IncrementalMixin): + primary_key = None + cursor_field = "user_id" + + def __init__(self, seed: int, records_per_sync: int, records_per_slice: int, **kwargs): + super().__init__(**kwargs) + self.seed = seed + self.records_per_sync = records_per_sync + self.records_per_slice = records_per_slice + self.dt = Datetime(seed=self.seed) + self.numeric = Numeric(seed=self.seed) + + @property + def state_checkpoint_interval(self) -> Optional[int]: + return self.records_per_slice + + @property + def state(self) -> Mapping[str, Any]: + if hasattr(self, "_state"): + return self._state + else: + return {self.cursor_field: 0} + + @state.setter + def state(self, value: Mapping[str, Any]): + self._state = value + + def random_date_in_range( + self, start_date: datetime.datetime, end_date: datetime.datetime = datetime.datetime.now() + ) -> datetime.datetime: + time_between_dates = end_date - start_date + days_between_dates = time_between_dates.days + if days_between_dates < 2: + days_between_dates = 2 + random_number_of_days = self.numeric.integer_number(0, days_between_dates) + random_date = start_date + datetime.timedelta(days=random_number_of_days) + return random_date + + def generate_purchases(self, user_id: int, purchases_count: int) -> list[Dict]: + purchases: list[Dict] = [] + purchase_percent_remaining = 70 # ~30% of people will have no purchases + total_products = 100 + purchase_percent_remaining = purchase_percent_remaining - self.numeric.integer_number(1, 100) + i = 0 + + time_a = self.dt.datetime() + time_b = self.dt.datetime() + created_at = time_a if time_a <= time_b else time_b + + while purchase_percent_remaining > 0: + id = purchases_count + i + 1 + product_id = self.numeric.integer_number(1, total_products) + added_to_cart_at = self.random_date_in_range(created_at) + purchased_at = ( + self.random_date_in_range(added_to_cart_at) + if added_to_cart_at is not None and self.numeric.integer_number(1, 100) <= 70 + else None + ) # 70% likely to purchase the item in the cart + returned_at = ( + self.random_date_in_range(purchased_at) if purchased_at is not None and 
self.numeric.integer_number(1, 100) <= 15 else None + ) # 15% likely to return the item + + purchase = { + "id": id, + "product_id": product_id, + "user_id": user_id, + "added_to_cart_at": format_airbyte_time(added_to_cart_at) if added_to_cart_at is not None else None, + "purchased_at": format_airbyte_time(purchased_at) if purchased_at is not None else None, + "returned_at": format_airbyte_time(returned_at) if returned_at is not None else None, + } + purchases.append(purchase) + + purchase_percent_remaining = purchase_percent_remaining - self.numeric.integer_number(1, 100) + i += 1 + return purchases + + def read_records(self, **kwargs) -> Iterable[Mapping[str, Any]]: + purchases_count = self.state[self.cursor_field] if self.cursor_field in self.state else 0 + + if total_user_records <= 0: + return # if there are no new users, there should be no new purchases + + median_record_byte_size = 230 + yield generate_estimate( + self.name, total_user_records - purchases_count * 1.3, median_record_byte_size + ) # a fuzzy guess, some users have purchases, some don't + + for i in range(purchases_count, total_user_records): + purchases = self.generate_purchases(i + 1, purchases_count) + for purchase in purchases: + yield purchase + purchases_count += 1 + + self.state = {self.cursor_field: total_user_records, "seed": self.seed} + + +def generate_estimate(stream_name: str, total: int, bytes_per_row: int): + emitted_at = int(datetime.datetime.now().timestamp() * 1000) + estimate_message = AirbyteEstimateTraceMessage( + type=EstimateType.STREAM, name=stream_name, row_estimate=round(total), byte_estimate=round(total * bytes_per_row) + ) + return AirbyteTraceMessage(type=TraceType.ESTIMATE, emitted_at=emitted_at, estimate=estimate_message) + + +# a globals hack to share data between streams: +total_user_records = 0 + + +def set_total_user_records(total: int): + globals()["total_user_records"] = total diff --git a/airbyte-integrations/connectors/source-faker/source_faker/utils.py b/airbyte-integrations/connectors/source-faker/source_faker/utils.py new file mode 100644 index 000000000000..12970853c956 --- /dev/null +++ b/airbyte-integrations/connectors/source-faker/source_faker/utils.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
+# + +import datetime +import json + + +def read_json(filepath): + with open(filepath, "r") as f: + return json.loads(f.read()) + + +def format_airbyte_time(d: datetime): + s = f"{d}" + s = s.split(".")[0] + s = s.replace(" ", "T") + s += "+00:00" + return s diff --git a/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py b/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py index 68a4351ba2b5..8a989b7778ac 100644 --- a/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py +++ b/airbyte-integrations/connectors/source-faker/unit_tests/unit_test.py @@ -8,6 +8,32 @@ from source_faker import SourceFaker +class MockLogger: + def debug(a,b, **kwargs): + return None + + def info(a,b, **kwargs): + return None + + def exception(a,b,**kwargs): + print(b) + return None + + +logger = MockLogger() + + +def schemas_are_valid(): + source = SourceFaker() + config = {"count": 1} + catalog = source.discover(None, config) + catalog = AirbyteMessage(type=Type.CATALOG, catalog=catalog).dict(exclude_unset=True) + schemas = [stream["json_schema"] for stream in catalog["catalog"]["streams"]] + + for schema in schemas: + jsonschema.Draft7Validator.check_schema(schema) + + def test_source_streams(): source = SourceFaker() config = {"count": 1} @@ -16,7 +42,7 @@ def test_source_streams(): schemas = [stream["json_schema"] for stream in catalog["catalog"]["streams"]] assert len(schemas) == 3 - assert schemas[0]["properties"] == { + assert schemas[1]["properties"] == { "id": {"type": "number"}, "created_at": {"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"}, "updated_at": {"type": "string", "format": "date-time", "airbyte_type": "timestamp_with_timezone"}, @@ -35,19 +61,15 @@ def test_source_streams(): "weight": {"type": "integer"}, } - for schema in schemas: - jsonschema.Draft7Validator.check_schema(schema) - def test_read_small_random_data(): source = SourceFaker() - logger = None config = {"count": 10} catalog = ConfiguredAirbyteCatalog( streams=[ { - "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, - "sync_mode": "full_refresh", + "stream": {"name": "users", "json_schema": {}, "supported_sync_modes": ["incremental"]}, + "sync_mode": "incremental", "destination_sync_mode": "overwrite", } ] @@ -55,35 +77,68 @@ def test_read_small_random_data(): state = {} iterator = source.read(logger, config, catalog, state) + estimate_row_count = 0 record_rows_count = 0 state_rows_count = 0 latest_state = {} for row in iterator: + if row.type is Type.TRACE: + estimate_row_count = estimate_row_count + 1 if row.type is Type.RECORD: record_rows_count = record_rows_count + 1 if row.type is Type.STATE: state_rows_count = state_rows_count + 1 latest_state = row + assert estimate_row_count == 1 assert record_rows_count == 10 assert state_rows_count == 1 - assert latest_state.state.data == {"Users": {"cursor": 10, "seed": None}} + assert latest_state.state.data == {"users": {"id": 10, "seed": None}} + + +def test_no_read_limit_hit(): + source = SourceFaker() + config = {"count": 10} + catalog = ConfiguredAirbyteCatalog( + streams=[ + { + "stream": {"name": "users", "json_schema": {}, "supported_sync_modes": ["incremental"]}, + "sync_mode": "incremental", + "destination_sync_mode": "overwrite", + } + ] + ) + state = {"users": {"id": 10}} + iterator = source.read(logger, config, catalog, state) + + record_rows_count = 0 + state_rows_count = 0 + latest_state = {} + for row in iterator: + if row.type is Type.RECORD: + 
record_rows_count = record_rows_count + 1 + if row.type is Type.STATE: + state_rows_count = state_rows_count + 1 + latest_state = row + + assert record_rows_count == 0 + assert state_rows_count == 1 + assert latest_state.state.data == {"users": {"id": 10, "seed": None}} def test_read_big_random_data(): source = SourceFaker() - logger = None config = {"count": 1000, "records_per_slice": 100, "records_per_sync": 1000} catalog = ConfiguredAirbyteCatalog( streams=[ { - "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, - "sync_mode": "full_refresh", + "stream": {"name": "users", "json_schema": {}, "supported_sync_modes": ["incremental"]}, + "sync_mode": "incremental", "destination_sync_mode": "overwrite", }, { - "stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, - "sync_mode": "full_refresh", + "stream": {"name": "products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "incremental", "destination_sync_mode": "overwrite", }, ] @@ -102,29 +157,28 @@ def test_read_big_random_data(): latest_state = row assert record_rows_count == 1000 + 100 # 1000 users, and 100 products - assert state_rows_count == 10 + 1 + 1 # 1000/100 + one more state at the end, and one state for the products - assert latest_state.state.data == {"Products": {"product_count": 100}, "Users": {"cursor": 1000, "seed": None}} + assert latest_state.state.data == {'users': {'seed': None, 'id': 1000}, 'products': {'id': 100, 'seed': None}} + assert state_rows_count == 10 + 1 + 1 + 1 def test_with_purchases(): source = SourceFaker() - logger = None config = {"count": 1000, "records_per_sync": 1000} catalog = ConfiguredAirbyteCatalog( streams=[ { - "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, - "sync_mode": "full_refresh", + "stream": {"name": "users", "json_schema": {}, "supported_sync_modes": ["incremental"]}, + "sync_mode": "incremental", "destination_sync_mode": "overwrite", }, { - "stream": {"name": "Products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, - "sync_mode": "full_refresh", + "stream": {"name": "products", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, + "sync_mode": "incremental", "destination_sync_mode": "overwrite", }, { - "stream": {"name": "Purchases", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, - "sync_mode": "full_refresh", + "stream": {"name": "purchases", "json_schema": {}, "supported_sync_modes": ["incremental"]}, + "sync_mode": "incremental", "destination_sync_mode": "overwrite", }, ] @@ -143,21 +197,20 @@ def test_with_purchases(): latest_state = row assert record_rows_count > 1000 + 100 # should be greater than 1000 users, and 100 products - assert state_rows_count > 10 + 1 + 1 # should be greater than 1000/100 + one more state at the end, and one state for the products - assert latest_state.state.data["Users"] == {"cursor": 1000, "seed": None} - assert latest_state.state.data["Products"] == {"product_count": 100} - assert latest_state.state.data["Purchases"]["purchases_count"] > 0 + assert state_rows_count > 10 + 1 # should be greater than 1000/100, and one state for the products + assert latest_state.state.data["users"] == {"id": 1000, "seed": None} + assert latest_state.state.data["products"] == {'id': 100, 'seed': None} + assert latest_state.state.data["purchases"]["user_id"] > 0 def test_sync_ends_with_limit(): source = SourceFaker() - logger = None config = {"count": 100, "records_per_sync": 5} 
catalog = ConfiguredAirbyteCatalog( streams=[ { - "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, - "sync_mode": "full_refresh", + "stream": {"name": "users", "json_schema": {}, "supported_sync_modes": ["incremental"]}, + "sync_mode": "incremental", "destination_sync_mode": "overwrite", } ] @@ -177,7 +230,7 @@ def test_sync_ends_with_limit(): assert record_rows_count == 5 assert state_rows_count == 1 - assert latest_state.state.data == {"Users": {"cursor": 5, "seed": None}} + assert latest_state.state.data == {"users": {"id": 5, "seed": None}} def test_read_with_seed(): @@ -186,13 +239,12 @@ def test_read_with_seed(): """ source = SourceFaker() - logger = None config = {"count": 1, "seed": 100} catalog = ConfiguredAirbyteCatalog( streams=[ { - "stream": {"name": "Users", "json_schema": {}, "supported_sync_modes": ["full_refresh"]}, - "sync_mode": "full_refresh", + "stream": {"name": "users", "json_schema": {}, "supported_sync_modes": ["incremental"]}, + "sync_mode": "incremental", "destination_sync_mode": "overwrite", } ] @@ -208,11 +260,10 @@ def test_read_with_seed(): def test_ensure_no_purchases_without_users(): with pytest.raises(ValueError): source = SourceFaker() - logger = None config = {"count": 100} catalog = ConfiguredAirbyteCatalog( streams=[ - {"stream": {"name": "Purchases", "json_schema": {}}, "sync_mode": "full_refresh", "destination_sync_mode": "overwrite"}, + {"stream": {"name": "purchases", "json_schema": {}}, "sync_mode": "incremental", "destination_sync_mode": "overwrite"}, ] ) state = {} diff --git a/airbyte-integrations/connectors/source-gitlab/Dockerfile b/airbyte-integrations/connectors/source-gitlab/Dockerfile index 68047e54ba2d..f888bc1897fa 100644 --- a/airbyte-integrations/connectors/source-gitlab/Dockerfile +++ b/airbyte-integrations/connectors/source-gitlab/Dockerfile @@ -13,5 +13,5 @@ RUN pip install . 
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-gitlab diff --git a/airbyte-integrations/connectors/source-gitlab/source_gitlab/schemas/group_milestones.json b/airbyte-integrations/connectors/source-gitlab/source_gitlab/schemas/group_milestones.json index eaaee94434ae..71f833d9c4b2 100644 --- a/airbyte-integrations/connectors/source-gitlab/source_gitlab/schemas/group_milestones.json +++ b/airbyte-integrations/connectors/source-gitlab/source_gitlab/schemas/group_milestones.json @@ -30,11 +30,11 @@ }, "due_date": { "type": ["null", "string"], - "format": "date-time" + "format": "date" }, "start_date": { "type": ["null", "string"], - "format": "date-time" + "format": "date" }, "expired": { "type": ["null", "boolean"] diff --git a/airbyte-integrations/connectors/source-gitlab/source_gitlab/spec.json b/airbyte-integrations/connectors/source-gitlab/source_gitlab/spec.json index 42ceafa8ecc6..e394af032bbf 100644 --- a/airbyte-integrations/connectors/source-gitlab/source_gitlab/spec.json +++ b/airbyte-integrations/connectors/source-gitlab/source_gitlab/spec.json @@ -5,7 +5,7 @@ "title": "Source GitLab Singer Spec", "type": "object", "required": ["api_url", "private_token", "start_date"], - "additionalProperties": false, + "additionalProperties": true, "properties": { "api_url": { "type": "string", diff --git a/airbyte-integrations/connectors/source-iterable/Dockerfile b/airbyte-integrations/connectors/source-iterable/Dockerfile index b757f1c06cc5..889e9f37f0e4 100644 --- a/airbyte-integrations/connectors/source-iterable/Dockerfile +++ b/airbyte-integrations/connectors/source-iterable/Dockerfile @@ -12,5 +12,5 @@ RUN pip install . ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.21 +LABEL io.airbyte.version=0.1.22 LABEL io.airbyte.name=airbyte/source-iterable diff --git a/airbyte-integrations/connectors/source-iterable/setup.py b/airbyte-integrations/connectors/source-iterable/setup.py index cdb2d8a9521a..888b95bb31ca 100644 --- a/airbyte-integrations/connectors/source-iterable/setup.py +++ b/airbyte-integrations/connectors/source-iterable/setup.py @@ -8,6 +8,7 @@ MAIN_REQUIREMENTS = [ "airbyte-cdk", "pendulum~=2.1.2", + "python-dateutil~=2.8.2", "requests~=2.25", ] diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py b/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py index 9df4df672cdc..2640a6ae3e14 100644 --- a/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/streams.py @@ -19,6 +19,7 @@ from requests import codes from requests.exceptions import ChunkedEncodingError from source_iterable.slice_generators import AdjustableSliceGenerator, RangeSliceGenerator, StreamSlice +from source_iterable.utils import dateutil_parse EVENT_ROWS_LIMIT = 200 CAMPAIGNS_PER_REQUEST = 20 @@ -137,7 +138,7 @@ def _field_to_datetime(value: Union[int, str]) -> pendulum.datetime: if isinstance(value, int): value = pendulum.from_timestamp(value / 1000.0) elif isinstance(value, str): - value = pendulum.parse(value, strict=False) + value = dateutil_parse(value) else: raise ValueError(f"Unsupported type of datetime field {type(value)}") return value diff --git a/airbyte-integrations/connectors/source-iterable/source_iterable/utils.py 
b/airbyte-integrations/connectors/source-iterable/source_iterable/utils.py new file mode 100644 index 000000000000..2cc647b13fef --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/source_iterable/utils.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved. +# + +import dateutil.parser +import pendulum + + +def dateutil_parse(text): + """ + The custom function `dateutil_parse` replaces `pendulum.parse(text, strict=False)` to avoid a memory leak. + More details: https://github.com/airbytehq/airbyte/pull/19913 + """ + dt = dateutil.parser.parse(text) + return pendulum.datetime( + dt.year, + dt.month, + dt.day, + dt.hour, + dt.minute, + dt.second, + dt.microsecond, + tz=dt.tzinfo or pendulum.tz.UTC, + ) diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py index 777da2c9e98c..12d0677a1e24 100644 --- a/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_streams.py @@ -20,6 +20,7 @@ Templates, Users, ) +from source_iterable.utils import dateutil_parse @pytest.mark.parametrize( @@ -80,7 +81,7 @@ def test_templates_parse_response(): rsps.add( responses.GET, "https://api.iterable.com/api/1/foobar", - json={"templates": [{"createdAt": "2022", "id": 1}]}, + json={"templates": [{"createdAt": "2022-01-01", "id": 1}]}, status=200, content_type="application/json", ) @@ -88,7 +89,7 @@ def test_templates_parse_response(): records = stream.parse_response(response=resp) - assert list(records) == [{"id": 1, "createdAt": pendulum.parse("2022", strict=False)}] + assert list(records) == [{"id": 1, "createdAt": dateutil_parse("2022-01-01")}] def test_list_users_parse_response(): diff --git a/airbyte-integrations/connectors/source-iterable/unit_tests/test_utils.py b/airbyte-integrations/connectors/source-iterable/unit_tests/test_utils.py new file mode 100644 index 000000000000..5f16124c825d --- /dev/null +++ b/airbyte-integrations/connectors/source-iterable/unit_tests/test_utils.py @@ -0,0 +1,12 @@ +# +# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
+# + +import pendulum +from source_iterable.utils import dateutil_parse + + +def test_dateutil_parse(): + assert pendulum.parse("2021-04-08 14:23:30 +00:00", strict=False) == dateutil_parse("2021-04-08 14:23:30 +00:00") + assert pendulum.parse("2021-04-14T16:51:23+00:00", strict=False) == dateutil_parse("2021-04-14T16:51:23+00:00") + assert pendulum.parse("2021-04-14T16:23:30.700000+00:00", strict=False) == dateutil_parse("2021-04-14T16:23:30.700000+00:00") diff --git a/airbyte-integrations/connectors/source-slack/Dockerfile b/airbyte-integrations/connectors/source-slack/Dockerfile index 5adf219652ca..4f8ffe9cb8af 100644 --- a/airbyte-integrations/connectors/source-slack/Dockerfile +++ b/airbyte-integrations/connectors/source-slack/Dockerfile @@ -17,5 +17,5 @@ COPY main.py ./ ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py" ENTRYPOINT ["python", "/airbyte/integration_code/main.py"] -LABEL io.airbyte.version=0.1.18 +LABEL io.airbyte.version=0.1.19 LABEL io.airbyte.name=airbyte/source-slack diff --git a/airbyte-integrations/connectors/source-slack/source_slack/source.py b/airbyte-integrations/connectors/source-slack/source_slack/source.py index 58c5c13aee6a..63288a1f2add 100644 --- a/airbyte-integrations/connectors/source-slack/source_slack/source.py +++ b/airbyte-integrations/connectors/source-slack/source_slack/source.py @@ -13,7 +13,7 @@ from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpStream, HttpSubStream -from airbyte_cdk.sources.streams.http.requests_native_auth import Oauth2Authenticator, TokenAuthenticator +from airbyte_cdk.sources.streams.http.requests_native_auth import TokenAuthenticator from pendulum import DateTime, Period @@ -342,16 +342,6 @@ def _get_authenticator(self, config: Mapping[str, Any]): credentials = config.get("credentials", {}) credentials_title = credentials.get("option_title") if credentials_title == "Default OAuth2.0 authorization": - # We can get `refresh_token` only if the token rotation function is enabled for the Slack Oauth Application. - # If it is disabled, then we use the generated `access_token`, which acts without expiration. - # https://api.slack.com/authentication/rotation - if credentials.get("refresh_token", "").strip(): - return Oauth2Authenticator( - token_refresh_endpoint="https://slack.com/api/oauth.v2.access", - client_id=credentials["client_id"], - client_secret=credentials["client_secret"], - refresh_token=credentials["refresh_token"], - ) return TokenAuthenticator(credentials["access_token"]) elif credentials_title == "API Token Credentials": return TokenAuthenticator(credentials["api_token"]) diff --git a/airbyte-integrations/connectors/source-slack/source_slack/spec.json b/airbyte-integrations/connectors/source-slack/source_slack/spec.json index dae73c75f9c8..79a4d92bad10 100644 --- a/airbyte-integrations/connectors/source-slack/source_slack/spec.json +++ b/airbyte-integrations/connectors/source-slack/source_slack/spec.json @@ -46,10 +46,10 @@ "type": "object", "title": "Sign in via Slack (OAuth)", "required": [ - "access_token", + "option_title", "client_id", "client_secret", - "option_title" + "access_token" ], "properties": { "option_title": { @@ -57,31 +57,20 @@ "const": "Default OAuth2.0 authorization" }, "client_id": { - "title": "Client ID", - "description": "Slack client_id. 
See our docs if you need help finding this id.", "type": "string", - "examples": ["slack-client-id-example"], - "airbyte_secret": true + "title": "Client ID", + "description": "Slack client_id. See our docs if you need help finding this id." }, "client_secret": { + "type": "string", "title": "Client Secret", "description": "Slack client_secret. See our docs if you need help finding this secret.", - "type": "string", - "examples": ["slack-client-secret-example"], "airbyte_secret": true }, "access_token": { + "type": "string", "title": "Access token", "description": "Slack access_token. See our docs if you need help generating the token.", - "type": "string", - "examples": ["slack-access-token-example"], - "airbyte_secret": true - }, - "refresh_token": { - "title": "Refresh token", - "description": "Slack refresh_token. See our docs if you need help generating the token.", - "type": "string", - "examples": ["slack-refresh-token-example"], "airbyte_secret": true } }, @@ -90,7 +79,7 @@ { "type": "object", "title": "API Token", - "required": ["api_token", "option_title"], + "required": ["option_title", "api_token"], "properties": { "option_title": { "type": "string", @@ -109,12 +98,47 @@ } } }, - "authSpecification": { - "auth_type": "oauth2.0", - "oauth2Specification": { - "rootObject": ["credentials", 0], - "oauthFlowInitParameters": [["client_id"], ["client_secret"]], - "oauthFlowOutputParameters": [["access_token"], ["refresh_token"]] + "advanced_auth": { + "auth_flow_type": "oauth2.0", + "predicate_key": ["credentials", "option_title"], + "predicate_value": "Default OAuth2.0 authorization", + "oauth_config_specification": { + "complete_oauth_output_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "access_token": { + "type": "string", + "path_in_connector_config": ["credentials", "access_token"] + } + } + }, + "complete_oauth_server_input_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "client_id": { + "type": "string" + }, + "client_secret": { + "type": "string" + } + } + }, + "complete_oauth_server_output_specification": { + "type": "object", + "additionalProperties": false, + "properties": { + "client_id": { + "type": "string", + "path_in_connector_config": ["credentials", "client_id"] + }, + "client_secret": { + "type": "string", + "path_in_connector_config": ["credentials", "client_secret"] + } + } + } } } } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceUtils.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceUtils.java index 0553b6c4a69e..feaee0f20558 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceUtils.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/ApmTraceUtils.java @@ -56,7 +56,7 @@ public static void addTagsToTrace(final Map tags, final String t public static void addTagsToTrace(final Span span, final Map tags, final String tagPrefix) { if (span != null) { tags.entrySet().forEach(entry -> { - span.setTag(String.format(TAG_FORMAT, tagPrefix, entry.getKey()), entry.getValue().toString()); + span.setTag(formatTag(entry.getKey(), tagPrefix), entry.getValue().toString()); }); } } @@ -83,4 +83,27 @@ public static void addExceptionToTrace(final Span span, final Throwable t) { } } + /** + * Formats the tag key using {@link #TAG_FORMAT} provided by this utility, using the default tag + * prefix {@link #TAG_PREFIX}. + * + * @param tagKey The tag key to format. 
+ * @return The formatted tag key. + */ + public static String formatTag(final String tagKey) { + return formatTag(tagKey, TAG_PREFIX); + } + + /** + * Formats the tag key using {@link #TAG_FORMAT} provided by this utility with the provided tag + * prefix. + * + * @param tagKey The tag key to format. + * @param tagPrefix The prefix to be added to each custom tag name. + * @return The formatted tag key. + */ + public static String formatTag(final String tagKey, final String tagPrefix) { + return String.format(TAG_FORMAT, tagPrefix, tagKey); + } + } diff --git a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java index 8ce51860395e..3dfbbd95e302 100644 --- a/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java +++ b/airbyte-metrics/metrics-lib/src/main/java/io/airbyte/metrics/lib/OssMetricsRegistry.java @@ -134,7 +134,10 @@ public enum OssMetricsRegistry implements MetricsRegistry { "number of bytes synced during replication"), REPLICATION_RECORDS_SYNCED(MetricEmittingApps.WORKER, "replication_records_synced", - "number of records synced during replication"); + "number of records synced during replication"), + RESET_REQUEST(MetricEmittingApps.WORKER, + "reset_request", + "number of requested resets"); private final MetricEmittingApp application; private final String metricName; diff --git a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/ApmTraceUtilsTest.java b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/ApmTraceUtilsTest.java index 70307ad3fd53..c45eb92df676 100644 --- a/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/ApmTraceUtilsTest.java +++ b/airbyte-metrics/metrics-lib/src/test/java/io/airbyte/metrics/lib/ApmTraceUtilsTest.java @@ -6,6 +6,7 @@ import static io.airbyte.metrics.lib.ApmTraceUtils.TAG_FORMAT; import static io.airbyte.metrics.lib.ApmTraceUtils.TAG_PREFIX; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -29,6 +30,7 @@ class ApmTraceUtilsTest { private static final String TAG_2 = "tag2"; private static final String VALUE_1 = "foo"; private static final String VALUE_2 = "bar"; + private static final String PREFIX = "prefix"; private static final Map TAGS = Map.of(TAG_1, VALUE_1, TAG_2, VALUE_2); @Before @@ -54,7 +56,7 @@ void testAddingTagsWithPrefix() { final Tracer tracer = mock(Tracer.class); when(tracer.activeSpan()).thenReturn(span); GlobalTracerTestUtil.setGlobalTracerUnconditionally(tracer); - final String tagPrefix = "prefix"; + final String tagPrefix = PREFIX; ApmTraceUtils.addTagsToTrace(TAGS, tagPrefix); verify(span, times(1)).setTag(String.format(TAG_FORMAT, tagPrefix, TAG_1), VALUE_1); verify(span, times(1)).setTag(String.format(TAG_FORMAT, tagPrefix, TAG_2), VALUE_2); @@ -62,7 +64,7 @@ void testAddingTagsWithPrefix() { @Test void testAddingTagsToSpanWithPrefix() { - final String tagPrefix = "prefix"; + final String tagPrefix = PREFIX; final Span span = mock(Span.class); ApmTraceUtils.addTagsToTrace(span, TAGS, tagPrefix); verify(span, times(1)).setTag(String.format(TAG_FORMAT, tagPrefix, TAG_1), VALUE_1); @@ -75,4 +77,16 @@ void testAddingTagsToNullSpanWithPrefix() { Assertions.assertDoesNotThrow(() -> ApmTraceUtils.addTagsToTrace(null, TAGS, tagPrefix)); } + @Test + void testFormattingTagKeys() { + final 
String tagKey1 = "tagKey1"; + final String tagPrefix1 = PREFIX; + + final String result1 = ApmTraceUtils.formatTag(tagKey1); + assertEquals("airbyte.metadata." + tagKey1, result1); + + final String result2 = ApmTraceUtils.formatTag(tagKey1, tagPrefix1); + assertEquals("airbyte." + tagPrefix1 + "." + tagKey1, result2); + } + } diff --git a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/SlackOAuthFlow.java b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/SlackOAuthFlow.java index 5cfcdbf90d20..d32465d70236 100644 --- a/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/SlackOAuthFlow.java +++ b/airbyte-oauth/src/main/java/io/airbyte/oauth/flows/SlackOAuthFlow.java @@ -11,14 +11,15 @@ import java.io.IOException; import java.net.URISyntaxException; import java.net.http.HttpClient; +import java.util.Map; import java.util.UUID; import java.util.function.Supplier; import org.apache.http.client.utils.URIBuilder; public class SlackOAuthFlow extends BaseOAuth2Flow { - final String SLACK_CONSENT_URL_BASE = "https://slack.com/oauth/authorize"; - final String SLACK_TOKEN_URL = "https://slack.com/api/oauth.access"; + private static final String AUTHORIZE_URL = "https://slack.com/oauth/authorize"; + private static final String ACCESS_TOKEN_URL = "https://slack.com/api/oauth.access"; public SlackOAuthFlow(final ConfigRepository configRepository, final HttpClient httpClient) { super(configRepository, httpClient); @@ -41,7 +42,7 @@ protected String formatConsentUrl(final UUID definitionId, final JsonNode inputOAuthConfiguration) throws IOException { try { - return new URIBuilder(SLACK_CONSENT_URL_BASE) + return new URIBuilder(AUTHORIZE_URL) .addParameter("client_id", clientId) .addParameter("redirect_uri", redirectUrl) .addParameter("state", getState()) @@ -57,7 +58,16 @@ protected String formatConsentUrl(final UUID definitionId, */ @Override protected String getAccessTokenUrl(final JsonNode inputOAuthConfiguration) { - return SLACK_TOKEN_URL; + return ACCESS_TOKEN_URL; + } + + @Override + protected Map extractOAuthOutput(final JsonNode data, final String accessTokenUrl) throws IOException { + if (data.has("access_token")) { + return Map.of("access_token", data.get("access_token").asText()); + } else { + throw new IOException(String.format("Missing 'access_token' in query params from %s", ACCESS_TOKEN_URL)); + } } } diff --git a/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/SlackOAuthFlowTest.java b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/SlackOAuthFlowTest.java index 9ad0cb3747f1..427b18af3c17 100644 --- a/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/SlackOAuthFlowTest.java +++ b/airbyte-oauth/src/test/java/io/airbyte/oauth/flows/SlackOAuthFlowTest.java @@ -4,7 +4,10 @@ package io.airbyte.oauth.flows; +import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.oauth.BaseOAuthFlow; +import io.airbyte.oauth.MoreOAuthParameters; +import java.util.Map; public class SlackOAuthFlowTest extends BaseOAuthFlowTest { @@ -18,4 +21,24 @@ protected String getExpectedConsentUrl() { return "https://slack.com/oauth/authorize?client_id=test_client_id&redirect_uri=https%3A%2F%2Fairbyte.io&state=state&scope=read"; } + @Override + protected Map getExpectedOutput() { + return Map.of( + "access_token", "access_token_response", + "client_id", MoreOAuthParameters.SECRET_MASK, + "client_secret", MoreOAuthParameters.SECRET_MASK); + } + + @Override + protected JsonNode getCompleteOAuthOutputSpecification() { + return getJsonSchema(Map.of("access_token", Map.of("type", "string"))); + } + + 
@Override + protected Map getExpectedFilteredOutput() { + return Map.of( + "access_token", "access_token_response", + "client_id", MoreOAuthParameters.SECRET_MASK); + } + } diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/ContainerOrchestratorAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/ContainerOrchestratorAcceptanceTests.java index 420e8ee17090..bf3c628a1245 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/ContainerOrchestratorAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/ContainerOrchestratorAcceptanceTests.java @@ -102,7 +102,10 @@ void setup() throws URISyntaxException, IOException, SQLException { } @Test + @SuppressWarnings("PMD.JUnitTestsShouldIncludeAssert") void testDowntimeDuringSync() throws Exception { + // NOTE: PMD assert warning suppressed because the assertion was flaky. The test will throw if the + // sync does not succeed. final String connectionName = "test-connection"; final UUID sourceId = testHarness.createPostgresSource().getSourceId(); final UUID destinationId = testHarness.createPostgresDestination().getDestinationId(); @@ -129,14 +132,6 @@ void testDowntimeDuringSync() throws Exception { kubernetesClient.apps().deployments().inNamespace(DEFAULT).withName(AIRBYTE_WORKER).scale(1); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead.getJob()); - - final long numAttempts = apiClient.getJobsApi() - .getJobInfo(new JobIdRequestBody().id(connectionSyncRead.getJob().getId())) - .getAttempts() - .size(); - - // it should be able to accomplish the resume without an additional attempt! - assertEquals(1, numAttempts); } @AfterEach diff --git a/airbyte-webapp/src/components/CreateConnection/DataResidency.tsx b/airbyte-webapp/src/components/CreateConnection/DataResidency.tsx index 088a72f7b13f..deb49d071e68 100644 --- a/airbyte-webapp/src/components/CreateConnection/DataResidency.tsx +++ b/airbyte-webapp/src/components/CreateConnection/DataResidency.tsx @@ -1,4 +1,5 @@ import { Field, FieldProps, useFormikContext } from "formik"; +import React from "react"; import { FormattedMessage, useIntl } from "react-intl"; import { DataGeographyDropdown } from "components/common/DataGeographyDropdown"; @@ -33,11 +34,16 @@ export const DataResidency: React.FC = ({ name = "geography" ( + ipLink: (node: React.ReactNode) => ( {node} ), + docLink: (node: React.ReactNode) => ( + + {node} + + ), }} /> } diff --git a/airbyte-webapp/src/components/CreateConnection/__snapshots__/CreateConnectionForm.test.tsx.snap b/airbyte-webapp/src/components/CreateConnection/__snapshots__/CreateConnectionForm.test.tsx.snap index b46ae78f9d69..c2406dfd0cd7 100644 --- a/airbyte-webapp/src/components/CreateConnection/__snapshots__/CreateConnectionForm.test.tsx.snap +++ b/airbyte-webapp/src/components/CreateConnection/__snapshots__/CreateConnectionForm.test.tsx.snap @@ -417,252 +417,217 @@ exports[`CreateConnectionForm should render 1`] = `
- -
-
-
- Sync -
-
- Source + > + + +
+
+
+ Sync +
+ Source
- - - + + + +
-
-
-
- Sync mode +
+ Sync mode
- - - + + + +
-
-
- Cursor field
+ Cursor field
- - - + + + +
-
-
- Primary key
+ Primary key
- - - + + + +
- -
- Destination
+ Destination
- - - + + + +
- -
-
-
-
-
- Namespace -
-
- Stream name -
-
- Source | Destination -
-
-
-
- Namespace +
- Stream name +
+
+ Namespace +
+
+ Stream name +
+
+ Source | Destination +
+
+
+
+ Namespace +
+
+ Stream name +
-
-
-
- -
-
- - - -
@@ -673,54 +638,89 @@ exports[`CreateConnectionForm should render 1`] = ` class="" > -
- No namespace +
-
- pokemon -
- + > + + + +
+
+ > + No namespace + +
+
+ pokemon +
+
+ +
@@ -730,76 +730,80 @@ exports[`CreateConnectionForm should render 1`] = `
-
- Full refresh -
- | -
-
- Overwrite +
+ Full refresh +
+
+ | +
+
+ Overwrite +
+
- -
-
-
-
-
-
- '<source schema> -
-
- pokemon +
+
+
+ '<source schema> +
+
+ pokemon +
diff --git a/airbyte-webapp/src/components/connection/CatalogTree/CatalogSection.module.scss b/airbyte-webapp/src/components/connection/CatalogTree/CatalogSection.module.scss index 5c4731091cc5..4ca8d2b24c29 100644 --- a/airbyte-webapp/src/components/connection/CatalogTree/CatalogSection.module.scss +++ b/airbyte-webapp/src/components/connection/CatalogTree/CatalogSection.module.scss @@ -2,7 +2,7 @@ @use "scss/variables"; .streamFieldTableContainer { - margin-left: 85px; + margin-left: 83px; background: colors.$grey-50; } diff --git a/airbyte-webapp/src/components/connection/CatalogTree/CatalogTree.module.scss b/airbyte-webapp/src/components/connection/CatalogTree/CatalogTree.module.scss new file mode 100644 index 000000000000..e8eeb1d0bd4b --- /dev/null +++ b/airbyte-webapp/src/components/connection/CatalogTree/CatalogTree.module.scss @@ -0,0 +1,5 @@ +@use "scss/variables"; + +.catalogTreeTable { + padding: variables.$spacing-lg variables.$spacing-xl 0; +} diff --git a/airbyte-webapp/src/components/connection/CatalogTree/CatalogTree.tsx b/airbyte-webapp/src/components/connection/CatalogTree/CatalogTree.tsx index b3b3dc37f90f..ad8340e0745a 100644 --- a/airbyte-webapp/src/components/connection/CatalogTree/CatalogTree.tsx +++ b/airbyte-webapp/src/components/connection/CatalogTree/CatalogTree.tsx @@ -8,6 +8,7 @@ import { useConnectionFormService } from "hooks/services/ConnectionForm/Connecti import { naturalComparatorBy } from "utils/objects"; import { BulkHeader } from "./BulkHeader"; +import styles from "./CatalogTree.module.scss"; import { CatalogTreeBody } from "./CatalogTreeBody"; import { CatalogTreeHeader } from "./CatalogTreeHeader"; import { CatalogTreeSearch } from "./CatalogTreeSearch"; @@ -65,23 +66,25 @@ const CatalogTreeComponent: React.FC> {mode !== "readonly" && } - {isNewStreamsTableEnabled ? ( - <> - - - - ) : ( - <> - - - - - )} - +
+ {isNewStreamsTableEnabled ? ( + <> + + + + ) : ( + <> + + + + + )} + +
{isNewStreamsTableEnabled && }
diff --git a/airbyte-webapp/src/components/connection/CatalogTree/CatalogTreeSearch.module.scss b/airbyte-webapp/src/components/connection/CatalogTree/CatalogTreeSearch.module.scss index 115404633f4a..11c394072b8d 100644 --- a/airbyte-webapp/src/components/connection/CatalogTree/CatalogTreeSearch.module.scss +++ b/airbyte-webapp/src/components/connection/CatalogTree/CatalogTreeSearch.module.scss @@ -5,16 +5,6 @@ } .searchContent { - position: relative; - width: 100%; - padding-left: variables.$spacing-xl; - - &::before { - content: attr(data-content); - } -} - -.searchContentNew { position: relative; width: 100%; padding: 0 variables.$spacing-xl; diff --git a/airbyte-webapp/src/components/connection/CatalogTree/CatalogTreeSearch.tsx b/airbyte-webapp/src/components/connection/CatalogTree/CatalogTreeSearch.tsx index 05759fa61421..2c416ac94a9c 100644 --- a/airbyte-webapp/src/components/connection/CatalogTree/CatalogTreeSearch.tsx +++ b/airbyte-webapp/src/components/connection/CatalogTree/CatalogTreeSearch.tsx @@ -1,4 +1,3 @@ -import classnames from "classnames"; import React from "react"; import { useIntl } from "react-intl"; @@ -11,17 +10,10 @@ interface CatalogTreeSearchProps { } export const CatalogTreeSearch: React.FC = ({ onSearch }) => { - const isNewStreamsTableEnabled = process.env.REACT_APP_NEW_STREAMS_TABLE ?? false; - const { formatMessage } = useIntl(); - const searchStyles = classnames({ - [styles.searchContentNew]: isNewStreamsTableEnabled, - [styles.searchContent]: !isNewStreamsTableEnabled, - }); - return ( -
+
= (props) => { return ( - <> +
@@ -38,6 +38,6 @@ export const StreamFieldTable: React.FC = (props) => { ))}
- +
); }; diff --git a/airbyte-webapp/src/components/connection/UpdateConnectionDataResidency/UpdateConnectionDataResidency.tsx b/airbyte-webapp/src/components/connection/UpdateConnectionDataResidency/UpdateConnectionDataResidency.tsx index 9bf9c44372d0..3f2cbfadb03c 100644 --- a/airbyte-webapp/src/components/connection/UpdateConnectionDataResidency/UpdateConnectionDataResidency.tsx +++ b/airbyte-webapp/src/components/connection/UpdateConnectionDataResidency/UpdateConnectionDataResidency.tsx @@ -50,11 +50,16 @@ export const UpdateConnectionDataResidency: React.FC = () => { ( + ipLink: (node: React.ReactNode) => ( {node} ), + docLink: (node: React.ReactNode) => ( + + {node} + + ), }} /> } diff --git a/airbyte-webapp/src/locales/en.json b/airbyte-webapp/src/locales/en.json index 602628fdf062..2204b83338e3 100644 --- a/airbyte-webapp/src/locales/en.json +++ b/airbyte-webapp/src/locales/en.json @@ -344,7 +344,7 @@ "connection.geographyTitle": "Data residency", "connection.requestNewGeography": "Request a new geography", - "connection.geographyDescription": "Depending on your network configuration, you may need to add IP addresses to your allowlist.", + "connection.geographyDescription": "Choose where the data for this connection will be processed. Depending on your network configuration, you may need to add IP addresses to your allowlist. Learn more.", "connection.geography.auto": "Airbyte Default", "connection.geography.us": "United States", "connection.geography.eu": "European Union", @@ -494,6 +494,7 @@ "settings.cookiePreferences": "Cookie Preferences", "settings.dataResidency": "Data Residency", "settings.defaultDataResidency": "Default Data Residency", + "settings.geographyDescription": "Depending on your network configuration, you may need to add IP addresses to your allowlist.", "settings.defaultGeography": "Geography", "settings.defaultDataResidencyDescription": "Choose the default preferred data processing location for all of your connections. The default data residency setting only affects new connections. Existing connections will retain their data residency setting. Learn more.", "settings.defaultDataResidencyUpdateError": "There was an error updating the default data residency for this workspace.", diff --git a/airbyte-webapp/src/packages/cloud/views/workspaces/DataResidencyView/DataResidencyView.tsx b/airbyte-webapp/src/packages/cloud/views/workspaces/DataResidencyView/DataResidencyView.tsx index 553e9508ef62..a72be59554d0 100644 --- a/airbyte-webapp/src/packages/cloud/views/workspaces/DataResidencyView/DataResidencyView.tsx +++ b/airbyte-webapp/src/packages/cloud/views/workspaces/DataResidencyView/DataResidencyView.tsx @@ -80,7 +80,7 @@ export const DataResidencyView: React.FC = () => { label={} message={ ( diff --git a/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/__snapshots__/ConnectionReplicationTab.test.tsx.snap b/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/__snapshots__/ConnectionReplicationTab.test.tsx.snap index e86cbec86e76..a15c78337bce 100644 --- a/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/__snapshots__/ConnectionReplicationTab.test.tsx.snap +++ b/airbyte-webapp/src/pages/ConnectionPage/pages/ConnectionItemPage/__snapshots__/ConnectionReplicationTab.test.tsx.snap @@ -364,252 +364,217 @@ exports[`ConnectionReplicationTab should render 1`] = `
- -
-
-
- Sync -
-
- Source + > + + +
+
+
+ Sync +
+ Source
- - - + + + +
-
-
-
- Sync mode +
+ Sync mode
- - - + + + +
-
-
- Cursor field
+ Cursor field
- - - + + + +
-
-
- Primary key
+ Primary key
- - - + + + +
-
-
- Destination
+ Destination
- - - + + + +
-
-
-
-
-
-
- Namespace -
-
- Stream name -
-
- Source | Destination -
-
-
-
- Namespace +
- Stream name +
+
+ Namespace +
+
+ Stream name +
+
+ Source | Destination +
+
+
+
+ Namespace +
+
+ Stream name +
-
-
-
- -
-
- - - -
@@ -620,54 +585,89 @@ exports[`ConnectionReplicationTab should render 1`] = ` class="" > -
- No namespace +
-
- pokemon -
- + > + + + +
+
+ > + No namespace + +
+
+ pokemon +
+
+ +
@@ -677,76 +677,80 @@ exports[`ConnectionReplicationTab should render 1`] = `
-
- Full refresh -
- | -
-
- Append +
+ Full refresh +
+
+ | +
+
+ Append +
+
- -
-
-
-
-
-
- '<source schema> -
-
- pokemon +
+
+
+ '<source schema> +
+
+ pokemon +
diff --git a/airbyte-webapp/src/utils/links.ts b/airbyte-webapp/src/utils/links.ts index 2c4f332bfea3..369aea1bda52 100644 --- a/airbyte-webapp/src/utils/links.ts +++ b/airbyte-webapp/src/utils/links.ts @@ -29,8 +29,10 @@ export const links = { webhookVideoGuideLink: "https://www.youtube.com/watch?v=NjYm8F-KiFc", webhookGuideLink: `${BASE_DOCS_LINK}/operator-guides/configuring-sync-notifications/`, cronReferenceLink: "http://www.quartz-scheduler.org/documentation/quartz-2.3.0/tutorials/crontrigger.html", - cloudAllowlistIPsLink: `${BASE_DOCS_LINK}/cloud/getting-started-with-airbyte-cloud/#allowlist-ip-address`, + cloudAllowlistIPsLink: `${BASE_DOCS_LINK}/cloud/getting-started-with-airbyte-cloud/#allowlist-ip-addresses`, dataResidencySurvey: "https://forms.gle/Dr7MPTdt9k3xTinL8", + connectionDataResidency: + "https://docs.airbyte.com/cloud/managing-airbyte-cloud/#choose-the-data-residency-for-a-connection", lowCodeYamlDescription: `${BASE_DOCS_LINK}/connector-development/config-based/understanding-the-yaml-file/yaml-overview`, } as const; diff --git a/airbyte-webapp/src/views/Connection/ConnectionForm/ConnectionFormFields.tsx b/airbyte-webapp/src/views/Connection/ConnectionForm/ConnectionFormFields.tsx index 58e0b7c1aa74..4b042987cca1 100644 --- a/airbyte-webapp/src/views/Connection/ConnectionForm/ConnectionFormFields.tsx +++ b/airbyte-webapp/src/views/Connection/ConnectionForm/ConnectionFormFields.tsx @@ -46,8 +46,6 @@ export const ConnectionFormFields: React.FC = ({ valu clearFormChange(formId); }); - const isNewStreamsTableEnabled = process.env.REACT_APP_NEW_STREAMS_TABLE ?? false; - return ( <> {/* FormChangeTracker is here as it has access to everything it needs without being repeated */} @@ -118,7 +116,7 @@ export const ConnectionFormFields: React.FC = ({ valu )} -
+
traceAttributes = + Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, + destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage()); ApmTraceUtils - .addTagsToTrace(Map.of(ATTEMPT_NUMBER_KEY, jobRunConfig.getAttemptId(), JOB_ID_KEY, jobRunConfig.getJobId(), DESTINATION_DOCKER_IMAGE_KEY, - destinationLauncherConfig.getDockerImage(), SOURCE_DOCKER_IMAGE_KEY, sourceLauncherConfig.getDockerImage())); + .addTagsToTrace(traceAttributes); + if (isResetJob(sourceLauncherConfig.getDockerImage())) { + MetricClientFactory.getMetricClient().count(OssMetricsRegistry.RESET_REQUEST, 1); + } final ActivityExecutionContext context = Activity.getExecutionContext(); return temporalUtils.withBackgroundHeartbeat( () -> { @@ -192,7 +199,7 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig, Optional.ofNullable(taskQueue)); final ReplicationOutput attemptOutput = temporalAttempt.get(); - final StandardSyncOutput standardSyncOutput = reduceReplicationOutput(attemptOutput); + final StandardSyncOutput standardSyncOutput = reduceReplicationOutput(attemptOutput, traceAttributes); final String standardSyncOutputString = standardSyncOutput.toString(); LOGGER.info("sync summary: {}", standardSyncOutputString); @@ -208,12 +215,12 @@ public StandardSyncOutput replicate(final JobRunConfig jobRunConfig, () -> context); } - private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutput output) { + private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutput output, final Map metricAttributes) { final StandardSyncOutput standardSyncOutput = new StandardSyncOutput(); final StandardSyncSummary syncSummary = new StandardSyncSummary(); final ReplicationAttemptSummary replicationSummary = output.getReplicationAttemptSummary(); - traceReplicationSummary(replicationSummary); + traceReplicationSummary(replicationSummary, metricAttributes); syncSummary.setBytesSynced(replicationSummary.getBytesSynced()); syncSummary.setRecordsSynced(replicationSummary.getRecordsSynced()); @@ -231,19 +238,22 @@ private static StandardSyncOutput reduceReplicationOutput(final ReplicationOutpu return standardSyncOutput; } - private static void traceReplicationSummary(final ReplicationAttemptSummary replicationSummary) { + private static void traceReplicationSummary(final ReplicationAttemptSummary replicationSummary, final Map metricAttributes) { if (replicationSummary == null) { return; } + final MetricAttribute[] attributes = metricAttributes.entrySet().stream() + .map(e -> new MetricAttribute(ApmTraceUtils.formatTag(e.getKey()), e.getValue().toString())) + .collect(Collectors.toSet()).toArray(new MetricAttribute[] {}); final Map tags = new HashMap<>(); if (replicationSummary.getBytesSynced() != null) { tags.put(REPLICATION_BYTES_SYNCED_KEY, replicationSummary.getBytesSynced()); - MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_BYTES_SYNCED, replicationSummary.getBytesSynced()); + MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_BYTES_SYNCED, replicationSummary.getBytesSynced(), attributes); } if (replicationSummary.getRecordsSynced() != null) { tags.put(REPLICATION_RECORDS_SYNCED_KEY, replicationSummary.getRecordsSynced()); - MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_RECORDS_SYNCED, replicationSummary.getRecordsSynced()); + 
MetricClientFactory.getMetricClient().count(OssMetricsRegistry.REPLICATION_RECORDS_SYNCED, replicationSummary.getRecordsSynced(), attributes); } if (replicationSummary.getStatus() != null) { tags.put(REPLICATION_STATUS_KEY, replicationSummary.getStatus().value()); @@ -272,12 +282,11 @@ private CheckedSupplier, Exception> syncInput.getDestinationResourceRequirements()); // reset jobs use an empty source to induce resetting all data in destination. - final AirbyteSource airbyteSource = - WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equals(sourceLauncherConfig.getDockerImage()) - ? new EmptyAirbyteSource(featureFlags.useStreamCapableState()) - : new DefaultAirbyteSource(sourceLauncher, - new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion(), - DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); + final AirbyteSource airbyteSource = isResetJob(sourceLauncherConfig.getDockerImage()) + ? new EmptyAirbyteSource(featureFlags.useStreamCapableState()) + : new DefaultAirbyteSource(sourceLauncher, + new VersionedAirbyteStreamFactory<>(serDeProvider, migratorFactory, sourceLauncherConfig.getProtocolVersion(), + DefaultAirbyteSource.CONTAINER_LOG_MDC_BUILDER)); MetricClientFactory.initialize(MetricEmittingApps.WORKER); final MetricClient metricClient = MetricClientFactory.getMetricClient(); final WorkerMetricReporter metricReporter = new WorkerMetricReporter(metricClient, sourceLauncherConfig.getDockerImage()); @@ -327,4 +336,8 @@ private CheckedSupplier, Exception> workerConfigs); } + private boolean isResetJob(final String dockerImage) { + return WorkerConstants.RESET_JOB_SOURCE_DOCKER_IMAGE_STUB.equalsIgnoreCase(dockerImage); + } + } diff --git a/docs/integrations/sources/faker.md b/docs/integrations/sources/faker.md index c730c643d9dc..4ea89e33da23 100644 --- a/docs/integrations/sources/faker.md +++ b/docs/integrations/sources/faker.md @@ -1,16 +1,67 @@ -# Faker +# Sample Data (Faker) ## Sync overview -The Faker source generates sample data using the python [`faker`](https://faker.readthedocs.io/) package. Specifically, we generate data that looks like an e-commerce company's `users` table with the [`faker.profile()`](https://faker.readthedocs.io/en/master/providers/faker.providers.profile.html) method. +The Sample Data (Faker) source generates sample data using the python [`mimesis`](https://mimesis.name/en/master/) package. ### Output schema -Only `Users` is supported. - -### Data type mapping - -Native Airbyte types (string, number, date, etc) +This source will generate an "e-commerce-like" dataset with users, products, and purchases. 
Here's what is produced at a Postgres destination connected to this source: + +```sql +CREATE TABLE "public"."users" ( + "id" float8, + "age" int8, + "name" text, + "email" text, + "title" text, + "gender" text, + "height" text, + "weight" int8, + "language" text, + "telephone" text, + "blood_type" text, + "created_at" timestamptz, + "occupation" text, + "updated_at" timestamptz, + "nationality" text, + "academic_degree" text, + -- "_airbyte_ab_id" varchar, + -- "_airbyte_emitted_at" timestamptz, + -- "_airbyte_normalized_at" timestamptz, + -- "_airbyte_dev_users_hashid" text, + -- "_airbyte_unique_key" text +); + +CREATE TABLE "public"."products" ( + "id" float8, + "make" text, + "year" float8, + "model" text, + "price" float8, + "created_at" timestamptz, + -- "_airbyte_ab_id" varchar, + -- "_airbyte_emitted_at" timestamptz, + -- "_airbyte_normalized_at" timestamptz, + -- "_airbyte_dev_products_hashid" text, + -- "_airbyte_unique_key" text +); + +CREATE TABLE "public"."purchases" ( + "id" float8, + "user_id" float8, + "product_id" float8, + "returned_at" timestamptz, + "purchased_at" timestamptz, + "added_to_cart_at" timestamptz, + -- "_airbyte_ab_id" varchar, + -- "_airbyte_emitted_at" timestamptz, + -- "_airbyte_normalized_at" timestamptz, + -- "_airbyte_dev_purchases_hashid" text, + -- "_airbyte_unique_key" text +); + +``` ### Features @@ -21,34 +72,26 @@ Native Airbyte types (string, number, date, etc) | Namespaces | No | | Of note, if you choose `Incremental Sync`, state will be maintained between syncs, and once you hit `count` records, no new records will be added. -You can choose a specific `seed` (integer) as an option for this connector which will guarantee that the same fake records are generated each time. Otherwise, random data will be created on each subsequent sync. - -### Rate Limiting & Performance Considerations -N/A - -## Getting started +You can choose a specific `seed` (integer) as an option for this connector which will guarantee that the same fake records are generated each time. Otherwise, random data will be created on each subsequent sync. ### Requirements None! -### Setup guide - -N/A - ## Changelog -| Version | Date | Pull Request | Subject | -| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------- | -| 0.2.1 | 2022-10-14 | [19197](https://github.com/airbytehq/airbyte/pull/19197) | Emit `AirbyteEstimateTraceMessage` | -| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! 
| -| 0.1.8 | 2022-10-12 | [17889](https://github.com/airbytehq/airbyte/pull/17889) | Bump to test publish command (2) | -| 0.1.7 | 2022-10-11 | [17848](https://github.com/airbytehq/airbyte/pull/17848) | Bump to test publish command | -| 0.1.6 | 2022-09-07 | [16418](https://github.com/airbytehq/airbyte/pull/16418) | Log start of each stream | -| 0.1.5 | 2022-06-10 | [13695](https://github.com/airbytehq/airbyte/pull/13695) | Emit timestamps in the proper ISO format | -| 0.1.4 | 2022-05-27 | [13298](https://github.com/airbytehq/airbyte/pull/13298) | Test publication flow | -| 0.1.3 | 2022-05-27 | [13248](https://github.com/airbytehq/airbyte/pull/13248) | Add options for records_per_sync and page_size | -| 0.1.2 | 2022-05-26 | [13248](https://github.com/airbytehq/airbyte/pull/13293) | Test publication flow | -| 0.1.1 | 2022-05-26 | [13235](https://github.com/airbytehq/airbyte/pull/13235) | Publish for AMD and ARM (M1 Macs) & remove User.birthdate | -| 0.1.0 | 2022-04-12 | [11738](https://github.com/airbytehq/airbyte/pull/11738) | The Faker Source is created | +| Version | Date | Pull Request | Subject | +| :------ | :--------- | :------------------------------------------------------- | :-------------------------------------------------------------------------------------------------------------- | +| 1.0.0 | 2022-11-28 | [19490](https://github.com/airbytehq/airbyte/pull/19490) | Faker uses the CDK; rename streams to be lower-case (breaking), add determinism to random purchases, and rename | +| 0.2.1 | 2022-10-14 | [19197](https://github.com/airbytehq/airbyte/pull/19197) | Emit `AirbyteEstimateTraceMessage` | +| 0.2.0 | 2022-10-14 | [18021](https://github.com/airbytehq/airbyte/pull/18021) | Move to mimesis for speed! | +| 0.1.8 | 2022-10-12 | [17889](https://github.com/airbytehq/airbyte/pull/17889) | Bump to test publish command (2) | +| 0.1.7 | 2022-10-11 | [17848](https://github.com/airbytehq/airbyte/pull/17848) | Bump to test publish command | +| 0.1.6 | 2022-09-07 | [16418](https://github.com/airbytehq/airbyte/pull/16418) | Log start of each stream | +| 0.1.5 | 2022-06-10 | [13695](https://github.com/airbytehq/airbyte/pull/13695) | Emit timestamps in the proper ISO format | +| 0.1.4 | 2022-05-27 | [13298](https://github.com/airbytehq/airbyte/pull/13298) | Test publication flow | +| 0.1.3 | 2022-05-27 | [13248](https://github.com/airbytehq/airbyte/pull/13248) | Add options for records_per_sync and page_size | +| 0.1.2 | 2022-05-26 | [13248](https://github.com/airbytehq/airbyte/pull/13293) | Test publication flow | +| 0.1.1 | 2022-05-26 | [13235](https://github.com/airbytehq/airbyte/pull/13235) | Publish for AMD and ARM (M1 Macs) & remove User.birthdate | +| 0.1.0 | 2022-04-12 | [11738](https://github.com/airbytehq/airbyte/pull/11738) | The Faker Source is created | diff --git a/docs/integrations/sources/gitlab.md b/docs/integrations/sources/gitlab.md index b01a0e598f70..5a58dd482517 100644 --- a/docs/integrations/sources/gitlab.md +++ b/docs/integrations/sources/gitlab.md @@ -61,13 +61,14 @@ GitLab source is working with GitLab API v4. 
It can also work with self-hosted G ## Changelog -| Version | Date | Pull Request | Subject | -|:--------|:-----------|:---------------------------------------------------------| :--- | -| 0.1.6 | 2022-06-23 | [13252](https://github.com/airbytehq/airbyte/pull/13252) | Add GroupIssueBoards stream | -| 0.1.5 | 2022-05-02 | [11907](https://github.com/airbytehq/airbyte/pull/11907) | Fix null projects param and `container_expiration_policy` | -| 0.1.4 | 2022-03-23 | [11140](https://github.com/airbytehq/airbyte/pull/11140) | Ingest All Accessible Groups if not Specified in Config | -| 0.1.3 | 2021-12-21 | [8991](https://github.com/airbytehq/airbyte/pull/8991) | Update connector fields title/description | -| 0.1.2 | 2021-10-18 | [7108](https://github.com/airbytehq/airbyte/pull/7108) | Allow all domains to be used as `api_url` | +| Version | Date | Pull Request | Subject | +|:--------|:-----------|:---------------------------------------------------------|:-------------------------------------------------------------------------------------------| +| 0.1.7 | 2022-12-01 | [19986](https://github.com/airbytehq/airbyte/pull/19986) | Fix `GroupMilestones` stream schema | +| 0.1.6 | 2022-06-23 | [13252](https://github.com/airbytehq/airbyte/pull/13252) | Add GroupIssueBoards stream | +| 0.1.5 | 2022-05-02 | [11907](https://github.com/airbytehq/airbyte/pull/11907) | Fix null projects param and `container_expiration_policy` | +| 0.1.4 | 2022-03-23 | [11140](https://github.com/airbytehq/airbyte/pull/11140) | Ingest All Accessible Groups if not Specified in Config | +| 0.1.3 | 2021-12-21 | [8991](https://github.com/airbytehq/airbyte/pull/8991) | Update connector fields title/description | +| 0.1.2 | 2021-10-18 | [7108](https://github.com/airbytehq/airbyte/pull/7108) | Allow all domains to be used as `api_url` | | 0.1.1 | 2021-10-12 | [6932](https://github.com/airbytehq/airbyte/pull/6932) | Fix pattern field in spec file, remove unused fields from config files, use cache from CDK | -| 0.1.0 | 2021-07-06 | [4174](https://github.com/airbytehq/airbyte/pull/4174) | Initial Release | +| 0.1.0 | 2021-07-06 | [4174](https://github.com/airbytehq/airbyte/pull/4174) | Initial Release | diff --git a/docs/integrations/sources/iterable.md b/docs/integrations/sources/iterable.md index 5fc094849061..429e45b23502 100644 --- a/docs/integrations/sources/iterable.md +++ b/docs/integrations/sources/iterable.md @@ -76,6 +76,7 @@ The Iterable source connector supports the following [sync modes](https://docs.a | Version | Date | Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:---------------------------------------------------------------------------| +| 0.1.22 | 2022-11-30 | [19913](https://github.com/airbytehq/airbyte/pull/19913) | Replace pendulum.parse -> dateutil.parser.parse to avoid memory leak | | 0.1.21 | 2022-10-27 | [18537](https://github.com/airbytehq/airbyte/pull/18537) | Improve streams discovery | | 0.1.20 | 2022-10-21 | [18292](https://github.com/airbytehq/airbyte/pull/18292) | Better processing of 401 and 429 errors | | 0.1.19 | 2022-10-05 | [17602](https://github.com/airbytehq/airbyte/pull/17602) | Add check for stream permissions | diff --git a/docs/integrations/sources/slack.md b/docs/integrations/sources/slack.md index 92e7d7fdd6a2..8323cd5bc688 100644 --- a/docs/integrations/sources/slack.md +++ b/docs/integrations/sources/slack.md @@ -136,6 +136,7 @@ It is recommended to sync required channels only, this can be done by specifying | Version | Date | 
Pull Request | Subject | |:--------|:-----------|:---------------------------------------------------------|:----------------------------------------------------| +| 0.1.19 | 2022-12-01 | [19970](https://github.com/airbytehq/airbyte/pull/19970) | Remove OAuth2.0 broken `refresh_token` support | | 0.1.18 | 2022-09-28 | [17315](https://github.com/airbytehq/airbyte/pull/17315) | Always install latest version of Airbyte CDK | | 0.1.17 | 2022-08-28 | [16085](https://github.com/airbytehq/airbyte/pull/16085) | Increase unit test coverage | | 0.1.16 | 2022-08-28 | [16050](https://github.com/airbytehq/airbyte/pull/16050) | Fix SATs | diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 7454180f2ae8..943f0cbfa754 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 41dfb87909a8..ef89f5aef35f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.4-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6-bin.zip +networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME -zipStorePath=wrapper/dists +zipStorePath=wrapper/dists \ No newline at end of file diff --git a/gradlew b/gradlew index 1b6c787337ff..65dcd68d65c8 100755 --- a/gradlew +++ b/gradlew @@ -55,7 +55,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/master/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/HEAD/subprojects/plugins/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. @@ -80,10 +80,10 @@ do esac done -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -APP_NAME="Gradle" +# This is normally unused +# shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} +APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit # Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' @@ -143,12 +143,16 @@ fi if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) + # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac case $MAX_FD in #( '' | soft) :;; #( *) + # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. + # shellcheck disable=SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -205,6 +209,12 @@ set -- \ org.gradle.wrapper.GradleWrapperMain \ "$@" +# Stop when "xargs" is not available. +if ! command -v xargs >/dev/null 2>&1 +then + die "xargs is not available" +fi + # Use "xargs" to parse quoted args. # # With -n1 it outputs one arg per line, with the quotes and backslashes removed. diff --git a/gradlew.bat b/gradlew.bat index ac1b06f93825..6689b85beecd 100644 --- a/gradlew.bat +++ b/gradlew.bat @@ -14,7 +14,7 @@ @rem limitations under the License. 
@rem -@if "%DEBUG%" == "" @echo off +@if "%DEBUG%"=="" @echo off @rem ########################################################################## @rem @rem Gradle startup script for Windows @@ -25,7 +25,8 @@ if "%OS%"=="Windows_NT" setlocal set DIRNAME=%~dp0 -if "%DIRNAME%" == "" set DIRNAME=. +if "%DIRNAME%"=="" set DIRNAME=. +@rem This is normally unused set APP_BASE_NAME=%~n0 set APP_HOME=%DIRNAME% @@ -40,7 +41,7 @@ if defined JAVA_HOME goto findJavaFromJavaHome set JAVA_EXE=java.exe %JAVA_EXE% -version >NUL 2>&1 -if "%ERRORLEVEL%" == "0" goto execute +if %ERRORLEVEL% equ 0 goto execute echo. echo ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. @@ -75,13 +76,15 @@ set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar :end @rem End local scope for the variables with windows NT shell -if "%ERRORLEVEL%"=="0" goto mainEnd +if %ERRORLEVEL% equ 0 goto mainEnd :fail rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of rem the _cmd.exe /c_ return code! -if not "" == "%GRADLE_EXIT_CONSOLE%" exit 1 -exit /b 1 +set EXIT_CODE=%ERRORLEVEL% +if %EXIT_CODE% equ 0 set EXIT_CODE=1 +if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% +exit /b %EXIT_CODE% :mainEnd if "%OS%"=="Windows_NT" endlocal