Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

convert typing_deduping to kotlin #36421

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.8
version=0.28.9
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,23 @@ protected DSLContext getDslContext() {
}

private Optional<TableDefinition> findExistingTable(final StreamId id) throws Exception {
return findExistingTable(jdbcDatabase, databaseName, id.finalNamespace(), id.finalName());
return findExistingTable(jdbcDatabase, databaseName, id.getFinalNamespace(), id.getFinalName());
}

private boolean isFinalTableEmpty(final StreamId id) throws Exception {
return !jdbcDatabase.queryBoolean(
getDslContext().select(
field(exists(
selectOne()
.from(name(id.finalNamespace(), id.finalName()))
.from(name(id.getFinalNamespace(), id.getFinalName()))
.limit(1))))
.getSQL(ParamType.INLINED));
}

private InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
boolean tableExists = jdbcDatabase.executeMetadataQuery(dbmetadata -> {
LOGGER.info("Retrieving table from Db metadata: {} {} {}", databaseName, id.rawNamespace(), id.rawName());
try (final ResultSet table = dbmetadata.getTables(databaseName, id.rawNamespace(), id.rawName(), null)) {
LOGGER.info("Retrieving table from Db metadata: {} {} {}", databaseName, id.getRawNamespace(), id.getRawName());
try (final ResultSet table = dbmetadata.getTables(databaseName, id.getRawNamespace(), id.getRawName(), null)) {
return table.next();
} catch (SQLException e) {
LOGGER.error("Failed to retrieve table info from metadata", e);
Expand All @@ -131,7 +131,7 @@ private InitialRawTableStatus getInitialRawTableState(final StreamId id) throws
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
conn -> conn.prepareStatement(
getDslContext().select(field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
.from(name(id.rawNamespace(), id.rawName()))
.from(name(id.getRawNamespace(), id.getRawName()))
.where(DSL.condition("_airbyte_loaded_at IS NULL"))
.getSQL()),
record -> record.getTimestamp("min_timestamp"))) {
Expand All @@ -150,7 +150,7 @@ record -> record.getTimestamp("min_timestamp"))) {
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
conn -> conn.prepareStatement(
getDslContext().select(field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
.from(name(id.rawNamespace(), id.rawName()))
.from(name(id.getRawNamespace(), id.getRawName()))
.getSQL()),
record -> record.getTimestamp("min_timestamp"))) {
// Filter for nonNull values in case the query returned NULL (i.e. no raw records at all).
Expand All @@ -161,7 +161,7 @@ record -> record.getTimestamp("min_timestamp"))) {

@Override
public void execute(final Sql sql) throws Exception {
final List<List<String>> transactions = sql.transactions();
final List<List<String>> transactions = sql.transactions;
final UUID queryId = UUID.randomUUID();
for (final List<String> transaction : transactions) {
final UUID transactionId = UUID.randomUUID();
Expand Down Expand Up @@ -255,20 +255,20 @@ private CompletionStage<DestinationInitialStatus<DestinationState>> retrieveStat
final StreamConfig streamConfig) {
return destinationStatesFuture.thenApply(destinationStates -> {
try {
final Optional<TableDefinition> finalTableDefinition = findExistingTable(streamConfig.id());
final Optional<TableDefinition> finalTableDefinition = findExistingTable(streamConfig.getId());
final boolean isSchemaMismatch;
final boolean isFinalTableEmpty;
if (finalTableDefinition.isPresent()) {
isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, finalTableDefinition.get());
isFinalTableEmpty = isFinalTableEmpty(streamConfig.id());
isFinalTableEmpty = isFinalTableEmpty(streamConfig.getId());
} else {
// If the final table doesn't exist, then by definition it doesn't have a schema mismatch and has no
// records.
isSchemaMismatch = false;
isFinalTableEmpty = true;
}
final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.id());
DestinationState destinationState = destinationStates.getOrDefault(streamConfig.id().asPair(), toDestinationState(Jsons.emptyObject()));
final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.getId());
DestinationState destinationState = destinationStates.getOrDefault(streamConfig.getId().asPair(), toDestinationState(Jsons.emptyObject()));
return new DestinationInitialStatus<>(streamConfig, finalTableDefinition.isPresent(), initialRawTableState,
isSchemaMismatch, isFinalTableEmpty, destinationState);
} catch (Exception e) {
Expand Down Expand Up @@ -337,9 +337,9 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
// Missing AB meta columns from final table, we need them to do proper T+D so trigger soft-reset
return false;
}
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
final LinkedHashMap<String, String> intendedColumns = stream.getColumns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())),
(map, column) -> map.put(column.getKey().getName(), toJdbcTypeName(column.getValue())),
LinkedHashMap::putAll);

