Skip to content

Commit

Permalink
🐛Destination-clickhouse: enabled and fixed tests for normalization (#…
Browse files Browse the repository at this point in the history
…14783)

[12582] destination-clickhouse: enabled normalization tests
  • Loading branch information
etsybaev authored Jul 27, 2022
1 parent 060b057 commit ce9b1d3
Show file tree
Hide file tree
Showing 6 changed files with 202 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public class JdbcUtils {
public static final String JDBC_URL_PARAMS_KEY = "jdbc_url_params";
public static final String PASSWORD_KEY = "password";
public static final String PORT_KEY = "port";
public static final String TCP_PORT_KEY = "tcp-port";
public static final List<String> PORT_LIST_KEY = List.of("port");
public static final String SCHEMA_KEY = "schema";
// NOTE: this is the plural version of SCHEMA_KEY
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ dependencies {
integrationTestJavaImplementation project(':airbyte-workers')
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
integrationTestJavaImplementation libs.connectors.destination.testcontainers.clickhouse
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-normalization').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.DataTypeTestArgumentProvider;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.integrations.util.HostPortResolver;
import java.sql.SQLException;
import java.time.Duration;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.wait.strategy.Wait;

Expand All @@ -43,7 +46,7 @@ protected String getImageName() {

/**
 * Tells the acceptance-test harness whether normalization should be exercised for this destination.
 *
 * @return {@code true}; normalization tests are enabled for ClickHouse
 */
@Override
protected boolean supportsNormalization() {
  // Only one return may remain: a second statement after a return is unreachable and does not compile.
  return true;
}

@Override
Expand Down Expand Up @@ -86,12 +89,15 @@ protected String getDefaultSchema(final JsonNode config) {

@Override
protected JsonNode getConfig() {
// Note: ClickHouse official JDBC driver uses HTTP protocol, its default port is 8123
// dbt clickhouse adapter uses native protocol, its default port is 9000
// Since we disabled normalization and dbt test, we only use the JDBC port here.
final Optional tcpPort = db.getExposedPorts().stream()
.map(exPort -> db.getMappedPort((Integer) exPort))
.filter(el -> !db.getFirstMappedPort().equals(el))
.findFirst();

return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(db))
.put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(db))
.put(JdbcUtils.TCP_PORT_KEY, tcpPort.get())
.put(JdbcUtils.DATABASE_KEY, DB_NAME)
.put(JdbcUtils.USERNAME_KEY, db.getUsername())
.put(JdbcUtils.PASSWORD_KEY, db.getPassword())
Expand Down Expand Up @@ -143,7 +149,8 @@ private static JdbcDatabase getDatabase(final JsonNode config) {
ClickhouseDestination.HTTP_PROTOCOL,
config.get(JdbcUtils.HOST_KEY).asText(),
config.get(JdbcUtils.PORT_KEY).asInt(),
config.get(JdbcUtils.DATABASE_KEY).asText())));
config.get(JdbcUtils.DATABASE_KEY).asText())),
new ClickhouseTestSourceOperations());
}

@Override
Expand All @@ -160,49 +167,20 @@ protected void tearDown(final TestDestinationEnv testEnv) {
db.close();
}

/**
 * Disabled delegate to the parent's custom dbt transformation test.
 *
 * The SQL script generated by old version of dbt in 'test' step isn't compatible with ClickHouse,
 * so we skip this test for now.
 *
 * Ref: https://github.com/dbt-labs/dbt-core/issues/3905
 *
 * @throws Exception whatever the parent implementation throws
 */
@Disabled
public void testCustomDbtTransformations() throws Exception {
super.testCustomDbtTransformations();
}

// Marked @Disabled and given an empty body, so nothing is executed even if it is invoked directly.
@Disabled
public void testCustomDbtTransformationsFailure() throws Exception {}

/**
 * Disabled delegate to the parent's incremental dedupe sync test.
 *
 * The normalization container needs native port, while destination container needs HTTP port, we
 * can't inject the port switch statement into DestinationAcceptanceTest.runSync() method for this
 * test, so we skip it.
 *
 * @throws Exception whatever the parent implementation throws
 */
