Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code cleanup in SourceOperations #20874

Merged
merged 7 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,9 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

public interface JdbcCompatibleSourceOperations<SourceType> extends SourceOperations<ResultSet, SourceType> {

Expand All @@ -20,41 +18,21 @@ public interface JdbcCompatibleSourceOperations<SourceType> extends SourceOperat
*
* @param colIndex 1-based column index.
*/
void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException;
void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException;

/**
* Set the cursor field in incremental table query.
*/
void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
final SourceType cursorFieldType,
final String value)
void setCursorField(final PreparedStatement preparedStatement,
final int parameterIndex,
final SourceType cursorFieldType,
final String value)
throws SQLException;

/**
* Determine the database specific type of the input field based on its column metadata.
*/
SourceType getFieldType(final JsonNode field);

/**
* @return the input identifiers with quotes and delimiters.
*/
String enquoteIdentifierList(final Connection connection, final List<String> identifiers) throws SQLException;

/**
* @return the input identifier with quotes.
*/
String enquoteIdentifier(final Connection connection, final String identifier) throws SQLException;

/**
* @return fully qualified table name with the schema (if a schema exists).
*/
String getFullyQualifiedTableName(final String schemaName, final String tableName);

/**
* @return fully qualified table name with the schema (if a schema exists) in quotes.
*/
String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName) throws SQLException;
SourceType getDatabaseFieldType(final JsonNode field);

/**
* This method will verify that filed could be used as cursor for incremental sync
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public static Certificate getCertificate(final PostgreSQLContainer<?> container)
container.execInContainer("su", "-c",
"echo \"hostssl all all 127.0.0.1/32 cert clientcert=verify-full\" >> /var/lib/postgresql/data/pg_hba.conf");

var caCert = container.execInContainer("su", "-c", "cat ca.crt").getStdout().trim();
final var caCert = container.execInContainer("su", "-c", "cat ca.crt").getStdout().trim();

container.execInContainer("su", "-c", "openssl ecparam -name prime256v1 -genkey -noout -out client.key");
container.execInContainer("su", "-c", "openssl req -new -sha256 -key client.key -out client.csr -subj \"/CN=postgres\"");
Expand All @@ -65,8 +65,8 @@ public static Certificate getCertificate(final PostgreSQLContainer<?> container)

container.execInContainer("su", "-c", "psql -U test -c \"SELECT pg_reload_conf();\"");

var clientKey = container.execInContainer("su", "-c", "cat client.key").getStdout().trim();
var clientCert = container.execInContainer("su", "-c", "cat client.crt").getStdout().trim();
final var clientKey = container.execInContainer("su", "-c", "cat client.key").getStdout().trim();
final var clientCert = container.execInContainer("su", "-c", "cat client.crt").getStdout().trim();
return new Certificate(caCert, clientCert, clientKey);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@

public interface SourceOperations<QueryResult, SourceType> {

/**
* Converts a database row into it's JSON representation.
*
* @throws SQLException
*/
JsonNode rowToJson(QueryResult queryResult) throws SQLException;

JsonSchemaType getJsonType(SourceType sourceType);
/**
* Converts a database source type into an Airbyte type, which is currently represented by a
* {@link JsonSchemaType}
*/
JsonSchemaType getAirbyteType(SourceType sourceType);
ryankfu marked this conversation as resolved.
Show resolved Hide resolved

//
// JsonSchemaType getJsonSchemaType(SourceType columnType);
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public BigQueryDatabase(final String projectId, final String jsonCreds, final Bi
}
}

