Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🐞 Postgres source: fix first record wait time parsing bug #15273

Merged
merged 9 commits into from
Aug 4, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.41
LABEL io.airbyte.version=0.4.42
LABEL io.airbyte.name=airbyte/source-postgres-strict-encrypt
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@ ENV APPLICATION source-postgres

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.4.41
LABEL io.airbyte.version=0.4.42
LABEL io.airbyte.name=airbyte/source-postgres
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
}

});

checkOperations.add(database -> {
PostgresUtils.checkFirstRecordWaitTime(config);
});
}

return checkOperations;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import java.time.Duration;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -30,19 +31,40 @@ public static boolean isCdc(final JsonNode config) {
return isCdc;
}

public static Optional<Integer> getFirstRecordWaitSeconds(final JsonNode config) {
final JsonNode replicationMethod = config.get("replication_method");
if (replicationMethod != null && replicationMethod.has("initial_waiting_seconds")) {
final int seconds = config.get("replication_method").get("initial_waiting_seconds").asInt();
return Optional.of(seconds);
}
return Optional.empty();
}

public static void checkFirstRecordWaitTime(final JsonNode config) {
final Optional<Integer> firstRecordWaitSeconds = getFirstRecordWaitSeconds(config);
if (firstRecordWaitSeconds.isPresent() && firstRecordWaitSeconds.get() > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) {
throw new IllegalArgumentException(String.format(
"Initial waiting seconds cannot be larger than %d seconds for safety.",
MAX_FIRST_RECORD_WAIT_TIME.getSeconds()));
}
}

public static Duration getFirstRecordWaitTime(final JsonNode config) {
if (config.has("initial_waiting_seconds")) {
final int seconds = config.get("initial_waiting_seconds").asInt();
if (seconds > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) {
Duration firstRecordWaitTime = DEFAULT_FIRST_RECORD_WAIT_TIME;

final Optional<Integer> firstRecordWaitSeconds = getFirstRecordWaitSeconds(config);
if (firstRecordWaitSeconds.isPresent()) {
tuliren marked this conversation as resolved.
Show resolved Hide resolved
if (firstRecordWaitSeconds.get() > MAX_FIRST_RECORD_WAIT_TIME.getSeconds()) {
LOGGER.warn("First record waiting time is overridden to {} minutes, which is the max time allowed for safety.",
tuliren marked this conversation as resolved.
Show resolved Hide resolved
MAX_FIRST_RECORD_WAIT_TIME.toMinutes());
return MAX_FIRST_RECORD_WAIT_TIME;
firstRecordWaitTime = MAX_FIRST_RECORD_WAIT_TIME;
} else {
firstRecordWaitTime = Duration.ofSeconds(firstRecordWaitSeconds.get());
}

return Duration.ofSeconds(seconds);
}

return DEFAULT_FIRST_RECORD_WAIT_TIME;
LOGGER.info("First record waiting time: {} seconds", firstRecordWaitTime.getSeconds());
return firstRecordWaitTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package io.airbyte.integrations.source.postgres;

import static io.airbyte.integrations.source.postgres.PostgresUtils.MAX_FIRST_RECORD_WAIT_TIME;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import com.fasterxml.jackson.databind.JsonNode;
Expand All @@ -16,6 +18,7 @@
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.junit.jupiter.api.Test;

class PostgresUtilsTest {
Expand All @@ -34,12 +37,21 @@ void testIsCdc() {
@Test
void testGetFirstRecordWaitTime() {
final JsonNode emptyConfig = Jsons.jsonNode(Collections.emptyMap());
assertDoesNotThrow(() -> PostgresUtils.checkFirstRecordWaitTime(emptyConfig));
assertEquals(Optional.empty(), PostgresUtils.getFirstRecordWaitSeconds(emptyConfig));
assertEquals(PostgresUtils.DEFAULT_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(emptyConfig));

final JsonNode normalConfig = Jsons.jsonNode(Map.of("initial_waiting_seconds", 500));
final JsonNode normalConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "initial_waiting_seconds", 500)));
assertDoesNotThrow(() -> PostgresUtils.checkFirstRecordWaitTime(normalConfig));
assertEquals(Optional.of(500), PostgresUtils.getFirstRecordWaitSeconds(normalConfig));
assertEquals(Duration.ofSeconds(500), PostgresUtils.getFirstRecordWaitTime(normalConfig));

final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("initial_waiting_seconds", MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 100));
final int tooLongTimout = (int) MAX_FIRST_RECORD_WAIT_TIME.getSeconds() + 100;
final JsonNode tooLongConfig = Jsons.jsonNode(Map.of("replication_method",
Map.of("method", "CDC", "initial_waiting_seconds", tooLongTimout)));
assertThrows(IllegalArgumentException.class, () -> PostgresUtils.checkFirstRecordWaitTime(tooLongConfig));
assertEquals(Optional.of(tooLongTimout), PostgresUtils.getFirstRecordWaitSeconds(tooLongConfig));
assertEquals(MAX_FIRST_RECORD_WAIT_TIME, PostgresUtils.getFirstRecordWaitTime(tooLongConfig));
}

Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,14 +354,15 @@ Possible solutions include:

| Version | Date | Pull Request | Subject |
|:--------|:-----------|:---------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------|
| 0.4.42 | 2022-08-03 | [15273](https://github.com/airbytehq/airbyte/pull/15273) | Fix a bug in `0.4.36` and correctly parse the CDC initial record waiting time |
| 0.4.41 | 2022-08-03 | [15077](https://github.com/airbytehq/airbyte/pull/15077) | Sync data from beginning if the LSN is no longer valid in CDC |
| | 2022-08-03 | [14903](https://github.com/airbytehq/airbyte/pull/14903) | Emit state messages more frequently |
| 0.4.40 | 2022-08-03 | [15187](https://github.com/airbytehq/airbyte/pull/15187) | Add support for BCE dates/timestamps |
| | 2022-08-03 | [14534](https://github.com/airbytehq/airbyte/pull/14534) | Align regular and CDC integration tests and data mappers |
| 0.4.39 | 2022-08-02 | [14801](https://github.com/airbytehq/airbyte/pull/14801) | Fix multiply log bindings |
| 0.4.38 | 2022-07-26 | [14362](https://github.com/airbytehq/airbyte/pull/14362) | Integral columns are now discovered as int64 fields. |
| 0.4.37 | 2022-07-22 | [14714](https://github.com/airbytehq/airbyte/pull/14714) | Clarified error message when invalid cursor column selected |
| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable |
| 0.4.36 | 2022-07-21 | [14451](https://github.com/airbytehq/airbyte/pull/14451) | Make initial CDC waiting time configurable (⛔ this version has a bug and will not work; use `0.4.42` instead) |
| 0.4.35 | 2022-07-14 | [14574](https://github.com/airbytehq/airbyte/pull/14574) | Removed additionalProperties:false from JDBC source connectors |
| 0.4.34 | 2022-07-17 | [13840](https://github.com/airbytehq/airbyte/pull/13840) | Added the ability to connect using different SSL modes and SSL certificates. |
| 0.4.33 | 2022-07-14 | [14586](https://github.com/airbytehq/airbyte/pull/14586) | Validate source JDBC url parameters |
Expand Down