Skip to content

Commit

Permalink
chore: convert ConfiguredAirbyteCatalog, ConfiguredAirbyteStream and …
Browse files Browse the repository at this point in the history
…AirbyteStream to Kotlin data classes (#13413)
  • Loading branch information
gosusnp committed Aug 8, 2024
1 parent 719a210 commit 5357e67
Show file tree
Hide file tree
Showing 36 changed files with 752 additions and 1,262 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -177,30 +177,28 @@ private static io.airbyte.config.AirbyteStream toStreamInternal(final io.airbyte
}
((ObjectNode) properties).retain(selectedFieldNames);
}
return new io.airbyte.config.AirbyteStream()
.withName(stream.getName())
.withJsonSchema(stream.getJsonSchema())
.withSupportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.config.SyncMode.class))
.withSourceDefinedCursor(stream.getSourceDefinedCursor())
.withDefaultCursorField(stream.getDefaultCursorField())
.withSourceDefinedPrimaryKey(
Optional.ofNullable(stream.getSourceDefinedPrimaryKey()).orElse(Collections.emptyList()))
.withNamespace(stream.getNamespace())
.withIsResumable(stream.isResumable());
return new io.airbyte.config.AirbyteStream(stream.getName(), stream.getJsonSchema(),
Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.config.SyncMode.class))
.withSourceDefinedCursor(stream.getSourceDefinedCursor())
.withDefaultCursorField(stream.getDefaultCursorField())
.withSourceDefinedPrimaryKey(
Optional.ofNullable(stream.getSourceDefinedPrimaryKey()).orElse(Collections.emptyList()))
.withNamespace(stream.getNamespace())
.withIsResumable(stream.isResumable());
}

private static ConfiguredAirbyteStream toConfiguredStreamInternal(final io.airbyte.api.client.model.generated.AirbyteStream stream,
final AirbyteStreamConfiguration config)
throws JsonValidationException {
return new ConfiguredAirbyteStream()
.withStream(toStreamInternal(stream, config))
.withSyncMode(Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class))
.withDestinationSyncMode(Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
.withPrimaryKey(config.getPrimaryKey())
.withCursorField(config.getCursorField())
.withGenerationId(config.getGenerationId())
.withMinimumGenerationId(config.getMinimumGenerationId())
.withSyncId(config.getSyncId());
return new ConfiguredAirbyteStream(
toStreamInternal(stream, config),
Enums.convertTo(config.getSyncMode(), io.airbyte.config.SyncMode.class),
Enums.convertTo(config.getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
.withPrimaryKey(config.getPrimaryKey())
.withCursorField(config.getCursorField())
.withGenerationId(config.getGenerationId())
.withMinimumGenerationId(config.getMinimumGenerationId())
.withSyncId(config.getSyncId());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,10 @@ void testGetCatalogDiff() throws IOException {
new io.airbyte.protocol.models.AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject())));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject()))
.withSyncMode(SyncMode.FULL_REFRESH)));
new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema2, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH,
DestinationSyncMode.APPEND),
new ConfiguredAirbyteStream(new AirbyteStream(SALES, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH,
DestinationSyncMode.APPEND)));

final Set<StreamTransform> actualDiff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);
final List<StreamTransform> expectedDiff = Stream.of(
Expand Down Expand Up @@ -156,9 +157,10 @@ void testGetCatalogDiffWithInvalidSchema() throws IOException {
new io.airbyte.protocol.models.AirbyteStream().withName(USERS).withJsonSchema(schema2)));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject()))
.withSyncMode(SyncMode.FULL_REFRESH)));
new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema2, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH,
DestinationSyncMode.APPEND),
new ConfiguredAirbyteStream(new AirbyteStream(SALES, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH,
DestinationSyncMode.APPEND)));

final Set<StreamTransform> actualDiff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

