diff --git a/airbyte-commons-converters/src/main/java/io/airbyte/commons/converters/CatalogClientConverters.java b/airbyte-commons-converters/src/main/java/io/airbyte/commons/converters/CatalogClientConverters.java index fb8e3c4be78..a1c2c8ad327 100644 --- a/airbyte-commons-converters/src/main/java/io/airbyte/commons/converters/CatalogClientConverters.java +++ b/airbyte-commons-converters/src/main/java/io/airbyte/commons/converters/CatalogClientConverters.java @@ -177,30 +177,28 @@ private static io.airbyte.config.AirbyteStream toStreamInternal(final io.airbyte } ((ObjectNode) properties).retain(selectedFieldNames); } - return new io.airbyte.config.AirbyteStream() - .withName(stream.getName()) - .withJsonSchema(stream.getJsonSchema()) - .withSupportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.config.SyncMode.class)) - .withSourceDefinedCursor(stream.getSourceDefinedCursor()) - .withDefaultCursorField(stream.getDefaultCursorField()) - .withSourceDefinedPrimaryKey( - Optional.ofNullable(stream.getSourceDefinedPrimaryKey()).orElse(Collections.emptyList())) - .withNamespace(stream.getNamespace()) - .withIsResumable(stream.isResumable()); + return new io.airbyte.config.AirbyteStream(stream.getName(), stream.getJsonSchema(), + Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.config.SyncMode.class)) + .withSourceDefinedCursor(stream.getSourceDefinedCursor()) + .withDefaultCursorField(stream.getDefaultCursorField()) + .withSourceDefinedPrimaryKey( + Optional.ofNullable(stream.getSourceDefinedPrimaryKey()).orElse(Collections.emptyList())) + .withNamespace(stream.getNamespace()) + .withIsResumable(stream.isResumable()); } private static ConfiguredAirbyteStream toConfiguredStreamInternal(final io.airbyte.api.client.model.generated.AirbyteStream stream, final AirbyteStreamConfiguration config) throws JsonValidationException { - return new ConfiguredAirbyteStream() - .withStream(toStreamInternal(stream, config)) - .withSyncMode(Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class)) - .withDestinationSyncMode(Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class)) - .withPrimaryKey(config.getPrimaryKey()) - .withCursorField(config.getCursorField()) - .withGenerationId(config.getGenerationId()) - .withMinimumGenerationId(config.getMinimumGenerationId()) - .withSyncId(config.getSyncId()); + return new ConfiguredAirbyteStream( + toStreamInternal(stream, config), + Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class), + Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class)) + .withPrimaryKey(config.getPrimaryKey()) + .withCursorField(config.getCursorField()) + .withGenerationId(config.getGenerationId()) + .withMinimumGenerationId(config.getMinimumGenerationId()) + .withSyncId(config.getSyncId()); } /** diff --git a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/CatalogDiffHelpersTest.java b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/CatalogDiffHelpersTest.java index fec2ce9f553..fdaa5fccd6c 100644 --- a/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/CatalogDiffHelpersTest.java +++ b/airbyte-commons-protocol/src/test/java/io/airbyte/commons/protocol/CatalogDiffHelpersTest.java @@ -89,9 +89,10 @@ void testGetCatalogDiff() throws IOException { new io.airbyte.protocol.models.AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject()))); final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject())) - .withSyncMode(SyncMode.FULL_REFRESH))); + new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema2, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream(SALES, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND))); final Set actualDiff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog); final List expectedDiff = Stream.of( @@ -156,9 +157,10 @@ void testGetCatalogDiffWithInvalidSchema() throws IOException { new io.airbyte.protocol.models.AirbyteStream().withName(USERS).withJsonSchema(schema2))); final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject())) - .withSyncMode(SyncMode.FULL_REFRESH))); + new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema2, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream(SALES, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND))); final Set actualDiff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog); @@ -178,9 +180,10 @@ void testGetCatalogDiffWithBothInvalidSchema() throws IOException { new io.airbyte.protocol.models.AirbyteStream().withName(USERS).withJsonSchema(schema2))); final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject())) - .withSyncMode(SyncMode.FULL_REFRESH))); + new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema2, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream(SALES, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND))); final Set actualDiff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog); @@ -197,8 +200,9 @@ void testCatalogDiffWithBreakingChanges() throws IOException { new io.airbyte.protocol.models.AirbyteStream().withName(USERS).withJsonSchema(breakingSchema))); final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema1)).withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(List.of(DATE)).withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP).withPrimaryKey(List.of(List.of("id"))))); + new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema1, List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP) + .withCursorField(List.of(DATE)).withPrimaryKey(List.of(List.of("id"))))); final Set diff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog); @@ -221,8 +225,9 @@ void testCatalogDiffWithoutStreamConfig() throws IOException { new io.airbyte.protocol.models.AirbyteStream().withName(USERS).withJsonSchema(breakingSchema))); final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(schema1)).withSyncMode(SyncMode.INCREMENTAL) - .withCursorField(List.of(DATE)).withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP).withPrimaryKey(List.of(List.of("id"))))); + new ConfiguredAirbyteStream(new AirbyteStream(SALES, schema1, List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP) + .withCursorField(List.of(DATE)).withPrimaryKey(List.of(List.of("id"))))); final Set diff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog); @@ -236,16 +241,17 @@ void testCatalogDiffStreamChangeWithNoTransforms() throws IOException { final AirbyteCatalog catalog1 = new AirbyteCatalog().withStreams(List.of( ProtocolConverters - .toProtocol(new AirbyteStream().withName(USERS).withJsonSchema(schema1).withSourceDefinedPrimaryKey(List.of(List.of("id")))), - ProtocolConverters.toProtocol(new AirbyteStream().withName(SALES).withJsonSchema(schema1)))); + .toProtocol(new AirbyteStream(USERS, schema1, List.of(SyncMode.FULL_REFRESH)).withSourceDefinedPrimaryKey(List.of(List.of("id")))), + ProtocolConverters.toProtocol(new AirbyteStream(SALES, schema1, List.of(SyncMode.FULL_REFRESH))))); final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of( ProtocolConverters - .toProtocol(new AirbyteStream().withName(USERS).withJsonSchema(schema1).withSourceDefinedPrimaryKey(List.of(List.of("id")))))); + .toProtocol(new AirbyteStream(USERS, schema1, List.of(SyncMode.FULL_REFRESH)).withSourceDefinedPrimaryKey(List.of(List.of("id")))))); final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema1)).withSyncMode(SyncMode.FULL_REFRESH), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(schema1)) - .withSyncMode(SyncMode.FULL_REFRESH))); + new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema1, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream(SALES, schema1, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND))); final Set actualDiff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog); @@ -302,16 +308,13 @@ void testCatalogDiffWithSourceDefinedPrimaryKeyChange(final DestinationSyncMode throws IOException { final JsonNode schema = Jsons.deserialize(readResource(VALID_SCHEMA_JSON)); - final AirbyteStream stream = new AirbyteStream().withName(USERS).withJsonSchema(schema).withSourceDefinedPrimaryKey(prevSourcePK); - final AirbyteStream refreshedStream = new AirbyteStream().withName(USERS).withJsonSchema(schema).withSourceDefinedPrimaryKey(newSourcePK); + final AirbyteStream stream = new AirbyteStream(USERS, schema, List.of(SyncMode.FULL_REFRESH)).withSourceDefinedPrimaryKey(prevSourcePK); + final AirbyteStream refreshedStream = new AirbyteStream(USERS, schema, List.of(SyncMode.FULL_REFRESH)).withSourceDefinedPrimaryKey(newSourcePK); final AirbyteCatalog initialCatalog = new AirbyteCatalog().withStreams(List.of(ProtocolConverters.toProtocol(stream))); final AirbyteCatalog refreshedCatalog = new AirbyteCatalog().withStreams(List.of(ProtocolConverters.toProtocol(refreshedStream))); - final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream() - .withStream(stream) - .withSyncMode(SyncMode.INCREMENTAL) - .withDestinationSyncMode(destSyncMode) + final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream(stream, SyncMode.INCREMENTAL, destSyncMode) .withPrimaryKey(configuredPK); final ConfiguredAirbyteCatalog configuredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream)); diff --git a/airbyte-commons-protocol/src/test/kotlin/io/airbyte/commons/protocol/DefaultProtocolSerializerTest.kt b/airbyte-commons-protocol/src/test/kotlin/io/airbyte/commons/protocol/DefaultProtocolSerializerTest.kt index 15bf3b9c4bd..263c1dfaf1f 100644 --- a/airbyte-commons-protocol/src/test/kotlin/io/airbyte/commons/protocol/DefaultProtocolSerializerTest.kt +++ b/airbyte-commons-protocol/src/test/kotlin/io/airbyte/commons/protocol/DefaultProtocolSerializerTest.kt @@ -5,6 +5,7 @@ import io.airbyte.config.AirbyteStream import io.airbyte.config.ConfiguredAirbyteCatalog import io.airbyte.config.ConfiguredAirbyteStream import io.airbyte.config.DestinationSyncMode +import io.airbyte.config.SyncMode import org.junit.jupiter.api.Assertions.assertEquals import org.junit.jupiter.api.Test import io.airbyte.protocol.models.AirbyteStream as ProtocolAirbyteStream @@ -30,22 +31,19 @@ class DefaultProtocolSerializerTest { serializer: ProtocolSerializer, supportRefreshes: Boolean, ) { + val appendStreamName = "append" + val overwriteStreamName = "overwrite" + val appendDedupStreamName = "append_dedup" + val overwriteDedupStreamName = "overwrite_dedup" + val configuredCatalog = ConfiguredAirbyteCatalog() .withStreams( listOf( - ConfiguredAirbyteStream() - .withStream(AirbyteStream().withName("append")) - .withDestinationSyncMode(DestinationSyncMode.APPEND), - ConfiguredAirbyteStream() - .withStream(AirbyteStream().withName("overwrite")) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE), - ConfiguredAirbyteStream() - .withStream(AirbyteStream().withName("append_dedup")) - .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP), - ConfiguredAirbyteStream() - .withStream(AirbyteStream().withName("overwrite_dedup")) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE_DEDUP), + ConfiguredAirbyteStream(getAirbyteStream(appendStreamName), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + ConfiguredAirbyteStream(getAirbyteStream(overwriteStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE), + ConfiguredAirbyteStream(getAirbyteStream(appendDedupStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND_DEDUP), + ConfiguredAirbyteStream(getAirbyteStream(overwriteDedupStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE_DEDUP), ), ) val frozenConfiguredCatalog = Jsons.clone(configuredCatalog) @@ -55,16 +53,36 @@ class DefaultProtocolSerializerTest { .withStreams( listOf( ProtocolConfiguredAirbyteStream() - .withStream(ProtocolAirbyteStream().withName("append")) + .withStream( + ProtocolAirbyteStream().withName( + appendStreamName, + ).withJsonSchema(Jsons.emptyObject()).withSupportedSyncModes(listOf(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)), + ) + .withSyncMode(io.airbyte.protocol.models.SyncMode.INCREMENTAL) .withDestinationSyncMode(ProtocolDestinationSyncMode.APPEND), ProtocolConfiguredAirbyteStream() - .withStream(ProtocolAirbyteStream().withName("overwrite")) + .withStream( + ProtocolAirbyteStream().withName( + overwriteStreamName, + ).withJsonSchema(Jsons.emptyObject()).withSupportedSyncModes(listOf(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)), + ) + .withSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH) .withDestinationSyncMode(if (supportRefreshes) ProtocolDestinationSyncMode.APPEND else ProtocolDestinationSyncMode.OVERWRITE), ProtocolConfiguredAirbyteStream() - .withStream(ProtocolAirbyteStream().withName("append_dedup")) + .withStream( + ProtocolAirbyteStream().withName( + appendDedupStreamName, + ).withJsonSchema(Jsons.emptyObject()).withSupportedSyncModes(listOf(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)), + ) + .withSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH) .withDestinationSyncMode(ProtocolDestinationSyncMode.APPEND_DEDUP), ProtocolConfiguredAirbyteStream() - .withStream(ProtocolAirbyteStream().withName("overwrite_dedup")) + .withStream( + ProtocolAirbyteStream().withName( + overwriteDedupStreamName, + ).withJsonSchema(Jsons.emptyObject()).withSupportedSyncModes(listOf(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)), + ) + .withSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH) .withDestinationSyncMode(if (supportRefreshes) ProtocolDestinationSyncMode.APPEND_DEDUP else ProtocolDestinationSyncMode.OVERWRITE), ), ) @@ -78,5 +96,7 @@ class DefaultProtocolSerializerTest { // Verify we didn't mutate the input assertEquals(frozenConfiguredCatalog, configuredCatalog) } + + fun getAirbyteStream(name: String) = AirbyteStream(name, Jsons.emptyObject(), listOf(SyncMode.FULL_REFRESH)) } } diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java index 17af82a3679..0b16e07c1af 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/AttemptHandler.java @@ -214,7 +214,7 @@ Set getFullRefreshStreamsToClear(final ConfiguredAirbyteCatalo .filter(s -> { if (s.getSyncMode().equals(SyncMode.FULL_REFRESH)) { if (excludeResumableStreams) { - return s.getStream().getIsResumable() == null || !s.getStream().getIsResumable(); + return s.getStream().isResumable() == null || !s.getStream().isResumable(); } else { return true; } diff --git a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/helpers/CatalogConverter.java b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/helpers/CatalogConverter.java index 507eb6dd25e..62d50728845 100644 --- a/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/helpers/CatalogConverter.java +++ b/airbyte-commons-server/src/main/java/io/airbyte/commons/server/handlers/helpers/CatalogConverter.java @@ -210,13 +210,12 @@ public static ConfiguredAirbyteCatalog toConfiguredInternal(final io.airbyte.api .filter(s -> s.getConfig().getSelected()) .map(s -> { try { - return new ConfiguredAirbyteStream() - .withStream(ProtocolConverters.toInternal(toConfiguredProtocol(s.getStream(), s.getConfig()))) - .withSyncMode(Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.config.SyncMode.class)) - .withCursorField(s.getConfig().getCursorField()) - .withDestinationSyncMode(Enums.convertTo(s.getConfig().getDestinationSyncMode(), - io.airbyte.config.DestinationSyncMode.class)) - .withPrimaryKey(Optional.ofNullable(s.getConfig().getPrimaryKey()).orElse(Collections.emptyList())); + return new ConfiguredAirbyteStream( + ProtocolConverters.toInternal(toConfiguredProtocol(s.getStream(), s.getConfig())), + Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.config.SyncMode.class), + Enums.convertTo(s.getConfig().getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class)) + .withCursorField(s.getConfig().getCursorField()) + .withPrimaryKey(Optional.ofNullable(s.getConfig().getPrimaryKey()).orElse(Collections.emptyList())); } catch (final JsonValidationException e) { LOGGER.error("Error parsing catalog: {}", e); errors.add(e); diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/converters/JobConverterTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/converters/JobConverterTest.java index 16d5ed0c1b3..1c2fa441d14 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/converters/JobConverterTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/converters/JobConverterTest.java @@ -30,6 +30,7 @@ import io.airbyte.api.model.generated.StreamDescriptor; import io.airbyte.api.model.generated.SynchronousJobRead; import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.server.scheduler.SynchronousJobMetadata; import io.airbyte.commons.version.AirbyteVersion; import io.airbyte.config.AirbyteStream; @@ -38,6 +39,7 @@ import io.airbyte.config.Configs.WorkerEnvironment; import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.FailureReason.FailureType; @@ -55,6 +57,7 @@ import io.airbyte.config.StandardSyncOutput; import io.airbyte.config.StandardSyncSummary; import io.airbyte.config.StreamSyncStats; +import io.airbyte.config.SyncMode; import io.airbyte.config.SyncStats; import io.airbyte.config.helpers.LogConfigs; import io.airbyte.featureflag.TestClient; @@ -145,8 +148,10 @@ class TestJob { private static final JobConfig JOB_CONFIG = new JobConfig() .withConfigType(CONFIG_TYPE) .withSync(new JobSyncConfig().withConfiguredAirbyteCatalog(new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS)), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(ACCOUNTS)))))); + new ConfiguredAirbyteStream(new AirbyteStream(USERS, Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream(ACCOUNTS, Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND))))); private static final JobOutput JOB_OUTPUT = new JobOutput() .withOutputType(OutputType.SYNC) diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java index 0a6c7a9af8c..5014e2a5220 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/AttemptHandlerTest.java @@ -50,6 +50,7 @@ import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.Job; @@ -256,8 +257,10 @@ void createAttemptNumberForSync(final boolean enableRfr) final var mCatalog = mock(ConfiguredAirbyteCatalog.class); when(mSyncConfig.getConfiguredAirbyteCatalog()).thenReturn(mCatalog); when(mCatalog.getStreams()).thenReturn(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withIsResumable(true).withName("rfrStream")) - .withSyncMode(SyncMode.FULL_REFRESH))); + new ConfiguredAirbyteStream( + new AirbyteStream("rfrStream", Jsons.emptyObject(), List.of(io.airbyte.config.SyncMode.FULL_REFRESH)).withIsResumable(true), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND))); when(jobPersistence.getJob(JOB_ID)).thenReturn(mJob); when(path.resolve(Mockito.anyString())).thenReturn(path); @@ -309,8 +312,9 @@ void createAttemptNumberForClear() final var mCatalog = mock(ConfiguredAirbyteCatalog.class); when(mResetConfig.getConfiguredAirbyteCatalog()).thenReturn(mCatalog); when(mCatalog.getStreams()).thenReturn(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withIsResumable(true).withName("rfrStream")) - .withSyncMode(SyncMode.INCREMENTAL))); + new ConfiguredAirbyteStream(new AirbyteStream("rfrStream", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)).withIsResumable(true), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND))); when(mResetConfig.getResetSourceConfiguration()).thenReturn(new ResetSourceConfiguration().withStreamsToReset( List.of(new StreamDescriptor().withName("rfrStream")))); @@ -357,10 +361,12 @@ void createAttemptNumberForRefresh(final int attemptNumber) final var mCatalog = mock(ConfiguredAirbyteCatalog.class); when(mRefreshConfig.getConfiguredAirbyteCatalog()).thenReturn(mCatalog); when(mCatalog.getStreams()).thenReturn(List.of( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withIsResumable(true).withName("rfrStream")) - .withSyncMode(SyncMode.FULL_REFRESH), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("nonRfrStream")) - .withSyncMode(SyncMode.FULL_REFRESH))); + new ConfiguredAirbyteStream( + new AirbyteStream("rfrStream", Jsons.emptyObject(), List.of(io.airbyte.config.SyncMode.FULL_REFRESH)).withIsResumable(true), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("nonRfrStream", Jsons.emptyObject(), List.of(io.airbyte.config.SyncMode.FULL_REFRESH)), + SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND))); when(jobPersistence.getJob(JOB_ID)).thenReturn(mJob); when(path.resolve(Mockito.anyString())).thenReturn(path); @@ -414,10 +420,17 @@ void getFullRefreshStreamsShouldOnlyReturnFullRefreshStreams(final boolean enabl when(mSyncConfig.getConfiguredAirbyteCatalog()).thenReturn(mCatalog); when(mCatalog.getStreams()).thenReturn(List.of( - new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("full").withIsResumable(true)), - new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL).withStream(new AirbyteStream().withName("incre")), - new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("full").withNamespace("name")), - new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL).withStream(new AirbyteStream().withName("incre").withNamespace("name")))); + new ConfiguredAirbyteStream( + new AirbyteStream("full", Jsons.emptyObject(), List.of(io.airbyte.config.SyncMode.FULL_REFRESH)).withIsResumable(true), + SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("incre", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream( + new AirbyteStream("full", Jsons.emptyObject(), List.of(io.airbyte.config.SyncMode.FULL_REFRESH)).withNamespace("name"), + SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("incre", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)).withNamespace("name"), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND))); final var streams = handler.getFullRefreshStreamsToClear(mCatalog, 1, enableResumableFullRefresh); final var exp = enableResumableFullRefresh ? Set.of(new StreamDescriptor().withName("full").withNamespace("name")) @@ -468,10 +481,15 @@ void createAttemptShouldAlwaysDeleteFullRefreshStreamState(final int attemptNumb when(mDyncConfig.getConfiguredAirbyteCatalog()).thenReturn(mCatalog); when(mCatalog.getStreams()).thenReturn(List.of( - new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("full").withIsResumable(true)), - new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL).withStream(new AirbyteStream().withName("incre")), - new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("full").withNamespace("name")), - new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL).withStream(new AirbyteStream().withName("incre").withNamespace("name")))); + new ConfiguredAirbyteStream(new AirbyteStream("full", Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)).withIsResumable(true), + SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("incre", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("full", Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)).withNamespace("name"), + SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("incre", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)).withNamespace("name"), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND))); when(jobPersistence.createAttempt(JOB_ID, expectedLogPath)).thenReturn(attemptNumber); diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/ConnectionsHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/ConnectionsHandlerTest.java index baab3dc64c1..4d98eba73bb 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/ConnectionsHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/ConnectionsHandlerTest.java @@ -1392,21 +1392,21 @@ void setup() throws IOException, JsonValidationException, ConfigNotFoundExceptio } private ConfiguredAirbyteStream buildConfiguredStream(final String name) { - return new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(name, Field.of(FIELD_NAME, JsonSchemaType.STRING)) + return new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(name, Field.of(FIELD_NAME, JsonSchemaType.STRING)) .withDefaultCursorField(List.of(FIELD_NAME)) .withSourceDefinedCursor(false) .withSupportedSyncModes( - List.of(io.airbyte.config.SyncMode.FULL_REFRESH, io.airbyte.config.SyncMode.INCREMENTAL))) - .withCursorField(List.of(FIELD_NAME)) - .withSyncMode(io.airbyte.config.SyncMode.INCREMENTAL) - .withDestinationSyncMode(io.airbyte.config.DestinationSyncMode.APPEND); + List.of(io.airbyte.config.SyncMode.FULL_REFRESH, io.airbyte.config.SyncMode.INCREMENTAL)), + io.airbyte.config.SyncMode.INCREMENTAL, + io.airbyte.config.DestinationSyncMode.APPEND) + .withCursorField(List.of(FIELD_NAME)); } private AirbyteStreamAndConfiguration buildStream(final String name) { return new AirbyteStreamAndConfiguration() - .stream(new AirbyteStream().name(name)) - .config(new AirbyteStreamConfiguration().selected(true)); + .stream(new AirbyteStream().name(name).jsonSchema(Jsons.emptyObject()).supportedSyncModes(List.of(SyncMode.INCREMENTAL))) + .config(new AirbyteStreamConfiguration().syncMode(SyncMode.INCREMENTAL).destinationSyncMode(DestinationSyncMode.APPEND).selected(true)); } @Test @@ -1747,7 +1747,10 @@ void testDeactivateStreamsWipeState() throws JsonValidationException, ConfigNotF catalog.setStreams(Stream.concat( stillActiveStreams.stream().map(this::buildStream), - deactivatedStreams.stream().map(this::buildStream).peek(s -> s.setConfig(new AirbyteStreamConfiguration().selected(false)))).toList()); + deactivatedStreams.stream().map(this::buildStream) + .peek(s -> s.setConfig( + new AirbyteStreamConfiguration().syncMode(SyncMode.INCREMENTAL).destinationSyncMode(DestinationSyncMode.APPEND).selected(false)))) + .toList()); final ConnectionUpdate request = new ConnectionUpdate() .connectionId(moreComplexCatalogSync.getConnectionId()) .syncCatalog(catalog); @@ -2543,8 +2546,9 @@ void testAutoPropagateSchemaChange() throws IOException, ConfigNotFoundException CatalogConverter.toApi(Jsons.clone(airbyteCatalog), SOURCE_VERSION); catalogWithDiff.addStreamsItem(new AirbyteStreamAndConfiguration() .stream(new AirbyteStream().name(A_DIFFERENT_STREAM).namespace(A_DIFFERENT_NAMESPACE).sourceDefinedCursor(false) - .supportedSyncModes(List.of(SyncMode.FULL_REFRESH))) - .config(new AirbyteStreamConfiguration().selected(true))); + .jsonSchema(Jsons.emptyObject()).supportedSyncModes(List.of(SyncMode.FULL_REFRESH))) + .config( + new AirbyteStreamConfiguration().syncMode(SyncMode.FULL_REFRESH).destinationSyncMode(DestinationSyncMode.OVERWRITE).selected(true))); final ConnectionAutoPropagateSchemaChange request = new ConnectionAutoPropagateSchemaChange() .connectionId(CONNECTION_ID) @@ -2561,13 +2565,14 @@ void testAutoPropagateSchemaChange() throws IOException, ConfigNotFoundException final ConfiguredAirbyteCatalog expectedCatalog = Jsons.clone(configuredAirbyteCatalog); expectedCatalog.getStreams().forEach(s -> s.getStream().withSourceDefinedCursor(false)); expectedCatalog.getStreams() - .add(new ConfiguredAirbyteStream().withStream(new io.airbyte.config.AirbyteStream().withName(A_DIFFERENT_STREAM) - .withNamespace(A_DIFFERENT_NAMESPACE).withSupportedSyncModes(List.of(io.airbyte.config.SyncMode.FULL_REFRESH)) - .withSourceDefinedCursor(false) - .withDefaultCursorField(List.of())) - .withDestinationSyncMode(io.airbyte.config.DestinationSyncMode.OVERWRITE) - .withSyncMode(io.airbyte.config.SyncMode.FULL_REFRESH) - .withCursorField(List.of())); + .add(new ConfiguredAirbyteStream( + new io.airbyte.config.AirbyteStream(A_DIFFERENT_STREAM, Jsons.emptyObject(), List.of(io.airbyte.config.SyncMode.FULL_REFRESH)) + .withNamespace(A_DIFFERENT_NAMESPACE) + .withSourceDefinedCursor(false) + .withDefaultCursorField(List.of()), + io.airbyte.config.SyncMode.FULL_REFRESH, + io.airbyte.config.DestinationSyncMode.OVERWRITE) + .withCursorField(List.of())); final ArgumentCaptor standardSyncArgumentCaptor = ArgumentCaptor.forClass(StandardSync.class); verify(configRepository).writeStandardSync(standardSyncArgumentCaptor.capture()); final StandardSync actualStandardSync = standardSyncArgumentCaptor.getValue(); diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java index 2be396ef592..ea03d69850b 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobHistoryHandlerTest.java @@ -45,6 +45,7 @@ import io.airbyte.api.model.generated.StreamStats; import io.airbyte.api.model.generated.StreamSyncProgressReadItem; import io.airbyte.commons.enums.Enums; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.server.converters.JobConverter; import io.airbyte.commons.server.helpers.ConnectionHelpers; import io.airbyte.commons.server.helpers.DestinationHelpers; @@ -58,6 +59,7 @@ import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.Job; import io.airbyte.config.JobCheckConnectionConfig; import io.airbyte.config.JobConfig; @@ -115,14 +117,11 @@ class JobHistoryHandlerTest { .withCheckConnection(new JobCheckConnectionConfig()) .withSync(new JobSyncConfig().withConfiguredAirbyteCatalog( new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.FULL_REFRESH) - .withStream(new AirbyteStream() - .withNamespace("ns1") - .withName("stream1")), - new ConfiguredAirbyteStream() - .withSyncMode(SyncMode.INCREMENTAL) - .withStream(new AirbyteStream().withName("stream2")))))); + new ConfiguredAirbyteStream(new AirbyteStream("stream1", Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)).withNamespace("ns1"), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("stream2", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND))))); private static final Path LOG_PATH = Path.of("log_path"); private static final LogRead EMPTY_LOG_READ = new LogRead().logLines(new ArrayList<>()); private static final long CREATED_AT = System.currentTimeMillis() / 1000; diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobsHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobsHandlerTest.java index ef9eb3c8efe..d81f9c95af0 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobsHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/JobsHandlerTest.java @@ -23,6 +23,7 @@ import io.airbyte.api.model.generated.InternalOperationResult; import io.airbyte.api.model.generated.JobFailureRequest; import io.airbyte.api.model.generated.JobSuccessWithAttemptNumberRequest; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.server.JobStatus; import io.airbyte.commons.server.errors.BadRequestException; import io.airbyte.commons.server.handlers.helpers.ConnectionTimelineEventHelper; @@ -33,6 +34,7 @@ import io.airbyte.config.AttemptSyncConfig; import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.FailureReason; import io.airbyte.config.FailureReason.FailureOrigin; import io.airbyte.config.Job; @@ -85,7 +87,9 @@ public class JobsHandlerTest { private static final JobOutput jobOutput = new JobOutput().withSync(standardSyncOutput); private static final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog() - .withStreams(List.of(new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(new AirbyteStream().withName("stream")))); + .withStreams( + List.of(new ConfiguredAirbyteStream(new AirbyteStream("stream", Jsons.emptyObject(), List.of(io.airbyte.config.SyncMode.FULL_REFRESH)), + SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND))); private static final JobConfig simpleConfig = new JobConfig().withConfigType(SYNC).withSync(new JobSyncConfig().withConfiguredAirbyteCatalog(catalog)); diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java index 5b1a38c12da..6d040f4236e 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/SchedulerHandlerTest.java @@ -2012,8 +2012,10 @@ void testSchemaPropagatedEmptyDiff() throws IOException, JsonValidationException .addStreamsItem(new AirbyteStreamAndConfiguration().stream(new AirbyteStream().name("foo").namespace("ns"))); final io.airbyte.api.model.generated.AirbyteCatalog configuredCatalog = new io.airbyte.api.model.generated.AirbyteCatalog() .addStreamsItem(new AirbyteStreamAndConfiguration() - .stream(new AirbyteStream().name("foo").namespace("ns").supportedSyncModes(List.of(SyncMode.FULL_REFRESH))) - .config(new AirbyteStreamConfiguration().selected(true).syncMode(SyncMode.FULL_REFRESH))); + .stream( + new AirbyteStream().name("foo").namespace("ns").jsonSchema(Jsons.emptyObject()).supportedSyncModes(List.of(SyncMode.FULL_REFRESH))) + .config(new AirbyteStreamConfiguration().selected(true).syncMode(SyncMode.FULL_REFRESH).destinationSyncMode( + io.airbyte.api.model.generated.DestinationSyncMode.APPEND))); final io.airbyte.api.model.generated.AirbyteCatalog newCatalog = new io.airbyte.api.model.generated.AirbyteCatalog() .addStreamsItem(new AirbyteStreamAndConfiguration().stream(new AirbyteStream().name("foo").namespace("ns"))); diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/helpers/ConnectionTimelineEventHelperTest.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/helpers/ConnectionTimelineEventHelperTest.java index 5cd4ffc1fd2..ccc55941221 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/helpers/ConnectionTimelineEventHelperTest.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/handlers/helpers/ConnectionTimelineEventHelperTest.java @@ -8,10 +8,12 @@ import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.server.support.CurrentUserService; import io.airbyte.config.AirbyteStream; import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.Job; import io.airbyte.config.JobConfig; import io.airbyte.config.JobStatus; @@ -53,9 +55,12 @@ void testGetLoadedStats() { final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog() .withStreams(List.of( - new ConfiguredAirbyteStream().withSyncMode(userStreamMode).withStream(new AirbyteStream().withName(userStreamName)), - new ConfiguredAirbyteStream().withSyncMode(purchaseStreamMode).withStream(new AirbyteStream().withName(purchaseStreamName)), - new ConfiguredAirbyteStream().withSyncMode(vendorStreamMode).withStream(new AirbyteStream().withName(vendorStreamName)))); + new ConfiguredAirbyteStream(new AirbyteStream(userStreamName, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), userStreamMode, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream(purchaseStreamName, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), + purchaseStreamMode, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream(vendorStreamName, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), vendorStreamMode, + DestinationSyncMode.APPEND))); final JobConfig jobConfig = new JobConfig().withConfigType(SYNC).withSync(new JobSyncConfig().withConfiguredAirbyteCatalog(catalog)); final Job job = diff --git a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/helpers/ConnectionHelpers.java b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/helpers/ConnectionHelpers.java index f28c8359862..2c2ecefe6e9 100644 --- a/airbyte-commons-server/src/test/java/io/airbyte/commons/server/helpers/ConnectionHelpers.java +++ b/airbyte-commons-server/src/test/java/io/airbyte/commons/server/helpers/ConnectionHelpers.java @@ -318,16 +318,14 @@ public static ConfiguredAirbyteCatalog generateBasicConfiguredAirbyteCatalog() { } public static ConfiguredAirbyteCatalog generateAirbyteCatalogWithTwoFields() { - return new ConfiguredAirbyteCatalog().withStreams(List.of(new ConfiguredAirbyteStream() - .withStream( - new io.airbyte.config.AirbyteStream() - .withName(STREAM_NAME) - .withJsonSchema(generateJsonSchemaWithTwoFields()) + return new ConfiguredAirbyteCatalog(List.of(new ConfiguredAirbyteStream( + new io.airbyte.config.AirbyteStream(STREAM_NAME, generateJsonSchemaWithTwoFields(), + List.of(io.airbyte.config.SyncMode.FULL_REFRESH, io.airbyte.config.SyncMode.INCREMENTAL)) .withDefaultCursorField(List.of(FIELD_NAME)) .withSourceDefinedCursor(false) - .withSourceDefinedPrimaryKey(List.of()) - .withSupportedSyncModes( - List.of(io.airbyte.config.SyncMode.FULL_REFRESH, io.airbyte.config.SyncMode.INCREMENTAL))))); + .withSourceDefinedPrimaryKey(List.of()), + io.airbyte.config.SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND))); } public static ConfiguredAirbyteCatalog generateMultipleStreamsConfiguredAirbyteCatalog(final int streamsCount) { @@ -339,11 +337,8 @@ public static ConfiguredAirbyteCatalog generateMultipleStreamsConfiguredAirbyteC } public static ConfiguredAirbyteStream generateBasicConfiguredStream(final String nameSuffix) { - return new ConfiguredAirbyteStream() - .withStream(generateBasicAirbyteStream(nameSuffix)) - .withCursorField(List.of(FIELD_NAME)) - .withSyncMode(io.airbyte.config.SyncMode.INCREMENTAL) - .withDestinationSyncMode(DestinationSyncMode.APPEND); + return new ConfiguredAirbyteStream(generateBasicAirbyteStream(nameSuffix), io.airbyte.config.SyncMode.INCREMENTAL, DestinationSyncMode.APPEND) + .withCursorField(List.of(FIELD_NAME)); } private static io.airbyte.config.AirbyteStream generateBasicAirbyteStream(final String nameSuffix) { diff --git a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java index c6d42759f50..daaa11e5639 100644 --- a/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java +++ b/airbyte-commons-worker/src/main/java/io/airbyte/workers/test_utils/TestConfigHelpers.java @@ -10,6 +10,7 @@ import io.airbyte.config.ConfiguredAirbyteStream; import io.airbyte.config.ConnectionContext; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.SourceConnection; import io.airbyte.config.StandardSync; @@ -17,6 +18,7 @@ import io.airbyte.config.StandardSyncInput; import io.airbyte.config.StandardSyncOperation; import io.airbyte.config.State; +import io.airbyte.config.SyncMode; import io.airbyte.config.helpers.CatalogHelpers; import io.airbyte.persistence.job.models.IntegrationLauncherConfig; import io.airbyte.persistence.job.models.ReplicationInput; @@ -125,17 +127,23 @@ public static ImmutablePair createReplicationCon final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog(); if (multipleNamespaces) { - final ConfiguredAirbyteStream streamOne = new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM_NAME, "namespace", Field.of(FIELD_NAME, JsonSchemaType.STRING))); - final ConfiguredAirbyteStream streamTwo = new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM_NAME, "namespace2", Field.of(FIELD_NAME, JsonSchemaType.STRING))); + final ConfiguredAirbyteStream streamOne = new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM_NAME, "namespace", Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND); + final ConfiguredAirbyteStream streamTwo = new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM_NAME, "namespace2", Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND); final List streams = List.of(streamOne, streamTwo); catalog.withStreams(streams); } else { - final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING))); + final ConfiguredAirbyteStream stream = new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND); catalog.withStreams(Collections.singletonList(stream)); } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/ReplicationInputHydratorTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/ReplicationInputHydratorTest.java index 1778a1310a1..e3daae2c26f 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/ReplicationInputHydratorTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/ReplicationInputHydratorTest.java @@ -93,7 +93,7 @@ class ReplicationInputHydratorTest { new AirbyteStreamAndConfiguration( new AirbyteStream( TEST_STREAM_NAME, - null, + Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL), null, null, diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java index dd87c6778fb..805e987ea25 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/general/ReplicationWorkerHelperTest.java @@ -287,7 +287,7 @@ private void mockSupportRefreshes(final boolean supportsRefreshes) throws IOExce } private ConfiguredAirbyteCatalog buildConfiguredAirbyteCatalog() { - return new ConfiguredAirbyteCatalog().withAdditionalProperty("test", "test"); + return new ConfiguredAirbyteCatalog(); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/BackfillHelperTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/BackfillHelperTest.java index db0bdb36ed0..ca15c059d4a 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/BackfillHelperTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/BackfillHelperTest.java @@ -17,6 +17,7 @@ import io.airbyte.config.AirbyteStream; import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.State; import io.airbyte.config.SyncMode; import io.airbyte.config.helpers.StateMessageHelper; @@ -35,16 +36,16 @@ class BackfillHelperTest { .withNamespace(STREAM_NAMESPACE); private static final StreamDescriptor ANOTHER_STREAM_DESCRIPTOR = new StreamDescriptor(ANOTHER_STREAM_NAME, ANOTHER_STREAM_NAMESPACE); - private static final ConfiguredAirbyteStream INCREMENTAL_STREAM = new ConfiguredAirbyteStream() - .withStream(new AirbyteStream() - .withName(STREAM_NAME) - .withNamespace(STREAM_NAMESPACE)) - .withSyncMode(SyncMode.INCREMENTAL); - private static final ConfiguredAirbyteStream FULL_REFRESH_STREAM = new ConfiguredAirbyteStream() - .withStream(new AirbyteStream() - .withName(ANOTHER_STREAM_NAME) - .withNamespace(ANOTHER_STREAM_NAMESPACE)) - .withSyncMode(SyncMode.FULL_REFRESH); + private static final ConfiguredAirbyteStream INCREMENTAL_STREAM = new ConfiguredAirbyteStream( + new AirbyteStream(STREAM_NAME, Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)) + .withNamespace(STREAM_NAMESPACE), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND); + private static final ConfiguredAirbyteStream FULL_REFRESH_STREAM = new ConfiguredAirbyteStream( + new AirbyteStream(ANOTHER_STREAM_NAME, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)) + .withNamespace(ANOTHER_STREAM_NAMESPACE), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND); private static final ConfiguredAirbyteCatalog INCREMENTAL_CATALOG = new ConfiguredAirbyteCatalog() .withStreams(List.of(INCREMENTAL_STREAM)); private static final CatalogDiff SINGLE_STREAM_ADD_COLUMN_DIFF = new CatalogDiff( diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/GsonPksExtractorTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/GsonPksExtractorTest.java index e8a391e12b1..ccc7b73b919 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/GsonPksExtractorTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/helper/GsonPksExtractorTest.java @@ -6,6 +6,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; +import io.airbyte.commons.json.Jsons; import io.airbyte.config.AirbyteStream; import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; @@ -123,12 +124,9 @@ private ConfiguredAirbyteCatalog getCatalogWithPk(final String streamName, final List> pksList) { return new ConfiguredAirbyteCatalog() .withStreams(List.of( - new ConfiguredAirbyteStream() - .withStream(new AirbyteStream() - .withName(streamName)) - .withPrimaryKey(pksList) - .withSyncMode(SyncMode.INCREMENTAL) - .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP))); + new ConfiguredAirbyteStream(new AirbyteStream(streamName, Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP) + .withPrimaryKey(pksList))); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/BasicAirbyteMessageValidatorTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/BasicAirbyteMessageValidatorTest.java index 336eb110f2b..95f126d70a3 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/BasicAirbyteMessageValidatorTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/BasicAirbyteMessageValidatorTest.java @@ -138,32 +138,23 @@ private ConfiguredAirbyteCatalog getCatalogWithPk(final String streamName, final List> pksList) { return new ConfiguredAirbyteCatalog() .withStreams(List.of( - new ConfiguredAirbyteStream() - .withStream(new AirbyteStream() - .withName(streamName)) - .withPrimaryKey(pksList) - .withSyncMode(SyncMode.INCREMENTAL) - .withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP))); + new ConfiguredAirbyteStream(new AirbyteStream(streamName, Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND_DEDUP) + .withPrimaryKey(pksList))); } private ConfiguredAirbyteCatalog getCatalogNonIncrementalDedup(final String streamName) { return new ConfiguredAirbyteCatalog() .withStreams(List.of( - new ConfiguredAirbyteStream() - .withStream(new AirbyteStream() - .withName(streamName)) - .withSyncMode(SyncMode.INCREMENTAL) - .withDestinationSyncMode(DestinationSyncMode.APPEND))); + new ConfiguredAirbyteStream(new AirbyteStream(streamName, Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND))); } private ConfiguredAirbyteCatalog getCatalogNonIncremental(final String streamName) { return new ConfiguredAirbyteCatalog() .withStreams(List.of( - new ConfiguredAirbyteStream() - .withStream(new AirbyteStream() - .withName(streamName)) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.APPEND))); + new ConfiguredAirbyteStream(new AirbyteStream(streamName, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND))); } } diff --git a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java index 77454506a93..348f229d069 100644 --- a/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java +++ b/airbyte-commons-worker/src/test/java/io/airbyte/workers/internal/EmptyAirbyteSourceTest.java @@ -10,9 +10,11 @@ import io.airbyte.config.AirbyteStream; import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.ResetSourceConfiguration; import io.airbyte.config.State; import io.airbyte.config.StreamDescriptor; +import io.airbyte.config.SyncMode; import io.airbyte.config.WorkerSourceConfig; import io.airbyte.config.helpers.ProtocolConverters; import io.airbyte.protocol.models.AirbyteGlobalState; @@ -40,9 +42,9 @@ class EmptyAirbyteSourceTest { private static final ConfiguredAirbyteCatalog AIRBYTE_CATALOG = new ConfiguredAirbyteCatalog() .withStreams(Lists.newArrayList( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")))); + new ConfiguredAirbyteStream(getAirbyteStream("a"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("b"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("c"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND))); @BeforeEach void init() { @@ -324,10 +326,10 @@ void testLegacyWithMissingCatalogStream() { .withStreamsToReset(streamsToReset); final ConfiguredAirbyteCatalog airbyteCatalogWithExtraStream = new ConfiguredAirbyteCatalog() .withStreams(Lists.newArrayList( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("d")))); + new ConfiguredAirbyteStream(getAirbyteStream("a"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("b"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("c"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("d"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND))); final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) @@ -350,9 +352,9 @@ void testLegacyWithResettingExtraStreamNotInCatalog() throws Exception { .withStreamsToReset(streamsToResetWithExtra); final ConfiguredAirbyteCatalog airbyteCatalog = new ConfiguredAirbyteCatalog() .withStreams(Lists.newArrayList( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")))); + new ConfiguredAirbyteStream(getAirbyteStream("a"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("b"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("c"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND))); final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) @@ -390,9 +392,9 @@ void testLegacyWithNewConfig() throws Exception { .withStreamsToReset(streamsToReset); final ConfiguredAirbyteCatalog airbyteCatalog = new ConfiguredAirbyteCatalog() .withStreams(Lists.newArrayList( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")))); + new ConfiguredAirbyteStream(getAirbyteStream("a"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("b"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("c"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND))); final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) @@ -429,9 +431,9 @@ void testLegacyWithNullState() throws Exception { .withStreamsToReset(streamsToReset); final ConfiguredAirbyteCatalog airbyteCatalogWithExtraStream = new ConfiguredAirbyteCatalog() .withStreams(Lists.newArrayList( - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("a")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("b")), - new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName("c")))); + new ConfiguredAirbyteStream(getAirbyteStream("a"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("b"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(getAirbyteStream("c"), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND))); final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig() .withSourceConnectionConfiguration(Jsons.jsonNode(resetSourceConfiguration)) @@ -538,11 +540,10 @@ void emptyLegacyStateShouldNotEmitState() throws Exception { new ResetSourceConfiguration().withStreamsToReset(Collections.singletonList(streamDescriptor)); final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog() .withStreams( - Collections.singletonList(new ConfiguredAirbyteStream() - .withStream( - new AirbyteStream() - .withName("test") - .withNamespace("schema")))); + Collections.singletonList(new ConfiguredAirbyteStream( + getAirbyteStream("test").withNamespace("schema"), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND))); final WorkerSourceConfig workerSourceConfig = new WorkerSourceConfig().withSourceId(UUID.randomUUID()) .withState(new State().withState(Jsons.emptyObject())) .withCatalog(configuredAirbyteCatalog) @@ -556,4 +557,8 @@ void emptyLegacyStateShouldNotEmitState() throws Exception { Assertions.assertThat(emptyAirbyteSource.isFinished()).isTrue(); } + private static AirbyteStream getAirbyteStream(final String name) { + return new AirbyteStream(name, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)); + } + } diff --git a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/helper/ResumableFullRefreshStatsHelperTest.kt b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/helper/ResumableFullRefreshStatsHelperTest.kt index dcfb74cfc65..515f7797a18 100644 --- a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/helper/ResumableFullRefreshStatsHelperTest.kt +++ b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/helper/ResumableFullRefreshStatsHelperTest.kt @@ -4,6 +4,7 @@ import io.airbyte.commons.json.Jsons import io.airbyte.config.AirbyteStream import io.airbyte.config.ConfiguredAirbyteCatalog import io.airbyte.config.ConfiguredAirbyteStream +import io.airbyte.config.DestinationSyncMode import io.airbyte.config.StandardSyncOutput import io.airbyte.config.StandardSyncSummary import io.airbyte.config.State @@ -85,8 +86,16 @@ class ResumableFullRefreshStatsHelperTest { val catalog = ConfiguredAirbyteCatalog().withStreams( listOf( - ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(AirbyteStream().withNamespace(null).withName("s0")), - ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL).withStream(AirbyteStream().withNamespace("ns").withName("s1")), + ConfiguredAirbyteStream( + AirbyteStream(name = "s0", jsonSchema = Jsons.emptyObject(), supportedSyncModes = listOf(SyncMode.FULL_REFRESH)), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND, + ), + ConfiguredAirbyteStream( + AirbyteStream(name = "s0", jsonSchema = Jsons.emptyObject(), supportedSyncModes = listOf(SyncMode.INCREMENTAL)), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + ), ), ) @@ -103,8 +112,16 @@ class ResumableFullRefreshStatsHelperTest { val catalog = ConfiguredAirbyteCatalog().withStreams( listOf( - ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream(AirbyteStream().withNamespace(null).withName("s0")), - ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL).withStream(AirbyteStream().withNamespace("ns").withName("s1")), + ConfiguredAirbyteStream( + AirbyteStream(name = "s0", jsonSchema = Jsons.emptyObject(), supportedSyncModes = listOf(SyncMode.FULL_REFRESH)), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND, + ), + ConfiguredAirbyteStream( + AirbyteStream(name = "s1", namespace = "ns", jsonSchema = Jsons.emptyObject(), supportedSyncModes = listOf(SyncMode.FULL_REFRESH)), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + ), ), ) diff --git a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/helper/StreamStatusCompletionTrackerTest.kt b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/helper/StreamStatusCompletionTrackerTest.kt index af348eb329c..8c95516a0bf 100644 --- a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/helper/StreamStatusCompletionTrackerTest.kt +++ b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/helper/StreamStatusCompletionTrackerTest.kt @@ -1,9 +1,12 @@ package io.airbyte.workers.helper +import io.airbyte.commons.json.Jsons import io.airbyte.config.AirbyteStream import io.airbyte.config.ConfiguredAirbyteCatalog import io.airbyte.config.ConfiguredAirbyteStream +import io.airbyte.config.DestinationSyncMode import io.airbyte.config.StreamDescriptor +import io.airbyte.config.SyncMode import io.airbyte.protocol.models.AirbyteMessage import io.airbyte.protocol.models.AirbyteStreamStatusTraceMessage import io.airbyte.protocol.models.AirbyteTraceMessage @@ -27,8 +30,21 @@ internal class StreamStatusCompletionTrackerTest { ConfiguredAirbyteCatalog() .withStreams( listOf( - ConfiguredAirbyteStream().withStream(AirbyteStream().withName("name1")), - ConfiguredAirbyteStream().withStream(AirbyteStream().withName("name2").withNamespace("namespace2")), + ConfiguredAirbyteStream( + AirbyteStream(name = "name1", jsonSchema = Jsons.emptyObject(), supportedSyncModes = listOf(SyncMode.INCREMENTAL)), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + ), + ConfiguredAirbyteStream( + AirbyteStream( + name = "name2", + namespace = "namespace2", + jsonSchema = Jsons.emptyObject(), + supportedSyncModes = listOf(SyncMode.INCREMENTAL), + ), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND, + ), ), ) diff --git a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/internal/FieldSelectorTest.kt b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/internal/FieldSelectorTest.kt index 097d916f381..c8eac0cd221 100644 --- a/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/internal/FieldSelectorTest.kt +++ b/airbyte-commons-worker/src/test/kotlin/io/airbyte/workers/internal/FieldSelectorTest.kt @@ -8,6 +8,8 @@ import io.airbyte.commons.json.Jsons import io.airbyte.config.AirbyteStream import io.airbyte.config.ConfiguredAirbyteCatalog import io.airbyte.config.ConfiguredAirbyteStream +import io.airbyte.config.DestinationSyncMode +import io.airbyte.config.SyncMode import io.airbyte.protocol.models.AirbyteMessage import io.airbyte.protocol.models.AirbyteRecordMessage import io.airbyte.workers.RecordSchemaValidator @@ -81,10 +83,11 @@ internal class FieldSelectorTest { ConfiguredAirbyteCatalog() .withStreams( listOf( - ConfiguredAirbyteStream() - .withStream( - AirbyteStream().withName(STREAM_NAME).withJsonSchema(Jsons.deserialize(SCHEMA)), - ), + ConfiguredAirbyteStream( + stream = AirbyteStream(STREAM_NAME, Jsons.deserialize(SCHEMA), listOf(SyncMode.INCREMENTAL)), + syncMode = SyncMode.INCREMENTAL, + destinationSyncMode = DestinationSyncMode.APPEND, + ), ), ) @@ -103,10 +106,16 @@ internal class FieldSelectorTest { ConfiguredAirbyteCatalog() .withStreams( listOf( - ConfiguredAirbyteStream() - .withStream( - AirbyteStream().withName(STREAM_NAME).withJsonSchema(Jsons.deserialize(SCHEMA_WITH_ESCAPE)), - ), + ConfiguredAirbyteStream( + stream = + AirbyteStream( + name = STREAM_NAME, + jsonSchema = Jsons.deserialize(SCHEMA_WITH_ESCAPE), + supportedSyncModes = listOf(SyncMode.INCREMENTAL), + ), + syncMode = SyncMode.INCREMENTAL, + destinationSyncMode = DestinationSyncMode.APPEND, + ), ), ) diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/AirbyteStream.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/AirbyteStream.java deleted file mode 100644 index f9d88559ec9..00000000000 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/AirbyteStream.java +++ /dev/null @@ -1,429 +0,0 @@ -/* - * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.config; - -import com.fasterxml.jackson.annotation.JsonAnyGetter; -import com.fasterxml.jackson.annotation.JsonAnySetter; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonPropertyDescription; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import com.fasterxml.jackson.databind.JsonNode; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * AirbyteStream. - *

- * - * - */ -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonPropertyOrder({ - "name", - "json_schema", - "supported_sync_modes", - "source_defined_cursor", - "default_cursor_field", - "source_defined_primary_key", - "namespace", - "is_resumable" -}) -@SuppressWarnings({ - "PMD.AvoidDuplicateLiterals", - "PMD.AvoidLiteralsInIfCondition", - "PMD.ImmutableField", - "PMD.UseEqualsToCompareStrings" -}) -public class AirbyteStream implements Serializable { - - /** - * Stream's name. (Required) - * - */ - @JsonProperty("name") - @JsonPropertyDescription("Stream's name.") - private String name; - /** - * Stream schema using Json Schema specs. (Required) - * - */ - @JsonProperty("json_schema") - @JsonPropertyDescription("Stream schema using Json Schema specs.") - private JsonNode jsonSchema; - /** - * List of sync modes supported by this stream. (Required) - * - */ - @JsonProperty("supported_sync_modes") - @JsonPropertyDescription("List of sync modes supported by this stream.") - private List supportedSyncModes = new ArrayList(); - /** - * If the source defines the cursor field, then any other cursor field inputs will be ignored. If it - * does not, either the user_provided one is used, or the default one is used as a backup. This - * field must be set if is_resumable is set to true, including resumable full refresh synthetic - * cursors. - * - */ - @JsonProperty("source_defined_cursor") - @JsonPropertyDescription(""" - If the source defines the cursor field, then any other cursor field inputs will be ignored. If it does not, - either the user_provided one is used, or the default one is used as a backup. This field must be set if - is_resumable is set to true, including resumable full refresh synthetic cursors.""") - private Boolean sourceDefinedCursor; - /** - * Path to the field that will be used to determine if a record is new or modified since the last - * sync. If not provided by the source, the end user will have to specify the comparable themselves. - * - */ - @JsonProperty("default_cursor_field") - @JsonPropertyDescription(""" - Path to the field that will be used to determine if a record is new or modified since the last sync. - If not provided by the source, the end user will have to specify the comparable themselves.""") - private List defaultCursorField = new ArrayList(); - /** - * If the source defines the primary key, paths to the fields that will be used as a primary key. If - * not provided by the source, the end user will have to specify the primary key themselves. - * - */ - @JsonProperty("source_defined_primary_key") - @JsonPropertyDescription(""" - If the source defines the primary key, paths to the fields that will be used as a primary key. - If not provided by the source, the end user will have to specify the primary key themselves.""") - private List> sourceDefinedPrimaryKey = new ArrayList>(); - /** - * Optional Source-defined namespace. Currently only used by JDBC destinations to determine what - * schema to write to. Airbyte streams from the same sources should have the same namespace. - * - */ - @JsonProperty("namespace") - @JsonPropertyDescription(""" - Optional Source-defined namespace. Currently only used by JDBC destinations to determine what schema to write to. - Airbyte streams from the same sources should have the same namespace.""") - private String namespace; - /** - * If the stream is resumable or not. Should be set to true if the stream supports incremental. - * Defaults to false. Primarily used by the Platform in Full Refresh to determine if a Full Refresh - * stream should actually be treated as incremental within a job. - * - */ - @JsonProperty("is_resumable") - @JsonPropertyDescription(""" - If the stream is resumable or not. Should be set to true if the stream supports incremental. Defaults to false. - Primarily used by the Platform in Full Refresh to determine if a Full Refresh stream should actually be treated as - incremental within a job.""") - private Boolean isResumable; - @JsonIgnore - private Map additionalProperties = new HashMap(); - private static final long serialVersionUID = 602929458758090299L; - - /** - * Stream's name. (Required) - * - */ - @JsonProperty("name") - public String getName() { - return name; - } - - /** - * Stream's name. (Required) - * - */ - @JsonProperty("name") - public void setName(String name) { - this.name = name; - } - - public AirbyteStream withName(String name) { - this.name = name; - return this; - } - - /** - * Stream schema using Json Schema specs. (Required) - * - */ - @JsonProperty("json_schema") - public JsonNode getJsonSchema() { - return jsonSchema; - } - - /** - * Stream schema using Json Schema specs. (Required) - * - */ - @JsonProperty("json_schema") - public void setJsonSchema(JsonNode jsonSchema) { - this.jsonSchema = jsonSchema; - } - - public AirbyteStream withJsonSchema(JsonNode jsonSchema) { - this.jsonSchema = jsonSchema; - return this; - } - - /** - * List of sync modes supported by this stream. (Required) - * - */ - @JsonProperty("supported_sync_modes") - public List getSupportedSyncModes() { - return supportedSyncModes; - } - - /** - * List of sync modes supported by this stream. (Required) - * - */ - @JsonProperty("supported_sync_modes") - public void setSupportedSyncModes(List supportedSyncModes) { - this.supportedSyncModes = supportedSyncModes; - } - - public AirbyteStream withSupportedSyncModes(List supportedSyncModes) { - this.supportedSyncModes = supportedSyncModes; - return this; - } - - /** - * If the source defines the cursor field, then any other cursor field inputs will be ignored. If it - * does not, either the user_provided one is used, or the default one is used as a backup. This - * field must be set if is_resumable is set to true, including resumable full refresh synthetic - * cursors. - * - */ - @JsonProperty("source_defined_cursor") - public Boolean getSourceDefinedCursor() { - return sourceDefinedCursor; - } - - /** - * If the source defines the cursor field, then any other cursor field inputs will be ignored. If it - * does not, either the user_provided one is used, or the default one is used as a backup. This - * field must be set if is_resumable is set to true, including resumable full refresh synthetic - * cursors. - * - */ - @JsonProperty("source_defined_cursor") - public void setSourceDefinedCursor(Boolean sourceDefinedCursor) { - this.sourceDefinedCursor = sourceDefinedCursor; - } - - public AirbyteStream withSourceDefinedCursor(Boolean sourceDefinedCursor) { - this.sourceDefinedCursor = sourceDefinedCursor; - return this; - } - - /** - * Path to the field that will be used to determine if a record is new or modified since the last - * sync. If not provided by the source, the end user will have to specify the comparable themselves. - * - */ - @JsonProperty("default_cursor_field") - public List getDefaultCursorField() { - return defaultCursorField; - } - - /** - * Path to the field that will be used to determine if a record is new or modified since the last - * sync. If not provided by the source, the end user will have to specify the comparable themselves. - * - */ - @JsonProperty("default_cursor_field") - public void setDefaultCursorField(List defaultCursorField) { - this.defaultCursorField = defaultCursorField; - } - - public AirbyteStream withDefaultCursorField(List defaultCursorField) { - this.defaultCursorField = defaultCursorField; - return this; - } - - /** - * If the source defines the primary key, paths to the fields that will be used as a primary key. If - * not provided by the source, the end user will have to specify the primary key themselves. - * - */ - @JsonProperty("source_defined_primary_key") - public List> getSourceDefinedPrimaryKey() { - return sourceDefinedPrimaryKey; - } - - /** - * If the source defines the primary key, paths to the fields that will be used as a primary key. If - * not provided by the source, the end user will have to specify the primary key themselves. - * - */ - @JsonProperty("source_defined_primary_key") - public void setSourceDefinedPrimaryKey(List> sourceDefinedPrimaryKey) { - this.sourceDefinedPrimaryKey = sourceDefinedPrimaryKey; - } - - public AirbyteStream withSourceDefinedPrimaryKey(List> sourceDefinedPrimaryKey) { - this.sourceDefinedPrimaryKey = sourceDefinedPrimaryKey; - return this; - } - - /** - * Optional Source-defined namespace. Currently only used by JDBC destinations to determine what - * schema to write to. Airbyte streams from the same sources should have the same namespace. - * - */ - @JsonProperty("namespace") - public String getNamespace() { - return namespace; - } - - /** - * Optional Source-defined namespace. Currently only used by JDBC destinations to determine what - * schema to write to. Airbyte streams from the same sources should have the same namespace. - * - */ - @JsonProperty("namespace") - public void setNamespace(String namespace) { - this.namespace = namespace; - } - - public AirbyteStream withNamespace(String namespace) { - this.namespace = namespace; - return this; - } - - /** - * If the stream is resumable or not. Should be set to true if the stream supports incremental. - * Defaults to false. Primarily used by the Platform in Full Refresh to determine if a Full Refresh - * stream should actually be treated as incremental within a job. - * - */ - @JsonProperty("is_resumable") - public Boolean getIsResumable() { - return isResumable; - } - - /** - * If the stream is resumable or not. Should be set to true if the stream supports incremental. - * Defaults to false. Primarily used by the Platform in Full Refresh to determine if a Full Refresh - * stream should actually be treated as incremental within a job. - * - */ - @JsonProperty("is_resumable") - public void setIsResumable(Boolean isResumable) { - this.isResumable = isResumable; - } - - public AirbyteStream withIsResumable(Boolean isResumable) { - this.isResumable = isResumable; - return this; - } - - @JsonAnyGetter - public Map getAdditionalProperties() { - return this.additionalProperties; - } - - @JsonAnySetter - public void setAdditionalProperty(String name, Object value) { - this.additionalProperties.put(name, value); - } - - public AirbyteStream withAdditionalProperty(String name, Object value) { - this.additionalProperties.put(name, value); - return this; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(AirbyteStream.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('['); - sb.append("name"); - sb.append('='); - sb.append(((this.name == null) ? "" : this.name)); - sb.append(','); - sb.append("jsonSchema"); - sb.append('='); - sb.append(((this.jsonSchema == null) ? "" : this.jsonSchema)); - sb.append(','); - sb.append("supportedSyncModes"); - sb.append('='); - sb.append(((this.supportedSyncModes == null) ? "" : this.supportedSyncModes)); - sb.append(','); - sb.append("sourceDefinedCursor"); - sb.append('='); - sb.append(((this.sourceDefinedCursor == null) ? "" : this.sourceDefinedCursor)); - sb.append(','); - sb.append("defaultCursorField"); - sb.append('='); - sb.append(((this.defaultCursorField == null) ? "" : this.defaultCursorField)); - sb.append(','); - sb.append("sourceDefinedPrimaryKey"); - sb.append('='); - sb.append(((this.sourceDefinedPrimaryKey == null) ? "" : this.sourceDefinedPrimaryKey)); - sb.append(','); - sb.append("namespace"); - sb.append('='); - sb.append(((this.namespace == null) ? "" : this.namespace)); - sb.append(','); - sb.append("isResumable"); - sb.append('='); - sb.append(((this.isResumable == null) ? "" : this.isResumable)); - sb.append(','); - sb.append("additionalProperties"); - sb.append('='); - sb.append(((this.additionalProperties == null) ? "" : this.additionalProperties)); - sb.append(','); - if (sb.charAt((sb.length() - 1)) == ',') { - sb.setCharAt((sb.length() - 1), ']'); - } else { - sb.append(']'); - } - return sb.toString(); - } - - @Override - public int hashCode() { - int result = 1; - result = ((result * 31) + ((this.sourceDefinedPrimaryKey == null) ? 0 : this.sourceDefinedPrimaryKey.hashCode())); - result = ((result * 31) + ((this.supportedSyncModes == null) ? 0 : this.supportedSyncModes.hashCode())); - result = ((result * 31) + ((this.sourceDefinedCursor == null) ? 0 : this.sourceDefinedCursor.hashCode())); - result = ((result * 31) + ((this.jsonSchema == null) ? 0 : this.jsonSchema.hashCode())); - result = ((result * 31) + ((this.isResumable == null) ? 0 : this.isResumable.hashCode())); - result = ((result * 31) + ((this.name == null) ? 0 : this.name.hashCode())); - result = ((result * 31) + ((this.namespace == null) ? 0 : this.namespace.hashCode())); - result = ((result * 31) + ((this.defaultCursorField == null) ? 0 : this.defaultCursorField.hashCode())); - result = ((result * 31) + ((this.additionalProperties == null) ? 0 : this.additionalProperties.hashCode())); - return result; - } - - @Override - public boolean equals(Object other) { - if (other == this) { - return true; - } - if ((other instanceof AirbyteStream) == false) { - return false; - } - AirbyteStream rhs = ((AirbyteStream) other); - return ((((((((((this.sourceDefinedPrimaryKey == rhs.sourceDefinedPrimaryKey) - || ((this.sourceDefinedPrimaryKey != null) && this.sourceDefinedPrimaryKey.equals(rhs.sourceDefinedPrimaryKey))) - && ((this.supportedSyncModes == rhs.supportedSyncModes) - || ((this.supportedSyncModes != null) && this.supportedSyncModes.equals(rhs.supportedSyncModes)))) - && ((this.sourceDefinedCursor == rhs.sourceDefinedCursor) - || ((this.sourceDefinedCursor != null) && this.sourceDefinedCursor.equals(rhs.sourceDefinedCursor)))) - && ((this.jsonSchema == rhs.jsonSchema) || ((this.jsonSchema != null) && this.jsonSchema.equals(rhs.jsonSchema)))) - && ((this.isResumable == rhs.isResumable) || ((this.isResumable != null) && this.isResumable.equals(rhs.isResumable)))) - && ((this.name == rhs.name) || ((this.name != null) && this.name.equals(rhs.name)))) - && ((this.namespace == rhs.namespace) || ((this.namespace != null) && this.namespace.equals(rhs.namespace)))) - && ((this.defaultCursorField == rhs.defaultCursorField) - || ((this.defaultCursorField != null) && this.defaultCursorField.equals(rhs.defaultCursorField)))) - && ((this.additionalProperties == rhs.additionalProperties) - || ((this.additionalProperties != null) && this.additionalProperties.equals(rhs.additionalProperties)))); - } - -} diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/ConfiguredAirbyteCatalog.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/ConfiguredAirbyteCatalog.java deleted file mode 100644 index 0b0579b301a..00000000000 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/ConfiguredAirbyteCatalog.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.config; - -import com.fasterxml.jackson.annotation.JsonAnyGetter; -import com.fasterxml.jackson.annotation.JsonAnySetter; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * ConfiguredAirbyteCatalog. - *

- * Airbyte stream schema catalog - * - */ -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonPropertyOrder({ - "streams" -}) -@SuppressWarnings({ - "PMD.AvoidDuplicateLiterals", - "PMD.AvoidLiteralsInIfCondition", - "PMD.ImmutableField", - "PMD.UseEqualsToCompareStrings" -}) -public class ConfiguredAirbyteCatalog implements Serializable { - - @JsonProperty("streams") - private List streams = new ArrayList(); - @JsonIgnore - private Map additionalProperties = new HashMap(); - private static final long serialVersionUID = 3093736788188579672L; - - @JsonProperty("streams") - public List getStreams() { - return streams; - } - - @JsonProperty("streams") - public void setStreams(List streams) { - this.streams = streams; - } - - public ConfiguredAirbyteCatalog withStreams(List streams) { - this.streams = streams; - return this; - } - - @JsonAnyGetter - public Map getAdditionalProperties() { - return this.additionalProperties; - } - - @JsonAnySetter - public void setAdditionalProperty(String name, Object value) { - this.additionalProperties.put(name, value); - } - - public ConfiguredAirbyteCatalog withAdditionalProperty(String name, Object value) { - this.additionalProperties.put(name, value); - return this; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(ConfiguredAirbyteCatalog.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('['); - sb.append("streams"); - sb.append('='); - sb.append(((this.streams == null) ? "" : this.streams)); - sb.append(','); - sb.append("additionalProperties"); - sb.append('='); - sb.append(((this.additionalProperties == null) ? "" : this.additionalProperties)); - sb.append(','); - if (sb.charAt((sb.length() - 1)) == ',') { - sb.setCharAt((sb.length() - 1), ']'); - } else { - sb.append(']'); - } - return sb.toString(); - } - - @Override - public int hashCode() { - int result = 1; - result = ((result * 31) + ((this.streams == null) ? 0 : this.streams.hashCode())); - result = ((result * 31) + ((this.additionalProperties == null) ? 0 : this.additionalProperties.hashCode())); - return result; - } - - @Override - public boolean equals(Object other) { - if (other == this) { - return true; - } - if ((other instanceof ConfiguredAirbyteCatalog) == false) { - return false; - } - ConfiguredAirbyteCatalog rhs = ((ConfiguredAirbyteCatalog) other); - return (((this.streams == rhs.streams) || ((this.streams != null) && this.streams.equals(rhs.streams))) - && ((this.additionalProperties == rhs.additionalProperties) - || ((this.additionalProperties != null) && this.additionalProperties.equals(rhs.additionalProperties)))); - } - -} diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/ConfiguredAirbyteStream.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/ConfiguredAirbyteStream.java deleted file mode 100644 index 280bd1b9582..00000000000 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/ConfiguredAirbyteStream.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. - */ - -package io.airbyte.config; - -import com.fasterxml.jackson.annotation.JsonAnyGetter; -import com.fasterxml.jackson.annotation.JsonAnySetter; -import com.fasterxml.jackson.annotation.JsonIgnore; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonPropertyDescription; -import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * ConfiguredAirbyteStream. - *

- * - * - */ -@JsonInclude(JsonInclude.Include.NON_NULL) -@JsonPropertyOrder({ - "stream", - "sync_mode", - "cursor_field", - "destination_sync_mode", - "primary_key", - "generation_id", - "minimum_generation_id", - "sync_id" -}) -@SuppressWarnings({ - "PMD.AvoidDuplicateLiterals", - "PMD.AvoidLiteralsInIfCondition", - "PMD.ImmutableField", - "PMD.UseEqualsToCompareStrings" -}) -public class ConfiguredAirbyteStream implements Serializable { - - @JsonProperty("stream") - private AirbyteStream stream; - @JsonProperty("sync_mode") - private SyncMode syncMode = null; - /** - * Path to the field that will be used to determine if a record is new or modified since the last - * sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored. - * - */ - @JsonProperty("cursor_field") - @JsonPropertyDescription(""" - Path to the field that will be used to determine if a record is new or modified since the last sync. This field is REQUIRED - if `sync_mode` is `incremental`. Otherwise it is ignored.""") - private List cursorField = new ArrayList(); - @JsonProperty("destination_sync_mode") - private DestinationSyncMode destinationSyncMode = null; - /** - * Paths to the fields that will be used as primary key. This field is REQUIRED if - * `destination_sync_mode` is `*_dedup`. Otherwise it is ignored. - * - */ - @JsonProperty("primary_key") - @JsonPropertyDescription(""" - Paths to the fields that will be used as primary key. This field is REQUIRED if `destination_sync_mode` is `*_dedup`. - Otherwise it is ignored.""") - private List> primaryKey = new ArrayList>(); - /** - * Monotically increasing numeric id representing the current generation of a stream. This id can be - * shared across syncs. If this is null, it means that the platform is not supporting the refresh - * and it is expected that no extra id will be added to the records and no data from previous - * generation will be cleanup. - * - */ - @JsonProperty("generation_id") - @JsonPropertyDescription(""" - Monotically increasing numeric id representing the current generation of a stream. This id can be shared across syncs. - If this is null, it means that the platform is not supporting the refresh and it is expected that no extra id will be added - to the records and no data from previous generation will be cleanup.\s""") - private Long generationId; - /** - * The minimum generation id which is needed in a stream. If it is present, the destination will try - * to delete the data that are part of a generation lower than this property. If the minimum - * generation is equals to 0, no data deletion is expected from the destiantion If this is null, it - * means that the platform is not supporting the refresh and it is expected that no extra id will be - * added to the records and no data from previous generation will be cleanup. - * - */ - @JsonProperty("minimum_generation_id") - @JsonPropertyDescription(""" - The minimum generation id which is needed in a stream. If it is present, the destination will try to delete the data that - are part of a generation lower than this property. If the minimum generation is equals to 0, no data deletion is expected - from the destiantion. If this is null, it means that the platform is not supporting the refresh and it is expected that no - extra id will be added to the records and no data from previous generation will be cleanup.\s""") - private Long minimumGenerationId; - /** - * Monotically increasing numeric id representing the current sync id. This is aimed to be unique - * per sync. If this is null, it means that the platform is not supporting the refresh and it is - * expected that no extra id will be added to the records and no data from previous generation will - * be cleanup. - * - */ - @JsonProperty("sync_id") - @JsonPropertyDescription(""" - Monotically increasing numeric id representing the current sync id. This is aimed to be unique per sync. - If this is null, it means that the platform is not supporting the refresh and it is expected that no extra id will be added - to the records and no data from previous generation will be cleanup.\s""") - private Long syncId; - @JsonIgnore - private Map additionalProperties = new HashMap(); - private static final long serialVersionUID = 3961017355969088418L; - - @JsonProperty("stream") - public AirbyteStream getStream() { - return stream; - } - - @JsonProperty("stream") - public void setStream(AirbyteStream stream) { - this.stream = stream; - } - - public ConfiguredAirbyteStream withStream(AirbyteStream stream) { - this.stream = stream; - return this; - } - - @JsonProperty("sync_mode") - public SyncMode getSyncMode() { - return syncMode; - } - - @JsonProperty("sync_mode") - public void setSyncMode(SyncMode syncMode) { - this.syncMode = syncMode; - } - - public ConfiguredAirbyteStream withSyncMode(SyncMode syncMode) { - this.syncMode = syncMode; - return this; - } - - /** - * Path to the field that will be used to determine if a record is new or modified since the last - * sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored. - * - */ - @JsonProperty("cursor_field") - public List getCursorField() { - return cursorField; - } - - /** - * Path to the field that will be used to determine if a record is new or modified since the last - * sync. This field is REQUIRED if `sync_mode` is `incremental`. Otherwise it is ignored. - * - */ - @JsonProperty("cursor_field") - public void setCursorField(List cursorField) { - this.cursorField = cursorField; - } - - public ConfiguredAirbyteStream withCursorField(List cursorField) { - this.cursorField = cursorField; - return this; - } - - @JsonProperty("destination_sync_mode") - public DestinationSyncMode getDestinationSyncMode() { - return destinationSyncMode; - } - - @JsonProperty("destination_sync_mode") - public void setDestinationSyncMode(DestinationSyncMode destinationSyncMode) { - this.destinationSyncMode = destinationSyncMode; - } - - public ConfiguredAirbyteStream withDestinationSyncMode(DestinationSyncMode destinationSyncMode) { - this.destinationSyncMode = destinationSyncMode; - return this; - } - - /** - * Paths to the fields that will be used as primary key. This field is REQUIRED if - * `destination_sync_mode` is `*_dedup`. Otherwise it is ignored. - * - */ - @JsonProperty("primary_key") - public List> getPrimaryKey() { - return primaryKey; - } - - /** - * Paths to the fields that will be used as primary key. This field is REQUIRED if - * `destination_sync_mode` is `*_dedup`. Otherwise it is ignored. - * - */ - @JsonProperty("primary_key") - public void setPrimaryKey(List> primaryKey) { - this.primaryKey = primaryKey; - } - - public ConfiguredAirbyteStream withPrimaryKey(List> primaryKey) { - this.primaryKey = primaryKey; - return this; - } - - /** - * Monotically increasing numeric id representing the current generation of a stream. This id can be - * shared across syncs. If this is null, it means that the platform is not supporting the refresh - * and it is expected that no extra id will be added to the records and no data from previous - * generation will be cleanup. - * - */ - @JsonProperty("generation_id") - public Long getGenerationId() { - return generationId; - } - - /** - * Monotically increasing numeric id representing the current generation of a stream. This id can be - * shared across syncs. If this is null, it means that the platform is not supporting the refresh - * and it is expected that no extra id will be added to the records and no data from previous - * generation will be cleanup. - * - */ - @JsonProperty("generation_id") - public void setGenerationId(Long generationId) { - this.generationId = generationId; - } - - public ConfiguredAirbyteStream withGenerationId(Long generationId) { - this.generationId = generationId; - return this; - } - - /** - * The minimum generation id which is needed in a stream. If it is present, the destination will try - * to delete the data that are part of a generation lower than this property. If the minimum - * generation is equals to 0, no data deletion is expected from the destiantion If this is null, it - * means that the platform is not supporting the refresh and it is expected that no extra id will be - * added to the records and no data from previous generation will be cleanup. - * - */ - @JsonProperty("minimum_generation_id") - public Long getMinimumGenerationId() { - return minimumGenerationId; - } - - /** - * The minimum generation id which is needed in a stream. If it is present, the destination will try - * to delete the data that are part of a generation lower than this property. If the minimum - * generation is equals to 0, no data deletion is expected from the destiantion If this is null, it - * means that the platform is not supporting the refresh and it is expected that no extra id will be - * added to the records and no data from previous generation will be cleanup. - * - */ - @JsonProperty("minimum_generation_id") - public void setMinimumGenerationId(Long minimumGenerationId) { - this.minimumGenerationId = minimumGenerationId; - } - - public ConfiguredAirbyteStream withMinimumGenerationId(Long minimumGenerationId) { - this.minimumGenerationId = minimumGenerationId; - return this; - } - - /** - * Monotically increasing numeric id representing the current sync id. This is aimed to be unique - * per sync. If this is null, it means that the platform is not supporting the refresh and it is - * expected that no extra id will be added to the records and no data from previous generation will - * be cleanup. - * - */ - @JsonProperty("sync_id") - public Long getSyncId() { - return syncId; - } - - /** - * Monotically increasing numeric id representing the current sync id. This is aimed to be unique - * per sync. If this is null, it means that the platform is not supporting the refresh and it is - * expected that no extra id will be added to the records and no data from previous generation will - * be cleanup. - * - */ - @JsonProperty("sync_id") - public void setSyncId(Long syncId) { - this.syncId = syncId; - } - - public ConfiguredAirbyteStream withSyncId(Long syncId) { - this.syncId = syncId; - return this; - } - - @JsonAnyGetter - public Map getAdditionalProperties() { - return this.additionalProperties; - } - - @JsonAnySetter - public void setAdditionalProperty(String name, Object value) { - this.additionalProperties.put(name, value); - } - - public ConfiguredAirbyteStream withAdditionalProperty(String name, Object value) { - this.additionalProperties.put(name, value); - return this; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(ConfiguredAirbyteStream.class.getName()).append('@').append(Integer.toHexString(System.identityHashCode(this))).append('['); - sb.append("stream"); - sb.append('='); - sb.append(((this.stream == null) ? "" : this.stream)); - sb.append(','); - sb.append("syncMode"); - sb.append('='); - sb.append(((this.syncMode == null) ? "" : this.syncMode)); - sb.append(','); - sb.append("cursorField"); - sb.append('='); - sb.append(((this.cursorField == null) ? "" : this.cursorField)); - sb.append(','); - sb.append("destinationSyncMode"); - sb.append('='); - sb.append(((this.destinationSyncMode == null) ? "" : this.destinationSyncMode)); - sb.append(','); - sb.append("primaryKey"); - sb.append('='); - sb.append(((this.primaryKey == null) ? "" : this.primaryKey)); - sb.append(','); - sb.append("generationId"); - sb.append('='); - sb.append(((this.generationId == null) ? "" : this.generationId)); - sb.append(','); - sb.append("minimumGenerationId"); - sb.append('='); - sb.append(((this.minimumGenerationId == null) ? "" : this.minimumGenerationId)); - sb.append(','); - sb.append("syncId"); - sb.append('='); - sb.append(((this.syncId == null) ? "" : this.syncId)); - sb.append(','); - sb.append("additionalProperties"); - sb.append('='); - sb.append(((this.additionalProperties == null) ? "" : this.additionalProperties)); - sb.append(','); - if (sb.charAt((sb.length() - 1)) == ',') { - sb.setCharAt((sb.length() - 1), ']'); - } else { - sb.append(']'); - } - return sb.toString(); - } - - @Override - public int hashCode() { - int result = 1; - result = ((result * 31) + ((this.generationId == null) ? 0 : this.generationId.hashCode())); - result = ((result * 31) + ((this.stream == null) ? 0 : this.stream.hashCode())); - result = ((result * 31) + ((this.minimumGenerationId == null) ? 0 : this.minimumGenerationId.hashCode())); - result = ((result * 31) + ((this.syncMode == null) ? 0 : this.syncMode.hashCode())); - result = ((result * 31) + ((this.additionalProperties == null) ? 0 : this.additionalProperties.hashCode())); - result = ((result * 31) + ((this.destinationSyncMode == null) ? 0 : this.destinationSyncMode.hashCode())); - result = ((result * 31) + ((this.cursorField == null) ? 0 : this.cursorField.hashCode())); - result = ((result * 31) + ((this.primaryKey == null) ? 0 : this.primaryKey.hashCode())); - result = ((result * 31) + ((this.syncId == null) ? 0 : this.syncId.hashCode())); - return result; - } - - @Override - @SuppressFBWarnings("RC_REF_COMPARISON") - public boolean equals(Object other) { - if (other == this) { - return true; - } - if ((other instanceof ConfiguredAirbyteStream) == false) { - return false; - } - ConfiguredAirbyteStream rhs = ((ConfiguredAirbyteStream) other); - return ((((((((((this.generationId == rhs.generationId) || ((this.generationId != null) && this.generationId.equals(rhs.generationId))) - && ((this.stream == rhs.stream) || ((this.stream != null) && this.stream.equals(rhs.stream)))) - && ((this.minimumGenerationId == rhs.minimumGenerationId) - || ((this.minimumGenerationId != null) && this.minimumGenerationId.equals(rhs.minimumGenerationId)))) - && ((this.syncMode == rhs.syncMode) || ((this.syncMode != null) && this.syncMode.equals(rhs.syncMode)))) - && ((this.additionalProperties == rhs.additionalProperties) - || ((this.additionalProperties != null) && this.additionalProperties.equals(rhs.additionalProperties)))) - && ((this.destinationSyncMode == rhs.destinationSyncMode) - || ((this.destinationSyncMode != null) && this.destinationSyncMode.equals(rhs.destinationSyncMode)))) - && ((this.cursorField == rhs.cursorField) || ((this.cursorField != null) && this.cursorField.equals(rhs.cursorField)))) - && ((this.primaryKey == rhs.primaryKey) || ((this.primaryKey != null) && this.primaryKey.equals(rhs.primaryKey)))) - && ((this.syncId == rhs.syncId) || ((this.syncId != null) && this.syncId.equals(rhs.syncId)))); - } - -} diff --git a/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/CatalogHelpers.java b/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/CatalogHelpers.java index 3713e4e081e..b76511d0657 100644 --- a/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/CatalogHelpers.java +++ b/airbyte-config/config-models/src/main/java/io/airbyte/config/helpers/CatalogHelpers.java @@ -43,9 +43,7 @@ public static AirbyteStream createAirbyteStream(final String streamName, public static AirbyteStream createAirbyteStream(final String streamName, final String namespace, final List fields) { - return new AirbyteStream().withName(streamName).withNamespace(namespace) - .withJsonSchema(fieldsToJsonSchema(fields)) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH)); + return new AirbyteStream(streamName, fieldsToJsonSchema(fields), List.of(SyncMode.FULL_REFRESH)).withNamespace(namespace); } public static ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(final String streamName, @@ -71,11 +69,10 @@ public static ConfiguredAirbyteStream createConfiguredAirbyteStream(final String public static ConfiguredAirbyteStream createConfiguredAirbyteStream(final String streamName, final String namespace, final List fields) { - return new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName(streamName).withNamespace(namespace) - .withJsonSchema(fieldsToJsonSchema(fields)) - .withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH))) - .withSyncMode(SyncMode.FULL_REFRESH).withDestinationSyncMode(DestinationSyncMode.OVERWRITE); + return new ConfiguredAirbyteStream( + new AirbyteStream(streamName, fieldsToJsonSchema(fields), List.of(SyncMode.FULL_REFRESH)).withNamespace(namespace), + SyncMode.FULL_REFRESH, + DestinationSyncMode.OVERWRITE); } /** @@ -111,8 +108,7 @@ public static StreamDescriptor extractDescriptor(final ConfiguredAirbyteStream a * @return stream descriptor */ public static StreamDescriptor extractDescriptor(final AirbyteStream airbyteStream) { - return new StreamDescriptor().withName(airbyteStream.getName()) - .withNamespace(airbyteStream.getNamespace()); + return airbyteStream.getStreamDescriptor(); } /** diff --git a/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/AirbyteStream.kt b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/AirbyteStream.kt new file mode 100644 index 00000000000..41cd9ffa0ee --- /dev/null +++ b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/AirbyteStream.kt @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.config + +import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.annotation.JsonPropertyOrder +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import java.io.Serializable + +/** + * AirbyteStream. + */ +@JsonDeserialize(builder = AirbyteStream.Builder::class) +@JsonInclude(JsonInclude.Include.NON_NULL) +// Left for backward compatibility and test stability. +@JsonPropertyOrder( + "name", + "json_schema", + "supported_sync_modes", + "source_defined_cursor", + "default_cursor_field", + "source_defined_primary_key", + "namespace", + "is_resumable", +) +data class AirbyteStream + @JvmOverloads + constructor( + @JsonProperty("name") + var name: String, + @JsonProperty("json_schema") + var jsonSchema: JsonNode, + @JsonProperty("supported_sync_modes") + var supportedSyncModes: List, + @JsonProperty("source_defined_cursor") + var sourceDefinedCursor: Boolean? = null, + @JsonProperty("default_cursor_field") + var defaultCursorField: List? = ArrayList(), + @JsonProperty("source_defined_primary_key") + var sourceDefinedPrimaryKey: List>? = ArrayList(), + @JsonProperty("namespace") + var namespace: String? = null, + @JsonProperty("is_resumable") + var isResumable: Boolean? = null, + ) : Serializable { + fun withName(name: String): AirbyteStream { + this.name = name + return this + } + + fun withJsonSchema(jsonSchema: JsonNode): AirbyteStream { + this.jsonSchema = jsonSchema + return this + } + + fun withSupportedSyncModes(supportedSyncModes: List): AirbyteStream { + this.supportedSyncModes = supportedSyncModes + return this + } + + fun withSourceDefinedCursor(sourceDefinedCursor: Boolean?): AirbyteStream { + this.sourceDefinedCursor = sourceDefinedCursor + return this + } + + fun withDefaultCursorField(defaultCursorField: List?): AirbyteStream { + this.defaultCursorField = defaultCursorField + return this + } + + fun withSourceDefinedPrimaryKey(sourceDefinedPrimaryKey: List>?): AirbyteStream { + this.sourceDefinedPrimaryKey = sourceDefinedPrimaryKey + return this + } + + fun withNamespace(namespace: String?): AirbyteStream { + this.namespace = namespace + return this + } + + fun withIsResumable(isResumable: Boolean?): AirbyteStream { + this.isResumable = isResumable + return this + } + + @get:JsonIgnore + val streamDescriptor: StreamDescriptor + get() = StreamDescriptor().withName(name).withNamespace(namespace) + + class Builder( + @JsonProperty("name") + var name: String? = null, + @JsonProperty("json_schema") + var jsonSchema: JsonNode? = null, + @JsonProperty("supported_sync_modes") + var supportedSyncModes: List? = null, + @JsonProperty("source_defined_cursor") + var sourceDefinedCursor: Boolean? = null, + @JsonProperty("default_cursor_field") + var defaultCursorField: List? = ArrayList(), + @JsonProperty("source_defined_primary_key") + var sourceDefinedPrimaryKey: List>? = ArrayList(), + @JsonProperty("namespace") + var namespace: String? = null, + @JsonProperty("is_resumable") + var isResumable: Boolean? = null, + ) { + fun build(): AirbyteStream = + AirbyteStream( + name = name ?: throw IllegalArgumentException("name cannot be null"), + jsonSchema = jsonSchema ?: throw IllegalArgumentException("jsonSchema cannot be null"), + supportedSyncModes = supportedSyncModes ?: throw IllegalArgumentException("supportedSyncModes cannot be null"), + sourceDefinedCursor = sourceDefinedCursor, + defaultCursorField = defaultCursorField, + sourceDefinedPrimaryKey = sourceDefinedPrimaryKey, + namespace = namespace, + isResumable = isResumable, + ) + } + + companion object { + private const val serialVersionUID = 602929458758090299L + } + } diff --git a/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/ConfiguredAirbyteCatalog.kt b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/ConfiguredAirbyteCatalog.kt new file mode 100644 index 00000000000..4cf7f4e434b --- /dev/null +++ b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/ConfiguredAirbyteCatalog.kt @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.config + +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.annotation.JsonPropertyOrder +import java.io.Serializable + +/** + * ConfiguredAirbyteCatalog. + * + * + * Airbyte stream schema catalog + * + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonPropertyOrder( + "streams", +) +data class ConfiguredAirbyteCatalog + @JvmOverloads + constructor( + @JsonProperty("streams") + var streams: List = ArrayList(), + ) : Serializable { + fun withStreams(streams: List): ConfiguredAirbyteCatalog { + this.streams = streams + return this + } + + companion object { + private const val serialVersionUID = 3093736788188579672L + } + } diff --git a/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/ConfiguredAirbyteStream.kt b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/ConfiguredAirbyteStream.kt new file mode 100644 index 00000000000..1474c711619 --- /dev/null +++ b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/ConfiguredAirbyteStream.kt @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2020-2024 Airbyte, Inc., all rights reserved. + */ +package io.airbyte.config + +import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.annotation.JsonInclude +import com.fasterxml.jackson.annotation.JsonProperty +import com.fasterxml.jackson.annotation.JsonPropertyOrder +import com.fasterxml.jackson.databind.annotation.JsonDeserialize +import java.io.Serializable + +/** + * ConfiguredAirbyteStream. + */ +@JsonDeserialize(builder = ConfiguredAirbyteStream.Builder::class) +@JsonInclude(JsonInclude.Include.NON_NULL) +// Left for backward compatibility and test stability. +@JsonPropertyOrder( + "stream", + "sync_mode", + "cursor_field", + "destination_sync_mode", + "primary_key", + "generation_id", + "minimum_generation_id", + "sync_id", +) +data class ConfiguredAirbyteStream + @JvmOverloads + constructor( + @JsonProperty("stream") + var stream: AirbyteStream, + @JsonProperty("sync_mode") + var syncMode: SyncMode = SyncMode.FULL_REFRESH, + @JsonProperty("destination_sync_mode") + var destinationSyncMode: DestinationSyncMode = DestinationSyncMode.APPEND, + // The Previous generated code used empty arrays as defaults, preserving the behavior + @JsonProperty("cursor_field") + var cursorField: List? = ArrayList(), + // The Previous generated code used empty arrays as defaults, preserving the behavior + @JsonProperty("primary_key") + var primaryKey: List>? = ArrayList(), + @JsonProperty("generation_id") + var generationId: Long? = null, + @JsonProperty("minimum_generation_id") + var minimumGenerationId: Long? = null, + @JsonProperty("sync_id") + var syncId: Long? = null, + ) : Serializable { + fun withStream(stream: AirbyteStream): ConfiguredAirbyteStream { + this.stream = stream + return this + } + + fun withSyncMode(syncMode: SyncMode): ConfiguredAirbyteStream { + this.syncMode = syncMode + return this + } + + fun withCursorField(cursorField: List?): ConfiguredAirbyteStream { + this.cursorField = cursorField + return this + } + + fun withDestinationSyncMode(destinationSyncMode: DestinationSyncMode): ConfiguredAirbyteStream { + this.destinationSyncMode = destinationSyncMode + return this + } + + fun withPrimaryKey(primaryKey: List>?): ConfiguredAirbyteStream { + this.primaryKey = primaryKey + return this + } + + fun withGenerationId(generationId: Long?): ConfiguredAirbyteStream { + this.generationId = generationId + return this + } + + fun withMinimumGenerationId(minimumGenerationId: Long?): ConfiguredAirbyteStream { + this.minimumGenerationId = minimumGenerationId + return this + } + + fun withSyncId(syncId: Long?): ConfiguredAirbyteStream { + this.syncId = syncId + return this + } + + @get:JsonIgnore + val streamDescriptor: StreamDescriptor + get() = stream.streamDescriptor + + class Builder( + @JsonProperty("stream") + var stream: AirbyteStream? = null, + @JsonProperty("sync_mode") + var syncMode: SyncMode? = null, + @JsonProperty("destination_sync_mode") + var destinationSyncMode: DestinationSyncMode? = null, + @JsonProperty("cursor_field") + var cursorField: List? = ArrayList(), + @JsonProperty("primary_key") + var primaryKey: List>? = ArrayList(), + @JsonProperty("generation_id") + var generationId: Long? = null, + @JsonProperty("minimum_generation_id") + var minimumGenerationId: Long? = null, + @JsonProperty("sync_id") + var syncId: Long? = null, + ) { + fun stream(stream: AirbyteStream) = apply { this.stream = stream } + + fun syncMode(syncMode: SyncMode) = apply { this.syncMode = syncMode } + + fun destinationSyncMode(destinationSyncMode: DestinationSyncMode) = apply { this.destinationSyncMode = destinationSyncMode } + + fun cursorField(cursorField: List?) = apply { this.cursorField = cursorField } + + fun primaryKey(primaryKey: List>?) = apply { this.primaryKey = primaryKey } + + fun generationId(generationId: Long?) = apply { this.generationId = generationId } + + fun minimumGenerationId(minimumGenerationId: Long?) = apply { this.minimumGenerationId = minimumGenerationId } + + fun syncId(syncId: Long?) = apply { this.syncId = syncId } + + fun build(): ConfiguredAirbyteStream = + ConfiguredAirbyteStream( + stream = stream ?: throw IllegalArgumentException("stream cannot be null"), + syncMode = syncMode ?: throw IllegalArgumentException("syncMode cannot be null"), + destinationSyncMode = destinationSyncMode ?: throw IllegalArgumentException("destinationSyncMode cannot be null"), + cursorField = cursorField, + primaryKey = primaryKey, + generationId = generationId, + minimumGenerationId = minimumGenerationId, + syncId = syncId, + ) + } + + companion object { + private const val serialVersionUID = 3961017355969088418L + } + } diff --git a/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/helpers/ProtocolConverters.kt b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/helpers/ProtocolConverters.kt index 6f60215bb68..09748c86d6a 100644 --- a/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/helpers/ProtocolConverters.kt +++ b/airbyte-config/config-models/src/main/kotlin/io/airbyte/config/helpers/ProtocolConverters.kt @@ -12,15 +12,16 @@ class ProtocolConverters { companion object { @JvmStatic fun ProtocolAirbyteStream.toInternal(): InternalAirbyteStream = - InternalAirbyteStream() - .withName(name) - .withJsonSchema(jsonSchema) - .withSupportedSyncModes(Enums.convertListTo(supportedSyncModes, InternalSyncMode::class.java)) - .withSourceDefinedCursor(sourceDefinedCursor) - .withDefaultCursorField(defaultCursorField) - .withSourceDefinedPrimaryKey(sourceDefinedPrimaryKey) - .withNamespace(namespace) - .withIsResumable(isResumable) + InternalAirbyteStream( + name = name, + jsonSchema = jsonSchema, + supportedSyncModes = Enums.convertListTo(supportedSyncModes, InternalSyncMode::class.java), + sourceDefinedCursor = sourceDefinedCursor, + defaultCursorField = defaultCursorField, + sourceDefinedPrimaryKey = sourceDefinedPrimaryKey, + namespace = namespace, + isResumable = isResumable, + ) @JvmStatic fun InternalAirbyteStream.toProtocol(): ProtocolAirbyteStream = diff --git a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java index 06fa1fb5c49..06c64ab7701 100644 --- a/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java +++ b/airbyte-config/config-persistence/src/test/java/io/airbyte/config/persistence/StandardSyncPersistenceTest.java @@ -24,6 +24,7 @@ import io.airbyte.config.ConfiguredAirbyteCatalog; import io.airbyte.config.ConfiguredAirbyteStream; import io.airbyte.config.DestinationConnection; +import io.airbyte.config.DestinationSyncMode; import io.airbyte.config.Geography; import io.airbyte.config.JobSyncConfig.NamespaceDefinitionType; import io.airbyte.config.ReleaseStage; @@ -39,6 +40,7 @@ import io.airbyte.config.StandardWorkspace; import io.airbyte.config.StreamDescriptor; import io.airbyte.config.SupportLevel; +import io.airbyte.config.SyncMode; import io.airbyte.config.persistence.ConfigRepository.StandardSyncQuery; import io.airbyte.config.secrets.SecretsRepositoryReader; import io.airbyte.config.secrets.SecretsRepositoryWriter; @@ -196,10 +198,10 @@ void testDelete() throws IOException, ConfigNotFoundException, JsonValidationExc @Test void testGetAllStreamsForConnection() throws Exception { createBaseObjects(); - final AirbyteStream airbyteStream = new AirbyteStream().withName("stream1").withNamespace("namespace1"); - final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream().withStream(airbyteStream); - final AirbyteStream airbyteStream2 = new AirbyteStream().withName("stream2"); - final ConfiguredAirbyteStream configuredStream2 = new ConfiguredAirbyteStream().withStream(airbyteStream2); + final AirbyteStream airbyteStream = new AirbyteStream("stream1", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)).withNamespace("namespace1"); + final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream(airbyteStream, SyncMode.INCREMENTAL, DestinationSyncMode.APPEND); + final AirbyteStream airbyteStream2 = new AirbyteStream("stream2", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)); + final ConfiguredAirbyteStream configuredStream2 = new ConfiguredAirbyteStream(airbyteStream2, SyncMode.INCREMENTAL, DestinationSyncMode.APPEND); final ConfiguredAirbyteCatalog configuredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream, configuredStream2)); final StandardSync sync = createStandardSync(source1, destination1).withCatalog(configuredCatalog); diff --git a/airbyte-config/config-persistence/src/test/kotlin/io/airbyte/config/persistence/helper/CatalogGenerationSetterTest.kt b/airbyte-config/config-persistence/src/test/kotlin/io/airbyte/config/persistence/helper/CatalogGenerationSetterTest.kt index c5e6c8beed3..dc2f94849ae 100644 --- a/airbyte-config/config-persistence/src/test/kotlin/io/airbyte/config/persistence/helper/CatalogGenerationSetterTest.kt +++ b/airbyte-config/config-persistence/src/test/kotlin/io/airbyte/config/persistence/helper/CatalogGenerationSetterTest.kt @@ -1,5 +1,6 @@ package io.airbyte.config.persistence.helper +import io.airbyte.commons.json.Jsons import io.airbyte.config.AirbyteStream import io.airbyte.config.ConfiguredAirbyteCatalog import io.airbyte.config.ConfiguredAirbyteStream @@ -19,15 +20,27 @@ class CatalogGenerationSetterTest { private val catalog = ConfiguredAirbyteCatalog().withStreams( listOf( - ConfiguredAirbyteStream().withStream( - AirbyteStream() - .withName("name1") - .withNamespace("namespace1"), + ConfiguredAirbyteStream( + stream = + AirbyteStream( + name = "name1", + namespace = "namespace1", + jsonSchema = Jsons.emptyObject(), + supportedSyncModes = listOf(SyncMode.INCREMENTAL), + ), + syncMode = SyncMode.INCREMENTAL, + destinationSyncMode = DestinationSyncMode.APPEND, ), - ConfiguredAirbyteStream().withStream( - AirbyteStream() - .withName("name2") - .withNamespace("namespace2"), + ConfiguredAirbyteStream( + stream = + AirbyteStream( + name = "name2", + namespace = "namespace2", + jsonSchema = Jsons.emptyObject(), + supportedSyncModes = listOf(SyncMode.INCREMENTAL), + ), + syncMode = SyncMode.INCREMENTAL, + destinationSyncMode = DestinationSyncMode.APPEND, ), ), ) @@ -159,27 +172,36 @@ class CatalogGenerationSetterTest { val catalog = ConfiguredAirbyteCatalog().withStreams( listOf( - ConfiguredAirbyteStream().withStream( - AirbyteStream() - .withName("name1") - .withNamespace("namespace1"), - ) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE), - ConfiguredAirbyteStream().withStream( - AirbyteStream() - .withName("name2") - .withNamespace("namespace2"), - ) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.APPEND), - ConfiguredAirbyteStream().withStream( - AirbyteStream() - .withName("name2") - .withNamespace("namespace1"), - ) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE_DEDUP), + ConfiguredAirbyteStream( + AirbyteStream( + name = "name1", + namespace = "namespace1", + jsonSchema = Jsons.emptyObject(), + supportedSyncModes = listOf(SyncMode.FULL_REFRESH), + ), + SyncMode.FULL_REFRESH, + DestinationSyncMode.OVERWRITE, + ), + ConfiguredAirbyteStream( + AirbyteStream( + name = "name2", + namespace = "namespace2", + jsonSchema = Jsons.emptyObject(), + supportedSyncModes = listOf(SyncMode.FULL_REFRESH), + ), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND, + ), + ConfiguredAirbyteStream( + AirbyteStream( + name = "name2", + namespace = "namespace1", + jsonSchema = Jsons.emptyObject(), + supportedSyncModes = listOf(SyncMode.FULL_REFRESH), + ), + SyncMode.FULL_REFRESH, + DestinationSyncMode.OVERWRITE_DEDUP, + ), ), ) @@ -214,20 +236,28 @@ class CatalogGenerationSetterTest { val catalog = ConfiguredAirbyteCatalog().withStreams( listOf( - ConfiguredAirbyteStream().withStream( - AirbyteStream() - .withName("name1") - .withNamespace("namespace1"), - ) - .withSyncMode(SyncMode.INCREMENTAL) - .withDestinationSyncMode(DestinationSyncMode.APPEND), - ConfiguredAirbyteStream().withStream( - AirbyteStream() - .withName("name2") - .withNamespace("namespace2"), - ) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE), + ConfiguredAirbyteStream( + stream = + AirbyteStream( + name = "name1", + namespace = "namespace1", + jsonSchema = Jsons.emptyObject(), + supportedSyncModes = listOf(SyncMode.INCREMENTAL), + ), + syncMode = SyncMode.INCREMENTAL, + destinationSyncMode = DestinationSyncMode.APPEND, + ), + ConfiguredAirbyteStream( + stream = + AirbyteStream( + name = "name2", + namespace = "namespace2", + jsonSchema = Jsons.emptyObject(), + supportedSyncModes = listOf(SyncMode.FULL_REFRESH), + ), + syncMode = SyncMode.FULL_REFRESH, + destinationSyncMode = DestinationSyncMode.OVERWRITE, + ), ), ) diff --git a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java index e8e5d386fce..53297c6e6e5 100644 --- a/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java +++ b/airbyte-persistence/job-persistence/src/main/java/io/airbyte/persistence/job/DefaultJobCreator.java @@ -9,6 +9,7 @@ import io.airbyte.commons.version.Version; import io.airbyte.config.ActorDefinitionVersion; import io.airbyte.config.ConfiguredAirbyteCatalog; +import io.airbyte.config.ConfiguredAirbyteStream; import io.airbyte.config.DestinationConnection; import io.airbyte.config.JobConfig; import io.airbyte.config.JobConfig.ConfigType; @@ -200,11 +201,9 @@ Set getResumableFullRefresh(final StandardSync standardSync, f return standardSync.getCatalog().getStreams().stream() .filter(stream -> stream.getSyncMode() == SyncMode.FULL_REFRESH - && stream.getStream().getIsResumable() != null - && stream.getStream().getIsResumable()) - .map(stream -> new StreamDescriptor() - .withName(stream.getStream().getName()) - .withNamespace(stream.getStream().getNamespace())) + && stream.getStream().isResumable() != null + && stream.getStream().isResumable()) + .map(ConfiguredAirbyteStream::getStreamDescriptor) .collect(Collectors.toSet()); } diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java index a68a2d2ad82..e7a20806ecf 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/DefaultJobCreatorTest.java @@ -158,18 +158,18 @@ class DefaultJobCreatorTest { final UUID connectionId = UUID.randomUUID(); final UUID operationId = UUID.randomUUID(); - final ConfiguredAirbyteStream stream1 = new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING))) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.APPEND); - final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING))) - .withSyncMode(SyncMode.INCREMENTAL) - .withDestinationSyncMode(DestinationSyncMode.APPEND); - final ConfiguredAirbyteStream stream3 = new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)).withIsResumable(true)) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE); + final ConfiguredAirbyteStream stream1 = new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND); + final ConfiguredAirbyteStream stream2 = new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.INCREMENTAL, + DestinationSyncMode.APPEND); + final ConfiguredAirbyteStream stream3 = new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM3_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)).withIsResumable(true), + SyncMode.FULL_REFRESH, + DestinationSyncMode.OVERWRITE); CONFIGURED_AIRBYTE_CATALOG = new ConfiguredAirbyteCatalog().withStreams(List.of(stream1, stream2, stream3)); STANDARD_SYNC = new StandardSync() @@ -1022,14 +1022,14 @@ void testCreateResetConnectionJob() throws IOException { final List streamsToReset = List.of(STREAM1_DESCRIPTOR, STREAM2_DESCRIPTOR); final ConfiguredAirbyteCatalog expectedCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING))) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE), - new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING))) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.FULL_REFRESH, + DestinationSyncMode.OVERWRITE), + new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.FULL_REFRESH, + DestinationSyncMode.OVERWRITE))); final SyncResourceRequirements expectedSyncResourceRequirements = new SyncResourceRequirements() .withConfigKey(new SyncResourceRequirementsKey().withVariant(DEFAULT_VARIANT)) @@ -1084,14 +1084,14 @@ void testCreateResetConnectionJob() throws IOException { void testCreateResetConnectionJobEnsureNoQueuing() throws IOException { final List streamsToReset = List.of(STREAM1_DESCRIPTOR, STREAM2_DESCRIPTOR); final ConfiguredAirbyteCatalog expectedCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING))) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE), - new ConfiguredAirbyteStream() - .withStream(CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING))) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.OVERWRITE))); + new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM1_NAME, Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.FULL_REFRESH, + DestinationSyncMode.OVERWRITE), + new ConfiguredAirbyteStream( + CatalogHelpers.createAirbyteStream(STREAM2_NAME, NAMESPACE, Field.of(FIELD_NAME, JsonSchemaType.STRING)), + SyncMode.FULL_REFRESH, + DestinationSyncMode.OVERWRITE))); final SyncResourceRequirements expectedSyncResourceRequirements = new SyncResourceRequirements() .withConfigKey(new SyncResourceRequirementsKey().withVariant(DEFAULT_VARIANT)) @@ -1145,12 +1145,14 @@ void testCreateResetConnectionJobEnsureNoQueuing() throws IOException { void testGetResumableFullRefresh() { StandardSync standardSync = new StandardSync() .withCatalog(new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL).withStream( - new AirbyteStream().withName("no1").withIsResumable(true)), - new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream( - new AirbyteStream().withName("no2").withIsResumable(false)), - new ConfiguredAirbyteStream().withSyncMode(SyncMode.FULL_REFRESH).withStream( - new AirbyteStream().withName("yes").withIsResumable(true))))); + new ConfiguredAirbyteStream(new AirbyteStream("no1", Jsons.emptyObject(), List.of(SyncMode.INCREMENTAL)).withIsResumable(true), + SyncMode.INCREMENTAL, DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("no2", Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)).withIsResumable(false), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND), + new ConfiguredAirbyteStream(new AirbyteStream("yes", Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)).withIsResumable(true), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND)))); Set streamDescriptors = jobCreator.getResumableFullRefresh(standardSync, true); assertEquals(1, streamDescriptors.size()); diff --git a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java index dcb84bf0b81..d39d7687d74 100644 --- a/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java +++ b/airbyte-persistence/job-persistence/src/test/java/io/airbyte/persistence/job/tracker/JobTrackerTest.java @@ -636,10 +636,10 @@ private Job getJobMock(final ConfigType configType, final long jobId) throws Con .withDockerRepository(CONNECTOR_REPOSITORY)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( - new ConfiguredAirbyteStream() - .withStream(new AirbyteStream().withName("stream").withNamespace("namespace")) - .withSyncMode(SyncMode.FULL_REFRESH) - .withDestinationSyncMode(DestinationSyncMode.APPEND))); + new ConfiguredAirbyteStream( + new AirbyteStream("stream", Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)).withNamespace("namespace"), + SyncMode.FULL_REFRESH, + DestinationSyncMode.APPEND))); final AttemptSyncConfig attemptSyncConfig = new AttemptSyncConfig() .withSourceConfiguration(Jsons.jsonNode(ImmutableMap.of("key", "some_value")))