Skip to content

Commit

Permalink
fix(request-queue): Update rq locking docs
Browse files Browse the repository at this point in the history
  • Loading branch information
drobnikj committed Dec 6, 2024
1 parent 6873e01 commit df52238
Showing 1 changed file with 62 additions and 22 deletions.
84 changes: 62 additions & 22 deletions sources/platform/storage/request_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,10 @@ You can lock a request so that no other clients receive it when they fetch the q
This feature is seamlessly integrated into Crawlee, requiring minimal extra setup. By default, requests are locked for the same duration as the timeout for processing requests in the crawler ([`requestHandlerTimeoutSecs`](https://crawlee.dev/api/next/basic-crawler/interface/BasicCrawlerOptions#requestHandlerTimeoutSecs)).
If the Actor processing the request fails, the lock expires, and the request is processed again eventually. For more details, refer to the [Crawlee documentation](https://crawlee.dev/docs/next/experiments/experiments-request-locking).

In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request.
In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request across multiple Actor runs.

<Tabs groupId="main">
<TabItem value="Actor 1" label="Actor 1">

```js
import { Actor, ApifyClient } from 'apify';
Expand All @@ -425,9 +428,6 @@ const requestQueue = await client.requestQueues().getOrCreate('example-queue');
const requestQueueClientOne = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueueone',
});
const requestQueueClientTwo = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueuetwo',
});

// Adds multiple requests to the queue.
await requestQueueClientOne.batchAddRequests([
Expand Down Expand Up @@ -457,23 +457,71 @@ await requestQueueClientOne.batchAddRequests([
const processingRequestsClientOne = await requestQueueClientOne.listAndLockHead(
{
limit: 2,
lockSecs: 60,
lockSecs: 120,
},
);

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0];
const requestLockedByClientOne = await requestQueueClientOne.getRequest(
theFirstRequestLockedByClientOne.id,
);
console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`);
// Prolongs the lock of the first request or unlocks it.
await requestQueueClientOne.prolongRequestLock(
theFirstRequestLockedByClientOne.id,
{ lockSecs: 120 },
);
await requestQueueClientOne.deleteRequestLock(
theFirstRequestLockedByClientOne.id,
);
// Cleans up the queue.
await requestQueueClientOne.delete();
await Actor.exit();
```
</TabItem>
<TabItem value="Actor 2" label="Actor 2">
```js
import { Actor, ApifyClient } from 'apify';
await Actor.init();
const client = new ApifyClient({
token: 'MY-APIFY-TOKEN',
});
// Waits for the first Actor to lock the requests.
await new Promise((resolve) => setTimeout(resolve, 5000));
// Creates a new request queue.
const requestQueue = await client.requestQueues().getOrCreate('example-queue');
const requestQueueClientTwo = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueuetwo',
});
// Get all requests from the queue and check one locked by the first Actor.
const requests = await requestQueueClientTwo.listRequests();
const requestLockedByClientOne = requests.items.filter((request) => request.lockedByClientKey === 'requestqueueone');
const theFirstRequestLockedByClientOne = requestLockedByClientOne[0];
// Other clients cannot list and lock these requests; the listAndLockHead call returns other requests from the queue.
const processingRequestsClientTwo = await requestQueueClientTwo.listAndLockHead(
{
limit: 2,
limit: 10,
lockSecs: 60,
},
);

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0];
const requestLockedByClientOne = await requestQueueClientOne.getRequest(
theFirstRequestLockedByClientOne.id,
const wasTheClientTwoLockedSameRequest = !!processingRequestsClientTwo.items.find(
(request) => request.id === theFirstRequestLockedByClientOne.id,
);
console.log(`Was the request locked by the first client locked by the second client? ${wasTheClientTwoLockedSameRequest}`);
console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`);
// Other clients cannot modify the lock; attempting to do so will throw an error.
Expand All @@ -486,21 +534,13 @@ try {
// This will throw an error.
}
// Prolongs the lock of the first request or unlocks it.
await requestQueueClientOne.prolongRequestLock(
theFirstRequestLockedByClientOne.id,
{ lockSecs: 60 },
);
await requestQueueClientOne.deleteRequestLock(
theFirstRequestLockedByClientOne.id,
);
// Cleans up the queue.
await requestQueueClientOne.delete();
await Actor.exit();
```
</TabItem>
</Tabs>
A detailed tutorial on how to process one request queue with multiple Actor runs can be found in [Academy tutorials](https://docs.apify.com/academy/node-js/multiple-runs-scrape).
## Sharing
Expand Down

0 comments on commit df52238

Please sign in to comment.