Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

send webhook when task or workflow run is canceled #1374

Merged
merged 1 commit into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions skyvern/forge/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,7 @@ async def execute_step(
task=task,
last_step=step,
api_key=api_key,
need_call_webhook=False,
need_call_webhook=True,
)
return step, None, None

Expand Down Expand Up @@ -1544,7 +1544,7 @@ async def clean_up_task(
async def execute_task_webhook(
self,
task: Task,
last_step: Step,
last_step: Step | None,
api_key: str | None,
) -> None:
if not api_key:
Expand Down
15 changes: 14 additions & 1 deletion skyvern/forge/sdk/routes/agent_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ async def get_task(
async def cancel_task(
task_id: str,
current_org: Organization = Depends(org_auth_service.get_current_org),
x_api_key: Annotated[str | None, Header()] = None,
) -> None:
analytics.capture("skyvern-oss-agent-task-get")
task_obj = await app.DATABASE.get_task(task_id, organization_id=current_org.organization_id)
Expand All @@ -292,16 +293,28 @@ async def cancel_task(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Task not found {task_id}",
)
await app.agent.update_task(task_obj, status=TaskStatus.canceled)
task = await app.agent.update_task(task_obj, status=TaskStatus.canceled)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we just only send the webhook when task/workflow cancellation is finished? any specific reason why we must send the webhook in the API call?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if user call multiple times to the cancle endpoint. he will receive multiple times the webhook without any task/workflow being cancelled 🤔

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

users can also cancel tasks that are stuck. our worker won't really send a webhook if we don't send a webhook to them in the API endpoint.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's okay that we send two cancel task webhook.

# get latest step
latest_step = await app.DATABASE.get_latest_step(task_id, organization_id=current_org.organization_id)
# retry the webhook
await app.agent.execute_task_webhook(task=task, last_step=latest_step, api_key=x_api_key)


@base_router.post("/workflows/runs/{workflow_run_id}/cancel")
@base_router.post("/workflows/runs/{workflow_run_id}/cancel/", include_in_schema=False)
async def cancel_workflow_run(
workflow_run_id: str,
current_org: Organization = Depends(org_auth_service.get_current_org),
x_api_key: Annotated[str | None, Header()] = None,
) -> None:
workflow_run = await app.DATABASE.get_workflow_run(workflow_run_id=workflow_run_id)
if not workflow_run:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Workflow run not found {workflow_run_id}",
)
await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id)
await app.WORKFLOW_SERVICE.execute_workflow_webhook(workflow_run, api_key=x_api_key)
Comment on lines +310 to +317
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question as task



@base_router.post(
Expand Down
26 changes: 17 additions & 9 deletions skyvern/forge/sdk/workflow/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ async def execute_workflow(
workflow=workflow,
workflow_run=workflow_run,
api_key=api_key,
need_call_webhook=False,
need_call_webhook=True,
)
return workflow_run
parameters = block.get_all_parameters(workflow_run_id)
Expand Down Expand Up @@ -881,10 +881,18 @@ async def clean_up_workflow(
if not need_call_webhook:
return

await self.execute_workflow_webhook(workflow_run, api_key)

async def execute_workflow_webhook(
self,
workflow_run: WorkflowRun,
api_key: str | None = None,
) -> None:
workflow_id = workflow_run.workflow_id
workflow_run_status_response = await self.build_workflow_run_status_response(
workflow_permanent_id=workflow.workflow_permanent_id,
workflow_permanent_id=workflow_run.workflow_permanent_id,
workflow_run_id=workflow_run.workflow_run_id,
organization_id=workflow.organization_id,
organization_id=workflow_run.organization_id,
)
LOG.info(
"Built workflow run status response",
Expand All @@ -894,15 +902,15 @@ async def clean_up_workflow(
if not workflow_run.webhook_callback_url:
LOG.warning(
"Workflow has no webhook callback url. Not sending workflow response",
workflow_id=workflow.workflow_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
)
return

if not api_key:
LOG.warning(
"Request has no api key. Not sending workflow response",
workflow_id=workflow.workflow_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
)
return
Expand All @@ -921,7 +929,7 @@ async def clean_up_workflow(
}
LOG.info(
"Sending webhook run status to webhook callback url",
workflow_id=workflow.workflow_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
webhook_callback_url=workflow_run.webhook_callback_url,
payload=payload,
Expand All @@ -934,15 +942,15 @@ async def clean_up_workflow(
if resp.status_code == 200:
LOG.info(
"Webhook sent successfully",
workflow_id=workflow.workflow_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
resp_code=resp.status_code,
resp_text=resp.text,
)
else:
LOG.info(
"Webhook failed",
workflow_id=workflow.workflow_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
webhook_data=payload,
resp=resp,
Expand All @@ -951,7 +959,7 @@ async def clean_up_workflow(
)
except Exception as e:
raise FailedToSendWebhook(
workflow_id=workflow.workflow_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run.workflow_run_id,
) from e

Expand Down
Loading