Skip to content

Commit

Permalink
Merge pull request #149 from ClickHouse/logging-updates-121
Browse files Browse the repository at this point in the history
Cleaning up and adding additional logs
  • Loading branch information
Paultagoras authored Jul 17, 2023
2 parents 3f28549 + eda8d5c commit f0f888a
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,13 @@
import org.slf4j.LoggerFactory;

import java.io.UnsupportedEncodingException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class ClickHouseSinkConfig {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkConfig.class);

//Configuration Names
public static final String HOSTNAME = "hostname";
public static final String PORT = "port";
public static final String DATABASE = "database";
Expand All @@ -21,6 +24,10 @@ public class ClickHouseSinkConfig {
public static final String RETRY_COUNT = "retryCount";
public static final String EXACTLY_ONCE = "exactlyOnce";





public static final int MILLI_IN_A_SEC = 1000;
private static final String databaseDefault = "default";
public static final int portDefault = 8443;
Expand All @@ -37,30 +44,23 @@ public enum StateStores {
KEEPER_MAP
}

private Map<String, String> settings = null;
private String hostname;
private int port;
private String database;
private String username;
private String password;
private boolean sslEnabled;
private boolean exactlyOnce;

private int timeout;

private int retry;
private final String hostname;
private final int port;
private final String database;
private final String username;
private final String password;
private final boolean sslEnabled;
private final boolean exactlyOnce;
private final int timeout;
private final int retry;

public static class UTF8String implements ConfigDef.Validator {

@Override
public void ensureValid(String name, Object o) {
String s = (String) o;
try {
if (s != null ) {
byte[] tmpBytes = s.getBytes("UTF-8");
}
} catch (UnsupportedEncodingException e) {
throw new ConfigException(name, o, "String must be non-empty");
if (s != null ) {
byte[] tmpBytes = s.getBytes(StandardCharsets.UTF_8);
}
}

Expand All @@ -81,8 +81,8 @@ public ClickHouseSinkConfig(Map<String, String> props) {
timeout = Integer.parseInt(props.getOrDefault(TIMEOUT_SECONDS, timeoutSecondsDefault.toString())) * MILLI_IN_A_SEC; // multiple in 1000 milli
retry = Integer.parseInt(props.getOrDefault(RETRY_COUNT, retryCountDefault.toString()));
exactlyOnce = Boolean.parseBoolean(props.getOrDefault(EXACTLY_ONCE,"false"));
LOGGER.info("exactlyOnce: " + exactlyOnce);
LOGGER.info("props: " + props);
LOGGER.debug("ClickHouseSinkConfig: hostname: {}, port: {}, database: {}, username: {}, sslEnabled: {}, timeout: {}, retry: {}, exactlyOnce: {}",
hostname, port, database, username, sslEnabled, timeout, retry, exactlyOnce);
}

public static final ConfigDef CONFIG = createConfigDef();
Expand Down Expand Up @@ -205,7 +205,6 @@ public String getPassword() {
public boolean isSslEnabled() {
return sslEnabled;
}

public int getTimeout() {
return timeout;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public String version() {

@Override
public void start(Map<String, String> props) {
LOGGER.info("start SinkTask: ");
LOGGER.info("Start SinkTask: ");
ClickHouseSinkConfig clickHouseSinkConfig;
try {
clickHouseSinkConfig = new ClickHouseSinkConfig(props);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public void stop() {

public void put(final Collection<SinkRecord> records) {
if (records.isEmpty()) {
LOGGER.trace("No records send to SinkTask");
LOGGER.trace("No records sent to SinkTask");
return;
}
// Group by topic & partition
Expand Down
Loading

0 comments on commit f0f888a

Please sign in to comment.