Skip to content

Commit

Permalink
redshift_catalog_parser_update
Browse files Browse the repository at this point in the history
  • Loading branch information
gisripa authored and edgao committed May 10, 2024
1 parent 0724d77 commit 2cb49a4
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
airbyteJavaConnector {
cdkVersionRequired = '0.34.0'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
useLocalCdk = true
}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public SerializedAirbyteMessageConsumer getSerializedMessageConsumer(final JsonN
catalogParser = new CatalogParser(sqlGenerator, rawNamespace);
} else {
rawNamespace = JavaBaseConstants.DEFAULT_AIRBYTE_INTERNAL_NAMESPACE;
catalogParser = new CatalogParser(sqlGenerator);
catalogParser = new CatalogParser(sqlGenerator, defaultNamespace);
}
final RedshiftDestinationHandler redshiftDestinationHandler = new RedshiftDestinationHandler(databaseName, database, rawNamespace);
parsedCatalog = catalogParser.parseCatalog(catalog);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,19 @@ public void setup() {
columns.put(redshiftSqlGenerator.buildColumnId("_ab_cdc_deleted_at"), AirbyteProtocolType.TIMESTAMP_WITH_TIMEZONE);
incrementalDedupStream = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
columns);
columns,
0,
0,
0);
incrementalAppendStream = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND,
primaryKey,
Optional.of(cursor),
columns);
columns, 0, 0, 0);
}

@Test
Expand Down Expand Up @@ -132,11 +133,10 @@ public void test2000ColumnSql() {
}
final Sql generatedSql = redshiftSqlGenerator.updateTable(new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.of(cursor),
columns), "unittest", Optional.of(Instant.parse("2023-02-15T18:35:24.00Z")), false);
columns, 0, 0, 0), "unittest", Optional.of(Instant.parse("2023-02-15T18:35:24.00Z")), false);
// This should not throw an exception.
assertFalse(generatedSql.transactions().isEmpty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,10 @@ public void setup() {
final StreamId streamId = new StreamId("test_schema", "users_final", "test_schema", "users_raw", "test_schema", "users_final");
StreamConfig streamConfig = new StreamConfig(
streamId,
SyncMode.INCREMENTAL,
DestinationSyncMode.APPEND_DEDUP,
primaryKey,
Optional.empty(),
columns);
columns,0 ,0, 0);
final ParsedCatalog parsedCatalog = new ParsedCatalog(List.of(streamConfig));
transformer = new RedshiftSuperLimitationTransformer(parsedCatalog, "test_schema");
}
Expand Down

0 comments on commit 2cb49a4

Please sign in to comment.