From 600178c16287185b4ce5a6147afbe469581b873a Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Wed, 12 Jul 2023 21:43:49 -0400 Subject: [PATCH 1/5] Cleaning up and adding additional logs --- .../connect/sink/ClickHouseSinkConfig.java | 64 ++++++--- .../connect/sink/ClickHouseSinkTask.java | 2 +- .../kafka/connect/sink/ProxySinkTask.java | 2 +- .../connect/sink/db/ClickHouseWriter.java | 128 ++++++++---------- .../db/helper/ClickHouseHelperClient.java | 31 +++-- .../kafka/connect/sink/db/mapping/Column.java | 9 +- .../kafka/connect/sink/db/mapping/Type.java | 2 +- .../clickhouse/kafka/connect/util/Utils.java | 3 +- 8 files changed, 127 insertions(+), 114 deletions(-) diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java index bc1226b0..01789b3f 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java @@ -7,10 +7,13 @@ import org.slf4j.LoggerFactory; import java.io.UnsupportedEncodingException; +import java.nio.charset.StandardCharsets; import java.util.Map; public class ClickHouseSinkConfig { private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkConfig.class); + + //Configuration Names public static final String HOSTNAME = "hostname"; public static final String PORT = "port"; public static final String DATABASE = "database"; @@ -20,7 +23,12 @@ public class ClickHouseSinkConfig { public static final String TIMEOUT_SECONDS = "timeoutSeconds"; public static final String RETRY_COUNT = "retryCount"; public static final String EXACTLY_ONCE = "exactlyOnce"; + public static final String INSERT_QUORUM = "insertQuorum"; + + + + public static final int MILLI_IN_A_SEC = 1000; private static final String databaseDefault = "default"; public static final int portDefault = 8443; @@ -30,6 +38,7 @@ public class ClickHouseSinkConfig { public static final Integer timeoutSecondsDefault = 30; public static final Integer retryCountDefault = 3; public static final Boolean exactlyOnceDefault = Boolean.FALSE; + public static final Integer insertQuorumDefault = 2; public enum StateStores { NONE, IN_MEMORY, @@ -37,30 +46,24 @@ public enum StateStores { KEEPER_MAP } - private Map settings = null; - private String hostname; - private int port; - private String database; - private String username; - private String password; - private boolean sslEnabled; - private boolean exactlyOnce; - - private int timeout; - - private int retry; + private final String hostname; + private final int port; + private final String database; + private final String username; + private final String password; + private final boolean sslEnabled; + private final boolean exactlyOnce; + private final int timeout; + private final int retry; + private final int insertQuorum; public static class UTF8String implements ConfigDef.Validator { @Override public void ensureValid(String name, Object o) { String s = (String) o; - try { - if (s != null ) { - byte[] tmpBytes = s.getBytes("UTF-8"); - } - } catch (UnsupportedEncodingException e) { - throw new ConfigException(name, o, "String must be non-empty"); + if (s != null ) { + byte[] tmpBytes = s.getBytes(StandardCharsets.UTF_8); } } @@ -73,16 +76,25 @@ public String toString() { public ClickHouseSinkConfig(Map props) { // Extracting configuration hostname = props.get(HOSTNAME); + LOGGER.debug(HOSTNAME + ": " + hostname); port = Integer.parseInt(props.getOrDefault(PORT, String.valueOf(portDefault))); + LOGGER.debug(PORT + ": " + port); database = props.getOrDefault(DATABASE, databaseDefault); + LOGGER.debug(DATABASE + ": " + database); username = props.getOrDefault(USERNAME, usernameDefault); + LOGGER.debug(USERNAME + ": " + username); password = props.getOrDefault(PASSWORD, passwordDefault).trim(); + LOGGER.debug("password: ********"); // don't actually log password sslEnabled = Boolean.parseBoolean(props.getOrDefault(SSL_ENABLED,"false")); + LOGGER.debug(SSL_ENABLED + ": " + sslEnabled); timeout = Integer.parseInt(props.getOrDefault(TIMEOUT_SECONDS, timeoutSecondsDefault.toString())) * MILLI_IN_A_SEC; // multiple in 1000 milli + LOGGER.debug(TIMEOUT_SECONDS + ": " + timeout); retry = Integer.parseInt(props.getOrDefault(RETRY_COUNT, retryCountDefault.toString())); + LOGGER.debug(RETRY_COUNT + ": " + retry); exactlyOnce = Boolean.parseBoolean(props.getOrDefault(EXACTLY_ONCE,"false")); - LOGGER.info("exactlyOnce: " + exactlyOnce); - LOGGER.info("props: " + props); + LOGGER.debug(EXACTLY_ONCE + ": " + exactlyOnce); + insertQuorum = Integer.parseInt(props.getOrDefault(INSERT_QUORUM, insertQuorumDefault.toString())); + LOGGER.debug(INSERT_QUORUM + ": " + insertQuorum); } public static final ConfigDef CONFIG = createConfigDef(); @@ -178,6 +190,16 @@ private static ConfigDef createConfigDef() { ++orderInGroup, ConfigDef.Width.MEDIUM, "enable exactly once semantics."); + configDef.define(INSERT_QUORUM, + ConfigDef.Type.INT, + insertQuorumDefault, + ConfigDef.Range.between(0, 10), + ConfigDef.Importance.LOW, + "insert quorum", + group, + ++orderInGroup, + ConfigDef.Width.SHORT, + "insert quorum"); return configDef; } @@ -205,10 +227,10 @@ public String getPassword() { public boolean isSslEnabled() { return sslEnabled; } - public int getTimeout() { return timeout; } public int getRetry() { return retry; } public boolean getExactlyOnce() { return exactlyOnce; } + public int getInsertQuorum() { return insertQuorum; } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java index 4671d197..cd1b343e 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTask.java @@ -43,7 +43,7 @@ public String version() { @Override public void start(Map props) { - LOGGER.info("start SinkTask: "); + LOGGER.info("Start SinkTask: "); ClickHouseSinkConfig clickHouseSinkConfig; try { clickHouseSinkConfig = new ClickHouseSinkConfig(props); diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java index 5d760b74..0997554e 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java @@ -61,7 +61,7 @@ public void stop() { public void put(final Collection records) { if (records.isEmpty()) { - LOGGER.trace("No records send to SinkTask"); + LOGGER.trace("No records sent to SinkTask"); return; } // Group by topic & partition diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 6f2d8f3f..694db191 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -49,23 +49,14 @@ public ClickHouseWriter() { @Override public boolean start(ClickHouseSinkConfig csc) { + LOGGER.trace("Starting ClickHouseWriter"); this.csc = csc; - String hostname = csc.getHostname(); - int port = csc.getPort(); - String database = csc.getDatabase(); - String username = csc.getUsername(); - String password = csc.getPassword(); - boolean sslEnabled = csc.isSslEnabled(); - int timeout = csc.getTimeout(); - - LOGGER.info(String.format("hostname: [%s] port [%d] database [%s] username [%s] password [%s] sslEnabled [%s] timeout [%d]", hostname, port, database, username, Mask.passwordMask(password), sslEnabled, timeout)); - - chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port) - .setDatabase(database) - .setUsername(username) - .setPassword(password) - .sslEnable(sslEnabled) - .setTimeout(timeout) + chc = new ClickHouseHelperClient.ClickHouseClientBuilder(csc.getHostname(), csc.getPort()) + .setDatabase(csc.getDatabase()) + .setUsername(csc.getUsername()) + .setPassword(csc.getPassword()) + .sslEnable(csc.isSslEnabled()) + .setTimeout(csc.getTimeout()) .setRetry(csc.getRetry()) .build(); @@ -74,7 +65,7 @@ public boolean start(ClickHouseSinkConfig csc) { return false; } - LOGGER.info("Ping is successful."); + LOGGER.debug("Ping was successful."); List tableList = chc.extractTablesMapping(); if (tableList.isEmpty()) { @@ -82,7 +73,7 @@ public boolean start(ClickHouseSinkConfig csc) { return false; } - for (Table table: tableList) { + for (Table table: tableList) {//TODO: Should we pull ALL tables in memory? this.mapping.put(table.getName(), table); } return true; @@ -90,7 +81,7 @@ public boolean start(ClickHouseSinkConfig csc) { @Override public void stop() { - + LOGGER.debug("Stopping ClickHouseWriter"); } public void setBinary(boolean binary) { @@ -100,8 +91,7 @@ public void setBinary(boolean binary) { // TODO: we need to refactor that private String convertHelper(Object v) { if (v instanceof List) { - String value = ((List) v).stream().map( vv -> vv.toString()).collect(Collectors.joining(",","[","]")); - return value; + return ((List) v).stream().map(vv -> vv.toString()).collect(Collectors.joining(",","[","]")); } else { return v.toString(); @@ -163,38 +153,26 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField Data obj = record.getJsonMap().get(colName); if (obj == null) { validSchema = false; - LOGGER.error(String.format("Table column name [%s] is not found in data record.", colName)); + LOGGER.error(String.format("Table column name [%s] was not found.", colName)); } if (!onlyFieldsName) { String colTypeName = type.name(); String dataTypeName = obj.getFieldType().getName().toUpperCase(); // TODO: make extra validation for Map/Array type + LOGGER.debug(String.format("Column type name [%s] and data type name [%s]", colTypeName, dataTypeName)); switch (colTypeName) { case "Date": case "Date32": - if ( dataTypeName.equals(Type.INT32) || dataTypeName.equals(Type.STRING) ) { - LOGGER.debug(String.format("Will try to convert from %s to %s", colTypeName, dataTypeName)); - } - break; case "DateTime": case "DateTime64": - if ( dataTypeName.equals(Type.INT64) || dataTypeName.equals(Type.STRING) ) { - LOGGER.debug(String.format("Will try to convert from %s to %s", colTypeName, dataTypeName)); - } - break; case "UUID": - if ( dataTypeName.equals(Type.UUID) || dataTypeName.equals(Type.STRING) ) { - LOGGER.debug(String.format("Will try to convert from %s to %s", colTypeName, dataTypeName)); - } - break; + break;//I notice we just break here, rather than actually validate the type default: if (!colTypeName.equals(dataTypeName)) { validSchema = false; LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName)); } - } - } } } @@ -211,40 +189,42 @@ private void doWriteDates(Type type, ClickHousePipedOutputStream stream, Data va switch (type) { case Date: if (value.getFieldType().equals(Schema.Type.INT32)) { - BinaryStreamUtils.writeUnsignedInt16(stream, ((Integer) value.getObject()).intValue()); + BinaryStreamUtils.writeUnsignedInt16(stream, (Integer) value.getObject()); } else { unsupported = true; } break; case Date32: if (value.getFieldType().equals(Schema.Type.INT32)) { - BinaryStreamUtils.writeInt32(stream, ((Integer) value.getObject()).intValue()); + BinaryStreamUtils.writeInt32(stream, (Integer) value.getObject()); } else { unsupported = true; } break; case DateTime: if (value.getFieldType().equals(Schema.Type.INT64)) { - BinaryStreamUtils.writeUnsignedInt32(stream, ((Long) value.getObject()).longValue()); + BinaryStreamUtils.writeUnsignedInt32(stream, (Long) value.getObject()); } else { unsupported = true; } break; case DateTime64: if (value.getFieldType().equals(Schema.Type.INT64)) { - BinaryStreamUtils.writeInt64(stream, ((Long) value.getObject()).longValue()); + BinaryStreamUtils.writeInt64(stream, (Long) value.getObject()); } else { unsupported = true; } break; } if (unsupported) { - String msg = String.format("Not implemented conversion. from %s to %s", value.getFieldType(), type); + String msg = String.format("Not implemented conversion from %s to %s", value.getFieldType(), type); LOGGER.error(msg); throw new DataException(msg); } } private void doWritePrimitive(Type type, ClickHousePipedOutputStream stream, Object value) throws IOException { + LOGGER.trace("Writing primitive type: {}, value: {}", type, value); + if (value == null) { BinaryStreamUtils.writeNull(stream); return; @@ -293,18 +273,21 @@ private void doWritePrimitive(Type type, ClickHousePipedOutputStream stream, Obj } private void doWriteCol(Record record, Column col, ClickHousePipedOutputStream stream) throws IOException { + LOGGER.trace("Writing column {} to stream", col.getName()); + LOGGER.trace("Column type is {}", col.getType()); String name = col.getName(); Type colType = col.getType(); boolean filedExists = record.getJsonMap().containsKey(name); if (filedExists) { Data value = record.getJsonMap().get(name); + LOGGER.trace("Column value is {}", value); // TODO: the mapping need to be more efficient // If column is nullable && the object is also null add the not null marker if (col.isNullable() && value.getObject() != null) { BinaryStreamUtils.writeNonNull(stream); } - if (col.isNullable() == false && value.getObject() == null) { + if (!col.isNullable() && value.getObject() == null) { // this the situation when the col is not isNullable, but the data is null here we need to drop the records throw new RuntimeException(("col.isNullable() is false and value is empty")); } @@ -334,14 +317,13 @@ private void doWriteCol(Record record, Column col, ClickHousePipedOutputStream s Map mapTmp = (Map)value.getObject(); int mapSize = mapTmp.size(); BinaryStreamUtils.writeVarInt(stream, mapSize); - mapTmp.entrySet().forEach( v-> { + mapTmp.forEach((key, value1) -> { try { - doWritePrimitive(col.getMapKeyType(), stream, v.getKey()); - doWritePrimitive(col.getMapValueType(), stream, v.getValue()); + doWritePrimitive(col.getMapKeyType(), stream, key); + doWritePrimitive(col.getMapValueType(), stream, value1); } catch (IOException e) { throw new RuntimeException(e); } - }); break; case ARRAY: @@ -362,8 +344,8 @@ private void doWriteCol(Record record, Column col, ClickHousePipedOutputStream s // set null since there is no value BinaryStreamUtils.writeNull(stream); } else { - // no filed and not nullable - LOGGER.error(String.format("Record is missing field %s", name)); + // no filled and not nullable + LOGGER.error("Column {} is not nullable and no value is provided", name); throw new RuntimeException(); } } @@ -378,11 +360,11 @@ public void doInsertRawBinary(List records) throws IOException, Executio Record first = records.get(0); String topic = first.getTopic(); - LOGGER.info(String.format("Number of records to insert %d to table name %s", batchSize, topic)); + LOGGER.info("Inserting {} records into topic {}", batchSize, topic); Table table = this.mapping.get(Utils.escapeTopicName(topic)); if (table == null) { //TODO to pick the correct exception here - throw new RuntimeException(String.format("Table %s does not exists", topic)); + throw new RuntimeException(String.format("Table %s does not exist.", topic)); } if ( !validateDataSchema(table, first, false) ) @@ -397,7 +379,7 @@ public void doInsertRawBinary(List records) throws IOException, Executio .table(table.getName()) .format(ClickHouseFormat.RowBinary) // this is needed to get meaningful response summary - .set("insert_quorum", 2) + .set("insert_quorum", csc.getInsertQuorum()) .set("send_progress_in_http_headers", 1); ClickHouseConfig config = request.getConfig(); @@ -436,7 +418,6 @@ public void doInsertRawBinary(List records) throws IOException, Executio long s3 = System.currentTimeMillis(); LOGGER.info("batchSize {} data ms {} send {}", batchSize, s2 - s1, s3 - s2); - } @@ -457,6 +438,7 @@ public void doInsertJson(List records) throws IOException, ExecutionExce Table table = this.mapping.get(Utils.escapeTopicName(topic)); if (table == null) { //TODO to pick the correct exception here + LOGGER.error("Table {} does not exist - see docs for more details about table names and topic names.", topic); throw new RuntimeException(String.format("Table %s does not exists", topic)); } @@ -469,7 +451,7 @@ public void doInsertJson(List records) throws IOException, ExecutionExce .table(table.getName()) .format(ClickHouseFormat.JSONEachRow) // this is needed to get meaningful response summary - .set("insert_quorum", 2) + .set("insert_quorum", csc.getInsertQuorum()) .set("input_format_skip_unknown_fields", 1) .set("send_progress_in_http_headers", 1); @@ -536,6 +518,7 @@ public void doInsertJson(List records) throws IOException, ExecutionExce LOGGER.info("batchSize {} data ms {} send {}", batchSize, s2 - s1, s3 - s2); } + /** public void doInsertSimple(List records) { // TODO: here we will need to make refactor (not to use query & string , but we can make this optimization later ) long s1 = System.currentTimeMillis(); @@ -569,29 +552,30 @@ public void doInsertSimple(List records) { LOGGER.debug(insertStr); LOGGER.debug("*****************"); chc.query(insertStr, ClickHouseFormat.RowBinaryWithNamesAndTypes); - /* - try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - ClickHouseResponse response = client.connect(chc.getServer()) // or client.connect(endpoints) - // you'll have to parse response manually if using a different format - .option(ClickHouseClientOption.CONNECTION_TIMEOUT, csc.getTimeout()) - .option(ClickHouseClientOption.SOCKET_TIMEOUT, csc.getTimeout()) - .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) - .query(insertStr) - .executeAndWait()) { - ClickHouseResponseSummary summary = response.getSummary(); - long totalRows = summary.getTotalRowsToRead(); - LOGGER.info("totalRows {}", totalRows); - - } catch (ClickHouseException e) { - LOGGER.debug(insertStr); - LOGGER.error(String.format("INSERT ErrorCode %d ", e.getErrorCode()), e); - throw new RuntimeException(e); - } - */ +// try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); +// ClickHouseResponse response = client.connect(chc.getServer()) // or client.connect(endpoints) +// // you'll have to parse response manually if using a different format +// .option(ClickHouseClientOption.CONNECTION_TIMEOUT, csc.getTimeout()) +// .option(ClickHouseClientOption.SOCKET_TIMEOUT, csc.getTimeout()) +// .format(ClickHouseFormat.RowBinaryWithNamesAndTypes) +// .query(insertStr) +// .executeAndWait()) { +// ClickHouseResponseSummary summary = response.getSummary(); +// long totalRows = summary.getTotalRowsToRead(); +// LOGGER.info("totalRows {}", totalRows); +// +// } catch (ClickHouseException e) { +// LOGGER.debug(insertStr); +// LOGGER.error(String.format("INSERT ErrorCode %d ", e.getErrorCode()), e); +// throw new RuntimeException(e); +// } + + long s3 = System.currentTimeMillis(); LOGGER.info("batchSize {} data ms {} send {}", batchSize, s2 - s1, s3 - s2); } + **/ @Override public long recordsInserted() { diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java index 8b78e317..bd3ad112 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java @@ -17,15 +17,15 @@ public class ClickHouseHelperClient { private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseHelperClient.class); - private String hostname = null; - private int port = -1; - private String username = "default"; - private String database = "default"; - private String password = ""; - private boolean sslEnabled = false; - private int timeout = ClickHouseSinkConfig.timeoutSecondsDefault * ClickHouseSinkConfig.MILLI_IN_A_SEC; + private final String hostname; + private final int port; + private final String username; + private final String database; + private final String password; + private final boolean sslEnabled; + private final int timeout; private ClickHouseNode server = null; - private int retry; + private final int retry; public ClickHouseHelperClient(ClickHouseClientBuilder builder) { this.hostname = builder.hostname; this.port = builder.port; @@ -45,10 +45,10 @@ private ClickHouseNode create() { String url = String.format("%s://%s:%d/%s", protocol, hostname, port, database); - LOGGER.info("url: " + url); + LOGGER.info("ClickHouse URL: " + url); if (username != null && password != null) { - LOGGER.info(String.format("Adding username [%s] password [%s] ", username, Mask.passwordMask(password))); + LOGGER.debug(String.format("Adding username [%s]", username)); Map options = new HashMap<>(); options.put("user", username); options.put("password", password); @@ -61,19 +61,19 @@ private ClickHouseNode create() { public boolean ping() { ClickHouseClient clientPing = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - LOGGER.debug(String.format("server [%s] , timeout [%d]", server, timeout)); + LOGGER.debug(String.format("Server [%s] , Timeout [%d]", server, timeout)); int retryCount = 0; while (retryCount < retry) { if (clientPing.ping(server, timeout)) { - LOGGER.info("Ping is successful."); + LOGGER.info("Ping was successful."); clientPing.close(); return true; } retryCount++; LOGGER.warn(String.format("Ping retry %d out of %d", retryCount, retry)); } - LOGGER.error("unable to ping to clickhouse server. "); + LOGGER.error("Unable to ping ClickHouse instance."); clientPing.close(); return false; } @@ -132,6 +132,7 @@ public Table describeTable(String tableName) { if (tableName.startsWith(".inner")) return null; String describeQuery = String.format("DESCRIBE TABLE `%s`.`%s`", this.database, tableName); + LOGGER.debug(describeQuery); try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); ClickHouseResponse response = client.connect(server) // or client.connect(endpoints) @@ -145,6 +146,7 @@ public Table describeTable(String tableName) { if (cols.length > 2) { String defaultKind = cols[2]; if ("ALIAS".equals(defaultKind) || "MATERIALIZED".equals(defaultKind)) { + LOGGER.debug("Skipping column {} as it is an alias or materialized view", cols[0]); // Only insert into "real" columns continue; } else if("DEFAULT".equals(defaultKind)) { @@ -157,10 +159,9 @@ public Table describeTable(String tableName) { } return table; } catch (ClickHouseException e) { - LOGGER.error(String.format("Got exception when running %s", describeQuery), e); + LOGGER.error(String.format("Exception when running describeTable %s", describeQuery), e); return null; } - } public List
extractTablesMapping() { List
tableList = new ArrayList<>(); diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java index 1f661985..5ca1efcd 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Column.java @@ -1,6 +1,10 @@ package com.clickhouse.kafka.connect.sink.db.mapping; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class Column { + private static final Logger LOGGER = LoggerFactory.getLogger(Column.class); private String name; private Type type; private boolean isNullable; @@ -130,8 +134,9 @@ private static Type dispatchPrimitive(String valueType) { } public static Column extractColumn(String name, String valueType, boolean isNull) { - Type type = Type.NONE; - type = dispatchPrimitive(valueType); + LOGGER.trace("Extracting column {} with type {}", name, valueType); + + Type type = dispatchPrimitive(valueType); if (valueType.startsWith("Array")) { type = Type.ARRAY; Column subType = extractColumn(name, valueType.substring("Array".length() + 1, valueType.length() - 1), false); diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java index 4e4af222..4bbcf182 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/mapping/Type.java @@ -24,5 +24,5 @@ public enum Type { UINT32, UINT64, UINT128, - UINT256, + UINT256 } diff --git a/src/main/java/com/clickhouse/kafka/connect/util/Utils.java b/src/main/java/com/clickhouse/kafka/connect/util/Utils.java index ab72ce4b..9ecccc5f 100644 --- a/src/main/java/com/clickhouse/kafka/connect/util/Utils.java +++ b/src/main/java/com/clickhouse/kafka/connect/util/Utils.java @@ -31,6 +31,7 @@ public static Exception getRootCause (Exception e, Boolean prioritizeClickHouseE Throwable runningException = e;//We have to use Throwable because of the getCause() signature while (runningException.getCause() != null && (!prioritizeClickHouseException || !(runningException instanceof ClickHouseException))) { + LOGGER.trace("Found exception: {}", runningException.getLocalizedMessage()); runningException = runningException.getCause(); } @@ -44,7 +45,7 @@ public static Exception getRootCause (Exception e, Boolean prioritizeClickHouseE */ public static void handleException(Exception e) { - LOGGER.debug("Exception in doInsert", e); + LOGGER.warn("Deciding how to handle exception: {}", e.getLocalizedMessage()); //High-Level Explicit Exception Checking if (e instanceof DataException) { throw (DataException) e; From 91a279604ea5969fe6a678abfd4d0b33eafe33dc Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Thu, 13 Jul 2023 17:32:21 -0400 Subject: [PATCH 2/5] PR feedback --- .../connect/sink/ClickHouseSinkConfig.java | 27 ++----------------- .../connect/sink/db/ClickHouseWriter.java | 4 +-- 2 files changed, 4 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java index 01789b3f..8672c452 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkConfig.java @@ -23,7 +23,6 @@ public class ClickHouseSinkConfig { public static final String TIMEOUT_SECONDS = "timeoutSeconds"; public static final String RETRY_COUNT = "retryCount"; public static final String EXACTLY_ONCE = "exactlyOnce"; - public static final String INSERT_QUORUM = "insertQuorum"; @@ -38,7 +37,6 @@ public class ClickHouseSinkConfig { public static final Integer timeoutSecondsDefault = 30; public static final Integer retryCountDefault = 3; public static final Boolean exactlyOnceDefault = Boolean.FALSE; - public static final Integer insertQuorumDefault = 2; public enum StateStores { NONE, IN_MEMORY, @@ -55,7 +53,6 @@ public enum StateStores { private final boolean exactlyOnce; private final int timeout; private final int retry; - private final int insertQuorum; public static class UTF8String implements ConfigDef.Validator { @@ -76,25 +73,16 @@ public String toString() { public ClickHouseSinkConfig(Map props) { // Extracting configuration hostname = props.get(HOSTNAME); - LOGGER.debug(HOSTNAME + ": " + hostname); port = Integer.parseInt(props.getOrDefault(PORT, String.valueOf(portDefault))); - LOGGER.debug(PORT + ": " + port); database = props.getOrDefault(DATABASE, databaseDefault); - LOGGER.debug(DATABASE + ": " + database); username = props.getOrDefault(USERNAME, usernameDefault); - LOGGER.debug(USERNAME + ": " + username); password = props.getOrDefault(PASSWORD, passwordDefault).trim(); - LOGGER.debug("password: ********"); // don't actually log password sslEnabled = Boolean.parseBoolean(props.getOrDefault(SSL_ENABLED,"false")); - LOGGER.debug(SSL_ENABLED + ": " + sslEnabled); timeout = Integer.parseInt(props.getOrDefault(TIMEOUT_SECONDS, timeoutSecondsDefault.toString())) * MILLI_IN_A_SEC; // multiple in 1000 milli - LOGGER.debug(TIMEOUT_SECONDS + ": " + timeout); retry = Integer.parseInt(props.getOrDefault(RETRY_COUNT, retryCountDefault.toString())); - LOGGER.debug(RETRY_COUNT + ": " + retry); exactlyOnce = Boolean.parseBoolean(props.getOrDefault(EXACTLY_ONCE,"false")); - LOGGER.debug(EXACTLY_ONCE + ": " + exactlyOnce); - insertQuorum = Integer.parseInt(props.getOrDefault(INSERT_QUORUM, insertQuorumDefault.toString())); - LOGGER.debug(INSERT_QUORUM + ": " + insertQuorum); + LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}", + hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce); } public static final ConfigDef CONFIG = createConfigDef(); @@ -190,16 +178,6 @@ private static ConfigDef createConfigDef() { ++orderInGroup, ConfigDef.Width.MEDIUM, "enable exactly once semantics."); - configDef.define(INSERT_QUORUM, - ConfigDef.Type.INT, - insertQuorumDefault, - ConfigDef.Range.between(0, 10), - ConfigDef.Importance.LOW, - "insert quorum", - group, - ++orderInGroup, - ConfigDef.Width.SHORT, - "insert quorum"); return configDef; } @@ -232,5 +210,4 @@ public int getTimeout() { } public int getRetry() { return retry; } public boolean getExactlyOnce() { return exactlyOnce; } - public int getInsertQuorum() { return insertQuorum; } } diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 694db191..222bc89e 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -379,7 +379,7 @@ public void doInsertRawBinary(List records) throws IOException, Executio .table(table.getName()) .format(ClickHouseFormat.RowBinary) // this is needed to get meaningful response summary - .set("insert_quorum", csc.getInsertQuorum()) + .set("insert_quorum", "auto") .set("send_progress_in_http_headers", 1); ClickHouseConfig config = request.getConfig(); @@ -451,7 +451,7 @@ public void doInsertJson(List records) throws IOException, ExecutionExce .table(table.getName()) .format(ClickHouseFormat.JSONEachRow) // this is needed to get meaningful response summary - .set("insert_quorum", csc.getInsertQuorum()) + .set("insert_quorum", "auto") .set("input_format_skip_unknown_fields", 1) .set("send_progress_in_http_headers", 1); From dd8f2d98f34fb448d90bcb148b43dbf398abfbd5 Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Sun, 16 Jul 2023 06:22:07 -0400 Subject: [PATCH 3/5] Changing back for now Changing this back for now because the version we use doesn't support "auto" --- .../clickhouse/kafka/connect/sink/db/ClickHouseWriter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index 222bc89e..fc2c8ae8 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -379,7 +379,7 @@ public void doInsertRawBinary(List records) throws IOException, Executio .table(table.getName()) .format(ClickHouseFormat.RowBinary) // this is needed to get meaningful response summary - .set("insert_quorum", "auto") + .set("insert_quorum", 2) .set("send_progress_in_http_headers", 1); ClickHouseConfig config = request.getConfig(); @@ -451,7 +451,7 @@ public void doInsertJson(List records) throws IOException, ExecutionExce .table(table.getName()) .format(ClickHouseFormat.JSONEachRow) // this is needed to get meaningful response summary - .set("insert_quorum", "auto") + .set("insert_quorum", 2) .set("input_format_skip_unknown_fields", 1) .set("send_progress_in_http_headers", 1); From c79f8241946f9002187263cb09a04b58d4964245 Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Mon, 17 Jul 2023 01:25:01 -0400 Subject: [PATCH 4/5] Tweaking Debug/Trace payload logging --- .../clickhouse/kafka/connect/sink/db/ClickHouseWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index fc2c8ae8..c3954b5e 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -484,11 +484,11 @@ public void doInsertJson(List records) throws IOException, ExecutionExce java.lang.reflect.Type gsonType = new TypeToken() {}.getType(); String gsonString = gson.toJson(data, gsonType); - LOGGER.debug(String.format("topic [%s] partition [%d] offset [%d] payload '%s'", + LOGGER.debug(String.format("topic [%s] partition [%d] offset [%d]", record.getTopic(), record.getRecordOffsetContainer().getPartition(), - record.getRecordOffsetContainer().getOffset(), - gsonString)); + record.getRecordOffsetContainer().getOffset())); + LOGGER.trace("payload {}", gsonString); BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8)); } else { LOGGER.warn(String.format("Getting empty record skip the insert topic[%s] offset[%d]", record.getTopic(), record.getSinkRecord().kafkaOffset())); From eda8d5c5e1f3ec2a350f8e47002f7070b08d571e Mon Sep 17 00:00:00 2001 From: Paultagoras Date: Mon, 17 Jul 2023 01:29:10 -0400 Subject: [PATCH 5/5] Update ClickHouseWriter.java --- .../kafka/connect/sink/db/ClickHouseWriter.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java index c3954b5e..bb5e3959 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/ClickHouseWriter.java @@ -484,11 +484,11 @@ public void doInsertJson(List records) throws IOException, ExecutionExce java.lang.reflect.Type gsonType = new TypeToken() {}.getType(); String gsonString = gson.toJson(data, gsonType); - LOGGER.debug(String.format("topic [%s] partition [%d] offset [%d]", + LOGGER.trace("topic {} partition {} offset {} payload {}", record.getTopic(), record.getRecordOffsetContainer().getPartition(), - record.getRecordOffsetContainer().getOffset())); - LOGGER.trace("payload {}", gsonString); + record.getRecordOffsetContainer().getOffset(), + gsonString); BinaryStreamUtils.writeBytes(stream, gsonString.getBytes(StandardCharsets.UTF_8)); } else { LOGGER.warn(String.format("Getting empty record skip the insert topic[%s] offset[%d]", record.getTopic(), record.getSinkRecord().kafkaOffset())); @@ -501,7 +501,7 @@ public void doInsertJson(List records) throws IOException, ExecutionExce try (ClickHouseResponse response = future.get()) { summary = response.getSummary(); long rows = summary.getWrittenRows(); - LOGGER.trace(String.format("insert num of rows %d", rows)); + LOGGER.debug("Number of rows inserted: {}", rows); } catch (Exception e) {//This is mostly for auto-closing LOGGER.trace("Exception", e); throw e;