diff --git a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java index d2f29b64f62b..0ae9ab51d7b7 100644 --- a/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java +++ b/airbyte-integrations/bases/base-java/src/main/java/io/airbyte/integrations/base/IntegrationRunner.java @@ -29,7 +29,6 @@ import com.google.common.base.Preconditions; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; -import io.airbyte.config.State; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteMessage.Type; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; @@ -100,10 +99,8 @@ public void run(String[] args) throws Exception { case READ -> { final JsonNode config = parseConfig(parsed.getConfigPath()); final ConfiguredAirbyteCatalog catalog = parseConfig(parsed.getCatalogPath(), ConfiguredAirbyteCatalog.class); - // todo (cgardens) - should we should only send the contents of the state field to the integration, - // not the whole struct. this runner obfuscates everything but the contents. - final Optional stateOptional = parsed.getStatePath().map(path -> parseConfig(path, State.class)); - final Stream messageStream = source.read(config, catalog, stateOptional.map(State::getState).orElse(null)); + final Optional stateOptional = parsed.getStatePath().map(IntegrationRunner::parseConfig); + final Stream messageStream = source.read(config, catalog, stateOptional.orElse(null)); messageStream.map(Jsons::serialize).forEach(stdoutConsumer); messageStream.close(); } diff --git a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java index 28abd7923e6b..37a3bd6de5ed 100644 --- a/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java +++ b/airbyte-integrations/bases/base-java/src/test/java/io/airbyte/integrations/base/IntegrationRunnerTest.java @@ -37,7 +37,6 @@ import com.google.common.collect.Lists; import io.airbyte.commons.io.IOs; import io.airbyte.commons.json.Jsons; -import io.airbyte.config.State; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteConnectionStatus; import io.airbyte.protocol.models.AirbyteConnectionStatus.Status; @@ -78,14 +77,13 @@ class IntegrationRunnerTest { private static final AirbyteCatalog CATALOG = new AirbyteCatalog().withStreams(Lists.newArrayList(new AirbyteStream().withName(STREAM_NAME))); private static final ConfiguredAirbyteCatalog CONFIGURED_CATALOG = CatalogHelpers.toDefaultConfiguredCatalog(CATALOG); - private static final State STATE = new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945"))); + private static final JsonNode STATE = Jsons.jsonNode(ImmutableMap.of("checkpoint", "05/08/1945")); private IntegrationCliParser cliParser; private Consumer stdoutConsumer; private Destination destination; private Source source; private Path configPath; - private Path catalogPath; private Path configuredCatalogPath; private Path statePath; @@ -99,7 +97,6 @@ void setup() throws IOException { Path configDir = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "test"); configPath = IOs.writeFile(configDir, CONFIG_FILE_NAME, CONFIG_STRING); - catalogPath = IOs.writeFile(configDir, CATALOG_FILE_NAME, Jsons.serialize(CATALOG)); configuredCatalogPath = IOs.writeFile(configDir, CONFIGURED_CATALOG_FILE_NAME, Jsons.serialize(CONFIGURED_CATALOG)); statePath = IOs.writeFile(configDir, STATE_FILE_NAME, Jsons.serialize(STATE)); } @@ -185,11 +182,11 @@ void testRead() throws Exception { .withData(Jsons.jsonNode(ImmutableMap.of("names", "reginald")))); when(cliParser.parse(ARGS)).thenReturn(intConfig); - when(source.read(CONFIG, CONFIGURED_CATALOG, STATE.getState())).thenReturn(Stream.of(message1, message2)); + when(source.read(CONFIG, CONFIGURED_CATALOG, STATE)).thenReturn(Stream.of(message1, message2)); new IntegrationRunner(cliParser, stdoutConsumer, null, source).run(ARGS); - verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE.getState()); + verify(source).read(CONFIG, CONFIGURED_CATALOG, STATE); verify(stdoutConsumer).accept(Jsons.serialize(message1)); verify(stdoutConsumer).accept(Jsons.serialize(message2)); } diff --git a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java index 9ba00c7699d1..46a96995caa4 100644 --- a/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java +++ b/airbyte-integrations/bases/standard-source-test/src/main/java/io/airbyte/integrations/standardtest/source/TestSource.java @@ -24,14 +24,13 @@ package io.airbyte.integrations.standardtest.source; -import static io.airbyte.protocol.models.SyncMode.FULL_REFRESH; -import static io.airbyte.protocol.models.SyncMode.INCREMENTAL; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.config.JobGetSpecConfig; import io.airbyte.config.StandardCheckConnectionInput; @@ -65,6 +64,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Optional; import java.util.UUID; import java.util.stream.Collectors; import org.junit.jupiter.api.AfterEach; @@ -263,10 +263,10 @@ public void testIncrementalSyncWithState() throws Exception { return; } - ConfiguredAirbyteCatalog configuredAirbyteCatalog = withSourceDefinedCursors(getConfiguredCatalog()); - List airbyteMessages = runRead(configuredAirbyteCatalog, getState()); - List recordMessages = filterRecords(airbyteMessages); - List stateMessages = airbyteMessages + final ConfiguredAirbyteCatalog configuredAirbyteCatalog = withSourceDefinedCursors(getConfiguredCatalog()); + final List airbyteMessages = runRead(configuredAirbyteCatalog, getState()); + final List recordMessages = filterRecords(airbyteMessages); + final List stateMessages = airbyteMessages .stream() .filter(m -> m.getType() == Type.STATE) .map(AirbyteMessage::getState) @@ -310,8 +310,18 @@ private List filterRecords(Collection mess private ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatalog catalog) { ConfiguredAirbyteCatalog clone = Jsons.clone(catalog); for (ConfiguredAirbyteStream configuredStream : clone.getStreams()) { - if (configuredStream.getSyncMode() == INCREMENTAL && configuredStream.getStream().getSourceDefinedCursor()) { - configuredStream.setCursorField(configuredStream.getStream().getDefaultCursorField()); + if (configuredStream.getStream().getSupportedSyncModes().contains(io.airbyte.protocol.models.SyncMode.INCREMENTAL)) { + configuredStream.setSyncMode(io.airbyte.protocol.models.SyncMode.INCREMENTAL); + if (Optional.ofNullable(configuredStream.getStream().getSourceDefinedCursor()).orElse(false)) { + configuredStream.setCursorField(configuredStream.getStream().getDefaultCursorField()); + } else { + // todo (cgardens) - this is really too terrible. there are some column types that aren't supported + // so you just need to order your columns such that we pick a valid one. too much guessing here. got + // to fix. + // see cursor field to an arbitrary field in the stream. + configuredStream + .setCursorField(Lists.newArrayList(new ArrayList<>(Jsons.keys(configuredStream.getStream().getJsonSchema().get("properties"))).get(0))); + } } } return clone; @@ -320,8 +330,8 @@ private ConfiguredAirbyteCatalog withSourceDefinedCursors(ConfiguredAirbyteCatal private ConfiguredAirbyteCatalog withFullRefreshSyncModes(ConfiguredAirbyteCatalog catalog) { ConfiguredAirbyteCatalog clone = Jsons.clone(catalog); for (ConfiguredAirbyteStream configuredStream : clone.getStreams()) { - if (configuredStream.getStream().getSupportedSyncModes().contains(FULL_REFRESH)) { - configuredStream.setSyncMode(FULL_REFRESH); + if (configuredStream.getStream().getSupportedSyncModes().contains(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)) { + configuredStream.setSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH); } } return clone; @@ -330,7 +340,7 @@ private ConfiguredAirbyteCatalog withFullRefreshSyncModes(ConfiguredAirbyteCatal private boolean sourceSupportsIncremental() throws Exception { ConfiguredAirbyteCatalog catalog = getConfiguredCatalog(); for (ConfiguredAirbyteStream stream : catalog.getStreams()) { - if (stream.getStream().getSupportedSyncModes().contains(INCREMENTAL)) { + if (stream.getStream().getSupportedSyncModes().contains(io.airbyte.protocol.models.SyncMode.INCREMENTAL)) { return true; } } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 7dee19e77e88..8f5072ca456f 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -29,12 +29,12 @@ import io.airbyte.commons.json.Jsons; import io.airbyte.integrations.base.IntegrationRunner; import io.airbyte.integrations.base.Source; -import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.jdbc.AbstractJooqSource; import org.jooq.SQLDialect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class PostgresSource extends AbstractJdbcSource implements Source { +public class PostgresSource extends AbstractJooqSource implements Source { private static final Logger LOGGER = LoggerFactory.getLogger(PostgresSource.class); diff --git a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresIntegrationTests.java b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresIntegrationTests.java index f90e2e943e19..87487b756b58 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresIntegrationTests.java +++ b/airbyte-integrations/connectors/source-postgres/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/PostgresIntegrationTests.java @@ -26,6 +26,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.resources.MoreResources; import io.airbyte.db.Database; @@ -35,6 +36,8 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.Field; +import io.airbyte.protocol.models.Field.JsonSchemaPrimitive; +import io.airbyte.protocol.models.SyncMode; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -102,10 +105,15 @@ protected JsonNode getConfig() { @Override protected ConfiguredAirbyteCatalog getConfiguredCatalog() { - return CatalogHelpers.createConfiguredAirbyteCatalog( + final ConfiguredAirbyteCatalog configuredAirbyteCatalog = CatalogHelpers.createConfiguredAirbyteCatalog( STREAM_NAME, - Field.of("id", Field.JsonSchemaPrimitive.NUMBER), - Field.of("name", Field.JsonSchemaPrimitive.STRING)); + Field.of("id", JsonSchemaPrimitive.NUMBER), + Field.of("name", JsonSchemaPrimitive.STRING)); + + configuredAirbyteCatalog.getStreams().forEach( + configuredStream -> configuredStream.getStream().withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))); + + return configuredAirbyteCatalog; } @Override diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index e4e11b6d204e..0b7951d3f74e 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -231,10 +231,10 @@ public void testCanReadUtf8() throws Exception { @SuppressWarnings("ResultOfMethodCallIgnored") @Test - void testReadFailure() throws Exception { + void testReadFailure() { final ConfiguredAirbyteStream spiedAbStream = spy(CONFIGURED_CATALOG.getStreams().get(0)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream)); - doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream(); + doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream(); final PostgresSource source = new PostgresSource(); diff --git a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java index 9ce6cd933602..918ce7ccb119 100644 --- a/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java +++ b/airbyte-protocol/models/src/main/java/io/airbyte/protocol/models/CatalogHelpers.java @@ -31,6 +31,7 @@ import io.airbyte.commons.json.Jsons; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -64,6 +65,28 @@ public static ConfiguredAirbyteStream createConfiguredAirbyteStream(String strea return new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(streamName).withJsonSchema(fieldsToJsonSchema(fields))); } + public static ConfiguredAirbyteStream createIncrementalConfiguredAirbyteStream( + String streamName, + SyncMode syncMode, + String cursorFieldName, + Field... fields) { + return createIncrementalConfiguredAirbyteStream(streamName, syncMode, cursorFieldName, Arrays.asList(fields)); + } + + public static ConfiguredAirbyteStream createIncrementalConfiguredAirbyteStream( + String streamName, + SyncMode syncMode, + String cursorFieldName, + List fields) { + return new ConfiguredAirbyteStream() + .withStream(new AirbyteStream() + .withName(streamName) + .withSupportedSyncModes(Collections.singletonList(syncMode)) + .withJsonSchema(fieldsToJsonSchema(fields))) + .withSyncMode(syncMode) + .withCursorField(Collections.singletonList(cursorFieldName)); + } + /** * Convert a Catalog into a ConfiguredCatalog. This applies minimum default to the Catalog to make * it a valid ConfiguredCatalog.