Skip to content

Commit

Permalink
Code cleanup in SourceOperations (#20874)
Browse files Browse the repository at this point in the history
* Refactor SourceOperations class

* More cleanup

* Addressing comments

* Formatting
  • Loading branch information
akashkulk authored Jan 3, 2023
1 parent 99905b2 commit bac789e
Show file tree
Hide file tree
Showing 34 changed files with 215 additions and 227 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface JdbcCompatibleSourceOperations<SourceType> extends SourceOperations<ResultSet, SourceType> {

Expand All @@ -20,41 +18,21 @@ public interface JdbcCompatibleSourceOperations<SourceType> extends SourceOperat
*
* @param colIndex 1-based column index.
*/
void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException;
void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException;

/**
* Set the cursor field in incremental table query.
*/
void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
final SourceType cursorFieldType,
final String value)
void setCursorField(final PreparedStatement preparedStatement,
final int parameterIndex,
final SourceType cursorFieldType,
final String value)
throws SQLException;

/**
* Determine the database specific type of the input field based on its column metadata.
*/
SourceType getFieldType(final JsonNode field);

/**
* @return the input identifiers with quotes and delimiters.
*/
String enquoteIdentifierList(final Connection connection, final List<String> identifiers) throws SQLException;

/**
* @return the input identifier with quotes.
*/
String enquoteIdentifier(final Connection connection, final String identifier) throws SQLException;

/**
* @return fully qualified table name with the schema (if a schema exists).
*/
String getFullyQualifiedTableName(final String schemaName, final String tableName);

/**
* @return fully qualified table name with the schema (if a schema exists) in quotes.
*/
String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName) throws SQLException;
SourceType getDatabaseFieldType(final JsonNode field);

/**
* This method will verify that filed could be used as cursor for incremental sync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static Certificate getCertificate(final PostgreSQLContainer<?> container)
container.execInContainer("su", "-c",
"echo \"hostssl all all 127.0.0.1/32 cert clientcert=verify-full\" >> /var/lib/postgresql/data/pg_hba.conf");

var caCert = container.execInContainer("su", "-c", "cat ca.crt").getStdout().trim();
final var caCert = container.execInContainer("su", "-c", "cat ca.crt").getStdout().trim();

container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out client.key");
container.execInContainer("su", "-c", "openssl req -new -sha256 -key client.key -out client.csr -subj \"/CN=postgres\"");
Expand All @@ -65,8 +65,8 @@ public static Certificate getCertificate(final PostgreSQLContainer<?> container)

container.execInContainer("su", "-c", "psql -U test -c \"SELECT pg_reload_conf();\"");

var clientKey = container.execInContainer("su", "-c", "cat client.key").getStdout().trim();
var clientCert = container.execInContainer("su", "-c", "cat client.crt").getStdout().trim();
final var clientKey = container.execInContainer("su", "-c", "cat client.key").getStdout().trim();
final var clientCert = container.execInContainer("su", "-c", "cat client.crt").getStdout().trim();
return new Certificate(caCert, clientCert, clientKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@

public interface SourceOperations<QueryResult, SourceType> {

/**
* Converts a database row into it's JSON representation.
*
* @throws SQLException
*/
JsonNode rowToJson(QueryResult queryResult) throws SQLException;

JsonSchemaType getJsonType(SourceType sourceType);
/**
* Converts a database source type into an Airbyte type, which is currently represented by a
* {@link JsonSchemaType}
*/
JsonSchemaType getAirbyteType(SourceType sourceType);

//
// JsonSchemaType getJsonSchemaType(SourceType columnType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public BigQueryDatabase(final String projectId, final String jsonCreds, final Bi
}
}

