Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue1304 - Add more debug and info messages #1309

Merged
merged 6 commits into from
Jul 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions client/src/main/java/com/networknt/client/Http2Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,11 @@ public IoFuture<ClientConnection> borrowConnection(final URI uri, final XnioWork
final FutureResult<ClientConnection> result = new FutureResult<>();
ClientConnection connection = http2ClientConnectionPool.getConnection(uri);
if(connection != null && connection.isOpen()) {
logger.info("Got an open connection from http2ClientConnectionPool");
result.setResult(connection);
return result.getIoFuture();
}
logger.info("Got a null or non open connection: {} from http2ClientConnectionPool. Creating a new one ...", connection);
if("https".equals(uri.getScheme()) && ssl == null) ssl = getDefaultXnioSsl();
return connect((InetSocketAddress) null, uri, worker, ssl, bufferPool, options);
}
Expand All @@ -245,6 +247,7 @@ public IoFuture<ClientConnection> connect(InetSocketAddress bindAddress, final U
provider.connect(new ClientCallback<ClientConnection>() {
@Override
public void completed(ClientConnection r) {
logger.info("Adding the new connection: {} to FutureResult and cache it for uri: {}", r, uri);
result.setResult(r);
http2ClientConnectionPool.cacheConnection(uri, r);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,13 @@ public synchronized ClientConnection getConnection(URI uri) {
List<CachedConnection> result = connectionPool.get(uriString);
if (result == null) {
synchronized (Http2ClientConnectionPool.class) {
logger.info("Second try of getting connections for uri: {}", uriString);
result = connectionPool.get(uriString);
}
}
logger.info("Got {} connections for uri: {}", result != null ? result.size() : null, uriString);
CachedConnection cachedConnection = selectConnection(uri, result, false);
logger.info("Got cached connection: {} for uri: {}", cachedConnection, uriString);
if (cachedConnection != null) {
hangConnection(uri, cachedConnection);
return cachedConnection.get();
Expand All @@ -80,6 +83,7 @@ public synchronized ClientConnection getConnection(URI uri) {
public synchronized void cacheConnection(URI uri, ClientConnection connection) {
CachedConnection cachedConnection = getAndRemoveClosedConnection(uri);
if (cachedConnection == null || getConnectionStatus(uri, cachedConnection) != ConnectionStatus.MULTIPLEX_SUPPORT) {
logger.info("Cached connection: {} is either null or not support multiplex", cachedConnection);
CachedConnection newConnection = new CachedConnection(connection);
connectionPool.computeIfAbsent(uri.toString(), k -> new LinkedList<>()).add(newConnection);
connectionCount.getAndIncrement();
Expand All @@ -97,6 +101,7 @@ private synchronized CachedConnection getAndRemoveClosedConnection(URI uri) {
result = connectionPool.get(uriString);
}
}
logger.info("Got {} cache connections for uri: {} from connectionPool", result != null ? result.size() : null, uriString);
return selectConnection(uri, result, true);
}

Expand Down Expand Up @@ -135,11 +140,13 @@ private void handleParkedConnection(URI uri, CachedConnection connection) {

private synchronized CachedConnection selectConnection(URI uri, List<CachedConnection> connections, boolean isRemoveClosedConnection) {
if (connections != null) {
logger.info("Before removing max connections per host from list of {} connections for uri: {} ...", connections.size(), uri);
if (connections.size() > ClientConfig.get().getMaxConnectionNumPerHost() * 0.75) {
while (connections.size() > ClientConfig.get().getMinConnectionNumPerHost() && connections.size() > 0) {
connections.remove(0);
}
}
logger.info("After removing max connections per host from list of {} connections for uri: {} ...", connections.size(), uri);
if (isRemoveClosedConnection) {
Iterator<CachedConnection> iterator = connections.iterator();
while (iterator.hasNext()) {
Expand All @@ -154,8 +161,10 @@ private synchronized CachedConnection selectConnection(URI uri, List<CachedConne
}
}
}
logger.info("After removing closed connections, {} connections left in the list for uri: {}", connections.size(), uri);
}
if (connections.size() > 0) {
logger.info("Selecting a valid connections from cached {} connections for uri: {} ...", connections.size(), uri);
// Balance the selection of each connection
int randomInt = ThreadLocalRandom.current().nextInt(0, connections.size());
for (int i = 0; i < connections.size(); i++) {
Expand All @@ -165,7 +174,9 @@ private synchronized CachedConnection selectConnection(URI uri, List<CachedConne
if (status == ConnectionStatus.AVAILABLE || status == ConnectionStatus.MULTIPLEX_SUPPORT) {
return connection;
}
logger.warn("Connection status for uri: {} is {}", uri, status);
}
logger.warn("None of the connection cached can be used for uri: {}", uri);
}
}
return null;
Expand Down Expand Up @@ -209,6 +220,7 @@ private void hangConnection(URI uri, CachedConnection connection) {
// Increase the request count for the connection for every invocation
connection.incrementRequestCount();
ConnectionStatus status = getConnectionStatus(uri, connection);
logger.info("Got connection status: {} for uri: {} before hang it", status, uri);
// Hanging new or old Http/1.1 connection
if ((status == null && !connection.isHttp2Connection()) || status == ConnectionStatus.AVAILABLE) {
connectionStatusMap.put(connection.get(), ConnectionStatus.HANGING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private ConcurrentHashMap<String, List<URL>> lookupServiceUpdate(String protocol
lastConsulIndexId = lookupServices.get(serviceName) == null ? 0L : lookupServices.get(serviceName);
}

if(logger.isTraceEnabled()) logger.trace("serviceName = " + serviceName + " lastConsulIndexId = " + lastConsulIndexId);
logger.debug("serviceName = {} lastConsulIndexId = {}", serviceName, lastConsulIndexId);
ConsulResponse<List<ConsulService>> response = lookupConsulService(serviceName, lastConsulIndexId);
if(logger.isTraceEnabled()) {
try {
Expand All @@ -212,6 +212,7 @@ private ConcurrentHashMap<String, List<URL>> lookupServiceUpdate(String protocol
if(logger.isDebugEnabled()) try {logger.debug("services = " + Config.getInstance().getMapper().writeValueAsString(services));} catch (Exception e) {}
if (services != null && !services.isEmpty()
&& response.getConsulIndex() > lastConsulIndexId) {
logger.info("Got {} updated urls for service: {}", services.size(), serviceName);
for (ConsulService service : services) {
try {
URL url = ConsulUtils.buildUrl(protocol, service);
Expand All @@ -227,6 +228,7 @@ private ConcurrentHashMap<String, List<URL>> lookupServiceUpdate(String protocol
}
}
lookupServices.put(serviceName, response.getConsulIndex());
logger.info("Service {} consul index {} put into lookupServices", serviceName, response.getConsulIndex());
return serviceUrls;
} else if (response.getConsulIndex() < lastConsulIndexId) {
logger.info(serviceName + " lastIndex:" + lastConsulIndexId + "; response consul Index:" + response.getConsulIndex());
Expand Down Expand Up @@ -301,9 +303,13 @@ public void run() {
logger.info("start service lookup thread. lookup interval: " + lookupInterval + "ms, service: " + serviceName);
while (true) {
try {
logger.info("Start to sleep {} ms for the next lookupServiceUpdate", lookupInterval);
sleep(lookupInterval);
logger.info("Woke up from the sleep for the next lookupServiceUpdate");
ConcurrentHashMap<String, List<URL>> serviceUrls = lookupServiceUpdate(protocol, serviceName);
logger.info("Got {} serviceUrls from lookupServiceUpdate({}, {})", serviceUrls.size(), protocol, serviceName);
updateServiceCache(serviceName, serviceUrls, true);
logger.info("Service cache updated with the serviceUrls from lookupServiceUpdate");
} catch (Throwable e) {
logger.error("service lookup thread fail!", e);
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,13 @@
import org.xnio.IoUtils;
import org.xnio.OptionMap;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

/**
Expand Down Expand Up @@ -182,13 +179,17 @@ public ConsulResponse<List<ConsulService>> lookupHealthService(String serviceNam
}
logger.trace("path = {}", path);
try {
logger.debug("Getting connection from pool with {}", uri);
connection = client.borrowConnection(uri, Http2Client.WORKER, Http2Client.SSL, Http2Client.BUFFER_POOL, optionMap).get();
logger.info("Got connection: {} from pool and send request to {}", connection, path);
AtomicReference<ClientResponse> reference = send(connection, Methods.GET, path, token, null);
int statusCode = reference.get().getResponseCode();
logger.info("Got status code: {} from the consul query", statusCode);
if(statusCode >= UNUSUAL_STATUS_CODE){
throw new Exception("Failed to unregister on Consul: " + statusCode);
} else {
String body = reference.get().getAttachment(Http2Client.RESPONSE_BODY);
logger.debug("Got response body: {} from the consul query", body);
List<Map<String, Object>> services = Config.getInstance().getMapper().readValue(body, new TypeReference<List<Map<String, Object>>>(){});
List<ConsulService> ConsulServcies = new ArrayList<>(
services.size());
Expand Down Expand Up @@ -238,21 +239,23 @@ AtomicReference<ClientResponse> send(ClientConnection connection, HttpString met
ClientRequest request = new ClientRequest().setMethod(method).setPath(path);
request.getRequestHeaders().put(Headers.HOST, "localhost");
if (token != null) request.getRequestHeaders().put(HttpStringConstants.CONSUL_TOKEN, token);
if(logger.isTraceEnabled()) logger.trace("The request sent to consul: {} = request header: {}, request body is empty", uri.toString(), request.toString());
if (logger.isTraceEnabled()) logger.trace("The request sent to consul: {} = request header: {}, request body is empty", uri.toString(), request.toString());
if(StringUtils.isBlank(json)) {
connection.sendRequest(request, client.createClientCallback(reference, latch));
} else {
request.getRequestHeaders().put(Headers.TRANSFER_ENCODING, "chunked");
connection.sendRequest(request, client.createClientCallback(reference, latch, json));
}
latch.await(ConsulUtils.getWaitInSecond(wait), TimeUnit.SECONDS);
if(reference != null) {
if(logger.isTraceEnabled()) logger.trace("The response got from consul: {} = {}", uri.toString(), reference.get().toString());
int waitInSecond = ConsulUtils.getWaitInSecond(wait);
boolean isNotTimeout = latch.await(waitInSecond, TimeUnit.SECONDS);
if (isNotTimeout) {
logger.debug("The response from Consul: {} = {}", uri, reference != null ? reference.get() : null);
} else {
// timeout happens, do not know if the Consul server is still alive. Close the connection to force reconnect. The next time this connection
// is borrowed from the pool, a new connection will be created as the one returned is not open.
if(connection != null && connection.isOpen()) IoUtils.safeClose(connection);
if(logger.isTraceEnabled()) logger.trace("The request is timeout after {} seconds and reference is null.", ConsulUtils.getWaitInSecond(wait));
throw new RuntimeException(
String.format("The request to Consul: %s timed out after %d seconds", uri, waitInSecond));
}
return reference;
}
Expand Down