Skip to content

Commit

Permalink
Improved internal state representation of the internal async executio…
Browse files Browse the repository at this point in the history
…n runtime in order to present potential race conditions
  • Loading branch information
ok2c committed Jan 21, 2025
1 parent 7f56b8e commit 9433bca
Showing 1 changed file with 20 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,18 @@

class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {

static class ReUseData {

final Object state;
final TimeValue validDuration;

ReUseData(final Object state, final TimeValue validDuration) {
this.state = state;
this.validDuration = validDuration;
}

}

private final Logger log;
private final AsyncClientConnectionManager manager;
private final ConnectionInitiator connectionInitiator;
Expand All @@ -64,9 +76,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
@Deprecated
private final TlsConfig tlsConfig;
private final AtomicReference<AsyncConnectionEndpoint> endpointRef;
private volatile boolean reusable;
private volatile Object state;
private volatile TimeValue validDuration;
private final AtomicReference<ReUseData> reuseDataRef;

InternalHttpAsyncExecRuntime(
final Logger log,
Expand All @@ -81,7 +91,7 @@ class InternalHttpAsyncExecRuntime implements AsyncExecRuntime {
this.pushHandlerFactory = pushHandlerFactory;
this.tlsConfig = tlsConfig;
this.endpointRef = new AtomicReference<>();
this.validDuration = TimeValue.NEG_ONE_MILLISECOND;
this.reuseDataRef = new AtomicReference<>();
}

@Override
Expand All @@ -97,7 +107,6 @@ public Cancellable acquireEndpoint(
final HttpClientContext context,
final FutureCallback<AsyncExecRuntime> callback) {
if (endpointRef.get() == null) {
state = object;
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
final Timeout connectionRequestTimeout = requestConfig.getConnectionRequestTimeout();
if (log.isDebugEnabled()) {
Expand All @@ -113,7 +122,6 @@ public Cancellable acquireEndpoint(
@Override
public void completed(final AsyncConnectionEndpoint connectionEndpoint) {
endpointRef.set(connectionEndpoint);
reusable = connectionEndpoint.isConnected();
if (log.isDebugEnabled()) {
log.debug("{} acquired endpoint {}", id, ConnPoolSupport.getId(connectionEndpoint));
}
Expand Down Expand Up @@ -153,11 +161,12 @@ private void discardEndpoint(final AsyncConnectionEndpoint endpoint) {
public void releaseEndpoint() {
final AsyncConnectionEndpoint endpoint = endpointRef.getAndSet(null);
if (endpoint != null) {
if (reusable) {
final ReUseData reUseData = reuseDataRef.getAndSet(null);
if (reUseData != null) {
if (log.isDebugEnabled()) {
log.debug("{} releasing valid endpoint", ConnPoolSupport.getId(endpoint));
}
manager.release(endpoint, state, validDuration);
manager.release(endpoint, reUseData.state, reUseData.validDuration);
} else {
discardEndpoint(endpoint);
}
Expand All @@ -174,7 +183,7 @@ public void discardEndpoint() {

@Override
public boolean validateConnection() {
if (reusable) {
if (reuseDataRef != null) {
final AsyncConnectionEndpoint endpoint = endpointRef.get();
return endpoint != null && endpoint.isConnected();
}
Expand Down Expand Up @@ -325,16 +334,12 @@ public void cancelled() {

@Override
public void markConnectionReusable(final Object newState, final TimeValue newValidDuration) {
reusable = true;
state = newState;
validDuration = newValidDuration;
reuseDataRef.set(new ReUseData(newState, newValidDuration));
}

@Override
public void markConnectionNonReusable() {
reusable = false;
state = null;
validDuration = null;
reuseDataRef.set(null);
}

@Override
Expand Down

0 comments on commit 9433bca

Please sign in to comment.