Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

☝🏼Destinations supports destination sync mode #2460

Merged
merged 12 commits into from
Mar 26, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/bigquery"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "25c5221d-dce2-4163-ade9-739ef790f503",
"name": "Postgres",
"dockerRepository": "airbyte/destination-postgres",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "424892c4-daac-4491-b35d-c6688ba547ba",
"name": "Snowflake",
"dockerRepository": "airbyte/destination-snowflake",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/snowflake"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "8be1cf83-fde1-477f-a4ad-318d23c9f3c6",
"name": "Local CSV",
"dockerRepository": "airbyte/destination-csv",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-csv"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "a625d593-bba5-4a1c-a53d-2d246268a816",
"name": "Local JSON",
"dockerRepository": "airbyte/destination-local-json",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/local-json"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "af7c921e-5892-4ff2-b6c1-4a5ab258fb7e",
"name": "MeiliSearch",
"dockerRepository": "airbyte/destination-meilisearch",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/meilisearch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"destinationDefinitionId": "f7a7d195-377f-cf5b-70a5-be6b819019dc",
"name": "Redshift",
"dockerRepository": "airbyte/destination-redshift",
"dockerImageTag": "0.2.0",
"dockerImageTag": "0.2.1",
"documentationUrl": "https://docs.airbyte.io/integrations/destinations/redshift"
}
Original file line number Diff line number Diff line change
@@ -1,35 +1,35 @@
- destinationDefinitionId: a625d593-bba5-4a1c-a53d-2d246268a816
name: Local JSON
dockerRepository: airbyte/destination-local-json
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-json
- destinationDefinitionId: 8be1cf83-fde1-477f-a4ad-318d23c9f3c6
name: Local CSV
dockerRepository: airbyte/destination-csv
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/local-csv
- destinationDefinitionId: 25c5221d-dce2-4163-ade9-739ef790f503
name: Postgres
dockerRepository: airbyte/destination-postgres
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/postgres
- destinationDefinitionId: 22f6c74f-5699-40ff-833c-4a879ea40133
name: BigQuery
dockerRepository: airbyte/destination-bigquery
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/bigquery
- destinationDefinitionId: 424892c4-daac-4491-b35d-c6688ba547ba
name: Snowflake
dockerRepository: airbyte/destination-snowflake
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/snowflake
- destinationDefinitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
name: Redshift
dockerRepository: airbyte/destination-redshift
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/redshift
- destinationDefinitionId: af7c921e-5892-4ff2-b6c1-4a5ab258fb7e
name: MeiliSearch
dockerRepository: airbyte/destination-meilisearch
dockerImageTag: 0.2.0
dockerImageTag: 0.2.1
documentationUrl: https://docs.airbyte.io/integrations/destinations/meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,17 @@

package io.airbyte.integrations.destination;

import io.airbyte.protocol.models.SyncMode;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;