Expand All @@ -178,9 +180,10 @@ void testGetCatalogDiffWithBothInvalidSchema() throws IOException {
new io.airbyte.protocol.models.AirbyteStream().withName(USERS).withJsonSchema(schema2)));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema2)).withSyncMode(SyncMode.FULL_REFRESH),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(Jsons.emptyObject()))
.withSyncMode(SyncMode.FULL_REFRESH)));
new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema2, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH,
DestinationSyncMode.APPEND),
new ConfiguredAirbyteStream(new AirbyteStream(SALES, Jsons.emptyObject(), List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH,
DestinationSyncMode.APPEND)));

final Set<StreamTransform> actualDiff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

Expand All @@ -197,8 +200,9 @@ void testCatalogDiffWithBreakingChanges() throws IOException {
new io.airbyte.protocol.models.AirbyteStream().withName(USERS).withJsonSchema(breakingSchema)));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema1)).withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(List.of(DATE)).withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP).withPrimaryKey(List.of(List.of("id")))));
new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema1, List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP)
.withCursorField(List.of(DATE)).withPrimaryKey(List.of(List.of("id")))));

final Set<StreamTransform> diff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

Expand All @@ -221,8 +225,9 @@ void testCatalogDiffWithoutStreamConfig() throws IOException {
new io.airbyte.protocol.models.AirbyteStream().withName(USERS).withJsonSchema(breakingSchema)));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(schema1)).withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(List.of(DATE)).withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP).withPrimaryKey(List.of(List.of("id")))));
new ConfiguredAirbyteStream(new AirbyteStream(SALES, schema1, List.of(SyncMode.INCREMENTAL)), SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP)
.withCursorField(List.of(DATE)).withPrimaryKey(List.of(List.of("id")))));

final Set<StreamTransform> diff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

Expand All @@ -236,16 +241,17 @@ void testCatalogDiffStreamChangeWithNoTransforms() throws IOException {

final AirbyteCatalog catalog1 = new AirbyteCatalog().withStreams(List.of(
ProtocolConverters
.toProtocol(new AirbyteStream().withName(USERS).withJsonSchema(schema1).withSourceDefinedPrimaryKey(List.of(List.of("id")))),
ProtocolConverters.toProtocol(new AirbyteStream().withName(SALES).withJsonSchema(schema1))));
.toProtocol(new AirbyteStream(USERS, schema1, List.of(SyncMode.FULL_REFRESH)).withSourceDefinedPrimaryKey(List.of(List.of("id")))),
ProtocolConverters.toProtocol(new AirbyteStream(SALES, schema1, List.of(SyncMode.FULL_REFRESH)))));
final AirbyteCatalog catalog2 = new AirbyteCatalog().withStreams(List.of(
ProtocolConverters
.toProtocol(new AirbyteStream().withName(USERS).withJsonSchema(schema1).withSourceDefinedPrimaryKey(List.of(List.of("id"))))));
.toProtocol(new AirbyteStream(USERS, schema1, List.of(SyncMode.FULL_REFRESH)).withSourceDefinedPrimaryKey(List.of(List.of("id"))))));

final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(USERS).withJsonSchema(schema1)).withSyncMode(SyncMode.FULL_REFRESH),
new ConfiguredAirbyteStream().withStream(new AirbyteStream().withName(SALES).withJsonSchema(schema1))
.withSyncMode(SyncMode.FULL_REFRESH)));
new ConfiguredAirbyteStream(new AirbyteStream(USERS, schema1, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH,
DestinationSyncMode.APPEND),
new ConfiguredAirbyteStream(new AirbyteStream(SALES, schema1, List.of(SyncMode.FULL_REFRESH)), SyncMode.FULL_REFRESH,
DestinationSyncMode.APPEND)));

final Set<StreamTransform> actualDiff = CatalogDiffHelpers.getCatalogDiff(catalog1, catalog2, configuredAirbyteCatalog);

