Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce lock contention in HttpRemoteTask #13934

Merged
merged 5 commits into from
Sep 2, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,12 @@ public final class HttpRemoteTask
private final DynamicFiltersFetcher dynamicFiltersFetcher;

private final DynamicFiltersCollector outboundDynamicFiltersCollector;
@GuardedBy("this")
// The version of dynamic filters that has been successfully sent to the worker
private long sentDynamicFiltersVersion = INITIAL_DYNAMIC_FILTERS_VERSION;
private final AtomicLong sentDynamicFiltersVersion = new AtomicLong(INITIAL_DYNAMIC_FILTERS_VERSION);

@GuardedBy("this")
@GuardedBy("httpClient")
private Future<?> currentRequest;
@GuardedBy("this")
@GuardedBy("httpClient")
private long currentRequestStartNanos;
sopel39 marked this conversation as resolved.
Show resolved Hide resolved

@GuardedBy("this")
Expand All @@ -146,7 +145,6 @@ public final class HttpRemoteTask
// The keys of this map represent all plan nodes that have "no more splits".
// The boolean value of each entry represents whether the "no more splits" notification is pending delivery to workers.
private final Map<PlanNodeId, Boolean> noMoreSplits = new HashMap<>();
@GuardedBy("this")
private final AtomicReference<OutputBuffers> outputBuffers = new AtomicReference<>();
private final FutureStateChange<Void> whenSplitQueueHasSpace = new FutureStateChange<>();
@GuardedBy("this")
Expand Down Expand Up @@ -415,14 +413,21 @@ public synchronized void noMoreSplits(PlanNodeId sourceId)
}

@Override
public synchronized void setOutputBuffers(OutputBuffers newOutputBuffers)
public void setOutputBuffers(OutputBuffers newOutputBuffers)
{
if (getTaskStatus().getState().isDone()) {
return;
}

if (newOutputBuffers.getVersion() > outputBuffers.get().getVersion()) {
outputBuffers.set(newOutputBuffers);
long previousVersion = outputBuffers.getAndUpdate(previousOutputBuffers -> {
if (newOutputBuffers.getVersion() > previousOutputBuffers.getVersion()) {
return newOutputBuffers;
}

return previousOutputBuffers;
}).getVersion();

if (newOutputBuffers.getVersion() > previousVersion) {
triggerUpdate();
}
}
Expand Down Expand Up @@ -579,73 +584,75 @@ private void triggerUpdate()
}
}

private synchronized void sendUpdate()
private void sendUpdate()
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
{
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!started.get() || taskStatus.getState().isDone()) {
return;
}

int currentPendingRequestsCounter = pendingRequestsCounter.get();
if (currentPendingRequestsCounter == 0) {
return;
}
synchronized (httpClient) {
TaskStatus taskStatus = getTaskStatus();
// don't update if the task hasn't been started yet or if it is already finished
if (!started.get() || taskStatus.getState().isDone()) {
return;
}

// if there is a request already running, wait for it to complete
// currentRequest is always cleared when request is complete
if (currentRequest != null) {
return;
}
int currentPendingRequestsCounter = pendingRequestsCounter.get();
if (currentPendingRequestsCounter == 0) {
return;
}

// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<Void> errorRateLimit = updateErrorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendUpdate, executor);
return;
}
// if there is a request already running, wait for it to complete
// currentRequest is always cleared when request is complete
if (currentRequest != null) {
return;
}

List<SplitAssignment> splitAssignments = getSplitAssignments();
VersionedDynamicFilterDomains dynamicFilterDomains = outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(sentDynamicFiltersVersion);
// if throttled due to error, asynchronously wait for timeout and try again
ListenableFuture<Void> errorRateLimit = updateErrorTracker.acquireRequestPermit();
if (!errorRateLimit.isDone()) {
errorRateLimit.addListener(this::sendUpdate, executor);
return;
}

// Workers don't need the embedded JSON representation when the fragment is sent
Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
fragment,
splitAssignments,
outputBuffers.get(),
dynamicFilterDomains.getDynamicFilterDomains());
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest);
if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
}
List<SplitAssignment> splitAssignments = getSplitAssignments();
VersionedDynamicFilterDomains dynamicFilterDomains = outboundDynamicFiltersCollector.acknowledgeAndGetNewDomains(sentDynamicFiltersVersion.get());

// Workers don't need the embedded JSON representation when the fragment is sent
Optional<PlanFragment> fragment = sendPlan.get() ? Optional.of(planFragment.withoutEmbeddedJsonRepresentation()) : Optional.empty();
TaskUpdateRequest updateRequest = new TaskUpdateRequest(
session.toSessionRepresentation(),
session.getIdentity().getExtraCredentials(),
fragment,
splitAssignments,
outputBuffers.get(),
dynamicFilterDomains.getDynamicFilterDomains());
byte[] taskUpdateRequestJson = taskUpdateRequestCodec.toJsonBytes(updateRequest);
if (fragment.isPresent()) {
stats.updateWithPlanBytes(taskUpdateRequestJson.length);
}
if (!dynamicFilterDomains.getDynamicFilterDomains().isEmpty()) {
stats.updateWithDynamicFilterBytes(taskUpdateRequestJson.length);
}

HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
Request request = preparePost()
.setUri(uriBuilder.build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
.build();
HttpUriBuilder uriBuilder = getHttpUriBuilder(taskStatus);
Request request = preparePost()
.setUri(uriBuilder.build())
.setHeader(HttpHeaders.CONTENT_TYPE, MediaType.JSON_UTF_8.toString())
.setBodyGenerator(createStaticBodyGenerator(taskUpdateRequestJson))
.build();

updateErrorTracker.startRequest();
updateErrorTracker.startRequest();

ListenableFuture<JsonResponse<TaskInfo>> future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
currentRequest = future;
currentRequestStartNanos = System.nanoTime();
ListenableFuture<JsonResponse<TaskInfo>> future = httpClient.executeAsync(request, createFullJsonResponseHandler(taskInfoCodec));
currentRequest = future;
currentRequestStartNanos = System.nanoTime();

// if pendingRequestsCounter is still non-zero (e.g. because triggerUpdate was called in the meantime)
// then the request Future callback will send a new update via sendUpdate method call
pendingRequestsCounter.addAndGet(-currentPendingRequestsCounter);
// if pendingRequestsCounter is still non-zero (e.g. because triggerUpdate was called in the meantime)
// then the request Future callback will send a new update via sendUpdate method call
pendingRequestsCounter.addAndGet(-currentPendingRequestsCounter);

Futures.addCallback(
future,
new SimpleHttpResponseHandler<>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion()), request.getUri(), stats),
executor);
Futures.addCallback(
future,
new SimpleHttpResponseHandler<>(new UpdateResponseHandler(splitAssignments, dynamicFilterDomains.getVersion()), request.getUri(), stats),
executor);
}
}

private synchronized List<SplitAssignment> getSplitAssignments()
Expand Down Expand Up @@ -685,26 +692,30 @@ public synchronized void cancel()
}
}

private synchronized void cleanUpTask()
private void cleanUpTask()
{
checkState(getTaskStatus().getState().isDone(), "attempt to clean up a task that is not done yet");

// clear pending splits to free memory
pendingSplits.clear();
pendingSourceSplitCount = 0;
pendingSourceSplitsWeight = 0;
partitionedSplitCountTracker.setPartitionedSplits(PartitionedSplitsInfo.forZeroSplits());
splitQueueHasSpace = true;
whenSplitQueueHasSpace.complete(null, executor);
synchronized (this) {
pendingSplits.clear();
pendingSourceSplitCount = 0;
pendingSourceSplitsWeight = 0;
partitionedSplitCountTracker.setPartitionedSplits(PartitionedSplitsInfo.forZeroSplits());
splitQueueHasSpace = true;
whenSplitQueueHasSpace.complete(null, executor);
}

// clear pending outbound dynamic filters to free memory
outboundDynamicFiltersCollector.acknowledge(Long.MAX_VALUE);

// cancel pending request
if (currentRequest != null) {
currentRequest.cancel(true);
currentRequest = null;
currentRequestStartNanos = 0;
synchronized (httpClient) {
if (currentRequest != null) {
currentRequest.cancel(true);
currentRequest = null;
currentRequestStartNanos = 0;
}
}

taskStatusFetcher.stop();
Expand Down Expand Up @@ -916,15 +927,15 @@ public void success(TaskInfo value)
{
try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", taskId)) {
try {
sentDynamicFiltersVersion.set(currentRequestDynamicFiltersVersion);
// Remove dynamic filters which were successfully sent to free up memory
outboundDynamicFiltersCollector.acknowledge(currentRequestDynamicFiltersVersion);
sendPlan.set(value.isNeedsPlan());
long currentRequestStartNanos;
synchronized (HttpRemoteTask.this) {
synchronized (httpClient) {
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
currentRequest = null;
sendPlan.set(value.isNeedsPlan());
currentRequestStartNanos = HttpRemoteTask.this.currentRequestStartNanos;
sentDynamicFiltersVersion = currentRequestDynamicFiltersVersion;
}
// Remove dynamic filters which were successfully sent to free up memory
outboundDynamicFiltersCollector.acknowledge(currentRequestDynamicFiltersVersion);
updateStats(currentRequestStartNanos);
processTaskUpdate(value, splitAssignments);
updateErrorTracker.requestSucceeded();
Expand All @@ -941,7 +952,7 @@ public void failed(Throwable cause)
try (SetThreadName ignored = new SetThreadName("UpdateResponseHandler-%s", taskId)) {
try {
long currentRequestStartNanos;
synchronized (HttpRemoteTask.this) {
synchronized (httpClient) {
currentRequest = null;
currentRequestStartNanos = HttpRemoteTask.this.currentRequestStartNanos;
}
Expand Down