// Filter out Meta columns since they don't exist in stream config.
Expand All @@ -354,16 +354,16 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
}

@Override
public void commitDestinationStates(final Map<StreamId, DestinationState> destinationStates) throws Exception {
public void commitDestinationStates(final Map<StreamId, ? extends DestinationState> destinationStates) throws Exception {
if (destinationStates.isEmpty()) {
return;
}

// Delete all state records where the stream name+namespace match one of our states
String deleteStates = getDslContext().deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)))
.where(destinationStates.keySet().stream()
.map(streamId -> field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)).eq(streamId.originalName())
.and(field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)).eq(streamId.originalNamespace())))
.map(streamId -> field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)).eq(streamId.getOriginalName())
.and(field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)).eq(streamId.getOriginalNamespace())))
.reduce(
DSL.falseCondition(),
Condition::or))
Expand All @@ -381,10 +381,11 @@ public void commitDestinationStates(final Map<StreamId, DestinationState> destin
// and assume the destination can cast it appropriately.
// Destination-specific timestamp syntax is weird and annoying.
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT), String.class));
for (Map.Entry<StreamId, DestinationState> destinationState : destinationStates.entrySet()) {
for (Map.Entry<StreamId, ? extends DestinationState> destinationState : destinationStates.entrySet()) {
final StreamId streamId = destinationState.getKey();
final String stateJson = Jsons.serialize(destinationState.getValue());
insertStatesStep = insertStatesStep.values(streamId.originalName(), streamId.originalNamespace(), stateJson, OffsetDateTime.now().toString());
insertStatesStep =
insertStatesStep.values(streamId.getOriginalName(), streamId.getOriginalNamespace(), stateJson, OffsetDateTime.now().toString());
}
String insertStates = insertStatesStep.getSQL(ParamType.INLINED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ List<Field<?>> buildFinalTableFields(final LinkedHashMap<ColumnId, AirbyteType>
final List<Field<?>> fields =
metaColumns.entrySet().stream().map(metaColumn -> field(quotedName(metaColumn.getKey()), metaColumn.getValue())).collect(toList());
final List<Field<?>> dataFields =
columns.entrySet().stream().map(column -> field(quotedName(column.getKey().name()), toDialectType(column.getValue()))).collect(
columns.entrySet().stream().map(column -> field(quotedName(column.getKey().getName()), toDialectType(column.getValue()))).collect(
toList());
dataFields.addAll(fields);
return dataFields;
Expand Down Expand Up @@ -258,16 +258,16 @@ public Sql createSchema(final String schema) {
@Override
public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) {
// TODO: Use Naming transformer to sanitize these strings with redshift restrictions.
final String finalTableIdentifier = stream.id().finalName() + suffix.toLowerCase();
final String finalTableIdentifier = stream.getId().getFinalName() + suffix.toLowerCase();
if (!force) {
return transactionally(Stream.concat(
Stream.of(createTableSql(stream.id().finalNamespace(), finalTableIdentifier, stream.columns())),
Stream.of(createTableSql(stream.getId().getFinalNamespace(), finalTableIdentifier, stream.getColumns())),
createIndexSql(stream, suffix).stream()).toList());
}
return transactionally(Stream.concat(
Stream.of(
dropTableIfExists(quotedName(stream.id().finalNamespace(), finalTableIdentifier)).getSQL(ParamType.INLINED),
createTableSql(stream.id().finalNamespace(), finalTableIdentifier, stream.columns())),
dropTableIfExists(quotedName(stream.getId().getFinalNamespace(), finalTableIdentifier)).getSQL(ParamType.INLINED),
createTableSql(stream.getId().getFinalNamespace(), finalTableIdentifier, stream.getColumns())),
createIndexSql(stream, suffix).stream()).toList());
}

Expand All @@ -285,18 +285,18 @@ public Sql updateTable(final StreamConfig streamConfig,
@Override
public Sql overwriteFinalTable(final StreamId stream, final String finalSuffix) {
return transactionally(
dropTableIfExists(name(stream.finalNamespace(), stream.finalName())).getSQL(ParamType.INLINED),
alterTable(name(stream.finalNamespace(), stream.finalName() + finalSuffix))
.renameTo(name(stream.finalName()))
dropTableIfExists(name(stream.getFinalNamespace(), stream.getFinalName())).getSQL(ParamType.INLINED),
alterTable(name(stream.getFinalNamespace(), stream.getFinalName() + finalSuffix))
.renameTo(name(stream.getFinalName()))
.getSQL());
}

@Override
public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
final Name rawTableName = name(streamId.rawNamespace(), streamId.rawName());
final Name rawTableName = name(streamId.getRawNamespace(), streamId.getRawName());
final DSLContext dsl = getDslContext();
return transactionally(
dsl.createSchemaIfNotExists(streamId.rawNamespace()).getSQL(),
dsl.createSchemaIfNotExists(streamId.getRawNamespace()).getSQL(),
dsl.dropTableIfExists(rawTableName).getSQL(),
DSL.createTable(rawTableName)
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
Expand All @@ -315,7 +315,7 @@ public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, fi

@Override
public Sql clearLoadedAt(final StreamId streamId) {
return Sql.of(update(table(name(streamId.rawNamespace(), streamId.rawName())))
return Sql.of(update(table(name(streamId.getRawNamespace(), streamId.getRawName())))
.set(field(COLUMN_NAME_AB_LOADED_AT), inline((String) null))
.getSQL());
}
Expand Down Expand Up @@ -350,28 +350,28 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
final String finalSuffix,
final Optional<Instant> minRawTimestamp,
final boolean useExpensiveSaferCasting) {
final String finalSchema = streamConfig.id().finalNamespace();
final String finalTable = streamConfig.id().finalName() + (finalSuffix != null ? finalSuffix.toLowerCase() : "");
final String rawSchema = streamConfig.id().rawNamespace();
final String rawTable = streamConfig.id().rawName();
final String finalSchema = streamConfig.getId().getFinalNamespace();
final String finalTable = streamConfig.getId().getFinalName() + (finalSuffix != null ? finalSuffix.toLowerCase() : "");
final String rawSchema = streamConfig.getId().getRawNamespace();
final String rawTable = streamConfig.getId().getRawName();

// Poor person's guarantee of ordering of fields by using same source of ordered list of columns to
// generate fields.
final CommonTableExpression<Record> rawTableRowsWithCast = name(TYPING_CTE_ALIAS).as(
selectFromRawTable(rawSchema, rawTable, streamConfig.columns(),
selectFromRawTable(rawSchema, rawTable, streamConfig.getColumns(),
getFinalTableMetaColumns(false),
rawTableCondition(streamConfig.destinationSyncMode(),
streamConfig.columns().containsKey(cdcDeletedAtColumn),
rawTableCondition(streamConfig.getDestinationSyncMode(),
streamConfig.getColumns().containsKey(cdcDeletedAtColumn),
minRawTimestamp),
useExpensiveSaferCasting));
final List<Field<?>> finalTableFields = buildFinalTableFields(streamConfig.columns(), getFinalTableMetaColumns(true));
final Field<Integer> rowNumber = getRowNumber(streamConfig.primaryKey(), streamConfig.cursor());
final List<Field<?>> finalTableFields = buildFinalTableFields(streamConfig.getColumns(), getFinalTableMetaColumns(true));
final Field<Integer> rowNumber = getRowNumber(streamConfig.getPrimaryKey(), streamConfig.getCursor());
final CommonTableExpression<Record> filteredRows = name(NUMBERED_ROWS_CTE_ALIAS).as(
select(asterisk(), rowNumber).from(rawTableRowsWithCast));

// Used for append-dedupe mode.
final String insertStmtWithDedupe =
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
insertIntoFinalTable(finalSchema, finalTable, streamConfig.getColumns(), getFinalTableMetaColumns(true))
.select(with(rawTableRowsWithCast)
.with(filteredRows)
.select(finalTableFields)
Expand All @@ -383,17 +383,17 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,

// Used for append and overwrite modes.
final String insertStmt =
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
insertIntoFinalTable(finalSchema, finalTable, streamConfig.getColumns(), getFinalTableMetaColumns(true))
.select(with(rawTableRowsWithCast)
.select(finalTableFields)
.from(rawTableRowsWithCast))
.getSQL(ParamType.INLINED);
final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.primaryKey(), streamConfig.cursor());
final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.getPrimaryKey(), streamConfig.getCursor());
final String deleteCdcDeletesStmt =
streamConfig.columns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : "";
streamConfig.getColumns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : "";
final String checkpointStmt = checkpointRawTable(rawSchema, rawTable, minRawTimestamp);

if (streamConfig.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) {
if (streamConfig.getDestinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) {
return transactionally(
insertStmt,
checkpointStmt);
Expand Down Expand Up @@ -470,7 +470,7 @@ private String deleteFromFinalTable(final String schemaName,
private String deleteFromFinalTableCdcDeletes(final String schema, final String tableName) {
final DSLContext dsl = getDslContext();
return dsl.deleteFrom(table(quotedName(schema, tableName)))
.where(field(quotedName(cdcDeletedAtColumn.name())).isNotNull())
.where(field(quotedName(cdcDeletedAtColumn.getName())).isNotNull())
.getSQL(ParamType.INLINED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransf

@SneakyThrows
@Override
protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) {
public boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) {
final String retrievedSchema = database.executeMetadataQuery(dbMetadata -> {
try (ResultSet columns = dbMetadata.getSchemas(databaseName, streamConfig.id().rawNamespace())) {
try (ResultSet columns = dbMetadata.getSchemas(databaseName, streamConfig.getId().getRawNamespace())) {
String schema = "";
while (columns.next()) {
// Catalog can be null, so don't do anything with it.
Expand All @@ -54,22 +54,22 @@ protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamCon
}

@Override
protected boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection<String> columns) {
public boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection<String> columns) {
return existingTable.columns().keySet().containsAll(columns);
}

@SneakyThrows
@Override
protected Optional<TableDefinition> getTableIfExists(final String namespace, final String tableName) throws Exception {
public Optional<TableDefinition> getTableIfExists(final String namespace, final String tableName) throws Exception {
return JdbcDestinationHandler.findExistingTable(database, databaseName, namespace, tableName);
}

@Override
protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) {
public NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) {
@SuppressWarnings("deprecation")
final String tableName = this.namingConventionTransformer.getRawTableName(streamConfig.id().originalName());
final String tableName = this.namingConventionTransformer.getRawTableName(streamConfig.getId().getOriginalName());
return new NamespacedTableName(
this.namingConventionTransformer.getIdentifier(streamConfig.id().originalNamespace()),
this.namingConventionTransformer.getIdentifier(streamConfig.getId().getOriginalNamespace()),
tableName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(fina
final String outputSchema;
final String tableName;
if (useDestinationsV2Columns) {
final StreamId streamId = parsedCatalog.getStream(abStream.getNamespace(), streamName).id();
outputSchema = streamId.rawNamespace();
tableName = streamId.rawName();
final StreamId streamId = parsedCatalog.getStream(abStream.getNamespace(), streamName).getId();
outputSchema = streamId.getRawNamespace();
tableName = streamId.getRawName();
} else {
outputSchema = getOutputSchema(abStream, config.get("schema").asText(), namingResolver);
tableName = namingResolver.getRawTableName(streamName);
Expand Down
Loading
Loading