Skip to content

Commit

Permalink
Merge pull request #280 from ClickHouse/adjust-duplicate-token
Browse files Browse the repository at this point in the history
Adjusting the deduplication token hash
  • Loading branch information
Paultagoras authored Dec 11, 2023
2 parents 25f33d8 + 0bb52af commit 0110ca8
Show file tree
Hide file tree
Showing 10 changed files with 173 additions and 34 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 1.0.10 2023-12-08
* Bugfix (de-duplication token collision)

## 1.0.9 2023-12-06
* Added more logging to help debug future issues
* Restored send_progress_in_http_headers flag
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
v1.0.9
v1.0.10
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package com.clickhouse.kafka.connect.sink;

import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.config.ClickHouseProxyType;
import com.clickhouse.kafka.connect.ClickHouseSinkConnector;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.clickhouse.kafka.connect.sink.helper.ConfluentPlatform;
import com.clickhouse.kafka.connect.sink.helper.SchemalessTestData;
import org.apache.kafka.connect.sink.SinkRecord;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.Network;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.*;

import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.createTable;
import static com.clickhouse.kafka.connect.sink.helper.ClickHouseAPI.dropTable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class ClickHouseCloudTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseCloudTest.class);
private static final Properties properties = System.getProperties();

private ClickHouseHelperClient createClient(Map<String,String> props) {
ClickHouseSinkConfig csc = new ClickHouseSinkConfig(props);

String hostname = csc.getHostname();
int port = csc.getPort();
String database = csc.getDatabase();
String username = csc.getUsername();
String password = csc.getPassword();
boolean sslEnabled = csc.isSslEnabled();
int timeout = csc.getTimeout();


return new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
.setDatabase(database)
.setUsername(username)
.setPassword(password)
.sslEnable(sslEnabled)
.setTimeout(timeout)
.setRetry(csc.getRetry())
.build();
}


private Map<String, String> getTestProperties() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, String.valueOf(properties.getOrDefault("clickhouse.host", "clickhouse")));
props.put(ClickHouseSinkConnector.PORT, String.valueOf(properties.getOrDefault("clickhouse.port", ClickHouseProtocol.HTTP.getDefaultPort())));
props.put(ClickHouseSinkConnector.DATABASE, String.valueOf(properties.getOrDefault("clickhouse.database", "default")));
props.put(ClickHouseSinkConnector.USERNAME, String.valueOf(properties.getOrDefault("clickhouse.username", "default")));
props.put(ClickHouseSinkConnector.PASSWORD, String.valueOf(properties.getOrDefault("clickhouse.password", "")));
props.put(ClickHouseSinkConnector.SSL_ENABLED, "true");
return props;
}



@Test
public void overlappingDataTest() {
Map<String, String> props = getTestProperties();
ClickHouseHelperClient chc = createClient(props);
String topic = "schemaless_overlap_table_test";
ClickHouseTestHelpers.dropTable(chc, topic);
ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `indexCount` Int64, `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, " +
"`p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = ReplicatedMergeTree ORDER BY off16");
Collection<SinkRecord> sr = SchemalessTestData.createPrimitiveTypes(topic, 1);
Collection<SinkRecord> firstBatch = new ArrayList<>();
Collection<SinkRecord> secondBatch = new ArrayList<>();
Collection<SinkRecord> thirdBatch = new ArrayList<>();

//For the sake of the comments, assume size = 100
int firstBatchEndIndex = sr.size() / 2; // 0 - 50
int secondBatchStartIndex = firstBatchEndIndex - sr.size() / 4; // 25
int secondBatchEndIndex = firstBatchEndIndex + sr.size() / 4; // 75

for (SinkRecord record : sr) {
if (record.kafkaOffset() <= firstBatchEndIndex) {
firstBatch.add(record);
}

if (record.kafkaOffset() >= secondBatchStartIndex && record.kafkaOffset() <= secondBatchEndIndex) {
secondBatch.add(record);
}

if (record.kafkaOffset() >= secondBatchStartIndex) {
thirdBatch.add(record);
}
}

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(firstBatch);
chst.stop();
chst.start(props);
chst.put(secondBatch);
chst.stop();
chst.start(props);
chst.put(thirdBatch);
chst.stop();
LOGGER.info("Total Records: {}", sr.size());
LOGGER.info("Row Count: {}", ClickHouseTestHelpers.countRows(chc, topic));
assertTrue(ClickHouseTestHelpers.countRows(chc, topic) >= sr.size());
assertTrue(ClickHouseTestHelpers.checkSequentialRows(chc, topic, sr.size()));
ClickHouseTestHelpers.dropTable(chc, topic);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ private static void setupConnector(String fileName, String topicName, int taskCo
properties.getOrDefault("clickhouse.port", ClickHouseProtocol.HTTP.getDefaultPort()),
properties.getOrDefault("clickhouse.database", "default"),
properties.getOrDefault("clickhouse.username", "default"),
properties.getOrDefault("clickhouse.password", ""));
properties.getOrDefault("clickhouse.password", ""),
true);

