Skip to content

Commit

Permalink
Updated query to use TOP instead of LIMIT
Browse files Browse the repository at this point in the history
  • Loading branch information
nguyenaiden authored and rodireich committed Jan 12, 2024
1 parent 7b4dfea commit dbe4d60
Show file tree
Hide file tree
Showing 3 changed files with 3 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ static boolean isCdc(final JsonNode config) {
return false;
}

static Properties getDebeziumProperties(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final boolean isSnapshot) {
public static Properties getDebeziumProperties(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final boolean isSnapshot) {
final JsonNode config = database.getSourceConfig();
final JsonNode dbConfig = database.getDatabaseConfig();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class MssqlDebeziumConverter implements CustomConverter<SchemaBuilder, Re
private static final String SMALLMONEY_TYPE = "SMALLMONEY";
private static final String GEOMETRY = "GEOMETRY";
private static final String GEOGRAPHY = "GEOGRAPHY";
private static final String DEBEZIUM_DATETIMEOFFSET_FORMAT = "yyyy-MM-dd HH:mm:ss XXX";
private static final String DEBEZIUM_DATETIMEOFFSET_FORMAT = "yyyy-MM-dd HH:mm:ss[.][SSSSSSS] XXX";

private static final String DATETIME_FORMAT_MICROSECONDS = "yyyy-MM-dd'T'HH:mm:ss[.][SSSSSS]";

Expand Down Expand Up @@ -144,6 +144,7 @@ private void registerDateTimeOffSet(final RelationalColumn field,
}

if (input instanceof DateTimeOffset) {
LOGGER.info("*** input:{}", input);
return DataTypeUtils.toISO8601String(
OffsetDateTime.parse(input.toString(),
DateTimeFormatter.ofPattern(DEBEZIUM_DATETIMEOFFSET_FORMAT)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,8 +470,6 @@ public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final
final Map<String, TableInfo<CommonField<JDBCType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt) {
// rewire snapshot here
// Duy add user cursor here
final JsonNode sourceConfig = database.getSourceConfig();
if (MssqlCdcHelper.isCdc(sourceConfig) && isAnyStreamIncrementalSyncMode(catalog)) {
LOGGER.info("using OC + CDC");
Expand Down

0 comments on commit dbe4d60

Please sign in to comment.