Skip to content

Commit

Permalink
destinations
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens committed Nov 18, 2020
1 parent 00e9dbd commit 329b0f1
Show file tree
Hide file tree
Showing 19 changed files with 85 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
package io.airbyte.integrations.base;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;

public interface Destination extends Integration {

Expand All @@ -41,6 +41,6 @@ public interface Destination extends Integration {
* will always be called once regardless of success or failure.
* @throws Exception - any exception.
*/
DestinationConsumer<AirbyteMessage> write(JsonNode config, AirbyteCatalog catalog) throws Exception;
DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Scanner;
Expand Down Expand Up @@ -110,7 +111,7 @@ public void run(String[] args) throws Exception {
// destination only
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
final AirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);
consumeWriteStream(consumer);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.DefaultCheckConnectionWorker;
import io.airbyte.workers.DefaultGetSpecWorker;
import io.airbyte.workers.OutputAndStatus;
Expand Down Expand Up @@ -241,9 +243,10 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
@ArgumentsSource(DataArgumentsProvider.class)
public void testSync(String messagesFilename, String catalogFilename) throws Exception {
final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfig(), messages, catalog);
runSync(getConfig(), messages, configuredCatalog);

assertSameMessages(messages, retrieveRecordsForCatalog(catalog));
}
Expand All @@ -260,9 +263,10 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil
}

final AirbyteCatalog catalog = Jsons.deserialize(MoreResources.readResource(catalogFilename), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final List<AirbyteMessage> messages = MoreResources.readResource(messagesFilename).lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfigWithBasicNormalization(), messages, catalog);
runSync(getConfigWithBasicNormalization(), messages, configuredCatalog);

assertSameMessages(messages, retrieveRecordsForCatalog(catalog));
assertSameMessages(messages, retrieveNormalizedRecordsForCatalog(catalog), true);
Expand All @@ -275,9 +279,10 @@ public void testSyncWithNormalization(String messagesFilename, String catalogFil
public void testSecondSync() throws Exception {
final AirbyteCatalog catalog =
Jsons.deserialize(MoreResources.readResource("exchange_rate_catalog.json"), AirbyteCatalog.class);
final ConfiguredAirbyteCatalog configuredCatalog = CatalogHelpers.toDefaultConfiguredCatalog(catalog);
final List<AirbyteMessage> firstSyncMessages = MoreResources.readResource("exchange_rate_messages.txt").lines()
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfig(), firstSyncMessages, catalog);
runSync(getConfig(), firstSyncMessages, configuredCatalog);

final List<AirbyteMessage> secondSyncMessages = Lists.newArrayList(new AirbyteMessage()
.withRecord(new AirbyteRecordMessage()
Expand All @@ -287,7 +292,7 @@ public void testSecondSync() throws Exception {
.put("HKD", 10)
.put("NZD", 700)
.build()))));
runSync(getConfig(), secondSyncMessages, catalog);
runSync(getConfig(), secondSyncMessages, configuredCatalog);
assertSameMessages(secondSyncMessages, retrieveRecordsForCatalog(catalog));
}

Expand All @@ -301,7 +306,7 @@ private OutputAndStatus<StandardCheckConnectionOutput> runCheck(JsonNode config)
.run(new StandardCheckConnectionInput().withConnectionConfiguration(config), jobRoot);
}