confluentPlatform.createConnect(jsonString);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package com.clickhouse.kafka.connect.sink.helper;

import com.clickhouse.client.*;
import com.clickhouse.data.ClickHouseRecord;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClickHouseTestHelpers {
public static ClickHouseResponseSummary dropTable(ClickHouseHelperClient chc, String tableName) {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseTestHelpers.class);
public static void dropTable(ClickHouseHelperClient chc, String tableName) {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);
try (ClickHouseClient client = ClickHouseClient.builder()
.options(chc.getDefaultClientOptions())
Expand All @@ -13,13 +17,13 @@ public static ClickHouseResponseSummary dropTable(ClickHouseHelperClient chc, St
ClickHouseResponse response = client.read(chc.getServer())
.query(dropTable)
.executeAndWait()) {
return response.getSummary();
return;
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
}

public static ClickHouseResponseSummary createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
public static void createTable(ClickHouseHelperClient chc, String tableName, String createTableQuery) {
String createTableQueryTmp = String.format(createTableQuery, tableName);

try (ClickHouseClient client = ClickHouseClient.builder()
Expand All @@ -29,53 +33,63 @@ public static ClickHouseResponseSummary createTable(ClickHouseHelperClient chc,
ClickHouseResponse response = client.read(chc.getServer())
.query(createTableQueryTmp)
.executeAndWait()) {
return response.getSummary();
return;
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
}

public static int countRows(ClickHouseHelperClient chc, String tableName) {
String queryCount = String.format("SELECT COUNT(*) FROM `%s`", tableName);
try (ClickHouseClient client = ClickHouseClient.builder()
.options(chc.getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(chc.getServer())
.query(queryCount)
.executeAndWait()) {
return response.firstRecord().getValue(0).asInteger();
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
return runQuery(chc, queryCount);
}

public static int sumRows(ClickHouseHelperClient chc, String tableName, String column) {
String queryCount = String.format("SELECT SUM(`%s`) FROM `%s`", column, tableName);
return runQuery(chc, queryCount);
}

public static int countRowsWithEmojis(ClickHouseHelperClient chc, String tableName) {
String queryCount = "SELECT COUNT(*) FROM `" + tableName + "` WHERE str LIKE '%\uD83D\uDE00%'";
return runQuery(chc, queryCount);
}

private static int runQuery(ClickHouseHelperClient chc, String query) {
try (ClickHouseClient client = ClickHouseClient.builder()
.options(chc.getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(chc.getServer())
.query(queryCount)
.query(query)
.executeAndWait()) {
return response.firstRecord().getValue(0).asInteger();
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
}

public static int countRowsWithEmojis(ClickHouseHelperClient chc, String tableName) {
String queryCount = "SELECT COUNT(*) FROM `" + tableName + "` WHERE str LIKE '%\uD83D\uDE00%'";

public static boolean checkSequentialRows(ClickHouseHelperClient chc, String tableName, int totalRecords) {
String queryCount = String.format("SELECT DISTINCT `indexCount` FROM `%s` ORDER BY `indexCount` ASC", tableName);
try (ClickHouseClient client = ClickHouseClient.builder()
.options(chc.getDefaultClientOptions())
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints)
ClickHouseResponse response = client.read(chc.getServer())
.query(queryCount)
.executeAndWait()) {
return response.firstRecord().getValue(0).asInteger();

int expectedIndexCount = 0;
for (ClickHouseRecord record : response.records()) {
int currentIndexCount = record.getValue(0).asInteger();
if (currentIndexCount != expectedIndexCount) {
LOGGER.error("currentIndexCount: {}, expectedIndexCount: {}", currentIndexCount, expectedIndexCount);
return false;
}
expectedIndexCount++;
}

LOGGER.info("Total Records: {}, expectedIndexCount: {}", totalRecords, expectedIndexCount);
return totalRecords == expectedIndexCount;
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public static Collection<SinkRecord> createPrimitiveTypes(String topic, int part
LongStream.range(0, totalRecords).forEachOrdered(n -> {
Map<String, Object> value_struct = new HashMap<>();
value_struct.put("str", "num" + n);
value_struct.put("indexCount", n);
value_struct.put("off16", (short)n);
value_struct.put("p_int8", (byte)n);
value_struct.put("p_int16", (short)n);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,6 @@
"username": "%s",
"password": "%s",
"ssl": "true",
"exactlyOnce" : "true"
"exactlyOnce" : "%b"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,6 @@
"username": "%s",
"password": "%s",
"ssl": "true",
"exactlyOnce" : "true"
"exactlyOnce" : "%b"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -443,8 +443,7 @@ protected void doInsertRawBinary(List<Record> records, Table table, QueryIdentif

s2 = System.currentTimeMillis();
try (ClickHouseClient client = getClient()) {
ClickHouseRequest.Mutation request = getMutationRequest(client, ClickHouseFormat.RowBinary, table.getName(), queryId.getQueryId(),
first.getRecordOffsetContainer().getOffset() + first.getTopicAndPartition());
ClickHouseRequest.Mutation request = getMutationRequest(client, ClickHouseFormat.RowBinary, table.getName(), queryId);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;

Expand Down Expand Up @@ -488,8 +487,7 @@ protected void doInsertJson(List<Record> records, Table table, QueryIdentifier q


try (ClickHouseClient client = getClient()) {
ClickHouseRequest.Mutation request = getMutationRequest(client, ClickHouseFormat.JSONEachRow, table.getName(), queryId.getQueryId(),
first.getRecordOffsetContainer().getOffset() + first.getTopicAndPartition());
ClickHouseRequest.Mutation request = getMutationRequest(client, ClickHouseFormat.JSONEachRow, table.getName(), queryId);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;

Expand Down Expand Up @@ -563,8 +561,7 @@ protected void doInsertString(List<Record> records, Table table, QueryIdentifier
}

try (ClickHouseClient client = getClient()) {
ClickHouseRequest.Mutation request = getMutationRequest(client, clickHouseFormat, table.getName(), queryId.getQueryId(),
first.getRecordOffsetContainer().getOffset() + first.getTopicAndPartition());
ClickHouseRequest.Mutation request = getMutationRequest(client, clickHouseFormat, table.getName(), queryId);
ClickHouseConfig config = request.getConfig();
CompletableFuture<ClickHouseResponse> future;

Expand Down Expand Up @@ -614,12 +611,12 @@ private ClickHouseClient getClient() {
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
}
private ClickHouseRequest.Mutation getMutationRequest(ClickHouseClient client, ClickHouseFormat format, String tableName, String queryId, String deduplicationToken) {
private ClickHouseRequest.Mutation getMutationRequest(ClickHouseClient client, ClickHouseFormat format, String tableName, QueryIdentifier queryId) {
ClickHouseRequest.Mutation request = client.read(chc.getServer())
.write()
.table(tableName, queryId)
.table(tableName, queryId.getQueryId())
.format(format)
.set("insert_deduplication_token", deduplicationToken);
.set("insert_deduplication_token", queryId.getDeduplicationToken());

for (String clickhouseSetting : csc.getClickhouseSettings().keySet()) {//THIS ASSUMES YOU DON'T ADD insert_deduplication_token
request.set(clickhouseSetting, csc.getClickhouseSettings().get(clickhouseSetting));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,8 @@ public long getMinOffset() {
public long getMaxOffset() {
return maxOffset;
}

public String getDeduplicationToken() {
return String.format("%s-%s-%s-%s", topic, partition, minOffset, maxOffset);
}
}

0 comments on commit 0110ca8

Please sign in to comment.