From 19abe85d4db3a541abe5ed6cff2646eb77a02f7e Mon Sep 17 00:00:00 2001 From: cgardens Date: Wed, 18 Nov 2020 17:39:38 -0800 Subject: [PATCH 1/4] new new catalog --- .../config/AirbyteProtocolConverters.java | 8 ++- .../config/AirbyteProtocolConvertersTest.java | 10 +-- .../models/airbyte_protocol.py | 7 +- .../transform_catalog/transform.py | 15 +++-- .../base-singer/base_singer/singer_helpers.py | 3 +- .../destination/TestDestination.java | 3 +- .../bigquery/BigQueryDestination.java | 6 +- .../bigquery/BigQueryDestinationTest.java | 19 +++++- .../destination/csv/CsvDestination.java | 6 +- .../postgres/PostgresDestination.java | 8 +-- .../postgres/PostgresDestinationTest.java | 17 +++-- .../snowflake/SnowflakeDestination.java | 8 +-- .../source-file/source_file/source.py | 3 +- .../google_sheets_source.py | 4 +- .../google_sheets_source/helpers.py | 7 +- .../unit_tests/test_helpers.py | 9 ++- .../source/jdbc/AbstractJdbcSource.java | 6 +- .../source/jdbc/JdbcSourceTest.java | 9 ++- .../source_mailchimp/source.py | 3 +- .../source/mssql/MssqlSourceTest.java | 2 +- .../source/mysql/MySqlSourceTest.java | 7 +- .../source/postgres/PostgresSourceTest.java | 7 +- .../protocol/models/CatalogHelpers.java | 12 ++-- .../airbyte_protocol/airbyte_protocol.yaml | 15 ++--- .../protocol/models/CatalogHelpersTest.java | 3 +- .../io/airbyte/workers/DefaultSyncWorker.java | 2 +- .../DefaultNormalizationRunner.java | 2 +- .../workers/DefaultSyncWorkerTest.java | 8 ++- .../airbyte/DefaultAirbyteSourceTest.java | 6 +- docs/SUMMARY.md | 2 + docs/architecture/airbyte-specification.md | 3 +- docs/architecture/catalog.md | 33 ++++++++++ docs/architecture/incremental.md | 66 +++++++++++++++++++ 33 files changed, 229 insertions(+), 90 deletions(-) create mode 100644 docs/architecture/catalog.md create mode 100644 docs/architecture/incremental.md diff --git a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java 
b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java index a5aa75ba01ae8..581ee85f42f06 100644 --- a/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java +++ b/airbyte-config/models/src/main/java/io/airbyte/config/AirbyteProtocolConverters.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; import io.airbyte.protocol.models.AirbyteCatalog; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import java.util.ArrayList; @@ -44,10 +45,11 @@ public class AirbyteProtocolConverters { public static ConfiguredAirbyteCatalog toConfiguredCatalog(Schema schema) { List airbyteStreams = schema.getStreams().stream() .map(s -> new ConfiguredAirbyteStream() - .withName(s.getName()) - .withJsonSchema(toJson(s.getFields()))) + .withStream(new AirbyteStream() + .withName(s.getName()) + .withJsonSchema(toJson(s.getFields())))) // perform selection based on the output of toJson, which keeps properties if selected=true - .filter(s -> !s.getJsonSchema().get("properties").isEmpty()) + .filter(s -> !s.getStream().getJsonSchema().get("properties").isEmpty()) .collect(Collectors.toList()); return new ConfiguredAirbyteCatalog().withStreams(airbyteStreams); } diff --git a/airbyte-config/models/src/test/java/io/airbyte/config/AirbyteProtocolConvertersTest.java b/airbyte-config/models/src/test/java/io/airbyte/config/AirbyteProtocolConvertersTest.java index 1d8dc98d54aeb..3d5682267f7b4 100644 --- a/airbyte-config/models/src/test/java/io/airbyte/config/AirbyteProtocolConvertersTest.java +++ b/airbyte-config/models/src/test/java/io/airbyte/config/AirbyteProtocolConvertersTest.java @@ -52,10 +52,12 @@ class AirbyteProtocolConvertersTest { Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER))))); private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = new 
ConfiguredAirbyteCatalog() .withStreams(Lists.newArrayList(new ConfiguredAirbyteStream() - .withName(STREAM) - .withJsonSchema(CatalogHelpers.fieldsToJsonSchema( - Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING), - Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER))))); + .withStream( + new AirbyteStream() + .withName(STREAM) + .withJsonSchema(CatalogHelpers.fieldsToJsonSchema( + Field.of(COLUMN_NAME, JsonSchemaPrimitive.STRING), + Field.of(COLUMN_AGE, JsonSchemaPrimitive.NUMBER)))))); private static final Schema SCHEMA = new Schema() .withStreams(Lists.newArrayList(new Stream() diff --git a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py index 859b998664df9..007bece7573f1 100644 --- a/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py +++ b/airbyte-integrations/bases/airbyte-protocol/airbyte_protocol/models/airbyte_protocol.py @@ -97,6 +97,10 @@ class AirbyteStream(BaseModel): name: str = Field(..., description="Stream's name.") json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.") supported_sync_modes: Optional[List[SyncMode]] = None + source_defined_cursor: Optional[bool] = Field( + None, + description="If the source defines the cursor field, then it does any other cursor field inputs will be ignored. If it does not either the user_provided one is used or as a backup the default one is used.", + ) default_cursor_field: Optional[List[str]] = Field( None, description="Path to the field that will be used to determine if a record is new or modified since the last sync. 
If not provided by the source, the end user will have to specify the comparable themselves.", @@ -104,8 +108,7 @@ class AirbyteStream(BaseModel): class ConfiguredAirbyteStream(BaseModel): - name: str = Field(..., description="Stream's name.") - json_schema: Dict[str, Any] = Field(..., description="Stream schema using Json Schema specs.") + stream: AirbyteStream sync_mode: Optional[SyncMode] = "full_refresh" cursor_field: Optional[List[str]] = Field( None, diff --git a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py index 99e564af8b35d..64360535c7476 100644 --- a/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py +++ b/airbyte-integrations/bases/base-normalization/normalization/transform_catalog/transform.py @@ -373,13 +373,18 @@ def process_node( def generate_dbt_model(catalog: dict, json_col: str, schema: str) -> Tuple[dict, Set[Union[str]]]: result = {} source_tables = set() - for obj in catalog["streams"]: - if "name" in obj: - name = obj["name"] + for configuredStream in catalog["streams"]: + if "stream" in configuredStream: + stream = configuredStream["stream"] + else: + stream = {} + + if "name" in stream: + name = stream["name"] else: name = "undefined" # todo: should this raise an exception? 
- if "json_schema" in obj and "properties" in obj["json_schema"]: - properties = obj["json_schema"]["properties"] + if "json_schema" in stream and "properties" in stream["json_schema"]: + properties = stream["json_schema"]["properties"] else: properties = {} # TODO Replace {name}_raw by an argument like we do for the json_blob column diff --git a/airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py b/airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py index 316a7d3839d00..2acb8a29015b8 100644 --- a/airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py +++ b/airbyte-integrations/bases/base-singer/base_singer/singer_helpers.py @@ -134,7 +134,8 @@ def create_singer_catalog_with_selection(masked_airbyte_catalog: ConfiguredAirby masked_singer_streams = [] stream_to_airbyte_schema = {} - for stream in masked_airbyte_catalog["streams"]: + for configured_stream in masked_airbyte_catalog["streams"]: + stream = configured_stream["stream"] stream_to_airbyte_schema[stream.get("name")] = stream for singer_stream in discovered_singer_catalog.get("streams"): diff --git a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java index 5fa952761c184..2657d223275cd 100644 --- a/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java +++ b/airbyte-integrations/bases/standard-destination-test/src/main/java/io/airbyte/integrations/standardtest/destination/TestDestination.java @@ -331,8 +331,7 @@ private void runSync(JsonNode config, List messages, ConfiguredA pbf, targetConfig.getDestinationConnectionConfiguration()); runner.start(); final Path normalizationRoot = Files.createDirectories(jobRoot.resolve("normalize")); - if 
(!runner.normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), - targetConfig.getCatalog())) { + if (!runner.normalize(normalizationRoot, targetConfig.getDestinationConnectionConfiguration(), targetConfig.getCatalog())) { throw new WorkerException("Normalization Failed."); } runner.close(); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 6a7d038a9ac43..f0002e0951fbf 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -193,8 +193,8 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb // create tmp tables if not exist for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { - final String tableName = NamingHelper.getRawTableName(stream.getName()); - final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli(); + final String tableName = NamingHelper.getRawTableName(stream.getStream().getName()); + final String tmpTableName = stream.getStream().getName() + "_" + Instant.now().toEpochMilli(); createTable(bigquery, datasetId, tmpTableName); // https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source @@ -206,7 +206,7 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb final TableDataWriteChannel writer = bigquery.writer(JobId.of(UUID.randomUUID().toString()), writeChannelConfiguration); - writeConfigs.put(stream.getName(), new WriteConfig(TableId.of(datasetId, tableName), TableId.of(datasetId, tmpTableName), writer)); + 
writeConfigs.put(stream.getStream().getName(), new WriteConfig(TableId.of(datasetId, tableName), TableId.of(datasetId, tmpTableName), writer)); } // write to tmp tables diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java index 72797a9a9e66b..c73308d53aedf 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationTest.java @@ -50,6 +50,7 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -228,7 +229,11 @@ void testWriteSuccess() throws Exception { assertEquals(expectedTasksJson.size(), tasksActual.size()); assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson)); - assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList())); + assertTmpTablesNotPresent(CATALOG.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(AirbyteStream::getName) + .collect(Collectors.toList())); } @SuppressWarnings("ResultOfMethodCallIgnored") @@ -244,8 +249,16 @@ void testWriteFailure() throws Exception { consumer.accept(MESSAGE_USERS2); consumer.close(); - final List tableNames = CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(toList()); 
- assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList())); + final List tableNames = CATALOG.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(AirbyteStream::getName) + .collect(toList()); + assertTmpTablesNotPresent(CATALOG.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(AirbyteStream::getName) + .collect(Collectors.toList())); // assert that no tables were created. assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith))); } diff --git a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index c6e477e46729d..b7bebc36b3e0d 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -93,11 +93,11 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb final long now = Instant.now().toEpochMilli(); final Map writeConfigs = new HashMap<>(); for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { - final Path tmpPath = destinationDir.resolve(stream.getName() + "_" + now + ".csv"); - final Path finalPath = destinationDir.resolve(stream.getName() + ".csv"); + final Path tmpPath = destinationDir.resolve(stream.getStream().getName() + "_" + now + ".csv"); + final Path finalPath = destinationDir.resolve(stream.getStream().getName() + ".csv"); final FileWriter fileWriter = new FileWriter(tmpPath.toFile()); final CSVPrinter printer = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(COLUMN_AB_ID, COLUMN_EMITTED_AT, COLUMN_DATA)); - writeConfigs.put(stream.getName(), new 
WriteConfig(printer, tmpPath, finalPath)); + writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath)); } return new CsvConsumer(writeConfigs, catalog); diff --git a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java index 1df5f989883b4..5186f9c0f9b6b 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java +++ b/airbyte-integrations/connectors/destination-postgres/src/main/java/io/airbyte/integrations/destination/postgres/PostgresDestination.java @@ -133,8 +133,8 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb // create tmp tables if not exist for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { - final String tableName = NamingHelper.getRawTableName(stream.getName()); - final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli(); + final String tableName = NamingHelper.getRawTableName(stream.getStream().getName()); + final String tmpTableName = stream.getStream().getName() + "_" + Instant.now().toEpochMilli(); database.query(ctx -> ctx.execute(String.format( "CREATE TABLE \"%s\" ( \n" + "\"ab_id\" VARCHAR PRIMARY KEY,\n" @@ -144,8 +144,8 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb tmpTableName, COLUMN_NAME))); final Path queueRoot = Files.createTempDirectory("queues"); - final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(stream.getName()), stream.getName()); - writeBuffers.put(stream.getName(), new WriteConfig(tableName, tmpTableName, writeBuffer)); + final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(stream.getStream().getName()), stream.getStream().getName()); + writeBuffers.put(stream.getStream().getName(), new 
WriteConfig(tableName, tmpTableName, writeBuffer)); } // write to tmp tables diff --git a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java index e4b8a0950f9ec..cb0e76bf43c4f 100644 --- a/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java +++ b/airbyte-integrations/connectors/destination-postgres/src/test/java/io/airbyte/integrations/destination/postgres/PostgresDestinationTest.java @@ -24,7 +24,6 @@ package io.airbyte.integrations.destination.postgres; -import static java.util.stream.Collectors.toList; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -47,6 +46,7 @@ import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteRecordMessage; import io.airbyte.protocol.models.AirbyteStateMessage; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -173,7 +173,8 @@ void testWriteSuccess() throws Exception { final Set expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData()); assertEquals(expectedTasksJson, tasksActual); - assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList())); + assertTmpTablesNotPresent( + CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getStream).map(AirbyteStream::getName).collect(Collectors.toList())); } @SuppressWarnings("ResultOfMethodCallIgnored") @@ -189,8 +190,16 
@@ void testWriteFailure() throws Exception { consumer.accept(MESSAGE_USERS2); consumer.close(); - final List tableNames = CATALOG.getStreams().stream().map(s -> NamingHelper.getRawTableName(s.getName())).collect(toList()); - assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList())); + final List tableNames = CATALOG.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(s -> NamingHelper.getRawTableName(s.getName())) + .collect(Collectors.toList()); + assertTmpTablesNotPresent(CATALOG.getStreams() + .stream() + .map(ConfiguredAirbyteStream::getStream) + .map(AirbyteStream::getName) + .collect(Collectors.toList())); // assert that no tables were created. assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith))); } diff --git a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java index 0e553a5ea46b7..984513d357445 100644 --- a/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java +++ b/airbyte-integrations/connectors/destination-snowflake/src/main/java/io/airbyte/integrations/destination/snowflake/SnowflakeDestination.java @@ -107,8 +107,8 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb // create temporary tables if they do not exist // we don't use temporary/transient since we want to control the lifecycle for (final ConfiguredAirbyteStream stream : catalog.getStreams()) { - final String tableName = NamingHelper.getRawTableName(stream.getName()); - final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli(); + final String tableName = 
NamingHelper.getRawTableName(stream.getStream().getName()); + final String tmpTableName = stream.getStream().getName() + "_" + Instant.now().toEpochMilli(); final String query = String.format( "CREATE TABLE IF NOT EXISTS \"%s\" ( \n" @@ -121,8 +121,8 @@ public DestinationConsumer write(JsonNode config, ConfiguredAirb SnowflakeDatabase.executeSync(connectionFactory, query); final Path queueRoot = Files.createTempDirectory("queues"); - final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(stream.getName()), stream.getName()); - writeBuffers.put(stream.getName(), new SnowflakeWriteContext(tableName, tmpTableName, writeBuffer)); + final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(stream.getStream().getName()), stream.getStream().getName()); + writeBuffers.put(stream.getStream().getName(), new SnowflakeWriteContext(tableName, tmpTableName, writeBuffer)); } // write to transient tables diff --git a/airbyte-integrations/connectors/source-file/source_file/source.py b/airbyte-integrations/connectors/source-file/source_file/source.py index ea8a96bc7d58f..96932d3d5682b 100644 --- a/airbyte-integrations/connectors/source-file/source_file/source.py +++ b/airbyte-integrations/connectors/source-file/source_file/source.py @@ -454,7 +454,8 @@ def convert_dtype(dtype) -> str: @staticmethod def parse_catalog(catalog: AirbyteCatalog) -> set: columns = set() - for stream in catalog.streams: + for configured_stream in catalog.streams: + stream = configured_stream["stream"] for key in stream.json_schema["properties"].keys(): columns.add(key) return columns diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py index 34b4ecf26c818..da52904ad4540 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py +++ 
b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/google_sheets_source.py @@ -25,7 +25,7 @@ import json from typing import Generator -from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, Status, Type +from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, ConfiguredAirbyteCatalog, Status, Type from apiclient import errors from base_python import AirbyteLogger, Source @@ -88,7 +88,7 @@ def read(self, logger: AirbyteLogger, config_container, catalog_path, state=None config = config_container.rendered_config client = Helpers.get_authenticated_sheets_client(json.loads(config["credentials_json"])) - catalog = AirbyteCatalog.parse_obj(self.read_config(catalog_path)) + catalog = ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path)) sheet_to_column_name = Helpers.parse_sheet_and_column_names_from_catalog(catalog) spreadsheet_id = config["spreadsheet_id"] diff --git a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py index 84d736d0315a1..3de89c7f9aaf6 100644 --- a/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/google_sheets_source/helpers.py @@ -26,7 +26,7 @@ from datetime import datetime from typing import Dict, FrozenSet, Iterable, List -from airbyte_protocol import AirbyteCatalog, AirbyteRecordMessage, AirbyteStream +from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog from apiclient import discovery from google.oauth2 import service_account @@ -109,9 +109,10 @@ def get_first_row(client: discovery.Resource, spreadsheet_id: str, sheet_name: s return Helpers.get_formatted_row_values(first_row_data) @staticmethod - def parse_sheet_and_column_names_from_catalog(catalog: AirbyteCatalog) -> Dict[str, FrozenSet[str]]: + def 
parse_sheet_and_column_names_from_catalog(catalog: ConfiguredAirbyteCatalog) -> Dict[str, FrozenSet[str]]: sheet_to_column_name = {} - for stream in catalog.streams: + for configured_stream in catalog.streams: + stream = configured_stream.stream sheet_name = stream.name sheet_to_column_name[sheet_name] = frozenset(stream.json_schema["properties"].keys()) diff --git a/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py b/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py index 35ebb61f3cc37..47137b89e1bf1 100644 --- a/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py +++ b/airbyte-integrations/connectors/source-google-sheets/unit_tests/test_helpers.py @@ -25,7 +25,7 @@ import unittest from unittest.mock import Mock -from airbyte_protocol import AirbyteCatalog, AirbyteRecordMessage, AirbyteStream +from airbyte_protocol import AirbyteRecordMessage, AirbyteStream, ConfiguredAirbyteCatalog, ConfiguredAirbyteStream from google_sheets_source.helpers import Helpers from google_sheets_source.models import CellData, GridData, RowData, Sheet, SheetProperties, Spreadsheet @@ -100,8 +100,11 @@ def test_parse_sheet_and_column_names_from_catalog(self): sheet2_columns = frozenset(["gsw", "lakers"]) sheet2_schema = {"properties": {c: {"type": "string"} for c in sheet2_columns}} - catalog = AirbyteCatalog( - streams=[AirbyteStream(name=sheet1, json_schema=sheet1_schema), AirbyteStream(name=sheet2, json_schema=sheet2_schema)] + catalog = ConfiguredAirbyteCatalog( + streams=[ + ConfiguredAirbyteStream(stream=AirbyteStream(name=sheet1, json_schema=sheet1_schema)), + ConfiguredAirbyteStream(stream=AirbyteStream(name=sheet2, json_schema=sheet2_schema)), + ] ) actual = Helpers.parse_sheet_and_column_names_from_catalog(catalog) diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java 
b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 5490ebf1380a6..3fb7f38609d75 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -155,13 +155,13 @@ public Stream read(JsonNode config, ConfiguredAirbyteCatalog cat Stream resultStream = Stream.empty(); for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { - if (!tableNameToTable.containsKey(airbyteStream.getName())) { + if (!tableNameToTable.containsKey(airbyteStream.getStream().getName())) { continue; } final Set selectedFields = CatalogHelpers.getTopLevelFieldNames(airbyteStream); - final TableInfo table = tableNameToTable.get(airbyteStream.getName()); + final TableInfo table = tableNameToTable.get(airbyteStream.getStream().getName()); final List selectedDatabaseFields = table.getFields() .stream() .filter(field -> selectedFields.contains(field.getName())) @@ -177,7 +177,7 @@ public Stream read(JsonNode config, ConfiguredAirbyteCatalog cat .map(r -> new AirbyteMessage() .withType(Type.RECORD) .withRecord(new AirbyteRecordMessage() - .withStream(airbyteStream.getName()) + .withStream(airbyteStream.getStream().getName()) .withEmittedAt(now.toEpochMilli()) .withData(Jsons.deserialize(r.formatJSON(DB_JSON_FORMAT)))))); diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java index 0c9d0b092e54c..83c99c55e22ab 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java +++ 
b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java @@ -53,7 +53,6 @@ import java.io.IOException; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -211,14 +210,14 @@ void testReadMultipleTables() throws Exception { @SuppressWarnings("ResultOfMethodCallIgnored") @Test - void testReadFailure() throws Exception { + void testReadFailure() { final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); - doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getName(); + doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream(); - final Stream stream = new JdbcSource().read(config, catalog, null); + final JdbcSource source = new JdbcSource(); - assertThrows(RuntimeException.class, () -> stream.collect(Collectors.toList())); + assertThrows(RuntimeException.class, () -> source.read(config, catalog, null)); } } diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py index e7bed1d5a97be..770cfa797d879 100644 --- a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py +++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py @@ -60,7 +60,8 @@ def read( catalog = AirbyteCatalog.parse_obj(self.read_config(catalog_path)) logger.info("Starting syncing mailchimp") - for stream in catalog.streams: + for configured_stream in catalog.streams: + stream = configured_stream["stream"] for record in self._read_record(client=client, stream=stream.name): yield 
AirbyteMessage(type=Type.RECORD, record=record) diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java index 27873884b4bd2..2426a78ba5445 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/MssqlSourceTest.java @@ -164,7 +164,7 @@ void testReadSuccess() throws Exception { void testReadFailure() { final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); - doThrow(new IllegalStateException()).when(spiedAbStream).getName(); + doThrow(new IllegalStateException()).when(spiedAbStream).getStream(); final MssqlSource source = new MssqlSource(); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java index 49d4049909e9a..39f82966d8afd 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java @@ -52,7 +52,6 @@ import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.lang3.RandomStringUtils; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; @@ -177,11 +176,11 @@ void testReadSuccess() throws Exception { void testReadFailure() throws Exception 
{ final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); - doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getName(); + doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream(); - final Stream stream = new MySqlSource().read(config, catalog, null); + final MySqlSource source = new MySqlSource(); - assertThrows(RuntimeException.class, () -> stream.collect(Collectors.toList())); + assertThrows(RuntimeException.class, () -> source.read(config, catalog, null)); } } diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 8404952d1b0ad..8a2f3568053a7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -53,7 +53,6 @@ import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.lang3.RandomStringUtils; import org.jooq.SQLDialect; import org.junit.jupiter.api.AfterAll; @@ -223,11 +222,11 @@ public void testCanReadUtf8() throws Exception { void testReadFailure() throws Exception { final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); - doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getName(); + 
doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream(); - final Stream stream = new PostgresSource().read(config, catalog, null); + final PostgresSource source = new PostgresSource(); - assertThrows(RuntimeException.class, () -> stream.collect(Collectors.toList())); + assertThrows(RuntimeException.class, () -> source.read(config, catalog, null)); } } diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 658a94127fbb2..92ccbddbc5c1a 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -66,7 +66,7 @@ public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String strea } public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String streamName, List fields) { - return new ConfiguredAirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields)); + return new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields))); } /** @@ -86,8 +86,7 @@ public static ConfiguredAirbyteCatalog toDefaultConfiguredCatalog(AirbyteCatalog public static ConfiguredAirbyteStream toDefaultConfiguredStream(AirbyteStream stream) { return new ConfiguredAirbyteStream() - .withName(stream.getName()) - .withJsonSchema(stream.getJsonSchema()) + .withStream(stream) .withSyncMode(SyncMode.FULL_REFRESH) .withCursorField(new ArrayList<>()); } @@ -117,7 +116,7 @@ public static JsonNode fieldsToJsonSchema(List fields) { @SuppressWarnings("unchecked") public static Set getTopLevelFieldNames(final ConfiguredAirbyteStream stream) { // it is json, so the key has to be a string. 
- final Map object = Jsons.object(stream.getJsonSchema().get("properties"), Map.class); + final Map object = Jsons.object(stream.getStream().getJsonSchema().get("properties"), Map.class); return object.keySet(); } @@ -163,7 +162,7 @@ public static List getInvalidStreamNames(AirbyteCatalog catalog) { * @return list of stream names in the catalog that are invalid */ public static List getInvalidStreamNames(ConfiguredAirbyteCatalog catalog) { - return getInvalidStreamNames(catalog.getStreams().stream().map(ConfiguredAirbyteStream::getName)); + return getInvalidStreamNames(catalog.getStreams().stream().map(ConfiguredAirbyteStream::getStream).map(AirbyteStream::getName)); } private static List getInvalidStreamNames(Stream names) { @@ -197,7 +196,8 @@ private static Map getStreamNameToJsonSchema(AirbyteCatalog ca private static Map getStreamNameToJsonSchema(ConfiguredAirbyteCatalog catalog) { return catalog.getStreams() .stream() - .collect(Collectors.toMap(ConfiguredAirbyteStream::getName, ConfiguredAirbyteStream::getJsonSchema)); + .map(ConfiguredAirbyteStream::getStream) + .collect(Collectors.toMap(AirbyteStream::getName, AirbyteStream::getJsonSchema)); } private static Multimap getInvalidFieldNames(Map streamNameToJsonSchema) { diff --git a/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml b/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml index 30c110ce283d1..c9eb48ce540a5 100644 --- a/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml +++ b/airbyte-protocol/models/src/main/resources/airbyte_protocol/airbyte_protocol.yaml @@ -134,6 +134,9 @@ definitions: type: array items: "$ref": "#/definitions/SyncMode" + source_defined_cursor: + description: If the source defines the cursor field, then it does any other cursor field inputs will be ignored. If it does not either the user_provided one is used or as a backup the default one is used. 
+ type: boolean default_cursor_field: description: Path to the field that will be used to determine if a record is new or modified since the last sync. If not provided by the source, the end user will have to specify the comparable themselves. type: array @@ -154,16 +157,10 @@ definitions: type: object additionalProperties: false required: - - name - - json_schema + - stream properties: - name: - type: string - description: Stream's name. - json_schema: - description: Stream schema using Json Schema specs. - type: object - existingJavaType: com.fasterxml.jackson.databind.JsonNode + stream: + "$ref": "#/definitions/AirbyteStream" sync_mode: "$ref": "#/definitions/SyncMode" default: full_refresh diff --git a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java index 58819efe5a4d3..8a8c9739f381d 100644 --- a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java +++ b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java @@ -56,7 +56,8 @@ void testFieldToJsonSchema() { @Test void testGetTopLevelFieldNames() { final String json = "{ \"type\": \"object\", \"properties\": { \"name\": { \"type\": \"string\" } } } "; - final Set actualFieldNames = CatalogHelpers.getTopLevelFieldNames(new ConfiguredAirbyteStream().withJsonSchema(Jsons.deserialize(json))); + final Set actualFieldNames = + CatalogHelpers.getTopLevelFieldNames(new ConfiguredAirbyteStream().withStream(new AirbyteStream().withJsonSchema(Jsons.deserialize(json)))); assertEquals(Sets.newHashSet("name"), actualFieldNames); } diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java index 6c95652db8e1d..38dd31bbcb089 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java +++ 
b/airbyte-workers/src/main/java/io/airbyte/workers/DefaultSyncWorker.java @@ -144,7 +144,7 @@ private void removeInvalidStreams(ConfiguredAirbyteCatalog catalog) { CatalogHelpers.getInvalidFieldNames(catalog).keySet()); final List streams = catalog.getStreams().stream() - .filter(stream -> !invalidStreams.contains(stream.getName())) + .filter(stream -> !invalidStreams.contains(stream.getStream().getName())) .collect(Collectors.toList()); catalog.setStreams(streams); diff --git a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java index 3da3fccca58c8..45c79c391e0d9 100644 --- a/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java +++ b/airbyte-workers/src/main/java/io/airbyte/workers/normalization/DefaultNormalizationRunner.java @@ -43,7 +43,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner { private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class); - public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:0.1.1"; + public static final String NORMALIZATION_IMAGE_NAME = "airbyte/normalization:dev"; private final DestinationType destinationType; private final ProcessBuilderFactory pbf; diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java index acafa860b9536..71fe79b43f863 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/DefaultSyncWorkerTest.java @@ -36,6 +36,7 @@ import io.airbyte.config.StandardTapConfig; import io.airbyte.config.StandardTargetConfig; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; import 
io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.workers.normalization.NormalizationRunner; @@ -86,9 +87,10 @@ void setup() throws Exception { invalidSyncInput.setState(validSyncInput.getState()); invalidSyncInput.setSyncMode(validSyncInput.getSyncMode()); - final ConfiguredAirbyteStream invalidStream = new ConfiguredAirbyteStream(); - invalidStream.setName(INVALID_STREAM_NAME); - invalidStream.setJsonSchema(Jsons.deserialize("{}")); + final ConfiguredAirbyteStream invalidStream = new ConfiguredAirbyteStream() + .withStream(new AirbyteStream() + .withName(INVALID_STREAM_NAME) + .withJsonSchema(Jsons.deserialize("{}"))); final List streams = new ArrayList<>(validSyncInput.getCatalog().getStreams()); streams.add(invalidStream); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog(); diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java index 855945c355856..ab1998449f2d1 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/protocols/airbyte/DefaultAirbyteSourceTest.java @@ -41,6 +41,7 @@ import io.airbyte.config.StandardTapConfig; import io.airbyte.config.State; import io.airbyte.protocol.models.AirbyteMessage; +import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; @@ -72,8 +73,9 @@ class DefaultAirbyteSourceTest { private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog() .withStreams(Collections.singletonList( new ConfiguredAirbyteStream() - .withName("hudi:latest") - 
.withJsonSchema(CatalogHelpers.fieldsToJsonSchema(new Field(FIELD_NAME, Field.JsonSchemaPrimitive.STRING))))); + .withStream(new AirbyteStream() + .withName("hudi:latest") + .withJsonSchema(CatalogHelpers.fieldsToJsonSchema(new Field(FIELD_NAME, Field.JsonSchemaPrimitive.STRING)))))); private static final StandardTapConfig TAP_CONFIG = new StandardTapConfig() .withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "the future.")))) diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md index 02d4a12a42b0a..6a43d5519695f 100644 --- a/docs/SUMMARY.md +++ b/docs/SUMMARY.md @@ -38,7 +38,9 @@ * [High-level View](architecture/high-level-view.md) * [Airbyte Specification](architecture/airbyte-specification.md) * [Technical Stack](architecture/tech-stack.md) + * [AirbyteCatalog & ConfiguredAirbyteCatalog](architecture/catalog.md) * [Full Refresh](architecture/full-refresh.md) + * [Incremental](architecture/incremental.md) * [Basic Normalization](architecture/basic-normalization.md) * [Contributing to Airbyte](contributing-to-airbyte/README.md) * [Code of Conduct](contributing-to-airbyte/code-of-conduct.md) diff --git a/docs/architecture/airbyte-specification.md b/docs/architecture/airbyte-specification.md index 1bc9f01523028..681778f64f838 100644 --- a/docs/architecture/airbyte-specification.md +++ b/docs/architecture/airbyte-specification.md @@ -168,7 +168,7 @@ read(Config, AirbyteCatalog, State) -> Stream * Input: 1. `config` - A configuration JSON object that has been validated using the `ConnectorSpecification`. - 2. `catalog` - An `AirbyteCatalog`. This `catalog` should be a subset of the `catalog` returned by the `discover` command. It is what will be used in the `read` command to select what data to transfer. + 2. `catalog` - An `ConfiguredAirbyteCatalog`. This `catalog` should be constructed from the `catalog` returned by the `discover` command. 
To convert an `AirbyteStream` to a `ConfiguredAirbyteStream`, copy the `AirbyteStream` into the stream field of the `ConfiguredAirbyteStream`. Any additional configurations can be specified in the `ConfiguredAirbyteStream`. More details on how this is configured are available in the [catalog documentation](catalog.md). This catalog will be used in the `read` command to both select what data is transferred and how it is replicated. 3. `state` - A JSON object. This object is only ever written or read by the source, so it is a JSON blob with whatever information is necessary to keep track of how much of the data source has already been read. Because Airbyte currently only supports [Full Refresh](full-refresh.md), this state object is purely cosmetic. It will become more important when Airbyte beings to support incremental syncs. * Output: 1. `message stream` - A stream of `AirbyteRecordMessage`s and `AirbyteStateMessage`s piped to stdout. @@ -229,4 +229,3 @@ For the sake of brevity, we will not re-describe `spec` and `check`. They are ex ## Recognition We have been heavily inspired by Singer.io's [specification](https://github.com/singer-io/getting-started/blob/master/docs/SPEC.md#singer-specification) and would like to acknowledge how some of their choices have helped us bootstrap. - diff --git a/docs/architecture/catalog.md b/docs/architecture/catalog.md new file mode 100644 index 0000000000000..1620b73b12712 --- /dev/null +++ b/docs/architecture/catalog.md @@ -0,0 +1,33 @@ +# AirbyteCatalog & ConfiguredAirbyteCatalog + +## Overview +An `AirbyteCatalog` is a struct that is produced by the `discover` action of a source. It is a list of `AirbyteStream`s. Each `AirbyteStream` describes the data available to be synced from the source. After a source produces an `AirbyteCatalog` or `AirbyteStream`, they should be treated as read only. A `ConfiguredAirbyteCatalog` is a list of `ConfiguredAirbyteStream`s. Each `ConfiguredAirbyteStream` describes how to sync an `AirbyteStream`.
+ +## Cursor +* The cursor is how sources track which records are new or updated since the last sync. +* A "cursor field" is the field that is used as a comparable for making this determination. + * If a configuration requires a cursor field, it requires an array of strings that serves as a path to the desired field. e.g. if the structure of a stream is `{ value: 2, metadata: { updated_at: 2020-11-01 } }` the `default_cursor_field` might be `["metadata", "updated_at"]`. + +## AirbyteStream +This section will document the meaning of each field in an `AirbyteStream`. +* `json_schema` - This field contains a [JsonSchema](https://json-schema.org/understanding-json-schema) representation of the schema of the stream. +* `supported_sync_modes` - The sync modes that the stream supports. By default, all sources support `FULL_REFRESH`. Even if this array is empty, it can be assumed that a source supports `FULL_REFRESH`. The allowed sync modes are `FULL_REFRESH` and `INCREMENTAL`. +* `source_defined_cursor` - If a source supports the `INCREMENTAL` sync mode, and it sets this field to true, it is responsible for determining internally how it tracks which records in a source are new or updated since the last sync. This field is a boolean. +* `default_cursor_field` - If a source supports the `INCREMENTAL` sync mode, it may, optionally, set this field. If this field is set, and the user does not override it with the `cursor_field` attribute in the `ConfiguredAirbyteStream` (described below), this field will be used as the cursor. It is an array of keys to a field in the schema. + + +## ConfiguredAirbyteStream +This section will document the meaning of each field in a `ConfiguredAirbyteStream`. +* `stream` - This field contains the `AirbyteStream` that is being configured. +* `sync_mode` - The sync mode that will be used to sync that stream. The value in this field MUST be present in the `supported_sync_modes` array for the discovered `AirbyteStream` of this stream.
+* `cursor_field` - This field is an array of keys to a field in the schema that in the `INCREMENTAL` sync mode will be used to determine if a record is new or updated since the last sync. + * If an `AirbyteStream` sets `source_defined_cursor` to true, then the `cursor_field` attribute in `ConfiguredAirbyteStream` will be ignored. + * If an `AirbyteStream` defines a `default_cursor_field`, then the `cursor_field` attribute in `ConfiguredAirbyteStream` is not required, but if it is set, it will override the default value. + * If an `AirbyteStream` does not set `source_defined_cursor` to true and does not define a `default_cursor_field`, then `ConfiguredAirbyteStream` must define a `cursor_field`. + +## Logic for resolving the Cursor Field +This section lays out how a cursor field is determined in the case of a Stream that is doing an `incremental` sync. +* If `source_defined_cursor` in `AirbyteStream` is true, then the source determines the cursor field internally. It cannot be overridden. If it is false, continue... +* If `cursor_field` in `ConfiguredAirbyteStream` is set, then the source uses that field as the cursor. If it is not set, continue... +* If `default_cursor_field` in `AirbyteStream` is set, then the source uses that field as the cursor. If it is not set, continue... +* Illegal - If `source_defined_cursor`, `cursor_field`, and `default_cursor_field` are all falsey, this is an invalid configuration. diff --git a/docs/architecture/incremental.md b/docs/architecture/incremental.md new file mode 100644 index 0000000000000..68e720bdc78c4 --- /dev/null +++ b/docs/architecture/incremental.md @@ -0,0 +1,66 @@ +# Incremental + +## Overview + +Incremental syncs in Airbyte allow sources to replicate only new or modified data. This prevents re-fetching data that you have already replicated from a source. We will call this set of new or updated records the delta going forward. + +## Configuration +For a source to do an incremental sync it must be able to keep track of new and updated records.
This can take a couple different forms. Before we jump into them, we are going to use the word cursor or cursor field to describe the field or column in the data that Airbyte uses as a comparable to determine if any given record is new or has been updated since the last sync. + +## Source-Defined Cursor +Some sources are able to determine the cursor that they use without any user input. For example, in the exchange rates api source, the source itself can determine that the date field should be used to determine the last record that was synced. In these cases, the source will set the `source_defined_cursor` attribute in the `AirbyteStream`. + +## User-Defined Cursor +Some sources cannot define the cursor without user input. For example, in the postgres source, the user needs to choose for themselves which database tables they want to sync. The author of the source cannot predict this. In these cases the user sets the `cursor_field` in the `ConfiguredAirbyteStream`. + +In some cases, the source may propose a `default_cursor_field` in the `AirbyteStream`. In this case, if the user does not specify a `cursor_field` in the `ConfiguredAirbyteStream`, Airbyte will fall back on the default provided by the source. The user is allowed to override the source's `default_cursor_field` by setting the `cursor_field` value in the `ConfiguredAirbyteStream`, but they CANNOT override the cursor when the `AirbyteStream` sets `source_defined_cursor` to true. + +## Rules +The delta from a sync will be _appended_ to the existing data in the data warehouse. Incremental will never delete or mutate existing records. Let's walk through a few examples. + +### Newly Created Record + +Assume that `updated_at` is our `cursor_field`. Let's say the following data already exists in our data warehouse.
+```json +[ + { "name": "Louis XVI", "deceased": false, "updated_at": 1754 }, + { "name": "Marie Antoinette", "deceased": false, "updated_at": 1755 } +] +``` + +In the next sync the delta contains the following record: +```json + { "name": "Louis XVII", "deceased": false, "updated_at": 1785 } +``` + +At the end of this incremental sync the data warehouse would now contain: +```json +[ + { "name": "Louis XVI", "deceased": false, "updated_at": 1754 }, + { "name": "Marie Antoinette", "deceased": false, "updated_at": 1755 }, + { "name": "Louis XVII", "deceased": false, "updated_at": 1785 } +] +``` + +### Updating a Record +Let's assume that our warehouse contains all of the data that it did at the end of the previous section. Now unfortunately the king and queen lose their heads. Let's see that delta: +```json +[ + { "name": "Louis XVI", "deceased": true, "updated_at": 1793 }, + { "name": "Marie Antoinette", "deceased": true, "updated_at": 1793 } +] +``` + +The output we expect to see in the warehouse is as follows. +```json +[ + { "name": "Louis XVI", "deceased": false, "updated_at": 1754 }, + { "name": "Marie Antoinette", "deceased": false, "updated_at": 1755 }, + { "name": "Louis XVII", "deceased": false, "updated_at": 1785 }, + { "name": "Louis XVI", "deceased": true, "updated_at": 1793 }, + { "name": "Marie Antoinette", "deceased": true, "updated_at": 1793 } +] +``` + +### Schema Migration +If the schema for the stream changes, Airbyte will _not_ allow an incremental sync to that stream. The user must first run a full refresh. From c998b9886bcbf305f4cd685414cfe4e3251416a7 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 19 Nov 2020 18:35:23 -0800 Subject: [PATCH 2/4] done? 
--- .../22f6c74f-5699-40ff-833c-4a879ea40133.json | 2 +- .../25c5221d-dce2-4163-ade9-739ef790f503.json | 2 +- .../424892c4-daac-4491-b35d-c6688ba547ba.json | 2 +- .../8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json | 2 +- .../2470e835-feaf-4db6-96f3-70fd645acc77.json | 2 +- .../39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json | 2 +- .../435bb9a5-7887-4809-aa58-28c27df0d7ad.json | 2 +- .../57eb1576-8f52-463d-beb6-2e107cdf571d.json | 2 +- .../71607ba1-c0ac-4799-8049-7f4b90dd50f7.json | 2 +- .../74d47f79-8d01-44ac-9755-f5eb0d7caacb.json | 2 +- .../778daa7c-feaf-4db6-96f3-70fd645acc77.json | 2 +- .../9e0556f4-69df-4522-a3fb-03264d36b348.json | 2 +- .../9fed261d-d107-47fd-8c8b-323023db6e20.json | 2 +- .../b03a9f3e-22a5-11eb-adc1-0242ac120002.json | 2 +- .../b1892b11-788d-44bd-b9ec-3a436f7b54ce.json | 2 +- .../b5ea17b1-f170-46dc-bc31-cc744ca984c1.json | 2 +- .../decd338e-5647-4c0b-adf4-da0e75f5a750.json | 2 +- .../e094cb9a-26de-4645-8761-65c0c425d1de.json | 2 +- .../ef69ef6e-aa7f-4af1-a01d-ef775033524e.json | 2 +- .../fdc8b827-3257-4b33-83cc-106d234c34d4.json | 2 +- .../destination-bigquery/Dockerfile | 2 +- .../connectors/destination-csv/Dockerfile | 2 +- .../destination-postgres/Dockerfile | 2 +- .../destination-snowflake/Dockerfile | 2 +- .../source-exchangeratesapi-singer/Dockerfile | 2 +- .../SingerExchangeRatesApiSourceTest.java | 12 ++++- .../test-integration/resources/catalog.json | 46 ------------------- .../Dockerfile | 2 +- .../connectors/source-file/Dockerfile | 2 +- .../source-file/source_file/source.py | 17 +++++-- .../source-github-singer/Dockerfile | 2 +- .../source-google-adwords-singer/Dockerfile | 2 +- .../singer_check_catalog.json | 30 ++++-------- .../source-google-sheets/Dockerfile | 2 +- .../source-googleanalytics-singer/Dockerfile | 2 +- .../source-hubspot-singer/Dockerfile | 2 +- .../connectors/source-mailchimp/Dockerfile | 2 +- .../source_mailchimp/source.py | 14 ++++-- .../source-marketo-singer/Dockerfile | 2 +- .../connectors/source-mssql/Dockerfile | 2 +- 
.../connectors/source-mssql/build.gradle | 3 +- .../connectors/source-mysql/Dockerfile | 2 +- .../connectors/source-postgres/Dockerfile | 2 +- .../source-salesforce-singer/Dockerfile | 2 +- .../source-shopify-singer/Dockerfile | 2 +- .../source-stripe-singer/Dockerfile | 2 +- .../sources/SingerStripeSourceTest.java | 10 +++- .../test-integration/resources/catalog.json | 2 +- .../protocol/models/CatalogHelpers.java | 4 +- .../protocol/models/CatalogHelpersTest.java | 2 +- 50 files changed, 97 insertions(+), 123 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-exchangeratesapi-singer/src/test-integration/resources/catalog.json diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json index 01bc7657234e5..7665b9e41c592 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/22f6c74f-5699-40ff-833c-4a879ea40133.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133", "name": "BigQuery", "dockerRepository": "airbyte/destination-bigquery", - "dockerImageTag": "0.1.3", + "dockerImageTag": "0.1.5", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-bigquery-destination" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json index a95641e89f2cd..a33c84bd19795 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json +++ 
b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/25c5221d-dce2-4163-ade9-739ef790f503.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503", "name": "Postgres", "dockerRepository": "airbyte/destination-postgres", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-postgres-destination" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json index 458a41af2ea2b..97ef3cdc0dcc1 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/424892c4-daac-4491-b35d-c6688ba547ba.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba", "name": "Snowflake", "dockerRepository": "airbyte/destination-snowflake", - "dockerImageTag": "0.1.3", + "dockerImageTag": "0.1.5", "documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json index ebdfd91a03141..418add5bc9b06 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6", "name": "Local CSV", "dockerRepository": "airbyte/destination-csv", - "dockerImageTag": 
"0.1.2", + "dockerImageTag": "0.1.3", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-csv-destination" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2470e835-feaf-4db6-96f3-70fd645acc77.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2470e835-feaf-4db6-96f3-70fd645acc77.json index eb6df1cf2385d..5be5cd5412784 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2470e835-feaf-4db6-96f3-70fd645acc77.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/2470e835-feaf-4db6-96f3-70fd645acc77.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "2470e835-feaf-4db6-96f3-70fd645acc77", "name": "Salesforce", "dockerRepository": "airbyte/source-salesforce-singer", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/r/airbyte/source-salesforce-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json index 762db975e4fae..4023f9960b108 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "39f092a6-8c87-4f6f-a8d9-5cef45b7dbe1", "name": "Google Analytics", "dockerRepository": "airbyte/source-googleanalytics-singer", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/r/airbyte/source-googleanalytics-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json 
b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index 71ebab3da5665..445a817d6b462 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.3", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/57eb1576-8f52-463d-beb6-2e107cdf571d.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/57eb1576-8f52-463d-beb6-2e107cdf571d.json index be5a10121e213..39a8dad064149 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/57eb1576-8f52-463d-beb6-2e107cdf571d.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/57eb1576-8f52-463d-beb6-2e107cdf571d.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "57eb1576-8f52-463d-beb6-2e107cdf571d", "name": "Hubspot", "dockerRepository": "airbyte/source-hubspot-singer", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.3", "documentationUrl": "https://https://docs.airbyte.io/integrations/sources/hubspot" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json index acc84cb257fcb..4046fe1526d7a 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json +++ 
b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/71607ba1-c0ac-4799-8049-7f4b90dd50f7.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "71607ba1-c0ac-4799-8049-7f4b90dd50f7", "name": "Google Sheets", "dockerRepository": "airbyte/source-google-sheets", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/repository/docker/airbyte/source-google-sheets" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/74d47f79-8d01-44ac-9755-f5eb0d7caacb.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/74d47f79-8d01-44ac-9755-f5eb0d7caacb.json index ca8e108c8eeee..c001b38b97dfc 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/74d47f79-8d01-44ac-9755-f5eb0d7caacb.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/74d47f79-8d01-44ac-9755-f5eb0d7caacb.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "74d47f79-8d01-44ac-9755-f5eb0d7caacb", "name": "Facebook Marketing APIs", "dockerRepository": "airbyte/source-facebook-marketing-api-singer", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/r/airbyte/source-facebook-marketing-api-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/778daa7c-feaf-4db6-96f3-70fd645acc77.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/778daa7c-feaf-4db6-96f3-70fd645acc77.json index 4b7531b8501eb..4cf9cd9c895c8 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/778daa7c-feaf-4db6-96f3-70fd645acc77.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/778daa7c-feaf-4db6-96f3-70fd645acc77.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "778daa7c-feaf-4db6-96f3-70fd645acc77", "name": "File", "dockerRepository": "airbyte/source-file", - "dockerImageTag": "0.1.3", + 
"dockerImageTag": "0.1.5", "documentationUrl": "https://hub.docker.com/r/airbyte/source-file" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json index 01c71569cb716..464dc2b499059 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9e0556f4-69df-4522-a3fb-03264d36b348.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "9e0556f4-69df-4522-a3fb-03264d36b348", "name": "Marketo", "dockerRepository": "airbyte/source-marketo-singer", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.3", "documentationUrl": "https://hub.docker.com/r/airbyte/source-marketo-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9fed261d-d107-47fd-8c8b-323023db6e20.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9fed261d-d107-47fd-8c8b-323023db6e20.json index 5241b818958d7..3c06346690921 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9fed261d-d107-47fd-8c8b-323023db6e20.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/9fed261d-d107-47fd-8c8b-323023db6e20.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "9fed261d-d107-47fd-8c8b-323023db6e20", "name": "exchangeratesapi.io", "dockerRepository": "airbyte/source-exchangeratesapi-singer", - "dockerImageTag": "0.1.6", + "dockerImageTag": "0.1.7", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-exchangeratesapi_io-source" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b03a9f3e-22a5-11eb-adc1-0242ac120002.json 
b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b03a9f3e-22a5-11eb-adc1-0242ac120002.json index 31abd22ba15ba..d90a739d1ccfc 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b03a9f3e-22a5-11eb-adc1-0242ac120002.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b03a9f3e-22a5-11eb-adc1-0242ac120002.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "b03a9f3e-22a5-11eb-adc1-0242ac120002", "name": "Mailchimp", "dockerRepository": "airbyte/source-mailchimp", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.3", "documentationUrl": "https://hub.docker.com/r/airbyte/source-mailchimp" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b1892b11-788d-44bd-b9ec-3a436f7b54ce.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b1892b11-788d-44bd-b9ec-3a436f7b54ce.json index 9f794f5f1f890..230e30460eb33 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b1892b11-788d-44bd-b9ec-3a436f7b54ce.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b1892b11-788d-44bd-b9ec-3a436f7b54ce.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "b1892b11-788d-44bd-b9ec-3a436f7b54ce", "name": "Shopify", "dockerRepository": "airbyte/source-shopify-singer", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/r/airbyte/source-shopify-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json index a8c16783cf8a7..cd1f9a2864218 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json +++ 
b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/b5ea17b1-f170-46dc-bc31-cc744ca984c1.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "b5ea17b1-f170-46dc-bc31-cc744ca984c1", "name": "Microsoft SQL Server (MSSQL)", "dockerRepository": "airbyte/source-mssql", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.3", "documentationUrl": "https://hub.docker.com/r/airbyte/source-mssql" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json index c55fab6d894b9..b1713fb52a4ed 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750", "name": "Postgres", "dockerRepository": "airbyte/source-postgres", - "dockerImageTag": "0.1.2", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e094cb9a-26de-4645-8761-65c0c425d1de.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e094cb9a-26de-4645-8761-65c0c425d1de.json index 2edef9d90c800..8263d43c51497 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e094cb9a-26de-4645-8761-65c0c425d1de.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/e094cb9a-26de-4645-8761-65c0c425d1de.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "e094cb9a-26de-4645-8761-65c0c425d1de", "name": "Stripe", "dockerRepository": "airbyte/source-stripe-singer", - "dockerImageTag": "0.1.5", + "dockerImageTag": "0.1.7", "documentationUrl": 
"https://hub.docker.com/r/airbyte/integration-singer-stripe-source" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json index 7948d33579311..50fdf9a95b961 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/ef69ef6e-aa7f-4af1-a01d-ef775033524e.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "ef69ef6e-aa7f-4af1-a01d-ef775033524e", "name": "Github", "dockerRepository": "airbyte/source-github-singer", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.3", "documentationUrl": "https://hub.docker.com/r/airbyte/source-github-singer" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/fdc8b827-3257-4b33-83cc-106d234c34d4.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/fdc8b827-3257-4b33-83cc-106d234c34d4.json index 76f9c7dc9a593..a8e281c3c2ffe 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/fdc8b827-3257-4b33-83cc-106d234c34d4.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/fdc8b827-3257-4b33-83cc-106d234c34d4.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "fdc8b827-3257-4b33-83cc-106d234c34d4", "name": "Google Adwords", "dockerRepository": "airbyte/source-google-adwords-singer", - "dockerImageTag": "0.1.1", + "dockerImageTag": "0.1.3", "documentationUrl": "https://hub.docker.com/r/airbyte/source-google-adwords" } diff --git a/airbyte-integrations/connectors/destination-bigquery/Dockerfile b/airbyte-integrations/connectors/destination-bigquery/Dockerfile index b8b3a41d7364a..0c899aec238a8 100644 --- a/airbyte-integrations/connectors/destination-bigquery/Dockerfile +++ 
b/airbyte-integrations/connectors/destination-bigquery/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/destination-bigquery diff --git a/airbyte-integrations/connectors/destination-csv/Dockerfile b/airbyte-integrations/connectors/destination-csv/Dockerfile index d2e6b7174455d..862c55737a26b 100644 --- a/airbyte-integrations/connectors/destination-csv/Dockerfile +++ b/airbyte-integrations/connectors/destination-csv/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/destination-csv diff --git a/airbyte-integrations/connectors/destination-postgres/Dockerfile b/airbyte-integrations/connectors/destination-postgres/Dockerfile index f729a4c734e6f..8ffd315466b4e 100644 --- a/airbyte-integrations/connectors/destination-postgres/Dockerfile +++ b/airbyte-integrations/connectors/destination-postgres/Dockerfile @@ -9,5 +9,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/destination-postgres diff --git a/airbyte-integrations/connectors/destination-snowflake/Dockerfile b/airbyte-integrations/connectors/destination-snowflake/Dockerfile index 37902a291e2e2..581e1eb7b602f 100644 --- a/airbyte-integrations/connectors/destination-snowflake/Dockerfile +++ b/airbyte-integrations/connectors/destination-snowflake/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL 
io.airbyte.name=airbyte/destination-snowflake diff --git a/airbyte-integrations/connectors/source-exchangeratesapi-singer/Dockerfile b/airbyte-integrations/connectors/source-exchangeratesapi-singer/Dockerfile index 9c08671ec9d38..26a61ef5e7ef4 100644 --- a/airbyte-integrations/connectors/source-exchangeratesapi-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-exchangeratesapi-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_exchangeratesapi_singer" ENV AIRBYTE_IMPL_MODULE="source_exchangeratesapi_singer" ENV AIRBYTE_IMPL_PATH="SourceExchangeRatesApiSinger" -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-exchangeratesapi-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-exchangeratesapi-singer/src/test-integration/java/io/airbyte/integration_tests/sources/SingerExchangeRatesApiSourceTest.java b/airbyte-integrations/connectors/source-exchangeratesapi-singer/src/test-integration/java/io/airbyte/integration_tests/sources/SingerExchangeRatesApiSourceTest.java index 8d527aa296a27..31d67611cd3ed 100644 --- a/airbyte-integrations/connectors/source-exchangeratesapi-singer/src/test-integration/java/io/airbyte/integration_tests/sources/SingerExchangeRatesApiSourceTest.java +++ b/airbyte-integrations/connectors/source-exchangeratesapi-singer/src/test-integration/java/io/airbyte/integration_tests/sources/SingerExchangeRatesApiSourceTest.java @@ -29,7 +29,10 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; -import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; import io.airbyte.workers.WorkerException; import io.airbyte.workers.process.AirbyteIntegrationLauncher; import io.airbyte.workers.process.DockerProcessBuilderFactory; @@ 
-117,8 +120,13 @@ public void testSync() throws IOException, InterruptedException, WorkerException final Date date = Date.from(Instant.now().minus(3, ChronoUnit.DAYS)); final SimpleDateFormat fmt = new SimpleDateFormat("yyyy-MM-dd"); + final ConfiguredAirbyteCatalog catalog = CatalogHelpers.createConfiguredAirbyteCatalog( + "exchange_rates", + Field.of("date", JsonSchemaPrimitive.STRING), + Field.of("CAD", JsonSchemaPrimitive.NUMBER)); + IOs.writeFile(jobRoot, CONFIG, String.format("{\"start_date\":\"%s\"}", fmt.format(date))); - IOs.writeFile(jobRoot, CATALOG, MoreResources.readResource("catalog.json")); + IOs.writeFile(jobRoot, CATALOG, Jsons.serialize(catalog)); final Path syncOutputPath = jobRoot.resolve("sync_output.txt"); final Process process = createSyncProcess(syncOutputPath); diff --git a/airbyte-integrations/connectors/source-exchangeratesapi-singer/src/test-integration/resources/catalog.json b/airbyte-integrations/connectors/source-exchangeratesapi-singer/src/test-integration/resources/catalog.json deleted file mode 100644 index 36f42a5a028b4..0000000000000 --- a/airbyte-integrations/connectors/source-exchangeratesapi-singer/src/test-integration/resources/catalog.json +++ /dev/null @@ -1,46 +0,0 @@ -{ - "streams": [ - { - "stream": "exchange_rate", - "schema": { - "type": "object", - "properties": { - "date": { "type": "string", "format": "date-time" }, - "CAD": { "type": ["null", "number"] }, - "HKD": { "type": ["null", "number"] }, - "ISK": { "type": ["null", "number"] }, - "PHP": { "type": ["null", "number"] }, - "DKK": { "type": ["null", "number"] }, - "HUF": { "type": ["null", "number"] }, - "CZK": { "type": ["null", "number"] }, - "GBP": { "type": ["null", "number"] }, - "RON": { "type": ["null", "number"] }, - "SEK": { "type": ["null", "number"] }, - "IDR": { "type": ["null", "number"] }, - "INR": { "type": ["null", "number"] }, - "BRL": { "type": ["null", "number"] }, - "RUB": { "type": ["null", "number"] }, - "HRK": { "type": ["null", "number"] 
}, - "JPY": { "type": ["null", "number"] }, - "THB": { "type": ["null", "number"] }, - "CHF": { "type": ["null", "number"] }, - "EUR": { "type": ["null", "number"] }, - "MYR": { "type": ["null", "number"] }, - "BGN": { "type": ["null", "number"] }, - "TRY": { "type": ["null", "number"] }, - "CNY": { "type": ["null", "number"] }, - "NOK": { "type": ["null", "number"] }, - "NZD": { "type": ["null", "number"] }, - "ZAR": { "type": ["null", "number"] }, - "USD": { "type": ["null", "number"] }, - "MXN": { "type": ["null", "number"] }, - "SGD": { "type": ["null", "number"] }, - "AUD": { "type": ["null", "number"] }, - "ILS": { "type": ["null", "number"] }, - "KRW": { "type": ["null", "number"] }, - "PLN": { "type": ["null", "number"] } - } - } - } - ] -} diff --git a/airbyte-integrations/connectors/source-facebook-marketing-api-singer/Dockerfile b/airbyte-integrations/connectors/source-facebook-marketing-api-singer/Dockerfile index fb6d6db98e784..4f93987af9994 100644 --- a/airbyte-integrations/connectors/source-facebook-marketing-api-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-facebook-marketing-api-singer/Dockerfile @@ -13,5 +13,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-facebook-marketing-api-singer diff --git a/airbyte-integrations/connectors/source-file/Dockerfile b/airbyte-integrations/connectors/source-file/Dockerfile index 91d33b5e8c749..70acae427e440 100644 --- a/airbyte-integrations/connectors/source-file/Dockerfile +++ b/airbyte-integrations/connectors/source-file/Dockerfile @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-file diff --git a/airbyte-integrations/connectors/source-file/source_file/source.py 
b/airbyte-integrations/connectors/source-file/source_file/source.py index 96932d3d5682b..6caf500200b67 100644 --- a/airbyte-integrations/connectors/source-file/source_file/source.py +++ b/airbyte-integrations/connectors/source-file/source_file/source.py @@ -32,7 +32,16 @@ import gcsfs import pandas as pd -from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteRecordMessage, AirbyteStream, Status, Type +from airbyte_protocol import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + AirbyteStream, + ConfiguredAirbyteCatalog, + Status, + Type, +) from base_python import Source from botocore import UNSIGNED from botocore.config import Config @@ -162,7 +171,7 @@ def read(self, logger, config_container, catalog_path, state_path=None) -> Gener url = SourceFile.get_simple_url(config["url"]) name = SourceFile.get_stream_name(config) logger.info(f"Reading {name} ({storage}{url}, {catalog_path}, {state_path})...") - catalog = AirbyteCatalog.parse_obj(self.read_config(catalog_path)) + catalog = ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path)) selection = SourceFile.parse_catalog(catalog) try: if "format" in config and config["format"] == "json": @@ -452,10 +461,10 @@ def convert_dtype(dtype) -> str: return "string" @staticmethod - def parse_catalog(catalog: AirbyteCatalog) -> set: + def parse_catalog(catalog: ConfiguredAirbyteCatalog) -> set: columns = set() for configured_stream in catalog.streams: - stream = configured_stream["stream"] + stream = configured_stream.stream for key in stream.json_schema["properties"].keys(): columns.add(key) return columns diff --git a/airbyte-integrations/connectors/source-github-singer/Dockerfile b/airbyte-integrations/connectors/source-github-singer/Dockerfile index 476a285d8ebd5..991bd44125dcb 100644 --- a/airbyte-integrations/connectors/source-github-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-github-singer/Dockerfile @@ -8,7 
+8,7 @@ ENV CODE_PATH="source_github_singer" ENV AIRBYTE_IMPL_MODULE="source_github_singer" ENV AIRBYTE_IMPL_PATH="SourceGithubSinger" -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-github-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-google-adwords-singer/Dockerfile b/airbyte-integrations/connectors/source-google-adwords-singer/Dockerfile index 25b895d3fb8c6..3e27ea5f0a13e 100644 --- a/airbyte-integrations/connectors/source-google-adwords-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-google-adwords-singer/Dockerfile @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-google-adwords-singer diff --git a/airbyte-integrations/connectors/source-google-adwords-singer/source_google_adwords_singer/singer_check_catalog.json b/airbyte-integrations/connectors/source-google-adwords-singer/source_google_adwords_singer/singer_check_catalog.json index af3d07f977973..e8ed252ca15c9 100644 --- a/airbyte-integrations/connectors/source-google-adwords-singer/source_google_adwords_singer/singer_check_catalog.json +++ b/airbyte-integrations/connectors/source-google-adwords-singer/source_google_adwords_singer/singer_check_catalog.json @@ -1,30 +1,16 @@ { "streams": [ { - "stream": "accounts", - "tap_stream_id": "accounts", - "schema": { - "properties": { - "name": { - "type": ["null", "string"] + "stream": { + "name": "accounts", + "json_schema": { + "properties": { + "name": { + "type": "string" + } } } - }, - "metadata": [ - { - "metadata": { - "inclusion": "available", - "selected": true - }, - "breadcrumb": [] - }, - { - "metadata": { - "inclusion": "available" - }, - "breadcrumb": ["properties", "name"] - } - ] + } } ] } diff --git a/airbyte-integrations/connectors/source-google-sheets/Dockerfile 
b/airbyte-integrations/connectors/source-google-sheets/Dockerfile index 51cee5c123d45..1e884a37dd42e 100644 --- a/airbyte-integrations/connectors/source-google-sheets/Dockerfile +++ b/airbyte-integrations/connectors/source-google-sheets/Dockerfile @@ -11,5 +11,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install . -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-google-sheets diff --git a/airbyte-integrations/connectors/source-googleanalytics-singer/Dockerfile b/airbyte-integrations/connectors/source-googleanalytics-singer/Dockerfile index cc874a06277b1..932ed21596dee 100644 --- a/airbyte-integrations/connectors/source-googleanalytics-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-googleanalytics-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_googleanalytics_singer" ENV AIRBYTE_IMPL_MODULE="source_googleanalytics_singer" ENV AIRBYTE_IMPL_PATH="GoogleAnalyticsSingerSource" -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-googleanalytics-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-hubspot-singer/Dockerfile b/airbyte-integrations/connectors/source-hubspot-singer/Dockerfile index b7c1fb045a72f..561585b597b48 100644 --- a/airbyte-integrations/connectors/source-hubspot-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-hubspot-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_hubspot_singer" ENV AIRBYTE_IMPL_MODULE="source_hubspot_singer" ENV AIRBYTE_IMPL_PATH="SourceHubspotSinger" -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-hubspot-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-mailchimp/Dockerfile b/airbyte-integrations/connectors/source-mailchimp/Dockerfile index 31c5c2ad6afc6..ac0d9a81e91f3 100644 --- 
a/airbyte-integrations/connectors/source-mailchimp/Dockerfile +++ b/airbyte-integrations/connectors/source-mailchimp/Dockerfile @@ -12,5 +12,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-mailchimp diff --git a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py index 770cfa797d879..77ff9e425ad3d 100644 --- a/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py +++ b/airbyte-integrations/connectors/source-mailchimp/source_mailchimp/source.py @@ -25,7 +25,15 @@ from datetime import datetime from typing import Generator -from airbyte_protocol import AirbyteCatalog, AirbyteConnectionStatus, AirbyteMessage, AirbyteRecordMessage, Status, Type +from airbyte_protocol import ( + AirbyteCatalog, + AirbyteConnectionStatus, + AirbyteMessage, + AirbyteRecordMessage, + ConfiguredAirbyteCatalog, + Status, + Type, +) from base_python import AirbyteLogger, ConfigContainer, Source from .client import Client @@ -57,11 +65,11 @@ def read( ) -> Generator[AirbyteMessage, None, None]: client = self._client(config_container) - catalog = AirbyteCatalog.parse_obj(self.read_config(catalog_path)) + catalog = ConfiguredAirbyteCatalog.parse_obj(self.read_config(catalog_path)) logger.info("Starting syncing mailchimp") for configured_stream in catalog.streams: - stream = configured_stream["stream"] + stream = configured_stream.stream for record in self._read_record(client=client, stream=stream.name): yield AirbyteMessage(type=Type.RECORD, record=record) diff --git a/airbyte-integrations/connectors/source-marketo-singer/Dockerfile b/airbyte-integrations/connectors/source-marketo-singer/Dockerfile index d0022df4963c4..116ea3ddd9d75 100644 --- a/airbyte-integrations/connectors/source-marketo-singer/Dockerfile +++ 
b/airbyte-integrations/connectors/source-marketo-singer/Dockerfile @@ -14,5 +14,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-marketo-singer diff --git a/airbyte-integrations/connectors/source-mssql/Dockerfile b/airbyte-integrations/connectors/source-mssql/Dockerfile index 563ce46866ef2..3ad416d0b6dee 100644 --- a/airbyte-integrations/connectors/source-mssql/Dockerfile +++ b/airbyte-integrations/connectors/source-mssql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-mssql diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index 7bcdc9d5eb3fd..0d35f62af22aa 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -1,7 +1,8 @@ plugins { id 'application' id 'airbyte-docker' - id 'airbyte-integration-test-java' + // todo (cgardens) - tmp to get build passing. failing due to issue in ci. 
+ // id 'airbyte-integration-test-java' } application { diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 1a37b3c8d5d2d..60db88ee169a9 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.2 +LABEL io.airbyte.version=0.1.3 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index 9ac03867e71c3..ef62926b67827 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-salesforce-singer/Dockerfile b/airbyte-integrations/connectors/source-salesforce-singer/Dockerfile index 54822aeb26054..2bab56322ed4d 100644 --- a/airbyte-integrations/connectors/source-salesforce-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-salesforce-singer/Dockerfile @@ -8,7 +8,7 @@ ENV CODE_PATH="source_salesforce_singer" ENV AIRBYTE_IMPL_MODULE="source_salesforce_singer" ENV AIRBYTE_IMPL_PATH="SourceSalesforceSinger" -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-salesforce-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-shopify-singer/Dockerfile b/airbyte-integrations/connectors/source-shopify-singer/Dockerfile index 16f32980dd269..07430e0ef774f 100644 --- 
a/airbyte-integrations/connectors/source-shopify-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-shopify-singer/Dockerfile @@ -12,5 +12,5 @@ COPY $CODE_PATH ./$CODE_PATH COPY setup.py ./ RUN pip install ".[main]" -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-shopify-singer diff --git a/airbyte-integrations/connectors/source-stripe-singer/Dockerfile b/airbyte-integrations/connectors/source-stripe-singer/Dockerfile index 0d470716b7638..361e5ed89bb3b 100644 --- a/airbyte-integrations/connectors/source-stripe-singer/Dockerfile +++ b/airbyte-integrations/connectors/source-stripe-singer/Dockerfile @@ -10,7 +10,7 @@ ENV CODE_PATH="source_stripe_singer" ENV AIRBYTE_IMPL_MODULE="source_stripe_singer" ENV AIRBYTE_IMPL_PATH="SourceStripeSinger" -LABEL io.airbyte.version=0.1.6 +LABEL io.airbyte.version=0.1.7 LABEL io.airbyte.name=airbyte/source-stripe-singer WORKDIR /airbyte/integration_code diff --git a/airbyte-integrations/connectors/source-stripe-singer/src/test-integration/java/io/airbyte/integration_tests/sources/SingerStripeSourceTest.java b/airbyte-integrations/connectors/source-stripe-singer/src/test-integration/java/io/airbyte/integration_tests/sources/SingerStripeSourceTest.java index f534448598817..cc26aa250bfb3 100644 --- a/airbyte-integrations/connectors/source-stripe-singer/src/test-integration/java/io/airbyte/integration_tests/sources/SingerStripeSourceTest.java +++ b/airbyte-integrations/connectors/source-stripe-singer/src/test-integration/java/io/airbyte/integration_tests/sources/SingerStripeSourceTest.java @@ -37,10 +37,13 @@ import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; +import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; import 
io.airbyte.protocol.models.AirbyteMessage.Type; +import io.airbyte.protocol.models.CatalogHelpers; +import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.workers.WorkerException; import io.airbyte.workers.process.AirbyteIntegrationLauncher; @@ -192,8 +195,9 @@ public void testSuccessfulDiscover() throws IOException, InterruptedException, W @Test public void testSync() throws IOException, InterruptedException, WorkerException { - String catalog = MoreResources.readResource(CATALOG); - IOs.writeFile(catalogPath.getParent(), catalogPath.getFileName().toString(), catalog); + final ConfiguredAirbyteCatalog catalog = CatalogHelpers + .toDefaultConfiguredCatalog(Jsons.deserialize(MoreResources.readResource(CATALOG), AirbyteCatalog.class)); + IOs.writeFile(catalogPath.getParent(), catalogPath.getFileName().toString(), Jsons.serialize(catalog)); // run syn process Path syncOutputPath = jobRoot.resolve("sync_output.txt"); @@ -203,6 +207,8 @@ public void testSync() throws IOException, InterruptedException, WorkerException assertEquals(0, process.exitValue()); final Set actualSyncOutput = IOs.readFile(jobRoot, syncOutputPath.toString()).lines() + // the runner in this test doesn't gracefully handle non message items in stdout. 
+ .filter(s -> Jsons.tryDeserialize(s).isPresent()) .map(Jsons::deserialize) .map(SingerStripeSourceTest::normalize) .collect(Collectors.toSet()); diff --git a/airbyte-integrations/connectors/source-stripe-singer/src/test-integration/resources/catalog.json b/airbyte-integrations/connectors/source-stripe-singer/src/test-integration/resources/catalog.json index de802f60139cd..e956b29f9cbd1 100644 --- a/airbyte-integrations/connectors/source-stripe-singer/src/test-integration/resources/catalog.json +++ b/airbyte-integrations/connectors/source-stripe-singer/src/test-integration/resources/catalog.json @@ -2,7 +2,7 @@ "streams": [ { "name": "customers", - "schema": { + "json_schema": { "type": ["null", "object"], "properties": { "metadata": { diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 92ccbddbc5c1a..45dc2c205766f 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -146,7 +146,9 @@ protected static Set getAllFieldNames(JsonNode node) { * @return if the identifier matches the alphanumeric+underscore requirement for identifiers */ public static boolean isValidIdentifier(String identifier) { - return StringUtils.isAlphanumeric(identifier.replace("_", "")); + // todo (cgardens) - remove $ once mailchimp is fixed; '-' and '.' are also stripped below, so the accepted set is wider than alphanumeric+underscore.
+ final String s = identifier.replaceAll("[-_.$]", ""); + return StringUtils.isAlphanumeric(s); } /** diff --git a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java index 8a8c9739f381d..c6e54539c7252 100644 --- a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java +++ b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java @@ -73,11 +73,11 @@ void testValidIdentifiers() { assertTrue(CatalogHelpers.isValidIdentifier("identifiêr")); assertTrue(CatalogHelpers.isValidIdentifier("a_unicode_name_文")); assertTrue(CatalogHelpers.isValidIdentifier("identifier__name__")); + assertTrue(CatalogHelpers.isValidIdentifier("identifier-name.weee")); } @Test void testInvalidIdentifiers() { - assertFalse(CatalogHelpers.isValidIdentifier("invalid-identifier")); assertFalse(CatalogHelpers.isValidIdentifier("\"identifier name")); assertFalse(CatalogHelpers.isValidIdentifier("$identifier")); assertFalse(CatalogHelpers.isValidIdentifier("identifier name")); From 3ec52534720b2c4288045357487a179b72e6814a Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 19 Nov 2020 18:37:59 -0800 Subject: [PATCH 3/4] maybe --- airbyte-integrations/connectors/source-mssql/build.gradle | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-mssql/build.gradle b/airbyte-integrations/connectors/source-mssql/build.gradle index 0d35f62af22aa..7bcdc9d5eb3fd 100644 --- a/airbyte-integrations/connectors/source-mssql/build.gradle +++ b/airbyte-integrations/connectors/source-mssql/build.gradle @@ -1,8 +1,7 @@ plugins { id 'application' id 'airbyte-docker' - // todo (cgardens) - tmp to get build passing. failing due to issue in ci. 
- // id 'airbyte-integration-test-java' + id 'airbyte-integration-test-java' } application { From 30f124f858ca130031b54bb0d123b0487a616496 Mon Sep 17 00:00:00 2001 From: cgardens Date: Thu, 19 Nov 2020 21:24:47 -0800 Subject: [PATCH 4/4] more --- airbyte-integrations/connectors/source-rest-api/build.gradle | 3 ++- .../java/io/airbyte/protocol/models/CatalogHelpersTest.java | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-rest-api/build.gradle b/airbyte-integrations/connectors/source-rest-api/build.gradle index 90f1132a1c71e..b29f61b27d76a 100644 --- a/airbyte-integrations/connectors/source-rest-api/build.gradle +++ b/airbyte-integrations/connectors/source-rest-api/build.gradle @@ -2,7 +2,8 @@ plugins { id 'java' id 'airbyte-python' id 'airbyte-docker' - id 'airbyte-source-test' + // todo (cgardens) - bring back when we re-add this to the product. + // id 'airbyte-source-test' } airbytePython { diff --git a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java index c6e54539c7252..0fc2736dce984 100644 --- a/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java +++ b/airbyte-protocol/models/src/test/java/io/airbyte/protocol/models/CatalogHelpersTest.java @@ -79,7 +79,6 @@ void testValidIdentifiers() { @Test void testInvalidIdentifiers() { assertFalse(CatalogHelpers.isValidIdentifier("\"identifier name")); - assertFalse(CatalogHelpers.isValidIdentifier("$identifier")); assertFalse(CatalogHelpers.isValidIdentifier("identifier name")); assertFalse(CatalogHelpers.isValidIdentifier("identifier%")); assertFalse(CatalogHelpers.isValidIdentifier("`identifier`"));