While the [Queuer](./queueing) provides synchronous queueing with timing controls, the `AsyncQueuer` is designed specifically for handling concurrent asynchronous operations. It implements what is traditionally known as a "task pool" or "worker pool" pattern, allowing multiple operations to be processed simultaneously while maintaining control over concurrency and timing. The implementation is mostly copied from [Swimmer](https://github.com/tannerlinsley/swimmer), Tanner's original task pooling utility that has been serving the JavaScript community since 2017.
## Async Queueing Concept
Async queueing extends the basic queueing concept by adding concurrent processing capabilities. Instead of processing one item at a time, an async queuer can process multiple items simultaneously while still maintaining order and control over the execution. This is particularly useful when dealing with I/O operations, network requests, or any tasks that spend most of their time waiting rather than consuming CPU.
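The task-pool pattern described above can be sketched in a few lines of plain TypeScript. This is an illustration of the pattern only, not TanStack Pacer's implementation, and `runPool` is a hypothetical name:

```typescript
type Task<T> = () => Promise<T>

// Hypothetical task-pool sketch: run tasks with at most `concurrency`
// in flight at once, starting them in insertion order.
async function runPool<T>(tasks: Task<T>[], concurrency: number): Promise<T[]> {
  const results: T[] = new Array(tasks.length)
  let next = 0

  // Each worker repeatedly claims the next unstarted task until none remain
  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const index = next++
      results[index] = await tasks[index]()
    }
  }

  const workerCount = Math.min(concurrency, tasks.length)
  await Promise.all(Array.from({ length: workerCount }, () => worker()))
  return results
}
```

Because each worker only claims a new task after finishing its previous one, I/O-bound tasks overlap while the number of in-flight operations never exceeds the limit.
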

Async queueing is particularly effective when you need to:
- Process multiple asynchronous operations concurrently
- Control the number of simultaneous operations
- Handle Promise-based tasks with proper error handling
- Maintain order while maximizing throughput
- Process background tasks that can run in parallel

Common use cases include:
- Making concurrent API requests with rate limiting
- Processing multiple file uploads simultaneously
- Running parallel database operations
- Handling multiple websocket connections
- Processing data streams with backpressure
- Managing resource-intensive background tasks

### When Not to Use Async Queueing

The AsyncQueuer is very versatile and can be used in many situations. It is only a poor fit when you don't plan to take advantage of its features. If you don't need every queued execution to go through, use [Throttling](../guides/throttling) instead. If you don't need concurrent processing, use [Queueing](../guides/queueing) instead.

## Async Queueing in TanStack Pacer

TanStack Pacer provides async queueing through the simple `asyncQueue` function and the more powerful `AsyncQueuer` class.

### Basic Usage with `asyncQueue`

The `asyncQueue` function provides a simple way to create an always-running async queue:

```ts
import { asyncQueue } from '@tanstack/pacer'

// Create a queue that processes up to 2 items concurrently
const addItem = asyncQueue<string>({ concurrency: 2 })

// The returned function queues a task and resolves with its result
addItem(async () => {
  const result = await fetchData(1)
  return result
})
```

The usage of the `asyncQueue` function is a bit limited, as it is just a wrapper around the `AsyncQueuer` class that only exposes the `addItem` method. For more control over the queue, use the `AsyncQueuer` class directly.
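That wrapper relationship can be sketched generically; `SketchQueuer` and `sketchQueue` below are hypothetical stand-ins, not the library's internals:

```typescript
// Hypothetical facade sketch: a class-based queuer reduced to a single
// function, mirroring how `asyncQueue` exposes only `addItem`.
class SketchQueuer<T> {
  private chain: Promise<unknown> = Promise.resolve()

  // Chain tasks one after another (a stand-in for real pooling logic)
  addItem(task: () => Promise<T>): Promise<T> {
    const result = this.chain.then(() => task())
    this.chain = result.catch(() => undefined) // keep the chain alive on errors
    return result
  }
}

// The function form hides the instance and returns the bound method
function sketchQueue<T>(): (task: () => Promise<T>) => Promise<T> {
  const queuer = new SketchQueuer<T>()
  return queuer.addItem.bind(queuer)
}
```
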

### Advanced Usage with `AsyncQueuer` Class

The `AsyncQueuer` class provides complete control over async queue behavior:

```ts
import { AsyncQueuer } from '@tanstack/pacer'

const queue = new AsyncQueuer<string>({
  concurrency: 2, // Process 2 items at once
  wait: 1000, // Wait 1 second between starting new items
  started: true // Start processing immediately
})

// Add error and success handlers
queue.onError((error) => {
  console.error('Task failed:', error)
})

queue.onSuccess((result) => {
  console.log('Task completed:', result)
})

// Add async tasks
queue.addItem(async () => {
  const result = await fetchData(1)
  return result
})

queue.addItem(async () => {
  const result = await fetchData(2)
  return result
})
```

### Queue Types and Ordering

The AsyncQueuer supports different queueing strategies to handle various processing requirements. Each strategy determines how tasks are added and processed from the queue.
#### FIFO Queue (First In, First Out)
FIFO queues process tasks in the exact order they were added, making them ideal for maintaining sequence.
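As a stand-in illustration, independent of TanStack Pacer (`fifoPool` is a made-up helper), a FIFO pool shifts items from the front of its queue, so tasks always start in insertion order even when several run concurrently:

```typescript
// Made-up FIFO pool sketch: items are shifted from the front of the
// queue, so task start-order always matches insertion order.
function fifoPool(concurrency: number) {
  const queue: Array<() => Promise<void>> = []
  let active = 0

  function tick(): void {
    while (active < concurrency && queue.length > 0) {
      const task = queue.shift()! // first in, first out
      active++
      task().finally(() => { active--; tick() })
    }
  }

  return {
    add(task: () => Promise<void>): void {
      queue.push(task)
      tick()
    },
  }
}
```
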

#### Priority Queue

Priority queues process tasks based on their assigned priority values, ensuring important tasks are handled first:

```ts
const priorityQueue = new AsyncQueuer<string>({
  concurrency: 2
})

// Create tasks with static priority values
const lowPriorityTask = Object.assign(
  async () => 'low priority result',
  { priority: 1 }
)

const highPriorityTask = Object.assign(
  async () => 'high priority result',
  { priority: 3 }
)

const mediumPriorityTask = Object.assign(
  async () => 'medium priority result',
  { priority: 2 }
)

// Add tasks in any order - they'll be processed by priority (higher numbers first)
priorityQueue.addItem(lowPriorityTask)
priorityQueue.addItem(highPriorityTask)
priorityQueue.addItem(mediumPriorityTask)
// Processes: high and medium concurrently, then low
```

### Error Handling

The AsyncQueuer provides comprehensive error handling capabilities to ensure robust task processing. You can handle errors at both the queue level and individual task level:

```ts
// Handle errors globally
const queue = new AsyncQueuer<string>({
  onError: (error) => {
    console.error('Task failed:', error)
  },
  onSuccess: (result) => {
    console.log('Task succeeded:', result)
  },
  onSettled: (result) => {
    if (result instanceof Error) {
      console.log('Task failed:', result)
    } else {
      console.log('Task succeeded:', result)
    }
  }
})

// Handle errors per task
queue.addItem(async () => {
  throw new Error('Task failed')
}).catch(error => {
  console.error('Individual task error:', error)
})
```
### Queue Management
The AsyncQueuer provides several methods for monitoring and controlling queue state:

```ts
// Queue inspection
queue.peek() // View next item without removing it
queue.size() // Get current queue size
queue.isEmpty() // Check if queue is empty
queue.isFull() // Check if queue has reached maxSize
queue.getAllItems() // Get copy of all queued items
queue.getActiveItems() // Get currently processing items
queue.getPendingItems() // Get items waiting to be processed

// Queue manipulation
queue.clear() // Remove all items
queue.reset() // Reset to initial state
queue.getExecutionCount() // Get number of processed items

// Processing control
queue.start() // Begin processing items
queue.stop() // Pause processing
queue.isRunning() // Check if queue is processing
queue.isIdle() // Check if queue is empty and not processing
```

### Framework Adapters

Each framework adapter builds convenient hooks and functions around the async queuer classes. Hooks like `useAsyncQueuer` or `useAsyncQueuerState` are small wrappers that can cut down on the boilerplate needed in your own code for some common use cases.