private String getUserAgentHeader(String connectorVersion) {
private String getUserAgentHeader(final String connectorVersion) {
return String.format(AGENT_TEMPLATE, connectorVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public Date getDateValue(final FieldValue fieldValue, final DateFormat dateForma
}

@Override
public JsonSchemaType getJsonType(final StandardSQLTypeName bigQueryType) {
public JsonSchemaType getAirbyteType(final StandardSQLTypeName bigQueryType) {
return switch (bigQueryType) {
case BOOL -> JsonSchemaType.BOOLEAN;
case INT64 -> JsonSchemaType.INTEGER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -28,8 +27,6 @@
import java.time.OffsetTime;
import java.time.chrono.IsoEra;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import javax.xml.bind.DatatypeConverter;

/**
Expand Down Expand Up @@ -58,7 +55,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
}

// convert to java types that will convert into reasonable json.
setJsonField(queryContext, i, jsonNode);
copyToJsonField(queryContext, i, jsonNode);
}

return jsonNode;
Expand Down Expand Up @@ -229,35 +226,6 @@ protected void setBinary(final PreparedStatement preparedStatement, final int pa
preparedStatement.setBytes(parameterIndex, DatatypeConverter.parseHexBinary(value));
}

@Override
public String enquoteIdentifierList(final Connection connection, final List<String> identifiers) throws SQLException {
final StringJoiner joiner = new StringJoiner(",");
for (final String col : identifiers) {
final String s = enquoteIdentifier(connection, col);
joiner.add(s);
}
return joiner.toString();
}

@Override
public String enquoteIdentifier(final Connection connection, final String identifier) throws SQLException {
final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString();

return identifierQuoteString + identifier + identifierQuoteString;
}

@Override
public String getFullyQualifiedTableName(final String schemaName, final String tableName) {
return JdbcUtils.getFullyQualifiedTableName(schemaName, tableName);
}

@Override
public String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName)
throws SQLException {
final String quotedTableName = enquoteIdentifier(connection, tableName);
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

protected <ObjectType> ObjectType getObject(final ResultSet resultSet, final int index, final Class<ObjectType> clazz) throws SQLException {
return resultSet.getObject(index, clazz);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected JDBCType safeGetJdbcType(final int columnTypeInt) {
}

@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final int columnTypeInt = resultSet.getMetaData().getColumnType(colIndex);
final String columnName = resultSet.getMetaData().getColumnName(colIndex);
final JDBCType columnType = safeGetJdbcType(columnTypeInt);
Expand All @@ -63,10 +63,10 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
}

@Override
public void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
final JDBCType cursorFieldType,
final String value)
public void setCursorField(final PreparedStatement preparedStatement,
final int parameterIndex,
final JDBCType cursorFieldType,
final String value)
throws SQLException {
switch (cursorFieldType) {

Expand All @@ -90,7 +90,7 @@ public void setStatementField(final PreparedStatement preparedStatement,
}

@Override
public JDBCType getFieldType(final JsonNode field) {
public JDBCType getDatabaseFieldType(final JsonNode field) {
try {
return JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (final IllegalArgumentException ex) {
Expand All @@ -109,7 +109,7 @@ public boolean isCursorType(final JDBCType type) {
}

@Override
public JsonSchemaType getJsonType(final JDBCType jdbcType) {
public JsonSchemaType getAirbyteType(final JDBCType jdbcType) {
return switch (jdbcType) {
case BIT, BOOLEAN -> JsonSchemaType.BOOLEAN;
case TINYINT, SMALLINT -> JsonSchemaType.INTEGER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,21 @@ void testSetStatementField() throws SQLException {

// insert the bit here to stay consistent even though setStatementField does not support it yet.
ps.setString(1, "1");
sourceOperations.setStatementField(ps, 2, JDBCType.BOOLEAN, "true");
sourceOperations.setStatementField(ps, 3, JDBCType.SMALLINT, "1");
sourceOperations.setStatementField(ps, 4, JDBCType.INTEGER, "1");
sourceOperations.setStatementField(ps, 5, JDBCType.BIGINT, "1");
sourceOperations.setStatementField(ps, 6, JDBCType.FLOAT, "1.0");
sourceOperations.setStatementField(ps, 7, JDBCType.DOUBLE, "1.0");
sourceOperations.setStatementField(ps, 8, JDBCType.REAL, "1.0");
sourceOperations.setStatementField(ps, 9, JDBCType.NUMERIC, "1");
sourceOperations.setStatementField(ps, 10, JDBCType.DECIMAL, "1");
sourceOperations.setStatementField(ps, 11, JDBCType.CHAR, "a");
sourceOperations.setStatementField(ps, 12, JDBCType.VARCHAR, "a");
sourceOperations.setStatementField(ps, 13, JDBCType.DATE, "2020-11-01T00:00:00Z");
sourceOperations.setStatementField(ps, 14, JDBCType.TIME, "1970-01-01T05:00:00.000Z");
sourceOperations.setStatementField(ps, 15, JDBCType.TIMESTAMP, "2001-09-29T03:00:00.000Z");
sourceOperations.setStatementField(ps, 16, JDBCType.BINARY, "61616161");
sourceOperations.setCursorField(ps, 2, JDBCType.BOOLEAN, "true");
sourceOperations.setCursorField(ps, 3, JDBCType.SMALLINT, "1");
sourceOperations.setCursorField(ps, 4, JDBCType.INTEGER, "1");
sourceOperations.setCursorField(ps, 5, JDBCType.BIGINT, "1");
sourceOperations.setCursorField(ps, 6, JDBCType.FLOAT, "1.0");
sourceOperations.setCursorField(ps, 7, JDBCType.DOUBLE, "1.0");
sourceOperations.setCursorField(ps, 8, JDBCType.REAL, "1.0");
sourceOperations.setCursorField(ps, 9, JDBCType.NUMERIC, "1");
sourceOperations.setCursorField(ps, 10, JDBCType.DECIMAL, "1");
sourceOperations.setCursorField(ps, 11, JDBCType.CHAR, "a");
sourceOperations.setCursorField(ps, 12, JDBCType.VARCHAR, "a");
sourceOperations.setCursorField(ps, 13, JDBCType.DATE, "2020-11-01T00:00:00Z");
sourceOperations.setCursorField(ps, 14, JDBCType.TIME, "1970-01-01T05:00:00.000Z");
sourceOperations.setCursorField(ps, 15, JDBCType.TIMESTAMP, "2001-09-29T03:00:00.000Z");
sourceOperations.setCursorField(ps, 16, JDBCType.BINARY, "61616161");

ps.execute();

Expand Down Expand Up @@ -332,7 +332,8 @@ private static void assertExpectedOutputTypes(final Connection connection) throw
final int columnCount = resultSet.getMetaData().getColumnCount();
final Map<String, JsonSchemaType> actual = new HashMap<>(columnCount);
for (int i = 1; i <= columnCount; i++) {
actual.put(resultSet.getMetaData().getColumnName(i), sourceOperations.getJsonType(JDBCType.valueOf(resultSet.getMetaData().getColumnType(i))));
actual.put(resultSet.getMetaData().getColumnName(i),
sourceOperations.getAirbyteType(JDBCType.valueOf(resultSet.getMetaData().getColumnType(i))));
}

final Map<String, JsonSchemaType> expected = ImmutableMap.<String, JsonSchemaType>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St

final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();
final BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected List<JsonNode> retrieveRecordsFromTable(final String tableName, final

final TableResult queryResults = BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();
final BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.integrations.source.bigquery;

import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -81,7 +81,7 @@ public List<CheckedConsumer<BigQueryDatabase, Exception>> getCheckOperations(fin
checkList.add(database -> {
if (isDatasetConfigured(database)) {
database.query(String.format("select 1 from %s where 1=0",
getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString())));
getFullyQualifiedTableNameWithQuoting(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString())));
LOGGER.info("The source passed the Dataset query test!");
} else {
LOGGER.info("The Dataset query test is skipped due to not configured datasetId!");
Expand All @@ -92,8 +92,8 @@ public List<CheckedConsumer<BigQueryDatabase, Exception>> getCheckOperations(fin
}

@Override
protected JsonSchemaType getType(final StandardSQLTypeName columnType) {
return sourceOperations.getJsonType(columnType);
protected JsonSchemaType getAirbyteType(final StandardSQLTypeName columnType) {
return sourceOperations.getAirbyteType(columnType);
}

@Override
Expand Down Expand Up @@ -146,7 +146,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final BigQueryDatab
final StandardSQLTypeName cursorFieldType) {
return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?",
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString()),
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()),
cursorInfo.getCursorField()),
sourceOperations.getQueryParameter(cursorFieldType, cursorInfo.getCursor()));
}
Expand All @@ -159,7 +159,7 @@ protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final BigQueryDa
LOGGER.info("Queueing query for table: {}", tableName);
return queryTable(database, String.format("SELECT %s FROM %s",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString())));
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
try {
queryContext.getObject(i);
if (!queryContext.wasNull()) {
setJsonField(queryContext, i, jsonNode);
copyToJsonField(queryContext, i, jsonNode);
}
} catch (final SQLException e) {
putCockroachSpecialDataType(queryContext, i, jsonNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -118,19 +119,19 @@ public void clean() throws Exception {
}
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_SPACES))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES, connection.getMetaData().getIdentifierQuoteString()))));
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_SPACES + 2))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES + 2, connection.getMetaData().getIdentifierQuoteString()))));
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME2,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME, connection.getMetaData().getIdentifierQuoteString()))));
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITHOUT_CURSOR_TYPE))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITHOUT_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString()))));
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString()))));
super.tearDown();
}

Expand Down
Loading

0 comments on commit bac789e

Please sign in to comment.