Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions fe/fe-common/src/main/java/org/apache/doris/common/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -2341,10 +2341,16 @@ public class Config extends ConfigBase {
})
public static int autobucket_min_buckets = 1;

@ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为2000",
@ConfField(description = {"Arrow Flight Server中所有用户token的缓存上限,超过后LRU淘汰,默认值为512, "
+ "并强制限制小于 qe_max_connection/2, 避免`Reach limit of connections`, "
+ "因为arrow flight sql是无状态的协议,连接通常不会主动断开,"
+ "bearer token 从 cache 淘汰的同时会 unregister Connection.",
"The cache limit of all user tokens in Arrow Flight Server. which will be eliminated by"
+ "LRU rules after exceeding the limit, the default value is 2000."})
public static int arrow_flight_token_cache_size = 2000;
+ "LRU rules after exceeding the limit, the default value is 512, the mandatory limit is "
+ "less than qe_max_connection/2 to avoid `Reach limit of connections`, "
+ "because arrow flight sql is a stateless protocol, the connection is usually not actively "
+ "disconnected, bearer token is evict from the cache will unregister ConnectContext."})
public static int arrow_flight_token_cache_size = 512;

@ConfField(description = {"Arrow Flight Server中用户token的存活时间,自上次写入后过期时间,单位分钟,默认值为4320,即3天",
"The alive time of the user token in Arrow Flight Server, expire after write, unit minutes,"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,16 +238,16 @@ private FlightInfo executeQueryStatement(String peerIdentity, ConnectContext con
connectContext.getResultFlightServerAddr().port);
List<FlightEndpoint> endpoints = Collections.singletonList(new FlightEndpoint(ticket, location));
// TODO Set in BE callback after query end, Client will not callback.
connectContext.setCommand(MysqlCommand.COM_SLEEP);
return new FlightInfo(schema, descriptor, endpoints, -1, -1);
}
} catch (Exception e) {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "get flight info statement failed, " + e.getMessage() + ", " + Util.getRootCauseMessage(e)
+ ", error code: " + connectContext.getState().getErrorCode() + ", error msg: "
+ connectContext.getState().getErrorMessage();
LOG.warn(errMsg, e);
throw CallStatus.INTERNAL.withDescription(errMsg).withCause(e).toRuntimeException();
} finally {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
}
}

