Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create a parallel abstract class hierarchy for JDBC vs non-JDBC classes #24172

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ properties:
type: array
description: An array of hosts that this connector can connect to. AllowedHosts not being present for the source or destination means that access to all hosts is allowed. An empty list here means that no network access is granted.
items:
type: string
type: string
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,3 @@ data:
oss:
enabled: true
definitionId: woohoo

Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,3 @@ data:
oss:
enabled: true
connectorType: destination

Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,3 @@ data:
oss:
enabled: true
what: is this?

Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,3 @@ data:
sourceType: database
releaseStage: generally_available
license: MIT

Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ data:
hosts:
- "${host}"
- "${tunnel_method.tunnel_host}"

Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ data:
hosts:
- "${host}"
- "${tunnel_method.tunnel_host}"

Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ data:
hosts:
- "${host}"
- "${tunnel_method.tunnel_host}"

Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,3 @@ data:
hosts:
- "${host}"
- "${tunnel_method.tunnel_host}"

Original file line number Diff line number Diff line change
@@ -1 +1 @@
asdsad
asdsad
Original file line number Diff line number Diff line change
@@ -1 +1 @@
asdsad
asdsad
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,3 @@ data:
hosts:
- "${host}"
- "${tunnel_method.tunnel_host}"

Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,3 @@ data:
resourceRequirements:
memory_request: 1Gi
memory_limit: 1Gi


Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ data:
catalogs:
oss:
enabled: true

Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,3 @@ data:
resourceRequirements:
memory_request: 1Gi
memory_limit: 1Gi

Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,3 @@ data:
sourceType: database
releaseStage: generally_available
license: MIT

Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.Table;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.json.Jsons;
Expand All @@ -33,7 +34,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.slf4j.Logger;
Expand All @@ -51,17 +51,6 @@ public class BigQuerySource extends AbstractDbSource<StandardSQLTypeName, BigQue
private JsonNode dbConfig;
private final BigQuerySourceOperations sourceOperations = new BigQuerySourceOperations();

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final var conf = ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, config.get(CONFIG_PROJECT_ID).asText())
.put(CONFIG_CREDS, config.get(CONFIG_CREDS).asText());
if (config.hasNonNull(CONFIG_DATASET_ID)) {
conf.put(CONFIG_DATASET_ID, config.get(CONFIG_DATASET_ID).asText());
}
return Jsons.jsonNode(conf.build());
}

@Override
protected BigQueryDatabase createDatabase(final JsonNode sourceConfig) {
dbConfig = Jsons.clone(sourceConfig);
Expand Down Expand Up @@ -99,32 +88,22 @@ protected JsonSchemaType getAirbyteType(final StandardSQLTypeName columnType) {
return sourceOperations.getAirbyteType(columnType);
}

@Override
public Set<String> getExcludedInternalNameSpaces() {
return Collections.emptySet();
}

@Override
protected List<TableInfo<CommonField<StandardSQLTypeName>>> discoverInternal(final BigQueryDatabase database) throws Exception {
return discoverInternal(database, null);
}

@Override
protected List<TableInfo<CommonField<StandardSQLTypeName>>> discoverInternal(final BigQueryDatabase database, final String schema) {
final String projectId = dbConfig.get(CONFIG_PROJECT_ID).asText();
final List<Table> tables =
(isDatasetConfigured(database) ? database.getDatasetTables(getConfigDatasetId(database)) : database.getProjectTables(projectId));
final List<TableInfo<CommonField<StandardSQLTypeName>>> result = new ArrayList<>();
tables.stream().map(table -> TableInfo.<CommonField<StandardSQLTypeName>>builder()
.nameSpace(table.getTableId().getDataset())
.name(table.getTableId().getTable())
.fields(Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream()
.map(f -> {
final StandardSQLTypeName standardType = f.getType().getStandardType();
return new CommonField<>(f.getName(), standardType);
})
.collect(Collectors.toList()))
.build())
.nameSpace(table.getTableId().getDataset())
.name(table.getTableId().getTable())
.fields(Objects.requireNonNull(table.getDefinition().getSchema()).getFields().stream()
.map(f -> {
final StandardSQLTypeName standardType = f.getType().getStandardType();
return new CommonField<>(f.getName(), standardType);
})
.collect(Collectors.toList()))
.build())
.forEach(result::add);
return result;
}
Expand All @@ -135,8 +114,7 @@ protected Map<String, List<String>> discoverPrimaryKeys(final BigQueryDatabase d
return Collections.emptyMap();
}

@Override
protected String getQuoteString() {
private String getQuoteString() {
return QUOTE;
}

Expand Down Expand Up @@ -192,6 +170,17 @@ private String getConfigDatasetId(final SqlDatabase database) {
return (isDatasetConfigured(database) ? database.getSourceConfig().get(CONFIG_DATASET_ID).asText() : "");
}

@VisibleForTesting
protected JsonNode toDatabaseConfig(final JsonNode config) {
final var conf = ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, config.get(CONFIG_PROJECT_ID).asText())
.put(CONFIG_CREDS, config.get(CONFIG_CREDS).asText());
if (config.hasNonNull(CONFIG_DATASET_ID)) {
conf.put(CONFIG_DATASET_ID, config.get(CONFIG_DATASET_ID).asText());
}
return Jsons.jsonNode(conf.build());
}

