Skip to content

Commit

Permalink
[Source-mssql] : Remove options for data_to_sync & snapshot_isolation (
Browse files Browse the repository at this point in the history
  • Loading branch information
akashkulk authored Jan 10, 2024
1 parent 3be28af commit 804a34b
Show file tree
Hide file tree
Showing 13 changed files with 45 additions and 320 deletions.
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mssql/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 3.5.1
dockerImageTag: 3.6.0
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ public class MssqlCdcHelper {
private static final String REPLICATION_FIELD = "replication";
private static final String REPLICATION_TYPE_FIELD = "replication_type";
private static final String METHOD_FIELD = "method";
private static final String CDC_SNAPSHOT_ISOLATION_FIELD = "snapshot_isolation";
private static final String CDC_DATA_TO_SYNC_FIELD = "data_to_sync";

private static final Duration HEARTBEAT_INTERVAL = Duration.ofSeconds(10L);

Expand All @@ -40,69 +38,6 @@ public enum ReplicationMethod {
CDC
}

/**
 * Isolation level applied while the initial snapshot is read, pairing the label used in the
 * connector configuration with the value expected by Debezium's {@code snapshot.isolation.mode}.
 *
 * <p>
 * The default "SNAPSHOT" mode can prevent other (non-Airbyte) transactions from updating table rows
 * while we snapshot. References:
 * https://docs.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql?view=sql-server-ver15
 * https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-snapshot-isolation-mode
 */
public enum SnapshotIsolation {

  SNAPSHOT("Snapshot", "snapshot"),
  READ_COMMITTED("Read Committed", "read_committed");

  // Label as it appears in the connector configuration (matched case-insensitively).
  private final String configLabel;
  // Value handed to Debezium for this isolation level.
  private final String debeziumMode;

  SnapshotIsolation(final String snapshotIsolationLevel, final String debeziumIsolationMode) {
    this.configLabel = snapshotIsolationLevel;
    this.debeziumMode = debeziumIsolationMode;
  }

  /**
   * @return the Debezium isolation mode string associated with this level
   */
  public String getDebeziumIsolationMode() {
    return debeziumMode;
  }

  /**
   * Looks up the isolation level whose configuration label equals {@code jsonValue}, ignoring case.
   *
   * @param jsonValue label read from the connector configuration
   * @return the matching isolation level
   * @throws IllegalArgumentException if no level carries that label
   */
  public static SnapshotIsolation from(final String jsonValue) {
    return java.util.Arrays.stream(values())
        .filter(level -> level.configLabel.equalsIgnoreCase(jsonValue))
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("Unexpected snapshot isolation level: " + jsonValue));
  }

}

/**
 * Which data a CDC sync should read, pairing the label used in the connector configuration with
 * the corresponding Debezium {@code snapshot.mode} value.
 *
 * Reference:
 * https://debezium.io/documentation/reference/2.2/connectors/sqlserver.html#sqlserver-property-snapshot-mode
 */
public enum DataToSync {

  EXISTING_AND_NEW("Existing and New", "initial"),
  NEW_CHANGES_ONLY("New Changes Only", "schema_only");

  // Label as it appears in the connector configuration (matched case-insensitively).
  private final String configLabel;
  // Value handed to Debezium as the snapshot mode.
  private final String snapshotMode;

  DataToSync(final String value, final String debeziumSnapshotMode) {
    this.configLabel = value;
    this.snapshotMode = debeziumSnapshotMode;
  }

  /**
   * @return the Debezium snapshot mode string associated with this setting
   */
  public String getDebeziumSnapshotMode() {
    return snapshotMode;
  }

  /**
   * Looks up the setting whose configuration label equals {@code value}, ignoring case.
   *
   * @param value label read from the connector configuration
   * @return the matching setting
   * @throws IllegalArgumentException if no setting carries that label
   */
  public static DataToSync from(final String value) {
    return java.util.Arrays.stream(values())
        .filter(setting -> setting.configLabel.equalsIgnoreCase(value))
        .findFirst()
        .orElseThrow(() -> new IllegalArgumentException("Unexpected data to sync setting: " + value));
  }

}

@VisibleForTesting
static boolean isCdc(final JsonNode config) {
// new replication method config since version 0.4.0
Expand All @@ -122,28 +57,6 @@ static boolean isCdc(final JsonNode config) {
return false;
}

/**
 * Resolves the snapshot isolation level from the source configuration.
 *
 * <p>
 * The level is read from the {@code snapshot_isolation} entry of the replication object. When the
 * replication setting is not an object, or the entry is missing or null, the default
 * {@link SnapshotIsolation#SNAPSHOT} is returned.
 *
 * @param config source configuration
 * @return the configured isolation level, or {@link SnapshotIsolation#SNAPSHOT} by default
 * @throws IllegalArgumentException if the configured value is not a known isolation level
 */
@VisibleForTesting
static SnapshotIsolation getSnapshotIsolationConfig(final JsonNode config) {
  // new replication method config since version 0.4.0
  if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
    final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
    // JsonNode#get returns null for an absent key; guard so a config that omits the
    // option falls back to the default instead of failing with a NullPointerException.
    if (replicationConfig.hasNonNull(CDC_SNAPSHOT_ISOLATION_FIELD)) {
      return SnapshotIsolation.from(replicationConfig.get(CDC_SNAPSHOT_ISOLATION_FIELD).asText());
    }
  }
  return SnapshotIsolation.SNAPSHOT;
}

/**
 * Resolves the "data to sync" setting from the source configuration.
 *
 * <p>
 * The setting is read from the {@code data_to_sync} entry of the replication object. When the
 * replication setting is not an object, or the entry is missing or null, the default
 * {@link DataToSync#EXISTING_AND_NEW} is returned.
 *
 * @param config source configuration
 * @return the configured setting, or {@link DataToSync#EXISTING_AND_NEW} by default
 * @throws IllegalArgumentException if the configured value is not a known setting
 */
@VisibleForTesting
static DataToSync getDataToSyncConfig(final JsonNode config) {
  // new replication method config since version 0.4.0
  if (config.hasNonNull(LEGACY_REPLICATION_FIELD) && config.get(LEGACY_REPLICATION_FIELD).isObject()) {
    final JsonNode replicationConfig = config.get(LEGACY_REPLICATION_FIELD);
    // JsonNode#get returns null for an absent key; guard so a config that omits the
    // option falls back to the default instead of failing with a NullPointerException.
    if (replicationConfig.hasNonNull(CDC_DATA_TO_SYNC_FIELD)) {
      return DataToSync.from(replicationConfig.get(CDC_DATA_TO_SYNC_FIELD).asText());
    }
  }
  return DataToSync.EXISTING_AND_NEW;
}

static Properties getDebeziumProperties(final JdbcDatabase database, final ConfiguredAirbyteCatalog catalog, final boolean isSnapshot) {
final JsonNode config = database.getSourceConfig();
final JsonNode dbConfig = database.getDatabaseConfig();
Expand All @@ -166,10 +79,12 @@ static Properties getDebeziumProperties(final JdbcDatabase database, final Confi
if (isSnapshot) {
props.setProperty("snapshot.mode", "initial_only");
} else {
props.setProperty("snapshot.mode", getDataToSyncConfig(config).getDebeziumSnapshotMode());
// If not in snapshot mode, "initial" makes sure that a snapshot is taken if the transaction log
// has been rotated out. It will also end up streaming changes from the transaction log.
props.setProperty("snapshot.mode", "initial");
}

props.setProperty("snapshot.isolation.mode", getSnapshotIsolationConfig(config).getDebeziumIsolationMode());
props.setProperty("snapshot.isolation.mode", "read_committed");

props.setProperty("schema.include.list", getSchema(catalog));
props.setProperty("database.names", config.get(JdbcUtils.DATABASE_KEY).asText());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.source.mssql.MssqlCdcHelper.SnapshotIsolation;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.v0.AirbyteCatalog;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
Expand Down Expand Up @@ -104,7 +103,7 @@ SELECT CAST(IIF(EXISTS(SELECT TOP 1 1 FROM "%s"."%s" WHERE "%s" IS NULL), 1, 0)
public static final String JDBC_DELIMITER = ";";
private List<String> schemas;

public static Source sshWrappedSource(MssqlSource source) {
public static Source sshWrappedSource(final MssqlSource source) {
return new SshWrappedSource(source, JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}

Expand Down Expand Up @@ -372,7 +371,6 @@ public List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations(final J
checkOperations.add(database -> assertCdcEnabledInDb(config, database));
checkOperations.add(database -> assertCdcSchemaQueryable(config, database));
checkOperations.add(database -> assertSqlServerAgentRunning(database));
checkOperations.add(database -> assertSnapshotIsolationAllowed(config, database));
}

return checkOperations;
Expand Down Expand Up @@ -456,35 +454,6 @@ protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws S
}
}

/**
 * Verifies that the configured database has snapshot isolation enabled when the connector is
 * configured to use the "Snapshot" isolation level. For any other configured level the check is
 * skipped.
 *
 * @param config source configuration; supplies the database name and the isolation setting
 * @param database database used to query {@code sys.databases}
 * @throws SQLException if the lookup query fails
 * @throws RuntimeException if the database is not listed in {@code sys.databases} or its
 *         {@code snapshot_isolation_state} is not 1
 */
protected void assertSnapshotIsolationAllowed(final JsonNode config, final JdbcDatabase database)
    throws SQLException {
  final boolean snapshotLevelRequested = MssqlCdcHelper.getSnapshotIsolationConfig(config) == SnapshotIsolation.SNAPSHOT;
  if (!snapshotLevelRequested) {
    // Only the "Snapshot" level depends on the database-level setting.
    return;
  }

  final List<JsonNode> rows = database.queryJsons(connection -> {
    final String query = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?";
    final PreparedStatement statement = connection.prepareStatement(query);
    final String targetDatabase = config.get(JdbcUtils.DATABASE_KEY).asText();
    statement.setString(1, targetDatabase);
    LOGGER.info(String.format(
        "Checking that snapshot isolation is enabled on database '%s' using the query: '%s'",
        targetDatabase, query));
    return statement;
  }, sourceOperations::rowToJson);

  final String databaseName = config.get(JdbcUtils.DATABASE_KEY).asText();
  if (rows.isEmpty()) {
    throw new RuntimeException(String.format(
        "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).",
        databaseName));
  }

  final boolean snapshotIsolationEnabled = rows.get(0).get("snapshot_isolation_state").asInt() == 1;
  if (!snapshotIsolationEnabled) {
    throw new RuntimeException(String.format(
        "Detected that snapshot isolation is not enabled for database '%s'. MSSQL CDC relies on snapshot isolation. "
            + "Please check the documentation on how to enable snapshot isolation on MS SQL Server.",
        databaseName));
  }
}

@Override
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,6 @@
"const": "CDC",
"order": 0
},
"data_to_sync": {
"title": "Data to Sync",
"type": "string",
"default": "Existing and New",
"enum": ["Existing and New", "New Changes Only"],
"description": "What data should be synced under the CDC. \"Existing and New\" will read existing data as a snapshot, and sync new changes through CDC. \"New Changes Only\" will skip the initial snapshot, and only sync new changes through CDC.",
"order": 1
},
"snapshot_isolation": {
"title": "Initial Snapshot Isolation Level",
"type": "string",
"default": "Snapshot",
"enum": ["Snapshot", "Read Committed"],
"description": "Existing data in the database are synced through an initial snapshot. This parameter controls the isolation level that will be used during the initial snapshotting. If you choose the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\">snapshot isolation mode</a> on the database.",
"order": 2
},
"initial_waiting_seconds": {
"type": "integer",
"title": "Initial Waiting Time in Seconds (Advanced)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ protected void setupEnvironment(final TestDestinationEnv environment) {
\t@role_name = N'%s',
\t@supports_net_changes = 0""";
testdb
.withSnapshotIsolation()
.withCdc()
.withWaitUntilAgentRunning()
// create tables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ protected JsonNode getConfig() {
// Creates the test database from the MSSQL_2022 base image with the AGENT container modifier,
// applies withSnapshotIsolation() and withCdc() to it, stores it in testdb and returns its
// underlying Database handle.
@Override
protected Database setupDatabase() {
testdb = MsSQLTestDatabase.in(BaseImage.MSSQL_2022, ContainerModifier.AGENT)
.withSnapshotIsolation()
.withCdc();
return testdb.getDatabase();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,22 +132,6 @@
"const": "CDC",
"order": 0
},
"data_to_sync": {
"title": "Data to Sync",
"type": "string",
"default": "Existing and New",
"enum": ["Existing and New", "New Changes Only"],
"description": "What data should be synced under the CDC. \"Existing and New\" will read existing data as a snapshot, and sync new changes through CDC. \"New Changes Only\" will skip the initial snapshot, and only sync new changes through CDC.",
"order": 1
},
"snapshot_isolation": {
"title": "Initial Snapshot Isolation Level",
"type": "string",
"default": "Snapshot",
"enum": ["Snapshot", "Read Committed"],
"description": "Existing data in the database are synced through an initial snapshot. This parameter controls the isolation level that will be used during the initial snapshotting. If you choose the \"Snapshot\" level, you must enable the <a href=\"https://docs.microsoft.com/en-us/dotnet/framework/data/adonet/sql/snapshot-isolation-in-sql-server\">snapshot isolation mode</a> on the database.",
"order": 2
},
"initial_waiting_seconds": {
"type": "integer",
"title": "Initial Waiting Time in Seconds (Advanced)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ protected MsSQLTestDatabase createTestDatabase() {
.withConnectionProperty("encrypt", "false")
.withConnectionProperty("databaseName", testdb.getDatabaseName())
.initialized()
.withSnapshotIsolation()
.withWaitUntilAgentRunning()
.withCdc();
}
Expand Down Expand Up @@ -169,7 +168,7 @@ protected DataSource createTestDataSource() {
protected void tearDown() {
try {
DataSourceFactory.close(testDataSource);
} catch (Exception e) {
} catch (final Exception e) {
throw new RuntimeException(e);
}
super.tearDown();
Expand Down Expand Up @@ -235,30 +234,6 @@ void testAssertSqlServerAgentRunning() {
assertDoesNotThrow(() -> source().assertSqlServerAgentRunning(testDatabase()));
}

// Checks assertSnapshotIsolationAllowed with the default test config: it must pass while snapshot
// isolation is enabled on the test database and throw a RuntimeException once it is disabled.
// The order of the two assertions matters because the second one mutates the shared test database.
@Test
void testAssertSnapshotIsolationAllowed() {
// snapshot isolation enabled by setup so assert check passes
assertDoesNotThrow(() -> source().assertSnapshotIsolationAllowed(config(), testDatabase()));
// now disable snapshot isolation and assert that check fails
testdb.withoutSnapshotIsolation();
assertThrows(RuntimeException.class, () -> source().assertSnapshotIsolationAllowed(config(), testDatabase()));
}

// Checks that assertSnapshotIsolationAllowed is a no-op when the config selects the
// "Read Committed" isolation level: it must not throw either before or after snapshot isolation
// is disabled on the test database.
@Test
void testAssertSnapshotIsolationDisabled() {
final JsonNode replicationConfig = Jsons.jsonNode(ImmutableMap.builder()
.put("method", "CDC")
.put("data_to_sync", "New Changes Only")
// set snapshot_isolation level to "Read Committed" to disable snapshot
.put("snapshot_isolation", "Read Committed")
.build());
final var config = config();
// Swap the replication_method section of the default config for the one built above.
Jsons.replaceNestedValue(config, List.of("replication_method"), replicationConfig);
assertDoesNotThrow(() -> source().assertSnapshotIsolationAllowed(config, testDatabase()));
// Even with snapshot isolation turned off on the database the check must still pass.
testdb.withoutSnapshotIsolation();
assertDoesNotThrow(() -> source().assertSnapshotIsolationAllowed(config, testDatabase()));
}

// Ensure the CDC check operations are included when CDC is enabled
// todo: make this better by checking the returned checkOperations from source.getCheckOperations
@Test
Expand All @@ -280,8 +255,6 @@ void testCdcCheckOperations() throws Exception {
status = source().check(config());
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
testdb.withAgentStarted().withWaitUntilAgentRunning();
// assertSnapshotIsolationAllowed
testdb.withoutSnapshotIsolation();
status = source().check(config());
assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ public class CdcMssqlSslSourceTest extends CdcMssqlSourceTest {
}

protected MSSQLServerContainer<?> createContainer() {
MsSQLContainerFactory containerFactory = new MsSQLContainerFactory();
MSSQLServerContainer<?> container =
final MsSQLContainerFactory containerFactory = new MsSQLContainerFactory();
final MSSQLServerContainer<?> container =
containerFactory.createNewContainer(DockerImageName.parse("mcr.microsoft.com/mssql/server:2022-latest"));
containerFactory.withSslCertificates(container);
return container;
Expand All @@ -38,7 +38,6 @@ final protected MsSQLTestDatabase createTestDatabase() {
.withConnectionProperty("databaseName", testdb.getDatabaseName())
.withConnectionProperty("trustServerCertificate", "true")
.initialized()
.withSnapshotIsolation()
.withCdc()
.withWaitUntilAgentRunning();
}
Expand All @@ -60,10 +59,10 @@ protected JsonNode config() {
try {
containerIp = InetAddress.getByName(testdb.getContainer().getHost())
.getHostAddress();
} catch (UnknownHostException e) {
} catch (final UnknownHostException e) {
throw new RuntimeException(e);
}
String certificate = testdb.getCertificate(CertificateKey.SERVER);
final String certificate = testdb.getCertificate(CertificateKey.SERVER);
return testdb.configBuilder()
.withEncrytedVerifyServerCertificate(certificate, testdb.getContainer().getHost())
.with(JdbcUtils.HOST_KEY, containerIp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ public void setup() {
.withConnectionProperty("encrypt", "false")
.withConnectionProperty("databaseName", testdb.getDatabaseName())
.initialized()
.withSnapshotIsolation()
.withCdc()
.withWaitUntilAgentRunning();

Expand Down Expand Up @@ -129,7 +128,7 @@ public void setup() {
}

private AirbyteCatalog getCatalog() {
var streams = new ArrayList<AirbyteStream>();
final var streams = new ArrayList<AirbyteStream>();
for (int i = 0; i < TEST_TABLES; i++) {
streams.add(CatalogHelpers.createAirbyteStream(
"test_table_%d".formatted(i),
Expand Down Expand Up @@ -164,9 +163,7 @@ private JsonNode config() {
.with("is_test", true)
.with("replication_method", Map.of(
"method", "CDC",
"data_to_sync", "Existing and New",
"initial_waiting_seconds", 60,
"snapshot_isolation", "Snapshot"))
"initial_waiting_seconds", 60))

.build();
}
Expand Down Expand Up @@ -194,7 +191,7 @@ public void testCompressedSchemaHistory() throws Exception {
assertTrue(lastSharedStateFromFirstBatch.get(IS_COMPRESSED).asBoolean());
final var recordsFromFirstBatch = extractRecordMessages(dataFromFirstBatch);
assertEquals(TEST_TABLES, recordsFromFirstBatch.size());
for (var record : recordsFromFirstBatch) {
for (final var record : recordsFromFirstBatch) {
assertEquals("1", record.getData().get("id").toString());
}

Expand All @@ -219,7 +216,7 @@ public void testCompressedSchemaHistory() throws Exception {
assertTrue(lastSharedStateFromSecondBatch.get(IS_COMPRESSED).asBoolean());
final var recordsFromSecondBatch = extractRecordMessages(dataFromSecondBatch);
assertEquals(TEST_TABLES, recordsFromSecondBatch.size());
for (var record : recordsFromSecondBatch) {
for (final var record : recordsFromSecondBatch) {
assertEquals("2", record.getData().get("id").toString());
}
}
Expand All @@ -241,7 +238,7 @@ private Map<String, Set<AirbyteRecordMessage>> extractRecordMessagesStreamWise(f
.collect(Collectors.groupingBy(AirbyteRecordMessage::getStream));

final Map<String, Set<AirbyteRecordMessage>> recordsPerStreamWithNoDuplicates = new HashMap<>();
for (var entry : recordsPerStream.entrySet()) {
for (final var entry : recordsPerStream.entrySet()) {
final var set = new HashSet<>(entry.getValue());
recordsPerStreamWithNoDuplicates.put(entry.getKey(), set);
assertEquals(entry.getValue().size(), set.size(), "duplicate records in sync for " + entry.getKey());
Expand Down
Loading

0 comments on commit 804a34b

Please sign in to comment.