Skip to content

Commit

Permalink
Merge pull request #2131 from ClickHouse/v2_connection_metrics
Browse files Browse the repository at this point in the history
Connection metrics
  • Loading branch information
Paultagoras authored Feb 13, 2025
2 parents 208d547 + 20314c6 commit 7b7749f
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,39 @@
import com.clickhouse.client.api.data_formats.internal.SerializerUtils;
import com.clickhouse.client.api.enums.ProxyType;
import com.clickhouse.client.api.http.ClickHouseHttpProto;
import io.micrometer.core.annotation.Timed;
import org.apache.hc.client5.http.AuthenticationStrategy;
import org.apache.hc.client5.http.ConnectTimeoutException;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.client5.http.classic.ExecChainHandler;
import org.apache.hc.client5.http.classic.methods.HttpPost;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.config.RequestConfig;
import org.apache.hc.client5.http.impl.ChainElement;
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
import org.apache.hc.client5.http.impl.DefaultClientConnectionReuseStrategy;
import org.apache.hc.client5.http.impl.DefaultSchemePortResolver;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.ConnectExec;
import org.apache.hc.client5.http.impl.classic.HttpClientBuilder;
import org.apache.hc.client5.http.impl.io.BasicHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.ManagedHttpClientConnectionFactory;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.io.ManagedHttpClientConnection;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.client5.http.socket.ConnectionSocketFactory;
import org.apache.hc.client5.http.socket.LayeredConnectionSocketFactory;
import org.apache.hc.client5.http.socket.PlainConnectionSocketFactory;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.ConnectionRequestTimeoutException;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.HttpRequest;
Expand All @@ -45,7 +57,10 @@
import org.apache.hc.core5.http.impl.io.DefaultHttpResponseParserFactory;
import org.apache.hc.core5.http.io.SocketConfig;
import org.apache.hc.core5.http.io.entity.EntityTemplate;
import org.apache.hc.core5.http.protocol.DefaultHttpProcessor;
import org.apache.hc.core5.http.protocol.HttpContext;
import org.apache.hc.core5.http.protocol.RequestTargetHost;
import org.apache.hc.core5.http.protocol.RequestUserAgent;
import org.apache.hc.core5.io.CloseMode;
import org.apache.hc.core5.io.IOCallback;
import org.apache.hc.core5.net.URIBuilder;
Expand Down Expand Up @@ -81,7 +96,9 @@
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

