From e067ab6e69ba2464181e77863648bf7c0533f023 Mon Sep 17 00:00:00 2001 From: oleksandravalko Date: Tue, 30 Sep 2025 15:24:19 +0200 Subject: [PATCH 1/7] fix(apify-run-task) 11: rename --- .../run-task-synchronously.mjs => run-task/run-task.mjs} | 4 ++++ .../run-task-synchronously.mjs => run-task/run-task.mjs} | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) rename components/apify/actions/{run-task-synchronously/run-task-synchronously.mjs => run-task/run-task.mjs} (92%) rename components/apify_oauth/actions/{run-task-synchronously/run-task-synchronously.mjs => run-task/run-task.mjs} (90%) diff --git a/components/apify/actions/run-task-synchronously/run-task-synchronously.mjs b/components/apify/actions/run-task/run-task.mjs similarity index 92% rename from components/apify/actions/run-task-synchronously/run-task-synchronously.mjs rename to components/apify/actions/run-task/run-task.mjs index 776620f885c1b..160973959331a 100644 --- a/components/apify/actions/run-task-synchronously/run-task-synchronously.mjs +++ b/components/apify/actions/run-task/run-task.mjs @@ -6,6 +6,10 @@ export default { name: "Run Task Synchronously", description: "Run a specific task and return its dataset items. [See the documentation](https://docs.apify.com/api/v2/actor-task-run-sync-get-dataset-items-get)", version: "0.0.4", + key: "apify-run-task", + name: "Run Task", + description: "Run a specific task and optionally return its dataset items. 
[See the documentation](https://docs.apify.com/api/v2/actor-task-run-sync-get-dataset-items-get)", + version: "0.0.5", type: "action", props: { apify, diff --git a/components/apify_oauth/actions/run-task-synchronously/run-task-synchronously.mjs b/components/apify_oauth/actions/run-task/run-task.mjs similarity index 90% rename from components/apify_oauth/actions/run-task-synchronously/run-task-synchronously.mjs rename to components/apify_oauth/actions/run-task/run-task.mjs index bc1148e71b47f..6be9e7d5d5519 100644 --- a/components/apify_oauth/actions/run-task-synchronously/run-task-synchronously.mjs +++ b/components/apify_oauth/actions/run-task/run-task.mjs @@ -10,7 +10,7 @@ const props = adjustPropDefinitions(others.props, app); export default { ...others, - key: "apify_oauth-run-task-synchronously", + key: "apify_oauth-run-task", version: "0.0.1", name, description, From da4f56c34e6ac957c9ab5873290dd14358151066 Mon Sep 17 00:00:00 2001 From: oleksandravalko Date: Mon, 6 Oct 2025 17:52:34 +0200 Subject: [PATCH 2/7] fix(apify-run-task) 11: introduce waitForFinish and overrideInput input fields --- components/apify/actions/run-task/run-task.mjs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/components/apify/actions/run-task/run-task.mjs b/components/apify/actions/run-task/run-task.mjs index 160973959331a..ac8fd2084beeb 100644 --- a/components/apify/actions/run-task/run-task.mjs +++ b/components/apify/actions/run-task/run-task.mjs @@ -20,6 +20,22 @@ export default { ], description: "The ID of the task to run", }, + + waitForFinish: { + type: "boolean", + label: "Wait for finish", + description: + "If false, returns immediately after starting the task. If true, waits for task completion (via webhook or polling) and returns dataset items.", + default: true, + }, + + // Apify run params + overrideInput: { + type: "string", + label: "Override Input", + description: "Optional JSON string to override the default input for the task run. 
Must be valid JSON.", + optional: true, + }, timeout: { type: "integer", label: "Timeout", From 3387ca4e5afbca7fcb5100eb36a276e981c25328 Mon Sep 17 00:00:00 2001 From: oleksandravalko Date: Mon, 6 Oct 2025 17:53:13 +0200 Subject: [PATCH 3/7] fix(apify-run-task) 11: incorporate wait for finish and get dataset items functionality --- .../apify/actions/run-task/run-task.mjs | 238 ++++++++++++++---- components/apify/apify.app.mjs | 15 +- 2 files changed, 200 insertions(+), 53 deletions(-) diff --git a/components/apify/actions/run-task/run-task.mjs b/components/apify/actions/run-task/run-task.mjs index ac8fd2084beeb..46ea6a8c9fc22 100644 --- a/components/apify/actions/run-task/run-task.mjs +++ b/components/apify/actions/run-task/run-task.mjs @@ -1,11 +1,9 @@ import apify from "../../apify.app.mjs"; -import { ACTOR_JOB_STATUSES } from "@apify/consts"; +import { + ACTOR_JOB_STATUSES, ACTOR_JOB_TERMINAL_STATUSES, WEBHOOK_EVENT_TYPES, +} from "@apify/consts"; export default { - key: "apify-run-task-synchronously", - name: "Run Task Synchronously", - description: "Run a specific task and return its dataset items. [See the documentation](https://docs.apify.com/api/v2/actor-task-run-sync-get-dataset-items-get)", - version: "0.0.4", key: "apify-run-task", name: "Run Task", description: "Run a specific task and optionally return its dataset items. 
[See the documentation](https://docs.apify.com/api/v2/actor-task-run-sync-get-dataset-items-get)", @@ -86,55 +84,199 @@ export default { ], }, }, + async run({ $ }) { - const { - status, - id, - actId, - startedAt, - finishedAt, - options: { build }, - buildId, - defaultKeyValueStoreId, - defaultDatasetId, - defaultRequestQueueId, - consoleUrl, - } = await this.apify.runTaskSynchronously({ - taskId: this.taskId, - params: { - timeout: this.timeout, - memory: this.memory, - build: this.build, - }, - }); + const POLL_INTERVAL_MS = 30_000; // 30s + const POLL_WINDOW_MS = 24 * 60 * 60 * 1000; // 1 day + let input; + + if (this.overrideInput) { + try { + input = JSON.parse(this.overrideInput); + } catch (error) { + throw new Error(`Failed pro parse override Input JSON: ${error.message}`); + } + } + + // Helper: start task + const startTask = async () => { + return this.apify.runTask({ + taskId: this.taskId, + params: { + timeout: this.timeout, + memory: this.memory, + build: this.build, + }, + input, + }); + }; + + // Helper: fetch run + dataset items if succeeded + const fetchOutcome = async (runId) => { + const run = await this.apify.getRun({ + runId, + }); + const status = run.status; + + if (status !== ACTOR_JOB_STATUSES.SUCCEEDED) { + return { + status, + run, + items: [], + }; + } + + let items = []; + if (run.defaultDatasetId) { + const ds = await this.apify.listDatasetItems({ + datasetId: run.defaultDatasetId, + params: { + clean: this.clean, + fields: this.fields, + omit: this.omit, + flatten: this.flatten, + limit: this.limit, + }, + }); + items = ds.items || []; + } + return { + status, + run, + items, + }; + }; + + // Helper: schedule next poll (rerun) with 30s interval and 1-day cap + const schedulePoll = (runId) => { + const startEpoch = + ($.context.run?.context && $.context.run.context.pollStartMs) || + $.context.pollStartMs || + Date.now(); + + // Persist the poll start time across reruns + $.flow.rerun(POLL_INTERVAL_MS, { + apifyRunId: runId, + 
pollStartMs: startEpoch, + }); + }; + + // 1) ONLY START (no waiting) + if (!this.waitForFinish) { + const started = await startTask(); + $.export( + "$summary", + `Started task ${this.taskId}. Not waiting for completion.`, + ); + return { + runId: started.id, + status: ACTOR_JOB_STATUSES.RUNNING, + }; + } - if (status !== ACTOR_JOB_STATUSES.SUCCEEDED) { - throw new Error(`Run has finished with status: ${status}. Inspect it here: ${consoleUrl}`); + // RERUN CONTEXT (if we scheduled $.flow.rerun previously) + const runCtx = $.context.run || {}; + const rerunContext = runCtx.context || {}; + const isRerun = typeof runCtx.runs === "number" && runCtx.runs > 1; + + // RESUME DATA (if the webhook called the resume_url) + const resumeBody = $.$resume_data && $.$resume_data.body; + + // 3) RERUN/RESUME BEHAVIOR + if (resumeBody || isRerun) { + const runId = + resumeBody?.runId || + rerunContext.apifyRunId || + $.context.apifyRunId; + + if (!runId) { + throw new Error("Missing runId on rerun/resume."); + } + + // Enforce a 1-day cap for polling + const pollStartMs = rerunContext.pollStartMs || $.context.pollStartMs || Date.now(); + const elapsed = Date.now() - pollStartMs; + if (elapsed > POLL_WINDOW_MS) { + throw new Error( + `Polling window exceeded (>${POLL_WINDOW_MS} ms). Task did not finish in time.`, + ); + } + + // Try to fetch an outcome + const { + status, run, items, + } = await fetchOutcome(runId); + + // If finished + if (ACTOR_JOB_TERMINAL_STATUSES.includes(status)) { + // If finished successfully + if (status === ACTOR_JOB_STATUSES.SUCCEEDED) { + $.export( + "$summary", + `Task ${this.taskId} succeeded with ${items.length} items`, + ); + + return { + runId: run.id, + status, + defaultDatasetId: run.defaultDatasetId, + items, + }; + } + + // If finished with an error status + throw new Error( + `Apify run ${runId} finished with status ${status}. 
See console: ${run?.consoleUrl}`, + ); + } + + // Still running: schedule another poll + schedulePoll(runId); + return; // execution pauses until next rerun } - const { items } = await this.apify.listDatasetItems({ - datasetId: defaultDatasetId, - params: { - clean: this.clean, - fields: this.fields, - omit: this.omit, - flatten: this.flatten, - limit: this.limit, + // 2) START AND INSTALL WEBHOOK (initial execution) + const started = await startTask(); + + $.context.apifyRunId = started.id; + $.context.startTime = Date.now(); + $.context.pollStartMs = Date.now(); // track the start of a polling window + + // Create a resume link and suspend + const { resume_url } = $.flow.suspend(POLL_WINDOW_MS); // 1-day timeout for task run to finish + + // Create a webhook pointing to resume_url + const webhook = await this.apify.createWebhook({ + requestUrl: resume_url, + eventTypes: [ + WEBHOOK_EVENT_TYPES.ACTOR_RUN_SUCCEEDED, + WEBHOOK_EVENT_TYPES.ACTOR_RUN_FAILED, + WEBHOOK_EVENT_TYPES.ACTOR_RUN_ABORTED, + WEBHOOK_EVENT_TYPES.ACTOR_RUN_TIMED_OUT, + ], + condition: { + actorRunId: started.id, }, + payloadTemplate: JSON.stringify({ + runId: "{{resource.id}}", + status: "{{resource.status}}", + defaultDatasetId: "{{resource.defaultDatasetId}}", + startedAt: "{{resource.startedAt}}", + finishedAt: "{{resource.finishedAt}}", + eventType: "{{eventType}}", + }), + headersTemplate: JSON.stringify({ + "Content-Type": "application/json", + }), + shouldInterpolateStrings: true, + description: `Pipedream auto-resume for task ${this.taskId} run ${started.id}`, }); - $.export("$summary", `Run with task id ${this.taskId} finished successfully.`); - return { - runId: id, - actId, - startedAt, - finishedAt, - build, - buildId, - defaultKeyValueStoreId, - defaultDatasetId, - defaultRequestQueueId, - items, - }; + $.context.webhookId = webhook.id; + + // Fallback polling via rerun: every 30s, within a 1-day window + schedulePoll(started.id); + + // Execution suspends at $.flow.suspend; 
webhook or rerun will resume. }, }; diff --git a/components/apify/apify.app.mjs b/components/apify/apify.app.mjs index 3dd1dea9c761f..249806a1cedc2 100644 --- a/components/apify/apify.app.mjs +++ b/components/apify/apify.app.mjs @@ -163,7 +163,12 @@ export default { return this._client().actor(actorId) .call(input, options); }, - getActorRun({ runId }) { + createWebhook(opts = {}) { + return this._client().webhooks() + .create(opts); + }, + + getRun({ runId }) { return this._client().run(runId) .get(); }, @@ -174,10 +179,10 @@ export default { .start(data, params); }, runTask({ - taskId, params, + taskId, params, input, }) { return this._client().task(taskId) - .start(params); + .start(input, params); }, getActor({ actorId }) { return this._client().actor(actorId) @@ -244,10 +249,10 @@ export default { .listItems(params); }, runTaskSynchronously({ - taskId, params, + taskId, params, input, }) { return this._client().task(taskId) - .call({}, params); + .call(input, params); }, setKeyValueStoreRecord({ storeId, key, value, contentType, From d2beae0f95011b104369309f92a5dda9fdcda672 Mon Sep 17 00:00:00 2001 From: oleksandravalko Date: Thu, 9 Oct 2025 11:41:31 +0200 Subject: [PATCH 4/7] fix(apify-run-task) 11: remove duplicate method --- components/apify/actions/run-task/run-task.mjs | 2 +- components/apify/apify.app.mjs | 5 ----- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/components/apify/actions/run-task/run-task.mjs b/components/apify/actions/run-task/run-task.mjs index 46ea6a8c9fc22..b24a3db6cbd2f 100644 --- a/components/apify/actions/run-task/run-task.mjs +++ b/components/apify/actions/run-task/run-task.mjs @@ -246,7 +246,7 @@ export default { const { resume_url } = $.flow.suspend(POLL_WINDOW_MS); // 1-day timeout for task run to finish // Create a webhook pointing to resume_url - const webhook = await this.apify.createWebhook({ + const webhook = await this.apify.createHook({ requestUrl: resume_url, eventTypes: [ 
WEBHOOK_EVENT_TYPES.ACTOR_RUN_SUCCEEDED, diff --git a/components/apify/apify.app.mjs b/components/apify/apify.app.mjs index 249806a1cedc2..1715c7af089b2 100644 --- a/components/apify/apify.app.mjs +++ b/components/apify/apify.app.mjs @@ -163,11 +163,6 @@ export default { return this._client().actor(actorId) .call(input, options); }, - createWebhook(opts = {}) { - return this._client().webhooks() - .create(opts); - }, - getRun({ runId }) { return this._client().run(runId) .get(); From fcc66ad928429638f014984a66d20307d8a052d0 Mon Sep 17 00:00:00 2001 From: oleksandravalko Date: Thu, 9 Oct 2025 13:22:28 +0200 Subject: [PATCH 5/7] fix(apify-run-task) 11: delete webhook --- .../apify/actions/run-task/run-task.mjs | 113 +++++------------- 1 file changed, 32 insertions(+), 81 deletions(-) diff --git a/components/apify/actions/run-task/run-task.mjs b/components/apify/actions/run-task/run-task.mjs index b24a3db6cbd2f..aa67ee3b418ff 100644 --- a/components/apify/actions/run-task/run-task.mjs +++ b/components/apify/actions/run-task/run-task.mjs @@ -18,7 +18,6 @@ export default { ], description: "The ID of the task to run", }, - waitForFinish: { type: "boolean", label: "Wait for finish", @@ -26,8 +25,6 @@ export default { "If false, returns immediately after starting the task. If true, waits for task completion (via webhook or polling) and returns dataset items.", default: true, }, - - // Apify run params overrideInput: { type: "string", label: "Override Input", @@ -52,37 +49,6 @@ export default { description: "Specifies the Actor build to run. It can be either a build tag or build number. 
By default, the run uses the build specified in the task settings (typically latest).", optional: true, }, - // Retrieve dataset output option - clean: { - propDefinition: [ - apify, - "clean", - ], - }, - fields: { - propDefinition: [ - apify, - "fields", - ], - }, - omit: { - propDefinition: [ - apify, - "omit", - ], - }, - flatten: { - propDefinition: [ - apify, - "flatten", - ], - }, - limit: { - propDefinition: [ - apify, - "limit", - ], - }, }, async run({ $ }) { @@ -94,7 +60,7 @@ export default { try { input = JSON.parse(this.overrideInput); } catch (error) { - throw new Error(`Failed pro parse override Input JSON: ${error.message}`); + throw new Error(`Failed to parse override Input JSON: ${error.message}`); } } @@ -111,53 +77,29 @@ export default { }); }; - // Helper: fetch run + dataset items if succeeded - const fetchOutcome = async (runId) => { - const run = await this.apify.getRun({ - runId, - }); - const status = run.status; - - if (status !== ACTOR_JOB_STATUSES.SUCCEEDED) { - return { - status, - run, - items: [], - }; - } + // Helper: delete webhook + const deleteWebhook = async (webhookId) => { + if (!webhookId) return; - let items = []; - if (run.defaultDatasetId) { - const ds = await this.apify.listDatasetItems({ - datasetId: run.defaultDatasetId, - params: { - clean: this.clean, - fields: this.fields, - omit: this.omit, - flatten: this.flatten, - limit: this.limit, - }, - }); - items = ds.items || []; + try { + await this.apify.deleteHook(webhookId); + } catch (webhookError) { + console.warn("Failed to delete webhook (non-critical):", webhookError.message); } - return { - status, - run, - items, - }; }; // Helper: schedule next poll (rerun) with 30s interval and 1-day cap - const schedulePoll = (runId) => { + const schedulePoll = (runId, webhookId) => { const startEpoch = ($.context.run?.context && $.context.run.context.pollStartMs) || $.context.pollStartMs || Date.now(); - // Persist the poll start time across reruns + // Persist the poll start 
time and webhook ID across reruns $.flow.rerun(POLL_INTERVAL_MS, { apifyRunId: runId, pollStartMs: startEpoch, + webhookId, }); }; @@ -189,6 +131,10 @@ export default { rerunContext.apifyRunId || $.context.apifyRunId; + const webhookId = + rerunContext.webhookId || + $.context.webhookId; + if (!runId) { throw new Error("Missing runId on rerun/resume."); } @@ -197,31 +143,32 @@ export default { const pollStartMs = rerunContext.pollStartMs || $.context.pollStartMs || Date.now(); const elapsed = Date.now() - pollStartMs; if (elapsed > POLL_WINDOW_MS) { + // Clean up webhook before timing out + await deleteWebhook(webhookId); throw new Error( `Polling window exceeded (>${POLL_WINDOW_MS} ms). Task did not finish in time.`, ); } // Try to fetch an outcome - const { - status, run, items, - } = await fetchOutcome(runId); + const run = await this.apify.getRun({ + runId, + }); + const { status } = run; // If finished if (ACTOR_JOB_TERMINAL_STATUSES.includes(status)) { + // Clean up webhook + await deleteWebhook(webhookId); + // If finished successfully if (status === ACTOR_JOB_STATUSES.SUCCEEDED) { $.export( "$summary", - `Task ${this.taskId} succeeded with ${items.length} items`, + `Task ${this.taskId} succeeded.`, ); - return { - runId: run.id, - status, - defaultDatasetId: run.defaultDatasetId, - items, - }; + return run; } // If finished with an error status @@ -231,7 +178,7 @@ export default { } // Still running: schedule another poll - schedulePoll(runId); + schedulePoll(runId, webhookId); return; // execution pauses until next rerun } @@ -272,10 +219,14 @@ export default { description: `Pipedream auto-resume for task ${this.taskId} run ${started.id}`, }); + if (!webhook?.id) { + throw new Error("Failed to create webhook - no ID returned"); + } + $.context.webhookId = webhook.id; // Fallback polling via rerun: every 30s, within a 1-day window - schedulePoll(started.id); + schedulePoll(started.id, webhook.id); // Execution suspends at $.flow.suspend; webhook or rerun 
will resume. }, From 28fbf094fa1b40822637427dd037c5395d26952b Mon Sep 17 00:00:00 2001 From: oleksandravalko Date: Tue, 14 Oct 2025 13:34:41 +0200 Subject: [PATCH 6/7] fix(apify-run-task) 11: import to the apify_oauth --- components/apify_oauth/actions/run-task/run-task.mjs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/components/apify_oauth/actions/run-task/run-task.mjs b/components/apify_oauth/actions/run-task/run-task.mjs index 6be9e7d5d5519..00169faa8e7cd 100644 --- a/components/apify_oauth/actions/run-task/run-task.mjs +++ b/components/apify_oauth/actions/run-task/run-task.mjs @@ -1,5 +1,5 @@ import app from "../../apify_oauth.app.mjs"; -import common from "@pipedream/apify/actions/run-task-synchronously/run-task-synchronously.mjs"; +import common from "@pipedream/apify/actions/run-task/run-task.mjs"; import { adjustPropDefinitions } from "../../common/utils.mjs"; From 9ba98baea6c779020b6582133b449119d4de9332 Mon Sep 17 00:00:00 2001 From: oleksandravalko Date: Tue, 14 Oct 2025 13:34:56 +0200 Subject: [PATCH 7/7] fix(apify-run-task) 11: version, description --- components/apify/actions/run-task/run-task.mjs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/components/apify/actions/run-task/run-task.mjs b/components/apify/actions/run-task/run-task.mjs index aa67ee3b418ff..46077a4798646 100644 --- a/components/apify/actions/run-task/run-task.mjs +++ b/components/apify/actions/run-task/run-task.mjs @@ -6,8 +6,8 @@ import { export default { key: "apify-run-task", name: "Run Task", - description: "Run a specific task and optionally return its dataset items. [See the documentation](https://docs.apify.com/api/v2/actor-task-run-sync-get-dataset-items-get)", - version: "0.0.5", + description: "Run a specific task and optionally wait for its termination.", + version: "0.0.1", type: "action", props: { apify,