private String getUserAgentHeader(String connectorVersion) {
private String getUserAgentHeader(final String connectorVersion) {
return String.format(AGENT_TEMPLATE, connectorVersion);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public Date getDateValue(final FieldValue fieldValue, final DateFormat dateForma
}

@Override
public JsonSchemaType getJsonType(final StandardSQLTypeName bigQueryType) {
public JsonSchemaType getAirbyteType(final StandardSQLTypeName bigQueryType) {
return switch (bigQueryType) {
case BOOL -> JsonSchemaType.BOOLEAN;
case INT64 -> JsonSchemaType.INTEGER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.JdbcCompatibleSourceOperations;
import java.math.BigDecimal;
import java.sql.Connection;
import java.sql.Date;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -28,8 +27,6 @@
import java.time.OffsetTime;
import java.time.chrono.IsoEra;
import java.util.Collections;
import java.util.List;
import java.util.StringJoiner;
import javax.xml.bind.DatatypeConverter;

/**
Expand Down Expand Up @@ -58,7 +55,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
}

// convert to java types that will convert into reasonable json.
setJsonField(queryContext, i, jsonNode);
copyToJsonField(queryContext, i, jsonNode);
}

return jsonNode;
Expand Down Expand Up @@ -229,35 +226,6 @@ protected void setBinary(final PreparedStatement preparedStatement, final int pa
preparedStatement.setBytes(parameterIndex, DatatypeConverter.parseHexBinary(value));
}

@Override
public String enquoteIdentifierList(final Connection connection, final List<String> identifiers) throws SQLException {
final StringJoiner joiner = new StringJoiner(",");
for (final String col : identifiers) {
final String s = enquoteIdentifier(connection, col);
joiner.add(s);
}
return joiner.toString();
}

@Override
public String enquoteIdentifier(final Connection connection, final String identifier) throws SQLException {
final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString();

return identifierQuoteString + identifier + identifierQuoteString;
}

@Override
public String getFullyQualifiedTableName(final String schemaName, final String tableName) {
return JdbcUtils.getFullyQualifiedTableName(schemaName, tableName);
}

@Override
public String getFullyQualifiedTableNameWithQuoting(final Connection connection, final String schemaName, final String tableName)
throws SQLException {
final String quotedTableName = enquoteIdentifier(connection, tableName);
return schemaName != null ? enquoteIdentifier(connection, schemaName) + "." + quotedTableName : quotedTableName;
}

protected <ObjectType> ObjectType getObject(final ResultSet resultSet, final int index, final Class<ObjectType> clazz) throws SQLException {
return resultSet.getObject(index, clazz);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ protected JDBCType safeGetJdbcType(final int columnTypeInt) {
}

@Override
public void setJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
public void copyToJsonField(final ResultSet resultSet, final int colIndex, final ObjectNode json) throws SQLException {
final int columnTypeInt = resultSet.getMetaData().getColumnType(colIndex);
final String columnName = resultSet.getMetaData().getColumnName(colIndex);
final JDBCType columnType = safeGetJdbcType(columnTypeInt);
Expand All @@ -63,10 +63,10 @@ public void setJsonField(final ResultSet resultSet, final int colIndex, final Ob
}

@Override
public void setStatementField(final PreparedStatement preparedStatement,
final int parameterIndex,
final JDBCType cursorFieldType,
final String value)
public void setCursorField(final PreparedStatement preparedStatement,
final int parameterIndex,
final JDBCType cursorFieldType,
final String value)
throws SQLException {
switch (cursorFieldType) {

Expand All @@ -90,7 +90,7 @@ public void setStatementField(final PreparedStatement preparedStatement,
}

@Override
public JDBCType getFieldType(final JsonNode field) {
public JDBCType getDatabaseFieldType(final JsonNode field) {
try {
return JDBCType.valueOf(field.get(INTERNAL_COLUMN_TYPE).asInt());
} catch (final IllegalArgumentException ex) {
Expand All @@ -109,7 +109,7 @@ public boolean isCursorType(final JDBCType type) {
}

@Override
public JsonSchemaType getJsonType(final JDBCType jdbcType) {
public JsonSchemaType getAirbyteType(final JDBCType jdbcType) {
return switch (jdbcType) {
case BIT, BOOLEAN -> JsonSchemaType.BOOLEAN;
case TINYINT, SMALLINT -> JsonSchemaType.INTEGER;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,21 +152,21 @@ void testSetStatementField() throws SQLException {

// insert the bit here to stay consistent even though setStatementField does not support it yet.
ps.setString(1, "1");
sourceOperations.setStatementField(ps, 2, JDBCType.BOOLEAN, "true");
sourceOperations.setStatementField(ps, 3, JDBCType.SMALLINT, "1");
sourceOperations.setStatementField(ps, 4, JDBCType.INTEGER, "1");
sourceOperations.setStatementField(ps, 5, JDBCType.BIGINT, "1");
sourceOperations.setStatementField(ps, 6, JDBCType.FLOAT, "1.0");
sourceOperations.setStatementField(ps, 7, JDBCType.DOUBLE, "1.0");
sourceOperations.setStatementField(ps, 8, JDBCType.REAL, "1.0");
sourceOperations.setStatementField(ps, 9, JDBCType.NUMERIC, "1");
sourceOperations.setStatementField(ps, 10, JDBCType.DECIMAL, "1");
sourceOperations.setStatementField(ps, 11, JDBCType.CHAR, "a");
sourceOperations.setStatementField(ps, 12, JDBCType.VARCHAR, "a");
sourceOperations.setStatementField(ps, 13, JDBCType.DATE, "2020-11-01T00:00:00Z");
sourceOperations.setStatementField(ps, 14, JDBCType.TIME, "1970-01-01T05:00:00.000Z");
sourceOperations.setStatementField(ps, 15, JDBCType.TIMESTAMP, "2001-09-29T03:00:00.000Z");
sourceOperations.setStatementField(ps, 16, JDBCType.BINARY, "61616161");
sourceOperations.setCursorField(ps, 2, JDBCType.BOOLEAN, "true");
sourceOperations.setCursorField(ps, 3, JDBCType.SMALLINT, "1");
sourceOperations.setCursorField(ps, 4, JDBCType.INTEGER, "1");
sourceOperations.setCursorField(ps, 5, JDBCType.BIGINT, "1");
sourceOperations.setCursorField(ps, 6, JDBCType.FLOAT, "1.0");
sourceOperations.setCursorField(ps, 7, JDBCType.DOUBLE, "1.0");
sourceOperations.setCursorField(ps, 8, JDBCType.REAL, "1.0");
sourceOperations.setCursorField(ps, 9, JDBCType.NUMERIC, "1");
sourceOperations.setCursorField(ps, 10, JDBCType.DECIMAL, "1");
sourceOperations.setCursorField(ps, 11, JDBCType.CHAR, "a");
sourceOperations.setCursorField(ps, 12, JDBCType.VARCHAR, "a");
sourceOperations.setCursorField(ps, 13, JDBCType.DATE, "2020-11-01T00:00:00Z");
sourceOperations.setCursorField(ps, 14, JDBCType.TIME, "1970-01-01T05:00:00.000Z");
sourceOperations.setCursorField(ps, 15, JDBCType.TIMESTAMP, "2001-09-29T03:00:00.000Z");
sourceOperations.setCursorField(ps, 16, JDBCType.BINARY, "61616161");

ps.execute();

Expand Down Expand Up @@ -332,7 +332,8 @@ private static void assertExpectedOutputTypes(final Connection connection) throw
final int columnCount = resultSet.getMetaData().getColumnCount();
final Map<String, JsonSchemaType> actual = new HashMap<>(columnCount);
for (int i = 1; i <= columnCount; i++) {
actual.put(resultSet.getMetaData().getColumnName(i), sourceOperations.getJsonType(JDBCType.valueOf(resultSet.getMetaData().getColumnType(i))));
actual.put(resultSet.getMetaData().getColumnName(i),
sourceOperations.getAirbyteType(JDBCType.valueOf(resultSet.getMetaData().getColumnType(i))));
}

final Map<String, JsonSchemaType> expected = ImmutableMap.<String, JsonSchemaType>builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ private List<JsonNode> retrieveRecordsFromTable(final String tableName, final St

final TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();
final BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ protected List<JsonNode> retrieveRecordsFromTable(final String tableName, final

final TableResult queryResults = BigQueryUtils.executeQuery(bigquery, queryConfig).getLeft().getQueryResults();
final FieldList fields = queryResults.getSchema().getFields();
BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();
final BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

return Streams.stream(queryResults.iterateAll())
.map(fieldValues -> sourceOperations.rowToJson(new BigQueryResultSet(fieldValues, fields))).collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
package io.airbyte.integrations.source.bigquery;

import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.enquoteIdentifierList;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullTableName;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.getFullyQualifiedTableNameWithQuoting;
import static io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils.queryTable;

import com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -81,7 +81,7 @@ public List<CheckedConsumer<BigQueryDatabase, Exception>> getCheckOperations(fin
checkList.add(database -> {
if (isDatasetConfigured(database)) {
database.query(String.format("select 1 from %s where 1=0",
getFullTableName(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString())));
getFullyQualifiedTableNameWithQuoting(getConfigDatasetId(database), "INFORMATION_SCHEMA.TABLES", getQuoteString())));
LOGGER.info("The source passed the Dataset query test!");
} else {
LOGGER.info("The Dataset query test is skipped due to not configured datasetId!");
Expand All @@ -92,8 +92,8 @@ public List<CheckedConsumer<BigQueryDatabase, Exception>> getCheckOperations(fin
}

@Override
protected JsonSchemaType getType(final StandardSQLTypeName columnType) {
return sourceOperations.getJsonType(columnType);
protected JsonSchemaType getAirbyteType(final StandardSQLTypeName columnType) {
return sourceOperations.getAirbyteType(columnType);
}

@Override
Expand Down Expand Up @@ -146,7 +146,7 @@ public AutoCloseableIterator<JsonNode> queryTableIncremental(final BigQueryDatab
final StandardSQLTypeName cursorFieldType) {
return queryTableWithParams(database, String.format("SELECT %s FROM %s WHERE %s > ?",
RelationalDbQueryUtils.enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString()),
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString()),
cursorInfo.getCursorField()),
sourceOperations.getQueryParameter(cursorFieldType, cursorInfo.getCursor()));
}
Expand All @@ -159,7 +159,7 @@ protected AutoCloseableIterator<JsonNode> queryTableFullRefresh(final BigQueryDa
LOGGER.info("Queueing query for table: {}", tableName);
return queryTable(database, String.format("SELECT %s FROM %s",
enquoteIdentifierList(columnNames, getQuoteString()),
getFullTableName(schemaName, tableName, getQuoteString())));
getFullyQualifiedTableNameWithQuoting(schemaName, tableName, getQuoteString())));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
try {
queryContext.getObject(i);
if (!queryContext.wasNull()) {
setJsonField(queryContext, i, jsonNode);
copyToJsonField(queryContext, i, jsonNode);
}
} catch (final SQLException e) {
putCockroachSpecialDataType(queryContext, i, jsonNode);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.airbyte.integrations.source.db2.Db2Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.integrations.source.relationaldb.RelationalDbQueryUtils;
akashkulk marked this conversation as resolved.
Show resolved Hide resolved
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import java.io.File;
import java.io.IOException;
Expand Down Expand Up @@ -118,19 +119,19 @@ public void clean() throws Exception {
}
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_SPACES))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES, connection.getMetaData().getIdentifierQuoteString()))));
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_SPACES + 2))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_SPACES + 2, connection.getMetaData().getIdentifierQuoteString()))));
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME2,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME, connection.getMetaData().getIdentifierQuoteString()))));
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITHOUT_CURSOR_TYPE))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITHOUT_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString()))));
super.database.execute(connection -> connection.createStatement().execute(String
.format("DROP TABLE IF EXISTS %s.%s", SCHEMA_NAME,
sourceOperations.enquoteIdentifier(connection, TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE))));
RelationalDbQueryUtils.enquoteIdentifier(TABLE_NAME_WITH_NULLABLE_CURSOR_TYPE, connection.getMetaData().getIdentifierQuoteString()))));
super.tearDown();
}

Expand Down
Loading