diff --git a/components/apify/actions/run-task-synchronously/run-task-synchronously.mjs b/components/apify/actions/run-task-synchronously/run-task-synchronously.mjs
deleted file mode 100644
index 776620f885c1b..0000000000000
--- a/components/apify/actions/run-task-synchronously/run-task-synchronously.mjs
+++ /dev/null
@@ -1,120 +0,0 @@
-import apify from "../../apify.app.mjs";
-import { ACTOR_JOB_STATUSES } from "@apify/consts";
-
-export default {
-  key: "apify-run-task-synchronously",
-  name: "Run Task Synchronously",
-  description: "Run a specific task and return its dataset items. [See the documentation](https://docs.apify.com/api/v2/actor-task-run-sync-get-dataset-items-get)",
-  version: "0.0.4",
-  type: "action",
-  props: {
-    apify,
-    taskId: {
-      propDefinition: [
-        apify,
-        "taskId",
-      ],
-      description: "The ID of the task to run",
-    },
-    timeout: {
-      type: "integer",
-      label: "Timeout",
-      description: "Optional timeout for the run, in seconds. By default, the run uses a timeout specified in the task settings.",
-      optional: true,
-    },
-    memory: {
-      type: "integer",
-      label: "Memory",
-      description: "Memory limit for the run, in megabytes. The amount of memory can be set to a power of 2 with a minimum of 128. By default, the run uses a memory limit specified in the task settings.",
-      optional: true,
-    },
-    build: {
-      type: "string",
-      label: "Build",
-      description: "Specifies the Actor build to run. It can be either a build tag or build number. By default, the run uses the build specified in the task settings (typically latest).",
-      optional: true,
-    },
-    // Retrieve dataset output option
-    clean: {
-      propDefinition: [
-        apify,
-        "clean",
-      ],
-    },
-    fields: {
-      propDefinition: [
-        apify,
-        "fields",
-      ],
-    },
-    omit: {
-      propDefinition: [
-        apify,
-        "omit",
-      ],
-    },
-    flatten: {
-      propDefinition: [
-        apify,
-        "flatten",
-      ],
-    },
-    limit: {
-      propDefinition: [
-        apify,
-        "limit",
-      ],
-    },
-  },
-  async run({ $ }) {
-    const {
-      status,
-      id,
-      actId,
-      startedAt,
-      finishedAt,
-      options: { build },
-      buildId,
-      defaultKeyValueStoreId,
-      defaultDatasetId,
-      defaultRequestQueueId,
-      consoleUrl,
-    } = await this.apify.runTaskSynchronously({
-      taskId: this.taskId,
-      params: {
-        timeout: this.timeout,
-        memory: this.memory,
-        build: this.build,
-      },
-    });
-
-    if (status !== ACTOR_JOB_STATUSES.SUCCEEDED) {
-      throw new Error(`Run has finished with status: ${status}. Inspect it here: ${consoleUrl}`);
-    }
-
-    const { items } = await this.apify.listDatasetItems({
-      datasetId: defaultDatasetId,
-      params: {
-        clean: this.clean,
-        fields: this.fields,
-        omit: this.omit,
-        flatten: this.flatten,
-        limit: this.limit,
-      },
-    });
-
-    $.export("$summary", `Run with task id ${this.taskId} finished successfully.`);
-    return {
-      runId: id,
-      actId,
-      startedAt,
-      finishedAt,
-      build,
-      buildId,
-      defaultKeyValueStoreId,
-      defaultDatasetId,
-      defaultRequestQueueId,
-      items,
-    };
-  },
-};
diff --git a/components/apify/actions/run-task/run-task.mjs b/components/apify/actions/run-task/run-task.mjs
new file mode 100644
index 0000000000000..46077a4798646
--- /dev/null
+++ b/components/apify/actions/run-task/run-task.mjs
@@ -0,0 +1,251 @@
+import apify from "../../apify.app.mjs";
+import {
+  ACTOR_JOB_STATUSES, ACTOR_JOB_TERMINAL_STATUSES, WEBHOOK_EVENT_TYPES,
+} from "@apify/consts";
+
+export default {
+  key: "apify-run-task",
+  name: "Run Task",
+  description: "Run a specific task and optionally wait for its termination.",
+  version: "0.0.1",
+  type: "action",
+  props: {
+    apify,
+    taskId: {
+      propDefinition: [
+        apify,
+        "taskId",
+      ],
+      description: "The ID of the task to run",
+    },
+    waitForFinish: {
+      type: "boolean",
+      label: "Wait for finish",
+      description:
+        "If false, returns immediately after starting the task. If true, waits for task completion (via webhook or polling) and returns the finished run.",
+      default: true,
+    },
+    overrideInput: {
+      type: "string",
+      label: "Override Input",
+      description: "Optional JSON string to override the default input for the task run. Must be valid JSON.",
+      optional: true,
+    },
+    timeout: {
+      type: "integer",
+      label: "Timeout",
+      description: "Optional timeout for the run, in seconds. By default, the run uses a timeout specified in the task settings.",
+      optional: true,
+    },
+    memory: {
+      type: "integer",
+      label: "Memory",
+      description: "Memory limit for the run, in megabytes. The amount of memory can be set to a power of 2 with a minimum of 128. By default, the run uses a memory limit specified in the task settings.",
+      optional: true,
+    },
+    build: {
+      type: "string",
+      label: "Build",
+      description: "Specifies the Actor build to run. It can be either a build tag or build number. By default, the run uses the build specified in the task settings (typically latest).",
+      optional: true,
+    },
+  },
+
+  /**
+   * Starts the task and, when `waitForFinish` is set, pauses the step until
+   * the run reaches a terminal status. Completion is signalled by an Apify
+   * webhook scoped to the run that calls the resume URL; `$.flow.rerun`
+   * polling every 30 seconds, capped at one day, acts as the fallback.
+   *
+   * @returns {Promise<object|undefined>} `{ runId, status }` when not waiting,
+   *   the finished run when it succeeded, or nothing while the step is paused.
+   * @throws {Error} If `overrideInput` is not valid JSON, the run is missing or
+   *   ends unsuccessfully, the polling window elapses, or no webhook ID is
+   *   returned.
+   */
+  async run({ $ }) {
+    const POLL_INTERVAL_MS = 30_000; // 30s
+    const POLL_WINDOW_MS = 24 * 60 * 60 * 1000; // 1 day
+    let input;
+
+    if (this.overrideInput) {
+      try {
+        input = JSON.parse(this.overrideInput);
+      } catch (error) {
+        throw new Error(`Failed to parse override Input JSON: ${error.message}`);
+      }
+    }
+
+    // Helper: start task
+    const startTask = async () => {
+      return this.apify.runTask({
+        taskId: this.taskId,
+        params: {
+          timeout: this.timeout,
+          memory: this.memory,
+          build: this.build,
+        },
+        input,
+      });
+    };
+
+    // Helper: delete webhook
+    const deleteWebhook = async (webhookId) => {
+      if (!webhookId) return;
+
+      try {
+        await this.apify.deleteHook(webhookId);
+      } catch (webhookError) {
+        console.warn("Failed to delete webhook (non-critical):", webhookError.message);
+      }
+    };
+
+    // Helper: schedule next poll (rerun) with 30s interval and 1-day cap
+    const schedulePoll = (runId, webhookId) => {
+      const startEpoch =
+        ($.context.run?.context && $.context.run.context.pollStartMs) ||
+        $.context.pollStartMs ||
+        Date.now();
+
+      // Persist the poll start time and webhook ID across reruns
+      $.flow.rerun(POLL_INTERVAL_MS, {
+        apifyRunId: runId,
+        pollStartMs: startEpoch,
+        webhookId,
+      });
+    };
+
+    // 1) ONLY START (no waiting)
+    if (!this.waitForFinish) {
+      const started = await startTask();
+      $.export(
+        "$summary",
+        `Started task ${this.taskId}. Not waiting for completion.`,
+      );
+      return {
+        runId: started.id,
+        status: ACTOR_JOB_STATUSES.RUNNING,
+      };
+    }
+
+    // RERUN CONTEXT (if we scheduled $.flow.rerun previously)
+    const runCtx = $.context.run || {};
+    const rerunContext = runCtx.context || {};
+    const isRerun = typeof runCtx.runs === "number" && runCtx.runs > 1;
+
+    // RESUME DATA (if the webhook called the resume_url)
+    const resumeBody = $.$resume_data && $.$resume_data.body;
+
+    // 3) RERUN/RESUME BEHAVIOR
+    if (resumeBody || isRerun) {
+      const runId =
+        resumeBody?.runId ||
+        rerunContext.apifyRunId ||
+        $.context.apifyRunId;
+
+      const webhookId =
+        rerunContext.webhookId ||
+        $.context.webhookId;
+
+      if (!runId) {
+        throw new Error("Missing runId on rerun/resume.");
+      }
+
+      // Enforce a 1-day cap for polling
+      const pollStartMs = rerunContext.pollStartMs || $.context.pollStartMs || Date.now();
+      const elapsed = Date.now() - pollStartMs;
+      if (elapsed > POLL_WINDOW_MS) {
+        // Clean up webhook before timing out
+        await deleteWebhook(webhookId);
+        throw new Error(
+          `Polling window exceeded (>${POLL_WINDOW_MS} ms). Task did not finish in time.`,
+        );
+      }
+
+      // Try to fetch an outcome
+      const run = await this.apify.getRun({
+        runId,
+      });
+      if (!run) {
+        // The lookup can come back empty; clean up and fail with a clear
+        // message instead of a TypeError on the destructuring below.
+        await deleteWebhook(webhookId);
+        throw new Error(`Apify run ${runId} was not found.`);
+      }
+      const { status } = run;
+
+      // If finished
+      if (ACTOR_JOB_TERMINAL_STATUSES.includes(status)) {
+        // Clean up webhook
+        await deleteWebhook(webhookId);
+
+        // If finished successfully
+        if (status === ACTOR_JOB_STATUSES.SUCCEEDED) {
+          $.export(
+            "$summary",
+            `Task ${this.taskId} succeeded.`,
+          );
+
+          return run;
+        }
+
+        // If finished with an error status
+        throw new Error(
+          `Apify run ${runId} finished with status ${status}. See console: ${run.consoleUrl}`,
+        );
+      }
+
+      // Still running: schedule another poll
+      schedulePoll(runId, webhookId);
+      return; // execution pauses until next rerun
+    }
+
+    // 2) START AND INSTALL WEBHOOK (initial execution)
+    const started = await startTask();
+
+    $.context.apifyRunId = started.id;
+    $.context.startTime = Date.now();
+    $.context.pollStartMs = Date.now(); // track the start of a polling window
+
+    // Create a resume link and suspend
+    const { resume_url } = $.flow.suspend(POLL_WINDOW_MS); // 1-day timeout for task run to finish
+
+    // Create a webhook pointing to resume_url
+    const webhook = await this.apify.createHook({
+      requestUrl: resume_url,
+      eventTypes: [
+        WEBHOOK_EVENT_TYPES.ACTOR_RUN_SUCCEEDED,
+        WEBHOOK_EVENT_TYPES.ACTOR_RUN_FAILED,
+        WEBHOOK_EVENT_TYPES.ACTOR_RUN_ABORTED,
+        WEBHOOK_EVENT_TYPES.ACTOR_RUN_TIMED_OUT,
+      ],
+      condition: {
+        actorRunId: started.id,
+      },
+      payloadTemplate: JSON.stringify({
+        runId: "{{resource.id}}",
+        status: "{{resource.status}}",
+        defaultDatasetId: "{{resource.defaultDatasetId}}",
+        startedAt: "{{resource.startedAt}}",
+        finishedAt: "{{resource.finishedAt}}",
+        eventType: "{{eventType}}",
+      }),
+      headersTemplate: JSON.stringify({
+        "Content-Type": "application/json",
+      }),
+      shouldInterpolateStrings: true,
+      description: `Pipedream auto-resume for task ${this.taskId} run ${started.id}`,
+    });
+
+    if (!webhook?.id) {
+      throw new Error("Failed to create webhook - no ID returned");
+    }
+
+    $.context.webhookId = webhook.id;
+
+    // Fallback polling via rerun: every 30s, within a 1-day window
+    schedulePoll(started.id, webhook.id);
+
+    // Execution suspends at $.flow.suspend; webhook or rerun will resume.
+  },
+};
diff --git a/components/apify/apify.app.mjs b/components/apify/apify.app.mjs
index 3dd1dea9c761f..1715c7af089b2 100644
--- a/components/apify/apify.app.mjs
+++ b/components/apify/apify.app.mjs
@@ -163,7 +163,7 @@ export default {
       return this._client().actor(actorId)
         .call(input, options);
     },