private void runSync(JsonNode config, List<AirbyteMessage> messages, AirbyteCatalog catalog) throws Exception {
private void runSync(JsonNode config, List<AirbyteMessage> messages, ConfiguredAirbyteCatalog catalog) throws Exception {

final StandardTargetConfig targetConfig = new StandardTargetConfig()
.withConnectionId(UUID.randomUUID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.ByteArrayInputStream;
import java.io.IOException;
Expand Down Expand Up @@ -186,13 +186,13 @@ private static Job waitForQuery(Job queryJob) {
* @return consumer that writes singer messages to the database.
*/
@Override
public DestinationConsumer<AirbyteMessage> write(JsonNode config, AirbyteCatalog catalog) {
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) {
final BigQuery bigquery = getBigQuery(config);
Map<String, WriteConfig> writeConfigs = new HashMap<>();
final String datasetId = config.get(CONFIG_DATASET_ID).asText();

// create tmp tables if not exist
for (final AirbyteStream stream : catalog.getStreams()) {
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String tableName = NamingHelper.getRawTableName(stream.getName());
final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli();

Expand Down Expand Up @@ -251,9 +251,9 @@ public static class RecordConsumer extends FailureTrackingConsumer<AirbyteMessag

private final BigQuery bigquery;
private final Map<String, WriteConfig> writeConfigs;
private final AirbyteCatalog catalog;
private final ConfiguredAirbyteCatalog catalog;

public RecordConsumer(BigQuery bigquery, Map<String, WriteConfig> writeConfigs, AirbyteCatalog catalog) {
public RecordConsumer(BigQuery bigquery, Map<String, WriteConfig> writeConfigs, ConfiguredAirbyteCatalog catalog) {
this.bigquery = bigquery;
this.writeConfigs = writeConfigs;
this.catalog = catalog;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ public class BigQueryIntegrationTest extends TestDestination {
private static final String CONFIG_PROJECT_ID = "project_id";
private static final String CONFIG_CREDS = "credentials_json";

private static final String COLUMN_DATA = "data";

private BigQuery bigquery;
private Dataset dataset;
private boolean tornDown;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
Expand Down Expand Up @@ -100,11 +100,11 @@ class BigQueryDestinationTest {
private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));

private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createAirbyteStream(USERS_STREAM_NAME, io.airbyte.protocol.models.Field.of("name", JsonSchemaPrimitive.STRING),
private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, io.airbyte.protocol.models.Field.of("name", JsonSchemaPrimitive.STRING),
io.airbyte.protocol.models.Field
.of("id", JsonSchemaPrimitive.STRING)),
CatalogHelpers.createAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));

private JsonNode config;

Expand Down Expand Up @@ -228,7 +228,7 @@ void testWriteSuccess() throws Exception {
assertEquals(expectedTasksJson.size(), tasksActual.size());
assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson));

assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(AirbyteStream::getName).collect(Collectors.toList()));
assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList()));
}

@SuppressWarnings("ResultOfMethodCallIgnored")
Expand All @@ -244,8 +244,8 @@ void testWriteFailure() throws Exception {
consumer.accept(MESSAGE_USERS2);
consumer.close();

final List<String> tableNames = CATALOG.getStreams().stream().map(AirbyteStream::getName).collect(toList());
assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(AirbyteStream::getName).collect(Collectors.toList()));
final List<String> tableNames = CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(toList());
assertTmpTablesNotPresent(CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getName).collect(Collectors.toList()));
// assert that no tables were created.
assertTrue(fetchNamesOfTablesInDb().stream().noneMatch(tableName -> tableNames.stream().anyMatch(tableName::startsWith)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.FileWriter;
import java.io.IOException;
Expand Down Expand Up @@ -85,14 +85,14 @@ public AirbyteConnectionStatus check(JsonNode config) {
* @throws IOException - exception throw in manipulating the filesytem.
*/
@Override
public DestinationConsumer<AirbyteMessage> write(JsonNode config, AirbyteCatalog catalog) throws IOException {
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws IOException {
final Path destinationDir = getDestinationPath(config);

FileUtils.forceMkdir(destinationDir.toFile());

final long now = Instant.now().toEpochMilli();
final Map<String, WriteConfig> writeConfigs = new HashMap<>();
for (final AirbyteStream stream : catalog.getStreams()) {
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final Path tmpPath = destinationDir.resolve(stream.getName() + "_" + now + ".csv");
final Path finalPath = destinationDir.resolve(stream.getName() + ".csv");
final FileWriter fileWriter = new FileWriter(tmpPath.toFile());
Expand Down Expand Up @@ -124,9 +124,9 @@ private Path getDestinationPath(JsonNode config) {
private static class CsvConsumer extends FailureTrackingConsumer<AirbyteMessage> {

private final Map<String, WriteConfig> writeConfigs;
private final AirbyteCatalog catalog;
private final ConfiguredAirbyteCatalog catalog;

public CsvConsumer(Map<String, WriteConfig> writeConfigs, AirbyteCatalog catalog) {
public CsvConsumer(Map<String, WriteConfig> writeConfigs, ConfiguredAirbyteCatalog catalog) {
this.catalog = catalog;
LOGGER.info("initializing consumer.");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
Expand Down Expand Up @@ -89,10 +89,10 @@ class CsvDestinationTest {
private static final AirbyteMessage MESSAGE_STATE = new AirbyteMessage().withType(AirbyteMessage.Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(ImmutableMap.builder().put("checkpoint", "now!").build())));

private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING),
private static final ConfiguredAirbyteCatalog CATALOG = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
CatalogHelpers.createConfiguredAirbyteStream(USERS_STREAM_NAME, Field.of("name", JsonSchemaPrimitive.STRING),
Field.of("id", JsonSchemaPrimitive.STRING)),
CatalogHelpers.createAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));

private Path destinationPath;
private JsonNode config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.queue.BigQueue;
import java.io.IOException;
Expand Down Expand Up @@ -126,13 +126,13 @@ public AirbyteConnectionStatus check(JsonNode config) {
* @throws Exception - anything could happen!
*/
@Override
public DestinationConsumer<AirbyteMessage> write(JsonNode config, AirbyteCatalog catalog) throws Exception {
public DestinationConsumer<AirbyteMessage> write(JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception {
// connect to db.
final Database database = getDatabase(config);
Map<String, WriteConfig> writeBuffers = new HashMap<>();

// create tmp tables if not exist
for (final AirbyteStream stream : catalog.getStreams()) {
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String tableName = NamingHelper.getRawTableName(stream.getName());
final String tmpTableName = stream.getName() + "_" + Instant.now().toEpochMilli();
database.query(ctx -> ctx.execute(String.format(
Expand Down Expand Up @@ -164,9 +164,9 @@ public static class RecordConsumer extends FailureTrackingConsumer<AirbyteMessag
private final ScheduledExecutorService writerPool;
private final Database database;
private final Map<String, WriteConfig> writeConfigs;
private final AirbyteCatalog catalog;
private final ConfiguredAirbyteCatalog catalog;

public RecordConsumer(Database database, Map<String, WriteConfig> writeConfigs, AirbyteCatalog catalog) {
public RecordConsumer(Database database, Map<String, WriteConfig> writeConfigs, ConfiguredAirbyteCatalog catalog) {
this.database = database;
this.writeConfigs = writeConfigs;
this.catalog = catalog;
Expand Down
Loading

0 comments on commit 329b0f1

Please sign in to comment.