Skip to content

Commit

Permalink
feat: health check timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun committed Oct 9, 2024
1 parent 47349ff commit d5223e2
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 15 deletions.
5 changes: 5 additions & 0 deletions ingester-example/src/main/java/io/greptime/TestConnector.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,11 @@ public static GreptimeDB connectToDefaultDB() {
// periodically. By default, the route tables will not be refreshed.
.routeTableRefreshPeriodSeconds(-1)
// Optional, the default value is fine.
// Timeout for the health check, in milliseconds. If the health check does not complete
// within the specified time, the health check will fail.
// The default is 1000 ms.
.checkHealthTimeoutMs(1000)
// Optional, the default value is fine.
// Sets the request router. The internal default implementation works well.
// You don't need to set it unless you have special requirements.
.router(null)
Expand Down
28 changes: 15 additions & 13 deletions ingester-protocol/src/main/java/io/greptime/RouterClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public boolean init(RouterOptions opts) {
this.refresher.scheduleWithFixedDelay(
() -> {
long thisSequence = this.refreshSequencer.incrementAndGet();
checkHealth().whenComplete((r, t) -> {
this.checkHealth().whenComplete((r, t) -> {
if (t != null) {
LOG.warn("Failed to check health", t);
return;
Expand Down Expand Up @@ -239,18 +239,20 @@ public String toString() {

/**
 * Checks the health of every endpoint returned by {@code this.opts.getEndpoints()}.
 *
 * <p>One health check future is created per endpoint; the returned future is derived
 * from {@code CompletableFuture.allOf(...)} over those futures and maps each endpoint
 * to the joined result of its own check.
 *
 * @return a future yielding a map from each endpoint to its health check result
 */
// NOTE(review): this listing looks like a diff rendered without +/- markers. The first
// `futures` declaration and the first `return` appear to be the removed implementation,
// and the second `futures` declaration onward the added one -- confirm against the commit.
@Override
public CompletableFuture<Map<Endpoint, Boolean>> checkHealth() {
// Variant with the per-endpoint request inlined: a normal response maps to true,
// an exceptional completion maps to false.
Map<Endpoint, CompletableFuture<Boolean>> futures = this.opts.getEndpoints().stream()
.collect(Collectors.toMap(Function.identity(), endpoint -> {
HealthCheckRequest req = HealthCheckRequest.newBuilder().build();
return this.invoke(endpoint, req, Context.newDefault())
.thenApply(resp -> true)
.exceptionally(t -> false); // Handle failure and return false
}));

return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
.thenApply(
ok -> futures.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue()
.join())));
// Variant that delegates the per-endpoint check to doCheckHealth.
Map<Endpoint, CompletableFuture<Boolean>> futures =
this.opts.getEndpoints().stream().collect(Collectors.toMap(Function.identity(), this::doCheckHealth));

// Completes once every per-endpoint future has completed.
CompletableFuture<Void> all = CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]));

// join() is called only after `all` has completed, i.e. when each future is already done.
return all.thenApply(ok -> futures.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().join())));
}

/**
 * Sends an empty health check request to a single endpoint, using the timeout from
 * {@code this.opts.getCheckHealthTimeoutMs()}.
 *
 * @param endpoint the endpoint to probe
 * @return a future yielding {@code true} when the call completes normally and
 *         {@code false} when it completes exceptionally
 */
private CompletableFuture<Boolean> doCheckHealth(Endpoint endpoint) {
    HealthCheckRequest request = HealthCheckRequest.newBuilder().build();
    Context ctx = Context.newDefault();
    long timeoutMs = this.opts.getCheckHealthTimeoutMs();
    // A failed call is reported as "unhealthy" (false) instead of being propagated.
    return this.invoke(endpoint, request, ctx, timeoutMs)
            .handle((resp, error) -> error == null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class GreptimeOptions implements Copiable<GreptimeOptions> {
public static final int DEFAULT_MAX_IN_FLIGHT_WRITE_POINTS = 10 * 65536;
public static final int DEFAULT_DEFAULT_STREAM_MAX_WRITE_POINTS_PER_SECOND = 10 * 65536;
public static final long DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS = 10 * 60;
public static final long DEFAULT_CHECK_HEALTH_TIMEOUT_MS = 1000;

private List<Endpoint> endpoints;
private RpcOptions rpcOptions;
Expand Down Expand Up @@ -158,6 +159,8 @@ public static final class Builder {
// Refresh frequency of route tables. The background refreshes all route tables periodically.
// If the value is less than or equal to 0, the route tables will not be refreshed.
private long routeTableRefreshPeriodSeconds = DEFAULT_ROUTE_TABLE_REFRESH_PERIOD_SECONDS;
// Timeout for health check
private long checkHealthTimeoutMs = DEFAULT_CHECK_HEALTH_TIMEOUT_MS;
// Authentication information
private AuthInfo authInfo;
// The request router
Expand Down Expand Up @@ -273,6 +276,19 @@ public Builder routeTableRefreshPeriodSeconds(long routeTableRefreshPeriodSecond
return this;
}

/**
 * Sets how long a health check may take before it is considered failed.
 * When this method is not called, the builder keeps
 * {@code DEFAULT_CHECK_HEALTH_TIMEOUT_MS} (1000 ms).
 *
 * @param checkHealthTimeoutMs the health check timeout, in milliseconds
 * @return this builder, to allow call chaining
 */
public Builder checkHealthTimeoutMs(long checkHealthTimeoutMs) {
    this.checkHealthTimeoutMs = checkHealthTimeoutMs;
    return this;
}

/**
* Sets authentication information. If the DB is not required to authenticate,
* we can ignore this.
Expand Down Expand Up @@ -321,6 +337,7 @@ private RouterOptions routerOptions() {
routerOpts.setEndpoints(this.endpoints);
routerOpts.setRouter(this.router);
routerOpts.setRefreshPeriodSeconds(this.routeTableRefreshPeriodSeconds);
routerOpts.setCheckHealthTimeoutMs(this.checkHealthTimeoutMs);
return routerOpts;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ public class RouterOptions implements Copiable<RouterOptions> {
// all route tables periodically. If the value is less than or
// equal to 0, the route tables will not be refreshed.
private long refreshPeriodSeconds = -1;
// Timeout for health check
private long checkHealthTimeoutMs;
private Router<Void, Endpoint> router;

public RpcClient getRpcClient() {
Expand All @@ -60,6 +62,14 @@ public void setRefreshPeriodSeconds(long refreshPeriodSeconds) {
this.refreshPeriodSeconds = refreshPeriodSeconds;
}

/**
 * Returns the timeout applied to health checks.
 *
 * @return the health check timeout, in milliseconds
 */
public long getCheckHealthTimeoutMs() {
    return this.checkHealthTimeoutMs;
}

/**
 * Stores the timeout to apply to health checks.
 *
 * @param checkHealthTimeoutMs the health check timeout, in milliseconds
 */
public void setCheckHealthTimeoutMs(long checkHealthTimeoutMs) {
    this.checkHealthTimeoutMs = checkHealthTimeoutMs;
}

/**
 * Returns the request router held by these options.
 *
 * @return the stored router; presumably {@code null} when no custom router was
 *         supplied -- verify against callers
 */
public Router<Void, Endpoint> getRouter() {
    return this.router;
}
Expand All @@ -74,15 +84,18 @@ public RouterOptions copy() {
opts.rpcClient = rpcClient;
opts.endpoints = this.endpoints;
opts.refreshPeriodSeconds = this.refreshPeriodSeconds;
opts.checkHealthTimeoutMs = this.checkHealthTimeoutMs;
opts.router = this.router;
return opts;
}

/**
 * Builds a {@code RouterOptions{...}} description by concatenating the option fields.
 *
 * @return the string form of these options
 */
// NOTE(review): this listing looks like a diff rendered without +/- markers. The two
// consecutive `return` lines and the two `refreshPeriodSeconds` lines appear to be the
// removed and added versions of the same statement -- confirm against the commit.
@Override
public String toString() {
return "RouterOptions{" + "endpoints="
return "RouterOptions{" + "rpcClient="
+ rpcClient + ", endpoints="
+ endpoints + ", refreshPeriodSeconds="
+ refreshPeriodSeconds + ", router="
+ refreshPeriodSeconds + ", checkHealthTimeoutMs="
+ checkHealthTimeoutMs + ", router="
+ router + '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public void testAllOptions() {
LimitedPolicy limitedPolicy = new LimitedPolicy.DiscardPolicy();
int defaultStreamMaxWritePointsPerSecond = 100000;
long routeTableRefreshPeriodSeconds = 99;
long checkHealthTimeoutMs = 1000;
AuthInfo authInfo = new AuthInfo("user", "password");
Router<Void, Endpoint> router = createTestRouter();
TlsOptions tlsOptions = new TlsOptions();
Expand All @@ -57,6 +58,7 @@ public void testAllOptions() {
.writeLimitedPolicy(limitedPolicy)
.defaultStreamMaxWritePointsPerSecond(defaultStreamMaxWritePointsPerSecond)
.routeTableRefreshPeriodSeconds(routeTableRefreshPeriodSeconds)
.checkHealthTimeoutMs(checkHealthTimeoutMs)
.authInfo(authInfo)
.router(router)
.build();
Expand All @@ -74,6 +76,7 @@ public void testAllOptions() {
routerOptions.getEndpoints().stream().map(Endpoint::toString).toArray());
Assert.assertEquals(router, routerOptions.getRouter());
Assert.assertEquals(routeTableRefreshPeriodSeconds, routerOptions.getRefreshPeriodSeconds());
Assert.assertEquals(checkHealthTimeoutMs, routerOptions.getCheckHealthTimeoutMs());

WriteOptions writeOptions = opts.getWriteOptions();
Assert.assertNotNull(writeOptions);
Expand Down

0 comments on commit d5223e2

Please sign in to comment.