Skip to content

Commit

Permalink
fix: clear stale queues instead of reclaiming them
Browse files Browse the repository at this point in the history
  • Loading branch information
saikumarrs committed Nov 27, 2024
1 parent 2c17244 commit 8c005fb
Showing 1 changed file with 2 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -724,40 +724,6 @@ class RetryQueue implements IQueue<QueueItemData> {
}

checkReclaim() {
const createReclaimStartTask = (store: IStore) => () => {
if (store.get(QueueStatuses.RECLAIM_END) !== this.id) {
return;
}

if (store.get(QueueStatuses.RECLAIM_START) !== this.id) {
return;
}

this.reclaim(store.id);
};
const createReclaimEndTask = (store: IStore) => () => {
if (store.get(QueueStatuses.RECLAIM_START) !== this.id) {
return;
}

store.set(QueueStatuses.RECLAIM_END, this.id);

this.schedule.run(
createReclaimStartTask(store),
this.timeouts.reclaimWait,
ScheduleModes.ABANDON,
);
};
const tryReclaim = (store: IStore) => {
store.set(QueueStatuses.RECLAIM_START, this.id);
store.set(QueueStatuses.ACK, this.schedule.now());

this.schedule.run(
createReclaimEndTask(store),
this.timeouts.reclaimWait,
ScheduleModes.ABANDON,
);
};
const findOtherQueues = (name: string): IStore[] => {
const res: IStore[] = [];
const storageEngine = this.store.getOriginalEngine();
Expand Down Expand Up @@ -798,15 +764,8 @@ class RetryQueue implements IQueue<QueueItemData> {
return res;
};

findOtherQueues(this.name).forEach(store => {
if (this.schedule.now() - store.get(QueueStatuses.ACK) < this.timeouts.reclaimTimeout) {
return;
}

tryReclaim(store);
});

this.schedule.run(this.checkReclaim, this.timeouts.reclaimTimer, ScheduleModes.RESCHEDULE);
// Instead of reclaiming stale queues, clear them
findOtherQueues(this.name).forEach(store => this.clearQueueEntries(store, 0));
}

clear() {
Expand Down

0 comments on commit 8c005fb

Please sign in to comment.