Expand Down Expand Up @@ -302,16 +308,13 @@ void testCatalogDiffWithSourceDefinedPrimaryKeyChange(final DestinationSyncMode
throws IOException {
final JsonNode schema = Jsons.deserialize(readResource(VALID_SCHEMA_JSON));

final AirbyteStream stream = new AirbyteStream().withName(USERS).withJsonSchema(schema).withSourceDefinedPrimaryKey(prevSourcePK);
final AirbyteStream refreshedStream = new AirbyteStream().withName(USERS).withJsonSchema(schema).withSourceDefinedPrimaryKey(newSourcePK);
final AirbyteStream stream = new AirbyteStream(USERS, schema, List.of(SyncMode.FULL_REFRESH)).withSourceDefinedPrimaryKey(prevSourcePK);
final AirbyteStream refreshedStream = new AirbyteStream(USERS, schema, List.of(SyncMode.FULL_REFRESH)).withSourceDefinedPrimaryKey(newSourcePK);

final AirbyteCatalog initialCatalog = new AirbyteCatalog().withStreams(List.of(ProtocolConverters.toProtocol(stream)));
final AirbyteCatalog refreshedCatalog = new AirbyteCatalog().withStreams(List.of(ProtocolConverters.toProtocol(refreshedStream)));

final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream()
.withStream(stream)
.withSyncMode(SyncMode.INCREMENTAL)
.withDestinationSyncMode(destSyncMode)
final ConfiguredAirbyteStream configuredStream = new ConfiguredAirbyteStream(stream, SyncMode.INCREMENTAL, destSyncMode)
.withPrimaryKey(configuredPK);

final ConfiguredAirbyteCatalog configuredCatalog = new ConfiguredAirbyteCatalog().withStreams(List.of(configuredStream));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.airbyte.config.AirbyteStream
import io.airbyte.config.ConfiguredAirbyteCatalog
import io.airbyte.config.ConfiguredAirbyteStream
import io.airbyte.config.DestinationSyncMode
import io.airbyte.config.SyncMode
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test
import io.airbyte.protocol.models.AirbyteStream as ProtocolAirbyteStream
Expand All @@ -30,22 +31,19 @@ class DefaultProtocolSerializerTest {
serializer: ProtocolSerializer,
supportRefreshes: Boolean,
) {
val appendStreamName = "append"
val overwriteStreamName = "overwrite"
val appendDedupStreamName = "append_dedup"
val overwriteDedupStreamName = "overwrite_dedup"

val configuredCatalog =
ConfiguredAirbyteCatalog()
.withStreams(
listOf(
ConfiguredAirbyteStream()
.withStream(AirbyteStream().withName("append"))
.withDestinationSyncMode(DestinationSyncMode.APPEND),
ConfiguredAirbyteStream()
.withStream(AirbyteStream().withName("overwrite"))
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE),
ConfiguredAirbyteStream()
.withStream(AirbyteStream().withName("append_dedup"))
.withDestinationSyncMode(DestinationSyncMode.APPEND_DEDUP),
ConfiguredAirbyteStream()
.withStream(AirbyteStream().withName("overwrite_dedup"))
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE_DEDUP),
ConfiguredAirbyteStream(getAirbyteStream(appendStreamName), SyncMode.INCREMENTAL, DestinationSyncMode.APPEND),
ConfiguredAirbyteStream(getAirbyteStream(overwriteStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE),
ConfiguredAirbyteStream(getAirbyteStream(appendDedupStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.APPEND_DEDUP),
ConfiguredAirbyteStream(getAirbyteStream(overwriteDedupStreamName), SyncMode.FULL_REFRESH, DestinationSyncMode.OVERWRITE_DEDUP),
),
)
val frozenConfiguredCatalog = Jsons.clone(configuredCatalog)
Expand All @@ -55,16 +53,36 @@ class DefaultProtocolSerializerTest {
.withStreams(
listOf(
ProtocolConfiguredAirbyteStream()
.withStream(ProtocolAirbyteStream().withName("append"))
.withStream(
ProtocolAirbyteStream().withName(
appendStreamName,
).withJsonSchema(Jsons.emptyObject()).withSupportedSyncModes(listOf(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)),
)
.withSyncMode(io.airbyte.protocol.models.SyncMode.INCREMENTAL)
.withDestinationSyncMode(ProtocolDestinationSyncMode.APPEND),
ProtocolConfiguredAirbyteStream()
.withStream(ProtocolAirbyteStream().withName("overwrite"))
.withStream(
ProtocolAirbyteStream().withName(
overwriteStreamName,
).withJsonSchema(Jsons.emptyObject()).withSupportedSyncModes(listOf(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)),
)
.withSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)
.withDestinationSyncMode(if (supportRefreshes) ProtocolDestinationSyncMode.APPEND else ProtocolDestinationSyncMode.OVERWRITE),
ProtocolConfiguredAirbyteStream()
.withStream(ProtocolAirbyteStream().withName("append_dedup"))
.withStream(
ProtocolAirbyteStream().withName(
appendDedupStreamName,
).withJsonSchema(Jsons.emptyObject()).withSupportedSyncModes(listOf(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)),
)
.withSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)
.withDestinationSyncMode(ProtocolDestinationSyncMode.APPEND_DEDUP),
ProtocolConfiguredAirbyteStream()
.withStream(ProtocolAirbyteStream().withName("overwrite_dedup"))
.withStream(
ProtocolAirbyteStream().withName(
overwriteDedupStreamName,
).withJsonSchema(Jsons.emptyObject()).withSupportedSyncModes(listOf(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)),
)
.withSyncMode(io.airbyte.protocol.models.SyncMode.FULL_REFRESH)
.withDestinationSyncMode(if (supportRefreshes) ProtocolDestinationSyncMode.APPEND_DEDUP else ProtocolDestinationSyncMode.OVERWRITE),
),
)
Expand All @@ -78,5 +96,7 @@ class DefaultProtocolSerializerTest {
// Verify we didn't mutate the input
assertEquals(frozenConfiguredCatalog, configuredCatalog)
}

fun getAirbyteStream(name: String) = AirbyteStream(name, Jsons.emptyObject(), listOf(SyncMode.FULL_REFRESH))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ Set<StreamDescriptor> getFullRefreshStreamsToClear(final ConfiguredAirbyteCatalo
.filter(s -> {
if (s.getSyncMode().equals(SyncMode.FULL_REFRESH)) {
if (excludeResumableStreams) {
return s.getStream().getIsResumable() == null || !s.getStream().getIsResumable();
return s.getStream().isResumable() == null || !s.getStream().isResumable();
} else {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,12 @@ public static ConfiguredAirbyteCatalog toConfiguredInternal(final io.airbyte.api
.filter(s -> s.getConfig().getSelected())
.map(s -> {
try {
return new ConfiguredAirbyteStream()
.withStream(ProtocolConverters.toInternal(toConfiguredProtocol(s.getStream(), s.getConfig())))
.withSyncMode(Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.config.SyncMode.class))
.withCursorField(s.getConfig().getCursorField())
.withDestinationSyncMode(Enums.convertTo(s.getConfig().getDestinationSyncMode(),
io.airbyte.config.DestinationSyncMode.class))
.withPrimaryKey(Optional.ofNullable(s.getConfig().getPrimaryKey()).orElse(Collections.emptyList()));
return new ConfiguredAirbyteStream(
ProtocolConverters.toInternal(toConfiguredProtocol(s.getStream(), s.getConfig())),
Enums.convertTo(s.getConfig().getSyncMode(), io.airbyte.config.SyncMode.class),
Enums.convertTo(s.getConfig().getDestinationSyncMode(), io.airbyte.config.DestinationSyncMode.class))
.withCursorField(s.getConfig().getCursorField())
.withPrimaryKey(Optional.ofNullable(s.getConfig().getPrimaryKey()).orElse(Collections.emptyList()));
} catch (final JsonValidationException e) {
LOGGER.error("Error parsing catalog: {}", e);
errors.add(e);
Expand Down
Loading

0 comments on commit 5357e67

Please sign in to comment.