diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
index 313a1c15920e..cc27832dfa91 100644
--- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
+++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/BasicAcceptanceTests.java
@@ -797,7 +797,7 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws
// sync one more time. verify it is the equivalent of a full refresh.
final String expectedState =
- "{\"cdc\":false,\"streams\":[{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}]}";
+ "{\"cursor\":\"6\",\"stream_name\":\"id_and_name\",\"cursor_field\":[\"id\"],\"stream_namespace\":\"public\"}";
LOGGER.info("Starting {} sync 3", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead3 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
@@ -806,7 +806,11 @@ public void testSyncAfterUpgradeToPerStreamState(final TestInfo testInfo) throws
LOGGER.info("state after sync 3: {}", state);
testHarness.assertSourceAndDestinationDbInSync(WITHOUT_SCD_TABLE);
- assertEquals(Jsons.deserialize(expectedState), state.getState());
+ assertNotNull(state.getStreamState());
+ assertEquals(1, state.getStreamState().size());
+ final StreamState idAndNameState = state.getStreamState().get(0);
+ assertEquals(new StreamDescriptor().namespace("public").name(STREAM_NAME), idAndNameState.getStreamDescriptor());
+ assertEquals(Jsons.deserialize(expectedState), idAndNameState.getStreamState());
}
@Test
diff --git a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java
index f21f5693d754..0859826166be 100644
--- a/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java
+++ b/airbyte-tests/src/acceptanceTests/java/io/airbyte/test/acceptance/CdcAcceptanceTests.java
@@ -6,28 +6,39 @@
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.COLUMN_ID;
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.COLUMN_NAME;
-import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.STREAM_NAME;
import static io.airbyte.test.utils.AirbyteAcceptanceTestHarness.waitForSuccessfulJob;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.api.client.AirbyteApiClient;
+import io.airbyte.api.client.generated.WebBackendApi;
import io.airbyte.api.client.invoker.generated.ApiClient;
import io.airbyte.api.client.invoker.generated.ApiException;
import io.airbyte.api.client.model.generated.AirbyteCatalog;
import io.airbyte.api.client.model.generated.AirbyteStream;
+import io.airbyte.api.client.model.generated.AirbyteStreamAndConfiguration;
import io.airbyte.api.client.model.generated.ConnectionIdRequestBody;
+import io.airbyte.api.client.model.generated.ConnectionRead;
+import io.airbyte.api.client.model.generated.ConnectionState;
+import io.airbyte.api.client.model.generated.ConnectionStateType;
import io.airbyte.api.client.model.generated.DestinationDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.DestinationDefinitionRead;
import io.airbyte.api.client.model.generated.DestinationSyncMode;
import io.airbyte.api.client.model.generated.JobInfoRead;
+import io.airbyte.api.client.model.generated.JobRead;
+import io.airbyte.api.client.model.generated.OperationRead;
import io.airbyte.api.client.model.generated.SourceDefinitionIdRequestBody;
import io.airbyte.api.client.model.generated.SourceDefinitionRead;
import io.airbyte.api.client.model.generated.SourceRead;
+import io.airbyte.api.client.model.generated.StreamDescriptor;
+import io.airbyte.api.client.model.generated.StreamState;
import io.airbyte.api.client.model.generated.SyncMode;
+import io.airbyte.api.client.model.generated.WebBackendConnectionUpdate;
+import io.airbyte.api.client.model.generated.WebBackendOperationCreateOrUpdate;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.test.utils.AirbyteAcceptanceTestHarness;
@@ -43,14 +54,34 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.UUID;
+import org.jooq.Record;
+import org.jooq.Result;
import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.condition.DisabledIfEnvironmentVariable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * These tests verify the CDC source behavior in Airbyte, ensuring that syncs behave as expected
+ * when running in CDC mode.
+ *
+ * Some of the tests in this class are specifically testing partial reset behavior when in CDC mode,
+ * support for which was recently added to the postgres connector.
+ *
+ * These tests are disabled in Kube, similar to the BasicAcceptanceTests, because they aren't
+ * testing any behavior that is specific to or dependent on this being run on kube vs docker.
+ * Therefore, since operations tend to take longer to perform on kube, there is little value in
+ * re-running these tests on kube when we already run them on docker.
+ */
+@DisabledIfEnvironmentVariable(named = "KUBE",
+ matches = "true")
public class CdcAcceptanceTests {
record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt, Optional<Instant> minDeletedAt) {}
@@ -69,13 +100,18 @@ record DestinationCdcRecordMatcher(JsonNode sourceRecord, Instant minUpdatedAt,
private static final String SCHEMA_NAME = "public";
private static final String CDC_UPDATED_AT_COLUMN = "_ab_cdc_updated_at";
private static final String CDC_DELETED_AT_COLUMN = "_ab_cdc_deleted_at";
+ private static final String ID_AND_NAME_TABLE = "id_and_name";
+ private static final String COLOR_PALETTE_TABLE = "color_palette";
+ private static final String COLUMN_COLOR = "color";
// version of the postgres destination connector that was built with the
// old Airbyte protocol that does not contain any per-stream logic/fields
private static final String POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION = "0.3.19";
private static AirbyteApiClient apiClient;
+ private static WebBackendApi webBackendApi;
private static UUID workspaceId;
+ private static OperationRead operationRead;
private AirbyteAcceptanceTestHarness testHarness;
@@ -86,6 +122,11 @@ public static void init() throws URISyntaxException, IOException, InterruptedExc
.setHost("localhost")
.setPort(8001)
.setBasePath("/api"));
+ webBackendApi = new WebBackendApi(
+ new ApiClient().setScheme("http")
+ .setHost("localhost")
+ .setPort(8001)
+ .setBasePath("/api"));
// work in whatever default workspace is present.
workspaceId = apiClient.getWorkspaceApi().listWorkspaces().getWorkspaces().get(0).getWorkspaceId();
LOGGER.info("workspaceId = " + workspaceId);
@@ -114,11 +155,11 @@ public void end() {
}
@Test
- public void testIncrementalCdcSync() throws Exception {
- LOGGER.info("Starting incremental cdc sync test");
+ public void testIncrementalCdcSync(TestInfo testInfo) throws Exception {
+ LOGGER.info("Starting {}", testInfo.getDisplayName());
final UUID connectionId = createCdcConnection();
- LOGGER.info("Starting incremental cdc sync 1");
+ LOGGER.info("Starting {} sync 1", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
@@ -126,12 +167,17 @@ public void testIncrementalCdcSync() throws Exception {
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
final Database source = testHarness.getSourceDatabase();
- List<JsonNode> sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME);
- List<DestinationCdcRecordMatcher> expectedDestRecordMatchers = new ArrayList<>(sourceRecords
- .stream()
- .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty()))
- .toList());
- assertDestinationMatches(expectedDestRecordMatchers);
+
+ List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+
+ List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+
+ List<StreamDescriptor> expectedStreams = List.of(
+ new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE),
+ new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE));
+ assertGlobalStateContainsStreams(connectionId, expectedStreams);
final Instant beforeFirstUpdate = Instant.now();
@@ -143,56 +189,70 @@ public void testIncrementalCdcSync() throws Exception {
// since this is a CDC connection, the destination should contain a record with this
// new value and an updated_at time corresponding to this update query
source.query(ctx -> ctx.execute("UPDATE id_and_name SET name='yennefer' WHERE id=2"));
-
- expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher(
+ expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()),
beforeFirstUpdate,
Optional.empty()));
-
- expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher(
+ expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_NAME, "yennefer").build()),
beforeFirstUpdate,
Optional.empty()));
- LOGGER.info("Starting incremental cdc sync 2");
+ // do the same for the other table
+ source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')"));
+ source.query(ctx -> ctx.execute("UPDATE color_palette SET color='purple' WHERE id=2"));
+ expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher(
+ Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()),
+ beforeFirstUpdate,
+ Optional.empty()));
+ expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher(
+ Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 2).put(COLUMN_COLOR, "purple").build()),
+ beforeFirstUpdate,
+ Optional.empty()));
+
+ LOGGER.info("Starting {} sync 2", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
- assertDestinationMatches(expectedDestRecordMatchers);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+ assertGlobalStateContainsStreams(connectionId, expectedStreams);
// reset back to no data.
- LOGGER.info("Starting incremental cdc reset");
+ LOGGER.info("Starting {} reset", testInfo.getDisplayName());
final JobInfoRead jobInfoRead = apiClient.getConnectionApi().resetConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), jobInfoRead.getJob());
LOGGER.info("state after reset: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
- assertDestinationMatches(Collections.emptyList());
+ assertDestinationMatches(ID_AND_NAME_TABLE, Collections.emptyList());
+ assertDestinationMatches(COLOR_PALETTE_TABLE, Collections.emptyList());
+ assertNoState(connectionId);
// sync one more time. verify it is the equivalent of a full refresh.
- LOGGER.info("Starting incremental cdc sync 3");
+ LOGGER.info("Starting {} sync 3", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead3 =
apiClient.getConnectionApi().syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead3.getJob());
LOGGER.info("state after sync 3: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
- sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME);
- expectedDestRecordMatchers = sourceRecords
- .stream()
- .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty()))
- .toList();
+ expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
- assertDestinationMatches(expectedDestRecordMatchers);
+ expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+
+ assertGlobalStateContainsStreams(connectionId, expectedStreams);
}
// tests that incremental syncs still work properly even when using a destination connector that was
// built on the old protocol that did not have any per-stream state fields
@Test
- public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Exception {
- LOGGER.info("Starting testIncrementalCdcSyncWithLegacyDestinationConnector()");
+ public void testIncrementalCdcSyncWithLegacyDestinationConnector(TestInfo testInfo) throws Exception {
+ LOGGER.info("Starting {}", testInfo.getDisplayName());
final UUID postgresDestDefId = testHarness.getPostgresDestinationDefinitionId();
// Fetch the current/most recent source definition version
final DestinationDefinitionRead destinationDefinitionRead = apiClient.getDestinationDefinitionApi().getDestinationDefinition(
@@ -203,7 +263,7 @@ public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Except
LOGGER.info("Setting postgres destination definition to version {}", POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION);
testHarness.updateDestinationDefinitionVersion(postgresDestDefId, POSTGRES_DESTINATION_LEGACY_CONNECTOR_VERSION);
- testIncrementalCdcSync();
+ testIncrementalCdcSync(testInfo);
} finally {
// set postgres destination definition back to latest version for other tests
LOGGER.info("Setting postgres destination definition back to version {}", destinationDefinitionRead.getDockerImageTag());
@@ -212,11 +272,11 @@ public void testIncrementalCdcSyncWithLegacyDestinationConnector() throws Except
}
@Test
- public void testDeleteRecordCdcSync() throws Exception {
- LOGGER.info("Starting delete record cdc sync test");
+ public void testDeleteRecordCdcSync(TestInfo testInfo) throws Exception {
+ LOGGER.info("Starting {}", testInfo.getDisplayName());
final UUID connectionId = createCdcConnection();
- LOGGER.info("Starting delete record cdc sync 1");
+ LOGGER.info("Starting {} sync 1", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
@@ -224,12 +284,8 @@ public void testDeleteRecordCdcSync() throws Exception {
LOGGER.info("state after sync 1: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
final Database source = testHarness.getSourceDatabase();
- List<JsonNode> sourceRecords = testHarness.retrieveSourceRecords(source, STREAM_NAME);
- List<DestinationCdcRecordMatcher> expectedDestRecordMatchers = new ArrayList<>(sourceRecords
- .stream()
- .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty()))
- .toList());
- assertDestinationMatches(expectedDestRecordMatchers);
+ List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
final Instant beforeDelete = Instant.now();
@@ -240,18 +296,163 @@ public void testDeleteRecordCdcSync() throws Exception {
Map<String, Object> deletedRecordMap = new HashMap<>();
deletedRecordMap.put(COLUMN_ID, 1);
deletedRecordMap.put(COLUMN_NAME, null);
- expectedDestRecordMatchers.add(new DestinationCdcRecordMatcher(
+ expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
Jsons.jsonNode(deletedRecordMap),
beforeDelete,
Optional.of(beforeDelete)));
- LOGGER.info("Starting delete record cdc sync 2");
+ LOGGER.info("Starting {} sync 2", testInfo.getDisplayName());
final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
.syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
LOGGER.info("state after sync 2: {}", apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId)));
- assertDestinationMatches(expectedDestRecordMatchers);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+ }
+
+ @Test
+ public void testPartialResetFromSchemaUpdate(TestInfo testInfo) throws Exception {
+ LOGGER.info("Starting {}", testInfo.getDisplayName());
+
+ final UUID connectionId = createCdcConnection();
+ LOGGER.info("Starting {} sync 1", testInfo.getDisplayName());
+
+ final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
+ .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+ waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
+
+ final Database source = testHarness.getSourceDatabase();
+
+ List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+
+ List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+
+ StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE);
+ StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE);
+ assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor));
+
+ LOGGER.info("Removing color palette table");
+ source.query(ctx -> ctx.dropTable(COLOR_PALETTE_TABLE).execute());
+
+ LOGGER.info("Refreshing schema and updating connection");
+ final ConnectionRead connectionRead = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+ UUID sourceId = createCdcSource().getSourceId();
+ AirbyteCatalog refreshedCatalog = testHarness.discoverSourceSchema(sourceId);
+ LOGGER.info("Refreshed catalog: {}", refreshedCatalog);
+ WebBackendConnectionUpdate update = getUpdateInput(connectionRead, refreshedCatalog, operationRead);
+ webBackendApi.webBackendUpdateConnectionNew(update);
+
+ LOGGER.info("Waiting for sync job after update to complete");
+ JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId);
+ waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);
+
+ // We do not check that the source and the dest are in sync here because removing a stream doesn't
+ // delete its data in the destination
+ assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor));
+ }
+
+ @Test
+ public void testPartialResetFromStreamSelection(TestInfo testInfo) throws Exception {
+ LOGGER.info("Starting {}", testInfo.getDisplayName());
+
+ final UUID connectionId = createCdcConnection();
+ LOGGER.info("Starting {} sync 1", testInfo.getDisplayName());
+
+ final JobInfoRead connectionSyncRead1 = apiClient.getConnectionApi()
+ .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+ waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead1.getJob());
+
+ final Database source = testHarness.getSourceDatabase();
+
+ List<DestinationCdcRecordMatcher> expectedIdAndNameRecords = getCdcRecordMatchersFromSource(source, ID_AND_NAME_TABLE);
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+
+ List<DestinationCdcRecordMatcher> expectedColorPaletteRecords = getCdcRecordMatchersFromSource(source, COLOR_PALETTE_TABLE);
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+
+ StreamDescriptor idAndNameStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(ID_AND_NAME_TABLE);
+ StreamDescriptor colorPaletteStreamDescriptor = new StreamDescriptor().namespace(SCHEMA_NAME).name(COLOR_PALETTE_TABLE);
+ assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor));
+
+ LOGGER.info("Removing color palette stream from configured catalog");
+ final ConnectionRead connectionRead = apiClient.getConnectionApi().getConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+ final UUID sourceId = connectionRead.getSourceId();
+ AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
+ final List<AirbyteStreamAndConfiguration> streams = catalog.getStreams();
+ // filter out color_palette stream
+ final List<AirbyteStreamAndConfiguration> updatedStreams = streams
+ .stream()
+ .filter(stream -> !stream.getStream().getName().equals(COLOR_PALETTE_TABLE))
+ .toList();
+ catalog.setStreams(updatedStreams);
+ LOGGER.info("Updated catalog: {}", catalog);
+ WebBackendConnectionUpdate update = getUpdateInput(connectionRead, catalog, operationRead);
+ webBackendApi.webBackendUpdateConnectionNew(update);
+
+ LOGGER.info("Waiting for sync job after update to start");
+ JobRead syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId);
+ LOGGER.info("Waiting for sync job after update to complete");
+ waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);
+
+ // We do not check that the source and the dest are in sync here because removing a stream doesn't
+ // delete its data in the destination
+ assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor));
+
+ LOGGER.info("Adding color palette stream back to configured catalog");
+ catalog = testHarness.discoverSourceSchema(sourceId);
+ LOGGER.info("Updated catalog: {}", catalog);
+ update = getUpdateInput(connectionRead, catalog, operationRead);
+ webBackendApi.webBackendUpdateConnectionNew(update);
+
+ LOGGER.info("Waiting for sync job after update to start");
+ syncFromTheUpdate = testHarness.waitUntilTheNextJobIsStarted(connectionId);
+ LOGGER.info("Checking that id_and_name table is unaffected by the partial reset");
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+ LOGGER.info("Checking that color_palette table was cleared in the destination due to the reset triggered by the update");
+ assertDestinationMatches(COLOR_PALETTE_TABLE, List.of());
+ LOGGER.info("Waiting for sync job after update to complete");
+ waitForSuccessfulJob(apiClient.getJobsApi(), syncFromTheUpdate);
+
+ // Verify that color palette table records exist in destination again after sync.
+ // If we see 0 records for this table in the destination, that means the CDC partial reset logic is
+ // not working properly, and it continued from the replication log cursor for this stream despite
+ // this stream's state being reset
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+ assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor));
+
+ // Verify that incremental still works properly after partial reset
+ LOGGER.info("Adding new records to tables");
+ final Instant beforeInsert = Instant.now();
+ source.query(ctx -> ctx.execute("INSERT INTO id_and_name(id, name) VALUES(6, 'geralt')"));
+ expectedIdAndNameRecords.add(new DestinationCdcRecordMatcher(
+ Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 6).put(COLUMN_NAME, "geralt").build()),
+ beforeInsert,
+ Optional.empty()));
+
+ source.query(ctx -> ctx.execute("INSERT INTO color_palette(id, color) VALUES(4, 'yellow')"));
+ expectedColorPaletteRecords.add(new DestinationCdcRecordMatcher(
+ Jsons.jsonNode(ImmutableMap.builder().put(COLUMN_ID, 4).put(COLUMN_COLOR, "yellow").build()),
+ beforeInsert,
+ Optional.empty()));
+
+ LOGGER.info("Starting {} sync after insert", testInfo.getDisplayName());
+ final JobInfoRead connectionSyncRead2 = apiClient.getConnectionApi()
+ .syncConnection(new ConnectionIdRequestBody().connectionId(connectionId));
+ waitForSuccessfulJob(apiClient.getJobsApi(), connectionSyncRead2.getJob());
+
+ assertDestinationMatches(ID_AND_NAME_TABLE, expectedIdAndNameRecords);
+ assertDestinationMatches(COLOR_PALETTE_TABLE, expectedColorPaletteRecords);
+ assertGlobalStateContainsStreams(connectionId, List.of(idAndNameStreamDescriptor, colorPaletteStreamDescriptor));
+ }
+
+ private List<DestinationCdcRecordMatcher> getCdcRecordMatchersFromSource(Database source, String tableName) throws SQLException {
+ List<JsonNode> sourceRecords = testHarness.retrieveSourceRecords(source, tableName);
+ return new ArrayList<>(sourceRecords
+ .stream()
+ .map(sourceRecord -> new DestinationCdcRecordMatcher(sourceRecord, Instant.EPOCH, Optional.empty()))
+ .toList());
}
private UUID createCdcConnection() throws ApiException {
@@ -259,7 +460,8 @@ private UUID createCdcConnection() throws ApiException {
final UUID sourceId = sourceRead.getSourceId();
final UUID destinationId = testHarness.createDestination().getDestinationId();
- final UUID operationId = testHarness.createOperation().getOperationId();
+ operationRead = testHarness.createOperation();
+ final UUID operationId = operationRead.getOperationId();
final AirbyteCatalog catalog = testHarness.discoverSourceSchema(sourceId);
final AirbyteStream stream = catalog.getStreams().get(0).getStream();
LOGGER.info("stream: {}", stream);
@@ -298,8 +500,8 @@ private SourceRead createCdcSource() throws ApiException {
Jsons.jsonNode(sourceDbConfigMap));
}
- private void assertDestinationMatches(List<DestinationCdcRecordMatcher> expectedDestRecordMatchers) throws Exception {
- final List<JsonNode> destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, STREAM_NAME));
+ private void assertDestinationMatches(String streamName, List<DestinationCdcRecordMatcher> expectedDestRecordMatchers) throws Exception {
+ final List<JsonNode> destRecords = testHarness.retrieveRawDestinationRecords(new SchemaTableNamePair(SCHEMA_NAME, streamName));
if (destRecords.size() != expectedDestRecordMatchers.size()) {
final String errorMessage = String.format(
"The number of destination records %d does not match the expected number %d",
@@ -347,4 +549,67 @@ private void assertDestinationMatches(List<DestinationCdcRecordMatcher> expected
}
}
+ private void assertGlobalStateContainsStreams(final UUID connectionId, final List<StreamDescriptor> expectedStreams) throws ApiException {
+ final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
+ LOGGER.info("state: {}", state);
+ assertEquals(ConnectionStateType.GLOBAL, state.getStateType());
+ final List<StreamDescriptor> stateStreams = state.getGlobalState().getStreamStates().stream().map(StreamState::getStreamDescriptor).toList();
+
+ Assertions.assertTrue(stateStreams.containsAll(expectedStreams) && expectedStreams.containsAll(stateStreams),
+ String.format("Expected state to have streams %s, but it actually had streams %s", expectedStreams, stateStreams));
+ }
+
+ private void assertNoState(final UUID connectionId) throws ApiException {
+ final ConnectionState state = apiClient.getConnectionApi().getState(new ConnectionIdRequestBody().connectionId(connectionId));
+ assertEquals(ConnectionStateType.NOT_SET, state.getStateType());
+ assertNull(state.getState());
+ assertNull(state.getStreamState());
+ assertNull(state.getGlobalState());
+ }
+
+ private WebBackendConnectionUpdate getUpdateInput(final ConnectionRead connection, final AirbyteCatalog catalog, final OperationRead operation) {
+ final SyncMode syncMode = SyncMode.INCREMENTAL;
+ final DestinationSyncMode destinationSyncMode = DestinationSyncMode.APPEND;
+ catalog.getStreams().forEach(s -> s.getConfig()
+ .syncMode(syncMode)
+ .cursorField(List.of(COLUMN_ID))
+ .destinationSyncMode(destinationSyncMode));
+
+ return new WebBackendConnectionUpdate()
+ .connectionId(connection.getConnectionId())
+ .name(connection.getName())
+ .operationIds(connection.getOperationIds())
+ .operations(List.of(new WebBackendOperationCreateOrUpdate()
+ .name(operation.getName())
+ .operationId(operation.getOperationId())
+ .workspaceId(operation.getWorkspaceId())
+ .operatorConfiguration(operation.getOperatorConfiguration())))
+ .namespaceDefinition(connection.getNamespaceDefinition())
+ .namespaceFormat(connection.getNamespaceFormat())
+ .syncCatalog(catalog)
+ .schedule(connection.getSchedule())
+ .sourceCatalogId(connection.getSourceCatalogId())
+ .status(connection.getStatus())
+ .prefix(connection.getPrefix());
+ }
+
+ // can be helpful for debugging
+ private void printDbs() throws SQLException {
+ final Database sourceDb = testHarness.getSourceDatabase();
+ Set<SchemaTableNamePair> pairs = testHarness.listAllTables(sourceDb);
+ LOGGER.info("Printing source tables");
+ for (final SchemaTableNamePair pair : pairs) {
+ final Result<Record> result = sourceDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName)));
+ LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result);
+ }
+
+ final Database destDb = testHarness.getDestinationDatabase();
+ pairs = testHarness.listAllTables(destDb);
+ LOGGER.info("Printing destination tables");
+ for (final SchemaTableNamePair pair : pairs) {
+ final Result<Record> result = destDb.query(context -> context.fetch(String.format("SELECT * FROM %s.%s", pair.schemaName, pair.tableName)));
+ LOGGER.info("{}.{} contents:\n{}", pair.schemaName, pair.tableName, result);
+ }
+ }
+
}
diff --git a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql
index 9434b4135eb4..ce7c2da4a538 100644
--- a/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql
+++ b/airbyte-tests/src/acceptanceTests/resources/postgres_init_cdc.sql
@@ -32,15 +32,45 @@ INSERT
'john'
);
+CREATE
+ TABLE
+ color_palette(
+ id INTEGER PRIMARY KEY,
+ color VARCHAR(200)
+ );
+
+INSERT
+ INTO
+ color_palette(
+ id,
+ color
+ )
+ VALUES(
+ 1,
+ 'red'
+ ),
+ (
+ 2,
+ 'blue'
+ ),
+ (
+ 3,
+ 'green'
+ );
+
CREATE
ROLE airbyte_role REPLICATION LOGIN;
ALTER TABLE
id_and_name REPLICA IDENTITY DEFAULT;
+ALTER TABLE
+ color_palette REPLICA IDENTITY DEFAULT;
+
CREATE
PUBLICATION airbyte_publication FOR TABLE
- id_and_name;
+ id_and_name,
+ color_palette;
SELECT
pg_create_logical_replication_slot(
diff --git a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java
index fceedd214535..4b5b7b4a59d6 100644
--- a/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java
+++ b/airbyte-workers/src/test/java/io/airbyte/workers/process/AirbyteIntegrationLauncherTest.java
@@ -51,7 +51,7 @@ class AirbyteIntegrationLauncherTest {
WorkerEnvConstants.WORKER_CONNECTOR_IMAGE, FAKE_IMAGE,
WorkerEnvConstants.WORKER_JOB_ID, JOB_ID,
WorkerEnvConstants.WORKER_JOB_ATTEMPT, String.valueOf(JOB_ATTEMPT),
- EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(false));
+ EnvVariableFeatureFlags.USE_STREAM_CAPABLE_STATE, String.valueOf(new EnvVariableFeatureFlags().useStreamCapableState()));
private WorkerConfigs workerConfigs;
@Mock
diff --git a/tools/bin/acceptance_test.sh b/tools/bin/acceptance_test.sh
index 978e941861d6..41ccdd74aaa1 100755
--- a/tools/bin/acceptance_test.sh
+++ b/tools/bin/acceptance_test.sh
@@ -9,7 +9,7 @@ assert_root
echo "Starting app..."
# Detach so we can run subsequent commands
-VERSION=dev TRACKING_STRATEGY=logging docker-compose up -d
+VERSION=dev TRACKING_STRATEGY=logging USE_STREAM_CAPABLE_STATE=true docker-compose up -d
# Sometimes source/dest containers using airbyte volumes survive shutdown, which need to be killed in order to shut down properly.
shutdown_cmd="docker-compose down -v || docker kill \$(docker ps -a -f volume=airbyte_workspace -f volume=airbyte_data -f volume=airbyte_db -q) && docker-compose down -v"