Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updating the version and removing old imports #432

Merged
merged 17 commits into from
Aug 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# 1.2.0
* Adding a KeyToValue transformation to allow for key to be stored in a separate column in ClickHouse

# 1.1.4
* Bugfix to address field value to column name mapping for Tuples
* Adding a KeyToValue transformation to allow for key to be stored in a separate column in ClickHouse

# 1.1.3
* Update to java-client 0.6.3
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.1.4
v1.2.0
4 changes: 0 additions & 4 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
*/

import com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar
import java.io.ByteArrayOutputStream
import java.net.URI
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.gradle.api.tasks.testing.logging.TestExceptionFormat
Expand All @@ -34,7 +32,6 @@ plugins {
signing
// checkstyle
id("com.github.gmazzo.buildconfig") version "5.3.5"
//id("com.github.spotbugs") version "4.7.9"
id("com.diffplug.spotless") version "6.25.0"
id("com.github.johnrengelman.shadow") version "7.1.2"
}
Expand All @@ -50,7 +47,6 @@ repositories {
}

extra.apply {

set("clickHouseDriverVersion", "0.6.3")
set("kafkaVersion", "2.7.0")
set("avroVersion", "1.9.2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ public ClickHouseSinkConfig(Map<String, String> props) {
}
}
}
this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V2");
this.clientVersion = props.getOrDefault(CLIENT_VERSION, "V1");

LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,20 +91,19 @@ protected ClickHouseHelperClient createClient(Map<String,String> props, boolean
.setRetry(csc.getRetry())
.build();


if (withDatabase) {
createDatabase(this.database, tmpChc);
props.put(ClickHouseSinkConnector.DATABASE, this.database);
ClickHouseHelperClient chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
.setDatabase(this.database)
createDatabase(ClickHouseBase.database, tmpChc);
props.put(ClickHouseSinkConnector.DATABASE, ClickHouseBase.database);
tmpChc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
.setDatabase(ClickHouseBase.database)
.setUsername(username)
.setPassword(password)
.sslEnable(sslEnabled)
.setTimeout(timeout)
.setRetry(csc.getRetry())
.build();
return chc;
}

chc = tmpChc;
return chc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,14 @@
import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData;
import eu.rekawek.toxiproxy.Proxy;
import eu.rekawek.toxiproxy.ToxiproxyClient;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.*;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.ToxiproxyContainer;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.*;
import java.util.stream.LongStream;

import static org.junit.jupiter.api.Assertions.*;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ private Map<String, String> getTestProperties() {

@Test
public void proxyPingTest() throws IOException {
ClickHouseHelperClient chc = createClient(getTestProperties(), false);
ClickHouseHelperClient chc = createClient(getTestProperties());
assertTrue(chc.ping());
proxy.disable();
assertFalse(chc.ping());
Expand All @@ -72,7 +72,7 @@ public void proxyPingTest() throws IOException {
@Test
public void arrayTypesTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props);

String topic = "array_string_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -92,7 +92,7 @@ public void arrayTypesTest() {
@Test
public void mapTypesTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props);

String topic = "map_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -114,7 +114,7 @@ public void mapTypesTest() {
// https://github.com/ClickHouse/clickhouse-kafka-connect/issues/33
public void materializedViewsBug() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props);

String topic = "m_array_string_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -135,7 +135,7 @@ public void materializedViewsBug() {
// https://github.com/ClickHouse/clickhouse-kafka-connect/issues/38
public void specialCharTableNameTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props);

String topic = "special-char-table-test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -157,7 +157,7 @@ public void specialCharTableNameTest() {
// https://github.com/ClickHouse/clickhouse-kafka-connect/issues/62
public void nullValueDataTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props);

String topic = "null-value-table-test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -178,7 +178,7 @@ public void nullValueDataTest() {
// https://github.com/ClickHouse/clickhouse-kafka-connect/issues/57
public void supportDatesTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props);

