Skip to content

Commit

Permalink
[branch-2.1](arrow-flight-sql) Fix exceed user property max connectio…
Browse files Browse the repository at this point in the history
…n cause Reach limit of connections (apache#39836)

pick apache#39127
pick apache#39802
  • Loading branch information
xinyiZzz authored Aug 23, 2024
1 parent 424ad23 commit e716658
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ public DorisFlightSqlService(int port) {
DorisFlightSqlProducer producer = new DorisFlightSqlProducer(location, flightSessionsManager);
flightServer = FlightServer.builder(allocator, location, producer)
.headerAuthenticator(new FlightBearerTokenAuthenticator(flightTokenManager)).build();
LOG.info("Arrow Flight SQL service is created, port: {}, token_cache_size: {}"
+ ", qe_max_connection: {}, token_alive_time: {}",
port, Config.arrow_flight_token_cache_size, Config.qe_max_connection,
Config.arrow_flight_token_alive_time);
}

// start Arrow Flight SQL service, return true if success, otherwise false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.doris.service.arrowflight.tokens;

import org.apache.doris.catalog.Env;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;
Expand All @@ -31,9 +32,12 @@
import com.google.common.cache.RemovalNotification;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;

import java.math.BigInteger;
import java.security.SecureRandom;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -46,7 +50,9 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
private final int cacheSize;
private final int cacheExpiration;

private LoadingCache<String, FlightTokenDetails> tokenCache;
private final LoadingCache<String, FlightTokenDetails> tokenCache;
// <username, <token, 1>>
private final ConcurrentHashMap<String, LoadingCache<String, Integer>> usersTokenLRU = new ConcurrentHashMap<>();

public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) {
this.cacheSize = cacheSize;
Expand All @@ -56,17 +62,19 @@ public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) {
.expireAfterWrite(cacheExpiration, TimeUnit.MINUTES)
.removalListener(new RemovalListener<String, FlightTokenDetails>() {
@Override
public void onRemoval(RemovalNotification<String, FlightTokenDetails> notification) {
public void onRemoval(@NotNull RemovalNotification<String, FlightTokenDetails> notification) {
// TODO: broadcast this message to other FE
LOG.info("evict bearer token: " + notification.getKey() + ", reason: "
String token = notification.getKey();
FlightTokenDetails tokenDetails = notification.getValue();
LOG.info("evict bearer token: " + token + ", reason: token number exceeded, "
+ notification.getCause());
ConnectContext context = ExecuteEnv.getInstance().getScheduler()
.getContext(notification.getKey());
.getContext(token);
if (context != null) {
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
LOG.info("unregister flight connect context after evict bearer token: "
+ notification.getKey());
LOG.info("unregister flight connect context after evict bearer token: " + token);
}
usersTokenLRU.get(tokenDetails.getUsername()).invalidate(token);
}
}).build(new CacheLoader<String, FlightTokenDetails>() {
@Override
Expand Down Expand Up @@ -96,6 +104,29 @@ public FlightTokenDetails createToken(final String username, final FlightAuthRes
flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp());

tokenCache.put(token, flightTokenDetails);
if (!usersTokenLRU.containsKey(username)) {
// TODO Modify usersTokenLRU size when user property maxConn changes. but LoadingCache currently not
// support modify.
usersTokenLRU.put(username,
CacheBuilder.newBuilder().maximumSize(Env.getCurrentEnv().getAuth().getMaxConn(username) / 2)
.removalListener(new RemovalListener<String, Integer>() {
@Override
public void onRemoval(@NotNull RemovalNotification<String, Integer> notification) {
// TODO: broadcast this message to other FE
assert notification.getKey() != null;
tokenCache.invalidate(notification.getKey());
LOG.info("evict bearer token: " + notification.getKey()
+ ", reason: user connection exceeded, " + notification.getCause());
}
}).build(new CacheLoader<String, Integer>() {
@NotNull
@Override
public Integer load(@NotNull String key) {
return 1;
}
}));
}
usersTokenLRU.get(username).put(token, 1);
LOG.info("Created flight token for user: {}, token: {}", username, token);
return flightTokenDetails;
}
Expand All @@ -114,6 +145,16 @@ public FlightTokenDetails validateToken(final String token) throws IllegalArgume
throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, "
+ "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration);
}
if (usersTokenLRU.containsKey(value.getUsername())) {
try {
usersTokenLRU.get(value.getUsername()).get(token);
} catch (ExecutionException ignored) {
throw new IllegalArgumentException("usersTokenLRU not exist bearer token: " + token);
}
} else {
throw new IllegalArgumentException(
"bearer token not created: " + token + ", username: " + value.getUsername());
}
LOG.info("Validated bearer token for user: {}", value.getUsername());
return value;
}
Expand Down
2 changes: 2 additions & 0 deletions regression-test/pipeline/external/conf/fe.conf
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ catalog_trash_expire_second=1
# priority_networks = 10.10.10.0/24;192.168.0.0/16
priority_networks=172.19.0.0/24

arrow_flight_sql_port = 8081

# Advanced configurations
# log_roll_size_mb = 1024
# sys_log_dir = ${DORIS_HOME}/log
Expand Down
1 change: 1 addition & 0 deletions regression-test/pipeline/p1/conf/be.conf
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ be_port = 9162
webserver_port = 8142
heartbeat_service_port = 9152
brpc_port = 8162
arrow_flight_sql_port = 8181

mem_limit = 90%
disable_minidump = true
Expand Down
2 changes: 2 additions & 0 deletions regression-test/pipeline/p1/conf/fe.conf
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ sys_log_verbose_modules=org.apache.doris.service.FrontendServiceImpl
# Default value is ${DORIS_HOME}/doris-meta
# meta_dir = ${DORIS_HOME}/doris-meta

arrow_flight_sql_port = 8081

disable_decimalv2 = false
disable_datev1 = false
catalog_trash_expire_second=1
Expand Down

0 comments on commit e716658

Please sign in to comment.