-    getActorRun({ runId }) {
+    getRun({ runId }) {
       return this._client().run(runId)
         .get();
     },
@@ -174,10 +174,10 @@ export default {
         .start(data, params);
     },
     runTask({
-      taskId, params,
+      taskId, params, input,
     }) {
       return this._client().task(taskId)
-        .start(params);
+        .start(input, params);
     },
     getActor({ actorId }) {
       return this._client().actor(actorId)
@@ -244,10 +244,10 @@ export default {
         .listItems(params);
     },
     runTaskSynchronously({
-      taskId, params,
+      taskId, params, input,
     }) {
       return this._client().task(taskId)
-        .call({}, params);
+        .call(input, params);
     },
     setKeyValueStoreRecord({
       storeId, key, value, contentType,
diff --git a/components/apify_oauth/actions/run-task-synchronously/run-task-synchronously.mjs b/components/apify_oauth/actions/run-task/run-task.mjs
similarity index 71%
rename from components/apify_oauth/actions/run-task-synchronously/run-task-synchronously.mjs
rename to components/apify_oauth/actions/run-task/run-task.mjs
index bc1148e71b47f..00169faa8e7cd 100644
--- a/components/apify_oauth/actions/run-task-synchronously/run-task-synchronously.mjs
+++ b/components/apify_oauth/actions/run-task/run-task.mjs
@@ -1,5 +1,5 @@
 import app from "../../apify_oauth.app.mjs";
-import common from "@pipedream/apify/actions/run-task-synchronously/run-task-synchronously.mjs";
+import common from "@pipedream/apify/actions/run-task/run-task.mjs";
 
 import { adjustPropDefinitions } from "../../common/utils.mjs";
 
@@ -10,7 +10,7 @@ const props = adjustPropDefinitions(others.props, app);
 
 export default {
   ...others,
-  key: "apify_oauth-run-task-synchronously",
+  key: "apify_oauth-run-task",
   version: "0.0.1",
   name,
   description,