Expand Down Expand Up @@ -306,6 +306,7 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
executorService.submit(() -> {
ConnectContext connectContext = flightSessionsManager.getConnectContext(context.peerIdentity());
try {
connectContext.setCommand(MysqlCommand.COM_QUERY);
final String query = request.getQuery();
String preparedStatementId = UUID.randomUUID().toString();
final ByteString handle = ByteString.copyFromUtf8(context.peerIdentity() + ":" + preparedStatementId);
Expand All @@ -323,7 +324,6 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
Any.pack(buildCreatePreparedStatementResult(handle, parameterSchema, metaData))
.toByteArray()));
} catch (Exception e) {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
String errMsg = "create prepared statement failed, " + e.getMessage() + ", "
+ Util.getRootCauseMessage(e) + ", error code: " + connectContext.getState().getErrorCode()
+ ", error msg: " + connectContext.getState().getErrorMessage();
Expand All @@ -333,6 +333,8 @@ public void createPreparedStatement(final ActionCreatePreparedStatementRequest r
} catch (final Throwable t) {
listener.onError(CallStatus.INTERNAL.withDescription("Unknown error: " + t).toRuntimeException());
return;
} finally {
connectContext.setCommand(MysqlCommand.COM_SLEEP);
}
listener.onCompleted();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ public class DorisFlightSqlService {
public DorisFlightSqlService(int port) {
BufferAllocator allocator = new RootAllocator();
Location location = Location.forGrpcInsecure(FrontendOptions.getLocalHostAddress(), port);
this.flightTokenManager = new FlightTokenManagerImpl(Config.arrow_flight_token_cache_size,
// arrow_flight_token_cache_size less than qe_max_connection to avoid `Reach limit of connections`.
// arrow flight sql is a stateless protocol, connection is usually not actively disconnected.
// bearer token is evict from the cache will unregister ConnectContext.
this.flightTokenManager = new FlightTokenManagerImpl(
Math.min(Config.arrow_flight_token_cache_size, Config.qe_max_connection / 2),
Config.arrow_flight_token_alive_time);
this.flightSessionsManager = new FlightSessionsWithTokenManager(flightTokenManager);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ AuthResult validateBearer(String token) {
return createAuthResultWithBearerToken(token);
} catch (IllegalArgumentException e) {
LOG.error("Bearer token validation failed.", e);
throw CallStatus.UNAUTHENTICATED.toRuntimeException();
throw CallStatus.UNAUTHENTICATED.withCause(e).withDescription(e.getMessage()).toRuntimeException();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,42 +47,36 @@ public ConnectContext getConnectContext(String peerIdentity) {
ConnectContext connectContext = ExecuteEnv.getInstance().getScheduler().getContext(peerIdentity);
if (null == connectContext) {
connectContext = createConnectContext(peerIdentity);
if (null == connectContext) {
flightTokenManager.invalidateToken(peerIdentity);
String err = "UserSession expire after access, need reauthorize.";
LOG.error(err);
throw CallStatus.UNAUTHENTICATED.withDescription(err).toRuntimeException();
}
return connectContext;
}
return connectContext;
} catch (Exception e) {
LOG.warn("getConnectContext failed, " + e.getMessage(), e);
LOG.warn("get ConnectContext failed, " + e.getMessage(), e);
throw CallStatus.INTERNAL.withDescription(Util.getRootCauseMessage(e)).withCause(e).toRuntimeException();
}
}

@Override
public ConnectContext createConnectContext(String peerIdentity) {
try {
final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity);
if (flightTokenDetails.getCreatedSession()) {
return null;
}
flightTokenDetails.setCreatedSession(true);
ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity,
flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp());
connectContext.setConnectionId(nextConnectionId.getAndAdd(1));
connectContext.resetLoginTime();
if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
connectContext.getState()
.setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, "Reach limit of connections");
throw CallStatus.UNAUTHENTICATED.withDescription("Reach limit of connections").toRuntimeException();
}
return connectContext;
} catch (IllegalArgumentException e) {
LOG.error("Bearer token validation failed.", e);
throw CallStatus.UNAUTHENTICATED.toRuntimeException();
final FlightTokenDetails flightTokenDetails = flightTokenManager.validateToken(peerIdentity);
if (flightTokenDetails.getCreatedSession()) {
flightTokenManager.invalidateToken(peerIdentity);
throw new IllegalArgumentException("UserSession expire after access, try reconnect, bearer token: "
+ peerIdentity + ", a peerIdentity(bearer token) can only create a ConnectContext once. "
+ "if ConnectContext is deleted without operation for a long time, it needs to be reconnected "
+ "(at the same time obtain a new bearer token).");
}
flightTokenDetails.setCreatedSession(true);
ConnectContext connectContext = FlightSessionsManager.buildConnectContext(peerIdentity,
flightTokenDetails.getUserIdentity(), flightTokenDetails.getRemoteIp());
connectContext.setConnectionId(nextConnectionId.getAndAdd(1));
connectContext.resetLoginTime();
if (!ExecuteEnv.getInstance().getScheduler().registerConnection(connectContext)) {
String err = "Reach limit of connections, increase `qe_max_connection` in fe.conf, or decrease "
+ "`arrow_flight_token_cache_size` to evict unused bearer tokens and it connections faster";
connectContext.getState().setError(ErrorCode.ERR_TOO_MANY_USER_CONNECTIONS, err);
throw new IllegalArgumentException(err);
}
return connectContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@

package org.apache.doris.service.arrowflight.tokens;

import org.apache.doris.qe.ConnectContext;
import org.apache.doris.service.ExecuteEnv;
import org.apache.doris.service.arrowflight.auth2.FlightAuthResult;

import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

Expand All @@ -39,17 +43,32 @@ public class FlightTokenManagerImpl implements FlightTokenManager {
private static final Logger LOG = LogManager.getLogger(FlightTokenManagerImpl.class);

private final SecureRandom generator = new SecureRandom();
private final int cacheSize;
private final int cacheExpiration;

private LoadingCache<String, FlightTokenDetails> tokenCache;

public FlightTokenManagerImpl(final int cacheSize, final int cacheExpiration) {
this.cacheSize = cacheSize;
this.cacheExpiration = cacheExpiration;

this.tokenCache = CacheBuilder.newBuilder()
.maximumSize(cacheSize)
this.tokenCache = CacheBuilder.newBuilder().maximumSize(cacheSize)
.expireAfterWrite(cacheExpiration, TimeUnit.MINUTES)
.build(new CacheLoader<String, FlightTokenDetails>() {
.removalListener(new RemovalListener<String, FlightTokenDetails>() {
@Override
public void onRemoval(RemovalNotification<String, FlightTokenDetails> notification) {
// TODO: broadcast this message to other FE
LOG.info("evict bearer token: " + notification.getKey() + ", reason: "
+ notification.getCause());
ConnectContext context = ExecuteEnv.getInstance().getScheduler()
.getContext(notification.getKey());
if (context != null) {
ExecuteEnv.getInstance().getScheduler().unregisterConnection(context);
LOG.info("unregister flight connect context after evict bearer token: "
+ notification.getKey());
}
}
}).build(new CacheLoader<String, FlightTokenDetails>() {
@Override
public FlightTokenDetails load(String key) {
return new FlightTokenDetails();
Expand Down Expand Up @@ -77,26 +96,32 @@ public FlightTokenDetails createToken(final String username, final FlightAuthRes
flightAuthResult.getUserIdentity(), flightAuthResult.getRemoteIp());

tokenCache.put(token, flightTokenDetails);
LOG.trace("Created flight token for user: {}", username);
LOG.info("Created flight token for user: {}, token: {}", username, token);
return flightTokenDetails;
}

@Override
public FlightTokenDetails validateToken(final String token) throws IllegalArgumentException {
final FlightTokenDetails value = getTokenDetails(token);
if (value.getToken().equals("")) {
throw new IllegalArgumentException("invalid bearer token: " + token
+ ", try reconnect, bearer token may not be created, or may have been evict, search for this "
+ "token in fe.log to see the evict reason. currently in fe.conf, `arrow_flight_token_cache_size`="
+ this.cacheSize + ", `arrow_flight_token_alive_time`=" + this.cacheExpiration);
}
if (System.currentTimeMillis() >= value.getExpiresAt()) {
tokenCache.invalidate(token); // removes from the store as well
throw new IllegalArgumentException("token expired");
tokenCache.invalidate(token);
throw new IllegalArgumentException("bearer token expired: " + token + ", try reconnect, "
+ "currently in fe.conf, `arrow_flight_token_alive_time`=" + this.cacheExpiration);
}

LOG.trace("Validated flight token for user: {}", value.getUsername());
LOG.info("Validated bearer token for user: {}", value.getUsername());
return value;
}

@Override
public void invalidateToken(final String token) {
LOG.trace("Invalidate flight token, {}", token);
tokenCache.invalidate(token); // removes from the store as well
LOG.info("Invalidate bearer token, {}", token);
tokenCache.invalidate(token);
}

private FlightTokenDetails getTokenDetails(final String token) {
Expand All @@ -105,7 +130,7 @@ private FlightTokenDetails getTokenDetails(final String token) {
try {
value = tokenCache.getUnchecked(token);
} catch (CacheLoader.InvalidCacheLoadException ignored) {
throw new IllegalArgumentException("invalid token");
throw new IllegalArgumentException("InvalidCacheLoadException, invalid bearer token: " + token);
}

return value;
Expand Down