From e0b5e5fde6093def24c47398bfe4ee853ca4cf7d Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 23 Dec 2022 13:47:28 -0800 Subject: [PATCH 1/4] Refactor SourceOperations class --- .../db/JdbcCompatibleSourceOperations.java | 12 +++---- ...bstractJdbcCompatibleSourceOperations.java | 2 +- .../airbyte/db/jdbc/JdbcSourceOperations.java | 12 +++---- .../io/airbyte/db/jdbc/TestJdbcUtils.java | 30 ++++++++-------- .../source/bigquery/BigQuerySource.java | 2 +- .../CockroachJdbcSourceOperations.java | 2 +- .../Db2SourceOperations.java | 2 +- .../sources/Db2SourceDatatypeTest.java | 1 - .../source/jdbc/AbstractJdbcSource.java | 20 ++++------- .../MongoDbSource.java | 2 +- .../source/mssql/MssqlSourceOperations.java | 4 +-- .../source/mysql/MySqlSourceOperations.java | 16 ++++----- .../mysql/MySqlSourceOperationsTest.java | 34 +++++++++---------- .../postgres/PostgresSourceOperations.java | 15 ++++---- .../source/relationaldb/AbstractDbSource.java | 10 +++--- .../SnowflakeSourceOperations.java | 6 ++-- .../source/tidb/TiDBSourceOperations.java | 10 +++--- .../models/JsonSchemaReferenceTypes.java | 2 +- .../protocol/models/JsonSchemaType.java | 32 ++++++++++++----- 19 files changed, 110 insertions(+), 104 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java index 9d481b977c25..55adeae111e8 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java @@ -20,21 +20,21 @@ public interface JdbcCompatibleSourceOperations extends SourceOperat * * @param colIndex 1-based column index. */ - void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException; + void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException; /** * Set the cursor field in incremental table query. */ - void setStatementField(final PreparedStatement preparedStatement, - final int parameterIndex, - final SourceType cursorFieldType, - final String value) + void setCursorField(final PreparedStatement preparedStatement, + final int parameterIndex, + final SourceType cursorFieldType, + final String value) throws SQLException; /** * Determine the database specific type of the input field based on its column metadata. */ - SourceType getFieldType(final JsonNode field); + SourceType getDatabaseFieldType(final JsonNode field); /** * @return the input identifiers with quotes and delimiters. diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index eea2286e8f34..55378f929fb4 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -58,7 +58,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { } // convert to java types that will convert into reasonable json. - setJsonField(queryContext, i, jsonNode); + putJsonField(queryContext, i, jsonNode); } return jsonNode; diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java index 2c38b91b679c..32472b2a452a 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java @@ -37,7 +37,7 @@ protected JDBCType safeGetJdbcType(final int columnTypeInt) { } @Override - public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final int columnTypeInt = resultSet.getMetaData().getColumnType(colIndex); final String columnName = resultSet.getMetaData().getColumnName(colIndex); final JDBCType columnType = safeGetJdbcType(columnTypeInt); @@ -63,10 +63,10 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob } @Override - public void setStatementField(final PreparedStatement preparedStatement, - final int parameterIndex, - final JDBCType cursorFieldType, - final String value) + public void setCursorField(final PreparedStatement preparedStatement, + final int parameterIndex, + final JDBCType cursorFieldType, + final String value) throws SQLException { switch (cursorFieldType) { @@ -90,7 +90,7 @@ public void setStatementField(final PreparedStatement preparedStatement, } @Override - public JDBCType getFieldType(final JsonNode field) { + public JDBCType getDatabaseFieldType(final JsonNode field) { try { return JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt()); } catch (final IllegalArgumentException ex) { diff --git a/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java b/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java index 92435db61ce4..97476dbf3cd5 100644 --- a/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java +++ b/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java @@ -152,21 +152,21 @@ void testSetStatementField() throws SQLException { // insert the bit here to stay consistent even though setStatementField does not support it yet. ps.setString(1, "1"); - sourceOperations.setStatementField(ps, 2, JDBCType.BOOLEAN, "true"); - sourceOperations.setStatementField(ps, 3, JDBCType.SMALLINT, "1"); - sourceOperations.setStatementField(ps, 4, JDBCType.INTEGER, "1"); - sourceOperations.setStatementField(ps, 5, JDBCType.BIGINT, "1"); - sourceOperations.setStatementField(ps, 6, JDBCType.FLOAT, "1.0"); - sourceOperations.setStatementField(ps, 7, JDBCType.DOUBLE, "1.0"); - sourceOperations.setStatementField(ps, 8, JDBCType.REAL, "1.0"); - sourceOperations.setStatementField(ps, 9, JDBCType.NUMERIC, "1"); - sourceOperations.setStatementField(ps, 10, JDBCType.DECIMAL, "1"); - sourceOperations.setStatementField(ps, 11, JDBCType.CHAR, "a"); - sourceOperations.setStatementField(ps, 12, JDBCType.VARCHAR, "a"); - sourceOperations.setStatementField(ps, 13, JDBCType.DATE, "2020-11-01T00:00:00Z"); - sourceOperations.setStatementField(ps, 14, JDBCType.TIME, "1970-01-01T05:00:00.000Z"); - sourceOperations.setStatementField(ps, 15, JDBCType.TIMESTAMP, "2001-09-29T03:00:00.000Z"); - sourceOperations.setStatementField(ps, 16, JDBCType.BINARY, "61616161"); + sourceOperations.setCursorField(ps, 2, JDBCType.BOOLEAN, "true"); + sourceOperations.setCursorField(ps, 3, JDBCType.SMALLINT, "1"); + sourceOperations.setCursorField(ps, 4, JDBCType.INTEGER, "1"); + sourceOperations.setCursorField(ps, 5, JDBCType.BIGINT, "1"); + sourceOperations.setCursorField(ps, 6, JDBCType.FLOAT, "1.0"); + sourceOperations.setCursorField(ps, 7, JDBCType.DOUBLE, "1.0"); + sourceOperations.setCursorField(ps, 8, JDBCType.REAL, "1.0"); + sourceOperations.setCursorField(ps, 9, JDBCType.NUMERIC, "1"); + sourceOperations.setCursorField(ps, 10, JDBCType.DECIMAL, "1"); + sourceOperations.setCursorField(ps, 11, JDBCType.CHAR, "a"); + sourceOperations.setCursorField(ps, 12, JDBCType.VARCHAR, "a"); + sourceOperations.setCursorField(ps, 13, JDBCType.DATE, "2020-11-01T00:00:00Z"); + sourceOperations.setCursorField(ps, 14, JDBCType.TIME, "1970-01-01T05:00:00.000Z"); + sourceOperations.setCursorField(ps, 15, JDBCType.TIMESTAMP, "2001-09-29T03:00:00.000Z"); + sourceOperations.setCursorField(ps, 16, JDBCType.BINARY, "61616161"); ps.execute(); diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java index cf8c8bd80a22..da4d9b0cbfab 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -92,7 +92,7 @@ public List> getCheckOperations(fin } @Override - protected JsonSchemaType getType(final StandardSQLTypeName columnType) { + protected JsonSchemaType getAirbyteType(final StandardSQLTypeName columnType) { return sourceOperations.getJsonType(columnType); } diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java index f29eee7df9f1..5c093d9b4850 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java @@ -45,7 +45,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { try { queryContext.getObject(i); if (!queryContext.wasNull()) { - setJsonField(queryContext, i, jsonNode); + putJsonField(queryContext, i, jsonNode); } } catch (final SQLException e) { putCockroachSpecialDataType(queryContext, i, jsonNode); diff --git a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java index 3bf50749b4fc..cc5f283f5f7a 100644 --- a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java +++ b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java @@ -38,7 +38,7 @@ private void setFields(ResultSet queryContext, int index, ObjectNode jsonNode) t try { queryContext.getObject(index); if (!queryContext.wasNull()) { - setJsonField(queryContext, index, jsonNode); + putJsonField(queryContext, index, jsonNode); } } catch (SQLException e) { if (DB2_UNIQUE_NUMBER_TYPES.contains(queryContext.getMetaData().getColumnTypeName(index))) { diff --git a/airbyte-integrations/connectors/source-db2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/Db2SourceDatatypeTest.java b/airbyte-integrations/connectors/source-db2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/Db2SourceDatatypeTest.java index eb47985e15c1..7e8a778c183a 100644 --- a/airbyte-integrations/connectors/source-db2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/Db2SourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-db2/src/test-integration/java/io/airbyte/integrations/io/airbyte/integration_tests/sources/Db2SourceDatatypeTest.java @@ -16,7 +16,6 @@ import io.airbyte.integrations.standardtest.source.TestDataHolder; import io.airbyte.integrations.standardtest.source.TestDestinationEnv; import io.airbyte.protocol.models.JsonSchemaType; -import java.math.BigDecimal; import org.jooq.DSLContext; import org.jooq.SQLDialect; import org.testcontainers.containers.Db2Container; diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index 2e4d9d1f9ae1..a1274a395a87 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -205,8 +205,8 @@ protected List>> discoverInternal(final JdbcData .fields(fields.stream() // read the column metadata Json object, and determine its type .map(f -> { - final Datatype datatype = getFieldType(f); - final JsonSchemaType jsonType = getType(datatype); + final Datatype datatype = sourceOperations.getDatabaseFieldType(f); + final JsonSchemaType jsonType = getAirbyteType(datatype); LOGGER.info("Table {} column {} (type {}[{}], nullable {}) -> {}", fields.get(0).get(INTERNAL_TABLE_NAME).asText(), f.get(INTERNAL_COLUMN_NAME).asText(), @@ -224,7 +224,7 @@ protected List>> discoverInternal(final JdbcData private List extractCursorFields(final List fields) { return fields.stream() - .filter(field -> isCursorType(getFieldType(field))) + .filter(field -> isCursorType(sourceOperations.getDatabaseFieldType(field))) .map(field -> field.get(INTERNAL_COLUMN_NAME).asText()) .collect(Collectors.toList()); } @@ -268,14 +268,6 @@ private JsonNode getColumnMetadata(final ResultSet resultSet) throws SQLExceptio .build()); } - /** - * @param field Essential column information returned from - * {@link AbstractJdbcSource#getColumnMetadata}. - */ - private Datatype getFieldType(final JsonNode field) { - return sourceOperations.getFieldType(field); - } - @Override public List>> discoverInternal(final JdbcDatabase database) throws Exception { @@ -283,7 +275,7 @@ public List>> discoverInternal(final JdbcDatabas } @Override - public JsonSchemaType getType(final Datatype columnType) { + public JsonSchemaType getAirbyteType(final Datatype columnType) { return sourceOperations.getJsonType(columnType); } @@ -383,7 +375,7 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final PreparedStatement preparedStatement = connection.prepareStatement(sql.toString()); LOGGER.info("Executing query for table {}: {}", tableName, preparedStatement); - sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); + sourceOperations.setCursorField(preparedStatement, 1, cursorFieldType, cursorInfo.getCursor()); return preparedStatement; }, sourceOperations::rowToJson); @@ -430,7 +422,7 @@ private long getActualCursorRecordCount(final Connection connection, fullTableName, quotedCursorField); cursorRecordStatement = connection.prepareStatement(cursorRecordQuery);; - sourceOperations.setStatementField(cursorRecordStatement, 1, cursorFieldType, cursor); + sourceOperations.setCursorField(cursorRecordStatement, 1, cursorFieldType, cursor); } final ResultSet resultSet = cursorRecordStatement.executeQuery(); if (resultSet.next()) { diff --git a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java index 7491d47146be..97095a4bb4bd 100644 --- a/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java +++ b/airbyte-integrations/connectors/source-mongodb-v2/src/main/java/io.airbyte.integrations.source.mongodb/MongoDbSource.java @@ -87,7 +87,7 @@ public List> getCheckOperations(final } @Override - protected JsonSchemaType getType(final BsonType fieldType) { + protected JsonSchemaType getAirbyteType(final BsonType fieldType) { return MongoUtils.getType(fieldType); } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java index 5d7e0e507728..28ddc78d9075 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java @@ -35,7 +35,7 @@ public class MssqlSourceOperations extends JdbcSourceOperations { * @throws SQLException */ @Override - public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) + public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final SQLServerResultSetMetaData metadata = (SQLServerResultSetMetaData) resultSet @@ -81,7 +81,7 @@ private void putValue(final JDBCType columnType, } @Override - public JDBCType getFieldType(final JsonNode field) { + public JDBCType getDatabaseFieldType(final JsonNode field) { try { final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText(); if (typeName.equalsIgnoreCase("geography") diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index 8590ea970bff..46908ef46d51 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -72,7 +72,7 @@ public class MySqlSourceOperations extends AbstractJdbcCompatibleSourceOperation * @param colIndex 1-based column index. */ @Override - public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final ResultSetMetaData metaData = (ResultSetMetaData) resultSet.getMetaData(); final Field field = metaData.getFields()[colIndex - 1]; final String columnName = field.getName(); @@ -150,10 +150,10 @@ protected void putBoolean(final ObjectNode node, final String columnName, final } @Override - public void setStatementField(final PreparedStatement preparedStatement, - final int parameterIndex, - final MysqlType cursorFieldType, - final String value) + public void setCursorField(final PreparedStatement preparedStatement, + final int parameterIndex, + final MysqlType cursorFieldType, + final String value) throws SQLException { switch (cursorFieldType) { case BIT -> setBit(preparedStatement, parameterIndex, value); @@ -176,7 +176,7 @@ public void setStatementField(final PreparedStatement preparedStatement, } @Override - public MysqlType getFieldType(final JsonNode field) { + public MysqlType getDatabaseFieldType(final JsonNode field) { try { // MysqlType#getByName can handle the full MySQL type name // e.g. MEDIUMINT UNSIGNED @@ -208,7 +208,7 @@ public MysqlType getFieldType(final JsonNode field) { } @Override - public boolean isCursorType(MysqlType type) { + public boolean isCursorType(final MysqlType type) { return ALLOWED_CURSOR_TYPES.contains(type); } @@ -260,7 +260,7 @@ protected void setDate(final PreparedStatement preparedStatement, final int para } @Override - protected void setTimestamp(PreparedStatement preparedStatement, int parameterIndex, String value) throws SQLException { + protected void setTimestamp(final PreparedStatement preparedStatement, final int parameterIndex, final String value) throws SQLException { try { preparedStatement.setObject(parameterIndex, LocalDateTime.parse(value)); } catch (final DateTimeParseException e) { diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java index 48f065ddbdae..c5c79a0df5b6 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java @@ -58,7 +58,7 @@ public void init() { public void tearDown() { try { container.close(); - } catch (Exception e) { + } catch (final Exception e) { throw new RuntimeException(e); } } @@ -85,13 +85,13 @@ public void dateColumnAsCursor() throws SQLException { try (final Connection connection = container.createConnection("")) { final PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); - sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.DATE, DateTimeConverter.convertToDate(LocalDate.of(2019, 1, 1))); + sqlSourceOperations.setCursorField(preparedStatement, 1, MysqlType.DATE, DateTimeConverter.convertToDate(LocalDate.of(2019, 1, 1))); try (final ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + sqlSourceOperations.putJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -105,13 +105,13 @@ public void dateColumnAsCursor() throws SQLException { try (final Connection connection = container.createConnection("")) { final PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); - sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.DATE, "2019-01-01T00:00:00Z"); + sqlSourceOperations.setCursorField(preparedStatement, 1, MysqlType.DATE, "2019-01-01T00:00:00Z"); try (final ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + sqlSourceOperations.putJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -142,13 +142,13 @@ public void timeColumnAsCursor() throws SQLException { try (final Connection connection = container.createConnection("")) { final PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); - sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.TIME, DateTimeConverter.convertToTime(LocalTime.of(20, 1, 0))); + sqlSourceOperations.setCursorField(preparedStatement, 1, MysqlType.TIME, DateTimeConverter.convertToTime(LocalTime.of(20, 1, 0))); try (final ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + sqlSourceOperations.putJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -162,13 +162,13 @@ public void timeColumnAsCursor() throws SQLException { try (final Connection connection = container.createConnection("")) { final PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); - sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.TIME, "1970-01-01T20:01:00Z"); + sqlSourceOperations.setCursorField(preparedStatement, 1, MysqlType.TIME, "1970-01-01T20:01:00Z"); try (final ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + sqlSourceOperations.putJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -198,14 +198,14 @@ public void dateTimeColumnAsCursor() throws SQLException { try (final Connection connection = container.createConnection("")) { final PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); - sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.DATETIME, + sqlSourceOperations.setCursorField(preparedStatement, 1, MysqlType.DATETIME, DateTimeConverter.convertToTimestamp(LocalDateTime.of(2019, 1, 20, 3, 0, 0))); try (final ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + sqlSourceOperations.putJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -219,13 +219,13 @@ public void dateTimeColumnAsCursor() throws SQLException { try (final Connection connection = container.createConnection("")) { final PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); - sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.DATETIME, "2019-01-20T03:00:00.000000Z"); + sqlSourceOperations.setCursorField(preparedStatement, 1, MysqlType.DATETIME, "2019-01-20T03:00:00.000000Z"); try (final ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + sqlSourceOperations.putJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -256,14 +256,14 @@ public void timestampColumnAsCursor() throws SQLException { try (final Connection connection = container.createConnection("")) { final PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); - sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.TIMESTAMP, + sqlSourceOperations.setCursorField(preparedStatement, 1, MysqlType.TIMESTAMP, DateTimeConverter.convertToTimestampWithTimezone(Instant.ofEpochSecond(1660298508L))); try (final ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + sqlSourceOperations.putJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -278,13 +278,13 @@ public void timestampColumnAsCursor() throws SQLException { try (final Connection connection = container.createConnection("")) { final PreparedStatement preparedStatement = connection.prepareStatement( "SELECT * from " + tableName + " WHERE " + cursorColumn + " > ?"); - sqlSourceOperations.setStatementField(preparedStatement, 1, MysqlType.TIMESTAMP, Instant.ofEpochSecond(1660298508L).toString()); + sqlSourceOperations.setCursorField(preparedStatement, 1, MysqlType.TIMESTAMP, Instant.ofEpochSecond(1660298508L).toString()); try (final ResultSet resultSet = preparedStatement.executeQuery()) { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.setJsonField(resultSet, i, jsonNode); + sqlSourceOperations.putJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 0976219c8697..5246d18a0648 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -91,17 +91,17 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { } // convert to java types that will convert into reasonable json. - setJsonField(queryContext, i, jsonNode); + putJsonField(queryContext, i, jsonNode); } return jsonNode; } @Override - public void setStatementField(final PreparedStatement preparedStatement, - final int parameterIndex, - final PostgresType cursorFieldType, - final String value) + public void setCursorField(final PreparedStatement preparedStatement, + final int parameterIndex, + final PostgresType cursorFieldType, + final String value) throws SQLException { switch (cursorFieldType) { @@ -174,7 +174,8 @@ protected void setDate(final PreparedStatement preparedStatement, final int para } @Override - public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + // This reads the actual value (in read). + public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); final String columnName = metadata.getColumnName(colIndex); final String columnTypeName = metadata.getColumnTypeName(colIndex).toLowerCase(); @@ -399,7 +400,7 @@ protected void putTimestamp(final ObjectNode node, final String columnName, fina } @Override - public PostgresType getFieldType(final JsonNode field) { + public PostgresType getDatabaseFieldType(final JsonNode field) { try { final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText().toLowerCase(); // Postgres boolean is mapped to JDBCType.BIT, but should be BOOLEAN diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java index 6fc97f7a4c13..edcebf253b24 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/AbstractDbSource.java @@ -535,12 +535,12 @@ private List> getTables(final Database database) throws Excepti } private Field toField(final CommonField field) { - if (getType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null + if (getAirbyteType(field.getType()) == JsonSchemaType.OBJECT && field.getProperties() != null && !field.getProperties().isEmpty()) { final var properties = field.getProperties().stream().map(this::toField).toList(); - return Field.of(field.getName(), getType(field.getType()), properties); + return Field.of(field.getName(), getAirbyteType(field.getType()), properties); } else { - return Field.of(field.getName(), getType(field.getType())); + return Field.of(field.getName(), getAirbyteType(field.getType())); } } @@ -604,12 +604,12 @@ protected abstract List> getCheckOperations throws Exception; /** - * Map source types and Airbyte types + * Map source types to Airbyte types * * @param columnType source data type * @return airbyte data type */ - protected abstract JsonSchemaType getType(DataType columnType); + protected abstract JsonSchemaType getAirbyteType(DataType columnType); /** * Get list of system namespaces(schemas) in order to exclude them from the discover result list. diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java index a050d44c59c5..91cf59d492aa 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java @@ -42,7 +42,7 @@ protected void putDouble(final ObjectNode node, final String columnName, final R } @Override - public JDBCType getFieldType(final JsonNode field) { + public JDBCType getDatabaseFieldType(final JsonNode field) { try { final String typeName = field.get(INTERNAL_COLUMN_TYPE_NAME).asText().toLowerCase(); return "TIMESTAMPLTZ".equalsIgnoreCase(typeName) @@ -94,7 +94,7 @@ public JsonSchemaType getJsonType(final JDBCType jdbcType) { } @Override - public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final String columnName = resultSet.getMetaData().getColumnName(colIndex); final String columnTypeName = resultSet.getMetaData().getColumnTypeName(colIndex).toLowerCase(); @@ -103,7 +103,7 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob putTimestampWithTimezone(json, columnName, resultSet, colIndex); return; } - super.setJsonField(resultSet, colIndex, json); + super.putJsonField(resultSet, colIndex, json); } @Override diff --git a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java index 915b071a07e0..05c36d6d11b2 100644 --- a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java +++ b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java @@ -33,7 +33,7 @@ public class TiDBSourceOperations extends AbstractJdbcCompatibleSourceOperations YEAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT); @Override - public void setJsonField(ResultSet resultSet, int colIndex, ObjectNode json) throws SQLException { + public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final ResultSetMetaData metaData = (ResultSetMetaData) resultSet.getMetaData(); final Field field = metaData.getFields()[colIndex - 1]; final String columnName = field.getName(); @@ -92,7 +92,7 @@ public void setJsonField(ResultSet resultSet, int colIndex, ObjectNode json) thr } @Override - public void setStatementField(PreparedStatement preparedStatement, int parameterIndex, MysqlType cursorFieldType, String value) + public void setCursorField(final PreparedStatement preparedStatement, final int parameterIndex, final MysqlType cursorFieldType, final String value) throws SQLException { switch (cursorFieldType) { case BIT -> setBit(preparedStatement, parameterIndex, value); @@ -114,7 +114,7 @@ public void setStatementField(PreparedStatement preparedStatement, int parameter } @Override - public MysqlType getFieldType(JsonNode field) { + public MysqlType getDatabaseFieldType(final JsonNode field) { try { final MysqlType literalType = MysqlType.getByName(field.get(INTERNAL_COLUMN_TYPE_NAME).asText()); final int columnSize = field.get(INTERNAL_COLUMN_SIZE).asInt(); @@ -144,12 +144,12 @@ public MysqlType getFieldType(JsonNode field) { } @Override - public boolean isCursorType(MysqlType type) { + public boolean isCursorType(final MysqlType type) { return ALLOWED_CURSOR_TYPES.contains(type); } @Override - public JsonSchemaType getJsonType(MysqlType mysqlType) { + public JsonSchemaType getJsonType(final MysqlType mysqlType) { return switch (mysqlType) { case // TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java index 01658d114856..9286cbfbecca 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaReferenceTypes.java @@ -32,7 +32,7 @@ public class JsonSchemaReferenceTypes { * This is primarily useful for migrating from protocol v0 to v1. It provides a mapping from the old * style {airbyte_type: foo} to new style {$ref: WellKnownTypes#/definitions/Foo}. */ - public static final Map AIRBYTE_TYPE_TO_REFERENCE_TYPE = ImmutableMap.of( + public static final Map LEGACY_AIRBYTE_PROPERY_TO_REFERENCE = ImmutableMap.of( "timestamp_with_timezone", TIMESTAMP_WITH_TIMEZONE_REFERENCE, "timestamp_without_timezone", TIMESTAMP_WITHOUT_TIMEZONE_REFERENCE, "time_with_timezone", TIME_WITH_TIMEZONE_REFERENCE, diff --git a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java index 9bbdf41a1210..5e12ddb74b8d 100644 --- a/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java +++ b/airbyte-protocol/protocol-models/src/main/java/io/airbyte/protocol/models/JsonSchemaType.java @@ -11,6 +11,19 @@ import java.util.Map; import java.util.Objects; +/** + * Represents an Airbyte type. This corresponds to the data type that is present on the various + * AirbyteMessages (e.g. AirbyteRecordMessage, AirbyteCatalog). + * + * This type system is realized using JSON schemas. In order to work around some of the limitations + * of JSON schema, the newer version of the protocol defines new types in well_known_types.yaml. + * + * Note that the legacy version of the protocol relied on an airbyte_type property in the JSON + * schema. This is NOT to be confused with the overall concept of an Airbyte data types, which is + * essentially Airbyte's notion of what a record's data type is. + * + * TODO : Rename this file to AirbyteDataType. + */ public class JsonSchemaType { public static final String TYPE = "type"; @@ -26,7 +39,7 @@ public class JsonSchemaType { public static final String AIRYBTE_INT_TYPE = "integer"; public static final String CONTENT_ENCODING = "contentEncoding"; public static final String BASE_64 = "base64"; - public static final String AIRBYTE_TYPE = "airbyte_type"; + public static final String LEGACY_AIRBYTE_TYPE_PROPERTY = "airbyte_type"; public static final String ITEMS = "items"; public static final JsonSchemaType STRING_V1 = JsonSchemaType.builder(JsonSchemaPrimitive.STRING_V1).build(); @@ -43,7 +56,8 @@ public class JsonSchemaType { public static final JsonSchemaType STRING = JsonSchemaType.builder(JsonSchemaPrimitive.STRING).build(); public static final JsonSchemaType NUMBER = JsonSchemaType.builder(JsonSchemaPrimitive.NUMBER).build(); - public static final JsonSchemaType INTEGER = JsonSchemaType.builder(JsonSchemaPrimitive.NUMBER).withAirbyteType(AIRYBTE_INT_TYPE).build(); + public static final JsonSchemaType INTEGER = + JsonSchemaType.builder(JsonSchemaPrimitive.NUMBER).withLegacyAirbyteTypeProperty(AIRYBTE_INT_TYPE).build(); public static final JsonSchemaType BOOLEAN = JsonSchemaType.builder(JsonSchemaPrimitive.BOOLEAN).build(); public static final JsonSchemaType OBJECT = JsonSchemaType.builder(JsonSchemaPrimitive.OBJECT).build(); public static final JsonSchemaType ARRAY = JsonSchemaType.builder(JsonSchemaPrimitive.ARRAY).build(); @@ -52,26 +66,26 @@ public class JsonSchemaType { public static final JsonSchemaType STRING_TIME_WITH_TIMEZONE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) .withFormat(TIME) - .withAirbyteType(TIME_WITH_TIMEZONE).build(); + .withLegacyAirbyteTypeProperty(TIME_WITH_TIMEZONE).build(); public static final JsonSchemaType STRING_TIME_WITHOUT_TIMEZONE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) .withFormat(TIME) - .withAirbyteType(TIME_WITHOUT_TIMEZONE).build(); + .withLegacyAirbyteTypeProperty(TIME_WITHOUT_TIMEZONE).build(); public static final JsonSchemaType STRING_TIMESTAMP_WITH_TIMEZONE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) .withFormat(DATE_TIME) - .withAirbyteType(TIMESTAMP_WITH_TIMEZONE).build(); + .withLegacyAirbyteTypeProperty(TIMESTAMP_WITH_TIMEZONE).build(); public static final JsonSchemaType STRING_TIMESTAMP_WITHOUT_TIMEZONE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) .withFormat(DATE_TIME) - .withAirbyteType(TIMESTAMP_WITHOUT_TIMEZONE).build(); + .withLegacyAirbyteTypeProperty(TIMESTAMP_WITHOUT_TIMEZONE).build(); public static final JsonSchemaType STRING_DATE = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) .withFormat(DATE) .build(); public static final JsonSchemaType NUMBER_BIGINT = JsonSchemaType.builder(JsonSchemaPrimitive.STRING) - .withAirbyteType("big_integer") + .withLegacyAirbyteTypeProperty("big_integer") .build(); private final Map jsonSchemaTypeMap; @@ -111,8 +125,8 @@ public Builder withContentEncoding(final String value) { return this; } - public Builder withAirbyteType(final String value) { - typeMapBuilder.put(AIRBYTE_TYPE, value); + public Builder withLegacyAirbyteTypeProperty(final String value) { + typeMapBuilder.put(LEGACY_AIRBYTE_TYPE_PROPERTY, value); return this; } From b0a1aaef6205311936de5a47f14be68f6fff1d11 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Fri, 23 Dec 2022 16:08:22 -0800 Subject: [PATCH 2/4] More cleanup --- .../db/JdbcCompatibleSourceOperations.java | 22 ------------- .../java/io/airbyte/db/PostgresUtils.java | 6 ++-- .../java/io/airbyte/db/SourceOperations.java | 5 +-- .../airbyte/db/bigquery/BigQueryDatabase.java | 2 +- .../db/bigquery/BigQuerySourceOperations.java | 2 +- ...bstractJdbcCompatibleSourceOperations.java | 32 ------------------- .../airbyte/db/jdbc/JdbcSourceOperations.java | 2 +- .../io/airbyte/db/jdbc/TestJdbcUtils.java | 2 +- ...DenormalizedDestinationAcceptanceTest.java | 2 +- .../BigQueryDestinationAcceptanceTest.java | 2 +- .../source/bigquery/BigQuerySource.java | 10 +++--- .../Db2JdbcSourceAcceptanceTest.java | 11 ++++--- .../Db2SourceOperations.java | 10 +++--- .../Db2JdbcSourceAcceptanceTest.java | 11 ++++--- .../source/jdbc/AbstractJdbcSource.java | 21 ++++++------ .../jdbc/test/JdbcSourceAcceptanceTest.java | 29 +++++++++-------- .../source/mssql/MssqlCdcTargetPosition.java | 2 +- .../source/mssql/MssqlSource.java | 7 ++-- .../source/mysql/MySqlSourceOperations.java | 2 +- ...StrictEncryptJdbcSourceAcceptanceTest.java | 5 ++- .../OracleJdbcSourceAcceptanceTest.java | 5 +-- .../oracle/OracleSourceDatatypeTest.java | 5 ++- .../postgres/PostgresSourceOperations.java | 3 +- .../relationaldb/RelationalDbQueryUtils.java | 19 ++++++++++- .../SnowflakeSourceOperations.java | 2 +- .../source/tidb/TiDBSourceOperations.java | 2 +- 26 files changed, 95 insertions(+), 126 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java index 55adeae111e8..764f9b9598f1 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java @@ -6,11 +6,9 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; -import java.util.List; public interface JdbcCompatibleSourceOperations extends SourceOperations { @@ -36,26 +34,6 @@ void setCursorField(final PreparedStatement preparedStatement, */ SourceType getDatabaseFieldType(final JsonNode field); - /** - * @return the input identifiers with quotes and delimiters. - */ - String enquoteIdentifierList(final Connection connection, final List identifiers) throws SQLException; - - /** - * @return the input identifier with quotes. - */ - String enquoteIdentifier(final Connection connection, final String identifier) throws SQLException; - - /** - * @return fully qualified table name with the schema (if a schema exists). - */ - String getFullyQualifiedTableName(final String schemaName, final String tableName); - - /** - * @return fully qualified table name with the schema (if a schema exists) in quotes. - */ - String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName) throws SQLException; - /** * This method will verify that filed could be used as cursor for incremental sync * diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java index ba50b0bfc044..f19fd0f343fe 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/PostgresUtils.java @@ -51,7 +51,7 @@ public static Certificate getCertificate(final PostgreSQLContainer container) container.execInContainer("su", "-c", "echo \"hostssl all all 127.0.0.1/32 cert clientcert=verify-full\" >> /var/lib/postgresql/data/pg_hba.conf"); - var caCert = container.execInContainer("su", "-c", "cat ca.crt").getStdout().trim(); + final var caCert = container.execInContainer("su", "-c", "cat ca.crt").getStdout().trim(); container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out client.key"); container.execInContainer("su", "-c", "openssl req -new -sha256 -key client.key -out client.csr -subj \"/CN=postgres\""); @@ -65,8 +65,8 @@ public static Certificate getCertificate(final PostgreSQLContainer container) container.execInContainer("su", "-c", "psql -U test -c \"SELECT pg_reload_conf();\""); - var clientKey = container.execInContainer("su", "-c", "cat client.key").getStdout().trim(); - var clientCert = container.execInContainer("su", "-c", "cat client.crt").getStdout().trim(); + final var clientKey = container.execInContainer("su", "-c", "cat client.key").getStdout().trim(); + final var clientCert = container.execInContainer("su", "-c", "cat client.crt").getStdout().trim(); return new Certificate(caCert, clientCert, clientKey); } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java index eb8946abc433..0ce9b879bd00 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java @@ -12,8 +12,5 @@ public interface SourceOperations { JsonNode rowToJson(QueryResult queryResult) throws SQLException; - JsonSchemaType getJsonType(SourceType sourceType); - - // - // JsonSchemaType getJsonSchemaType(SourceType columnType); + JsonSchemaType getAirbyteType(SourceType sourceType); } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java index 87ae1b56e91c..61bda08dc653 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQueryDatabase.java @@ -79,7 +79,7 @@ public BigQueryDatabase(final String projectId, final String jsonCreds, final Bi } } - private String getUserAgentHeader(String connectorVersion) { + private String getUserAgentHeader(final String connectorVersion) { return String.format(AGENT_TEMPLATE, connectorVersion); } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java index fd1f36877e27..890f16caf59d 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/bigquery/BigQuerySourceOperations.java @@ -116,7 +116,7 @@ public Date getDateValue(final FieldValue fieldValue, final DateFormat dateForma } @Override - public JsonSchemaType getJsonType(final StandardSQLTypeName bigQueryType) { + public JsonSchemaType getAirbyteType(final StandardSQLTypeName bigQueryType) { return switch (bigQueryType) { case BOOL -> JsonSchemaType.BOOLEAN; case INT64 -> JsonSchemaType.INTEGER; diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index 55378f929fb4..c85ece59785c 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -15,7 +15,6 @@ import io.airbyte.db.DataTypeUtils; import io.airbyte.db.JdbcCompatibleSourceOperations; import java.math.BigDecimal; -import java.sql.Connection; import java.sql.Date; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -28,8 +27,6 @@ import java.time.OffsetTime; import java.time.chrono.IsoEra; import java.util.Collections; -import java.util.List; -import java.util.StringJoiner; import javax.xml.bind.DatatypeConverter; /** @@ -229,35 +226,6 @@ protected void setBinary(final PreparedStatement preparedStatement, final int pa preparedStatement.setBytes(parameterIndex, DatatypeConverter.parseHexBinary(value)); } - @Override - public String enquoteIdentifierList(final Connection connection, final List identifiers) throws SQLException { - final StringJoiner joiner = new StringJoiner(","); - for (final String col : identifiers) { - final String s = enquoteIdentifier(connection, col); - joiner.add(s); - } - return joiner.toString(); - } - - @Override - public String enquoteIdentifier(final Connection connection, final String identifier) throws SQLException { - final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString(); - - return identifierQuoteString + identifier + identifierQuoteString; - } - - @Override - public String getFullyQualifiedTableName(final String schemaName, final String tableName) { - return JdbcUtils.getFullyQualifiedTableName(schemaName, tableName); - } - - @Override - public String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName) - throws SQLException { - final String quotedTableName = enquoteIdentifier(connection, tableName); - return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName; - } - protected ObjectType getObject(final ResultSet resultSet, final int index, final Class clazz) throws SQLException { return resultSet.getObject(index, clazz); } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java index 32472b2a452a..68d3667fbbdb 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java @@ -109,7 +109,7 @@ public boolean isCursorType(final JDBCType type) { } @Override - public JsonSchemaType getJsonType(final JDBCType jdbcType) { + public JsonSchemaType getAirbyteType(final JDBCType jdbcType) { return switch (jdbcType) { case BIT, BOOLEAN -> JsonSchemaType.BOOLEAN; case TINYINT, SMALLINT -> JsonSchemaType.INTEGER; diff --git a/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java b/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java index 97476dbf3cd5..1ed5c1a4623b 100644 --- a/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java +++ b/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java @@ -332,7 +332,7 @@ private static void assertExpectedOutputTypes(final Connection connection) throw final int columnCount = resultSet.getMetaData().getColumnCount(); final Map actual = new HashMap<>(columnCount); for (int i = 1; i <= columnCount; i++) { - actual.put(resultSet.getMetaData().getColumnName(i), sourceOperations.getJsonType(JDBCType.valueOf(resultSet.getMetaData().getColumnType(i)))); + actual.put(resultSet.getMetaData().getColumnName(i), sourceOperations.getAirbyteType(JDBCType.valueOf(resultSet.getMetaData().getColumnType(i)))); } final Map expected = ImmutableMap.builder() diff --git a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java index 2efc146da794..bd35a6577418 100644 --- a/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery-denormalized/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDenormalizedDestinationAcceptanceTest.java @@ -174,7 +174,7 @@ private List retrieveRecordsFromTable(final String tableName, final St final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults(); final FieldList fields = queryResults.getSchema().getFields(); - BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations(); + final BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations(); return Streams.stream(queryResults.iterateAll()) .map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList()); diff --git a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java index 89960f67e499..dc5b7466a872 100644 --- a/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-bigquery/src/test-integration/java/io/airbyte/integrations/destination/bigquery/BigQueryDestinationAcceptanceTest.java @@ -179,7 +179,7 @@ private List retrieveRecordsFromTable(final String tableName, final St final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults(); final FieldList fields = queryResults.getSchema().getFields(); - BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations(); + final BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations(); return Streams.stream(queryResults.iterateAll()) .map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList()); diff --git a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java index da4d9b0cbfab..bae1c07ff076 100644 --- a/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java +++ b/airbyte-integrations/connectors/source-bigquery/src/main/java/io/airbyte/integrations/source/bigquery/BigQuerySource.java @@ -5,7 +5,7 @@ package io.airbyte.integrations.source.bigquery; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList; -import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable; import com.fasterxml.jackson.databind.JsonNode; @@ -81,7 +81,7 @@ public List> getCheckOperations(fin checkList.add(database -> { if (isDatasetConfigured(database)) { database.query(String.format("select 1 from %s where 1=0", - getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString()))); + getFullyQualifiedTableNameWithQuoting(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString()))); LOGGER.info("The source passed the Dataset query test!"); } else { LOGGER.info("The Dataset query test is skipped due to not configured datasetId!"); @@ -93,7 +93,7 @@ public List> getCheckOperations(fin @Override protected JsonSchemaType getAirbyteType(final StandardSQLTypeName columnType) { - return sourceOperations.getJsonType(columnType); + return sourceOperations.getAirbyteType(columnType); } @Override @@ -146,7 +146,7 @@ public AutoCloseableIterator queryTableIncremental(final BigQueryDatab final StandardSQLTypeName cursorFieldType) { return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?", RelationalDbQueryUtils.enquoteIdentifierList(columnNames, getQuoteString()), - getFullTableName(schemaName, tableName, getQuoteString()), + getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()), cursorInfo.getCursorField()), sourceOperations.getQueryParameter(cursorFieldType, cursorInfo.getCursor())); } @@ -159,7 +159,7 @@ protected AutoCloseableIterator queryTableFullRefresh(final BigQueryDa LOGGER.info("Queueing query for table: {}", tableName); return queryTable(database, String.format("SELECT %s FROM %s", enquoteIdentifierList(columnNames, getQuoteString()), - getFullTableName(schemaName, tableName, getQuoteString()))); + getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()))); } @Override diff --git a/airbyte-integrations/connectors/source-db2-strict-encrypt/src/test/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-db2-strict-encrypt/src/test/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcSourceAcceptanceTest.java index ebe3a7948d86..2a023f6f53cc 100644 --- a/airbyte-integrations/connectors/source-db2-strict-encrypt/src/test/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-db2-strict-encrypt/src/test/java/io/airbyte/integrations/source/db2_strict_encrypt/Db2JdbcSourceAcceptanceTest.java @@ -15,6 +15,7 @@ import io.airbyte.integrations.source.db2.Db2Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils; import io.airbyte.protocol.models.v0.ConnectorSpecification; import java.io.File; import java.io.IOException; @@ -118,19 +119,19 @@ public void clean() throws Exception { } super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_SPACES)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_SPACES + 2)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES + 2, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME2, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITHOUT_CURSOR_TYPE)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITHOUT_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString())))); super.tearDown(); } diff --git a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java index cc5f283f5f7a..c3cb356e27bf 100644 --- a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java +++ b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java @@ -34,13 +34,13 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { /* Helpers */ - private void setFields(ResultSet queryContext, int index, ObjectNode jsonNode) throws SQLException { + private void setFields(final ResultSet queryContext, final int index, final ObjectNode jsonNode) throws SQLException { try { queryContext.getObject(index); if (!queryContext.wasNull()) { putJsonField(queryContext, index, jsonNode); } - } catch (SQLException e) { + } catch (final SQLException e) { if (DB2_UNIQUE_NUMBER_TYPES.contains(queryContext.getMetaData().getColumnTypeName(index))) { db2UniqueTypes(queryContext, index, jsonNode); } else { @@ -49,9 +49,9 @@ private void setFields(ResultSet queryContext, int index, ObjectNode jsonNode) t } } - private void db2UniqueTypes(ResultSet resultSet, int index, ObjectNode jsonNode) throws SQLException { - String columnType = resultSet.getMetaData().getColumnTypeName(index); - String columnName = resultSet.getMetaData().getColumnName(index); + private void db2UniqueTypes(final ResultSet resultSet, final int index, final ObjectNode jsonNode) throws SQLException { + final String columnType = resultSet.getMetaData().getColumnTypeName(index); + final String columnName = resultSet.getMetaData().getColumnName(index); if (DB2_UNIQUE_NUMBER_TYPES.contains(columnType)) { putDecfloat(jsonNode, columnName, resultSet, index); } diff --git a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java index cd99b0ab5da6..5dadcd5e111d 100644 --- a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java @@ -11,6 +11,7 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils; import java.sql.JDBCType; import java.util.Collections; import java.util.Set; @@ -94,19 +95,19 @@ public void clean() throws Exception { } super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_SPACES)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_SPACES + 2)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES + 2, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME2, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITHOUT_CURSOR_TYPE)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITHOUT_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE)))); + RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString())))); super.tearDown(); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java index a1274a395a87..b5a4a8c249ec 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java +++ b/airbyte-integrations/connectors/source-jdbc/src/main/java/io/airbyte/integrations/source/jdbc/AbstractJdbcSource.java @@ -20,8 +20,9 @@ import static io.airbyte.db.jdbc.JdbcConstants.JDBC_COLUMN_TYPE_NAME; import static io.airbyte.db.jdbc.JdbcConstants.JDBC_IS_NULLABLE; import static io.airbyte.db.jdbc.JdbcUtils.EQUALS; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList; -import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable; import com.fasterxml.jackson.databind.JsonNode; @@ -147,7 +148,7 @@ protected AutoCloseableIterator queryTableFullRefresh(final JdbcDataba LOGGER.info("Queueing query for table: {}", tableName); return queryTable(database, String.format("SELECT %s FROM %s", enquoteIdentifierList(columnNames, getQuoteString()), - getFullTableName(schemaName, tableName, getQuoteString()))); + getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()))); } /** @@ -276,7 +277,7 @@ public List>> discoverInternal(final JdbcDatabas @Override public JsonSchemaType getAirbyteType(final Datatype columnType) { - return sourceOperations.getJsonType(columnType); + return sourceOperations.getAirbyteType(columnType); } @Override @@ -291,7 +292,7 @@ protected Map> discoverPrimaryKeys(final JdbcDatabase datab r -> { final String schemaName = r.getObject(JDBC_COLUMN_SCHEMA_NAME) != null ? r.getString(JDBC_COLUMN_SCHEMA_NAME) : r.getString(JDBC_COLUMN_DATABASE_NAME); - final String streamName = sourceOperations.getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME)); + final String streamName = JdbcUtils.getFullyQualifiedTableName(schemaName, r.getString(JDBC_COLUMN_TABLE_NAME)); final String primaryKey = r.getString(JDBC_COLUMN_COLUMN_NAME); return new SimpleImmutableEntry<>(streamName, primaryKey); })); @@ -304,11 +305,9 @@ protected Map> discoverPrimaryKeys(final JdbcDatabase datab // Get primary keys one table at a time return tableInfos.stream() .collect(Collectors.toMap( - tableInfo -> sourceOperations - .getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()), + tableInfo -> JdbcUtils.getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()), tableInfo -> { - final String streamName = sourceOperations - .getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()); + final String streamName = JdbcUtils.getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()); try { final Map> primaryKeys = aggregatePrimateKeys(database.bufferedResultSetQuery( connection -> connection.getMetaData().getPrimaryKeys(getCatalog(database), tableInfo.getNameSpace(), tableInfo.getName()), @@ -344,8 +343,8 @@ public AutoCloseableIterator queryTableIncremental(final JdbcDatabase final Stream stream = database.unsafeQuery( connection -> { LOGGER.info("Preparing query for table: {}", tableName); - final String fullTableName = sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName); - final String quotedCursorField = sourceOperations.enquoteIdentifier(connection, cursorInfo.getCursorField()); + final String fullTableName = getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()); + final String quotedCursorField = enquoteIdentifier(cursorInfo.getCursorField(), getQuoteString()); final String operator; if (cursorInfo.getCursorRecordCount() <= 0L) { @@ -395,7 +394,7 @@ protected String getWrappedColumnNames(final JdbcDatabase database, final String schemaName, final String tableName) throws SQLException { - return sourceOperations.enquoteIdentifierList(connection, columnNames); + return enquoteIdentifierList(columnNames, getQuoteString()); } protected String getCountColumnName() { diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index 36b0b7cdccc2..1941ccb5fb7d 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -28,6 +28,7 @@ import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig; import io.airbyte.integrations.base.Source; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; +import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.Field; @@ -391,16 +392,16 @@ void testDiscoverWithMultipleSchemas() throws Exception { database.execute(connection -> { connection.createStatement().execute( String.format("CREATE TABLE %s(id VARCHAR(200) NOT NULL, name VARCHAR(200) NOT NULL)", - sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME))); + RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME))); connection.createStatement() .execute(String.format("INSERT INTO %s(id, name) VALUES ('1','picard')", - sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME))); + RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME))); connection.createStatement() .execute(String.format("INSERT INTO %s(id, name) VALUES ('2', 'crusher')", - sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME))); + RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME))); connection.createStatement() .execute(String.format("INSERT INTO %s(id, name) VALUES ('3', 'vash')", - sourceOperations.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME))); + RelationalDbQueryUtils.getFullyQualifiedTableName(SCHEMA_NAME2, TABLE_NAME))); }); final AirbyteCatalog actual = source.discover(config); @@ -1082,29 +1083,29 @@ protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException { final String streamName2 = tableNameWithSpaces; database.execute(connection -> { + final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString(); connection.createStatement() .execute( createTableQuery(getFullyQualifiedTableName( - sourceOperations.enquoteIdentifier(connection, tableNameWithSpaces)), - "id INTEGER, " + sourceOperations - .enquoteIdentifier(connection, COL_LAST_NAME_WITH_SPACE) + RelationalDbQueryUtils.enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), + "id INTEGER, " + RelationalDbQueryUtils.enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString) + " VARCHAR(200)", "")); connection.createStatement() .execute(String.format("INSERT INTO %s(id, %s) VALUES (1,'picard')", getFullyQualifiedTableName( - sourceOperations.enquoteIdentifier(connection, tableNameWithSpaces)), - sourceOperations.enquoteIdentifier(connection, COL_LAST_NAME_WITH_SPACE))); + RelationalDbQueryUtils.enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), + RelationalDbQueryUtils.enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); connection.createStatement() .execute(String.format("INSERT INTO %s(id, %s) VALUES (2, 'crusher')", getFullyQualifiedTableName( - sourceOperations.enquoteIdentifier(connection, tableNameWithSpaces)), - sourceOperations.enquoteIdentifier(connection, COL_LAST_NAME_WITH_SPACE))); + RelationalDbQueryUtils.enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), + RelationalDbQueryUtils.enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); connection.createStatement() .execute(String.format("INSERT INTO %s(id, %s) VALUES (3, 'vash')", getFullyQualifiedTableName( - sourceOperations.enquoteIdentifier(connection, tableNameWithSpaces)), - sourceOperations.enquoteIdentifier(connection, COL_LAST_NAME_WITH_SPACE))); + RelationalDbQueryUtils.enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), + RelationalDbQueryUtils.enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); }); return CatalogHelpers.createConfiguredAirbyteStream( @@ -1115,7 +1116,7 @@ protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException { } public String getFullyQualifiedTableName(final String tableName) { - return sourceOperations.getFullyQualifiedTableName(getDefaultSchemaName(), tableName); + return RelationalDbQueryUtils.getFullyQualifiedTableName(getDefaultSchemaName(), tableName); } public void createSchemas() throws SQLException { diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java index 514f878e1c3f..26a74f6a1103 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlCdcTargetPosition.java @@ -38,7 +38,7 @@ public boolean reachedTargetPosition(final JsonNode valueAsJson) { return true; } else { final Lsn recordLsn = extractLsn(valueAsJson); - boolean isEventLSNAfter = targetLsn.compareTo(recordLsn) <= 0; + final boolean isEventLSNAfter = targetLsn.compareTo(recordLsn) <= 0; if (isEventLSNAfter) { LOGGER.info("Signalling close because record's LSN : " + recordLsn + " is after target LSN : " + targetLsn); } diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 5276609b904f..377c1025b99d 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -8,7 +8,7 @@ import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_DELETED_AT; import static io.airbyte.integrations.debezium.internals.DebeziumEventUtils.CDC_UPDATED_AT; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList; -import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getIdentifierWithQuoting; import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable; import static java.util.stream.Collectors.toList; @@ -87,7 +87,8 @@ public AutoCloseableIterator queryTableFullRefresh(final JdbcDatabase LOGGER.info("Queueing query for table: {}", tableName); final String newIdentifiers = getWrappedColumnNames(database, null, columnNames, schemaName, tableName); - final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName, getQuoteString())); + final String preparedSqlQuery = + String.format("SELECT %s FROM %s", newIdentifiers, getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString())); LOGGER.info("Prepared SQL query for TableFullRefresh is: " + preparedSqlQuery); return queryTable(database, preparedSqlQuery); @@ -115,7 +116,7 @@ protected String getWrappedColumnNames(final JdbcDatabase database, .queryMetadata(String .format("SELECT TOP 1 %s FROM %s", // only first row is enough to get field's type enquoteIdentifierList(columnNames, getQuoteString()), - getFullTableName(schemaName, tableName, getQuoteString()))); + getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()))); // metadata will be null if table doesn't contain records if (sqlServerResultSetMetaData != null) { diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index 46908ef46d51..dca60b973d8f 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -228,7 +228,7 @@ protected void putTimestamp(final ObjectNode node, final String columnName, fina } @Override - public JsonSchemaType getJsonType(final MysqlType mysqlType) { + public JsonSchemaType getAirbyteType(final MysqlType mysqlType) { return switch (mysqlType) { case // TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link diff --git a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java index afdcdfc3dd00..d4868bb2d6fc 100644 --- a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java @@ -21,6 +21,7 @@ import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.oracle.OracleSource; +import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.Field; @@ -152,7 +153,9 @@ void cleanUpTables() throws SQLException { connection.createStatement().executeQuery(String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s'", schemaName)); while (resultSet.next()) { final String tableName = resultSet.getString("TABLE_NAME"); - final String tableNameProcessed = tableName.contains(" ") ? sourceOperations.enquoteIdentifier(connection, tableName) : tableName; + final String tableNameProcessed = + tableName.contains(" ") ? RelationalDbQueryUtils.enquoteIdentifier(tableName, connection.getMetaData().getIdentifierQuoteString()) + : tableName; connection.createStatement().executeQuery("DROP TABLE " + schemaName + "." + tableNameProcessed); } } diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java index 70c6f0d407c7..d4fd4b4be70a 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleJdbcSourceAcceptanceTest.java @@ -20,6 +20,7 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; +import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.Field; @@ -183,8 +184,8 @@ void cleanUpTables() throws SQLException { connection.createStatement().executeQuery(String.format("SELECT TABLE_NAME FROM ALL_TABLES WHERE OWNER = '%s'", schemaName)); while (resultSet.next()) { final String tableName = resultSet.getString("TABLE_NAME"); - final String tableNameProcessed = tableName.contains(" ") ? sourceOperations - .enquoteIdentifier(connection, tableName) : tableName; + final String tableNameProcessed = tableName.contains(" ") ? RelationalDbQueryUtils + .enquoteIdentifier(tableName, connection.getMetaData().getIdentifierQuoteString()) : tableName; connection.createStatement().executeQuery("DROP TABLE " + schemaName + "." + tableNameProcessed); } } diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceDatatypeTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceDatatypeTest.java index c80b0998f91d..4364c4aeb0cb 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceDatatypeTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceDatatypeTest.java @@ -147,7 +147,10 @@ protected void initTests() { .airbyteType(JsonSchemaType.NUMBER) .addInsertValues("null", "1", "123.45", "power(10, -130)", "9.99999999999999999999 * power(10, 125)") /* The 999990000… below is the plain string representation of 9.999 * power(10, 125) */ - /* because normalization expects a plain integer strings whereas `Math.pow(10, 125)` returns a scientific notation */ + /* + * because normalization expects a plain integer strings whereas `Math.pow(10, 125)` returns a + * scientific notation + */ .addExpectedValues(null, "1", "123.45", String.valueOf(Math.pow(10, -130)), "999999999999999999999000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000") .build()); diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index 5246d18a0648..acf5e0f22251 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -174,7 +174,6 @@ protected void setDate(final PreparedStatement preparedStatement, final int para } @Override - // This reads the actual value (in read). public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); final String columnName = metadata.getColumnName(colIndex); @@ -447,7 +446,7 @@ public PostgresType getDatabaseFieldType(final JsonNode field) { } @Override - public JsonSchemaType getJsonType(final PostgresType jdbcType) { + public JsonSchemaType getAirbyteType(final PostgresType jdbcType) { return switch (jdbcType) { case BOOLEAN -> JsonSchemaType.BOOLEAN; case TINYINT, SMALLINT, INTEGER, BIGINT -> JsonSchemaType.INTEGER; diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/RelationalDbQueryUtils.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/RelationalDbQueryUtils.java index 022279686090..3ca767961f3a 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/RelationalDbQueryUtils.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/RelationalDbQueryUtils.java @@ -29,11 +29,28 @@ public static String enquoteIdentifierList(final List identifiers, final return joiner.toString(); } - public static String getFullTableName(final String nameSpace, final String tableName, final String quoteString) { + /** + * @return fully qualified table name with the schema (if a schema exists) in quotes. + */ + public static String getFullyQualifiedTableNameWithQuoting(final String nameSpace, final String tableName, final String quoteString) { return (nameSpace == null || nameSpace.isEmpty() ? getIdentifierWithQuoting(tableName, quoteString) : getIdentifierWithQuoting(nameSpace, quoteString) + "." + getIdentifierWithQuoting(tableName, quoteString)); } + /** + * @return the input identifier with quotes. + */ + public static String enquoteIdentifier(final String identifier, final String quoteString) { + return quoteString + identifier + quoteString; + } + + /** + * @return fully qualified table name with the schema (if a schema exists) without quotes. + */ + public static String getFullyQualifiedTableName(final String schemaName, final String tableName) { + return schemaName != null ? schemaName + "." + tableName : tableName; + } + public static AutoCloseableIterator queryTable(final Database database, final String sqlQuery) { return AutoCloseableIterators.lazyIterator(() -> { try { diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java index 91cf59d492aa..39375d744174 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java @@ -74,7 +74,7 @@ protected void setTimestamp(final PreparedStatement preparedStatement, final int } @Override - public JsonSchemaType getJsonType(final JDBCType jdbcType) { + public JsonSchemaType getAirbyteType(final JDBCType jdbcType) { return switch (jdbcType) { case BIT, BOOLEAN -> JsonSchemaType.BOOLEAN; case REAL, FLOAT, DOUBLE, NUMERIC, DECIMAL -> JsonSchemaType.NUMBER; diff --git a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java index 05c36d6d11b2..9c684d1caf89 100644 --- a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java +++ b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java @@ -149,7 +149,7 @@ public boolean isCursorType(final MysqlType type) { } @Override - public JsonSchemaType getJsonType(final MysqlType mysqlType) { + public JsonSchemaType getAirbyteType(final MysqlType mysqlType) { return switch (mysqlType) { case // TINYINT(1) is boolean, but it should have been converted to MysqlType.BOOLEAN in {@link From 0401f43208e09ba5742e01fedc177455f80432e2 Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 27 Dec 2022 10:51:55 -0800 Subject: [PATCH 3/4] Addressing comments --- .../db/JdbcCompatibleSourceOperations.java | 2 +- .../java/io/airbyte/db/SourceOperations.java | 7 +++++++ .../AbstractJdbcCompatibleSourceOperations.java | 2 +- .../airbyte/db/jdbc/JdbcSourceOperations.java | 2 +- .../CockroachJdbcSourceOperations.java | 2 +- .../Db2SourceOperations.java | 2 +- .../Db2JdbcSourceAcceptanceTest.java | 13 +++++++------ .../jdbc/test/JdbcSourceAcceptanceTest.java | 17 +++++++++-------- .../source/mssql/MssqlSourceOperations.java | 2 +- .../source/mysql/MySqlSourceOperations.java | 2 +- .../source/mysql/MySqlSourceOperationsTest.java | 16 ++++++++-------- ...leStrictEncryptJdbcSourceAcceptanceTest.java | 4 ++-- .../postgres/PostgresSourceOperations.java | 4 ++-- .../relationaldb/RelationalDbQueryUtils.java | 12 ++++++------ .../SnowflakeSourceOperations.java | 4 ++-- .../source/tidb/TiDBSourceOperations.java | 2 +- 16 files changed, 51 insertions(+), 42 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java index 764f9b9598f1..b6a5f9329452 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/JdbcCompatibleSourceOperations.java @@ -18,7 +18,7 @@ public interface JdbcCompatibleSourceOperations extends SourceOperat * * @param colIndex 1-based column index. */ - void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException; + void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException; /** * Set the cursor field in incremental table query. diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java index 0ce9b879bd00..5f028dc64a26 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java @@ -10,7 +10,14 @@ public interface SourceOperations { + /** + * Converts a database row into it's JSON representation. + * @throws SQLException + */ JsonNode rowToJson(QueryResult queryResult) throws SQLException; + /** + * Converts a database source type into an Airbyte type, which is currently represented by a {@link JsonSchemaType} + */ JsonSchemaType getAirbyteType(SourceType sourceType); } diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java index c85ece59785c..622ac41c7794 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/AbstractJdbcCompatibleSourceOperations.java @@ -55,7 +55,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { } // convert to java types that will convert into reasonable json. - putJsonField(queryContext, i, jsonNode); + copyToJsonField(queryContext, i, jsonNode); } return jsonNode; diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java index 68d3667fbbdb..5857e62b770d 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/jdbc/JdbcSourceOperations.java @@ -37,7 +37,7 @@ protected JDBCType safeGetJdbcType(final int columnTypeInt) { } @Override - public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final int columnTypeInt = resultSet.getMetaData().getColumnType(colIndex); final String columnName = resultSet.getMetaData().getColumnName(colIndex); final JDBCType columnType = safeGetJdbcType(columnTypeInt); diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java index 5c093d9b4850..fcff6cc366f6 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachJdbcSourceOperations.java @@ -45,7 +45,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { try { queryContext.getObject(i); if (!queryContext.wasNull()) { - putJsonField(queryContext, i, jsonNode); + copyToJsonField(queryContext, i, jsonNode); } } catch (final SQLException e) { putCockroachSpecialDataType(queryContext, i, jsonNode); diff --git a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java index c3cb356e27bf..54f56bb91412 100644 --- a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java +++ b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2SourceOperations.java @@ -38,7 +38,7 @@ private void setFields(final ResultSet queryContext, final int index, final Obje try { queryContext.getObject(index); if (!queryContext.wasNull()) { - putJsonField(queryContext, index, jsonNode); + copyToJsonField(queryContext, index, jsonNode); } } catch (final SQLException e) { if (DB2_UNIQUE_NUMBER_TYPES.contains(queryContext.getMetaData().getColumnTypeName(index))) { diff --git a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java index 5dadcd5e111d..12a8efffbb74 100644 --- a/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-db2/src/test/java/io.airbyte.integrations.source.db2/Db2JdbcSourceAcceptanceTest.java @@ -4,6 +4,8 @@ package io.airbyte.integrations.source.db2; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier; + import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -11,7 +13,6 @@ import io.airbyte.db.jdbc.JdbcUtils; import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; -import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils; import java.sql.JDBCType; import java.util.Collections; import java.util.Set; @@ -95,19 +96,19 @@ public void clean() throws Exception { } super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES, connection.getMetaData().getIdentifierQuoteString())))); + enquoteIdentifier(TABLE_NAME_WITH_SPACES, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES + 2, connection.getMetaData().getIdentifierQuoteString())))); + enquoteIdentifier(TABLE_NAME_WITH_SPACES + 2, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME2, - RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME, connection.getMetaData().getIdentifierQuoteString())))); + enquoteIdentifier(TABLE_NAME, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITHOUT_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString())))); + enquoteIdentifier(TABLE_NAME_WITHOUT_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString())))); super.database.execute(connection -> connection.createStatement().execute(String .format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME, - RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString())))); + enquoteIdentifier(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString())))); super.tearDown(); } diff --git a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java index 1941ccb5fb7d..fe803578aa2c 100644 --- a/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-jdbc/src/testFixtures/java/io/airbyte/integrations/source/jdbc/test/JdbcSourceAcceptanceTest.java @@ -5,6 +5,7 @@ package io.airbyte.integrations.source.jdbc.test; import static io.airbyte.db.jdbc.JdbcUtils.getDefaultSourceOperations; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier; import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; @@ -1087,25 +1088,25 @@ protected ConfiguredAirbyteStream createTableWithSpaces() throws SQLException { connection.createStatement() .execute( createTableQuery(getFullyQualifiedTableName( - RelationalDbQueryUtils.enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), - "id INTEGER, " + RelationalDbQueryUtils.enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString) + enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), + "id INTEGER, " + enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString) + " VARCHAR(200)", "")); connection.createStatement() .execute(String.format("INSERT INTO %s(id, %s) VALUES (1,'picard')", getFullyQualifiedTableName( - RelationalDbQueryUtils.enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), - RelationalDbQueryUtils.enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); + enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), + enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); connection.createStatement() .execute(String.format("INSERT INTO %s(id, %s) VALUES (2, 'crusher')", getFullyQualifiedTableName( - RelationalDbQueryUtils.enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), - RelationalDbQueryUtils.enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); + enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), + enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); connection.createStatement() .execute(String.format("INSERT INTO %s(id, %s) VALUES (3, 'vash')", getFullyQualifiedTableName( - RelationalDbQueryUtils.enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), - RelationalDbQueryUtils.enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); + enquoteIdentifier(tableNameWithSpaces, identifierQuoteString)), + enquoteIdentifier(COL_LAST_NAME_WITH_SPACE, identifierQuoteString))); }); return CatalogHelpers.createConfiguredAirbyteStream( diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java index 28ddc78d9075..38cfae35ec91 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSourceOperations.java @@ -35,7 +35,7 @@ public class MssqlSourceOperations extends JdbcSourceOperations { * @throws SQLException */ @Override - public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) + public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final SQLServerResultSetMetaData metadata = (SQLServerResultSetMetaData) resultSet diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java index dca60b973d8f..e200f8e29de4 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlSourceOperations.java @@ -72,7 +72,7 @@ public class MySqlSourceOperations extends AbstractJdbcCompatibleSourceOperation * @param colIndex 1-based column index. */ @Override - public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final ResultSetMetaData metaData = (ResultSetMetaData) resultSet.getMetaData(); final Field field = metaData.getFields()[colIndex - 1]; final String columnName = field.getName(); diff --git a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java index c5c79a0df5b6..729dbc877243 100644 --- a/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java +++ b/airbyte-integrations/connectors/source-mysql/src/test/java/io/airbyte/integrations/source/mysql/MySqlSourceOperationsTest.java @@ -91,7 +91,7 @@ public void dateColumnAsCursor() throws SQLException { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.putJsonField(resultSet, i, jsonNode); + sqlSourceOperations.copyToJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -111,7 +111,7 @@ public void dateColumnAsCursor() throws SQLException { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.putJsonField(resultSet, i, jsonNode); + sqlSourceOperations.copyToJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -148,7 +148,7 @@ public void timeColumnAsCursor() throws SQLException { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.putJsonField(resultSet, i, jsonNode); + sqlSourceOperations.copyToJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -168,7 +168,7 @@ public void timeColumnAsCursor() throws SQLException { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.putJsonField(resultSet, i, jsonNode); + sqlSourceOperations.copyToJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -205,7 +205,7 @@ public void dateTimeColumnAsCursor() throws SQLException { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.putJsonField(resultSet, i, jsonNode); + sqlSourceOperations.copyToJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -225,7 +225,7 @@ public void dateTimeColumnAsCursor() throws SQLException { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.putJsonField(resultSet, i, jsonNode); + sqlSourceOperations.copyToJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -263,7 +263,7 @@ public void timestampColumnAsCursor() throws SQLException { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.putJsonField(resultSet, i, jsonNode); + sqlSourceOperations.copyToJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } @@ -284,7 +284,7 @@ public void timestampColumnAsCursor() throws SQLException { while (resultSet.next()) { final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap()); for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) { - sqlSourceOperations.putJsonField(resultSet, i, jsonNode); + sqlSourceOperations.copyToJsonField(resultSet, i, jsonNode); } actualRecords.add(jsonNode); } diff --git a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java index d4868bb2d6fc..eaf13f11ab4d 100644 --- a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleStrictEncryptJdbcSourceAcceptanceTest.java @@ -4,6 +4,7 @@ package io.airbyte.integrations.source.oracle_strict_encrypt; +import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifier; import static org.junit.Assert.assertEquals; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -21,7 +22,6 @@ import io.airbyte.integrations.source.jdbc.AbstractJdbcSource; import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest; import io.airbyte.integrations.source.oracle.OracleSource; -import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils; import io.airbyte.integrations.source.relationaldb.models.DbState; import io.airbyte.integrations.source.relationaldb.models.DbStreamState; import io.airbyte.protocol.models.Field; @@ -154,7 +154,7 @@ void cleanUpTables() throws SQLException { while (resultSet.next()) { final String tableName = resultSet.getString("TABLE_NAME"); final String tableNameProcessed = - tableName.contains(" ") ? RelationalDbQueryUtils.enquoteIdentifier(tableName, connection.getMetaData().getIdentifierQuoteString()) + tableName.contains(" ") ? enquoteIdentifier(tableName, connection.getMetaData().getIdentifierQuoteString()) : tableName; connection.createStatement().executeQuery("DROP TABLE " + schemaName + "." + tableNameProcessed); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java index acf5e0f22251..44492b99b015 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSourceOperations.java @@ -91,7 +91,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException { } // convert to java types that will convert into reasonable json. - putJsonField(queryContext, i, jsonNode); + copyToJsonField(queryContext, i, jsonNode); } return jsonNode; @@ -174,7 +174,7 @@ protected void setDate(final PreparedStatement preparedStatement, final int para } @Override - public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final PgResultSetMetaData metadata = (PgResultSetMetaData) resultSet.getMetaData(); final String columnName = metadata.getColumnName(colIndex); final String columnTypeName = metadata.getColumnTypeName(colIndex).toLowerCase(); diff --git a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/RelationalDbQueryUtils.java b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/RelationalDbQueryUtils.java index 3ca767961f3a..fdeee4b17f93 100644 --- a/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/RelationalDbQueryUtils.java +++ b/airbyte-integrations/connectors/source-relational-db/src/main/java/io/airbyte/integrations/source/relationaldb/RelationalDbQueryUtils.java @@ -38,17 +38,17 @@ public static String getFullyQualifiedTableNameWithQuoting(final String nameSpac } /** - * @return the input identifier with quotes. + * @return fully qualified table name with the schema (if a schema exists) without quotes. */ - public static String enquoteIdentifier(final String identifier, final String quoteString) { - return quoteString + identifier + quoteString; + public static String getFullyQualifiedTableName(final String schemaName, final String tableName) { + return schemaName != null ? schemaName + "." + tableName : tableName; } /** - * @return fully qualified table name with the schema (if a schema exists) without quotes. + * @return the input identifier with quotes. */ - public static String getFullyQualifiedTableName(final String schemaName, final String tableName) { - return schemaName != null ? schemaName + "." + tableName : tableName; + public static String enquoteIdentifier(final String identifier, final String quoteString) { + return quoteString + identifier + quoteString; } public static AutoCloseableIterator queryTable(final Database database, final String sqlQuery) { diff --git a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java index 39375d744174..9240c2c6c486 100644 --- a/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java +++ b/airbyte-integrations/connectors/source-snowflake/src/main/java/io.airbyte.integrations.source.snowflake/SnowflakeSourceOperations.java @@ -94,7 +94,7 @@ public JsonSchemaType getAirbyteType(final JDBCType jdbcType) { } @Override - public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final String columnName = resultSet.getMetaData().getColumnName(colIndex); final String columnTypeName = resultSet.getMetaData().getColumnTypeName(colIndex).toLowerCase(); @@ -103,7 +103,7 @@ public void putJsonField(final ResultSet resultSet, final int colIndex, final Ob putTimestampWithTimezone(json, columnName, resultSet, colIndex); return; } - super.putJsonField(resultSet, colIndex, json); + super.copyToJsonField(resultSet, colIndex, json); } @Override diff --git a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java index 9c684d1caf89..b30e7c12a897 100644 --- a/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java +++ b/airbyte-integrations/connectors/source-tidb/src/main/java/io/airbyte/integrations/source/tidb/TiDBSourceOperations.java @@ -33,7 +33,7 @@ public class TiDBSourceOperations extends AbstractJdbcCompatibleSourceOperations YEAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT); @Override - public void putJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { + public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException { final ResultSetMetaData metaData = (ResultSetMetaData) resultSet.getMetaData(); final Field field = metaData.getFields()[colIndex - 1]; final String columnName = field.getName(); From 60008b392a8c735b7d3f465fb6680d87571d9e8f Mon Sep 17 00:00:00 2001 From: Akash Kulkarni Date: Tue, 3 Jan 2023 13:53:23 +0530 Subject: [PATCH 4/4] Formatting --- .../db-lib/src/main/java/io/airbyte/db/SourceOperations.java | 5 ++++- .../src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java | 3 ++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java b/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java index 5f028dc64a26..312ab5d187db 100644 --- a/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java +++ b/airbyte-db/db-lib/src/main/java/io/airbyte/db/SourceOperations.java @@ -12,12 +12,15 @@ public interface SourceOperations { /** * Converts a database row into it's JSON representation. + * * @throws SQLException */ JsonNode rowToJson(QueryResult queryResult) throws SQLException; /** - * Converts a database source type into an Airbyte type, which is currently represented by a {@link JsonSchemaType} + * Converts a database source type into an Airbyte type, which is currently represented by a + * {@link JsonSchemaType} */ JsonSchemaType getAirbyteType(SourceType sourceType); + } diff --git a/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java b/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java index 1ed5c1a4623b..47fc5f7f83e2 100644 --- a/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java +++ b/airbyte-db/db-lib/src/test/java/io/airbyte/db/jdbc/TestJdbcUtils.java @@ -332,7 +332,8 @@ private static void assertExpectedOutputTypes(final Connection connection) throw final int columnCount = resultSet.getMetaData().getColumnCount(); final Map actual = new HashMap<>(columnCount); for (int i = 1; i <= columnCount; i++) { - actual.put(resultSet.getMetaData().getColumnName(i), sourceOperations.getAirbyteType(JDBCType.valueOf(resultSet.getMetaData().getColumnType(i)))); + actual.put(resultSet.getMetaData().getColumnName(i), + sourceOperations.getAirbyteType(JDBCType.valueOf(resultSet.getMetaData().getColumnType(i)))); } final Map expected = ImmutableMap.builder()