Skip to content

Commit

Permalink
fix compiler errors
Browse files Browse the repository at this point in the history
  • Loading branch information
stephane-airbyte committed Mar 27, 2024
1 parent 7809e0d commit 834efbb
Show file tree
Hide file tree
Showing 52 changed files with 3,941 additions and 2,437 deletions.
Original file line number Diff line number Diff line change
@@ -1 +1 @@
version=0.28.8
version=0.28.9
Original file line number Diff line number Diff line change
Expand Up @@ -96,23 +96,23 @@ protected DSLContext getDslContext() {
}

private Optional<TableDefinition> findExistingTable(final StreamId id) throws Exception {
return findExistingTable(jdbcDatabase, databaseName, id.finalNamespace(), id.finalName());
return findExistingTable(jdbcDatabase, databaseName, id.getFinalNamespace(), id.getFinalName());
}

private boolean isFinalTableEmpty(final StreamId id) throws Exception {
return !jdbcDatabase.queryBoolean(
getDslContext().select(
field(exists(
selectOne()
.from(name(id.finalNamespace(), id.finalName()))
.from(name(id.getFinalNamespace(), id.getFinalName()))
.limit(1))))
.getSQL(ParamType.INLINED));
}

private InitialRawTableStatus getInitialRawTableState(final StreamId id) throws Exception {
boolean tableExists = jdbcDatabase.executeMetadataQuery(dbmetadata -> {
LOGGER.info("Retrieving table from Db metadata: {} {} {}", databaseName, id.rawNamespace(), id.rawName());
try (final ResultSet table = dbmetadata.getTables(databaseName, id.rawNamespace(), id.rawName(), null)) {
LOGGER.info("Retrieving table from Db metadata: {} {} {}", databaseName, id.getRawNamespace(), id.getRawName());
try (final ResultSet table = dbmetadata.getTables(databaseName, id.getRawNamespace(), id.getRawName(), null)) {
return table.next();
} catch (SQLException e) {
LOGGER.error("Failed to retrieve table info from metadata", e);
Expand All @@ -131,7 +131,7 @@ private InitialRawTableStatus getInitialRawTableState(final StreamId id) throws
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
conn -> conn.prepareStatement(
getDslContext().select(field("MIN(_airbyte_extracted_at)").as("min_timestamp"))
.from(name(id.rawNamespace(), id.rawName()))
.from(name(id.getRawNamespace(), id.getRawName()))
.where(DSL.condition("_airbyte_loaded_at IS NULL"))
.getSQL()),
record -> record.getTimestamp("min_timestamp"))) {
Expand All @@ -150,7 +150,7 @@ record -> record.getTimestamp("min_timestamp"))) {
try (final Stream<Timestamp> timestampStream = jdbcDatabase.unsafeQuery(
conn -> conn.prepareStatement(
getDslContext().select(field("MAX(_airbyte_extracted_at)").as("min_timestamp"))
.from(name(id.rawNamespace(), id.rawName()))
.from(name(id.getRawNamespace(), id.getRawName()))
.getSQL()),
record -> record.getTimestamp("min_timestamp"))) {
// Filter for nonNull values in case the query returned NULL (i.e. no raw records at all).
Expand All @@ -161,7 +161,7 @@ record -> record.getTimestamp("min_timestamp"))) {

@Override
public void execute(final Sql sql) throws Exception {
final List<List<String>> transactions = sql.transactions();
final List<List<String>> transactions = sql.transactions;
final UUID queryId = UUID.randomUUID();
for (final List<String> transaction : transactions) {
final UUID transactionId = UUID.randomUUID();
Expand Down Expand Up @@ -255,20 +255,20 @@ private CompletionStage<DestinationInitialStatus<DestinationState>> retrieveStat
final StreamConfig streamConfig) {
return destinationStatesFuture.thenApply(destinationStates -> {
try {
final Optional<TableDefinition> finalTableDefinition = findExistingTable(streamConfig.id());
final Optional<TableDefinition> finalTableDefinition = findExistingTable(streamConfig.getId());
final boolean isSchemaMismatch;
final boolean isFinalTableEmpty;
if (finalTableDefinition.isPresent()) {
isSchemaMismatch = !existingSchemaMatchesStreamConfig(streamConfig, finalTableDefinition.get());
isFinalTableEmpty = isFinalTableEmpty(streamConfig.id());
isFinalTableEmpty = isFinalTableEmpty(streamConfig.getId());
} else {
// If the final table doesn't exist, then by definition it doesn't have a schema mismatch and has no
// records.
isSchemaMismatch = false;
isFinalTableEmpty = true;
}
final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.id());
DestinationState destinationState = destinationStates.getOrDefault(streamConfig.id().asPair(), toDestinationState(Jsons.emptyObject()));
final InitialRawTableStatus initialRawTableState = getInitialRawTableState(streamConfig.getId());
DestinationState destinationState = destinationStates.getOrDefault(streamConfig.getId().asPair(), toDestinationState(Jsons.emptyObject()));
return new DestinationInitialStatus<>(streamConfig, finalTableDefinition.isPresent(), initialRawTableState,
isSchemaMismatch, isFinalTableEmpty, destinationState);
} catch (Exception e) {
Expand Down Expand Up @@ -337,9 +337,9 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
// Missing AB meta columns from final table, we need them to do proper T+D so trigger soft-reset
return false;
}
final LinkedHashMap<String, String> intendedColumns = stream.columns().entrySet().stream()
final LinkedHashMap<String, String> intendedColumns = stream.getColumns().entrySet().stream()
.collect(LinkedHashMap::new,
(map, column) -> map.put(column.getKey().name(), toJdbcTypeName(column.getValue())),
(map, column) -> map.put(column.getKey().getName(), toJdbcTypeName(column.getValue())),
LinkedHashMap::putAll);

// Filter out Meta columns since they don't exist in stream config.
Expand All @@ -354,16 +354,16 @@ protected boolean existingSchemaMatchesStreamConfig(final StreamConfig stream, f
}

@Override
public void commitDestinationStates(final Map<StreamId, DestinationState> destinationStates) throws Exception {
public void commitDestinationStates(final Map<StreamId, ? extends DestinationState> destinationStates) throws Exception {
if (destinationStates.isEmpty()) {
return;
}

// Delete all state records where the stream name+namespace match one of our states
String deleteStates = getDslContext().deleteFrom(table(quotedName(rawTableSchemaName, DESTINATION_STATE_TABLE_NAME)))
.where(destinationStates.keySet().stream()
.map(streamId -> field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)).eq(streamId.originalName())
.and(field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)).eq(streamId.originalNamespace())))
.map(streamId -> field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAME)).eq(streamId.getOriginalName())
.and(field(quotedName(DESTINATION_STATE_TABLE_COLUMN_NAMESPACE)).eq(streamId.getOriginalNamespace())))
.reduce(
DSL.falseCondition(),
Condition::or))
Expand All @@ -381,10 +381,11 @@ public void commitDestinationStates(final Map<StreamId, DestinationState> destin
// and assume the destination can cast it appropriately.
// Destination-specific timestamp syntax is weird and annoying.
field(quotedName(DESTINATION_STATE_TABLE_COLUMN_UPDATED_AT), String.class));
for (Map.Entry<StreamId, DestinationState> destinationState : destinationStates.entrySet()) {
for (Map.Entry<StreamId, ? extends DestinationState> destinationState : destinationStates.entrySet()) {
final StreamId streamId = destinationState.getKey();
final String stateJson = Jsons.serialize(destinationState.getValue());
insertStatesStep = insertStatesStep.values(streamId.originalName(), streamId.originalNamespace(), stateJson, OffsetDateTime.now().toString());
insertStatesStep =
insertStatesStep.values(streamId.getOriginalName(), streamId.getOriginalNamespace(), stateJson, OffsetDateTime.now().toString());
}
String insertStates = insertStatesStep.getSQL(ParamType.INLINED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ List<Field<?>> buildFinalTableFields(final LinkedHashMap<ColumnId, AirbyteType>
final List<Field<?>> fields =
metaColumns.entrySet().stream().map(metaColumn -> field(quotedName(metaColumn.getKey()), metaColumn.getValue())).collect(toList());
final List<Field<?>> dataFields =
columns.entrySet().stream().map(column -> field(quotedName(column.getKey().name()), toDialectType(column.getValue()))).collect(
columns.entrySet().stream().map(column -> field(quotedName(column.getKey().getName()), toDialectType(column.getValue()))).collect(
toList());
dataFields.addAll(fields);
return dataFields;
Expand Down Expand Up @@ -258,16 +258,16 @@ public Sql createSchema(final String schema) {
@Override
public Sql createTable(final StreamConfig stream, final String suffix, final boolean force) {
// TODO: Use Naming transformer to sanitize these strings with redshift restrictions.
final String finalTableIdentifier = stream.id().finalName() + suffix.toLowerCase();
final String finalTableIdentifier = stream.getId().getFinalName() + suffix.toLowerCase();
if (!force) {
return transactionally(Stream.concat(
Stream.of(createTableSql(stream.id().finalNamespace(), finalTableIdentifier, stream.columns())),
Stream.of(createTableSql(stream.getId().getFinalNamespace(), finalTableIdentifier, stream.getColumns())),
createIndexSql(stream, suffix).stream()).toList());
}
return transactionally(Stream.concat(
Stream.of(
dropTableIfExists(quotedName(stream.id().finalNamespace(), finalTableIdentifier)).getSQL(ParamType.INLINED),
createTableSql(stream.id().finalNamespace(), finalTableIdentifier, stream.columns())),
dropTableIfExists(quotedName(stream.getId().getFinalNamespace(), finalTableIdentifier)).getSQL(ParamType.INLINED),
createTableSql(stream.getId().getFinalNamespace(), finalTableIdentifier, stream.getColumns())),
createIndexSql(stream, suffix).stream()).toList());
}

Expand All @@ -285,18 +285,18 @@ public Sql updateTable(final StreamConfig streamConfig,
@Override
public Sql overwriteFinalTable(final StreamId stream, final String finalSuffix) {
return transactionally(
dropTableIfExists(name(stream.finalNamespace(), stream.finalName())).getSQL(ParamType.INLINED),
alterTable(name(stream.finalNamespace(), stream.finalName() + finalSuffix))
.renameTo(name(stream.finalName()))
dropTableIfExists(name(stream.getFinalNamespace(), stream.getFinalName())).getSQL(ParamType.INLINED),
alterTable(name(stream.getFinalNamespace(), stream.getFinalName() + finalSuffix))
.renameTo(name(stream.getFinalName()))
.getSQL());
}

@Override
public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, final String tableName) {
final Name rawTableName = name(streamId.rawNamespace(), streamId.rawName());
final Name rawTableName = name(streamId.getRawNamespace(), streamId.getRawName());
final DSLContext dsl = getDslContext();
return transactionally(
dsl.createSchemaIfNotExists(streamId.rawNamespace()).getSQL(),
dsl.createSchemaIfNotExists(streamId.getRawNamespace()).getSQL(),
dsl.dropTableIfExists(rawTableName).getSQL(),
DSL.createTable(rawTableName)
.column(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false))
Expand All @@ -315,7 +315,7 @@ public Sql migrateFromV1toV2(final StreamId streamId, final String namespace, fi

@Override
public Sql clearLoadedAt(final StreamId streamId) {
return Sql.of(update(table(name(streamId.rawNamespace(), streamId.rawName())))
return Sql.of(update(table(name(streamId.getRawNamespace(), streamId.getRawName())))
.set(field(COLUMN_NAME_AB_LOADED_AT), inline((String) null))
.getSQL());
}
Expand Down Expand Up @@ -350,28 +350,28 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,
final String finalSuffix,
final Optional<Instant> minRawTimestamp,
final boolean useExpensiveSaferCasting) {
final String finalSchema = streamConfig.id().finalNamespace();
final String finalTable = streamConfig.id().finalName() + (finalSuffix != null ? finalSuffix.toLowerCase() : "");
final String rawSchema = streamConfig.id().rawNamespace();
final String rawTable = streamConfig.id().rawName();
final String finalSchema = streamConfig.getId().getFinalNamespace();
final String finalTable = streamConfig.getId().getFinalName() + (finalSuffix != null ? finalSuffix.toLowerCase() : "");
final String rawSchema = streamConfig.getId().getRawNamespace();
final String rawTable = streamConfig.getId().getRawName();

// Poor person's guarantee of ordering of fields by using same source of ordered list of columns to
// generate fields.
final CommonTableExpression<Record> rawTableRowsWithCast = name(TYPING_CTE_ALIAS).as(
selectFromRawTable(rawSchema, rawTable, streamConfig.columns(),
selectFromRawTable(rawSchema, rawTable, streamConfig.getColumns(),
getFinalTableMetaColumns(false),
rawTableCondition(streamConfig.destinationSyncMode(),
streamConfig.columns().containsKey(cdcDeletedAtColumn),
rawTableCondition(streamConfig.getDestinationSyncMode(),
streamConfig.getColumns().containsKey(cdcDeletedAtColumn),
minRawTimestamp),
useExpensiveSaferCasting));
final List<Field<?>> finalTableFields = buildFinalTableFields(streamConfig.columns(), getFinalTableMetaColumns(true));
final Field<Integer> rowNumber = getRowNumber(streamConfig.primaryKey(), streamConfig.cursor());
final List<Field<?>> finalTableFields = buildFinalTableFields(streamConfig.getColumns(), getFinalTableMetaColumns(true));
final Field<Integer> rowNumber = getRowNumber(streamConfig.getPrimaryKey(), streamConfig.getCursor());
final CommonTableExpression<Record> filteredRows = name(NUMBERED_ROWS_CTE_ALIAS).as(
select(asterisk(), rowNumber).from(rawTableRowsWithCast));

// Used for append-dedupe mode.
final String insertStmtWithDedupe =
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
insertIntoFinalTable(finalSchema, finalTable, streamConfig.getColumns(), getFinalTableMetaColumns(true))
.select(with(rawTableRowsWithCast)
.with(filteredRows)
.select(finalTableFields)
Expand All @@ -383,17 +383,17 @@ private Sql insertAndDeleteTransaction(final StreamConfig streamConfig,

// Used for append and overwrite modes.
final String insertStmt =
insertIntoFinalTable(finalSchema, finalTable, streamConfig.columns(), getFinalTableMetaColumns(true))
insertIntoFinalTable(finalSchema, finalTable, streamConfig.getColumns(), getFinalTableMetaColumns(true))
.select(with(rawTableRowsWithCast)
.select(finalTableFields)
.from(rawTableRowsWithCast))
.getSQL(ParamType.INLINED);
final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.primaryKey(), streamConfig.cursor());
final String deleteStmt = deleteFromFinalTable(finalSchema, finalTable, streamConfig.getPrimaryKey(), streamConfig.getCursor());
final String deleteCdcDeletesStmt =
streamConfig.columns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : "";
streamConfig.getColumns().containsKey(cdcDeletedAtColumn) ? deleteFromFinalTableCdcDeletes(finalSchema, finalTable) : "";
final String checkpointStmt = checkpointRawTable(rawSchema, rawTable, minRawTimestamp);

if (streamConfig.destinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) {
if (streamConfig.getDestinationSyncMode() != DestinationSyncMode.APPEND_DEDUP) {
return transactionally(
insertStmt,
checkpointStmt);
Expand Down Expand Up @@ -470,7 +470,7 @@ private String deleteFromFinalTable(final String schemaName,
private String deleteFromFinalTableCdcDeletes(final String schema, final String tableName) {
final DSLContext dsl = getDslContext();
return dsl.deleteFrom(table(quotedName(schema, tableName)))
.where(field(quotedName(cdcDeletedAtColumn.name())).isNotNull())
.where(field(quotedName(cdcDeletedAtColumn.getName())).isNotNull())
.getSQL(ParamType.INLINED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ public JdbcV1V2Migrator(final NamingConventionTransformer namingConventionTransf

@SneakyThrows
@Override
protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) {
public boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamConfig) {
final String retrievedSchema = database.executeMetadataQuery(dbMetadata -> {
try (ResultSet columns = dbMetadata.getSchemas(databaseName, streamConfig.id().rawNamespace())) {
try (ResultSet columns = dbMetadata.getSchemas(databaseName, streamConfig.getId().getRawNamespace())) {
String schema = "";
while (columns.next()) {
// Catalog can be null, so don't do anything with it.
Expand All @@ -54,22 +54,22 @@ protected boolean doesAirbyteInternalNamespaceExist(final StreamConfig streamCon
}

@Override
protected boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection<String> columns) {
public boolean schemaMatchesExpectation(final TableDefinition existingTable, final Collection<String> columns) {
return existingTable.columns().keySet().containsAll(columns);
}

@SneakyThrows
@Override
protected Optional<TableDefinition> getTableIfExists(final String namespace, final String tableName) throws Exception {
public Optional<TableDefinition> getTableIfExists(final String namespace, final String tableName) throws Exception {
return JdbcDestinationHandler.findExistingTable(database, databaseName, namespace, tableName);
}

@Override
protected NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) {
public NamespacedTableName convertToV1RawName(final StreamConfig streamConfig) {
@SuppressWarnings("deprecation")
final String tableName = this.namingConventionTransformer.getRawTableName(streamConfig.id().originalName());
final String tableName = this.namingConventionTransformer.getRawTableName(streamConfig.getId().getOriginalName());
return new NamespacedTableName(
this.namingConventionTransformer.getIdentifier(streamConfig.id().originalNamespace()),
this.namingConventionTransformer.getIdentifier(streamConfig.getId().getOriginalNamespace()),
tableName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ private static Function<ConfiguredAirbyteStream, WriteConfig> toWriteConfig(fina
final String outputSchema;
final String tableName;
if (useDestinationsV2Columns) {
final StreamId streamId = parsedCatalog.getStream(abStream.getNamespace(), streamName).id();
outputSchema = streamId.rawNamespace();
tableName = streamId.rawName();
final StreamId streamId = parsedCatalog.getStream(abStream.getNamespace(), streamName).getId();
outputSchema = streamId.getRawNamespace();
tableName = streamId.getRawName();
} else {
outputSchema = getOutputSchema(abStream, config.get("schema").asText(), namingResolver);
tableName = namingResolver.getRawTableName(streamName);
Expand Down
Loading

0 comments on commit 834efbb

Please sign in to comment.