String topic = "support-dates-table-test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -197,7 +197,7 @@ public void supportDatesTest() {
@Test
public void detectUnsupportedDataConversions() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props);

String topic = "support-unsupported-dates-table-test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -218,7 +218,7 @@ public void detectUnsupportedDataConversions() {
@Test
public void withEmptyDataRecordsTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props);

String topic = "schema_empty_records_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -235,7 +235,7 @@ public void withEmptyDataRecordsTest() {
@Test
public void withLowCardinalityTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props, true);

String topic = "schema_empty_records_lc_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -252,7 +252,7 @@ public void withLowCardinalityTest() {
@Test
public void withUUIDTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props, true);

String topic = "schema_empty_records_lc_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -269,7 +269,7 @@ public void withUUIDTest() {
@Test
public void schemaWithDefaultsTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props, true);

String topic = "default-value-table-test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -288,7 +288,7 @@ public void schemaWithDefaultsTest() {
@Test
public void schemaWithDecimalTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props, true);

String topic = "decimal-value-table-test";
ClickHouseTestHelpers.dropTable(chc, topic);
Expand All @@ -307,7 +307,7 @@ public void schemaWithDecimalTest() {
@Test
public void schemaWithBytesTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props, false);
ClickHouseHelperClient chc = createClient(props, true);
String topic = "bytes-value-table-test";
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (`string` String) Engine = MergeTree ORDER BY `string`");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public void materializedViewsBug() {
ClickHouseTestHelpers.dropTable(chc, topic + "mate");
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(String), `arr_empty` Array(String), `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) ) Engine = MergeTree ORDER BY off16");
ClickHouseTestHelpers.createTable(chc, topic + "mate", "CREATE MATERIALIZED VIEW %s ( `off16` Int16 ) Engine = MergeTree ORDER BY `off16` POPULATE AS SELECT off16 FROM " + topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE MATERIALIZED VIEW %s_mv TO " + topic + "_mate AS SELECT off16 FROM " + topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s_mate ( `off16` Int16 ) Engine = Null");
Collection<SinkRecord> sr = SchemaTestData.createArrayType(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@
import com.clickhouse.client.api.query.QueryResponse;
import com.clickhouse.client.api.query.QuerySettings;
import com.clickhouse.client.api.query.Records;
import com.clickhouse.data.ClickHouseColumn;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.data.ClickHouseValue;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
Expand All @@ -26,12 +23,9 @@
import java.io.InputStreamReader;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class ClickHouseTestHelpers {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseTestHelpers.class);
Expand Down Expand Up @@ -74,6 +68,7 @@ public static void dropTable(ClickHouseHelperClient chc, String tableName) {
}
}
public static OperationMetrics createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
LOGGER.info("Creating table: {}, Query: {}", tableName, createTableQuery);
OperationMetrics operationMetrics = createTable(chc, tableName, createTableQuery, new HashMap<>());
if (isCloud()) {
try {
Expand All @@ -92,7 +87,7 @@ public static OperationMetrics createTable(ClickHouseHelperClient chc, String ta
settings.setOption(entry.getKey(), entry.getValue());
}
try {
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(10, java.util.concurrent.TimeUnit.SECONDS);
Records records = chc.getClient().queryRecords(createTableQueryTmp, settings).get(120, java.util.concurrent.TimeUnit.SECONDS);
return records.getMetrics();
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down Expand Up @@ -141,7 +136,7 @@ public static int countRows(ClickHouseHelperClient chc, String tableName) {
String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName);

try {
Records records = chc.getClient().queryRecords(queryCount).get(10, TimeUnit.SECONDS);
Records records = chc.getClient().queryRecords(queryCount).get(120, TimeUnit.SECONDS);
// Note we probrbly need asInteger() here
String value = records.iterator().next().getString(1);
return Integer.parseInt(value);
Expand Down
Loading