public class HttpAPIClientHelper {
Expand Down Expand Up @@ -219,7 +236,7 @@ private HttpClientConnectionManager poolConnectionManager(LayeredConnectionSocke


int networkBufferSize = MapUtils.getInt(chConfiguration, "client_network_buffer_size");
ManagedHttpClientConnectionFactory connectionFactory = new ManagedHttpClientConnectionFactory(
MeteredManagedHttpClientConnectionFactory connectionFactory = new MeteredManagedHttpClientConnectionFactory(
Http1Config.custom()
.setBufferSize(networkBufferSize)
.build(),
Expand All @@ -238,6 +255,9 @@ private HttpClientConnectionManager poolConnectionManager(LayeredConnectionSocke
Class<?> micrometerLoader = getClass().getClassLoader().loadClass("com.clickhouse.client.api.metrics.MicrometerLoader");
Method applyMethod = micrometerLoader.getDeclaredMethod("applyPoolingMetricsBinder", Object.class, String.class, PoolingHttpClientConnectionManager.class);
applyMethod.invoke(micrometerLoader, metricsRegistry, mGroupName, phccm);

applyMethod = micrometerLoader.getDeclaredMethod("applyConnectionMetricsBinder", Object.class, String.class, MeteredManagedHttpClientConnectionFactory.class);
applyMethod.invoke(micrometerLoader, metricsRegistry, mGroupName, connectionFactory);
} catch (Exception e) {
LOG.error("Failed to register metrics", e);
}
Expand Down Expand Up @@ -758,7 +778,6 @@ public void close() {
httpClient.close(CloseMode.IMMEDIATE);
}


/**
* This factory is used only when no ssl connections are required (no https endpoints).
* Internally http client would create factory and spend time if no supplied.
Expand All @@ -779,4 +798,34 @@ public Socket connectSocket(TimeValue connectTimeout, Socket socket, HttpHost ho
return null;
}
}

public class MeteredManagedHttpClientConnectionFactory extends ManagedHttpClientConnectionFactory {
public MeteredManagedHttpClientConnectionFactory(Http1Config http1Config, CharCodingConfig charCodingConfig, DefaultHttpResponseParserFactory defaultHttpResponseParserFactory) {
super(http1Config, charCodingConfig, defaultHttpResponseParserFactory);
}

ConcurrentLinkedQueue<Long> times = new ConcurrentLinkedQueue<>();


@Override
public ManagedHttpClientConnection createConnection(Socket socket) throws IOException {
long startT = System.currentTimeMillis();
try {
return super.createConnection(socket);
} finally {
long endT = System.currentTimeMillis();
times.add(endT - startT);
}
}

public long getTime() {
int count = times.size();
long runningAverage = 0;
for (int i = 0; i < count; i++) {
runningAverage += times.poll();
}

return count > 0 ? runningAverage / count : 0;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.clickhouse.client.api.metrics;

import com.clickhouse.client.api.ClientMisconfigurationException;
import com.clickhouse.client.api.internal.HttpAPIClientHelper;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.PoolingHttpClientConnectionManagerMetricsBinder;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
Expand All @@ -14,4 +17,15 @@ public static void applyPoolingMetricsBinder(Object registry, String metricsGrou
throw new ClientMisconfigurationException("Unsupported registry type." + registry.getClass());
}
}

public static void applyConnectionMetricsBinder(Object registry, String metricsGroupName, HttpAPIClientHelper.MeteredManagedHttpClientConnectionFactory factory) {
if (registry instanceof MeterRegistry) {
Gauge.builder("httpcomponents.httpclient.connect.time", factory, HttpAPIClientHelper.MeteredManagedHttpClientConnectionFactory::getTime)
.description("The running average connection creation time.")
.tag("httpclient", metricsGroupName)
.register((MeterRegistry) registry);
} else {
throw new ClientMisconfigurationException("Unsupported registry type." + registry.getClass());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public void testRegisterMetrics() throws Exception {
Gauge totalMax = meterRegistry.get("httpcomponents.httpclient.pool.total.max").gauge();
Gauge available = meterRegistry.get("httpcomponents.httpclient.pool.total.connections").tags("state", "available").gauge();
Gauge leased = meterRegistry.get("httpcomponents.httpclient.pool.total.connections").tags("state", "leased").gauge();
Gauge times = meterRegistry.get("httpcomponents.httpclient.connect.time").gauge();

Assert.assertEquals((int) totalMax.value(), Integer.parseInt(ClientConfigProperties.HTTP_MAX_OPEN_CONNECTIONS.getDefaultValue()));
Assert.assertEquals((int) available.value(), 1);
Expand Down Expand Up @@ -95,7 +96,30 @@ public void testRegisterMetrics() throws Exception {
Assert.assertEquals((int) leased.value(), 0);

}
// currently there are only 5 metrics that are monitored by micrometer (out of the box)
assertEquals(meterRegistry.getMeters().size(), 5);
// currently there are only 7 metrics that are monitored by micrometer (out of the box)
assertEquals(meterRegistry.getMeters().size(), 6);
}

//Disabled because we can't assume the time is greater than 0
@Test(groups = { "integration" }, enabled = false)
public void testConnectionTime() throws Exception {
ClickHouseNode node = getServer(ClickHouseProtocol.HTTP);
boolean isSecure = isCloud();

try (Client client = new Client.Builder()
.addEndpoint(Protocol.HTTP, "192.168.1.1", node.getPort(), isSecure)
.setUsername("default")
.setPassword(ClickHouseServerForTest.getPassword())
.setDefaultDatabase(ClickHouseServerForTest.getDatabase())
.setConnectTimeout(5, ChronoUnit.SECONDS)
.registerClientMetrics(meterRegistry, "pool-test")
.build()) {

client.ping();
Gauge times = meterRegistry.get("httpcomponents.httpclient.connect.time").gauge();

Assert.assertTrue(times.value() > 0);
assertEquals(times.value(), 0);//Second time should be 0
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2059,4 +2059,11 @@ public void testGettingRowsBeforeLimit() throws Exception {
Assert.assertEquals(response.getTotalRowsToRead(), expectedTotalRowsToRead);
}
}

@Test(groups = {"integration"})
public void testEmptyResponse() throws Exception {
try (QueryResponse response = client.query("SELECT number FROM system.numbers LIMIT 0", new QuerySettings().setFormat(ClickHouseFormat.RowBinary)).get()) {
System.out.println(response.getResultRows());
}
}
}

0 comments on commit 7b7749f

Please sign in to comment.