Skip to content

Commit

Permalink
add incremental to jooq source (and postgres) (#1172)
Browse files Browse the repository at this point in the history
  • Loading branch information
cgardens authored Dec 9, 2020
1 parent 6836c5c commit 25689ee
Show file tree
Hide file tree
Showing 24 changed files with 1,906 additions and 35 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/

package io.airbyte.commons.stream;

import java.util.Iterator;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

public class MoreStreams {

public static <T> Stream<T> toStream(Iterator<T> iterator) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@
"sourceDefinitionId": "decd338e-5647-4c0b-adf4-da0e75f5a750",
"name": "Postgres",
"dockerRepository": "airbyte/source-postgres",
"dockerImageTag": "0.1.5",
"dockerImageTag": "0.1.6",
"documentationUrl": "https://hub.docker.com/r/airbyte/source-postgres"
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
- sourceDefinitionId: decd338e-5647-4c0b-adf4-da0e75f5a750
name: Postgres
dockerRepository: airbyte/source-postgres
dockerImageTag: 0.1.5
dockerImageTag: 0.1.6
documentationUrl: https://hub.docker.com/r/airbyte/source-postgres
- sourceDefinitionId: cd42861b-01fc-4658-a8ab-5d11d0510f01
name: Recurly
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -100,10 +99,8 @@ public void run(String[] args) throws Exception {
case READ -> {
final JsonNode config = parseConfig(parsed.getConfigPath());
final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class);
// todo (cgardens) - should we should only send the contents of the state field to the integration,
// not the whole struct. this runner obfuscates everything but the contents.
final Optional<State> stateOptional = parsed.getStatePath().map(path -> parseConfig(path, State.class));
final Stream<AirbyteMessage> messageStream = source.read(config, catalog, stateOptional.map(State::getState).orElse(null));
final Optional<JsonNode> stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig);
final Stream<AirbyteMessage> messageStream = source.read(config, catalog, stateOptional.orElse(null));
messageStream.map(Jsons::serialize).forEach(stdoutConsumer);
messageStream.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.State;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
Expand Down Expand Up @@ -78,14 +77,13 @@ class IntegrationRunnerTest {

private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME)));
private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG);
private static final State STATE = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945")));
private static final JsonNode STATE = Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945"));

private IntegrationCliParser cliParser;
private Consumer<String> stdoutConsumer;
private Destination destination;
private Source source;
private Path configPath;
private Path catalogPath;
private Path configuredCatalogPath;
private Path statePath;

Expand All @@ -99,7 +97,6 @@ void setup() throws IOException {
Path configDir = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test");

configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING);
catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(CATALOG));
configuredCatalogPath = IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG));
statePath = IOs.writeFile(configDir, STATE_FILE_NAME, Jsons.serialize(STATE));
}
Expand Down Expand Up @@ -185,11 +182,11 @@ void testRead() throws Exception {
.withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald"))));

when(cliParser.parse(ARGS)).thenReturn(intConfig);
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE.getState())).thenReturn(Stream.of(message1, message2));
when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenReturn(Stream.of(message1, message2));

new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS);

verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE.getState());
verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE);
verify(stdoutConsumer).accept(Jsons.serialize(message1));
verify(stdoutConsumer).accept(Jsons.serialize(message2));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,9 @@ private List<AirbyteRecordMessage> filterRecords(Collection<AirbyteMessage> mess
private ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatalog catalog) {
ConfiguredAirbyteCatalog clone = Jsons.clone(catalog);
for (ConfiguredAirbyteStream configuredStream : clone.getStreams()) {
if (configuredStream.getSyncMode() == INCREMENTAL && configuredStream.getStream().getSourceDefinedCursor()) {
if (configuredStream.getSyncMode() == INCREMENTAL
&& configuredStream.getStream().getSourceDefinedCursor() != null
&& configuredStream.getStream().getSourceDefinedCursor()) {
configuredStream.setCursorField(configuredStream.getStream().getDefaultCursorField());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ This is an autogenerated file describing the high-level steps you need to take t
./gradlew :airbyte-integrations:connectors:source-{{dashCase name}}:build
```
1. Define the specification for this source connector by modifying `source_{{snakeCase name}}/spec.json`.
A specification is a JSON file which uses JSONSchema to declare all the inputs needed for your integration to function
correctly. For example, if you were configuring a Postgres source, your specification might declare that you need a
`username` field which a string, a `password` field which is a string, a `host_url` which is a URI-formatted string,
and a `port` which is a `number`. The Airbyte UI will generate configurations that match the specification (by asking
A specification is a JSON file which uses JSONSchema to declare all the inputs needed for your integration to function
correctly. For example, if you were configuring a Postgres source, your specification might declare that you need a
`username` field which a string, a `password` field which is a string, a `host_url` which is a URI-formatted string,
and a `port` which is a `number`. The Airbyte UI will generate configurations that match the specification (by asking
the user to input them) and pass those configs to your source connector as input when it is being run.
You can also manually generate config files and pass them to the integration CLI.
1. Implement your integration by editing `source_{{snakeCase name}}/source.py` (and creating additional files as necessary).
Expand All @@ -25,6 +25,7 @@ This is an autogenerated file describing the high-level steps you need to take t
1. Standard tests: These are a bank of tests that all sources need to pass. We have pre-written them for you, all you need to do is provide this test suite with the proper inputs. These tests will be re-run as part of the CI on the master branch.
1. Supplying inputs: In the generated code, you'll find `integration_tests/standard_source_test.py`. You need to implement each method. The template provides comments explaining what each method does. These tests will have access to any files in the `secrets` and `integration_tests` directories, as well as the directory that contains `source.py`.
1. Note: each method is called by the tests from a separate process, so no instance variables can be stored or accessed from one method to another.
1. If your source supports incremental syncing, then make sure that the catalog that is returned in the get_catalog method is configured for incremental syncing (e.g. include cursor fields, etc).
1. Running the tests: You can run these tests with the following command `./gradlew :airbyte-integrations:connectors:source-{{dashCase name}}:standardSourceTestPython`.
1. Reading the tests: If you'd like to take a look at the implementation of these tests they can be found in `airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java`. The tests are written in java, but Airbyte handles translating the inputs from your python implementation of `standard_source_test.py` to what the tests need. You do not need to worry about writing anything in java!
1. Update `README.md` to document the usage of your integration. If API credentials are required to run the integration, please document how they can be obtained or link to a how-to guide.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ This is an autogenerated file describing the steps needed to implement a new Air
./gradlew :airbyte-integrations:connectors:source-{{dashCase name}}-singer:build
```
1. Define the specification for this source connector by modifying `source_{{snakeCase name}}_singer/spec.json`.
A specification is a JSON file which uses JSONSchema to declare all the inputs needed for your integration to function
correctly. For example, if you were configuring a Postgres source, your specification might declare that you need a
`username` field which a string, a `password` field which is a string, a `host_url` which is a URI-formatted string,
and a `port` which is a `number`. The Airbyte UI will generate configurations that match the specification (by asking
A specification is a JSON file which uses JSONSchema to declare all the inputs needed for your integration to function
correctly. For example, if you were configuring a Postgres source, your specification might declare that you need a
`username` field which a string, a `password` field which is a string, a `host_url` which is a URI-formatted string,
and a `port` which is a `number`. The Airbyte UI will generate configurations that match the specification (by asking
the user to input them) and pass those configs to your source connector as input when it is being run.
You can also manually generate config files and pass them to the integration CLI.
1. Implement your integration by editing `source_{{snakeCase name}}_singer/source.py` (and creating additional files as necessary).
Expand All @@ -26,6 +26,7 @@ This is an autogenerated file describing the steps needed to implement a new Air
1. Standard tests: These are a bank of tests that all sources need to pass. We have pre-written them for you, all you need to do is provide this test suite with the proper inputs. These tests will be re-run as part of the CI on the master branch.
1. Supplying inputs: In the generated code, you'll find `integration_tests/standard_source_test.py`. You need to implement each method. The template provides comments explaining what each method does. These tests will have access to any files in the `secrets` and `integration_tests` directories, as well as the directory that contains `source.py`.
1. Note: each method is called by the tests from a separate process, so no instance variables can be stored or accessed from one method to another.
1. If your source supports incremental syncing, then make sure that the catalog that is returned in the get_catalog method is configured for incremental syncing (e.g. include cursor fields, etc).
1. Running the tests: You can run these tests with the following command `./gradlew :airbyte-integrations:connectors:source-{{dashCase name}}:standardSourceTestPython`.
1. Reading the tests: If you'd like to take a look at the implementation of these tests they can be found in `airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java`. The tests are written in java, but Airbyte handles translating the inputs from your python implementation of `standard_source_test.py` to what the tests need. You do not need to worry about writing anything in java!
1. Update `README.md` to document the usage of your integration. If API credentials are required to run the integration, please document how they can be obtained or link to a how-to guide.
Expand Down
19 changes: 18 additions & 1 deletion airbyte-integrations/connectors/source-jdbc/build.gradle
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import org.jsonschema2pojo.SourceType

plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
// todo: needs standard source test
id 'com.github.eirnym.js2p' version '1.0'
}

application {
mainClass = 'io.airbyte.integrations.source.jdbc.JdbcSource'
}

dependencies {
implementation project(':airbyte-commons')
implementation project(':airbyte-db')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:models')
Expand All @@ -22,3 +25,17 @@ dependencies {

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}

jsonSchema2Pojo {
sourceType = SourceType.YAMLSCHEMA
source = files("${sourceSets.main.output.resourcesDir}/jdbc_models")
targetDirectory = new File(project.buildDir, 'generated/src/gen/java/')
removeOldOutput = true

targetPackage = 'io.airbyte.integrations.source.jdbc.models'

useLongIntegers = true
generateBuilders = true
includeConstructors = false
includeSetters = true
}
Loading

0 comments on commit 25689ee

Please sign in to comment.