From 2f87d62186ad5bd99b496afe4a0529b9b0e099cb Mon Sep 17 00:00:00 2001 From: George Claireaux Date: Tue, 13 Jul 2021 23:19:56 +0100 Subject: [PATCH] addressing review comments --- .../source/mssql/MssqlCdcTargetPosition.java | 10 +- .../source/mssql/MssqlSource.java | 178 +++++++++--------- .../source/mssql/CdcMssqlSourceTest.java | 123 ++++++++++-- 3 files changed, 196 insertions(+), 115 deletions(-) diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java index 20b1207c4e06..35cf0d8198ec 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java @@ -41,7 +41,7 @@ public class MssqlCdcTargetPosition implements CdcTargetPosition { private static final Logger LOGGER = LoggerFactory.getLogger(MssqlCdcTargetPosition.class); - private final Lsn targetLsn; + public final Lsn targetLsn; public MssqlCdcTargetPosition(Lsn targetLsn) { this.targetLsn = targetLsn; @@ -84,17 +84,19 @@ public int hashCode() { return targetLsn.hashCode(); } - public static MssqlCdcTargetPosition getTargetPostion(JdbcDatabase database) { + public static MssqlCdcTargetPosition getTargetPosition(JdbcDatabase database, String dbName) { try { final List jsonNodes = database - .bufferedResultSetQuery(conn -> conn.createStatement().executeQuery("SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;"), JdbcUtils::rowToJson); + .bufferedResultSetQuery(conn -> conn.createStatement().executeQuery( + "USE " + dbName + "; SELECT sys.fn_cdc_get_max_lsn() AS max_lsn;"), JdbcUtils::rowToJson); Preconditions.checkState(jsonNodes.size() == 1); if (jsonNodes.get(0).get("max_lsn") != null) { Lsn maxLsn = Lsn.valueOf(jsonNodes.get(0).get("max_lsn").binaryValue()); LOGGER.info("identified target lsn: " + maxLsn); return new MssqlCdcTargetPosition(maxLsn); } else { - throw new RuntimeException("Max LSN is null, see docs"); // todo: make this error way better + throw new RuntimeException("SQL returned max LSN as null, this might be because the SQL Server Agent is not running. " + + "Please enable the Agent and try again (https://docs.microsoft.com/en-us/sql/ssms/agent/start-stop-or-pause-the-sql-server-agent-service?view=sql-server-ver15)"); } } catch (SQLException | IOException e) { throw new RuntimeException(e); diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 055422c9ce27..9d113b563207 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -52,6 +52,7 @@ import java.io.File; import java.sql.JDBCType; import java.sql.PreparedStatement; +import java.sql.SQLException; import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -134,104 +135,94 @@ public List> getCheckOperations(JsonNod final List> checkOperations = new ArrayList<>(super.getCheckOperations(config)); if (isCdc(config)) { + checkOperations.add(database -> assertCdcEnabledInDb(config, database)); + checkOperations.add(database -> assertCdcSchemaQueryable(config, database)); + checkOperations.add(database -> assertSqlServerAgentRunning(database)); + checkOperations.add(database -> assertSnapshotIsolationAllowed(config, database)); + } - // note, it's possible these queries could fail if user doesn't have correct permissions - // hopefully in these cases it should be evident from the SQLServerException thrown - - // check that cdc is enabled on database - checkOperations.add(database -> { - List queryResponse = database.query(connection -> { - final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?"; - PreparedStatement ps = connection.prepareStatement(sql); - ps.setString(1, config.get("database").asText()); - LOGGER.info(String.format("Checking that cdc is enabled on database '%s' using the query: '%s'", - config.get("database").asText(), sql)); - return ps; - }, JdbcUtils::rowToJson).collect(toList()); - - if (queryResponse.size() < 1) { - throw new RuntimeException(String.format( - "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).", - config.get("database").asText())); - } + return checkOperations; + } - if (!(queryResponse.get(0).get("is_cdc_enabled").asBoolean())) { - throw new RuntimeException(String.format( - "Detected that CDC is not enabled for database '%s'. Please check the documentation on how to enable CDC on MS SQL Server.", - config.get("database").asText())); - } - }); - - // check that we can query cdc schema and check we have at least 1 table with cdc enabled that this - // user can see - checkOperations.add(database -> { - List queryResponse = database.query(connection -> { - final String sql = "SELECT * FROM cdc.change_tables"; - PreparedStatement ps = connection.prepareStatement(sql); - LOGGER.info(String.format("Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'", - config.get("username").asText(), sql)); - return ps; - }, JdbcUtils::rowToJson).collect(toList()); - - if (queryResponse.size() < 1) { - throw new RuntimeException("No cdc-enabled tables found. Please check the documentation on how to enable CDC on MS SQL Server."); - } - }); - - // check sql server agent is running - // todo: ensure this works for Azure managed SQL (since it uses different sql server agent) - checkOperations.add(database -> { - try { - List queryResponse = database.query(connection -> { - final String sql = "SELECT status_desc FROM sys.dm_server_services WHERE [servicename] LIKE 'SQL Server Agent%'"; - PreparedStatement ps = connection.prepareStatement(sql); - LOGGER.info(String.format("Checking that the SQL Server Agent is running using the query: '%s'", sql)); - return ps; - }, JdbcUtils::rowToJson).collect(toList()); - - if (!(queryResponse.get(0).get("status_desc").toString().contains("Running"))) { - throw new RuntimeException(String.format( - "The SQL Server Agent is not running. Current state: '%s'. Please check the documentation on ensuring SQL Server Agent is running.", - queryResponse.get(0).get("status_desc").toString())); - } - } catch (Exception e) { - if (e.getCause() != null && e.getCause().getClass().equals(com.microsoft.sqlserver.jdbc.SQLServerException.class)) { - LOGGER.warn(String.format("Skipping check for whether the SQL Server Agent is running, SQLServerException thrown: '%s'", - e.getMessage())); - } else { - throw e; - } - } - }); - - // check that snapshot isolation is allowed - checkOperations.add(database -> { - List queryResponse = database.query(connection -> { - final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?"; - PreparedStatement ps = connection.prepareStatement(sql); - ps.setString(1, config.get("database").asText()); - LOGGER.info(String.format("Checking that snapshot isolation is enabled on database '%s' using the query: '%s'", - config.get("database").asText(), sql)); - return ps; - }, JdbcUtils::rowToJson).collect(toList()); - - if (queryResponse.size() < 1) { - throw new RuntimeException(String.format( - "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).", - config.get("database").asText())); - } + protected void assertCdcEnabledInDb(JsonNode config, JdbcDatabase database) throws SQLException { + List queryResponse = database.query(connection -> { + final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?"; + PreparedStatement ps = connection.prepareStatement(sql); + ps.setString(1, config.get("database").asText()); + LOGGER.info(String.format("Checking that cdc is enabled on database '%s' using the query: '%s'", + config.get("database").asText(), sql)); + return ps; + }, JdbcUtils::rowToJson).collect(toList()); + if (queryResponse.size() < 1) { + throw new RuntimeException(String.format( + "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).", + config.get("database").asText())); + } + if (!(queryResponse.get(0).get("is_cdc_enabled").asBoolean())) { + throw new RuntimeException(String.format( + "Detected that CDC is not enabled for database '%s'. Please check the documentation on how to enable CDC on MS SQL Server.", + config.get("database").asText())); + } + } - if (queryResponse.get(0).get("snapshot_isolation_state").asInt() != 1) { - throw new RuntimeException(String.format( - "Detected that snapshot isolation is not enabled for database '%s'. MSSQL CDC relies on snapshot isolation. " - + "Please check the documentation on how to enable snapshot isolation on MS SQL Server.", - config.get("database").asText())); - } - }); + protected void assertCdcSchemaQueryable(JsonNode config, JdbcDatabase database) throws SQLException { + List queryResponse = database.query(connection -> { + final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; + PreparedStatement ps = connection.prepareStatement(sql); + LOGGER.info(String.format("Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'", + config.get("username").asText(), sql)); + return ps; + }, JdbcUtils::rowToJson).collect(toList()); + // Ensure at least one available CDC table + if (queryResponse.size() < 1) { + throw new RuntimeException("No cdc-enabled tables found. Please check the documentation on how to enable CDC on MS SQL Server."); + } + } + // todo: ensure this works for Azure managed SQL (since it uses different sql server agent) + protected void assertSqlServerAgentRunning(JdbcDatabase database) throws SQLException { + try { + List queryResponse = database.query(connection -> { + final String sql = "SELECT status_desc FROM sys.dm_server_services WHERE [servicename] LIKE 'SQL Server Agent%'"; + PreparedStatement ps = connection.prepareStatement(sql); + LOGGER.info(String.format("Checking that the SQL Server Agent is running using the query: '%s'", sql)); + return ps; + }, JdbcUtils::rowToJson).collect(toList()); + if (!(queryResponse.get(0).get("status_desc").toString().contains("Running"))) { + throw new RuntimeException(String.format( + "The SQL Server Agent is not running. Current state: '%s'. Please check the documentation on ensuring SQL Server Agent is running.", + queryResponse.get(0).get("status_desc").toString())); + } + } catch (Exception e) { + if (e.getCause() != null && e.getCause().getClass().equals(com.microsoft.sqlserver.jdbc.SQLServerException.class)) { + LOGGER.warn(String.format("Skipping check for whether the SQL Server Agent is running, SQLServerException thrown: '%s'", + e.getMessage())); + } else { + throw e; + } } + } - return checkOperations; + protected void assertSnapshotIsolationAllowed(JsonNode config, JdbcDatabase database) throws SQLException { + List queryResponse = database.query(connection -> { + final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?"; + PreparedStatement ps = connection.prepareStatement(sql); + ps.setString(1, config.get("database").asText()); + LOGGER.info(String.format("Checking that snapshot isolation is enabled on database '%s' using the query: '%s'", + config.get("database").asText(), sql)); + return ps; + }, JdbcUtils::rowToJson).collect(toList()); + if (queryResponse.size() < 1) { + throw new RuntimeException(String.format( + "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).", + config.get("database").asText())); + } + if (queryResponse.get(0).get("snapshot_isolation_state").asInt() != 1) { + throw new RuntimeException(String.format( + "Detected that snapshot isolation is not enabled for database '%s'. MSSQL CDC relies on snapshot isolation. " + + "Please check the documentation on how to enable snapshot isolation on MS SQL Server.", + config.get("database").asText())); + } } @Override @@ -243,7 +234,8 @@ public List> getIncrementalIterators(JdbcD JsonNode sourceConfig = database.getSourceConfig(); if (isCdc(sourceConfig) && shouldUseCDC(catalog)) { LOGGER.info("using CDC: {}", true); - AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, MssqlCdcTargetPosition.getTargetPostion(database), + AirbyteDebeziumHandler handler = new AirbyteDebeziumHandler(sourceConfig, + MssqlCdcTargetPosition.getTargetPosition(database, sourceConfig.get("database").asText()), MssqlCdcProperties.getDebeziumProperties(), catalog, true); return handler.getIncrementalIterators(new MssqlCdcSavedInfoFetcher(stateManager.getCdcStateManager().getCdcState()), new MssqlCdcStateHandler(stateManager), new MssqlCdcConnectorMetadataInjector(), emittedAt); diff --git a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java index 614cff33cd5a..8821ce5a526b 100644 --- a/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java +++ b/airbyte-integrations/connectors/source-mssql/src/test/java/io/airbyte/integrations/source/mssql/CdcMssqlSourceTest.java @@ -30,10 +30,12 @@ import static io.airbyte.integrations.source.mssql.MssqlSource.DRIVER_CLASS; import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_CDC_OFFSET; import static io.airbyte.integrations.source.mssql.MssqlSource.MSSQL_DB_HISTORY; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; import com.fasterxml.jackson.databind.JsonNode; @@ -57,7 +59,6 @@ import java.util.Optional; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.testcontainers.containers.MSSQLServerContainer; @@ -71,6 +72,7 @@ public class CdcMssqlSourceTest extends CdcSourceTest { private String dbName; private Database database; + private JdbcDatabase testJdbcDatabase; private MssqlSource source; private JsonNode config; @@ -109,8 +111,22 @@ private void init() { DRIVER_CLASS, null); + testJdbcDatabase = Databases.createJdbcDatabase( + TEST_USER_NAME, + TEST_USER_PASSWORD, + String.format("jdbc:sqlserver://%s:%s", + container.getHost(), + container.getFirstMappedPort()), + DRIVER_CLASS + ); + executeQuery("CREATE DATABASE " + dbName + ";"); - executeQuery("ALTER DATABASE " + dbName + "\n\tSET ALLOW_SNAPSHOT_ISOLATION ON"); + switchSnapshotIsolation(true, dbName); + } + + private void switchSnapshotIsolation(Boolean on, String db) { + String onOrOff = on ? "ON" : "OFF"; + executeQuery("ALTER DATABASE " + db + "\n\tSET ALLOW_SNAPSHOT_ISOLATION " + onOrOff); } private void setupTestUser() { @@ -124,10 +140,15 @@ private void revokeAllPermissions() { executeQuery("EXEC sp_msforeachtable \"REVOKE ALL ON '?' TO " + TEST_USER_NAME + ";\""); } + private void alterPermissionsOnSchema(Boolean grant, String schema) { + String grantOrRemove = grant ? "GRANT" : "REVOKE"; + executeQuery(String.format("USE %s;\n" + "%s SELECT ON SCHEMA :: [%s] TO %s", dbName, grantOrRemove, schema, TEST_USER_NAME)); + } + private void grantCorrectPermissions() { - executeQuery(String.format("USE %s;\n" + "GRANT SELECT ON SCHEMA :: [%s] TO %s", dbName, MODELS_SCHEMA, TEST_USER_NAME)); - executeQuery(String.format("USE %s;\n" + "GRANT SELECT ON SCHEMA :: [%s] TO %s", dbName, MODELS_SCHEMA + "_random", TEST_USER_NAME)); - executeQuery(String.format("USE %s;\n" + "GRANT SELECT ON SCHEMA :: [%s] TO %s", dbName, "cdc", TEST_USER_NAME)); + alterPermissionsOnSchema(true, MODELS_SCHEMA); + alterPermissionsOnSchema(true, MODELS_SCHEMA + "_random"); + alterPermissionsOnSchema(true, "cdc"); executeQuery(String.format("EXEC sp_addrolemember N'%s', N'%s';", CDC_ROLE_NAME, TEST_USER_NAME)); } @@ -136,9 +157,14 @@ public String createSchemaQuery(String schemaName) { return "CREATE SCHEMA " + schemaName; } + private void switchCdcOnDatabase(Boolean enable, String db) { + String storedProc = enable ? "sys.sp_cdc_enable_db" : "sys.sp_cdc_disable_db"; + executeQuery("USE " + db + "\n" + "EXEC " + storedProc); + } + @Override public void createTable(String schemaName, String tableName, String columnClause) { - executeQuery("USE " + dbName + "\n" + "EXEC sys.sp_cdc_enable_db"); + switchCdcOnDatabase(true, dbName); super.createTable(schemaName, tableName, columnClause); // sometimes seeing an error that we can't enable cdc on a table while sql server agent is still @@ -203,25 +229,86 @@ public void tearDown() { } @Test - @DisplayName("Ensure CHECK still works when we have permissions to check SQL Server Agent status") - void testCheckWithElevatedPermissions() { + void testAssertCdcEnabledInDb() { + // since we enable cdc in setup, assert that we successfully pass this first + assertDoesNotThrow(() -> source.assertCdcEnabledInDb(config, testJdbcDatabase)); + // then disable cdc and assert the check fails + switchCdcOnDatabase(false, dbName); + assertThrows(RuntimeException.class, () -> source.assertCdcEnabledInDb(config, testJdbcDatabase)); + } + + @Test + void testAssertCdcSchemaQueryable() { + // correct access granted by setup so assert check passes + assertDoesNotThrow(() -> source.assertCdcSchemaQueryable(config, testJdbcDatabase)); + // now revoke perms and assert that check fails + alterPermissionsOnSchema(false, "cdc"); + assertThrows(com.microsoft.sqlserver.jdbc.SQLServerException.class, () -> source.assertCdcSchemaQueryable(config, testJdbcDatabase)); + } + + private void switchSqlServerAgentAndWait(Boolean start) throws InterruptedException { + String startOrStop = start ? "START" : "STOP"; + executeQuery(String.format("EXEC xp_servicecontrol N'%s',N'SQLServerAGENT';", startOrStop)); + Thread.sleep(15*1000); // 15 seconds to wait for change of agent state + } + + @Test + void testAssertSqlServerAgentRunning() throws InterruptedException { executeQuery(String.format("USE master;\n" + "GRANT VIEW SERVER STATE TO %s", TEST_USER_NAME)); - final AirbyteConnectionStatus status = source.check(config); - assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.SUCCEEDED); + // assert expected failure if sql server agent stopped + switchSqlServerAgentAndWait(false); + assertThrows(RuntimeException.class, () -> source.assertSqlServerAgentRunning(testJdbcDatabase)); + // assert success if sql server agent running + switchSqlServerAgentAndWait(true); + assertDoesNotThrow(() -> source.assertSqlServerAgentRunning(testJdbcDatabase)); } @Test - void testCheckWhenDbCdcDisabled() { - executeQuery("USE " + dbName + "\n" + "EXEC sys.sp_cdc_disable_db"); - final AirbyteConnectionStatus status = source.check(config); - assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + void testAssertSnapshotIsolationAllowed() { + // snapshot isolation enabled by setup so assert check passes + assertDoesNotThrow(() -> source.assertSnapshotIsolationAllowed(config, testJdbcDatabase)); + // now disable snapshot isolation and assert that check fails + switchSnapshotIsolation(false, dbName); + assertThrows(RuntimeException.class, () -> source.assertSnapshotIsolationAllowed(config, testJdbcDatabase)); } + // Ensure the CDC check operations are included when CDC is enabled + // todo: make this better by checking the returned checkOperations from source.getCheckOperations @Test - void testCheckWithInadequatePermissions() { - executeQuery(String.format("USE %s;\n" + "REVOKE SELECT ON SCHEMA :: [%s] TO %s", dbName, "cdc", TEST_USER_NAME)); - final AirbyteConnectionStatus status = source.check(config); + void testCdcCheckOperations() throws Exception { + // assertCdcEnabledInDb + switchCdcOnDatabase(false, dbName); + AirbyteConnectionStatus status = getSource().check(getConfig()); + assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + switchCdcOnDatabase(true, dbName); + // assertCdcSchemaQueryable + alterPermissionsOnSchema(false, "cdc"); + status = getSource().check(getConfig()); assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + alterPermissionsOnSchema(true, "cdc"); + // assertSqlServerAgentRunning + executeQuery(String.format("USE master;\n" + "GRANT VIEW SERVER STATE TO %s", TEST_USER_NAME)); + switchSqlServerAgentAndWait(false); + status = getSource().check(getConfig()); + assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + switchSqlServerAgentAndWait(true); + // assertSnapshotIsolationAllowed + switchSnapshotIsolation(false, dbName); + status = getSource().check(getConfig()); + assertEquals(status.getStatus(), AirbyteConnectionStatus.Status.FAILED); + } + + //todo: check LSN returned is actually the max LSN + //todo: check we fail as expected under certain conditions + @Test + void testGetTargetPosition() throws InterruptedException { + // check that getTargetPosition returns higher Lsn after inserting new row + Lsn firstLsn = MssqlCdcTargetPosition.getTargetPosition(testJdbcDatabase, dbName).targetLsn; + executeQuery(String.format("USE %s; INSERT INTO %s.%s (%s, %s, %s) VALUES (%s, %s, '%s');", + dbName, MODELS_SCHEMA, MODELS_STREAM_NAME, COL_ID, COL_MAKE_ID, COL_MODEL, 910019, 1, "another car")); + Thread.sleep(15*1000); // 15 seconds to wait for Agent capture job to log cdc change + Lsn secondLsn = MssqlCdcTargetPosition.getTargetPosition(testJdbcDatabase, dbName).targetLsn; + assertTrue(secondLsn.compareTo(firstLsn) > 0); } @Override @@ -247,7 +334,7 @@ protected CdcTargetPosition cdcLatestTargetPosition() { config.get("port").asInt(), dbName), DRIVER_CLASS, new MssqlJdbcStreamingQueryConfiguration(), null); - return MssqlCdcTargetPosition.getTargetPostion(jdbcDatabase); + return MssqlCdcTargetPosition.getTargetPosition(jdbcDatabase, dbName); } @Override