@Disabled
public void testIncrementalDedupeSync() throws Exception {
super.testIncrementalDedupeSync();
}

/**
 * Disabled delegate to the parent's sync-with-normalization test.
 *
 * The normalization container needs native port, while destination container needs HTTP port, we
 * can't inject the port switch statement into DestinationAcceptanceTest.runSync() method for this
 * test, so we skip it.
 *
 * @param messagesFilename passed through unchanged to the parent test
 * @param catalogFilename passed through unchanged to the parent test
 * @throws Exception whatever the parent implementation throws
 */
@Disabled
public void testSyncWithNormalization(final String messagesFilename, final String catalogFilename) throws Exception {
super.testSyncWithNormalization(messagesFilename, catalogFilename);
}

@Disabled
public void specNormalizationValueShouldBeCorrect() throws Exception {
super.specNormalizationValueShouldBeCorrect();
/**
 * Runs the parent's data-type normalization test for every argument set supplied by
 * {@link DataTypeTestArgumentProvider}, except those whose messages file name contains "array".
 *
 * @param messagesFilename name of the messages fixture; also used to detect array cases
 * @param catalogFilename name of the catalog fixture, forwarded to the parent test
 * @param testCompatibility compatibility descriptor, forwarded to the parent test
 * @throws Exception whatever the parent implementation throws
 */
@ParameterizedTest
@ArgumentsSource(DataTypeTestArgumentProvider.class)
public void testDataTypeTestWithNormalization(final String messagesFilename,
    final String catalogFilename,
    final DataTypeTestArgumentProvider.TestCompatibility testCompatibility)
    throws Exception {

  // arrays are not fully supported yet in jdbc driver
  // https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseArray.java
  final boolean isArrayFixture = messagesFilename.contains("array");

  if (!isArrayFixture) {
    super.testDataTypeTestWithNormalization(messagesFilename, catalogFilename, testCompatibility);
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,32 @@

import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClickhouseTestDataComparator extends AdvancedTestDataComparator {

private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseTestDataComparator.class);
private final ExtendedNameTransformer namingResolver = new ExtendedNameTransformer();

private static final String CLICKHOUSE_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSX";

// https://clickhouse.com/docs/en/sql-reference/data-types/date32/
private final LocalDate minSupportedDate = LocalDate.parse("1970-01-01");
private final LocalDate maxSupportedDate = LocalDate.parse("2149-06-06");
private final ZonedDateTime minSupportedDateTime = ZonedDateTime.parse(
"1925-01-01T00:00:00.000Z");
private final ZonedDateTime maxSupportedDateTime = ZonedDateTime.parse(
"2283-11-10T20:23:45.000Z");

@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
Expand All @@ -26,4 +45,103 @@ protected List<String> resolveIdentifier(final String identifier) {
return result;
}

/**
 * Compares two numeric values, given as strings, with a small absolute tolerance.
 *
 * @param firstNumericValue first value, parseable by {@link Double#parseDouble(String)}
 * @param secondNumericValue second value, parseable by {@link Double#parseDouble(String)}
 * @return {@code true} if the values are exactly equal or differ by less than the tolerance
 * @throws NumberFormatException if either argument is not a parseable double
 */
@Override
protected boolean compareNumericValues(final String firstNumericValue,
    final String secondNumericValue) {
  // clickhouse stores double 1.14 as 1.1400000000000001
  // https://clickhouse.com/docs/en/sql-reference/data-types/float/
  final double epsilon = 0.000000000000001d;

  final double firstValue = Double.parseDouble(firstNumericValue);
  final double secondValue = Double.parseDouble(secondNumericValue);

  // Exact match first: for two equal infinities the difference below is NaN,
  // and NaN < epsilon is false, so they would wrongly be reported as different.
  if (firstValue == secondValue) {
    return true;
  }

  return Math.abs(firstValue - secondValue) < epsilon;
}

/**
 * Compares two boolean values given as strings, after converting each with {@code parseBool}.
 *
 * @param firstValue first boolean as text
 * @param secondValue second boolean as text
 * @return {@code true} if both strings convert to the same boolean
 */
@Override
protected boolean compareBooleanValues(final String firstValue, final String secondValue) {
  final boolean first = parseBool(firstValue);
  final boolean second = parseBool(secondValue);
  return first == second;
}

/**
 * Compares a date from the Airbyte message with the date read back from the destination.
 *
 * <p>If the expected date lies outside {@code minSupportedDate}..{@code maxSupportedDate}, the
 * check is skipped and the values are reported as matching; the destination value is not parsed
 * in that case.
 *
 * @param airbyteMessageValue expected date, parseable by {@link LocalDate#parse(CharSequence)}
 * @param destinationValue date read from the destination, parseable the same way
 * @return {@code true} if the dates are equal or the expected date is out of the supported range
 */
@Override
protected boolean compareDateValues(final String airbyteMessageValue,
    final String destinationValue) {
  final LocalDate expectedDate = LocalDate.parse(airbyteMessageValue);

  if (expectedDate.isBefore(minSupportedDate) || expectedDate.isAfter(maxSupportedDate)) {
    // inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
    // so actually you end up with unpredicted values, more
    // https://clickhouse.com/docs/en/sql-reference/data-types/date32
    LOGGER.warn(
        "Test value is out of range and would be corrupted by ClickHouse, so we skip this verification");
    return true;
  }

  // Parse the destination value only when it is actually going to be verified.
  final LocalDate actualDate = LocalDate.parse(destinationValue);
  return actualDate.equals(expectedDate);
}

/**
 * Compares a timestamp-with-timezone from the Airbyte message with the value read back from the
 * destination, both normalized to UTC.
 *
 * <p>If the expected instant lies outside {@code minSupportedDateTime}..{@code maxSupportedDateTime},
 * the check is skipped and the values are reported as matching. If either value cannot be parsed,
 * the two strings are compared as plain text instead.
 *
 * @param airbyteMessageValue expected value, parsed with {@code getAirbyteDateTimeWithTzFormatter()}
 * @param destinationValue value read from the destination, parsed with
 *        {@code parseDestinationDateWithTz}
 * @return {@code true} if the values match or the expected value is out of the supported range
 */
@Override
protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue,
    final String destinationValue) {
  try {
    final ZonedDateTime airbyteDate = ZonedDateTime.parse(airbyteMessageValue,
        getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC);

    if (airbyteDate.isBefore(minSupportedDateTime) || airbyteDate.isAfter(maxSupportedDateTime)) {
      // inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
      // so actually you end up with unpredicted values, more
      // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
      LOGGER.warn(
          "Test value is out of range and would be corrupted by ClickHouse, so we skip this verification");
      return true;
    }

    // Parse the destination value only when it is actually going to be verified.
    final ZonedDateTime destinationDate = parseDestinationDateWithTz(destinationValue);
    return airbyteDate.equals(destinationDate);
  } catch (final DateTimeParseException e) {
    // The exception is passed as the trailing argument without a placeholder so that SLF4J
    // logs it as a throwable (with stack trace) instead of leaving a literal "{}" in the message.
    LOGGER.warn(
        "Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}).",
        airbyteMessageValue, destinationValue, e);
    return compareTextValues(airbyteMessageValue, destinationValue);
  }
}

/**
 * Parses a destination timestamp written in {@code CLICKHOUSE_DATETIME_WITH_TZ_FORMAT} and
 * converts it to the same instant in UTC.
 *
 * @param destinationValue timestamp text read from the destination
 * @return the parsed value shifted to {@link ZoneOffset#UTC}
 */
@Override
protected ZonedDateTime parseDestinationDateWithTz(final String destinationValue) {
  final DateTimeFormatter destinationFormat =
      DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT);
  final ZonedDateTime parsed = ZonedDateTime.parse(destinationValue, destinationFormat);
  return parsed.withZoneSameInstant(ZoneOffset.UTC);
}

/**
 * Compares a timestamp without timezone from the Airbyte message with the value read back from
 * the destination.
 *
 * <p>If the expected value lies outside the local-date-time bounds derived from
 * {@code minSupportedDateTime} and {@code maxSupportedDateTime}, the check is skipped and the
 * values are reported as matching; the destination value is not parsed in that case.
 *
 * @param airbyteMessageValue expected value, parseable by {@link LocalDateTime#parse(CharSequence)}
 * @param destinationValue value read from the destination, written in
 *        {@code CLICKHOUSE_DATETIME_WITH_TZ_FORMAT}
 * @return {@code true} if the values are equal or the expected value is out of the supported range
 */
@Override
protected boolean compareDateTimeValues(final String airbyteMessageValue,
    final String destinationValue) {
  final LocalDateTime expectedDateTime = LocalDateTime.parse(airbyteMessageValue);

  if (expectedDateTime.isBefore(minSupportedDateTime.toLocalDateTime())
      || expectedDateTime.isAfter(maxSupportedDateTime.toLocalDateTime())) {
    // inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
    // so actually you end up with unpredicted values, more
    // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
    LOGGER.warn(
        "Test value is out of range and would be corrupted by ClickHouse, so we skip this verification");
    return true;
  }

  // Parse the destination value only when it is actually going to be verified.
  final LocalDateTime actualDateTime = LocalDateTime.parse(destinationValue,
      DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT));

  return expectedDateTime.equals(actualDateTime);
}

