Skip to content

Commit

Permalink
chore: revert back to inProgress
Browse files Browse the repository at this point in the history
  • Loading branch information
vladfrangu committed Aug 25, 2023
1 parent 62d97c9 commit 415b0f5
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 36 deletions.
4 changes: 2 additions & 2 deletions packages/basic-crawler/src/internals/basic-crawler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1035,13 +1035,13 @@ export class BasicCrawler<Context extends CrawlingContext = BasicCrawlingContext
}

// eslint-disable-next-line dot-notation
source['requestIdsInProgress'].delete(request.id!);
source['inProgress'].delete(request.id!);
const delay = lastAccessTime + this.sameDomainDelayMillis - now;
this.log.debug(`Request ${request.url} (${request.id}) will be reclaimed after ${delay} milliseconds due to same domain delay`);
setTimeout(async () => {
this.log.debug(`Adding request ${request.url} (${request.id}) back to the queue`);
// eslint-disable-next-line dot-notation
source['requestIdsInProgress'].add(request.id!);
source['inProgress'].add(request.id!);
await source.reclaimRequest(request);
}, delay);

Expand Down
20 changes: 10 additions & 10 deletions packages/core/src/storages/request_list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ export class RequestList {
* Set of `uniqueKey`s of requests that were returned by fetchNextRequest().
* @internal
*/
requestIdsInProgress = new Set<string>();
inProgress = new Set<string>();

/**
* Set of `uniqueKey`s of requests for which reclaimRequest() was called.
Expand Down Expand Up @@ -492,7 +492,7 @@ export class RequestList {
});

this.nextIndex = state.nextIndex;
this.requestIdsInProgress = new Set(state.inProgress);
this.inProgress = new Set(state.inProgress);

// WORKAROUND:
// It happened to some users that state object contained something like:
Expand All @@ -512,12 +512,12 @@ export class RequestList {
deleteFromInProgress,
});
for (const uniqueKey of deleteFromInProgress) {
this.requestIdsInProgress.delete(uniqueKey);
this.inProgress.delete(uniqueKey);
}
}

// All in-progress requests need to be re-crawled
this.reclaimed = new Set(this.requestIdsInProgress);
this.reclaimed = new Set(this.inProgress);
}

/**
Expand Down Expand Up @@ -556,7 +556,7 @@ export class RequestList {
nextUniqueKey: this.nextIndex < this.requests.length
? this.requests[this.nextIndex].uniqueKey
: null,
inProgress: [...this.requestIdsInProgress],
inProgress: [...this.inProgress],
};
}

Expand All @@ -577,7 +577,7 @@ export class RequestList {
async isFinished(): Promise<boolean> {
this._ensureIsInitialized();

return this.requestIdsInProgress.size === 0 && this.nextIndex >= this.requests.length;
return this.inProgress.size === 0 && this.nextIndex >= this.requests.length;
}

/**
Expand All @@ -602,7 +602,7 @@ export class RequestList {
// Otherwise return next request.
if (this.nextIndex < this.requests.length) {
const request = this.requests[this.nextIndex];
this.requestIdsInProgress.add(request.uniqueKey);
this.inProgress.add(request.uniqueKey);
this.nextIndex++;
this.isStatePersisted = false;
return request;
Expand All @@ -621,7 +621,7 @@ export class RequestList {
this._ensureInProgressAndNotReclaimed(uniqueKey);
this._ensureIsInitialized();

this.requestIdsInProgress.delete(uniqueKey);
this.inProgress.delete(uniqueKey);
this.isStatePersisted = false;
}

Expand Down Expand Up @@ -742,7 +742,7 @@ export class RequestList {
* Checks that request is not reclaimed and throws an error if so.
*/
protected _ensureInProgressAndNotReclaimed(uniqueKey: string): void {
if (!this.requestIdsInProgress.has(uniqueKey)) {
if (!this.inProgress.has(uniqueKey)) {
throw new Error(`The request is not being processed (uniqueKey: ${uniqueKey})`);
}
if (this.reclaimed.has(uniqueKey)) {
Expand Down Expand Up @@ -774,7 +774,7 @@ export class RequestList {
handledCount(): number {
this._ensureIsInitialized();

return this.nextIndex - this.requestIdsInProgress.size;
return this.nextIndex - this.inProgress.size;
}

/**
Expand Down
22 changes: 11 additions & 11 deletions packages/core/src/storages/request_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export abstract class RequestProvider implements IStorage {
protected queueHeadIds = new ListDictionary<string>();
protected requestCache: LruCache<RequestLruItem>;
/** @internal */
requestIdsInProgress = new Set<string>();
inProgress = new Set<string>();
protected recentlyHandledRequestsCache: LruCache<boolean>;

// TODO: RQv1 logic for stuck queues, this might not be needed anymore
Expand Down Expand Up @@ -79,7 +79,7 @@ export abstract class RequestProvider implements IStorage {
* @ignore
*/
inProgressCount() {
return this.requestIdsInProgress.size;
return this.inProgress.size;
}

/**
Expand Down Expand Up @@ -142,7 +142,7 @@ export abstract class RequestProvider implements IStorage {
const { requestId, wasAlreadyPresent } = queueOperationInfo;
this._cacheRequest(cacheKey, queueOperationInfo);

if (!wasAlreadyPresent && !this.requestIdsInProgress.has(requestId) && !this.recentlyHandledRequestsCache.get(requestId)) {
if (!wasAlreadyPresent && !this.inProgress.has(requestId) && !this.recentlyHandledRequestsCache.get(requestId)) {
this.assumedTotalCount++;

// Performance optimization: add request straight to head if possible
Expand Down Expand Up @@ -245,7 +245,7 @@ export abstract class RequestProvider implements IStorage {
const { requestId, wasAlreadyPresent } = newRequest;
this._cacheRequest(cacheKey, newRequest);

if (!wasAlreadyPresent && !this.requestIdsInProgress.has(requestId) && !this.recentlyHandledRequestsCache.get(requestId)) {
if (!wasAlreadyPresent && !this.inProgress.has(requestId) && !this.recentlyHandledRequestsCache.get(requestId)) {
this.assumedTotalCount++;

// Performance optimization: add request straight to head if possible
Expand Down Expand Up @@ -377,7 +377,7 @@ export abstract class RequestProvider implements IStorage {
handledAt: ow.optional.string,
}));

if (!this.requestIdsInProgress.has(request.id)) {
if (!this.inProgress.has(request.id)) {
this.log.debug(`Cannot mark request ${request.id} as handled, because it is not in progress!`, { requestId: request.id });
return null;
}
Expand All @@ -387,7 +387,7 @@ export abstract class RequestProvider implements IStorage {
request.handledAt = handledAt;
queueOperationInfo.uniqueKey = request.uniqueKey;

this.requestIdsInProgress.delete(request.id);
this.inProgress.delete(request.id);
this.recentlyHandledRequestsCache.add(request.id, true);

if (!queueOperationInfo.wasAlreadyHandled) {
Expand Down Expand Up @@ -417,7 +417,7 @@ export abstract class RequestProvider implements IStorage {

const { forefront = false } = options;

if (!this.requestIdsInProgress.has(request.id)) {
if (!this.inProgress.has(request.id)) {
this.log.debug(`Cannot reclaim request ${request.id}, because it is not in progress!`, { requestId: request.id });
return null;
}
Expand All @@ -431,12 +431,12 @@ export abstract class RequestProvider implements IStorage {
// Wait a little to increase a chance that the next call to fetchNextRequest() will return the request with updated data.
// This is to compensate for the limitation of DynamoDB, where writes might not be immediately visible to subsequent reads.
setTimeout(() => {
if (!this.requestIdsInProgress.has(request.id)) {
if (!this.inProgress.has(request.id)) {
this.log.debug('The request is no longer marked as in progress in the queue?!', { requestId: request.id });
return;
}

this.requestIdsInProgress.delete(request.id);
this.inProgress.delete(request.id);

// Performance optimization: add request straight to head if possible
this._maybeAddRequestToQueueHead(request.id, forefront);
Expand Down Expand Up @@ -467,7 +467,7 @@ export abstract class RequestProvider implements IStorage {
async isFinished(): Promise<boolean> {
if ((Date.now() - +this.lastActivity) > this.internalTimeoutMillis) {
const message = `The request queue seems to be stuck for ${this.internalTimeoutMillis / 1e3}s, resetting internal state.`;
this.log.warning(message, { inProgress: [...this.requestIdsInProgress] });
this.log.warning(message, { inProgress: [...this.inProgress] });
this._reset();
}

Expand All @@ -479,7 +479,7 @@ export abstract class RequestProvider implements IStorage {

protected _reset() {
this.queueHeadIds.clear();
this.requestIdsInProgress.clear();
this.inProgress.clear();
this.recentlyHandledRequestsCache.clear();
this.assumedTotalCount = 0;
this.assumedHandledCount = 0;
Expand Down
14 changes: 7 additions & 7 deletions packages/core/src/storages/request_queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,24 +119,24 @@ export class RequestQueue extends RequestProvider {
if (!nextRequestId) return null;

// This should never happen, but...
if (this.requestIdsInProgress.has(nextRequestId) || this.recentlyHandledRequestsCache.get(nextRequestId)) {
if (this.inProgress.has(nextRequestId) || this.recentlyHandledRequestsCache.get(nextRequestId)) {
this.log.warning('Queue head returned a request that is already in progress?!', {
nextRequestId,
inProgress: this.requestIdsInProgress.has(nextRequestId),
inProgress: this.inProgress.has(nextRequestId),
recentlyHandled: !!this.recentlyHandledRequestsCache.get(nextRequestId),
});
return null;
}

this.requestIdsInProgress.add(nextRequestId);
this.inProgress.add(nextRequestId);
this.lastActivity = new Date();

let request: Request | null;
try {
request = await this.getRequest(nextRequestId);
} catch (e) {
// On error, remove the request from in progress, otherwise it would be there forever
this.requestIdsInProgress.delete(nextRequestId);
this.inProgress.delete(nextRequestId);
throw e;
}

Expand All @@ -150,7 +150,7 @@ export class RequestQueue extends RequestProvider {
if (!request) {
this.log.debug('Cannot find a request from the beginning of queue, will be retried later', { nextRequestId });
setTimeout(() => {
this.requestIdsInProgress.delete(nextRequestId);
this.inProgress.delete(nextRequestId);
}, STORAGE_CONSISTENCY_DELAY_MILLIS);
return null;
}
Expand Down Expand Up @@ -207,7 +207,7 @@ export class RequestQueue extends RequestProvider {
.then(({ items, queueModifiedAt, hadMultipleClients }) => {
items.forEach(({ id: requestId, uniqueKey }) => {
// Queue head index might be behind the main table, so ensure we don't recycle requests
if (!requestId || !uniqueKey || this.requestIdsInProgress.has(requestId) || this.recentlyHandledRequestsCache.get(requestId!)) return;
if (!requestId || !uniqueKey || this.inProgress.has(requestId) || this.recentlyHandledRequestsCache.get(requestId!)) return;

this.queueHeadIds.add(requestId, requestId, false);
this._cacheRequest(getRequestId(uniqueKey), {
Expand Down Expand Up @@ -283,7 +283,7 @@ export class RequestQueue extends RequestProvider {
override async isFinished(): Promise<boolean> {
if ((Date.now() - +this.lastActivity) > this.internalTimeoutMillis) {
const message = `The request queue seems to be stuck for ${this.internalTimeoutMillis / 1e3}s, resetting internal state.`;
this.log.warning(message, { inProgress: [...this.requestIdsInProgress] });
this.log.warning(message, { inProgress: [...this.inProgress] });
this._reset();
}

Expand Down
12 changes: 6 additions & 6 deletions packages/core/src/storages/request_queue_v2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,16 +79,16 @@ class RequestQueue extends RequestProvider {
}

// This should never happen, but...
if (this.requestIdsInProgress.has(nextRequestId) || this.recentlyHandledRequestsCache.get(nextRequestId)) {
if (this.inProgress.has(nextRequestId) || this.recentlyHandledRequestsCache.get(nextRequestId)) {
this.log.warning('Queue head returned a request that is already in progress?!', {
nextRequestId,
inProgress: this.requestIdsInProgress.has(nextRequestId),
inProgress: this.inProgress.has(nextRequestId),
recentlyHandled: !!this.recentlyHandledRequestsCache.get(nextRequestId),
});
return null;
}

this.requestIdsInProgress.add(nextRequestId);
this.inProgress.add(nextRequestId);
this.lastActivity = new Date();

let request: Request | null;
Expand All @@ -97,7 +97,7 @@ class RequestQueue extends RequestProvider {
request = await this.getOrHydrateRequest(nextRequestId);
} catch (e) {
// On error, remove the request from in progress, otherwise it would be there forever
this.requestIdsInProgress.delete(nextRequestId);
this.inProgress.delete(nextRequestId);
throw e;
}

Expand All @@ -112,7 +112,7 @@ class RequestQueue extends RequestProvider {
this.log.debug('Cannot find a request from the beginning of queue or lost lock, will be retried later', { nextRequestId });

setTimeout(() => {
this.requestIdsInProgress.delete(nextRequestId);
this.inProgress.delete(nextRequestId);
}, STORAGE_CONSISTENCY_DELAY_MILLIS);

return null;
Expand Down Expand Up @@ -154,7 +154,7 @@ class RequestQueue extends RequestProvider {

for (const { id, uniqueKey } of headData.items) {
// Queue head index might be behind the main table, so ensure we don't recycle requests
if (!id || !uniqueKey || this.requestIdsInProgress.has(id) || this.recentlyHandledRequestsCache.get(id)) {
if (!id || !uniqueKey || this.inProgress.has(id) || this.recentlyHandledRequestsCache.get(id)) {
continue;
}

Expand Down

0 comments on commit 415b0f5

Please sign in to comment.