public static void main(final String[] args) throws Exception {
final Source source = new BigQuerySource();
LOGGER.info("starting source: {}", BigQuerySource.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.airbyte.db.jdbc.streaming.JdbcStreamingQueryConfig;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.dto.JdbcPrivilegeDto;
import io.airbyte.integrations.source.relationaldb.AbstractDbSource;
import io.airbyte.integrations.source.relationaldb.AbstractJdbcBaseSource;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
Expand Down Expand Up @@ -79,7 +79,7 @@
* relational DB source which can be accessed via JDBC driver. If you are implementing a connector
* for a relational DB which has a JDBC driver, make an effort to use this class.
*/
public abstract class AbstractJdbcSource<Datatype> extends AbstractDbSource<Datatype, JdbcDatabase> implements Source {
public abstract class AbstractJdbcSource<Datatype> extends AbstractJdbcBaseSource<Datatype, JdbcDatabase> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractJdbcSource.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.JsonSchemaType;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -54,18 +53,6 @@ public static void main(final String[] args) throws Exception {
LOGGER.info("completed source: {}", MongoDbSource.class);
}

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final var credentials = config.has(MongoUtils.USER) && config.has(JdbcUtils.PASSWORD_KEY)
? String.format("%s:%s@", config.get(MongoUtils.USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText())
: StringUtils.EMPTY;

return Jsons.jsonNode(ImmutableMap.builder()
.put("connectionString", buildConnectionString(config, credentials))
.put(JdbcUtils.DATABASE_KEY, config.get(JdbcUtils.DATABASE_KEY).asText())
.build());
}

@Override
protected MongoDatabase createDatabase(final JsonNode sourceConfig) throws Exception {
final var dbConfig = toDatabaseConfig(sourceConfig);
Expand Down Expand Up @@ -94,11 +81,6 @@ protected JsonSchemaType getAirbyteType(final BsonType fieldType) {
return MongoUtils.getType(fieldType);
}

@Override
public Set<String> getExcludedInternalNameSpaces() {
return Collections.emptySet();
}

@Override
protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDatabase database)
throws Exception {
Expand All @@ -122,6 +104,45 @@ protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDat
return tableInfos;
}

@Override
protected Map<String, List<String>> discoverPrimaryKeys(final MongoDatabase database,
final List<TableInfo<CommonField<BsonType>>> tableInfos) {
return tableInfos.stream()
.collect(Collectors.toMap(
TableInfo::getName,
TableInfo::getPrimaryKeys));
}

@Override
public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final MongoDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName) {
return queryTable(database, columnNames, tableName, null);
}

@Override
public AutoCloseableIterator<JsonNode> queryTableIncremental(final MongoDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final CursorInfo cursorInfo,
final BsonType cursorFieldType) {
final Bson greaterComparison = gt(cursorInfo.getCursorField(), MongoUtils.getBsonValue(cursorFieldType, cursorInfo.getCursor()));
return queryTable(database, columnNames, tableName, greaterComparison);
}

private JsonNode toDatabaseConfig(final JsonNode config) {
final var credentials = config.has(MongoUtils.USER) && config.has(JdbcUtils.PASSWORD_KEY)
? String.format("%s:%s@", config.get(MongoUtils.USER).asText(), config.get(JdbcUtils.PASSWORD_KEY).asText())
: StringUtils.EMPTY;

return Jsons.jsonNode(ImmutableMap.builder()
.put("connectionString", buildConnectionString(config, credentials))
.put(JdbcUtils.DATABASE_KEY, config.get(JdbcUtils.DATABASE_KEY).asText())
.build());
}

private Set<String> getAuthorizedCollections(final MongoDatabase database) {
/*
* db.runCommand ({listCollections: 1.0, authorizedCollections: true, nameOnly: true }) the command
Expand Down Expand Up @@ -150,45 +171,6 @@ private Set<String> getAuthorizedCollections(final MongoDatabase database) {
}
}

@Override
protected List<TableInfo<CommonField<BsonType>>> discoverInternal(final MongoDatabase database, final String schema) throws Exception {
// MondoDb doesn't support schemas
return discoverInternal(database);
}

@Override
protected Map<String, List<String>> discoverPrimaryKeys(final MongoDatabase database,
final List<TableInfo<CommonField<BsonType>>> tableInfos) {
return tableInfos.stream()
.collect(Collectors.toMap(
TableInfo::getName,
TableInfo::getPrimaryKeys));
}

@Override
protected String getQuoteString() {
return "";
}

@Override
public AutoCloseableIterator<JsonNode> queryTableFullRefresh(final MongoDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName) {
return queryTable(database, columnNames, tableName, null);
}

@Override
public AutoCloseableIterator<JsonNode> queryTableIncremental(final MongoDatabase database,
final List<String> columnNames,
final String schemaName,
final String tableName,
final CursorInfo cursorInfo,
final BsonType cursorFieldType) {
final Bson greaterComparison = gt(cursorInfo.getCursorField(), MongoUtils.getBsonValue(cursorFieldType, cursorInfo.getCursor()));
return queryTable(database, columnNames, tableName, greaterComparison);
}

@Override
public boolean isCursorType(final BsonType bsonType) {
// while reading from mongo primary key "id" is always added, so there will be no situation
Expand Down
Loading