public class WriteConfig {

private final String streamName;
private final String outputNamespaceName;
private final String tmpTableName;
private final String outputTableName;
private final SyncMode syncMode;
private final DestinationSyncMode syncMode;

public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, SyncMode syncMode) {
public WriteConfig(String streamName, String outputNamespaceName, String tmpTableName, String outputTableName, DestinationSyncMode syncMode) {
this.streamName = streamName;
this.outputNamespaceName = outputNamespaceName;
this.tmpTableName = tmpTableName;
Expand All @@ -58,7 +58,7 @@ public String getOutputTableName() {
return outputTableName;
}

public SyncMode getSyncMode() {
public DestinationSyncMode getSyncMode() {
return syncMode;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.workers.DefaultCheckConnectionWorker;
Expand Down Expand Up @@ -292,7 +293,10 @@ public void testIncrementalSync(String messagesFilename, String catalogFilename)
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
configuredCatalog.getStreams().forEach(s -> s.withSyncMode(SyncMode.INCREMENTAL));
configuredCatalog.getStreams().forEach(s -> {
s.withSyncMode(SyncMode.INCREMENTAL);
s.withDestinationSyncMode(DestinationSyncMode.APPEND);
});
final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfig(), firstSyncMessages, configuredCatalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultDiscoverCatalogWorker;
Expand Down Expand Up @@ -378,6 +379,7 @@ private ConfiguredAirbyteCatalog withFullRefreshSyncModes(ConfiguredAirbyteCatal
for (ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getStream().getSupportedSyncModes().contains(FULL_REFRESH)) {
configuredStream.setSyncMode(FULL_REFRESH);
configuredStream.setDestinationSyncMode(DestinationSyncMode.OVERWRITE);
}
}
return clone;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-bigquery
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
Expand Down Expand Up @@ -231,7 +231,7 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
.setFormatOptions(FormatOptions.json()).build(); // new-line delimited json.

final TableDataWriteChannel writer = bigquery.writer(JobId.of(UUID.randomUUID().toString()), writeChannelConfiguration);
final WriteDisposition syncMode = getWriteDisposition(stream.getSyncMode());
final WriteDisposition syncMode = getWriteDisposition(stream.getDestinationSyncMode());

writeConfigs.put(stream.getStream().getName(),
new WriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), writer, syncMode));
Expand All @@ -242,13 +242,15 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
return new RecordConsumer(bigquery, writeConfigs, catalog);
}

private static WriteDisposition getWriteDisposition(SyncMode syncMode) {
if (syncMode == null || syncMode == SyncMode.FULL_REFRESH) {
return WriteDisposition.WRITE_TRUNCATE;
} else if (syncMode == SyncMode.INCREMENTAL) {
return WriteDisposition.WRITE_APPEND;
} else {
throw new IllegalStateException("Unrecognized sync mode: " + syncMode);
private static WriteDisposition getWriteDisposition(DestinationSyncMode syncMode) {
switch (syncMode) {
case OVERWRITE -> {
return WriteDisposition.WRITE_TRUNCATE;
}
case APPEND, APPEND_DEDUP -> {
return WriteDisposition.WRITE_APPEND;
}
default -> throw new IllegalStateException("Unrecognized sync mode: " + syncMode);
}
}

Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/destination-csv/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-csv
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.FileWriter;
import java.io.IOException;
import java.nio.file.Files;
Expand Down Expand Up @@ -105,12 +105,12 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
final Path finalPath = destinationDir.resolve(tableName + ".csv");
CSVFormat csvFormat = CSVFormat.DEFAULT.withHeader(JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_EMITTED_AT,
JavaBaseConstants.COLUMN_NAME_DATA);
final boolean isIncremental = stream.getSyncMode() == SyncMode.INCREMENTAL;
if (isIncremental && finalPath.toFile().exists()) {
final boolean isAppendMode = stream.getDestinationSyncMode() != DestinationSyncMode.OVERWRITE;
if (isAppendMode && finalPath.toFile().exists()) {
Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING);
csvFormat = csvFormat.withSkipHeaderRecord();
}
final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), isIncremental);
final FileWriter fileWriter = new FileWriter(tmpPath.toFile(), isAppendMode);
final CSVPrinter printer = new CSVPrinter(fileWriter, csvFormat);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-jdbc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
import io.airbyte.integrations.destination.buffered_stream_consumer.BufferedStreamConsumer.RecordWriter;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import java.time.Instant;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -84,7 +84,7 @@ private static List<WriteConfig> createWriteConfigs(NamingConventionTransformer
final String schemaName = namingResolver.getIdentifier(config.get("schema").asText());
final String tableName = Names.concatQuotedNames("_airbyte_raw_", namingResolver.getIdentifier(streamName));
final String tmpTableName = Names.concatQuotedNames("_airbyte_" + now.toEpochMilli() + "_", tableName);
final SyncMode syncMode = stream.getSyncMode() != null ? stream.getSyncMode() : SyncMode.FULL_REFRESH;
final DestinationSyncMode syncMode = stream.getDestinationSyncMode() != null ? stream.getDestinationSyncMode() : DestinationSyncMode.APPEND;
return new WriteConfig(streamName, schemaName, tmpTableName, tableName, syncMode);
}).collect(Collectors.toList());
}
Expand Down Expand Up @@ -138,8 +138,9 @@ private static OnCloseFunction onCloseFunction(JdbcDatabase database, SqlOperati

sqlOperations.createTableIfNotExists(database, schemaName, dstTableName);
switch (writeConfig.getSyncMode()) {
case FULL_REFRESH -> queries.append(sqlOperations.truncateTableQuery(schemaName, dstTableName));
case INCREMENTAL -> {}
case OVERWRITE -> queries.append(sqlOperations.truncateTableQuery(schemaName, dstTableName));
case APPEND -> {}
case APPEND_DEDUP -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
queries.append(sqlOperations.copyTableQuery(schemaName, srcTableName, dstTableName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
Expand Down Expand Up @@ -183,7 +184,10 @@ void testWriteSuccess() throws Exception {
@Test
void testWriteIncremental() throws Exception {
final ConfiguredAirbyteCatalog catalog = Jsons.clone(CATALOG);
catalog.getStreams().forEach(stream -> stream.withSyncMode(SyncMode.INCREMENTAL));
catalog.getStreams().forEach(stream -> {
stream.withSyncMode(SyncMode.INCREMENTAL);
stream.withDestinationSyncMode(DestinationSyncMode.APPEND);
});

final JdbcDestination destination = new JdbcDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-local-json
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.SyncMode;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;
Expand Down Expand Up @@ -102,12 +102,12 @@ public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirb
final Path finalPath = destinationDir.resolve(namingResolver.getRawTableName(streamName) + ".jsonl");
final Path tmpPath = destinationDir.resolve(namingResolver.getTmpTableName(streamName) + ".jsonl");

final boolean isIncremental = stream.getSyncMode() == SyncMode.INCREMENTAL;
if (isIncremental && finalPath.toFile().exists()) {
final boolean isAppendMode = stream.getDestinationSyncMode() != DestinationSyncMode.OVERWRITE;
if (isAppendMode && finalPath.toFile().exists()) {
Files.copy(finalPath, tmpPath, StandardCopyOption.REPLACE_EXISTING);
}

final Writer writer = new FileWriter(tmpPath.toFile(), isIncremental);
final Writer writer = new FileWriter(tmpPath.toFile(), isAppendMode);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(writer, tmpPath, finalPath));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-meilisearch
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -120,7 +120,7 @@ private static Map<String, Index> createIndices(ConfiguredAirbyteCatalog catalog
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String indexName = getIndexName(stream);

if (stream.getSyncMode() == SyncMode.FULL_REFRESH && indexExists(client, indexName)) {
if (stream.getDestinationSyncMode() == DestinationSyncMode.OVERWRITE && indexExists(client, indexName)) {
client.deleteIndex(indexName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteStream.DestinationSyncMode;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
Expand Down Expand Up @@ -186,7 +187,10 @@ void testWriteSuccess() throws Exception {
@Test
void testWriteIncremental() throws Exception {
final ConfiguredAirbyteCatalog catalog = Jsons.clone(CATALOG);
catalog.getStreams().forEach(stream -> stream.withSyncMode(SyncMode.INCREMENTAL));
catalog.getStreams().forEach(stream -> {
stream.withSyncMode(SyncMode.INCREMENTAL);
stream.withDestinationSyncMode(DestinationSyncMode.APPEND);
});

final PostgresDestination destination = new PostgresDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1

LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=0.2.1
LABEL io.airbyte.name=airbyte/destination-redshift
Loading