Skip to content

Commit

Permalink
migrate destinations to use airbyte protocol (#557)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Oct 14, 2020
1 parent 5e2ccaa commit ba9bf4a
Show file tree
Hide file tree
Showing 25 changed files with 405 additions and 292 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.time;

public class Instants {

public static long toSeconds(long millis) {
return millis / 1000;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.time;

import static org.junit.jupiter.api.Assertions.*;

import java.time.Instant;
import org.junit.jupiter.api.Test;

class InstantsTest {

@Test
void testMillisToSeconds() {
final Instant now = Instant.now();
assertEquals(now.getEpochSecond(), Instants.toSeconds(now.toEpochMilli()));
}

}
2 changes: 1 addition & 1 deletion airbyte-integrations/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dependencies {
implementation 'commons-cli:commons-cli:1.4'

implementation project(':airbyte-config:models')
implementation project(':airbyte-singer')
implementation project(':airbyte-protocol:models')
}

buildImage.dependsOn ':airbyte-integrations:base:buildImage'
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ConnectorSpecification;
import io.airbyte.config.Schema;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardDiscoverCatalogOutput;
import io.airbyte.singer.SingerMessage;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;

// todo (cgardens) - share common parts of this interface with source.
public interface Destination {
Expand Down Expand Up @@ -67,14 +67,12 @@ public interface Destination {
*
* @param config - integration-specific configuration object as json. e.g. { "username": "airbyte",
* "password": "super secure" }
* @param schema - schema of the incoming messages.
* @param catalog - schema of the incoming messages.
* @return Consumer that accepts message. The {@link DestinationConsumer#accept(Object)} will be
* called n times where n is the number of messages. {@link DestinationConsumer#complete()}
* will be called once if all messages were accepted successfully.
* {@link DestinationConsumer#close()} will always be called once regardless of success or
* failure.
* called n times where n is the number of messages. {@link DestinationConsumer#close()}
* will always be called once regardless of success or failure.
* @throws Exception - any exception.
*/
DestinationConsumer<SingerMessage> write(JsonNode config, Schema schema) throws Exception;
DestinationConsumer<AirbyteMessage> write(JsonNode config, AirbyteCatalog catalog) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.Schema;
import io.airbyte.singer.SingerMessage;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Scanner;
Expand Down Expand Up @@ -81,8 +81,8 @@ public void run(String[] args) throws Exception {
throw new RuntimeException("Not implemented");
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
final Schema schema = parseConfig(parsed.getCatalogPath(), Schema.class);
final DestinationConsumer<SingerMessage> consumer = destination.write(config, schema);
final AirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), AirbyteCatalog.class);
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);
consumeWriteStream(consumer);
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
Expand All @@ -91,12 +91,12 @@ public void run(String[] args) throws Exception {
LOGGER.info("Completed integration: {}", destination.getClass().getName());
}

void consumeWriteStream(DestinationConsumer<SingerMessage> consumer) throws Exception {
void consumeWriteStream(DestinationConsumer<AirbyteMessage> consumer) throws Exception {
final Scanner input = new Scanner(System.in);
try (consumer) {
while (input.hasNextLine()) {
final String inputString = input.nextLine();
final Optional<SingerMessage> singerMessageOptional = Jsons.tryDeserialize(inputString, SingerMessage.class);
final Optional<AirbyteMessage> singerMessageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (singerMessageOptional.isPresent()) {
consumer.accept(singerMessageOptional.get());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorSpecification;
import io.airbyte.config.Schema;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.StandardDiscoverCatalogOutput;
import io.airbyte.config.Stream;
import io.airbyte.singer.SingerMessage;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -64,8 +65,10 @@ class IntegrationRunnerTest {

private static final String CONFIG_STRING = "{ \"username\": \"airbyte\" }";
private static final JsonNode CONFIG = Jsons.deserialize(CONFIG_STRING);
private static final String STREAM_NAME = "users";
private static final Long EMITTED_AT = Instant.now().toEpochMilli();

private static final Schema SCHEMA = new Schema().withStreams(Lists.newArrayList(new Stream().withName("users")));
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME)));

private IntegrationCliParser cliParser;
private Consumer<String> stdoutConsumer;
Expand All @@ -82,7 +85,7 @@ void setup() throws IOException {
Path configDir = Files.createTempDirectory("test");

configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING);
catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(SCHEMA));
catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(CATALOG));
}

@Test
Expand Down Expand Up @@ -113,48 +116,55 @@ void testCheck() throws Exception {
verify(stdoutConsumer).accept(Jsons.serialize(output));
}

@Test
void testDiscover() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.discover(Path.of(configPath.toString()));
final StandardDiscoverCatalogOutput output = new StandardDiscoverCatalogOutput().withSchema(SCHEMA);

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.discover(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, destination).run(ARGS);

verify(destination).discover(CONFIG);
verify(stdoutConsumer).accept(Jsons.serialize(output));
}
// @Test
// void testDiscover() throws Exception {
// final IntegrationConfig intConfig = IntegrationConfig.discover(Path.of(configPath.toString()));
// final StandardDiscoverCatalogOutput output = new
// StandardDiscoverCatalogOutput().withSchema(CATALOG);
//
// when(cliParser.parse(ARGS)).thenReturn(intConfig);
// when(destination.discover(CONFIG)).thenReturn(output);
//
// new IntegrationRunner(cliParser, stdoutConsumer, destination).run(ARGS);
//
// verify(destination).discover(CONFIG);
// verify(stdoutConsumer).accept(Jsons.serialize(output));
// }

@SuppressWarnings("unchecked")
@Test
void testWrite() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.write(Path.of(configPath.toString()), Path.of(catalogPath.toString()));
final DestinationConsumer<SingerMessage> destinationConsumerMock = mock(DestinationConsumer.class);
final DestinationConsumer<AirbyteMessage> destinationConsumerMock = mock(DestinationConsumer.class);
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.write(CONFIG, SCHEMA)).thenReturn(destinationConsumerMock);
when(destination.write(CONFIG, CATALOG)).thenReturn(destinationConsumerMock);

final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination));
doNothing().when(runner).consumeWriteStream(destinationConsumerMock);
runner.run(ARGS);

verify(destination).write(CONFIG, SCHEMA);
verify(destination).write(CONFIG, CATALOG);
verify(runner).consumeWriteStream(destinationConsumerMock);
}

@SuppressWarnings("unchecked")
@Test
void testDestinationConsumerLifecycleSuccess() throws Exception {
final SingerMessage singerMessage1 = new SingerMessage()
.withType(SingerMessage.Type.RECORD)
.withValue(Jsons.deserialize("{ \"color\": \"blue\" }"));
final SingerMessage singerMessage2 = new SingerMessage()
.withType(SingerMessage.Type.RECORD)
.withValue(Jsons.deserialize("{ \"color\": \"yellow\" }"));
final AirbyteMessage singerMessage1 = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.deserialize("{ \"color\": \"blue\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
final AirbyteMessage singerMessage2 = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.deserialize("{ \"color\": \"yellow\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
System.setIn(new ByteArrayInputStream((Jsons.serialize(singerMessage1) + "\n" + Jsons.serialize(singerMessage2)).getBytes()));

final DestinationConsumer<SingerMessage> destinationConsumerMock = mock(DestinationConsumer.class);
final DestinationConsumer<AirbyteMessage> destinationConsumerMock = mock(DestinationConsumer.class);
new IntegrationRunner(null).consumeWriteStream(destinationConsumerMock);

InOrder inOrder = inOrder(destinationConsumerMock);
Expand All @@ -166,15 +176,21 @@ void testDestinationConsumerLifecycleSuccess() throws Exception {
@SuppressWarnings("unchecked")
@Test
void testDestinationConsumerLifecycleFailure() throws Exception {
final SingerMessage singerMessage1 = new SingerMessage()
.withType(SingerMessage.Type.RECORD)
.withValue(Jsons.deserialize("{ \"color\": \"blue\" }"));
final SingerMessage singerMessage2 = new SingerMessage()
.withType(SingerMessage.Type.RECORD)
.withValue(Jsons.deserialize("{ \"color\": \"yellow\" }"));
final AirbyteMessage singerMessage1 = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.deserialize("{ \"color\": \"blue\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
final AirbyteMessage singerMessage2 = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.deserialize("{ \"color\": \"yellow\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
System.setIn(new ByteArrayInputStream((Jsons.serialize(singerMessage1) + "\n" + Jsons.serialize(singerMessage2)).getBytes()));

final DestinationConsumer<SingerMessage> destinationConsumerMock = mock(DestinationConsumer.class);
final DestinationConsumer<AirbyteMessage> destinationConsumerMock = mock(DestinationConsumer.class);
doThrow(new IOException("error")).when(destinationConsumerMock).accept(singerMessage1);

assertThrows(IOException.class, () -> new IntegrationRunner(null).consumeWriteStream(destinationConsumerMock));
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/bigquery-destination/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ dependencies {

implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-queue')
implementation project(':airbyte-singer')

integrationTestImplementation project(':airbyte-integrations:integration-test-lib')

Expand Down
Loading

0 comments on commit ba9bf4a

Please sign in to comment.