ParallelIterable is deadlocking and is generally really complicated #11768

@sopel39

Description

Apache Iceberg version

1.7.1 (latest release)

Query engine

Trino

Please describe the bug 🐞

The ParallelIterable implementation is really complicated and has subtle concurrency bugs.

Context #1

It was observed that in high-concurrency/high-workload scenarios, cluster concurrency is reduced to 0 or 1 due to S3 Timeout waiting for connection from pool errors. Once that starts happening, it continues indefinitely, effectively making the cluster unusable.

Context #2

ManifestGroup#plan creates a ManifestReader per ParallelIterable.Task. These readers effectively hold onto an S3 connection from the pool. When the ParallelIterable queue is full, the Task is tabled for later use without releasing its reader (see the sketch below). The number of tasks is not bounded by the worker pool size, but rather by X = num(ParallelIterable instances) * size(ParallelIterator#taskFutures). One can see that X can be significant with a high number of concurrent queries.
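As a rough sketch of that lifecycle (a simplified paraphrase, not the actual Iceberg code; all names here are illustrative):

    import java.util.Iterator;
    import java.util.Optional;
    import java.util.Queue;

    // Simplified sketch of the Task lifecycle described above (illustrative
    // names, not the actual Iceberg code).
    class TaskSketch<T> {
      private final Iterable<T> input; // e.g. a ManifestReader-backed iterable
      private final Queue<T> queue;
      private final int approximateMaxQueueSize;
      private Iterator<T> iterator; // opening this pins an S3 connection

      TaskSketch(Iterable<T> input, Queue<T> queue, int approximateMaxQueueSize) {
        this.input = input;
        this.queue = queue;
        this.approximateMaxQueueSize = approximateMaxQueueSize;
      }

      // Returns this task if it must be resumed later, empty when finished.
      Optional<TaskSketch<T>> run() {
        if (iterator == null) {
          iterator = input.iterator(); // S3 connection acquired here
        }
        while (iterator.hasNext()) {
          if (queue.size() >= approximateMaxQueueSize) {
            return Optional.of(this); // tabled -- but the connection stays held
          }
          queue.add(iterator.next());
        }
        return Optional.empty(); // only now can the reader/connection be released
      }
    }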

Issue #1

ParallelIterable is not batch based. This means it will produce read-ahead results even if the downstream consumer doesn't have slots for them. This can lead to subtle concurrency issues. For instance, consider two parallel iterables P1 and P2, with a single-threaded reader consuming 500 elements from P1, then P2, then P1, and so on (these could be splits, for instance). If P1's queue becomes full, it will no longer fetch more elements, yet its tabled tasks keep holding S3 connections. This prevents tasks from P2 from completing (because there are no "free" S3 slots).

Consider this scenario:
S3 connection pool size = 1
approximateMaxQueueSize = 1
workerPoolSize = 1

P1: starts TaskP1
P1: produces a result; the queue is full, so TaskP1 is put on hold (still holding an S3 connection)
P2: starts TaskP2; TaskP2 is scheduled on the workerPool but blocks on the S3 connection pool
P1: the result is consumed; TaskP1 is scheduled again
P1: TaskP1 waits for the workerPool to be free, but TaskP2 is waiting for TaskP1 to release its connection
DEADLOCK
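This cycle can be reproduced outside Iceberg with plain java.util.concurrent primitives (a hedged model: a single-permit semaphore stands in for the S3 connection pool, a single-thread executor for the workerPool; all names are illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;

    public class DeadlockSketch {
      public static void main(String[] args) throws Exception {
        Semaphore s3Pool = new Semaphore(1);                          // S3 pool, size=1
        ExecutorService workerPool = Executors.newFixedThreadPool(1); // workerPoolSize=1

        // TaskP1 ran, filled the queue, and was put on hold: it left the
        // worker pool but kept its S3 connection (the semaphore permit).
        s3Pool.acquire();

        // TaskP2 is scheduled on the worker pool and blocks on the S3 pool.
        workerPool.submit(() -> {
          try {
            s3Pool.acquire(); // blocks forever: TaskP1 still holds the permit
            s3Pool.release();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
          }
        });

        // TaskP1 is rescheduled to finish and release its connection, but the
        // only worker thread is occupied by TaskP2, which waits on TaskP1.
        Future<?> taskP1Resumed = workerPool.submit(s3Pool::release);
        try {
          taskP1Resumed.get(2, TimeUnit.SECONDS);
        } catch (TimeoutException e) {
          System.out.println("DEADLOCK: TaskP1 cannot run, TaskP2 cannot finish");
        }
        workerPool.shutdownNow();
      }
    }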

Issue #2

Active waiting. This one is known. However, if one looks at ParallelIterable.ParallelIterator#checkTasks, there is:

    if (taskFutures[i] == null || taskFutures[i].isDone()) {
      continuation.ifPresent(yieldedTasks::addLast);
      ...
      taskFutures[i] = submitNextTask();
    }

which means active waiting actually happens through the workerPool (e.g., a task is started on the worker pool just to check that the queue is full and that it should be put back on hold). The sketch below demonstrates the effect.
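A small self-contained model of that effect (hedged, illustrative names: a "task" that only re-checks a full queue and yields itself, resubmitted in a checkTasks-style loop):

    import java.util.Optional;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class ActiveWaitSketch {
      public static void main(String[] args) throws Exception {
        ExecutorService workerPool = Executors.newFixedThreadPool(1);
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(0); // the queue is already full and nobody is consuming

        // A yielded task: it only re-checks the queue and yields itself again.
        Callable<Optional<String>> task =
            () -> queue.remainingCapacity() == 0
                ? Optional.of("still full, hold me") // continuation, as in checkTasks
                : Optional.empty();

        int wastedSubmissions = 0;
        for (int i = 0; i < 1_000; i++) {
          // checkTasks-style polling: the "held" task is resubmitted to the
          // worker pool just to discover the queue is still full.
          if (workerPool.submit(task).get().isPresent()) {
            wastedSubmissions++;
          }
        }
        System.out.println("wasted worker-pool submissions: " + wastedSubmissions);
        workerPool.shutdown();
      }
    }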

Short term fix?

Once a ParallelIterable.Task is started, it should continue until the entire task is consumed. This will prevent putting limited resources on hold. The if (queue.size() >= approximateMaxQueueSize) check should only happen once per task, before the iterator is created (see the sketch below).
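A hedged sketch of what that could look like (illustrative names, not a patch against the actual code):

    import java.util.Optional;
    import java.util.Queue;

    // Sketch of the proposed short-term fix (illustrative names).
    class RunToCompletionTask<T> {
      private final Iterable<T> input; // e.g. a ManifestReader-backed iterable
      private final Queue<T> queue;
      private final int approximateMaxQueueSize;

      RunToCompletionTask(Iterable<T> input, Queue<T> queue, int approximateMaxQueueSize) {
        this.input = input;
        this.queue = queue;
        this.approximateMaxQueueSize = approximateMaxQueueSize;
      }

      // Returns this task to be retried later, or empty once fully consumed.
      Optional<RunToCompletionTask<T>> run() {
        // The queue-size check happens exactly once, BEFORE the iterator
        // (and thus any S3 connection) is created...
        if (queue.size() >= approximateMaxQueueSize) {
          return Optional.of(this);
        }
        // ...after that, drain the input to completion, so a started task
        // never sits on hold while pinning a connection.
        for (T item : input) {
          queue.add(item);
        }
        return Optional.empty();
      }
    }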

Long term fix?

Perhaps the code can be refactored to be more readable and streamlined?

cc @findepi @raunaqmorarka

Willingness to contribute

  • I can contribute a fix for this bug independently
  • I would be willing to contribute a fix for this bug with guidance from the Iceberg community
  • I cannot contribute a fix for this bug at this time
