From d60b2ecaa2113fcebe9db695bd260528c277a5fd Mon Sep 17 00:00:00 2001 From: subodh Date: Thu, 4 Aug 2022 21:22:58 +0530 Subject: [PATCH 1/6] implement validation for cursor type before reading data --- .../db/JdbcCompatibleSourceOperations.java | 2 + .../airbyte/db/jdbc/JdbcSourceOperations.java | 8 ++++ .../source/jdbc/AbstractJdbcSource.java | 5 +++ .../source/mysql/MySqlSourceOperations.java | 10 +++++ .../postgres/PostgresSourceOperations.java | 8 ++++ .../source/relationaldb/AbstractDbSource.java | 37 +++++++++++++++++++ .../source/relationaldb/InvalidCursor.java | 37 +++++++++++++++++++ .../source/tidb/TiDBSourceOperations.java | 8 ++++ 8 files changed, 115 insertions(+) create mode 100644 airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursor.java diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java index 9539441e0e50..6c9b78e50aed 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java @@ -56,4 +56,6 @@ void setStatementField(final PreparedStatement preparedStatement, */ String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName) throws SQLException; + boolean isValidCursorType(final SourceType cursorType); + } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java index 35269a391da0..f1aced35fe4b 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java @@ -60,6 +60,14 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob } } + @Override + public boolean isValidCursorType(final JDBCType cursorType) { + return switch (cursorType) { + case TIMESTAMP, TIME, DATE, BIT, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL, CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR, BINARY, BLOB -> true; + default -> false; + }; + } + @Override public void setStatementField(final PreparedStatement preparedStatement, final int parameterIndex, diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 6a0220331f2c..c83cdc928351 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -310,6 +310,11 @@ protected DataSource createDataSource(final JsonNode config) { return dataSource; } + @Override + protected boolean isValidCursorType(final Datatype cursorType) { + return sourceOperations.isValidCursorType(cursorType); + } + @Override public JdbcDatabase createDatabase(final JsonNode config) throws SQLException { final DataSource dataSource = createDataSource(config); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index 91de0cf63338..370649f8cb05 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -110,6 +110,16 @@ protected void putBoolean(final ObjectNode node, final String columnName, final node.put(columnName, resultSet.getInt(index) > 0); } + @Override + public boolean isValidCursorType(final MysqlType cursorType) { + return switch (cursorType) { + case BIT, BOOLEAN, TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED, FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED, DATE, DATETIME, TIMESTAMP, TIME, YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> true; + // since cursor are expected to be comparable, handle cursor typing strictly and error on + // unrecognized types + default -> false; + }; + } + @Override public void setStatementField(final PreparedStatement preparedStatement, final int parameterIndex, diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 8e9be8666cee..03d6fce362c1 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -90,6 +90,14 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { return jsonNode; } + @Override + public boolean isValidCursorType(final JDBCType cursorType) { + return switch (cursorType) { + case TIMESTAMP, TIMESTAMP_WITH_TIMEZONE, TIME, TIME_WITH_TIMEZONE, DATE, BIT, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, REAL, NUMERIC, DECIMAL, CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR, BINARY, BLOB -> true; + default -> false; + }; + } + @Override public void setStatementField(final PreparedStatement preparedStatement, final int parameterIndex, diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index b2acd993825a..7ef7857a7353 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -23,6 +23,7 @@ import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Source; +import io.airbyte.integrations.source.relationaldb.InvalidCursor.Info; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory; @@ -125,6 +126,8 @@ public AutoCloseableIterator read(final JsonNode config, .collect(Collectors.toMap(t -> String.format("%s.%s", t.getNameSpace(), t.getName()), Function .identity())); + validateCursorFieldForIncrementalTables(fullyQualifiedTableNameToInfo, catalog); + final List> incrementalIterators = getIncrementalIterators(database, catalog, fullyQualifiedTableNameToInfo, stateManager, emittedAt); final List> fullRefreshIterators = @@ -142,6 +145,40 @@ public AutoCloseableIterator read(final JsonNode config, }); } + private void validateCursorFieldForIncrementalTables(final Map>> tableNameToTable, final ConfiguredAirbyteCatalog catalog) { + final List tablesWithInvalidCursor = new ArrayList<>(); + for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { + final AirbyteStream stream = airbyteStream.getStream(); + final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(), + stream.getName()); + if (!tableNameToTable.containsKey(fullyQualifiedTableName) || airbyteStream.getSyncMode() != SyncMode.INCREMENTAL || airbyteStream.getStream() + .getSourceDefinedCursor()) { + continue; + } + + final TableInfo> table = tableNameToTable + .get(fullyQualifiedTableName); + final String cursorField = IncrementalUtils.getCursorField(airbyteStream); + final DataType cursorType = table.getFields().stream() + .filter(info -> info.getName().equals(cursorField)) + .map(CommonField::getType) + .findFirst() + .orElseThrow(); + + if (isValidCursorType(cursorType)) { + continue; + } + + tablesWithInvalidCursor.add(new Info(fullyQualifiedTableName, cursorField, cursorType.toString())); + } + + if (tablesWithInvalidCursor.isEmpty()) { + throw new InvalidCursor(tablesWithInvalidCursor); + } + } + + protected abstract boolean isValidCursorType(final DataType cursorType); + protected List>> discoverWithoutSystemTables(final Database database) throws Exception { final Set systemNameSpaces = getExcludedInternalNameSpaces(); final List>> discoveredTables = discoverInternal(database); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursor.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursor.java new file mode 100644 index 000000000000..b960dd4b8969 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursor.java @@ -0,0 +1,37 @@ +package io.airbyte.integrations.source.relationaldb; + +import java.util.List; +import java.util.stream.Collectors; + +public class InvalidCursor extends RuntimeException { + + public InvalidCursor(final List tablesWithInvalidCursor) { + super("The following tables have invalid columns selected as cursor " + tablesWithInvalidCursor.stream().map(Info::toString) + .collect(Collectors.joining(","))); + } + + + public static class Info { + + private final String tableName; + private final String cursorColumnName; + private final String cursorSqlType; + + public Info(final String tableName, final String cursorColumnName, final String cursorSqlType) { + this.tableName = tableName; + this.cursorColumnName = cursorColumnName; + this.cursorSqlType = cursorSqlType; + } + + @Override + public String toString() { + return "{" + + "tableName='" + tableName + '\'' + + ", cursorColumnName='" + cursorColumnName + '\'' + + ", cursorSqlType=" + cursorSqlType + + '}'; + } + } + + +} diff --git a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java index a568e6ca342e..5b7ae9fd8c25 100644 --- a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java +++ b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java @@ -85,6 +85,14 @@ public void setJsonField(ResultSet resultSet, int colIndex, ObjectNode json) thr } } + @Override + public boolean isValidCursorType(final MysqlType cursorType) { + return switch (cursorType) { + case BIT, BOOLEAN, TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED, FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED, DATE, DATETIME, TIMESTAMP, TIME, YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> true; + default -> false; + }; + } + @Override public void setStatementField(PreparedStatement preparedStatement, int parameterIndex, MysqlType cursorFieldType, String value) throws SQLException { From 87258a3d1650a606de16b1f5080a521b9a551c3b Mon Sep 17 00:00:00 2001 From: subodh Date: Thu, 4 Aug 2022 21:59:59 +0530 Subject: [PATCH 2/6] rename class --- .../source/relationaldb/AbstractDbSource.java | 8 ++-- .../source/relationaldb/InvalidCursor.java | 37 ------------------- .../relationaldb/InvalidCursorException.java | 26 +++++++++++++ 3 files changed, 30 insertions(+), 41 deletions(-) delete mode 100644 airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursor.java create mode 100644 airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 7ef7857a7353..9dfe4c8f036f 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -23,7 +23,7 @@ import io.airbyte.integrations.BaseConnector; import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair; import io.airbyte.integrations.base.Source; -import io.airbyte.integrations.source.relationaldb.InvalidCursor.Info; +import io.airbyte.integrations.source.relationaldb.InvalidCursorException.InvalidCursorInfo; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.state.StateManager; import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory; @@ -146,7 +146,7 @@ public AutoCloseableIterator read(final JsonNode config, } private void validateCursorFieldForIncrementalTables(final Map>> tableNameToTable, final ConfiguredAirbyteCatalog catalog) { - final List tablesWithInvalidCursor = new ArrayList<>(); + final List tablesWithInvalidCursor = new ArrayList<>(); for (final ConfiguredAirbyteStream airbyteStream : catalog.getStreams()) { final AirbyteStream stream = airbyteStream.getStream(); final String fullyQualifiedTableName = getFullyQualifiedTableName(stream.getNamespace(), @@ -169,11 +169,11 @@ private void validateCursorFieldForIncrementalTables(final Map tablesWithInvalidCursor) { - super("The following tables have invalid columns selected as cursor " + tablesWithInvalidCursor.stream().map(Info::toString) - .collect(Collectors.joining(","))); - } - - - public static class Info { - - private final String tableName; - private final String cursorColumnName; - private final String cursorSqlType; - - public Info(final String tableName, final String cursorColumnName, final String cursorSqlType) { - this.tableName = tableName; - this.cursorColumnName = cursorColumnName; - this.cursorSqlType = cursorSqlType; - } - - @Override - public String toString() { - return "{" + - "tableName='" + tableName + '\'' + - ", cursorColumnName='" + cursorColumnName + '\'' + - ", cursorSqlType=" + cursorSqlType + - '}'; - } - } - - -} diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java new file mode 100644 index 000000000000..eaca805fb962 --- /dev/null +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java @@ -0,0 +1,26 @@ +package io.airbyte.integrations.source.relationaldb; + +import java.util.List; +import java.util.stream.Collectors; + +public class InvalidCursorException extends RuntimeException { + + public InvalidCursorException(final List tablesWithInvalidCursor) { + super("The following tables have invalid columns selected as cursor " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) + .collect(Collectors.joining(","))); + } + + public record InvalidCursorInfo(String tableName, String cursorColumnName, String cursorSqlType) { + + @Override + public String toString() { + return "{" + + "tableName='" + tableName + '\'' + + ", cursorColumnName='" + cursorColumnName + '\'' + + ", cursorSqlType=" + cursorSqlType + + '}'; + } + } + + +} From 667ce0a151e12683e835d088d20318b30c78a219 Mon Sep 17 00:00:00 2001 From: subodh Date: Tue, 16 Aug 2022 17:17:23 +0530 Subject: [PATCH 3/6] add test --- .../source/postgres/PostgresSource.java | 1 + .../source/postgres/PostgresSourceTest.java | 47 +++++++++++++++++++ .../source/relationaldb/AbstractDbSource.java | 8 ++-- .../relationaldb/InvalidCursorException.java | 2 +- 4 files changed, 54 insertions(+), 4 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index 6e0f2dbd157f..5cbe4600bf8a 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -16,6 +16,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableMap.Builder; import com.google.common.collect.Sets; import io.airbyte.commons.features.EnvVariableFeatureFlags; import io.airbyte.commons.features.FeatureFlags; diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 1f5f21cd8ecf..c07862d2fd51 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -7,6 +7,8 @@ import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.createRecord; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.map; import static io.airbyte.integrations.source.postgres.utils.PostgresUnitTestsUtil.setEmittedAtToNull; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.AssertionsForClassTypes.catchThrowable; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -24,16 +26,20 @@ import io.airbyte.db.factory.DSLContextFactory; import io.airbyte.db.factory.DatabaseDriver; import io.airbyte.db.jdbc.JdbcUtils; +import io.airbyte.integrations.source.relationaldb.InvalidCursorException; import io.airbyte.protocol.models.AirbyteCatalog; import io.airbyte.protocol.models.AirbyteMessage; import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CatalogHelpers; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; +import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.DestinationSyncMode; import io.airbyte.protocol.models.Field; import io.airbyte.protocol.models.JsonSchemaType; import io.airbyte.protocol.models.SyncMode; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.math.BigDecimal; +import java.sql.SQLException; import java.util.Collections; import java.util.List; import java.util.Map; @@ -502,4 +508,45 @@ void testGetUsername() { assertEquals(username, PostgresSource.getUsername(azureConfig)); } + + @Test + public void tableWithInvalidCursorShouldThrowException() throws Exception { + try (final PostgreSQLContainer db = new PostgreSQLContainer<>("postgres:13-alpine")) { + db.start(); + final JsonNode config = getConfig(db); + try (final DSLContext dslContext = getDslContext(config)) { + final Database database = new Database(dslContext); + final ConfiguredAirbyteStream tableWithInvalidCursorType = createTableWithInvalidCursorType(database); + final ConfiguredAirbyteCatalog configuredAirbyteCatalog = new ConfiguredAirbyteCatalog().withStreams(Collections.singletonList(tableWithInvalidCursorType)); + + final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, configuredAirbyteCatalog, null))); + assertThat(throwable).isInstanceOf(InvalidCursorException.class) + .hasMessageContaining( + "The following tables have invalid columns selected as cursor, please select a valid column as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}"); + } finally { + db.stop(); + } + } + } + + private ConfiguredAirbyteStream createTableWithInvalidCursorType(final Database database) throws SQLException { + database.query(ctx -> { + ctx.fetch("CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\";"); + ctx.fetch("CREATE TABLE IF NOT EXISTS public.test_table(id uuid PRIMARY KEY DEFAULT uuid_generate_v4());"); + return null; + }); + + return new ConfiguredAirbyteStream().withSyncMode(SyncMode.INCREMENTAL) + .withCursorField(Lists.newArrayList("id")) + .withDestinationSyncMode(DestinationSyncMode.APPEND) + .withSyncMode(SyncMode.INCREMENTAL) + .withStream(CatalogHelpers.createAirbyteStream( + "test_table", + "public", + Field.of("id", JsonSchemaType.STRING)) + .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)) + .withSourceDefinedPrimaryKey(List.of(List.of("id")))); + + } + } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 9dfe4c8f036f..531f3ead87b2 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -51,6 +51,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; @@ -151,8 +152,9 @@ private void validateCursorFieldForIncrementalTables(final Map tablesWithInvalidCursor) { - super("The following tables have invalid columns selected as cursor " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) + super("The following tables have invalid columns selected as cursor, please select a valid column as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) .collect(Collectors.joining(","))); } From f498ce1ce7151fc737f51b2aa0d4907569d012c9 Mon Sep 17 00:00:00 2001 From: subodh Date: Mon, 22 Aug 2022 22:36:27 +0530 Subject: [PATCH 4/6] fix merge conflicts --- .../src/main/java/io/airbyte/db/IncrementalUtils.java | 9 +++++++++ .../java/io/airbyte/db/jdbc/JdbcSourceOperations.java | 8 -------- .../integrations/source/jdbc/AbstractJdbcSource.java | 2 +- .../source/mysql/MySqlSourceOperations.java | 10 ---------- .../source/postgres/PostgresSourceOperations.java | 8 -------- .../source/relationaldb/AbstractDbSource.java | 9 ++++++--- .../integrations/source/tidb/TiDBSourceOperations.java | 8 -------- 7 files changed, 16 insertions(+), 38 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java index 6567006a63b2..250f4d6d5ebe 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/IncrementalUtils.java @@ -6,6 +6,7 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; import io.airbyte.protocol.models.JsonSchemaPrimitive; +import java.util.Optional; public class IncrementalUtils { @@ -21,6 +22,14 @@ public static String getCursorField(final ConfiguredAirbyteStream stream) { } } + public static Optional getCursorFieldOptional(final ConfiguredAirbyteStream stream) { + try { + return Optional.ofNullable(getCursorField(stream)); + } catch (IllegalStateException e) { + return Optional.empty(); + } + } + public static JsonSchemaPrimitive getCursorType(final ConfiguredAirbyteStream stream, final String cursorField) { if (stream.getStream().getJsonSchema().get(PROPERTIES) == null) { throw new IllegalStateException(String.format("No properties found in stream: %s.", stream.getStream().getName())); diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java index 48728223b23a..5950da239a6a 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java @@ -61,14 +61,6 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob } } - @Override - public boolean isValidCursorType(final JDBCType cursorType) { - return switch (cursorType) { - case TIMESTAMP, TIME, DATE, BIT, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, DOUBLE, REAL, NUMERIC, DECIMAL, CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR, BINARY, BLOB -> true; - default -> false; - }; - } - @Override public void setStatementField(final PreparedStatement preparedStatement, final int parameterIndex, diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 9dff6a5d9f1d..53fe69e7f384 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -330,7 +330,7 @@ protected DataSource createDataSource(final JsonNode config) { @Override protected boolean isValidCursorType(final Datatype cursorType) { - return sourceOperations.isValidCursorType(cursorType); + return sourceOperations.isCursorType(cursorType); } @Override diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index 01688bea7d6c..54079acbc935 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -149,16 +149,6 @@ protected void putBoolean(final ObjectNode node, final String columnName, final node.put(columnName, resultSet.getInt(index) > 0); } - @Override - public boolean isValidCursorType(final MysqlType cursorType) { - return switch (cursorType) { - case BIT, BOOLEAN, TINYINT, TINYINT_UNSIGNED, SMALLINT, SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED, FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED, DATE, DATETIME, TIMESTAMP, TIME, YEAR, CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY -> true; - // since cursor are expected to be comparable, handle cursor typing strictly and error on - // unrecognized types - default -> false; - }; - } - @Override public void setStatementField(final PreparedStatement preparedStatement, final int parameterIndex, diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 0ea89ddcdefc..c4e6d1b46804 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -89,14 +89,6 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { return jsonNode; } - @Override - public boolean isValidCursorType(final JDBCType cursorType) { - return switch (cursorType) { - case TIMESTAMP, TIMESTAMP_WITH_TIMEZONE, TIME, TIME_WITH_TIMEZONE, DATE, BIT, BOOLEAN, TINYINT, SMALLINT, INTEGER, BIGINT, FLOAT, REAL, NUMERIC, DECIMAL, CHAR, NCHAR, NVARCHAR, VARCHAR, LONGVARCHAR, BINARY, BLOB -> true; - default -> false; - }; - } - @Override public void setStatementField(final PreparedStatement preparedStatement, final int parameterIndex, diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 40300d3af411..b82dafddc2b7 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -161,9 +161,12 @@ private void validateCursorFieldForIncrementalTables(final Map> table = tableNameToTable .get(fullyQualifiedTableName); - final String cursorField = IncrementalUtils.getCursorField(airbyteStream); + final Optional cursorField = IncrementalUtils.getCursorFieldOptional(airbyteStream); + if (cursorField.isEmpty()) { + continue; + } final DataType cursorType = table.getFields().stream() - .filter(info -> info.getName().equals(cursorField)) + .filter(info -> info.getName().equals(cursorField.get())) .map(CommonField::getType) .findFirst() .orElseThrow(); @@ -172,7 +175,7 @@ private void validateCursorFieldForIncrementalTables(final Map true; - default -> false; - }; - } - @Override public void setStatementField(PreparedStatement preparedStatement, int parameterIndex, MysqlType cursorFieldType, String value) throws SQLException { From e51470cfe001f5df7cad42299b7ed1bc2c4e02b4 Mon Sep 17 00:00:00 2001 From: subodh Date: Thu, 25 Aug 2022 12:03:06 +0530 Subject: [PATCH 5/6] address review comments --- .../integrations/source/relationaldb/AbstractDbSource.java | 6 ++---- .../source/relationaldb/InvalidCursorException.java | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 477a24624cf0..92e70ebcd234 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -171,11 +171,9 @@ private void validateCursorFieldForIncrementalTables(final Map tablesWithInvalidCursor) { - super("The following tables have invalid columns selected as cursor, please select a valid column as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) + super("The following tables have invalid columns selected as cursor, please select a column a column with a well-defined ordering as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) .collect(Collectors.joining(","))); } From 682e5304f7e00e64c35e0b2160478b92384e41de Mon Sep 17 00:00:00 2001 From: subodh Date: Wed, 31 Aug 2022 21:06:32 +0530 Subject: [PATCH 6/6] fix test --- .../integrations/source/postgres/PostgresSourceTest.java | 2 +- .../source/relationaldb/InvalidCursorException.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java index 1664cbaa587b..485dfe11e3c0 100644 --- a/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java +++ b/airbyte-integrations/connectors/source-postgres/src/test/java/io/airbyte/integrations/source/postgres/PostgresSourceTest.java @@ -523,7 +523,7 @@ public void tableWithInvalidCursorShouldThrowException() throws Exception { final Throwable throwable = catchThrowable(() -> MoreIterators.toSet(new PostgresSource().read(config, configuredAirbyteCatalog, null))); assertThat(throwable).isInstanceOf(InvalidCursorException.class) .hasMessageContaining( - "The following tables have invalid columns selected as cursor, please select a valid column as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}"); + "The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. {tableName='public.test_table', cursorColumnName='id', cursorSqlType=OTHER}"); } finally { db.stop(); } diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java index 99da29f77516..97cf102cab35 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/InvalidCursorException.java @@ -6,7 +6,7 @@ public class InvalidCursorException extends RuntimeException { public InvalidCursorException(final List tablesWithInvalidCursor) { - super("The following tables have invalid columns selected as cursor, please select a column a column with a well-defined ordering as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) + super("The following tables have invalid columns selected as cursor, please select a column with a well-defined ordering as a cursor. " + tablesWithInvalidCursor.stream().map(InvalidCursorInfo::toString) .collect(Collectors.joining(","))); }