Skip to content

Commit

Permalink
airbyte-5050: Added GenericParamType
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandertsukanov committed Apr 15, 2022
1 parent f5815b0 commit d557456
Show file tree
Hide file tree
Showing 14 changed files with 92 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package io.airbyte.integrations.types;

/**
* This class used as the generic type parameter to define the type of the parameter.
* Could be useful fot specific situations when we need to have a generic type parameter
*
* @see io.airbyte.integrations.destination.redshift.RedshiftDestination
*/
public class GenericParamType<T> {

private T t;

public GenericParamType(T t) {
this.t = t;
}

public void set(T t) {
this.t = t;
}

public T get() {
return t;
}

public static <T> GenericParamType<T> of(T t) {
return new GenericParamType<>(t);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.airbyte.integrations.types;

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

public class GenericParamTypeTest {

private GenericParamType<String> genericParamType;

@BeforeEach
void setUp() {
genericParamType = new GenericParamType("sd");
}

@Test
public void testGet() {
String result = genericParamType.get();
assertEquals("sd", result);
}

@Test
public void testOf() {
GenericParamType<String> genericParamType = GenericParamType.of("sd");
assertEquals("sd", genericParamType.get());
}

@Test
public void testSet() {
genericParamType.set("test");
assertEquals("java.lang.String", genericParamType.get().getClass().getName());
assertEquals("test", genericParamType.get());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.File;
import java.io.IOException;
Expand All @@ -20,7 +20,7 @@
import ru.yandex.clickhouse.ClickHouseStatement;
import ru.yandex.clickhouse.domain.ClickHouseFormat;

public class ClickhouseSqlOperations<T,S> extends JdbcSqlOperations<T,S> {
public class ClickhouseSqlOperations extends JdbcSqlOperations<GenericParamType, GenericParamType> {

private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSqlOperations.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.List;

public class DatabricksSqlOperations<T,S> extends JdbcSqlOperations<T,S> {
public class DatabricksSqlOperations extends JdbcSqlOperations<GenericParamType, GenericParamType> {

@Override
public void executeTransaction(final JdbcDatabase database, final List<String> queries) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.base.sentry.AirbyteSentry;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.File;
import java.io.PrintWriter;
Expand All @@ -27,7 +28,7 @@
import org.apache.commons.csv.CSVPrinter;

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public abstract class JdbcSqlOperations<T, S> implements SqlOperations<T, S> {
public abstract class JdbcSqlOperations<T extends GenericParamType, S extends GenericParamType> implements SqlOperations<T, S> {

protected static final String SHOW_SCHEMAS = "show schemas;";
protected static final String NAME = "name";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.List;
import java.util.Set;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.airbyte.integrations.destination.buffered_stream_consumer.RecordWriter;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.record_buffer.InMemoryRecordBufferingStrategy;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
Expand Down Expand Up @@ -158,7 +159,7 @@ private static void closeAsOneTransaction(final Map<AirbyteStreamNameNamespacePa
}
if (!hasFailed) {
sqlOperations.onCloseTransactionOperations(db,
pairToCopier.keySet().stream().map(AirbyteStreamNameNamespacePair::getNamespace).collect(toSet()));
GenericParamType.of(pairToCopier.keySet().stream().map(AirbyteStreamNameNamespacePair::getNamespace).collect(toSet())));
sqlOperations.executeTransaction(db, queries);
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.record_buffer.SerializableBuffer;
import io.airbyte.integrations.types.GenericParamType;
import java.util.List;
import java.util.UUID;
import org.joda.time.DateTime;

public interface StagingOperations extends SqlOperations {
public interface StagingOperations extends SqlOperations<GenericParamType, GenericParamType> {

String getStageName(String namespace, String streamName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@
package io.airbyte.integrations.destination.jdbc;

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.util.List;

public class TestJdbcSqlOperations<T,S> extends JdbcSqlOperations<T,S> {
public class TestJdbcSqlOperations extends JdbcSqlOperations<GenericParamType, GenericParamType> {

@Override
public void insertRecordsInternal(final JdbcDatabase database,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.File;
import java.io.IOException;
Expand All @@ -21,7 +22,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MariadbColumnstoreSqlOperations<T,S> extends JdbcSqlOperations<T,S> {
public class MariadbColumnstoreSqlOperations extends JdbcSqlOperations<GenericParamType, GenericParamType> {

private static final Logger LOGGER = LoggerFactory.getLogger(MariadbColumnstoreSqlOperations.class);
private final String MINIMUM_VERSION = "5.5.3";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.File;
import java.io.IOException;
Expand All @@ -18,7 +19,7 @@
import java.util.List;
import java.util.stream.Collectors;

public class MySQLSqlOperations<T,S> extends JdbcSqlOperations<T,S> {
public class MySQLSqlOperations extends JdbcSqlOperations<GenericParamType, GenericParamType> {

private boolean isLocalFileEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.io.BufferedReader;
import java.io.File;
Expand All @@ -18,7 +19,7 @@
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;

public class PostgresSqlOperations<T,S> extends JdbcSqlOperations<T,S> {
public class PostgresSqlOperations extends JdbcSqlOperations<GenericParamType, GenericParamType> {

public PostgresSqlOperations() {
super(new PostgresDataAdapter());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils;
import io.airbyte.integrations.destination.jdbc.WriteConfig;
import io.airbyte.integrations.destination.redshift.enums.RedshiftDataTmpTableMode;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
Expand All @@ -26,7 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedshiftSqlOperations extends JdbcSqlOperations<List<WriteConfig>, Set<String>> implements SqlOperations<List<WriteConfig>, Set<String>> {
public class RedshiftSqlOperations extends JdbcSqlOperations<GenericParamType<List<WriteConfig>>, GenericParamType<Set<String>>>{

private static final Logger LOGGER = LoggerFactory.getLogger(RedshiftSqlOperations.class);
protected static final int REDSHIFT_VARCHAR_MAX_BYTE_SIZE = 65535;
Expand Down Expand Up @@ -105,10 +105,12 @@ public boolean isValidData(final JsonNode data) {
* @param database - Database object for interacting with a JDBC connection.
* @param writeConfigsList - list of write configs.
*/


@Override
public void onDestinationStartOperations(final JdbcDatabase database, final List<WriteConfig> writeConfigsList) {
public void onDestinationStartOperations(final JdbcDatabase database, final GenericParamType<List<WriteConfig>> writeConfigsList) {
LOGGER.info("Executing specific logic for Redshift Destination DB engine...");
Set<String> schemas = writeConfigsList.stream().map(WriteConfig::getOutputSchemaName).collect(toSet());
Set<String> schemas = writeConfigsList.get().stream().map(WriteConfig::getOutputSchemaName).collect(toSet());
List<String> schemaAndTableWithNotSuperType = schemas
.stream()
.flatMap(schemaName -> discoverNotSuperTables(database, schemaName).stream())
Expand All @@ -125,8 +127,9 @@ public void onDestinationStartOperations(final JdbcDatabase database, final List
* @param schemaNames - schema names.
*/
@Override
public void onCloseTransactionOperations(final JdbcDatabase database, final Set<String> schemaNames) {
public void onCloseTransactionOperations(final JdbcDatabase database, final GenericParamType<Set<String>> schemaNames) {
List<String> schemaAndTableWithNotSuperType = schemaNames
.get()
.stream()
.flatMap(schemaName -> discoverNotSuperTables(database, schemaName).stream())
.toList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.integrations.destination.jdbc.SqlOperations;
import io.airbyte.integrations.destination.jdbc.SqlOperationsUtils;
import io.airbyte.integrations.types.GenericParamType;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.sql.SQLException;
import java.util.List;
Expand All @@ -18,7 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SnowflakeSqlOperations<T,S> extends JdbcSqlOperations<T,S> implements SqlOperations<T,S> {
class SnowflakeSqlOperations extends JdbcSqlOperations<GenericParamType, GenericParamType> {

private static final Logger LOGGER = LoggerFactory.getLogger(SnowflakeSqlOperations.class);
private static final int MAX_FILES_IN_LOADING_QUERY_LIMIT = 1000;
Expand Down

3 comments on commit d557456

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube Report

SonarQube report for Airbyte Connectors Destination Clickhouse(#12064)

Measures

Name Value Name Value Name Value
Duplicated Blocks 0 Lines of Code 175 Security Rating A
Coverage 0.0 Duplicated Lines (%) 0.0 Lines to Cover 87
Code Smells 2 Vulnerabilities 0 Reliability Rating A
Quality Gate Status OK Bugs 1 Blocker Issues 0
Critical Issues 2 Major Issues 2 Minor Issues 0

Detected Issues

Rule File Description Message
java:S3740 (MAJOR) clickhouse/ClickhouseDestination.java Raw types should not be used Provide the parametrized type for this generic.
java:S3457 (MAJOR) clickhouse/ClickhouseSqlOperations.java:29 Printf-style format strings should be used correctly %n should be used in place of \n to produce the platform-specific line separator.
java:S1163 (CRITICAL) clickhouse/ClickhouseSqlOperations.java:100 Exceptions should not be thrown in finally blocks Refactor this code to not throw exceptions in finally blocks.
java:S1143 (CRITICAL) clickhouse/ClickhouseSqlOperations.java:100 Jump statements should not occur in "finally" blocks Remove this throw statement from this finally block.

Coverage (0.0%)

File Coverage File Coverage
src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSQLNameTransformer.java 0.0
src/main/java/io/airbyte/integrations/destination/clickhouse/ClickhouseSqlOperations.java 0.0

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube Report

SonarQube report for Airbyte Connectors Destination Mariadb Columnstore(#12064)

Measures

Name Value Name Value Name Value
Coverage 0.0 Bugs 1 Quality Gate Status OK
Duplicated Lines (%) 0.0 Duplicated Blocks 0 Reliability Rating A
Lines to Cover 122 Security Rating A Code Smells 12
Lines of Code 228 Vulnerabilities 0 Blocker Issues 0
Critical Issues 3 Major Issues 9 Minor Issues 3

Detected Issues

Rule File Description Message
java:S3740 (MAJOR) mariadb_columnstore/MariadbColumnstoreDestination.java Raw types should not be used Provide the parametrized type for this generic.
java:S3740 (MAJOR) mariadb_columnstore/MariadbColumnstoreDestination.java Raw types should not be used Provide the parametrized type for this generic.
java:S112 (MAJOR) mariadb_columnstore/MariadbColumnstoreDestination.java:49 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1192 (CRITICAL) mariadb_columnstore/MariadbColumnstoreDestination.java:87 String literals should not be duplicated Define a constant instead of duplicating this literal "password" 3 times.
java:S1068 (MAJOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:27 Unused "private" fields should be removed Remove this unused "LOGGER" private field.
java:S116 (MINOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:28 Field names should comply with a naming convention Rename this field "MINIMUM_VERSION" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.
java:S1170 (MINOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:28 Public constants and fields initialized at declaration should be "static final" rather than merely "final" Make this final field static too.
java:S116 (MINOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:29 Field names should comply with a naming convention Rename this field "VERSION_PATTERN" to match the regular expression '^[a-z][a-zA-Z0-9]*$'.
java:S3457 (MAJOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:50 Printf-style format strings should be used correctly %n should be used in place of \n to produce the platform-specific line separator.
java:S112 (MAJOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:57 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:66 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S1163 (CRITICAL) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:66 Exceptions should not be thrown in finally blocks Refactor this code to not throw exceptions in finally blocks.
java:S1143 (CRITICAL) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:66 Jump statements should not occur in "finally" blocks Remove this throw statement from this finally block.
java:S112 (MAJOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:113 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S3457 (MAJOR) mariadb_columnstore/MariadbColumnstoreSqlOperations.java:113 Printf-style format strings should be used correctly %n should be used in place of \n to produce the platform-specific line separator.

Coverage (0.0%)

File Coverage File Coverage
src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestination.java 0.0 src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreNameTransformer.java 0.0
src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java 0.0

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SonarQube Report

SonarQube report for Airbyte Connectors Destination Databricks(#12064)

Measures

Name Value Name Value Name Value
Code Smells 3 Reliability Rating A Lines of Code 404
Duplicated Blocks 0 Bugs 0 Vulnerabilities 0
Quality Gate Status OK Lines to Cover 175 Security Rating A
Coverage 0.0 Duplicated Lines (%) 0.0 Blocker Issues 0
Critical Issues 0 Major Issues 5 Minor Issues 0

Detected Issues

Rule File Description Message
java:S1118 (MAJOR) databricks/DatabricksConstants.java:9 Utility classes should not have public constructors Add a private constructor to hide the implicit public one.
java:S3740 (MAJOR) databricks/DatabricksDestination.java Raw types should not be used Provide the parametrized type for this generic.
java:S3740 (MAJOR) databricks/DatabricksStreamCopier.java Raw types should not be used Provide the parametrized type for this generic.
java:S112 (MAJOR) databricks/DatabricksStreamCopier.java:72 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.
java:S112 (MAJOR) databricks/DatabricksStreamCopierFactory.java:39 Generic exceptions should never be thrown Define and throw a dedicated exception instead of using a generic one.

Coverage (0.0%)

File Coverage File Coverage
src/main/java/io/airbyte/integrations/destination/databricks/DatabricksConstants.java 0.0 src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestination.java 0.0
src/main/java/io/airbyte/integrations/destination/databricks/DatabricksDestinationConfig.java 0.0 src/main/java/io/airbyte/integrations/destination/databricks/DatabricksNameTransformer.java 0.0
src/main/java/io/airbyte/integrations/destination/databricks/DatabricksSqlOperations.java 0.0 src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopier.java 0.0
src/main/java/io/airbyte/integrations/destination/databricks/DatabricksStreamCopierFactory.java 0.0

Please sign in to comment.