diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json index 418add5bc9b06..cf88f3bd8a3d3 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_DESTINATION_DEFINITION/8be1cf83-fde1-477f-a4ad-318d23c9f3c6.json @@ -2,6 +2,6 @@ "destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6", "name": "Local CSV", "dockerRepository": "airbyte/destination-csv", - "dockerImageTag": "0.1.3", + "dockerImageTag": "0.1.4", "documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-csv-destination" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json index 445a817d6b462..84a3b9b367c31 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/435bb9a5-7887-4809-aa58-28c27df0d7ad.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "435bb9a5-7887-4809-aa58-28c27df0d7ad", "name": "MySQL", "dockerRepository": "airbyte/source-mysql", - "dockerImageTag": "0.1.3", + "dockerImageTag": "0.1.4", "documentationUrl": "https://docs.airbyte.io/integrations/sources/mysql" } diff --git a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json index b1713fb52a4ed..3919d710c9a93 100644 --- a/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json +++ b/airbyte-config/init/src/main/resources/config/STANDARD_SOURCE_DEFINITION/decd338e-5647-4c0b-adf4-da0e75f5a750.json @@ -2,6 +2,6 @@ "sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750", "name": "Postgres", "dockerRepository": "airbyte/source-postgres", - "dockerImageTag": "0.1.4", + "dockerImageTag": "0.1.5", "documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres" } diff --git a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml index 330270e07fb07..5a1f9676721a7 100644 --- a/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/destination_definitions.yaml @@ -1,7 +1,7 @@ - destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6 name: Local CSV dockerRepository: airbyte/destination-csv - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://hub.docker.com/r/airbyte/integration-singer-csv-destination - destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503 name: Postgres diff --git a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml index 92ae0f9e566fa..88f74afc0afb2 100644 --- a/airbyte-config/init/src/main/resources/seed/source_definitions.yaml +++ b/airbyte-config/init/src/main/resources/seed/source_definitions.yaml @@ -26,7 +26,7 @@ - sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750 name: Postgres dockerRepository: airbyte/source-postgres - dockerImageTag: 0.1.4 + dockerImageTag: 0.1.5 documentationUrl: https://hub.docker.com/r/airbyte/source-postgres - sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01 name: Recurly @@ -51,7 +51,7 @@ - sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad name: MySQL dockerRepository: airbyte/source-mysql - dockerImageTag: 0.1.3 + dockerImageTag: 0.1.4 documentationUrl: https://docs.airbyte.io/integrations/sources/mysql - sourceDefinitionId: 2470e835-feaf-4db6-96f3-70fd645acc77 name: Salesforce diff --git a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java index 3602d2acbb9c0..7598647e95560 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/main/java/io/airbyte/integrations/destination/bigquery/BigQueryDestination.java @@ -112,7 +112,7 @@ public AirbyteConnectionStatus check(JsonNode config) { final String datasetId = config.get(CONFIG_DATASET_ID).asText(); final BigQuery bigquery = getBigQuery(config); - if (!bigquery.getDataset(datasetId).exists()) { + if (bigquery.getDataset(datasetId) == null || !bigquery.getDataset(datasetId).exists()) { final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build(); bigquery.create(datasetInfo); } diff --git a/airbyte-integrations/connectors/destination-csv/Dockerfile b/airbyte-integrations/connectors/destination-csv/Dockerfile index 862c55737a26b..10d1507a33e1e 100644 --- a/airbyte-integrations/connectors/destination-csv/Dockerfile +++ b/airbyte-integrations/connectors/destination-csv/Dockerfile @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/destination-csv diff --git a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java index 6791c5dd80607..ddb8a6b35685a 100644 --- a/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java +++ b/airbyte-integrations/connectors/destination-csv/src/main/java/io/airbyte/integrations/destination/csv/CsvDestination.java @@ -30,10 +30,10 @@ import io.airbyte.commons.resources.MoreResources; import io.airbyte.integrations.base.Destination; import io.airbyte.integrations.base.DestinationConsumer; -import io.airbyte.integrations.base.ExtendedSQLNaming; import io.airbyte.integrations.base.FailureTrackingConsumer; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.SQLNamingResolvable; +import io.airbyte.integrations.base.StandardSQLNaming; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; import io.airbyte.protocol.models.AirbyteMessage; @@ -67,7 +67,7 @@ public class CsvDestination implements Destination { private final SQLNamingResolvable namingResolver; public CsvDestination() { - namingResolver = new ExtendedSQLNaming(); + namingResolver = new StandardSQLNaming(); } @Override diff --git a/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java b/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java index acd2f06b7bb0e..9298818b425a1 100644 --- a/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java +++ b/airbyte-integrations/connectors/destination-csv/src/test-integration/java/io/airbyte/integrations/destination/csv/CsvDestinationIntegrationTest.java @@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.json.Jsons; +import io.airbyte.integrations.base.StandardSQLNaming; import io.airbyte.integrations.standardtest.destination.TestDestination; import java.io.FileReader; import java.nio.file.Files; @@ -75,7 +76,8 @@ public void testCheckConnectionInvalidCredentials() {} @Override protected List retrieveRecords(TestDestinationEnv testEnv, String streamName) throws Exception { final List allOutputs = Files.list(testEnv.getLocalRoot().resolve(RELATIVE_PATH)).collect(Collectors.toList()); - final Optional streamOutput = allOutputs.stream().filter(path -> path.getFileName().toString().contains(streamName)).findFirst(); + final Optional streamOutput = + allOutputs.stream().filter(path -> path.getFileName().toString().contains(new StandardSQLNaming().getRawTableName(streamName))).findFirst(); assertTrue(streamOutput.isPresent(), "could not find output file for stream: " + streamName); diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 3fb7f38609d75..07fa2715e3b02 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -119,6 +119,10 @@ public AirbyteCatalog discover(JsonNode config) throws Exception { } } + protected List getExcludedInternalSchemas() { + return List.of(); + } + protected List getTables(final Database database) throws Exception { return discoverInternal(database).stream() .map(t -> { @@ -126,20 +130,19 @@ protected List getTables(final Database database) throws Exception { .map(f -> Field.of(f.getName(), jooqDataTypeToJsonSchemaType(f.getDataType()))) .collect(Collectors.toList()); - return new TableInfo(t.getName(), fields); + return new TableInfo(String.format("%s.%s", t.getSchema().getName(), t.getName()), fields); }) .collect(Collectors.toList()); } private List> discoverInternal(final Database database) throws Exception { return database.query(context -> { - final String databaseName = getCurrentDatabaseName(context); - final List schemas = context.meta().getSchemas(databaseName); - if (schemas.size() > 1) { - throw new IllegalStateException("found multiple databases with the same name."); - } - final Schema schema = schemas.get(0); - return context.meta(schema).getTables(); + final List schemas = context.meta().getSchemas(); + final List> tables = schemas.stream() + .filter(schema -> !getExcludedInternalSchemas().contains(schema.getName())) + .flatMap(schema -> context.meta(schema).getTables().stream()) + .collect(Collectors.toList()); + return tables; }); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/test-integration/java/io/airbyte/integrations/source/jdbc/JdbcIntegrationTest.java b/airbyte-integrations/connectors/source-jdbc/src/test-integration/java/io/airbyte/integrations/source/jdbc/JdbcIntegrationTest.java index 30a074f75b0d6..fe59c4f2dab15 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test-integration/java/io/airbyte/integrations/source/jdbc/JdbcIntegrationTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test-integration/java/io/airbyte/integrations/source/jdbc/JdbcIntegrationTest.java @@ -43,7 +43,7 @@ public class JdbcIntegrationTest extends TestSource { - private static final String STREAM_NAME = "id_and_name"; + private static final String STREAM_NAME = "public.id_and_name"; private PostgreSQLContainer container; private Database database; private JsonNode config; diff --git a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java index 83c99c55e22ab..d8c44b9d24155 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/test/java/io/airbyte/integrations/source/jdbc/JdbcSourceTest.java @@ -60,7 +60,7 @@ class JdbcSourceTest { - private static final String STREAM_NAME = "id_and_name"; + private static final String STREAM_NAME = "public.id_and_name"; private static final AirbyteCatalog CATALOG = CatalogHelpers.createAirbyteCatalog( STREAM_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER), diff --git a/airbyte-integrations/connectors/source-mysql/Dockerfile b/airbyte-integrations/connectors/source-mysql/Dockerfile index 60db88ee169a9..9f79af8c7d6af 100644 --- a/airbyte-integrations/connectors/source-mysql/Dockerfile +++ b/airbyte-integrations/connectors/source-mysql/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.3 +LABEL io.airbyte.version=0.1.4 LABEL io.airbyte.name=airbyte/source-mysql diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java index bf9b31ba17665..3c00708261ab7 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSource.java @@ -30,6 +30,7 @@ import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import java.util.List; import org.jooq.SQLDialect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,15 @@ public JsonNode toJdbcConfig(JsonNode config) { return Jsons.jsonNode(configBuilder.build()); } + @Override + protected List getExcludedInternalSchemas() { + return List.of( + "information_schema", + "mysql", + "performance_schema", + "sys"); + } + public static void main(String[] args) throws Exception { final Source source = new MySqlSource(); LOGGER.info("starting source: {}", MySqlSource.class); diff --git a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlIntegrationTest.java b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlIntegrationTest.java index 1c18a36e7606b..7d37fab16ec22 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlIntegrationTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test-integration/java/io/airbyte/integrations/source/mysql/MySqlIntegrationTest.java @@ -103,7 +103,7 @@ protected JsonNode getConfig() { @Override protected AirbyteCatalog getCatalog() { return CatalogHelpers.createAirbyteCatalog( - STREAM_NAME, + String.format("%s.%s", config.get("database").asText(), STREAM_NAME), Field.of("id", JsonSchemaPrimitive.NUMBER), Field.of("name", JsonSchemaPrimitive.STRING)); } diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java index 39f82966d8afd..6ef877cfd4521 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceTest.java @@ -65,19 +65,6 @@ class MySqlSourceTest { private static final String TEST_USER = "test"; private static final String TEST_PASSWORD = "test"; private static final String STREAM_NAME = "id_and_name"; - private static final AirbyteCatalog CATALOG = CatalogHelpers.createAirbyteCatalog( - STREAM_NAME, - Field.of("id", JsonSchemaPrimitive.NUMBER), - Field.of("name", JsonSchemaPrimitive.STRING)); - private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); - private static final Set MESSAGES = Sets.newHashSet( - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withData(Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "picard")))), - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withData(Jsons.jsonNode(ImmutableMap.of("id", 2, "name", "crusher")))), - new AirbyteMessage().withType(Type.RECORD) - .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withData(Jsons.jsonNode(ImmutableMap.of("id", 3, "name", "vash"))))); - private static MySQLContainer container; private JsonNode config; @@ -154,13 +141,19 @@ void testCheckFailure() { @Test void testDiscover() throws Exception { - final AirbyteCatalog actual = new MySqlSource().discover(config); - assertEquals(CATALOG, actual); + final AirbyteCatalog allStreams = new MySqlSource().discover(config); + // Filter out streams not related to this test case (from other tests running in parallel) + final AirbyteCatalog actual = new AirbyteCatalog() + .withStreams(allStreams.getStreams() + .stream() + .filter(s -> s.getName().equals(getStreamName())) + .collect(Collectors.toList())); + assertEquals(generateExpectedCatalog(), actual); } @Test void testReadSuccess() throws Exception { - final Set actualMessages = new MySqlSource().read(config, CONFIGURED_CATALOG, null).collect(Collectors.toSet()); + final Set actualMessages = new MySqlSource().read(config, generateConfiguredCatalog(), null).collect(Collectors.toSet()); actualMessages.forEach(r -> { if (r.getRecord() != null) { @@ -168,13 +161,13 @@ void testReadSuccess() throws Exception { } }); - assertEquals(MESSAGES, actualMessages); + assertEquals(generateExpectedMessages(), actualMessages); } @SuppressWarnings("ResultOfMethodCallIgnored") @Test void testReadFailure() throws Exception { - final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); + final ConfiguredAirbyteStream spiedAbStream = spy(generateConfiguredCatalog().getStreams().get(0)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream(); @@ -183,4 +176,30 @@ void testReadFailure() throws Exception { assertThrows(RuntimeException.class, () -> source.read(config, catalog, null)); } + private AirbyteCatalog generateExpectedCatalog() { + return CatalogHelpers.createAirbyteCatalog( + getStreamName(), + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)); + } + + private ConfiguredAirbyteCatalog generateConfiguredCatalog() { + return CatalogHelpers.toDefaultConfiguredCatalog(generateExpectedCatalog()); + } + + private java.util.HashSet generateExpectedMessages() { + final String streamName = getStreamName(); + return Sets.newHashSet( + new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withData(Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "picard")))), + new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withData(Jsons.jsonNode(ImmutableMap.of("id", 2, "name", "crusher")))), + new AirbyteMessage().withType(Type.RECORD) + .withRecord(new AirbyteRecordMessage().withStream(streamName).withData(Jsons.jsonNode(ImmutableMap.of("id", 3, "name", "vash"))))); + } + + private String getStreamName() { + return String.format("%s.%s", config.get("database").asText(), STREAM_NAME); + } + } diff --git a/airbyte-integrations/connectors/source-postgres/Dockerfile b/airbyte-integrations/connectors/source-postgres/Dockerfile index ef62926b67827..fc6224579d684 100644 --- a/airbyte-integrations/connectors/source-postgres/Dockerfile +++ b/airbyte-integrations/connectors/source-postgres/Dockerfile @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar RUN tar xf ${APPLICATION}.tar --strip-components=1 -LABEL io.airbyte.version=0.1.4 +LABEL io.airbyte.version=0.1.5 LABEL io.airbyte.name=airbyte/source-postgres diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresIntegrationTests.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresIntegrationTests.java index 390b9200ff69f..522d0e9b29316 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresIntegrationTests.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresIntegrationTests.java @@ -42,7 +42,7 @@ public class PostgresIntegrationTests extends TestSource { - private static final String STREAM_NAME = "id_and_name"; + private static final String STREAM_NAME = "public.id_and_name"; private PostgreSQLContainer container; private JsonNode config; diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 8a2f3568053a7..e4e11b6d204ed 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -51,6 +51,7 @@ import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; import io.airbyte.test.utils.PostgreSQLContainerHelper; +import java.util.List; import java.util.Set; import java.util.stream.Collectors; import org.apache.commons.lang3.RandomStringUtils; @@ -64,12 +65,20 @@ class PostgresSourceTest { - private static final String STREAM_NAME = "id_and_name"; - private static final AirbyteCatalog CATALOG = CatalogHelpers.createAirbyteCatalog( + private static final String STREAM_NAME = "public.id_and_name"; + private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(List.of( + CatalogHelpers.createAirbyteStream( + STREAM_NAME, + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)), + CatalogHelpers.createAirbyteStream( + "test_another_schema.id_and_name", + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)))); + private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.createConfiguredAirbyteCatalog( STREAM_NAME, Field.of("id", JsonSchemaPrimitive.NUMBER), Field.of("name", JsonSchemaPrimitive.STRING)); - private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); private static final Set ASCII_MESSAGES = Sets.newHashSet( new AirbyteMessage().withType(Type.RECORD) .withRecord(new AirbyteRecordMessage().withStream(STREAM_NAME).withData(Jsons.jsonNode(ImmutableMap.of("id", 1, "name", "goku")))), @@ -110,6 +119,9 @@ void setup() throws Exception { database.query(ctx -> { ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));"); ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'goku'), (2, 'vegeta'), (3, 'piccolo');"); + ctx.fetch("CREATE SCHEMA test_another_schema;"); + ctx.fetch("CREATE TABLE test_another_schema.id_and_name(id INTEGER, name VARCHAR(200));"); + ctx.fetch("INSERT INTO test_another_schema.id_and_name (id, name) VALUES (1,'tom'), (2, 'jerry');"); return null; }); database.close(); diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java index ff0edfab35602..40d82c5835844 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/AcceptanceTests.java @@ -98,7 +98,8 @@ @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class AcceptanceTests { - private static final String STREAM_NAME = "public.id_and_name"; + private static final String TABLE_NAME = "id_and_name"; + private static final String STREAM_NAME = "public." + TABLE_NAME; private static final String COLUMN_ID = "id"; private static final String COLUMN_NAME = "name"; @@ -340,7 +341,7 @@ public void testIncrementalSync() throws Exception { final ConnectionSyncRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); assertEquals(ConnectionSyncRead.StatusEnum.SUCCEEDED, connectionSyncRead2.getStatus()); - assertDestinationContains(expectedRecords, STREAM_NAME); + assertDestinationContains(expectedRecords, TABLE_NAME); assertSourceAndTargetDbInSync(sourcePsql); } @@ -402,7 +403,7 @@ private Set listStreams(Database database) throws SQLException { private Set listCsvStreams() throws IOException { return Files.list(outputDir) - .map(file -> file.getFileName().toString().replaceAll(".csv", "")) + .map(file -> adaptCsvName(file.getFileName().toString())) .collect(Collectors.toSet()); } @@ -485,7 +486,7 @@ private List retrievePgRecords(Database database, String table) throws private List retrieveCsvRecords(String streamName) throws Exception { final Optional stream = Files.list(outputDir) - .filter(path -> path.getFileName().toString().toLowerCase().contains(streamName)) + .filter(path -> path.getFileName().toString().toLowerCase().contains(adaptToCsvName(streamName))) .findFirst(); assertTrue(stream.isPresent()); @@ -564,4 +565,12 @@ private void deleteDestination(UUID destinationId) throws ApiException { apiClient.getDestinationApi().deleteDestination(new DestinationIdRequestBody().destinationId(destinationId)); } + private String adaptCsvName(String streamName) { + return streamName.replaceAll("_raw\\.csv", "").replaceAll("public_", "public."); + } + + private String adaptToCsvName(String streamName) { + return streamName.replaceAll("public\\.", "public_"); + } + }