Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate destinations to use airbyte protocol #557

Merged
merged 2 commits into from
Oct 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.time;

public class Instants {

public static long toSeconds(long millis) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this necessary when long seconds = TimeUnit.MILLISECONDS.toSeconds(millis); is part of the standard library?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough.

return millis / 1000;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.time;

import static org.junit.jupiter.api.Assertions.*;

import java.time.Instant;
import org.junit.jupiter.api.Test;

class InstantsTest {

@Test
void testMillisToSeconds() {
final Instant now = Instant.now();
assertEquals(now.getEpochSecond(), Instants.toSeconds(now.toEpochMilli()));
}

}
2 changes: 1 addition & 1 deletion airbyte-integrations/base-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ dependencies {
implementation 'commons-cli:commons-cli:1.4'

implementation project(':airbyte-config:models')
implementation project(':airbyte-singer')
implementation project(':airbyte-protocol:models')
}

buildImage.dependsOn ':airbyte-integrations:base:buildImage'
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.config.ConnectorSpecification;
import io.airbyte.config.Schema;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardDiscoverCatalogOutput;
import io.airbyte.singer.SingerMessage;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;

// todo (cgardens) - share common parts of this interface with source.
public interface Destination {
Expand Down Expand Up @@ -67,14 +67,12 @@ public interface Destination {
*
* @param config - integration-specific configuration object as json. e.g. { "username": "airbyte",
* "password": "super secure" }
* @param schema - schema of the incoming messages.
* @param catalog - schema of the incoming messages.
* @return Consumer that accepts message. The {@link DestinationConsumer#accept(Object)} will be
* called n times where n is the number of messages. {@link DestinationConsumer#complete()}
* will be called once if all messages were accepted successfully.
* {@link DestinationConsumer#close()} will always be called once regardless of success or
* failure.
* called n times where n is the number of messages. {@link DestinationConsumer#close()}
* will always be called once regardless of success or failure.
* @throws Exception - any exception.
*/
DestinationConsumer<SingerMessage> write(JsonNode config, Schema schema) throws Exception;
DestinationConsumer<AirbyteMessage> write(JsonNode config, AirbyteCatalog catalog) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.Schema;
import io.airbyte.singer.SingerMessage;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import java.nio.file.Path;
import java.util.Optional;
import java.util.Scanner;
Expand Down Expand Up @@ -81,8 +81,8 @@ public void run(String[] args) throws Exception {
throw new RuntimeException("Not implemented");
case WRITE -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
final Schema schema = parseConfig(parsed.getCatalogPath(), Schema.class);
final DestinationConsumer<SingerMessage> consumer = destination.write(config, schema);
final AirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), AirbyteCatalog.class);
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);
consumeWriteStream(consumer);
}
default -> throw new IllegalStateException("Unexpected value: " + parsed.getCommand());
Expand All @@ -91,12 +91,12 @@ public void run(String[] args) throws Exception {
LOGGER.info("Completed integration: {}", destination.getClass().getName());
}

void consumeWriteStream(DestinationConsumer<SingerMessage> consumer) throws Exception {
void consumeWriteStream(DestinationConsumer<AirbyteMessage> consumer) throws Exception {
final Scanner input = new Scanner(System.in);
try (consumer) {
while (input.hasNextLine()) {
final String inputString = input.nextLine();
final Optional<SingerMessage> singerMessageOptional = Jsons.tryDeserialize(inputString, SingerMessage.class);
final Optional<AirbyteMessage> singerMessageOptional = Jsons.tryDeserialize(inputString, AirbyteMessage.class);
if (singerMessageOptional.isPresent()) {
consumer.accept(singerMessageOptional.get());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.ConnectorSpecification;
import io.airbyte.config.Schema;
import io.airbyte.config.StandardCheckConnectionOutput;
import io.airbyte.config.StandardCheckConnectionOutput.Status;
import io.airbyte.config.StandardDiscoverCatalogOutput;
import io.airbyte.config.Stream;
import io.airbyte.singer.SingerMessage;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.util.function.Consumer;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -64,8 +65,10 @@ class IntegrationRunnerTest {

private static final String CONFIG_STRING = "{ \"username\": \"airbyte\" }";
private static final JsonNode CONFIG = Jsons.deserialize(CONFIG_STRING);
private static final String STREAM_NAME = "users";
private static final Long EMITTED_AT = Instant.now().toEpochMilli();

private static final Schema SCHEMA = new Schema().withStreams(Lists.newArrayList(new Stream().withName("users")));
private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME)));

private IntegrationCliParser cliParser;
private Consumer<String> stdoutConsumer;
Expand All @@ -82,7 +85,7 @@ void setup() throws IOException {
Path configDir = Files.createTempDirectory("test");

configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING);
catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(SCHEMA));
catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(CATALOG));
}

@Test
Expand Down Expand Up @@ -113,48 +116,55 @@ void testCheck() throws Exception {
verify(stdoutConsumer).accept(Jsons.serialize(output));
}

@Test
void testDiscover() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.discover(Path.of(configPath.toString()));
final StandardDiscoverCatalogOutput output = new StandardDiscoverCatalogOutput().withSchema(SCHEMA);

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.discover(CONFIG)).thenReturn(output);

new IntegrationRunner(cliParser, stdoutConsumer, destination).run(ARGS);

verify(destination).discover(CONFIG);
verify(stdoutConsumer).accept(Jsons.serialize(output));
}
// @Test
// void testDiscover() throws Exception {
// final IntegrationConfig intConfig = IntegrationConfig.discover(Path.of(configPath.toString()));
// final StandardDiscoverCatalogOutput output = new
// StandardDiscoverCatalogOutput().withSchema(CATALOG);
//
// when(cliParser.parse(ARGS)).thenReturn(intConfig);
// when(destination.discover(CONFIG)).thenReturn(output);
//
// new IntegrationRunner(cliParser, stdoutConsumer, destination).run(ARGS);
//
// verify(destination).discover(CONFIG);
// verify(stdoutConsumer).accept(Jsons.serialize(output));
// }

@SuppressWarnings("unchecked")
@Test
void testWrite() throws Exception {
final IntegrationConfig intConfig = IntegrationConfig.write(Path.of(configPath.toString()), Path.of(catalogPath.toString()));
final DestinationConsumer<SingerMessage> destinationConsumerMock = mock(DestinationConsumer.class);
final DestinationConsumer<AirbyteMessage> destinationConsumerMock = mock(DestinationConsumer.class);
when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(destination.write(CONFIG, SCHEMA)).thenReturn(destinationConsumerMock);
when(destination.write(CONFIG, CATALOG)).thenReturn(destinationConsumerMock);

final IntegrationRunner runner = spy(new IntegrationRunner(cliParser, stdoutConsumer, destination));
doNothing().when(runner).consumeWriteStream(destinationConsumerMock);
runner.run(ARGS);

verify(destination).write(CONFIG, SCHEMA);
verify(destination).write(CONFIG, CATALOG);
verify(runner).consumeWriteStream(destinationConsumerMock);
}

@SuppressWarnings("unchecked")
@Test
void testDestinationConsumerLifecycleSuccess() throws Exception {
final SingerMessage singerMessage1 = new SingerMessage()
.withType(SingerMessage.Type.RECORD)
.withValue(Jsons.deserialize("{ \"color\": \"blue\" }"));
final SingerMessage singerMessage2 = new SingerMessage()
.withType(SingerMessage.Type.RECORD)
.withValue(Jsons.deserialize("{ \"color\": \"yellow\" }"));
final AirbyteMessage singerMessage1 = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.deserialize("{ \"color\": \"blue\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
final AirbyteMessage singerMessage2 = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.deserialize("{ \"color\": \"yellow\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
System.setIn(new ByteArrayInputStream((Jsons.serialize(singerMessage1) + "\n" + Jsons.serialize(singerMessage2)).getBytes()));

final DestinationConsumer<SingerMessage> destinationConsumerMock = mock(DestinationConsumer.class);
final DestinationConsumer<AirbyteMessage> destinationConsumerMock = mock(DestinationConsumer.class);
new IntegrationRunner(null).consumeWriteStream(destinationConsumerMock);

InOrder inOrder = inOrder(destinationConsumerMock);
Expand All @@ -166,15 +176,21 @@ void testDestinationConsumerLifecycleSuccess() throws Exception {
@SuppressWarnings("unchecked")
@Test
void testDestinationConsumerLifecycleFailure() throws Exception {
final SingerMessage singerMessage1 = new SingerMessage()
.withType(SingerMessage.Type.RECORD)
.withValue(Jsons.deserialize("{ \"color\": \"blue\" }"));
final SingerMessage singerMessage2 = new SingerMessage()
.withType(SingerMessage.Type.RECORD)
.withValue(Jsons.deserialize("{ \"color\": \"yellow\" }"));
final AirbyteMessage singerMessage1 = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.deserialize("{ \"color\": \"blue\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
final AirbyteMessage singerMessage2 = new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(Jsons.deserialize("{ \"color\": \"yellow\" }"))
.withStream(STREAM_NAME)
.withEmittedAt(EMITTED_AT));
System.setIn(new ByteArrayInputStream((Jsons.serialize(singerMessage1) + "\n" + Jsons.serialize(singerMessage2)).getBytes()));

final DestinationConsumer<SingerMessage> destinationConsumerMock = mock(DestinationConsumer.class);
final DestinationConsumer<AirbyteMessage> destinationConsumerMock = mock(DestinationConsumer.class);
doThrow(new IOException("error")).when(destinationConsumerMock).accept(singerMessage1);

assertThrows(IOException.class, () -> new IntegrationRunner(null).consumeWriteStream(destinationConsumerMock));
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/bigquery-destination/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ dependencies {

implementation project(':airbyte-config:models')
implementation project(':airbyte-integrations:base-java')
implementation project(':airbyte-protocol:models')
implementation project(':airbyte-queue')
implementation project(':airbyte-singer')

integrationTestImplementation project(':airbyte-integrations:integration-test-lib')

Expand Down
Loading