/**
 * Converts the textual form of a boolean into a {@code boolean}.
 *
 * @param valueAsString boolean as text, either an integer or a true/false word
 * @return for integer text, whether the number is greater than zero; otherwise the result of
 *         {@link Boolean#parseBoolean(String)}
 */
private boolean parseBool(final String valueAsString) {
  // boolean as a String may be returned as true\false and as 0\1
  // https://clickhouse.com/docs/en/sql-reference/data-types/boolean
  boolean parsed;
  try {
    final int numeric = Integer.parseInt(valueAsString);
    parsed = numeric > 0;
  } catch (final NumberFormatException notANumber) {
    // Not an integer, so interpret it as a true/false word.
    parsed = Boolean.parseBoolean(valueAsString);
  }
  return parsed;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.clickhouse;

import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.jdbc.JdbcSourceOperations;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/**
 * {@link JdbcSourceOperations} variant that overrides how date and timestamp columns are written
 * into the JSON node when rows are read back from ClickHouse.
 */
public class ClickhouseTestSourceOperations extends JdbcSourceOperations {

  // DateTimeFormatter is immutable and thread-safe, so it is built once rather than per row.
  private static final DateTimeFormatter TIMESTAMP_FORMATTER =
      DateTimeFormatter.ofPattern(DataTypeUtils.DATE_FORMAT_WITH_MILLISECONDS_PATTERN);

  /**
   * Reads the column through {@link ResultSet#getTimestamp(int)} and stores only its date part,
   * formatted with {@link DateTimeFormatter#ISO_DATE}.
   *
   * @param node JSON object receiving the value
   * @param columnName key under which the value is stored
   * @param resultSet row being read
   * @param index column index of the value in {@code resultSet}
   * @throws SQLException if the column cannot be read
   */
  @Override
  protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
    node.put(columnName, DateTimeFormatter.ISO_DATE.format(resultSet.getTimestamp(index).toLocalDateTime()));
  }

  /**
   * Reads the column as a {@link LocalDateTime}, formats it with
   * {@code DataTypeUtils.DATE_FORMAT_WITH_MILLISECONDS_PATTERN} and passes the result through
   * {@code resolveEra} before storing it.
   *
   * @param node JSON object receiving the value
   * @param columnName key under which the value is stored
   * @param resultSet row being read
   * @param index column index of the value in {@code resultSet}
   * @throws SQLException if the column cannot be read
   */
  @Override
  protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
    final LocalDateTime timestamp = getDateTimeObject(resultSet, index, LocalDateTime.class);
    final LocalDate date = timestamp.toLocalDate();

    node.put(columnName, resolveEra(date, timestamp.format(TIMESTAMP_FORMATTER)));
  }

}
Loading

0 comments on commit ce9b1d3

Please sign in to comment.