
Fix sequence continuous batching close session race condition #3198

Merged
merged 24 commits into master from debug-stateful-continuous-batching on Jun 22, 2024

Conversation

namannandan
Collaborator

@namannandan namannandan commented Jun 20, 2024

Description

In the current Sequence Batching event dispatcher implementation, we do the following:

class EventDispatcher implements Runnable {
    @Override
    public void run() {
        while (running.get()) {
            try {
                String jobGroupId =
                        eventJobGroupIds.poll(model.getMaxBatchDelay(), TimeUnit.MILLISECONDS);
                if (jobGroupId == null || jobGroupId.isEmpty()) {
                    CompletableFuture.runAsync(
                            () -> {
                                try {
                                    pollJobGroup();
                                } catch (InterruptedException e) {
                                    logger.error("Failed to poll a job group", e);
                                }
                            },
                            pollExecutors);
                } else {
                    CompletableFuture.runAsync(
                            () -> {
                                pollJobFromJobGroup(jobGroupId);
                            },
                            pollExecutors);
                }
            } catch (InterruptedException e) {
                if (running.get()) {
                    logger.error("EventDispatcher failed to get jobGroup", e);
                }
            }
        }
    }
}

Every maxBatchDelay interval:

  1. We either queue a task to poll for new job groups that the worker can process jobs from, or
  2. we queue a task to poll an existing job group queue for new jobs

As a result, we can end up with the following outcomes:

  1. Duplicate tasks to poll for new job groups, which may tie up more than one thread in the poll executor thread pool
  2. Duplicate tasks to poll the same existing job group queue, which interferes with job group cleanup on close-session requests

Concretely, the issue is triggered in the following scenario:

  1. maxNumSequence number of sessions are actively open
  2. A sequence gets a stream response request
  3. The same sequence subsequently gets a close session request
  4. Although the sequence is closed and should free up capacity to open up a new session, it holds session capacity until the session times out and only then gets cleaned up.

In summary, a stream response request prevents graceful session closure and cleanup.

The likely root cause is that the session cleanup logic fails to detect session closure after a stream response, because the task polling jobs from the existing job group has already gone past the point where we detect closed sessions:

private void pollJobFromJobGroup(String jobGroupId) {
    // Poll a job from a jobGroup
    JobGroup jobGroup = model.getJobGroup(jobGroupId);
    Job job = null;
    if (!jobGroup.isFinished()) {
        job = jobGroup.pollJob(model.getSequenceMaxIdleMSec());
    }
    // ...

Moreover, with stream responses, we enqueue a poll job group task on the poll executor queue for each chunk we send back, even though we expect only one such task to be active at any given point in time.
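To make the fix concrete, here is a minimal sketch of the de-duplication idea (a sketch under assumptions, not the exact PR code): a map keyed by task name tracks the in-flight CompletableFuture, and the dispatcher skips enqueueing a new poll task while a previous one is still pending. The pollQueueTasks name mirrors the diff excerpts further below; logger, pollExecutors, and pollJobGroup() are assumed from the surrounding EventDispatcher.

// Sketch only: track in-flight poll tasks so the dispatcher does not enqueue
// a second task for the same target while one is still pending or running.
private final ConcurrentHashMap<String, CompletableFuture<Void>> pollQueueTasks =
        new ConcurrentHashMap<>();

private void dispatchPollJobGroup() {
    CompletableFuture<Void> inFlight = pollQueueTasks.get("pollJobGroup");
    if (inFlight != null && !inFlight.isDone()) {
        // A poll-job-group task is already queued or running; skip this tick.
        return;
    }
    pollQueueTasks.put(
            "pollJobGroup",
            CompletableFuture.runAsync(
                    () -> {
                        try {
                            pollJobGroup();
                        } catch (InterruptedException e) {
                            logger.error("Failed to poll a job group", e);
                        }
                    },
                    pollExecutors));
}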

Type of change

Please delete options that are not relevant.

  • Bug fix (non-breaking change which fixes an issue)

Feature/Issue validation/testing

  • Updated the regression test to open a session, make a stream response request, and close the session multiple times, verifying graceful session closure and cleanup.

Without the fix in this PR, the test fails as follows:

$ python -m pytest test/pytest/test_example_stateful_sequence_continuous_batching_http.py::test_infer_stateful_cancel
.....
.....
  AssertionError: assert '{\n  "code":...eueSize"\n}\n' == '-1'
    - -1
    + {
    +   "code": 503,
    +   "type": "ServiceUnavailableException",
    +   "message": "Model \"stateful\" has no worker to serve inference request. Please use scale workers API to add workers. If this is a sequence inference, please check if it is closed, or expired; or exceeds maxSequenceJobQueueSize"
    + }

With the fix in this PR:

$ python -m pytest test/pytest/test_example_stateful_sequence_continuous_batching_http.py::test_infer_stateful_cancel

=============================================================== test session starts ===============================================================
platform darwin -- Python 3.9.6, pytest-7.3.1, pluggy-1.5.0
rootdir: /Volumes/workplace/pytorch/serve
plugins: cov-4.1.0, timeout-2.3.1, mock-3.14.0
collected 1 item                                                                                                                                  

test/pytest/test_example_stateful_sequence_continuous_batching_http.py .                                                                    [100%]

================================================================ warnings summary =================================================================
venvs/ts_dev/lib/python3.9/site-packages/urllib3/__init__.py:35
  /Volumes/workplace/pytorch/serve/venvs/ts_dev/lib/python3.9/site-packages/urllib3/__init__.py:35: NotOpenSSLWarning: urllib3 v2 only supports OpenSSL 1.1.1+, currently the 'ssl' module is compiled with 'LibreSSL 2.8.3'. See: https://github.com/urllib3/urllib3/issues/3020
    warnings.warn(

-- Docs: https://docs.pytest.org/en/stable/how-to/capture-warnings.html
========================================================== 1 passed, 1 warning in 15.58s ==========================================================
  • CI

@namannandan namannandan marked this pull request as ready for review June 21, 2024 23:09
@namannandan namannandan requested a review from mreso June 21, 2024 23:09
Collaborator

@mreso mreso left a comment

LGTM

                        continue;
                    }
                    // Avoid duplicate poll tasks in the executor queue
                    if (pollQueueTasks.containsKey("") && !pollQueueTasks.get("").isDone()) {
Collaborator

How about naming the key pollJobGroup for doc purposes?

Collaborator Author

Yes, that makes sense, will update the key.

@namannandan namannandan force-pushed the debug-stateful-continuous-batching branch from 34feee4 to 80d657c on June 22, 2024 02:23
@namannandan namannandan enabled auto-merge June 22, 2024 02:26
@namannandan namannandan added this pull request to the merge queue Jun 22, 2024
Merged via the queue into master with commit 4c96e6f Jun 22, 2024
12 checks passed

    public SequenceBatching(Model model) {
        super(model);
        this.localCapacity =
Collaborator

This is a static value, not a dynamic value. The real quota is calculated in the original code:

int quota =
        Math.min(
                this.localCapacity - jobsQueue.size(),
                model.getPendingJobGroups().size() / model.getMaxWorkers());

Comment on lines +79 to +81
                        this.localCapacity.get(),
                        Math.max(
                                1, model.getPendingJobGroups().size() / model.getMaxWorkers()));
Collaborator

The only fix should be:
int quota =
        Math.min(
                this.localCapacity - jobsQueue.size(),
                Math.max(
                        1, model.getPendingJobGroups().size() / model.getMaxWorkers()));
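For illustration of why the Math.max(1, ...) floor matters (the values below are invented for this example, not taken from the PR): integer division rounds down, so when there are fewer pending job groups than workers the ratio is 0, and without the floor the quota would never allow a new job group to be polled.

// Invented example values:
int pendingJobGroups = 2;  // model.getPendingJobGroups().size()
int maxWorkers = 4;        // model.getMaxWorkers()
int localCapacity = 4;
int jobsQueueSize = 1;     // jobsQueue.size()

// Without the floor: 2 / 4 == 0, so quota == min(3, 0) == 0 and nothing is polled.
int withoutFloor = Math.min(localCapacity - jobsQueueSize, pendingJobGroups / maxWorkers);

// With the floor: quota == min(3, max(1, 0)) == 1, so one new job group can still be polled.
int withFloor =
        Math.min(localCapacity - jobsQueueSize, Math.max(1, pendingJobGroups / maxWorkers));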

@@ -120,6 +127,8 @@ private void cleanJobGroup(String jobGroupId) {
         logger.debug("Clean jobGroup: {}", jobGroupId);
         if (jobGroupId != null) {
             model.removeJobGroup(jobGroupId);
+            pollQueueTasks.remove(jobGroupId);
+            localCapacity.incrementAndGet();
Collaborator

"localCapacity.incrementAndGet();" is not needed.

@@ -176,6 +185,7 @@ public void shutdownExecutors() {

     private void addJobGroup(String jobGroupId) {
         if (jobGroupId != null) {
+            localCapacity.decrementAndGet();
Collaborator

this is not needed.

Comment on lines +206 to +213
                    if (localCapacity.get() <= 0) {
                        continue;
                    }
                    // Avoid duplicate poll tasks in the executor queue
                    if (pollQueueTasks.containsKey("pollJobGroup")
                            && !pollQueueTasks.get("pollJobGroup").isDone()) {
                        continue;
                    }
Collaborator

All of this logic is covered by the original pollJobGroup() function:

if (isPollJobGroup.getAndSet(true)) {
    return;
}
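For context, a minimal sketch of the guard pattern referenced above, assuming isPollJobGroup is an AtomicBoolean and that the flag is cleared once polling completes (the reset shown in a finally block is illustrative; the original code may differ):

// Sketch only: AtomicBoolean.getAndSet(true) returns the previous value, so at most
// one pollJobGroup() invocation proceeds at a time; concurrent callers return early.
private final AtomicBoolean isPollJobGroup = new AtomicBoolean(false);

private void pollJobGroup() throws InterruptedException {
    if (isPollJobGroup.getAndSet(true)) {
        return; // another poll is already in progress
    }
    try {
        // ... poll model.getPendingJobGroups() and add new job group ids locally ...
    } finally {
        isPollJobGroup.set(false); // illustrative reset location
    }
}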

Comment on lines +41 to +43
    // HashMap to track poll queue tasks in the executor queue
    private ConcurrentHashMap<String, CompletableFuture<Void>> pollQueueTasks =
            new ConcurrentHashMap<String, CompletableFuture<Void>>();
Collaborator

This variable is not necessary. In fact, it breaks the existing logic in eventJobGroupIds.
