diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index 8639f62af455fc..a0e7311af0bdd6 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2341,10 +2341,16 @@ public class Config extends ConfigBase { }) public static int autobucket_min_buckets = 1; - @ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为2000", + @ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为512, " + + "并强制限制小于 qe_max_connection/2, 避免`Reach limit of connections`, " + + "因为arrow flight sql是无状态的协议,连接通常不会主动断开," + + "bearer token 从 cache 淘汰的同时会 unregister Connection.", "The cache limit of all user tokens in Arrow Flight Server. which will be eliminated by" - + "LRU rules after exceeding the limit, the default value is 2000."}) - public static int arrow_flight_token_cache_size = 2000; + + "LRU rules after exceeding the limit, the default value is 512, the mandatory limit is " + + "less than qe_max_connection/2 to avoid `Reach limit of connections`, " + + "because arrow flight sql is a stateless protocol, the connection is usually not actively " + + "disconnected, bearer token is evict from the cache will unregister ConnectContext."}) + public static int arrow_flight_token_cache_size = 512; @ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位分钟,默认值为4320,即3天", "The alive time of the user token in Arrow Flight Server, expire after write, unit minutes," diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java index 20ea5836483ce4..2c7aaae4f2a475 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlProducer.java @@ -238,16 +238,16 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con connectContext.getResultFlightServerAddr().port); List endpoints = Collections.singletonList(new FlightEndpoint(ticket, location)); // TODO Set in BE callback after query end, Client will not callback. - connectContext.setCommand(MysqlCommand.COM_SLEEP); return new FlightInfo(schema, descriptor, endpoints, -1, -1); } } catch (Exception e) { - connectContext.setCommand(MysqlCommand.COM_SLEEP); String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + connectContext.getState().getErrorMessage(); LOG.warn(errMsg, e); throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException(); + } finally { + connectContext.setCommand(MysqlCommand.COM_SLEEP); } } @@ -306,6 +306,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r executorService.submit(() -> { ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity()); try { + connectContext.setCommand(MysqlCommand.COM_QUERY); final String query = request.getQuery(); String preparedStatementId = UUID.randomUUID().toString(); final ByteString handle = ByteString.copyFromUtf8(context.peerIdentity() + ":" + preparedStatementId); @@ -323,7 +324,6 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData)) .toByteArray())); } catch (Exception e) { - connectContext.setCommand(MysqlCommand.COM_SLEEP); String errMsg = "create prepared statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode() + ", error msg: " + 
connectContext.getState().getErrorMessage(); @@ -333,6 +333,8 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r } catch (final Throwable t) { listener.onError(CallStatus.INTERNAL.withDescription("Unknown error: " + t).toRuntimeException()); return; + } finally { + connectContext.setCommand(MysqlCommand.COM_SLEEP); } listener.onCompleted(); }); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java index fe5a60f0cc24ca..a8ad9a05c93704 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/DorisFlightSqlService.java @@ -47,7 +47,11 @@ public class DorisFlightSqlService { public DorisFlightSqlService(int port) { BufferAllocator allocator = new RootAllocator(); Location location = Location.forGrpcInsecure(FrontendOptions.getLocalHostAddress(), port); - this.flightTokenManager = new FlightTokenManagerImpl(Config.arrow_flight_token_cache_size, + // Cap the cache size at qe_max_connection / 2 to avoid `Reach limit of connections`. + // Arrow Flight SQL is a stateless protocol, so connections are usually not actively disconnected. + // Evicting a bearer token from the cache also unregisters its ConnectContext.
+ this.flightTokenManager = new FlightTokenManagerImpl( + Math.min(Config.arrow_flight_token_cache_size, Config.qe_max_connection / 2), Config.arrow_flight_token_alive_time); this.flightSessionsManager = new FlightSessionsWithTokenManager(flightTokenManager); diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java index ef6e28b034dd5e..9f4479c6bcd9d0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/auth2/FlightBearerTokenAuthenticator.java @@ -86,7 +86,7 @@ AuthResult validateBearer(String token) { return createAuthResultWithBearerToken(token); } catch (IllegalArgumentException e) { LOG.error("Bearer token validation failed.", e); - throw CallStatus.UNAUTHENTICATED.toRuntimeException(); + throw CallStatus.UNAUTHENTICATED.withCause(e).withDescription(e.getMessage()).toRuntimeException(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java index 26a48f0cfd2ead..b7e5ffa46466a6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/sessions/FlightSessionsWithTokenManager.java @@ -47,42 +47,36 @@ public ConnectContext getConnectContext(String peerIdentity) { ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity); if (null == connectContext) { connectContext = createConnectContext(peerIdentity); - if (null == connectContext) { - flightTokenManager.invalidateToken(peerIdentity); - String err = "UserSession expire after 
access, need reauthorize."; - LOG.error(err); - throw CallStatus.UNAUTHENTICATED.withDescription(err).toRuntimeException(); - } return connectContext; } return connectContext; } catch (Exception e) { - LOG.warn("getConnectContext failed, " + e.getMessage(), e); + LOG.warn("get ConnectContext failed, " + e.getMessage(), e); throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException(); } } @Override public ConnectContext createConnectContext(String peerIdentity) { - try { - final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity); - if (flightTokenDetails.getCreatedSession()) { - return null; - } - flightTokenDetails.setCreatedSession(true); - ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity, - flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp()); - connectContext.setConnectionId(nextConnectionId.getAndAdd(1)); - connectContext.resetLoginTime(); - if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) { - connectContext.getState() - .setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, "Reach limit of connections"); - throw CallStatus.UNAUTHENTICATED.withDescription("Reach limit of connections").toRuntimeException(); - } - return connectContext; - } catch (IllegalArgumentException e) { - LOG.error("Bearer token validation failed.", e); - throw CallStatus.UNAUTHENTICATED.toRuntimeException(); + final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity); + if (flightTokenDetails.getCreatedSession()) { + flightTokenManager.invalidateToken(peerIdentity); + throw new IllegalArgumentException("UserSession expire after access, try reconnect, bearer token: " + + peerIdentity + ", a peerIdentity(bearer token) can only create a ConnectContext once. 
" + + "if ConnectContext is deleted without operation for a long time, it needs to be reconnected " + + "(at the same time obtain a new bearer token)."); + } + flightTokenDetails.setCreatedSession(true); + ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity, + flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp()); + connectContext.setConnectionId(nextConnectionId.getAndAdd(1)); + connectContext.resetLoginTime(); + if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) { + String err = "Reach limit of connections, increase `qe_max_connection` in fe.conf, or decrease " + + "`arrow_flight_token_cache_size` to evict unused bearer tokens and it connections faster"; + connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, err); + throw new IllegalArgumentException(err); } + return connectContext; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java index 54e53e931dd359..cd1b492de068cd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/arrowflight/tokens/FlightTokenManagerImpl.java @@ -19,12 +19,16 @@ package org.apache.doris.service.arrowflight.tokens; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.service.ExecuteEnv; import org.apache.doris.service.arrowflight.auth2.FlightAuthResult; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; +import com.google.common.cache.RemovalListener; +import com.google.common.cache.RemovalNotification; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -39,17 +43,32 @@ public class 
FlightTokenManagerImpl implements FlightTokenManager { private static final Logger LOG = LogManager.getLogger(FlightTokenManagerImpl.class); private final SecureRandom generator = new SecureRandom(); + private final int cacheSize; private final int cacheExpiration; private LoadingCache tokenCache; public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) { + this.cacheSize = cacheSize; this.cacheExpiration = cacheExpiration; - this.tokenCache = CacheBuilder.newBuilder() - .maximumSize(cacheSize) + this.tokenCache = CacheBuilder.newBuilder().maximumSize(cacheSize) .expireAfterWrite(cacheExpiration, TimeUnit.MINUTES) - .build(new CacheLoader() { + .removalListener(new RemovalListener() { + @Override + public void onRemoval(RemovalNotification notification) { + // TODO: broadcast this message to other FE + LOG.info("evict bearer token: " + notification.getKey() + ", reason: " + + notification.getCause()); + ConnectContext context = ExecuteEnv.getInstance().getScheduler() + .getContext(notification.getKey()); + if (context != null) { + ExecuteEnv.getInstance().getScheduler().unregisterConnection(context); + LOG.info("unregister flight connect context after evict bearer token: " + + notification.getKey()); + } + } + }).build(new CacheLoader() { @Override public FlightTokenDetails load(String key) { return new FlightTokenDetails(); @@ -77,26 +96,32 @@ public FlightTokenDetails createToken(final String username, final FlightAuthRes flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp()); tokenCache.put(token, flightTokenDetails); - LOG.trace("Created flight token for user: {}", username); + LOG.info("Created flight token for user: {}, token: {}", username, token); return flightTokenDetails; } @Override public FlightTokenDetails validateToken(final String token) throws IllegalArgumentException { final FlightTokenDetails value = getTokenDetails(token); + if (value.getToken().equals("")) { + throw new IllegalArgumentException("invalid bearer 
token: " + token + + ", try reconnect, bearer token may not be created, or may have been evict, search for this " + + "token in fe.log to see the evict reason. currently in fe.conf, `arrow_flight_token_cache_size`=" + + this.cacheSize + ", `arrow_flight_token_alive_time`=" + this.cacheExpiration); + } if (System.currentTimeMillis() >= value.getExpiresAt()) { - tokenCache.invalidate(token); // removes from the store as well - throw new IllegalArgumentException("token expired"); + tokenCache.invalidate(token); + throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, " + + "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration); } - - LOG.trace("Validated flight token for user: {}", value.getUsername()); + LOG.info("Validated bearer token for user: {}", value.getUsername()); return value; } @Override public void invalidateToken(final String token) { - LOG.trace("Invalidate flight token, {}", token); - tokenCache.invalidate(token); // removes from the store as well + LOG.info("Invalidate bearer token, {}", token); + tokenCache.invalidate(token); } private FlightTokenDetails getTokenDetails(final String token) { @@ -105,7 +130,7 @@ private FlightTokenDetails getTokenDetails(final String token) { try { value = tokenCache.getUnchecked(token); } catch (CacheLoader.InvalidCacheLoadException ignored) { - throw new IllegalArgumentException("invalid token"); + throw new IllegalArgumentException("InvalidCacheLoadException, invalid bearer token: " + token); } return value;