Skip to content

Commit

Permalink
Apply version range change and enhance DNS resolver
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Oct 10, 2021
1 parent 19f0c9d commit 1eb0753
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 66 deletions.
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
package com.clickhouse.client;

import java.net.InetSocketAddress;

import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.client.logging.Logger;
import com.clickhouse.client.logging.LoggerFactory;
import org.xbill.DNS.Lookup;
import org.xbill.DNS.Record;
import org.xbill.DNS.SRVRecord;
import org.xbill.DNS.TextParseException;
import org.xbill.DNS.Type;
import com.clickhouse.client.naming.SrvResolver;

/**
* Default DNS resolver. It tries to look up service record (SRV record) when
* {@link com.clickhouse.client.config.ClickHouseDefaults#DNS_RESOLVE} is set to
* {@link com.clickhouse.client.config.ClickHouseDefaults#SRV_RESOLVE} is set to
* {@code true}.
*/
public class ClickHouseDnsResolver {
Expand All @@ -21,57 +18,25 @@ public class ClickHouseDnsResolver {
private static final ClickHouseDnsResolver instance = ClickHouseUtils.getService(ClickHouseDnsResolver.class,
new ClickHouseDnsResolver());

public static ClickHouseDnsResolver getInstance() {
return instance;
}

protected ClickHouseDnsResolver() {
}
protected static ClickHouseDnsResolver newInstance() {
ClickHouseDnsResolver resolver = null;

public InetSocketAddress resolve(ClickHouseProtocol protocol, String host, int port) {
if (protocol == null || host == null) {
throw new IllegalArgumentException("Non-null protocol and host are required");
}

if ((boolean) ClickHouseDefaults.DNS_RESOLVE.getEffectiveDefaultValue()) {
SRVRecord r = resolve(host, false);
if (r != null) {
host = r.getName().canonicalize().toString(true);
port = r.getPort();
if ((boolean) ClickHouseDefaults.SRV_RESOLVE.getEffectiveDefaultValue()) {
try {
resolver = new SrvResolver();
} catch (Throwable e) {
log.warn("Failed to enable SRV resolver due to:", e);
}
} else {
host = ClickHouseDefaults.HOST.getEffectiveValue(host);
port = port <= 0 ? protocol.getDefaultPort() : port;
}

return new InetSocketAddress(host, port);
return resolver == null ? new ClickHouseDnsResolver() : resolver;
}

// TODO register a callback for DNS change?

protected SRVRecord resolve(String srvDns, boolean basedOnWeight) {
Record[] records = null;
try {
records = new Lookup(srvDns, Type.SRV).run();
} catch (TextParseException e) {
// fallback to a cached entry?
log.warn("Not able to resolve given DNS query: [%s]", srvDns, e);
}

SRVRecord record = null;
if (records != null) {
if (basedOnWeight) {
for (int i = 0; i < records.length; i++) {
SRVRecord rec = (SRVRecord) records[i];
if (record == null || record.getWeight() > rec.getWeight()) {
record = rec;
}
}
} else {
record = (SRVRecord) records[0];
}
}
public static ClickHouseDnsResolver getInstance() {
return instance;
}

return record;
public InetSocketAddress resolve(ClickHouseProtocol protocol, String host, int port) {
return new InetSocketAddress(host, port);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ public enum ClickHouseDefaults implements ClickHouseConfigOption {
* Whether to create session automatically when there are multiple queries.
*/
AUTO_SESSION("auto_session", true, "Whether to create session automatically when there are multiple queries."),
/**
* Whether to resolve DNS name using
* {@link com.clickhouse.client.ClickHouseDnsResolver}(e.g. resolve SRV record
* to extract both host and port from a given name).
*/
DNS_RESOLVE("dns_resolve", false, "Whether to resolve DNS name."),
/**
* Default cluster.
*/
Expand Down Expand Up @@ -74,7 +68,13 @@ public enum ClickHouseDefaults implements ClickHouseConfigOption {
/**
* Max requests.
*/
MAX_REQUESTS("max_requests", 0, "Maximum size of shared thread pool, 0 means no limit.");
MAX_REQUESTS("max_requests", 0, "Maximum size of shared thread pool, 0 means no limit."),
/**
* Whether to resolve DNS SRV name using
* {@link com.clickhouse.client.naming.SrvResolver}(e.g. resolve SRV record to
* extract both host and port from a given name).
*/
SRV_RESOLVE("srv_resolve", false, "Whether to resolve DNS SRV name.");

private final String key;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.clickhouse.client.naming;

import java.net.InetSocketAddress;

import com.clickhouse.client.ClickHouseDnsResolver;
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.logging.Logger;
import com.clickhouse.client.logging.LoggerFactory;

import org.xbill.DNS.Lookup;
import org.xbill.DNS.Record;
import org.xbill.DNS.SRVRecord;
import org.xbill.DNS.TextParseException;
import org.xbill.DNS.Type;

public class SrvResolver extends ClickHouseDnsResolver {
private static final Logger log = LoggerFactory.getLogger(SrvResolver.class);

protected SRVRecord lookup(String srvDns, boolean basedOnWeight) {
Record[] records = null;
try {
records = new Lookup(srvDns, Type.SRV).run();
} catch (TextParseException e) {
// fallback to a cached entry?
log.warn("Not able to resolve given DNS query: [%s]", srvDns, e);
}

SRVRecord record = null;
if (records != null) {
if (basedOnWeight) {
for (int i = 0; i < records.length; i++) {
SRVRecord rec = (SRVRecord) records[i];
if (record == null || record.getWeight() > rec.getWeight()) {
record = rec;
}
}
} else {
record = (SRVRecord) records[0];
}
}

return record;
}

@Override
public InetSocketAddress resolve(ClickHouseProtocol protocol, String host, int port) {
if (protocol == null || host == null) {
throw new IllegalArgumentException("Non-null protocol and host are required");
}

SRVRecord r = lookup(host, false);
if (r != null) {
host = r.getName().canonicalize().toString(true);
port = r.getPort();
}
return new InetSocketAddress(host, port);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.clickhouse.client.naming;

import java.net.InetSocketAddress;

import com.clickhouse.client.ClickHouseProtocol;

import org.testng.Assert;
import org.testng.annotations.Test;

public class SrvResolverTest {
@Test(groups = { "integration" })
public void testLookup() {
String dns = "_sip._udp.sip.voice.google.com";
Assert.assertNotNull(new SrvResolver().lookup(dns, true));
Assert.assertNotNull(new SrvResolver().lookup(dns, false));
}

@Test(groups = { "integration" })
public void testResolv() throws Exception {
String host = "_sip._udp.sip.voice.google.com";
int port = 5060;

String dns = "_sip._udp.sip.voice.google.com";
Assert.assertEquals(new SrvResolver().resolve(ClickHouseProtocol.ANY, host, 0),
new InetSocketAddress(host, port));
}
}
5 changes: 0 additions & 5 deletions clickhouse-grpc-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,6 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>dnsjava</groupId>
<artifactId>dnsjava</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,8 +449,8 @@ public void testReadWriteSimpleTypes(String dataType, String zero, String negati
Assert.assertEquals(records.get(3)[1], positiveOne);

if ((ClickHouseDataType.DateTime.name().equals(dataType)
|| ClickHouseDataType.DateTime32.name().equals(dataType)) && version.getYear() == 21
&& version.getMajor() == 3) {
|| ClickHouseDataType.DateTime32.name().equals(dataType)) && version.getMajorVersion() == 21
&& version.getMinorVersion() == 3) {
// skip DateTime and DateTime32 negative test on 21.3 since it's not doing well
// see https://github.com/ClickHouse/ClickHouse/issues/29835 for more
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void testExternalData() throws SQLException, UnsupportedEncodingException
// reproduce issue #634
@Test(groups = "integration")
public void testLargeQueryWithExternalData() throws Exception {
String[] rows = ClickHouseVersion.check(connection.getServerVersion(), "[21.3)")
String[] rows = ClickHouseVersion.check(connection.getServerVersion(), "[21.3,)")
? new String[] { "1\tGroup\n" }
: new String[] { "1\tGroup", "1\tGroup\n" };

Expand Down

0 comments on commit 1eb0753

Please sign in to comment.