Skip to content

Commit

Permalink
fix(request-queue): Update request queue locking docs (#1322)
Browse files Browse the repository at this point in the history
I made some changes in docs regarding the API changes.

* stress out that locking works on the client level and the same as on
the run level.
* update code example by separating code into two actors using code tabs

---------

Co-authored-by: Michał Olender <92638966+TC-MO@users.noreply.github.com>
  • Loading branch information
drobnikj and TC-MO authored Dec 10, 2024
1 parent 35d2b5e commit 7951a66
Showing 1 changed file with 79 additions and 28 deletions.
107 changes: 79 additions & 28 deletions sources/platform/storage/request_queue.md
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,20 @@ You can lock a request so that no other clients receive it when they fetch the q
This feature is seamlessly integrated into Crawlee, requiring minimal extra setup. By default, requests are locked for the same duration as the timeout for processing requests in the crawler ([`requestHandlerTimeoutSecs`](https://crawlee.dev/api/next/basic-crawler/interface/BasicCrawlerOptions#requestHandlerTimeoutSecs)).
If the Actor processing the request fails, the lock expires, and the request is processed again eventually. For more details, refer to the [Crawlee documentation](https://crawlee.dev/docs/next/experiments/experiments-request-locking).

In the following example, we demonstrate how we can use locking mechanisms to avoid concurrent processing of the same request.
In the following example, we demonstrate how you can use locking mechanisms to avoid concurrent processing of the same request across multiple Actor runs.

:::info
The lock mechanism works on the client level, as well as the run level, when running the Actor on the Apify platform.

This means you can unlock or prolong the lock the locked request only if:

- You are using the same client key, or
- The operation is being called from the same Actor run.

:::

<Tabs groupId="main">
<TabItem value="Actor 1" label="Actor 1">

```js
import { Actor, ApifyClient } from 'apify';
Expand All @@ -422,15 +435,12 @@ const client = new ApifyClient({
const requestQueue = await client.requestQueues().getOrCreate('example-queue');

// Creates two clients with different keys for the same request queue.
const requestQueueClientOne = client.requestQueue(requestQueue.id, {
const requestQueueClient = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueueone',
});
const requestQueueClientTwo = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueuetwo',
});

// Adds multiple requests to the queue.
await requestQueueClientOne.batchAddRequests([
await requestQueueClient.batchAddRequests([
{
url: 'http://example.com/foo',
uniqueKey: 'http://example.com/foo',
Expand All @@ -454,53 +464,94 @@ await requestQueueClientOne.batchAddRequests([
]);

// Locks the first two requests at the head of the queue.
const processingRequestsClientOne = await requestQueueClientOne.listAndLockHead(
const processingRequestsClientOne = await requestQueueClient.listAndLockHead(
{
limit: 2,
lockSecs: 60,
lockSecs: 120,
},
);

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const lockedRequest = processingRequestsClientOne.items[0];
const lockedRequestDetail = await requestQueueClient.getRequest(
lockedRequest.id,
);
console.log(`Request locked until ${lockedRequestDetail?.lockExpiresAt}`);
// Prolongs the lock of the first request or unlocks it.
await requestQueueClient.prolongRequestLock(
lockedRequest.id,
{ lockSecs: 120 },
);
await requestQueueClient.deleteRequestLock(
lockedRequest.id,
);
await Actor.exit();
```
</TabItem>
<TabItem value="Actor 2" label="Actor 2">
```js
import { Actor, ApifyClient } from 'apify';
await Actor.init();
const client = new ApifyClient({
token: 'MY-APIFY-TOKEN',
});
// Waits for the first Actor to lock the requests.
await new Promise((resolve) => setTimeout(resolve, 5000));
// Get the same request queue in different Actor run and with a different client key.
const requestQueue = await client.requestQueues().getOrCreate('example-queue');
const requestQueueClient = client.requestQueue(requestQueue.id, {
clientKey: 'requestqueuetwo',
});
// Get all requests from the queue and check one locked by the first Actor.
const requests = await requestQueueClient.listRequests();
const requestsLockedByAnotherRun = requests.items.filter((request) => request.lockByClient === 'requestqueueone');
const requestLockedByAnotherRunDetail = await requestQueueClient.getRequest(
requestsLockedByAnotherRun[0].id,
);
// Other clients cannot list and lock these requests; the listAndLockHead call returns other requests from the queue.
const processingRequestsClientTwo = await requestQueueClientTwo.listAndLockHead(
const processingRequestsClientTwo = await requestQueueClient.listAndLockHead(
{
limit: 2,
limit: 10,
lockSecs: 60,
},
);

// Checks when the lock will expire. The locked request will have a lockExpiresAt attribute.
const theFirstRequestLockedByClientOne = processingRequestsClientOne.items[0];
const requestLockedByClientOne = await requestQueueClientOne.getRequest(
theFirstRequestLockedByClientOne.id,
const wasBothRunsLockedSameRequest = !!processingRequestsClientTwo.items.find(
(request) => request.id === requestLockedByAnotherRunDetail.id,
);
console.log(`Request locked until ${requestLockedByClientOne?.lockExpiresAt}`);
console.log(`Was the request locked by the first run locked by the second run? ${wasBothRunsLockedSameRequest}`);
console.log(`Request locked until ${requestLockedByAnotherRunDetail?.lockExpiresAt}`);
// Other clients cannot modify the lock; attempting to do so will throw an error.
try {
await requestQueueClientTwo.prolongRequestLock(
theFirstRequestLockedByClientOne.id,
await requestQueueClient.prolongRequestLock(
requestLockedByAnotherRunDetail.id,
{ lockSecs: 60 },
);
} catch (err) {
// This will throw an error.
}
// Prolongs the lock of the first request or unlocks it.
await requestQueueClientOne.prolongRequestLock(
theFirstRequestLockedByClientOne.id,
{ lockSecs: 60 },
);
await requestQueueClientOne.deleteRequestLock(
theFirstRequestLockedByClientOne.id,
);
// Cleans up the queue.
await requestQueueClientOne.delete();
await requestQueueClient.delete();
await Actor.exit();
```
</TabItem>
</Tabs>
A detailed tutorial on how to process one request queue with multiple Actor runs can be found in [Academy tutorials](https://docs.apify.com/academy/node-js/multiple-runs-scrape).
## Sharing
Expand Down

0 comments on commit 7951a66

Please sign in to comment.