Skip to content

Commit 088a55a

Browse files
authored
Merge pull request #36 from aleph-im/fix/indexer-backpressure
Fix/indexer backpressure
2 parents c6dba57 + b627951 commit 088a55a

File tree

5 files changed

+139
-20
lines changed

5 files changed

+139
-20
lines changed

packages/core/src/utils/concurrence/common.ts

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ import { sleep } from '../time.js'
44
/**
55
* It keeps a concurrent fixed size buffer of pending promises.
66
* When some of them finish, it takes another one from the provided iterator
7-
* @param {Iterator} iterator - A iterator function that returns promises
8-
* @param {number} concurrency - The max number of concurrent pending promises
7+
* @param it - An iterator that yields promises
8+
* @param concurrency - The max number of concurrent pending promises
99
*/
1010
export async function concurrentPromises<T>(
1111
it: Iterator<Promise<unknown>, T>,
12-
concurrency = 20,
12+
concurrency: number = 20,
1313
): Promise<T> {
1414
let done
1515
let lastValue!: T
@@ -52,7 +52,7 @@ export async function concurrentPromises<T>(
5252
* In other frameworks they usually call it "Deferred" too.
5353
*
5454
* Example:
55-
*
55+
* ```ts
5656
* function sleep(ms) {
5757
* const future = new Future()
5858
* setTimeout(() => future.resolve(), ms)
@@ -62,6 +62,7 @@ export async function concurrentPromises<T>(
6262
* async function main() {
6363
* await sleep(1000)
6464
* }
65+
* ```
6566
*/
6667
export class Future<T> {
6768
public resolve!: (value: T | PromiseLike<T>) => void
@@ -81,7 +82,7 @@ export class Future<T> {
8182
* to some region of the code
8283
*
8384
* Example:
84-
*
85+
* ```ts
8586
* // Waits for the lock to be free and returns the releaser function
8687
* const release = await mutex.acquire()
8788
*
@@ -92,6 +93,7 @@ export class Future<T> {
9293
* // Ensures that the lock is always released, even if there are errors
9394
* release()
9495
* }
96+
* ```
9597
*/
9698
export class Mutex {
9799
protected queue = Promise.resolve()
@@ -114,8 +116,22 @@ export class Mutex {
114116
}
115117

116118
/**
117-
* An util for retaining an unique snapshot of data while
118-
* the previous snapshot is being processed
119+
* A utility for retaining a unique snapshot of data while
120+
* the previous snapshot is being processed.
121+
*
122+
* Example:
123+
* ```ts
124+
* const job = new DebouncedJob<string>(async (data) => {
125+
* // Do something with the data
126+
* console.log(data)
127+
* return data
128+
* }, 1000)
129+
*
130+
* job.run('foo')
131+
* ```
132+
*
133+
* @param callback - The callback function that will be called with the data
134+
* @param throttle - The minimum time between calls
119135
*/
120136
export class DebouncedJob<T = void, R = unknown> {
121137
protected pendingData: T | undefined

packages/core/src/utils/concurrence/jobRunner.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,23 @@ export interface JobRunnerOptions {
2424
jitter?: number
2525
}
2626

27+
/**
28+
* Runs an `intervalFn` at a given `interval`. The `intervalFn` can return a
29+
* number to change the interval for the next run. `intervalMax` can be used to
30+
* limit the maximum interval time.
31+
*
32+
* The `intervalFn` can also return `JobRunnerReturnCode.Reset == -1` to reset
33+
* the interval to the initial value, or `JobRunnerReturnCode.Stop == -2` to
34+
* stop the runner.
35+
*
36+
* You can configure the `times` an interval is run, and also set a `jitter` to
37+
* randomize the interval. This is useful to avoid stampedes.
38+
*
39+
* Set `startAfter` to delay the first run. `intervalInit` can be used to set a
40+
* different interval for after the first run. `intervalAccuracy` can be used to
41+
* make sure the `intervalFn` is called at the exact interval time, otherwise
42+
* it will be called again after execution time + interval.
43+
*/
2744
export class JobRunner {
2845
private _events = new EventEmitter()
2946
private _isRunning = false
@@ -84,6 +101,13 @@ export class JobRunner {
84101
}
85102
}
86103

104+
/**
105+
* Registers an event handler. Possible events are:
106+
* - `beforeSleep`: Called before the runner sleeps until the next interval.
107+
* - `firstRun`: Called after the first run.
108+
* @param event - The name of the event to listen for (`beforeSleep` or `firstRun`)
109+
* @param handler - The callback invoked when the event is emitted; it may return a promise
110+
*/
87111
on(
88112
event: 'beforeSleep' | 'firstRun',
89113
handler: (...args: any[]) => void | Promise<void>,
@@ -98,10 +122,17 @@ export class JobRunner {
98122
}
99123
}
100124

125+
/**
126+
* Starts the runner.
127+
*/
101128
async start(): Promise<void> {
102129
return this.run()
103130
}
104131

132+
/**
133+
* The runner function. Returns a promise that resolves when the runner is
134+
* finished.
135+
*/
105136
async run(): Promise<void> {
106137
if (this._isRunning) return
107138
this._isRunning = true
@@ -202,11 +233,17 @@ export class JobRunner {
202233
p.resolve()
203234
}
204235

236+
/**
237+
* Stops the runner.
238+
*/
205239
stop(): Promise<void> {
206240
this._isRunning = false
207241
return this.hasFinished()
208242
}
209243

244+
/**
245+
* Returns a promise that resolves when the runner is finished.
246+
*/
210247
hasFinished(): Promise<void> {
211248
return this._finished.promise
212249
}

packages/core/src/utils/concurrence/pendingWork.ts

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,35 @@ import { JobRunner } from './jobRunner.js'
66

77
export * from './dal/pendingWork.js'
88

9+
/**
10+
* A pending work item
11+
*/
912
export type PendingWork<T> = {
1013
id: string
1114
time: number
1215
payload: T
1316
}
1417

18+
/**
19+
* Options for the pending work pool
20+
* @template T The type of the payload
21+
* @property id - The id of the pool
22+
* @property dal - The data access layer
23+
* @property interval - The interval to run the pool
24+
* @property concurrency - The number of concurrent jobs to run
25+
* @property maxQueueSize - The max number of works to queue
26+
* @property chunkSize - The number of works to fetch at once
27+
* @property handleWork - The function to handle the works
28+
* @property checkComplete - The function to check if a work is complete
29+
* @property preCheckComplete - Whether to check if a work is complete before
30+
* handling it
31+
*/
1532
export interface PendingWorkOptions<T> {
1633
id: string
1734
dal: PendingWorkDAL<T>
1835
interval: number
1936
concurrency: number
37+
maxQueueSize?: number
2038
chunkSize: number
2139
handleWork: (
2240
works: PendingWork<T>[],
@@ -25,12 +43,31 @@ export interface PendingWorkOptions<T> {
2543
preCheckComplete?: boolean
2644
}
2745

46+
/**
47+
* Error thrown when the pending work queue is full
48+
*/
49+
export class QueueFullError extends Error {
50+
constructor(
51+
protected pendingWork: PendingWorkPool<any>,
52+
) {
53+
super(`Queue (max size: ${pendingWork.options.maxQueueSize}) is full for ${pendingWork.options.id}`)
54+
}
55+
}
56+
57+
/**
58+
* A pool of pending works. It will run the works in the pool at a given interval
59+
* or when new works arrive. It will also check if the works are complete before
60+
* running them. If they are, it will remove them from the pool.
61+
* @note If the interval is 0, it will only run when new works arrive.
62+
* @template T The type of the payload
63+
* @property options The options of the pool
64+
*/
2865
export class PendingWorkPool<T> {
2966
protected skipSleep = false
3067
protected debouncedJob: DebouncedJobRunner | undefined
3168
protected coordinatorJob: JobRunner | undefined
3269

33-
constructor(protected options: PendingWorkOptions<T>) {
70+
constructor(public readonly options: PendingWorkOptions<T>) {
3471
const name = `${this.options.id} 🔄`
3572

3673
// @note: If interval is 0, run it only when new items arrive
@@ -67,9 +104,19 @@ export class PendingWorkPool<T> {
67104
return this.coordinatorJob && this.coordinatorJob.stop()
68105
}
69106

107+
/**
108+
* Add work to the pool
109+
* @param work The work item, or array of work items, to add
110+
* @throws QueueFullError if the queue is full
111+
*/
70112
async addWork(work: PendingWork<T> | PendingWork<T>[]): Promise<void> {
71113
work = Array.isArray(work) ? work : [work]
72114
if (!work.length) return
115+
if (this.options.maxQueueSize) {
116+
const count = await this.getCount()
117+
if (count + work.length > this.options.maxQueueSize)
118+
throw new QueueFullError(this)
119+
}
73120

74121
await this.options.dal.save(work)
75122

packages/framework/src/services/fetcher/src/entityFetcher.ts

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@ import {
44
EntityStorage,
55
PendingWork,
66
PendingWorkPool,
7-
Utils,
7+
QueueFullError,
8+
Utils
89
} from '@aleph-indexer/core'
910
import {
1011
FetchEntitiesByIdRequestArgs,
@@ -33,13 +34,15 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {
3334

3435
/**
3536
* Initialize the fetcher service.
37+
* @param type The type of indexable entity handled by this fetcher.
38+
* @param blockchainId The id of the blockchain.
3639
* @param broker The moleculer broker to assign to the service.
3740
* @param pendingEntityDAL The pending entities' storage.
38-
* @param pendingEntityCacheDAL
39-
* @param pendingEntityFetchDAL
41+
* @param pendingEntityCacheDAL The pending entity cache storage.
42+
* @param pendingEntityFetchDAL The pending entity fetch storage.
4043
* @param entityCacheDAL The raw entities' storage.
4144
*/
42-
constructor(
45+
protected constructor(
4346
protected type: IndexableEntityType,
4447
protected blockchainId: Blockchain,
4548
protected broker: ServiceBroker,
@@ -73,6 +76,7 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {
7376
id: `${blockchainId}:pending-${type}`,
7477
interval: 0,
7578
chunkSize: 1000,
79+
maxQueueSize: 10000,
7680
concurrency: 1,
7781
dal: this.pendingEntityDAL,
7882
handleWork: this.handlePendingEntities.bind(this),
@@ -106,21 +110,35 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {
106110
/**
107111
* Fetch entities from an account by ids.
108112
* @param args Entity ids.
113+
* @throws QueueFullError if fetcher queue is full
109114
*/
110115
async fetchEntitiesById(args: FetchEntitiesByIdRequestArgs): Promise<void> {
111116
const { ids, indexerId } = args
112117

113-
this.log(
114-
`🔗 ${ids.length} new ids added to the ${this.type} fetcher queue... [${indexerId}]`,
115-
)
116-
117118
const entities = ids.filter(this.filterEntityId.bind(this)).map((id) => ({
118119
id,
119120
time: Date.now(),
120121
payload: indexerId ? [indexerId] : [],
121122
}))
122123

123124
await this.pendingEntities.addWork(entities)
125+
.catch((e: Error) => {
126+
if (e.constructor == QueueFullError) {
127+
return Promise.reject(e)
128+
} else {
129+
throw e
130+
}
131+
})
132+
.then(() => {
133+
this.log(
134+
`🔗 ${ids.length} new ids added to the ${this.type} fetcher queue... [${indexerId}]`,
135+
)
136+
},
137+
() => {
138+
this.log(
139+
`⏯ ${ids.length} new ids waiting to be added to the ${this.type} fetcher queue... [${indexerId}]`,
140+
)
141+
})
124142
}
125143

126144
/**
@@ -302,7 +320,7 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {
302320

303321
/**
304322
* Returns the fetch status of certain txn signatures.
305-
* @param signatures The txn signatures to get its state.
323+
* @param args The txn ids to check
306324
*/
307325
async getEntityState(args: CheckEntityRequestArgs): Promise<EntityState[]> {
308326
const { ids } = args
@@ -356,8 +374,9 @@ export abstract class BaseEntityFetcher<RE extends RawEntity> {
356374
}
357375

358376
/**
359-
* Fetch entities from a RPC Node.
360-
* @param works Entity ids with extra properties as time and payload.
377+
* Guard to fetch entities from an RPC Node.
378+
* @param ids The ids of the entities to fetch.
379+
* @param isRetry Whether this request is a retry
361380
*/
362381
protected abstract remoteFetchIds(
363382
ids: string[],

packages/framework/src/services/fetcher/src/fetcherPool.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ export interface FetcherPoolOptions<T>
2828

2929
export class FetcherPool<T> extends PendingWorkPool<T> {
3030
protected workFetcher: Record<string, BaseHistoryFetcher<any>> = {}
31-
public options!: FetcherPoolOptions<T> & PendingWorkOptions<T>
31+
public readonly options!: FetcherPoolOptions<T> & PendingWorkOptions<T>
3232

3333
constructor(options: FetcherPoolOptions<T>) {
3434
const { checkComplete, ...rest } = options

0 commit comments

Comments
 (0)