From 7d279ec7129891ddee4fe52b09fd14b8aa32aff9 Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 8 Jul 2022 15:08:16 -0700 Subject: [PATCH 01/16] set per stream feature flag to true for testing --- .../io/airbyte/commons/features/EnvVariableFeatureFlags.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index 9991fd35c503..dec17b6efcca 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -31,7 +31,7 @@ public boolean forceSecretMigration() { @Override public boolean useStreamCapableState() { - return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, false, Boolean::parseBoolean); + return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, true, Boolean::parseBoolean); } // TODO: refactor in order to use the same method than the ones in EnvConfigs.java From 3d8bbc1fca23dd7922a37331c3868a0c77154fdb Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 8 Jul 2022 17:13:49 -0700 Subject: [PATCH 02/16] add a second table to cdc acceptance tests --- .../test/acceptance/CdcAcceptanceTests.java | 73 ++++++++++++------- .../resources/postgres_init_cdc.sql | 30 +++++++- 2 files changed, 74 insertions(+), 29 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index f21f5693d754..17fdd5cf4c4f 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -69,6 +69,9 @@ record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt, private static final String 
SCHEMA_NAME = "public"; private static final String CDC_UPDATED_AT_COLUMN = "_ab_cdc_updated_at"; private static final String CDC_DELETED_AT_COLUMN = "_ab_cdc_deleted_at"; + private static final String ID_AND_NAME_TABLE = "id_and_name"; + private static final String COLOR_PALETTE_TABLE = "color_palette"; + private static final String COLUMN_COLOR = "color"; // version of the postgres destination connector that was built with the // old Airbyte protocol that does not contain any per-stream logic/fields @@ -126,12 +129,12 @@ public void testIncrementalCdcSync() throws Exception { LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); final Database source = testHarness.getSourceDatabase(); - List sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME); - List expectedDestRecordMatchers = new ArrayList<>(sourceRecords - .stream() - .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty())) - .toList()); - assertDestinationMatches(expectedDestRecordMatchers); + + List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + + List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); final Instant beforeFirstUpdate = Instant.now(); @@ -143,24 +146,35 @@ public void testIncrementalCdcSync() throws Exception { // since this is a CDC connection, the destination should contain a record with this // new value and an updated_at time corresponding to this update query source.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2")); - - expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher( + expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( 
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()), beforeFirstUpdate, Optional.empty())); - - expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher( + expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_NAME, "yennefer").build()), beforeFirstUpdate, Optional.empty())); + // do the same for the other table + source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')")); + source.query(ctx -> ctx.execute("UPDATE color_palette SET color='purple' WHERE id=2")); + expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()), + beforeFirstUpdate, + Optional.empty())); + expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_COLOR, "purple").build()), + beforeFirstUpdate, + Optional.empty())); + LOGGER.info("Starting incremental cdc sync 2"); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - assertDestinationMatches(expectedDestRecordMatchers); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); // reset back to no data. 
@@ -170,7 +184,8 @@ public void testIncrementalCdcSync() throws Exception { LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - assertDestinationMatches(Collections.emptyList()); + assertDestinationMatches(ID_AND_NAME_TABLE, Collections.emptyList()); + assertDestinationMatches(COLOR_PALETTE_TABLE, Collections.emptyList()); // sync one more time. verify it is the equivalent of a full refresh. LOGGER.info("Starting incremental cdc sync 3"); @@ -179,13 +194,11 @@ public void testIncrementalCdcSync() throws Exception { waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME); - expectedDestRecordMatchers = sourceRecords - .stream() - .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty())) - .toList(); + expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); - assertDestinationMatches(expectedDestRecordMatchers); + expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); } // tests that incremental syncs still work properly even when using a destination connector that was @@ -224,12 +237,8 @@ public void testDeleteRecordCdcSync() throws Exception { LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); final Database source = testHarness.getSourceDatabase(); - List sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME); - List expectedDestRecordMatchers = new ArrayList<>(sourceRecords - .stream() - 
.map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty())) - .toList()); - assertDestinationMatches(expectedDestRecordMatchers); + List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); final Instant beforeDelete = Instant.now(); @@ -240,7 +249,7 @@ public void testDeleteRecordCdcSync() throws Exception { Map deletedRecordMap = new HashMap<>(); deletedRecordMap.put(COLUMN_ID, 1); deletedRecordMap.put(COLUMN_NAME, null); - expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher( + expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( Jsons.jsonNode(deletedRecordMap), beforeDelete, Optional.of(beforeDelete))); @@ -251,7 +260,15 @@ public void testDeleteRecordCdcSync() throws Exception { waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId))); - assertDestinationMatches(expectedDestRecordMatchers); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + } + + private List getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException { + List sourceRecords = testHarness.retrieveSourceRecords(source, tableName); + return new ArrayList<>(sourceRecords + .stream() + .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty())) + .toList()); } private UUID createCdcConnection() throws ApiException { @@ -298,8 +315,8 @@ private SourceRead createCdcSource() throws ApiException { Jsons.jsonNode(sourceDbConfigMap)); } - private void assertDestinationMatches(List expectedDestRecordMatchers) throws Exception { - final List destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, STREAM_NAME)); + private void assertDestinationMatches(String 
streamName, List expectedDestRecordMatchers) throws Exception { + final List destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, streamName)); if (destRecords.size() != expectedDestRecordMatchers.size()) { final String errorMessage = String.format( "The number of destination records %d does not match the expected number %d", diff --git a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql index 9434b4135eb4..a760c0cff425 100644 --- a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql +++ b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql @@ -32,15 +32,43 @@ INSERT 'john' ); +CREATE + TABLE + color_palette( + id INTEGER PRIMARY KEY, + color VARCHAR(200) + ); + +INSERT + INTO + color_palette( + id, + color + ) + VALUES( + 1, + 'red' + ), + ( + 2, + 'blue' + ), + ( + 3, + 'green' + ); + CREATE ROLE airbyte_role REPLICATION LOGIN; ALTER TABLE id_and_name REPLICA IDENTITY DEFAULT; +ALTER TABLE + color_palette REPLICA IDENTITY DEFAULT; CREATE PUBLICATION airbyte_publication FOR TABLE - id_and_name; + id_and_name, color_palette; SELECT pg_create_logical_replication_slot( From f18219f34679b0ca6cb37870425ebb5e6f695d27 Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 8 Jul 2022 18:42:08 -0700 Subject: [PATCH 03/16] add partial reset test --- .../test/acceptance/CdcAcceptanceTests.java | 146 +++++++++++++++++- .../resources/postgres_init_cdc.sql | 4 +- 2 files changed, 147 insertions(+), 3 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 17fdd5cf4c4f..7a57d7c3a1c7 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java 
@@ -6,28 +6,42 @@ import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.COLUMN_ID; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.COLUMN_NAME; -import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.STREAM_NAME; import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitForSuccessfulJob; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Lists; import io.airbyte.api.client.AirbyteApiClient; +import io.airbyte.api.client.generated.WebBackendApi; import io.airbyte.api.client.invoker.generated.ApiClient; import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.AirbyteStream; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; +import io.airbyte.api.client.model.generated.ConnectionRead; +import io.airbyte.api.client.model.generated.ConnectionState; +import io.airbyte.api.client.model.generated.ConnectionStateType; import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.DestinationDefinitionRead; import io.airbyte.api.client.model.generated.DestinationSyncMode; +import io.airbyte.api.client.model.generated.JobConfigType; import io.airbyte.api.client.model.generated.JobInfoRead; +import io.airbyte.api.client.model.generated.JobListRequestBody; +import io.airbyte.api.client.model.generated.JobRead; +import io.airbyte.api.client.model.generated.JobStatus; +import io.airbyte.api.client.model.generated.JobWithAttemptsRead; +import io.airbyte.api.client.model.generated.OperationRead; import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody; import 
io.airbyte.api.client.model.generated.SourceDefinitionRead; import io.airbyte.api.client.model.generated.SourceRead; +import io.airbyte.api.client.model.generated.StreamDescriptor; +import io.airbyte.api.client.model.generated.StreamState; import io.airbyte.api.client.model.generated.SyncMode; +import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate; +import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Database; import io.airbyte.test.utils.AirbyteAcceptanceTestHarness; @@ -35,16 +49,21 @@ import java.io.IOException; import java.net.URISyntaxException; import java.sql.SQLException; +import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.UUID; +import org.jooq.impl.DSL; +import org.jooq.impl.SQLDataType; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -78,7 +97,9 @@ record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt, private static final String POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION = "0.3.19"; private static AirbyteApiClient apiClient; + private static WebBackendApi webBackendApi; private static UUID workspaceId; + private static OperationRead operationRead; private AirbyteAcceptanceTestHarness testHarness; @@ -89,6 +110,11 @@ public static void init() throws URISyntaxException, IOException, InterruptedExc .setHost("localhost") .setPort(8001) .setBasePath("/api")); + webBackendApi = new WebBackendApi( + new ApiClient().setScheme("http") + .setHost("localhost") + .setPort(8001) + .setBasePath("/api")); // work in whatever default workspace is present. 
workspaceId = apiClient.getWorkspaceApi().listWorkspaces().getWorkspaces().get(0).getWorkspaceId(); LOGGER.info("workspaceId = " + workspaceId); @@ -136,6 +162,11 @@ public void testIncrementalCdcSync() throws Exception { List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + List expectedStreams = List.of( + new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE), + new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE)); + assertGlobalStateContainsStreams(connectionId, expectedStreams); + final Instant beforeFirstUpdate = Instant.now(); LOGGER.info("Inserting and updating source db records"); @@ -175,6 +206,7 @@ public void testIncrementalCdcSync() throws Exception { assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + assertGlobalStateContainsStreams(connectionId, expectedStreams); // reset back to no data. @@ -186,6 +218,7 @@ public void testIncrementalCdcSync() throws Exception { assertDestinationMatches(ID_AND_NAME_TABLE, Collections.emptyList()); assertDestinationMatches(COLOR_PALETTE_TABLE, Collections.emptyList()); + assertNoState(connectionId); // sync one more time. verify it is the equivalent of a full refresh. 
LOGGER.info("Starting incremental cdc sync 3"); @@ -199,6 +232,8 @@ public void testIncrementalCdcSync() throws Exception { expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + + assertGlobalStateContainsStreams(connectionId, expectedStreams); } // tests that incremental syncs still work properly even when using a destination connector that was @@ -263,6 +298,49 @@ public void testDeleteRecordCdcSync() throws Exception { assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); } + @Test + public void testSchemaUpdateResultsInPartialReset() throws Exception { + LOGGER.info("Starting partial reset cdc test"); + + final UUID connectionId = createCdcConnection(); + LOGGER.info("Starting partial reset cdc sync 1"); + + final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() + .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); + + final Database source = testHarness.getSourceDatabase(); + + List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + + List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + + StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE); + StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE); + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); + + LOGGER.info("Removing color palette table"); + source.query(ctx -> ctx.dropTable(COLOR_PALETTE_TABLE).execute()); + + LOGGER.info("Refreshing schema 
and updating connection"); + final ConnectionRead connectionRead = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + UUID sourceId = createCdcSource().getSourceId(); + AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId); + LOGGER.info("Refreshed catalog: {}", refreshedCatalog); + WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead); + webBackendApi.webBackendUpdateConnection(update); + + LOGGER.info("Waiting for sync job after update to complete"); + JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // We do not check that the source and the dest are in sync here because removing a stream doesn't + // delete its data in the destination + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor)); + } + private List getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException { List sourceRecords = testHarness.retrieveSourceRecords(source, tableName); return new ArrayList<>(sourceRecords @@ -276,7 +354,8 @@ private UUID createCdcConnection() throws ApiException { final UUID sourceId = sourceRead.getSourceId(); final UUID destinationId = testHarness.createDestination().getDestinationId(); - final UUID operationId = testHarness.createOperation().getOperationId(); + operationRead = testHarness.createOperation(); + final UUID operationId = operationRead.getOperationId(); final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); final AirbyteStream stream = catalog.getStreams().get(0).getStream(); LOGGER.info("stream: {}", stream); @@ -364,4 +443,67 @@ private void assertDestinationMatches(String streamName, List expectedStreams) throws ApiException { + final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); + 
LOGGER.info("state: {}", state); + assertEquals(ConnectionStateType.GLOBAL, state.getStateType()); + final List stateStreams = state.getGlobalState().getStreamStates().stream().map(StreamState::getStreamDescriptor).toList(); + + Assertions.assertTrue(stateStreams.containsAll(expectedStreams) && expectedStreams.containsAll(stateStreams), + String.format("Expected state to have streams %s, but it actually had streams %s", expectedStreams, stateStreams)); + } + + private void assertNoState(final UUID connectionId) throws ApiException { + final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)); + assertEquals(ConnectionStateType.NOT_SET, state.getStateType()); + assertNull(state.getState()); + assertNull(state.getStreamState()); + assertNull(state.getGlobalState()); + } + + // TODO: consolidate the below methods with those added in + // https://github.com/airbytehq/airbyte/pull/14406 + + private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { + return new WebBackendConnectionUpdate() + .connectionId(connection.getConnectionId()) + .name(connection.getName()) + .operationIds(connection.getOperationIds()) + .operations(List.of(new WebBackendOperationCreateOrUpdate() + .name(operation.getName()) + .operationId(operation.getOperationId()) + .workspaceId(operation.getWorkspaceId()) + .operatorConfiguration(operation.getOperatorConfiguration()))) + .namespaceDefinition(connection.getNamespaceDefinition()) + .namespaceFormat(connection.getNamespaceFormat()) + .syncCatalog(catalog) + .schedule(connection.getSchedule()) + .sourceCatalogId(connection.getSourceCatalogId()) + .status(connection.getStatus()) + .prefix(connection.getPrefix()); + } + + private JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception { + return apiClient.getJobsApi() + .listJobsFor(new 
JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) + .getJobs() + .stream() + .max(Comparator.comparingLong(job -> job.getJob().getCreatedAt())) + .map(JobWithAttemptsRead::getJob).orElseThrow(); + } + + private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception { + final JobRead lastJob = getMostRecentSyncJobId(connectionId); + if (lastJob.getStatus() != JobStatus.SUCCEEDED) { + return lastJob; + } + + JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId); + while (mostRecentSyncJob.getId().equals(lastJob.getId())) { + Thread.sleep(Duration.ofSeconds(2).toMillis()); + mostRecentSyncJob = getMostRecentSyncJobId(connectionId); + } + return mostRecentSyncJob; + } + } diff --git a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql index a760c0cff425..ce7c2da4a538 100644 --- a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql +++ b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql @@ -63,12 +63,14 @@ CREATE ALTER TABLE id_and_name REPLICA IDENTITY DEFAULT; + ALTER TABLE color_palette REPLICA IDENTITY DEFAULT; CREATE PUBLICATION airbyte_publication FOR TABLE - id_and_name, color_palette; + id_and_name, + color_palette; SELECT pg_create_logical_replication_slot( From 40bce5f84ffe12d44e618e8737dbf59b20eac3c7 Mon Sep 17 00:00:00 2001 From: lmossman Date: Fri, 8 Jul 2022 18:42:26 -0700 Subject: [PATCH 04/16] format --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 7a57d7c3a1c7..d4b51308b106 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ 
b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -60,8 +60,6 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; -import org.jooq.impl.DSL; -import org.jooq.impl.SQLDataType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; From 9d031505839409c3628eb5cc2935699814560460 Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 11 Jul 2022 14:51:27 -0700 Subject: [PATCH 05/16] add partial reset cdc tests --- .../test/acceptance/CdcAcceptanceTests.java | 110 +++++++++++++++++- 1 file changed, 108 insertions(+), 2 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index d4b51308b106..cfea5f719833 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -20,6 +20,7 @@ import io.airbyte.api.client.invoker.generated.ApiException; import io.airbyte.api.client.model.generated.AirbyteCatalog; import io.airbyte.api.client.model.generated.AirbyteStream; +import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration; import io.airbyte.api.client.model.generated.ConnectionIdRequestBody; import io.airbyte.api.client.model.generated.ConnectionRead; import io.airbyte.api.client.model.generated.ConnectionState; @@ -59,7 +60,10 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.UUID; +import org.jooq.Record; +import org.jooq.Result; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; @@ -297,11 +301,11 @@ public void testDeleteRecordCdcSync() throws Exception { } @Test - public void 
testSchemaUpdateResultsInPartialReset() throws Exception { + public void testPartialResetFromSchemaUpdate() throws Exception { LOGGER.info("Starting partial reset cdc test"); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting partial reset cdc sync 1"); + LOGGER.info("Starting sync 1"); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -339,6 +343,82 @@ public void testSchemaUpdateResultsInPartialReset() throws Exception { assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor)); } + @Test + public void testPartialResetFromStreamSelection() throws Exception { + LOGGER.info("Starting partial reset cdc test"); + + // TODO: remove this logic to set source to dev once postgres source has been released with the CDC + // changes + LOGGER.info("Setting source connector to dev to test out partial CDC resets..."); + final UUID sourceDefinitionId = testHarness.getPostgresSourceDefinitionId(); + testHarness.updateSourceDefinitionVersion(sourceDefinitionId, "dev"); + + final UUID connectionId = createCdcConnection(); + LOGGER.info("Starting sync 1"); + + final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() + .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob()); + + final Database source = testHarness.getSourceDatabase(); + + List expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + + List expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + + StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE); + StreamDescriptor 
colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE); + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); + + LOGGER.info("Removing color palette stream from configured catalog"); + final ConnectionRead connectionRead = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + final UUID sourceId = connectionRead.getSourceId(); + AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId); + final List streams = catalog.getStreams(); + // filter out color_palette stream + final List updatedStreams = streams + .stream() + .filter(stream -> !stream.getStream().getName().equals(COLOR_PALETTE_TABLE)) + .toList(); + catalog.setStreams(updatedStreams); + LOGGER.info("Updated catalog: {}", catalog); + WebBackendConnectionUpdate update = getUpdateInput(connectionRead, catalog, operationRead); + webBackendApi.webBackendUpdateConnection(update); + + LOGGER.info("Waiting for sync job after update to start"); + JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + LOGGER.info("Waiting for sync job after update to complete"); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // We do not check that the source and the dest are in sync here because removing a stream doesn't + // delete its data in the destination + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor)); + + LOGGER.info("Adding color palette stream back to configured catalog"); + catalog = testHarness.discoverSourceSchema(sourceId); + LOGGER.info("Updated catalog: {}", catalog); + update = getUpdateInput(connectionRead, catalog, operationRead); + webBackendApi.webBackendUpdateConnection(update); + + LOGGER.info("Waiting for sync job after update to start"); + syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + LOGGER.info("Checking that id_and_name table is unaffected 
by the partial reset"); + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + LOGGER.info("Checking that color_palette table was cleared in the destination due to the reset triggered by the update"); + assertDestinationMatches(COLOR_PALETTE_TABLE, List.of()); + LOGGER.info("Waiting for sync job after update to complete"); + waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); + + // Verify that color palette table records exist in destination again after sync. + // If we see 0 records for this table in the destination, that means the CDC partial reset logic is + // not working properly, and it continued from the replication log cursor for this stream despite + // this stream's state being reset + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); + } + private List getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException { List sourceRecords = testHarness.retrieveSourceRecords(source, tableName); return new ArrayList<>(sourceRecords @@ -463,6 +543,13 @@ private void assertNoState(final UUID connectionId) throws ApiException { // https://github.com/airbytehq/airbyte/pull/14406 private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { + final SyncMode syncMode = SyncMode.INCREMENTAL; + final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; + catalog.getStreams().forEach(s -> s.getConfig() + .syncMode(syncMode) + .cursorField(List.of(COLUMN_ID)) + .destinationSyncMode(destinationSyncMode)); + return new WebBackendConnectionUpdate() .connectionId(connection.getConnectionId()) .name(connection.getName()) @@ -504,4 +591,23 @@ private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exc return mostRecentSyncJob; } + // can be helpful for 
debugging + private void printDbs() throws SQLException { + final Database sourceDb = testHarness.getSourceDatabase(); + Set pairs = testHarness.listAllTables(sourceDb); + LOGGER.info("Printing source tables"); + for (final SchemaTableNamePair pair : pairs) { + final Result result = sourceDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); + LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); + } + + final Database destDb = testHarness.getDestinationDatabase(); + pairs = testHarness.listAllTables(destDb); + LOGGER.info("Printing destination tables"); + for (final SchemaTableNamePair pair : pairs) { + final Result result = destDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName))); + LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result); + } + } + } From 628d50a1ec5d0af8c1d69b2545c38d77cf24c75e Mon Sep 17 00:00:00 2001 From: lmossman Date: Mon, 11 Jul 2022 16:20:21 -0700 Subject: [PATCH 06/16] test incremental after partial reset --- .../test/acceptance/CdcAcceptanceTests.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index cfea5f719833..04de6248b8b5 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -417,6 +417,30 @@ public void testPartialResetFromStreamSelection() throws Exception { // this stream's state being reset assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); + + // Verify that incremental still works properly 
after partial reset + LOGGER.info("Adding new records to tables"); + final Instant beforeInsert = Instant.now(); + source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')")); + expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher( + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()), + beforeInsert, + Optional.empty())); + + source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')")); + expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher( + Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()), + beforeInsert, + Optional.empty())); + + LOGGER.info("Starting sync after insert"); + final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() + .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); + waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); + + assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); + assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords); + assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor)); } private List getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException { From 92f4fc37d593f289c27dc9281d8f9f5dc1cfdbeb Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 12 Jul 2022 11:00:22 -0700 Subject: [PATCH 07/16] remove dev image from acceptance test --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 04de6248b8b5..9fd67568d0f2 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ 
b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -347,12 +347,6 @@ public void testPartialResetFromSchemaUpdate() throws Exception { public void testPartialResetFromStreamSelection() throws Exception { LOGGER.info("Starting partial reset cdc test"); - // TODO: remove this logic to set source to dev once postgres source has been released with the CDC - // changes - LOGGER.info("Setting source connector to dev to test out partial CDC resets..."); - final UUID sourceDefinitionId = testHarness.getPostgresSourceDefinitionId(); - testHarness.updateSourceDefinitionVersion(sourceDefinitionId, "dev"); - final UUID connectionId = createCdcConnection(); LOGGER.info("Starting sync 1"); From e23e2201df0475b2324f8bcbdb482854f6fdcaf7 Mon Sep 17 00:00:00 2001 From: lmossman Date: Tue, 12 Jul 2022 13:56:05 -0700 Subject: [PATCH 08/16] fix flag and add comment --- .../test/acceptance/CdcAcceptanceTests.java | 15 +++++++++++++++ .../process/AirbyteIntegrationLauncherTest.java | 2 +- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 9fd67568d0f2..de3f44166032 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -69,9 +69,24 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +/** + * These tests test the CDC source behavior in Airbyte, ensuring that the behavior of syncs when in + * CDC mode is as expected + *
<p>
+ * Some of the tests in this class are specifically testing partial reset behavior when in CDC mode, + * support for which was recently added to the postgres connector. + *
<p>
+ * These tests are disabled in Kube, similar to the BasicAcceptanceTests, because they aren't + * testing any behavior that is specific to or dependent on this being run on kube vs docker. + * Therefore, since operations tend to take longer to perform on kube, there is little value in + * re-running these tests on kube when we already run them on docker. + */ +@DisabledIfEnvironmentVariable(named = "KUBE", + matches = "true") public class CdcAcceptanceTests { record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt, Optional minDeletedAt) {} diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java index fceedd214535..4b5b7b4a59d6 100644 --- a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java +++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java @@ -51,7 +51,7 @@ class AirbyteIntegrationLauncherTest { WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE, WorkerEnvConstants.WORKER_JOB_ID, JOB_ID, WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT), - EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(false)); + EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState())); private WorkerConfigs workerConfigs; @Mock From db4951b3c487d03255f8ba5f2e60b9324bb9c221 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 13:42:11 -0700 Subject: [PATCH 09/16] Revert "set per stream feature flag to true for testing" This reverts commit 164d7da05990268b09e315eb88ff297d3a9f52f4. 
--- .../io/airbyte/commons/features/EnvVariableFeatureFlags.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java index dec17b6efcca..9991fd35c503 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/features/EnvVariableFeatureFlags.java @@ -31,7 +31,7 @@ public boolean forceSecretMigration() { @Override public boolean useStreamCapableState() { - return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, true, Boolean::parseBoolean); + return getEnvOrDefault(USE_STREAM_CAPABLE_STATE, false, Boolean::parseBoolean); } // TODO: refactor in order to use the same method than the ones in EnvConfigs.java From 9adc47c33099a8fba46b8ef99d5f1fb0277505d5 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 13:43:59 -0700 Subject: [PATCH 10/16] set USE_STREAM_CAPABLE_STATE flag to true in acceptance test script --- tools/bin/acceptance_test.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/bin/acceptance_test.sh b/tools/bin/acceptance_test.sh index 978e941861d6..41ccdd74aaa1 100755 --- a/tools/bin/acceptance_test.sh +++ b/tools/bin/acceptance_test.sh @@ -9,7 +9,7 @@ assert_root echo "Starting app..." # Detach so we can run subsequent commands -VERSION=dev TRACKING_STRATEGY=logging docker-compose up -d +VERSION=dev TRACKING_STRATEGY=logging USE_STREAM_CAPABLE_STATE=true docker-compose up -d # Sometimes source/dest containers using airbyte volumes survive shutdown, which need to be killed in order to shut down properly. 
shutdown_cmd="docker-compose down -v || docker kill \$(docker ps -a -f volume=airbyte_workspace -f volume=airbyte_data -f volume=airbyte_db -q) && docker-compose down -v" From 2745cf44cb05efe49f6639464177013a3f5166b0 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 13:58:37 -0700 Subject: [PATCH 11/16] call new update endpoint --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index de3f44166032..bd526ab8042e 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -347,7 +347,7 @@ public void testPartialResetFromSchemaUpdate() throws Exception { AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Refreshed catalog: {}", refreshedCatalog); WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to complete"); JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); @@ -394,7 +394,7 @@ public void testPartialResetFromStreamSelection() throws Exception { catalog.setStreams(updatedStreams); LOGGER.info("Updated catalog: {}", catalog); WebBackendConnectionUpdate update = getUpdateInput(connectionRead, catalog, operationRead); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); @@ -409,7 +409,7 @@ public void 
testPartialResetFromStreamSelection() throws Exception { catalog = testHarness.discoverSourceSchema(sourceId); LOGGER.info("Updated catalog: {}", catalog); update = getUpdateInput(connectionRead, catalog, operationRead); - webBackendApi.webBackendUpdateConnection(update); + webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); From a8f2f87f0ae06884fde0dbd538490cd3ff2237dc Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 14:41:44 -0700 Subject: [PATCH 12/16] use methods in test harness instead --- .../test/acceptance/CdcAcceptanceTests.java | 29 ++----------------- 1 file changed, 3 insertions(+), 26 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index bd526ab8042e..7ee5723224ec 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -350,7 +350,7 @@ public void testPartialResetFromSchemaUpdate() throws Exception { webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to complete"); - JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); // We do not check that the source and the dest are in sync here because removing a stream doesn't @@ -397,7 +397,7 @@ public void testPartialResetFromStreamSelection() throws Exception { webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); - JobRead syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + JobRead 
syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); LOGGER.info("Waiting for sync job after update to complete"); waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate); @@ -412,7 +412,7 @@ public void testPartialResetFromStreamSelection() throws Exception { webBackendApi.webBackendUpdateConnectionNew(update); LOGGER.info("Waiting for sync job after update to start"); - syncFromTheUpdate = waitUntilTheNextJobIsStarted(connectionId); + syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId); LOGGER.info("Checking that id_and_name table is unaffected by the partial reset"); assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords); LOGGER.info("Checking that color_palette table was cleared in the destination due to the reset triggered by the update"); @@ -601,29 +601,6 @@ private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connectio .prefix(connection.getPrefix()); } - private JobRead getMostRecentSyncJobId(final UUID connectionId) throws Exception { - return apiClient.getJobsApi() - .listJobsFor(new JobListRequestBody().configId(connectionId.toString()).configTypes(List.of(JobConfigType.SYNC))) - .getJobs() - .stream() - .max(Comparator.comparingLong(job -> job.getJob().getCreatedAt())) - .map(JobWithAttemptsRead::getJob).orElseThrow(); - } - - private JobRead waitUntilTheNextJobIsStarted(final UUID connectionId) throws Exception { - final JobRead lastJob = getMostRecentSyncJobId(connectionId); - if (lastJob.getStatus() != JobStatus.SUCCEEDED) { - return lastJob; - } - - JobRead mostRecentSyncJob = getMostRecentSyncJobId(connectionId); - while (mostRecentSyncJob.getId().equals(lastJob.getId())) { - Thread.sleep(Duration.ofSeconds(2).toMillis()); - mostRecentSyncJob = getMostRecentSyncJobId(connectionId); - } - return mostRecentSyncJob; - } - // can be helpful for debugging private void printDbs() throws SQLException { final Database sourceDb = testHarness.getSourceDatabase(); From 
7e9daf0d9f76d8dc7f3cc9683cf33a4148308b18 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 15:17:37 -0700 Subject: [PATCH 13/16] remove comment --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 7ee5723224ec..2616d9b02309 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -572,9 +572,6 @@ private void assertNoState(final UUID connectionId) throws ApiException { assertNull(state.getGlobalState()); } - // TODO: consolidate the below methods with those added in - // https://github.com/airbytehq/airbyte/pull/14406 - private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) { final SyncMode syncMode = SyncMode.INCREMENTAL; final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND; From 2d6d3c721dfae1d8bc1497aaa221de0e0290e3ef Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 15:24:30 -0700 Subject: [PATCH 14/16] format --- .../java/io/airbyte/test/acceptance/CdcAcceptanceTests.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 2616d9b02309..97f817c2b1b7 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -28,12 +28,8 @@ import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody; import 
io.airbyte.api.client.model.generated.DestinationDefinitionRead; import io.airbyte.api.client.model.generated.DestinationSyncMode; -import io.airbyte.api.client.model.generated.JobConfigType; import io.airbyte.api.client.model.generated.JobInfoRead; -import io.airbyte.api.client.model.generated.JobListRequestBody; import io.airbyte.api.client.model.generated.JobRead; -import io.airbyte.api.client.model.generated.JobStatus; -import io.airbyte.api.client.model.generated.JobWithAttemptsRead; import io.airbyte.api.client.model.generated.OperationRead; import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody; import io.airbyte.api.client.model.generated.SourceDefinitionRead; @@ -50,11 +46,9 @@ import java.io.IOException; import java.net.URISyntaxException; import java.sql.SQLException; -import java.time.Duration; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; From 6972f9bfe72a98b00379f413cb0e5cc4088c7dc8 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 18:21:37 -0700 Subject: [PATCH 15/16] fix state check in basic acceptance test --- .../io/airbyte/test/acceptance/BasicAcceptanceTests.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java index 313a1c15920e..cc27832dfa91 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java @@ -797,7 +797,7 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws // sync one more time. verify it is the equivalent of a full refresh. 
final String expectedState = - "{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}"; + "{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}"; LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -806,7 +806,11 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws LOGGER.info("state after sync 3: {}", state); testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE); - assertEquals(Jsons.deserialize(expectedState), state.getState()); + assertNotNull(state.getStreamState()); + assertEquals(1, state.getStreamState().size()); + final StreamState idAndNameState = state.getStreamState().get(0); + assertEquals(new StreamDescriptor().namespace("public").name(STREAM_NAME), idAndNameState.getStreamDescriptor()); + assertEquals(Jsons.deserialize(expectedState), idAndNameState.getStreamState()); } @Test From f8fbefe3dad96a7ad8aa9b55a2dc8a4d15735e33 Mon Sep 17 00:00:00 2001 From: lmossman Date: Thu, 14 Jul 2022 18:25:25 -0700 Subject: [PATCH 16/16] use test info for test name logging --- .../test/acceptance/CdcAcceptanceTests.java | 41 ++++++++++--------- 1 file changed, 21 insertions(+), 20 deletions(-) diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java index 97f817c2b1b7..0859826166be 100644 --- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java +++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java @@ -63,6 +63,7 @@ import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import 
org.junit.jupiter.api.Test; +import org.junit.jupiter.api.TestInfo; import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -154,11 +155,11 @@ public void end() { } @Test - public void testIncrementalCdcSync() throws Exception { - LOGGER.info("Starting incremental cdc sync test"); + public void testIncrementalCdcSync(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting incremental cdc sync 1"); + LOGGER.info("Starting {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -209,7 +210,7 @@ public void testIncrementalCdcSync() throws Exception { beforeFirstUpdate, Optional.empty())); - LOGGER.info("Starting incremental cdc sync 2"); + LOGGER.info("Starting {} sync 2", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); @@ -221,7 +222,7 @@ public void testIncrementalCdcSync() throws Exception { // reset back to no data. - LOGGER.info("Starting incremental cdc reset"); + LOGGER.info("Starting {} reset", testInfo.getDisplayName()); final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob()); @@ -232,7 +233,7 @@ public void testIncrementalCdcSync() throws Exception { assertNoState(connectionId); // sync one more time. verify it is the equivalent of a full refresh. 
- LOGGER.info("Starting incremental cdc sync 3"); + LOGGER.info("Starting {} sync 3", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead3 = apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob()); @@ -250,8 +251,8 @@ public void testIncrementalCdcSync() throws Exception { // tests that incremental syncs still work properly even when using a destination connector that was // built on the old protocol that did not have any per-stream state fields @Test - public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Exception { - LOGGER.info("Starting testIncrementalCdcSyncWithLegacyDestinationConnector()"); + public void testIncrementalCdcSyncWithLegacyDestinationConnector(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID postgresDestDefId = testHarness.getPostgresDestinationDefinitionId(); // Fetch the current/most recent source definition version final DestinationDefinitionRead destinationDefinitionRead = apiClient.getDestinationDefinitionApi().getDestinationDefinition( @@ -262,7 +263,7 @@ public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Except LOGGER.info("Setting postgres destination definition to version {}", POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION); testHarness.updateDestinationDefinitionVersion(postgresDestDefId, POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION); - testIncrementalCdcSync(); + testIncrementalCdcSync(testInfo); } finally { // set postgres destination definition back to latest version for other tests LOGGER.info("Setting postgres destination definition back to version {}", destinationDefinitionRead.getDockerImageTag()); @@ -271,11 +272,11 @@ public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Except } @Test - public void testDeleteRecordCdcSync() throws Exception { - LOGGER.info("Starting delete 
record cdc sync test"); + public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting delete record cdc sync 1"); + LOGGER.info("Starting {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -300,7 +301,7 @@ public void testDeleteRecordCdcSync() throws Exception { beforeDelete, Optional.of(beforeDelete))); - LOGGER.info("Starting delete record cdc sync 2"); + LOGGER.info("Starting {} sync 2", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob()); @@ -310,11 +311,11 @@ public void testDeleteRecordCdcSync() throws Exception { } @Test - public void testPartialResetFromSchemaUpdate() throws Exception { - LOGGER.info("Starting partial reset cdc test"); + public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting sync 1"); + LOGGER.info("Starting {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -353,11 +354,11 @@ public void testPartialResetFromSchemaUpdate() throws Exception { } @Test - public void testPartialResetFromStreamSelection() throws Exception { - LOGGER.info("Starting partial reset cdc test"); + public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Exception { + LOGGER.info("Starting {}", testInfo.getDisplayName()); final UUID connectionId = createCdcConnection(); - LOGGER.info("Starting sync 
1"); + LOGGER.info("Starting {} sync 1", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); @@ -436,7 +437,7 @@ public void testPartialResetFromStreamSelection() throws Exception { beforeInsert, Optional.empty())); - LOGGER.info("Starting sync after insert"); + LOGGER.info("Starting {} sync after insert", testInfo.getDisplayName()); final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi() .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId)); waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());