diff --git a/skyvern/exceptions.py b/skyvern/exceptions.py index 551bc4cc1..da7b1eaed 100644 --- a/skyvern/exceptions.py +++ b/skyvern/exceptions.py @@ -552,3 +552,8 @@ def __init__(self, element_id: str): super().__init__( f"Select on the dropdown container instead of the option, try again with another element. element_id={element_id}" ) + + +class UrlGenerationFailure(SkyvernHTTPException): + def __init__(self) -> None: + super().__init__("Failed to generate the url for the prompt") diff --git a/skyvern/forge/prompts/skyvern/observer.j2 b/skyvern/forge/prompts/skyvern/observer.j2 new file mode 100644 index 000000000..4a44bba40 --- /dev/null +++ b/skyvern/forge/prompts/skyvern/observer.j2 @@ -0,0 +1,43 @@ +You're to assist the user to achieve the user goal in the web, given the DOM elements in the list, the screenshots of the website and the task history list. Plan the next task the use needs to do towards the goal. + +You have access to the following task types to take actions: +- navigate: this task can be used to set up a mini goal to achieve in the web which most likely results in navigating the web, like filling a form in the page, clicking a buton in the page to open or navigate to another page, interacting with some elements in the page like clicking, typing and selecting options, and so on. +- extract: extract information users would like to output from the page. When planning such a task, you should not expect anything related to navigation mentioned in the "navigate" task. The website will remain still and the only action here is to extract information from it. +- loop: this task can be used to generate a list of planning sessions like this. When to use a loop task? Use loop when there are multiple parallel tasks you can do with the same goal. Each task in the loop has the same goal but with different objects/values/targets/variables. Use loop task when it's in a "breadth first search" situation where you can go through a list of values and execute the same task for each value. Examples: + - When the goal is "Open up to 10 links from an ecomm search result page, and extract information like the price of each product.", loop task should be used to iterate through a list of links or URLs. In each iteration of the loop, the task will go to the linked page and trigger another planning session with the goal of extrating price information of the product + - When the goal is "download 5 documents found on a page", loop task should be used to iterate through a list of document names. Each document will trigger another planning session to download the relative document + +MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc. + +Reply in JSON format with the following keys: +{ + "page_info": str, // Think step by step. Describe all the useful information in the page related to the user goal. + "thoughts": str, // Think step by step. What has been done so far and what is the next reasonable mini goal a human can do foreseeably move towards the overall goal. + "user_goal_achieved": bool, // True if the user goal has been completed, false otherwise. If the user is searching for something, looking for information or specifically trying to extract information along side the goal, make sure at least an extract task has happened in the task history before claiming that the goal is achieved. + "plan": str, // The mini goal to achieve to move towards the goal. Include all the necessary and required user information provided by the user goal to achieve the plan. DO NOT come up or hallucinate any data that's not provided in the user goal. Be accurate and precise. Return null if the user goal has been achieved. + "task_type": str, // One of the available task types: navigate, extract, loop + "loop_values": list[str], // a list of string values to iterate through for loop task. null if it's not a loop task + "is_loop_value_link": bool, // true if the loop_values is a list of urls to go to before for each planning session inside the loop +} + +The URL of the page you're on right now is `{{ current_url }}`. + +Clickable elements from the page: +``` +{{ elements }} +``` + +User goal: +``` +{{ user_goal }} +``` + +Task history (the earliest task is the first in the list and the latest is the last in the list): +``` +{{ task_history }} +``` + +Current datetime, ISO format: +``` +{{ local_datetime }} +``` diff --git a/skyvern/forge/prompts/skyvern/observer_check_completion.j2 b/skyvern/forge/prompts/skyvern/observer_check_completion.j2 new file mode 100644 index 000000000..aa2df165c --- /dev/null +++ b/skyvern/forge/prompts/skyvern/observer_check_completion.j2 @@ -0,0 +1,22 @@ +You're to assist the user to achieve the user goal in the web, given the user goal and the mini tasks that have been completed by the user along the way. + +Reply in JSON format with the following keys: +{ + "thoughts": str, // Think step by step. Would completing the tasks in the task history be good enough to achieve the user goal? If more tasks need to be completed to achieve the goal, what would be the next task? + "user_goal_achieved": bool, // True if the user goal has been completed, false otherwise. If the user wants to extract information along side the goal, make sure the extract task has happened before claiming that the goal is achieved. +} + +User goal: +``` +{{ user_goal }} +``` + +Task history (the earliest task is the first in the list and the latest is the last in the list): +``` +{{ task_history }} +``` + +Current datetime, ISO format: +``` +{{ local_datetime }} +``` diff --git a/skyvern/forge/prompts/skyvern/observer_generate_extraction_task.j2 b/skyvern/forge/prompts/skyvern/observer_generate_extraction_task.j2 new file mode 100644 index 000000000..1f8b40a3e --- /dev/null +++ b/skyvern/forge/prompts/skyvern/observer_generate_extraction_task.j2 @@ -0,0 +1,25 @@ +You're to assist the user to extract data from the web. Given the data extraction goal, DOM elements in the list and the screenshots of the website, design the schema for the data output. + +MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc. + +Reply in JSON format with the following keys: +{ + "schema": JSON, // the schema of the output data to extract. Use JSON Schema specification style. +} + +The URL of the page you're on right now is `{{ current_url }}`. + +Clickable elements from the page: +``` +{{ elements }} +``` + +Data extraction goal: +``` +{{ data_extraction_goal }} +``` + +Current datetime, ISO format: +``` +{{ local_datetime }} +``` diff --git a/skyvern/forge/prompts/skyvern/observer_generate_loop_task.j2 b/skyvern/forge/prompts/skyvern/observer_generate_loop_task.j2 new file mode 100644 index 000000000..3b0ebee22 --- /dev/null +++ b/skyvern/forge/prompts/skyvern/observer_generate_loop_task.j2 @@ -0,0 +1,26 @@ +You're to assist the user to achieve goals on the web, given the DOM elements in the list, the screenshots of the website. Now the user needs to iterate throught a list of values. For each value, the user has the same goal and plan to complete a task. + +MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc. + +Reply in JSON format with the following keys: +{ + "thoughts": str, // Think step by step. What needs to been done for each value in the list. + "task": str, // Describe the task complete for each value in one sentence. Be concise about what you need to do and include all the user data required to complete the task. +} + +The URL of the page you're on right now is `{{ current_url }}`. + +Clickable elements from the page: +``` +{{ elements }} +``` + +User's overall plan right now: +``` +{{ plan }} +``` + +Current datetime, ISO format: +``` +{{ local_datetime }} +``` diff --git a/skyvern/forge/prompts/skyvern/observer_generate_metadata.j2 b/skyvern/forge/prompts/skyvern/observer_generate_metadata.j2 new file mode 100644 index 000000000..b23f58fbf --- /dev/null +++ b/skyvern/forge/prompts/skyvern/observer_generate_metadata.j2 @@ -0,0 +1,20 @@ +You're to assist the user to achieve the user goal in the browser. Given the user input, what is the url to type into the browser? Also come up with a proper title for this goal to achieve. + +MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc. + +Reply in JSON format with the following keys: +{ + "thoughts": str, // Think step by step. If the user specifically mentioned which website it is to use, let's use that url. If the user doesn't specifify a specific website, to achieve what the user wants to do, what is most likely website url for achieving the goal? + "url": str, // The url to type into the browser. DO NOT change the url if user specified one. + "title": str, // A descriptive and informative title for the goal. Use no more than 5 words +} + +User goal: +``` +{{ user_goal }} +```{% if user_url %} + +Starting url provided by user: +``` +{{ user_url }} +```{% endif %} diff --git a/skyvern/forge/prompts/skyvern/observer_generate_task_block.j2 b/skyvern/forge/prompts/skyvern/observer_generate_task_block.j2 new file mode 100644 index 000000000..d72133803 --- /dev/null +++ b/skyvern/forge/prompts/skyvern/observer_generate_task_block.j2 @@ -0,0 +1,26 @@ +You're to assist the user to achieve the user goal in the web, given the DOM elements in the list, the screenshots of the website and the user plan. Help user generate the navigation goal and data extraction goal if any. + +User is planning to {% if is_link %}go to a list of links{% else %}go through a list of values{% endif %} and and take the same task with each {% if is_link %}link{% else %}value{% endif %} in the list. Here's the specific plan: +``` +{{ plan }} +``` + +{% if is_link %}Links{% else %}Values{% endif %} the user will go through: +``` +{{ loop_values }} +``` + +MAKE SURE YOU OUTPUT VALID JSON. No text before or after JSON, no trailing commas, no comments (//), no unnecessary quotes, etc. + +Reply in JSON format with the following keys: +{ + "thoughts": str, // Think step by step. What would the use do to achieve the goal. + "navigation_goal": str, // What kind things the user needs to do in the web achieve the plan and finally get the data to extract. Include all the data needed to complete the goal here.{% if is_link %} The user already has the link to go to the target page first, in order to start executing the plan. So state the navigation goal from the perspective of already being on the target page. What does the user need to do from there?{% endif %} + "data_extraction_goal": str, // If the user needs to extract/retrieve data from the site after navigation goal is achieved, define that extraction goal here. null if no data needs to be extracted. + "data_schema": json, // the schema of the output data. use JSON schema specification style. All fields should be optional as we optimize to extract as much data as possible +} + +Current datetime, ISO format: +``` +{{ local_datetime }} +``` diff --git a/skyvern/forge/prompts/skyvern/observer_loop_task_extraction_goal.j2 b/skyvern/forge/prompts/skyvern/observer_loop_task_extraction_goal.j2 new file mode 100644 index 000000000..a537eb7f6 --- /dev/null +++ b/skyvern/forge/prompts/skyvern/observer_loop_task_extraction_goal.j2 @@ -0,0 +1,6 @@ +The user is trying to achieve a goal the web. Now they've decided to go through a list of values and take the same tasks with each variant in the list. + +Help to user extract this list of values based on what they want to achieve: +``` +{{ plan }} +``` diff --git a/skyvern/forge/sdk/executor/async_executor.py b/skyvern/forge/sdk/executor/async_executor.py index 84b51af8d..6f37282bb 100644 --- a/skyvern/forge/sdk/executor/async_executor.py +++ b/skyvern/forge/sdk/executor/async_executor.py @@ -7,7 +7,10 @@ from skyvern.forge import app from skyvern.forge.sdk.core import skyvern_context from skyvern.forge.sdk.core.skyvern_context import SkyvernContext +from skyvern.forge.sdk.schemas.observers import ObserverCruiseStatus from skyvern.forge.sdk.schemas.tasks import TaskStatus +from skyvern.forge.sdk.services import observer_service +from skyvern.forge.sdk.workflow.models.workflow import WorkflowRunStatus LOG = structlog.get_logger() @@ -40,12 +43,24 @@ async def execute_workflow( ) -> None: pass + @abc.abstractmethod + async def execute_cruise( + self, + request: Request | None, + background_tasks: BackgroundTasks | None, + organization_id: str, + observer_cruise_id: str, + max_iterations_override: int | None, + **kwargs: dict, + ) -> None: + pass + class BackgroundTaskExecutor(AsyncExecutor): async def execute_task( self, request: Request | None, - background_tasks: BackgroundTasks, + background_tasks: BackgroundTasks | None, task_id: str, organization_id: str, max_steps_override: int | None, @@ -76,18 +91,19 @@ async def execute_task( context.organization_id = organization_id context.max_steps_override = max_steps_override - background_tasks.add_task( - app.agent.execute_step, - organization, - task, - step, - api_key, - ) + if background_tasks: + background_tasks.add_task( + app.agent.execute_step, + organization, + task, + step, + api_key, + ) async def execute_workflow( self, request: Request | None, - background_tasks: BackgroundTasks, + background_tasks: BackgroundTasks | None, organization_id: str, workflow_id: str, workflow_run_id: str, @@ -104,9 +120,53 @@ async def execute_workflow( if organization is None: raise OrganizationNotFound(organization_id) - background_tasks.add_task( - app.WORKFLOW_SERVICE.execute_workflow, - workflow_run_id=workflow_run_id, - api_key=api_key, - organization=organization, + if background_tasks: + background_tasks.add_task( + app.WORKFLOW_SERVICE.execute_workflow, + workflow_run_id=workflow_run_id, + api_key=api_key, + organization=organization, + ) + + async def execute_cruise( + self, + request: Request | None, + background_tasks: BackgroundTasks | None, + organization_id: str, + observer_cruise_id: str, + max_iterations_override: int | None, + **kwargs: dict, + ) -> None: + LOG.info( + "Executing cruise using background task executor", + observer_cruise_id=observer_cruise_id, ) + + organization = await app.DATABASE.get_organization(organization_id) + if organization is None: + raise OrganizationNotFound(organization_id) + + observer_cruise = await app.DATABASE.get_observer_cruise( + observer_cruise_id=observer_cruise_id, organization_id=organization_id + ) + if not observer_cruise or not observer_cruise.workflow_run_id: + raise ValueError("No observer cruise or no workflow run associated with observer cruise") + + # mark observer cruise as queued + await app.DATABASE.update_observer_cruise( + observer_cruise_id, + status=ObserverCruiseStatus.queued, + organization_id=organization_id, + ) + await app.DATABASE.update_workflow_run( + workflow_run_id=observer_cruise.workflow_run_id, + status=WorkflowRunStatus.queued, + ) + + if background_tasks: + background_tasks.add_task( + observer_service.run_observer_cruise, + organization=organization, + observer_cruise_id=observer_cruise_id, + max_iterations_override=max_iterations_override, + ) diff --git a/skyvern/forge/sdk/routes/agent_protocol.py b/skyvern/forge/sdk/routes/agent_protocol.py index 3f58cc995..0d42b16d5 100644 --- a/skyvern/forge/sdk/routes/agent_protocol.py +++ b/skyvern/forge/sdk/routes/agent_protocol.py @@ -36,6 +36,7 @@ from skyvern.forge.sdk.db.enums import OrganizationAuthTokenType, TaskType from skyvern.forge.sdk.executor.factory import AsyncExecutorFactory from skyvern.forge.sdk.models import Step +from skyvern.forge.sdk.schemas.observers import CruiseRequest, ObserverCruise from skyvern.forge.sdk.schemas.organizations import ( GetOrganizationAPIKeysResponse, GetOrganizationsResponse, @@ -53,7 +54,7 @@ TaskStatus, ) from skyvern.forge.sdk.schemas.workflow_runs import WorkflowRunBlock, WorkflowRunEvent, WorkflowRunEventType -from skyvern.forge.sdk.services import org_auth_service +from skyvern.forge.sdk.services import observer_service, org_auth_service from skyvern.forge.sdk.workflow.exceptions import ( FailedToCreateWorkflow, FailedToUpdateWorkflow, @@ -1117,3 +1118,31 @@ async def upload_file( status_code=200, media_type="application/json", ) + + +@base_router.post("/cruise") +@base_router.post("/cruise/", include_in_schema=False) +async def observer_cruise( + request: Request, + background_tasks: BackgroundTasks, + data: CruiseRequest, + organization: Organization = Depends(org_auth_service.get_current_org), + x_max_iterations_override: Annotated[int | None, Header()] = None, +) -> ObserverCruise: + if x_max_iterations_override: + LOG.info("Overriding max iterations for observer", max_iterations_override=x_max_iterations_override) + + observer_cruise = await observer_service.initialize_observer_cruise( + organization=organization, + user_prompt=data.user_prompt, + user_url=str(data.url) if data.url else None, + ) + analytics.capture("skyvern-oss-agent-observer-cruise", data={"url": observer_cruise.url}) + await AsyncExecutorFactory.get_executor().execute_cruise( + request=request, + background_tasks=background_tasks, + organization_id=organization.organization_id, + observer_cruise_id=observer_cruise.observer_cruise_id, + max_iterations_override=x_max_iterations_override, + ) + return observer_cruise diff --git a/skyvern/forge/sdk/schemas/observers.py b/skyvern/forge/sdk/schemas/observers.py index 165952e8a..8fc2bf7f6 100644 --- a/skyvern/forge/sdk/schemas/observers.py +++ b/skyvern/forge/sdk/schemas/observers.py @@ -1,7 +1,11 @@ from datetime import datetime from enum import StrEnum -from pydantic import BaseModel, ConfigDict, HttpUrl +from pydantic import BaseModel, ConfigDict, HttpUrl, field_validator + +from skyvern.forge.sdk.core.validators import validate_url + +DEFAULT_WORKFLOW_TITLE = "New Workflow" class ObserverCruiseStatus(StrEnum): @@ -48,3 +52,20 @@ class ObserverThought(BaseModel): created_at: datetime modified_at: datetime + + +class ObserverMetadata(BaseModel): + url: str + workflow_title: str = DEFAULT_WORKFLOW_TITLE + + @field_validator("url") + @classmethod + def validate_urls(cls, v: str | None) -> str | None: + if v is None: + return None + return validate_url(v) + + +class CruiseRequest(BaseModel): + user_prompt: str + url: HttpUrl | None = None diff --git a/skyvern/forge/sdk/services/observer_service.py b/skyvern/forge/sdk/services/observer_service.py new file mode 100644 index 000000000..fcfffb11a --- /dev/null +++ b/skyvern/forge/sdk/services/observer_service.py @@ -0,0 +1,772 @@ +import os +import random +import string +from datetime import datetime +from typing import Any + +import structlog +from pydantic import BaseModel + +from skyvern.exceptions import UrlGenerationFailure +from skyvern.forge import app +from skyvern.forge.prompts import prompt_engine +from skyvern.forge.sdk.core import skyvern_context +from skyvern.forge.sdk.core.skyvern_context import SkyvernContext +from skyvern.forge.sdk.schemas.observers import ObserverCruise, ObserverCruiseStatus, ObserverMetadata +from skyvern.forge.sdk.schemas.organizations import Organization +from skyvern.forge.sdk.schemas.tasks import ProxyLocation +from skyvern.forge.sdk.workflow.models.block import ( + BlockResult, + BlockStatus, + BlockTypeVar, + ExtractionBlock, + ForLoopBlock, + NavigationBlock, + TaskBlock, +) +from skyvern.forge.sdk.workflow.models.parameter import PARAMETER_TYPE, ContextParameter +from skyvern.forge.sdk.workflow.models.workflow import Workflow, WorkflowRequestBody, WorkflowRun, WorkflowRunStatus +from skyvern.forge.sdk.workflow.models.yaml import ( + BLOCK_YAML_TYPES, + PARAMETER_YAML_TYPES, + ContextParameterYAML, + ExtractionBlockYAML, + ForLoopBlockYAML, + NavigationBlockYAML, + TaskBlockYAML, + WorkflowCreateYAMLRequest, + WorkflowDefinitionYAML, +) +from skyvern.webeye.browser_factory import BrowserState +from skyvern.webeye.scraper.scraper import ElementTreeFormat, ScrapedPage, scrape_website +from skyvern.webeye.utils.page import SkyvernFrame + +LOG = structlog.get_logger() +DEFAULT_WORKFLOW_TITLE = "New Workflow" +RANDOM_STRING_POOL = string.ascii_letters + string.digits +DEFAULT_MAX_ITERATIONS = 10 + +DATA_EXTRACTION_SCHEMA_FOR_LOOP = { + "type": "object", + "properties": { + "loop_values": { + "type": "array", + "description": 'User will later iterate through this array of values to achieve their "big goal" in the web. In each iteration, the user will try to take the same actions in the web but with a different value of its own. If the value is a url link, make sure it is a full url with http/https protocol, domain and path if any, based on the current url. For examples: \n1. When the goal is "Open up to 10 links from an ecomm search result page, and extract information like the price of each product.", user will iterate through an array of product links or URLs. In each iteration, the user will go to the linked page and extrat price information of the product. As a result, the array consists of 10 product urls scraped from the search result page.\n2. When the goal is "download 10 documents found on a page", user will iterate through an array of document names. In each iteration, the user will use a different value variant to start from the same page (the existing page) and take actions based on the variant. As a result, the array consists of up to 10 document names scraped from the page that the user wants to download.', + "items": {"type": "string", "description": "The relevant value"}, + }, + "is_loop_value_link": { + "type": "boolean", + "description": "true if the loop_values is an array of urls to be visited for each task. false if the loop_values is an array of non-link values to be used in each task (for each task they start from the same page / link).", + }, + }, +} + + +class LoopExtractionOutput(BaseModel): + loop_values: list[str] + is_loop_value_link: bool + + +async def initialize_observer_cruise( + organization: Organization, user_prompt: str, user_url: str | None = None +) -> ObserverCruise: + observer_cruise = await app.DATABASE.create_observer_cruise( + prompt=user_prompt, + organization_id=organization.organization_id, + ) + + metadata_prompt = prompt_engine.load_prompt("observer_generate_metadata", user_goal=user_prompt, user_url=user_url) + metadata_response = await app.SECONDARY_LLM_API_HANDLER(prompt=metadata_prompt, observer_cruise=observer_cruise) + # validate + LOG.info(f"Initialized observer initial response: {metadata_response}") + url: str = metadata_response.get("url", "") + if not url: + raise UrlGenerationFailure() + title: str = metadata_response.get("title", DEFAULT_WORKFLOW_TITLE) + metadata = ObserverMetadata( + url=url, + workflow_title=title, + ) + url = metadata.url + if not url: + raise UrlGenerationFailure() + + # create workflow and workflow run + max_steps_override = 10 + new_workflow = await app.WORKFLOW_SERVICE.create_empty_workflow(organization, metadata.workflow_title) + workflow_run = await app.WORKFLOW_SERVICE.setup_workflow_run( + request_id=None, + workflow_request=WorkflowRequestBody(), + workflow_permanent_id=new_workflow.workflow_permanent_id, + organization_id=organization.organization_id, + version=None, + max_steps_override=max_steps_override, + ) + # update oserver cruise + observer_cruise = await app.DATABASE.update_observer_cruise( + observer_cruise_id=observer_cruise.observer_cruise_id, + workflow_run_id=workflow_run.workflow_run_id, + workflow_id=new_workflow.workflow_id, + workflow_permanent_id=new_workflow.workflow_permanent_id, + url=url, + organization_id=organization.organization_id, + ) + return observer_cruise + + +async def run_observer_cruise( + organization: Organization, + observer_cruise_id: str, + request_id: str | None = None, + max_iterations_override: str | int | None = None, +) -> None: + organization_id = organization.organization_id + observer_cruise = await app.DATABASE.get_observer_cruise(observer_cruise_id, organization_id=organization_id) + if not observer_cruise: + LOG.error("Observer cruise not found", observer_cruise_id=observer_cruise_id, organization_id=organization_id) + return None + if observer_cruise.status != ObserverCruiseStatus.queued: + LOG.error( + "Observer cruise is not queued. Duplicate observer cruise", + observer_cruise_id=observer_cruise_id, + status=observer_cruise.status, + organization_id=organization_id, + ) + return None + if not observer_cruise.url or not observer_cruise.prompt: + LOG.error( + "Observer cruise url or prompt not found", + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + ) + return None + if not observer_cruise.workflow_run_id: + LOG.error( + "Workflow run id not found in observer cruise", + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + ) + return None + + int_max_iterations_override = None + if max_iterations_override: + try: + int_max_iterations_override = int(max_iterations_override) + LOG.info("max_iterationss_override is set", max_iterations_override=int_max_iterations_override) + except ValueError: + LOG.info( + "max_iterations_override isn't an integer, won't override", + max_iterations_override=max_iterations_override, + ) + + workflow_run_id = observer_cruise.workflow_run_id + + workflow_run = await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id) + if not workflow_run: + LOG.error("Workflow run not found", workflow_run_id=workflow_run_id) + return None + else: + LOG.info("Workflow run found", workflow_run_id=workflow_run_id) + + if workflow_run.status != WorkflowRunStatus.queued: + LOG.warning("Duplicate workflow run execution", workflow_run_id=workflow_run_id, status=workflow_run.status) + return None + + workflow_id = workflow_run.workflow_id + workflow = await app.WORKFLOW_SERVICE.get_workflow(workflow_id, organization_id=organization_id) + if not workflow: + LOG.error("Workflow not found", workflow_id=workflow_id) + return None + else: + LOG.info("Workflow found", workflow_id=workflow_id) + + ###################### run observer ###################### + + skyvern_context.set( + SkyvernContext( + organization_id=organization_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + request_id=request_id, + ) + ) + + await app.DATABASE.update_observer_cruise( + observer_cruise_id=observer_cruise_id, organization_id=organization_id, status=ObserverCruiseStatus.running + ) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_running(workflow_run_id=workflow_run.workflow_run_id) + await _set_up_workflow_context(workflow_id, workflow_run_id) + + url = str(observer_cruise.url) + user_prompt = observer_cruise.prompt + task_history: list[dict] = [] + yaml_blocks: list[BLOCK_YAML_TYPES] = [] + yaml_parameters: list[PARAMETER_YAML_TYPES] = [] + + for i in range(int_max_iterations_override or DEFAULT_MAX_ITERATIONS): + LOG.info(f"Observer iteration i={i}", workflow_run_id=workflow_run_id, url=url) + browser_state = await app.BROWSER_MANAGER.get_or_create_for_workflow_run( + workflow_run=workflow_run, + url=url, + ) + scraped_page = await scrape_website( + browser_state, + url, + app.AGENT_FUNCTION.cleanup_element_tree_factory(), + scrape_exclude=app.scrape_exclude, + ) + element_tree_in_prompt: str = scraped_page.build_element_tree(ElementTreeFormat.HTML) + page = await browser_state.get_working_page() + current_url = str( + await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else url + ) + + context = skyvern_context.ensure_context() + observer_prompt = prompt_engine.load_prompt( + "observer", + current_url=current_url, + elements=element_tree_in_prompt, + user_goal=user_prompt, + task_history=task_history, + local_datetime=datetime.now(context.tz_info).isoformat(), + ) + observer_response = await app.LLM_API_HANDLER( + prompt=observer_prompt, screenshots=scraped_page.screenshots, observer_cruise=observer_cruise + ) + LOG.info( + "Observer response", + observer_response=observer_response, + iteration=i, + current_url=current_url, + workflow_run_id=workflow_run_id, + ) + # see if the user goal has achieved or not + user_goal_achieved = observer_response.get("user_goal_achieved", False) + observation = observer_response.get("page_info", "") + thoughts: str = observer_response.get("thoughts", "") + plan: str = observer_response.get("plan", "") + # Create and save observer thought + await app.DATABASE.create_observer_thought( + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + workflow_run_id=workflow_run.workflow_run_id, + workflow_id=workflow.workflow_id, + workflow_permanent_id=workflow.workflow_permanent_id, + thought=thoughts, + observation=observation, + answer=plan, + ) + + if user_goal_achieved is True: + LOG.info( + "User goal achieved. Workflow run will complete. Observer is stopping", + iteration=i, + workflow_run_id=workflow_run_id, + ) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id) + break + + # parse observer repsonse and run the next task + task_type = observer_response.get("task_type") + if not task_type: + LOG.error("No task type found in observer response", observer_response=observer_response) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Skyvern failed to generate a task. Please try again later.", + ) + break + + block: BlockTypeVar | None = None + if task_type == "extract": + block, block_yaml_list, parameter_yaml_list = await _generate_extraction_task( + observer_cruise=observer_cruise, + workflow_id=workflow_id, + current_url=current_url, + element_tree_in_prompt=element_tree_in_prompt, + data_extraction_goal=plan, + task_history=task_history, + ) + task_history.append({"type": task_type, "task": plan}) + elif task_type == "navigate": + original_url = url if i == 0 else None + block, block_yaml_list, parameter_yaml_list = await _generate_navigation_task( + workflow_id=workflow_id, + original_url=original_url, + navigation_goal=plan, + ) + task_history.append({"type": task_type, "task": plan}) + elif task_type == "loop": + try: + block, block_yaml_list, parameter_yaml_list, extraction_obj, inner_task = await _generate_loop_task( + observer_cruise=observer_cruise, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + plan=plan, + browser_state=browser_state, + original_url=url, + scraped_page=scraped_page, + ) + task_history.append( + { + "type": task_type, + "task": plan, + "loop_over_values": extraction_obj.loop_values, + "task_inside_the_loop": inner_task, + } + ) + except Exception: + LOG.exception("Failed to generate loop task") + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Failed to generate loop task.", + ) + break + else: + LOG.info("Unsupported task type", task_type=task_type) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, failure_reason=f"Unsupported task type gets generated: {task_type}" + ) + break + + # generate the extraction task + block_result = await block.execute_safe(workflow_run_id=workflow_run_id) + + # refresh workflow + yaml_blocks.extend(block_yaml_list) + yaml_parameters.extend(parameter_yaml_list) + + # Update workflow definition + workflow_definition_yaml = WorkflowDefinitionYAML( + parameters=yaml_parameters, + blocks=yaml_blocks, + ) + workflow_create_request = WorkflowCreateYAMLRequest( + title=workflow.title, + description=workflow.description, + proxy_location=ProxyLocation.RESIDENTIAL, + workflow_definition=workflow_definition_yaml, + ) + LOG.info("Creating workflow from request", workflow_create_request=workflow_create_request) + workflow = await app.WORKFLOW_SERVICE.create_workflow_from_request( + organization=organization, + request=workflow_create_request, + workflow_permanent_id=workflow.workflow_permanent_id, + ) + LOG.info("Workflow created", workflow_id=workflow.workflow_id) + + # execute the extraction task + workflow_run = await handle_block_result(block, block_result, workflow, workflow_run) + if workflow_run.status != WorkflowRunStatus.running: + LOG.info( + "Workflow run is not running anymore, stopping the observer", + workflow_run_id=workflow_run_id, + status=workflow_run.status, + ) + break + if block_result.success is True: + # validate completion + observer_completion_prompt = prompt_engine.load_prompt( + "observer_check_completion", + user_goal=user_prompt, + task_history=task_history, + local_datetime=datetime.now(context.tz_info).isoformat(), + ) + completion_resp = await app.LLM_API_HANDLER( + prompt=observer_completion_prompt, observer_cruise=observer_cruise + ) + LOG.info( + "Observer completion check response", + completion_resp=completion_resp, + iteration=i, + workflow_run_id=workflow_run_id, + task_history=task_history, + ) + if completion_resp.get("user_goal_achieved", False): + LOG.info( + "User goal achieved according to the observer completion check", + iteration=i, + workflow_run_id=workflow_run_id, + completion_resp=completion_resp, + ) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id) + break + + await app.DATABASE.update_observer_cruise( + observer_cruise_id=observer_cruise_id, + organization_id=organization_id, + status=ObserverCruiseStatus.completed, + ) + await app.WORKFLOW_SERVICE.clean_up_workflow(workflow=workflow, workflow_run=workflow_run) + + +async def handle_block_result( + block: BlockTypeVar, + block_result: BlockResult, + workflow: Workflow, + workflow_run: WorkflowRun, + is_last_block: bool = True, +) -> WorkflowRun: + workflow_run_id = workflow_run.workflow_run_id + if block_result.status == BlockStatus.canceled: + LOG.info( + "Block with type {block.block_type} was canceled for workflow run {workflow_run_id}, cancelling workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run.workflow_run_id, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_canceled(workflow_run_id=workflow_run.workflow_run_id) + + # TODO: we can also support webhook by adding api_key to the function signature + await app.WORKFLOW_SERVICE.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + need_call_webhook=False, + ) + elif block_result.status == BlockStatus.failed: + LOG.error( + f"Block with type {block.block_type} failed for workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run.workflow_run_id, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + if block.continue_on_failure and not is_last_block: + LOG.warning( + f"Block with type {block.block_type} failed but will continue executing the workflow run {workflow_run_id}", + block_type=block.block_type, + workflow_run_id=workflow_run.workflow_run_id, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + else: + failure_reason = f"Block with type {block.block_type} failed. failure reason: {block_result.failure_reason}" + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) + + # TODO: add api_key + await app.WORKFLOW_SERVICE.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + ) + elif block_result.status == BlockStatus.terminated: + LOG.info( + f"Block with type {block.block_type} was terminated for workflow run {workflow_run_id}, marking workflow run as terminated", + block_type=block.block_type, + workflow_run_id=workflow_run.workflow_run_id, + block_result=block_result, + block_type_var=block.block_type, + block_label=block.label, + ) + if block.continue_on_failure and not is_last_block: + LOG.warning( + f"Block with type {block.block_type} was terminated for workflow run {workflow_run_id}, but will continue executing the workflow run", + block_type=block.block_type, + workflow_run_id=workflow_run.workflow_run_id, + block_result=block_result, + continue_on_failure=block.continue_on_failure, + block_type_var=block.block_type, + block_label=block.label, + ) + else: + failure_reason = f"Block with type {block.block_type} terminated. Reason: {block_result.failure_reason}" + await app.WORKFLOW_SERVICE.mark_workflow_run_as_terminated( + workflow_run_id=workflow_run.workflow_run_id, failure_reason=failure_reason + ) + await app.WORKFLOW_SERVICE.clean_up_workflow( + workflow=workflow, + workflow_run=workflow_run, + ) + # refresh workflow run model + return await app.WORKFLOW_SERVICE.get_workflow_run(workflow_run_id=workflow_run_id) + + +async def _set_up_workflow_context(workflow_id: str, workflow_run_id: str) -> None: + """ + TODO: see if we could remove this function as we can just set an empty workflow context + """ + # Get all tuples + wp_wps_tuples = await app.WORKFLOW_SERVICE.get_workflow_run_parameter_tuples(workflow_run_id=workflow_run_id) + workflow_output_parameters = await app.WORKFLOW_SERVICE.get_workflow_output_parameters(workflow_id=workflow_id) + app.WORKFLOW_CONTEXT_MANAGER.initialize_workflow_run_context( + workflow_run_id, + wp_wps_tuples, + workflow_output_parameters, + [], + ) + + +async def _generate_loop_task( + observer_cruise: ObserverCruise, + workflow_id: str, + workflow_run_id: str, + plan: str, + browser_state: BrowserState, + original_url: str, + scraped_page: ScrapedPage, +) -> tuple[ForLoopBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES], LoopExtractionOutput, dict[str, Any]]: + for_loop_parameter_yaml_list: list[PARAMETER_YAML_TYPES] = [] + loop_value_extraction_goal = prompt_engine.load_prompt( + "observer_loop_task_extraction_goal", + plan=plan, + ) + + label = f"extraction_task_for_loop_{_generate_random_string()}" + extraction_block_yaml = ExtractionBlockYAML( + label=label, + data_extraction_goal=loop_value_extraction_goal, + data_schema=DATA_EXTRACTION_SCHEMA_FOR_LOOP, + ) + loop_value_extraction_output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block( + workflow_id=workflow_id, + block_yaml=extraction_block_yaml, + ) + extraction_block_for_loop = ExtractionBlock( + label=label, + data_extraction_goal=loop_value_extraction_goal, + data_schema=DATA_EXTRACTION_SCHEMA_FOR_LOOP, + output_parameter=loop_value_extraction_output_parameter, + ) + + # execute the extraction block + extraction_block_result = await extraction_block_for_loop.execute_safe(workflow_run_id=workflow_run_id) + LOG.info("Extraction block result", extraction_block_result=extraction_block_result) + if extraction_block_result.success is False: + LOG.error( + "Failed to execute the extraction block for the loop task", + extraction_block_result=extraction_block_result, + ) + # TODO: fail the workflow run + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Failed to extract loop values for the loop task. Please try again later.", + ) + raise Exception("extraction_block failed") + # validate output parameter + try: + output_value_obj = LoopExtractionOutput.model_validate( + extraction_block_result.output_parameter_value.get("extracted_information") # type: ignore + ) + except Exception: + LOG.error( + "Failed to validate the output parameter of the extraction block for the loop task", + extraction_block_result=extraction_block_result, + ) + await app.WORKFLOW_SERVICE.mark_workflow_run_as_failed( + workflow_run_id=workflow_run_id, + failure_reason="Invalid output parameter of the extraction block for the loop task. Please try again later.", + ) + raise + + # create ContextParameter for the loop over pointer that ForLoopBlock needs. + loop_for_context_parameter = ContextParameter( + key="loop_values", + source=loop_value_extraction_output_parameter, + ) + for_loop_parameter_yaml_list.append( + ContextParameterYAML( + key=loop_for_context_parameter.key, + description=loop_for_context_parameter.description, + source_parameter_key=loop_value_extraction_output_parameter.key, + ) + ) + app.WORKFLOW_CONTEXT_MANAGER.add_context_parameter(workflow_run_id, loop_for_context_parameter) + await app.WORKFLOW_CONTEXT_MANAGER.set_parameter_values_for_output_parameter_dependent_blocks( + workflow_run_id=workflow_run_id, + output_parameter=loop_value_extraction_output_parameter, + value=extraction_block_result.output_parameter_value, + ) + task_parameters: list[PARAMETER_TYPE] = [] + if output_value_obj.is_loop_value_link: + LOG.info("Loop values are links", loop_values=output_value_obj.loop_values) + # create ContextParameter for the value + url_value_context_parameter = ContextParameter( + key="task_in_loop_url", + source=loop_for_context_parameter, + ) + task_parameters.append(url_value_context_parameter) + for_loop_parameter_yaml_list.append( + ContextParameterYAML( + key=url_value_context_parameter.key, + description=url_value_context_parameter.description, + source_parameter_key=loop_for_context_parameter.key, + ) + ) + app.WORKFLOW_CONTEXT_MANAGER.add_context_parameter(workflow_run_id, url_value_context_parameter) + url = "task_in_loop_url" + else: + LOG.info("Loop values are not links", loop_values=output_value_obj.loop_values) + page = await browser_state.get_working_page() + url = str( + await SkyvernFrame.evaluate(frame=page, expression="() => document.location.href") if page else original_url + ) + task_in_loop_label = f"task_in_loop_{_generate_random_string()}" + context = skyvern_context.ensure_context() + task_in_loop_metadata_prompt = prompt_engine.load_prompt( + "observer_generate_task_block", + plan=plan, + local_datetime=datetime.now(context.tz_info).isoformat(), + is_link=output_value_obj.is_loop_value_link, + loop_values=output_value_obj.loop_values, + ) + task_in_loop_metadata_response = await app.LLM_API_HANDLER( + task_in_loop_metadata_prompt, + screenshots=scraped_page.screenshots, + observer_cruise=observer_cruise, + ) + LOG.info("Task in loop metadata response", task_in_loop_metadata_response=task_in_loop_metadata_response) + navigation_goal = task_in_loop_metadata_response.get("navigation_goal") + data_extraction_goal = task_in_loop_metadata_response.get("data_extraction_goal") + data_extraction_schema = task_in_loop_metadata_response.get("data_schema") + if data_extraction_goal and navigation_goal: + navigation_goal = ( + navigation_goal + + " Optimize for extracting as much data as possible. Complete when most data is seen even if some data is partially missing." + ) + block_yaml = TaskBlockYAML( + label=task_in_loop_label, + url=url, + title=task_in_loop_label, + navigation_goal=navigation_goal, + data_extraction_goal=data_extraction_goal, + data_schema=data_extraction_schema, + parameter_keys=[param.key for param in task_parameters], + continue_on_failure=True, + ) + block_yaml_output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block( + workflow_id=workflow_id, + block_yaml=block_yaml, + ) + task_in_loop_block = TaskBlock( + label=task_in_loop_label, + url=url, + title=task_in_loop_label, + navigation_goal=navigation_goal, + data_extraction_goal=data_extraction_goal, + data_schema=data_extraction_schema, + output_parameter=block_yaml_output_parameter, + parameters=task_parameters, + continue_on_failure=True, + ) + + # use the output parameter of the extraction block to create the for loop block + for_loop_yaml = ForLoopBlockYAML( + label=f"loop_{_generate_random_string()}", + loop_over_parameter_key=loop_for_context_parameter.key, + loop_blocks=[block_yaml], + ) + output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block( + workflow_id=workflow_id, + block_yaml=for_loop_yaml, + ) + return ( + ForLoopBlock( + label=for_loop_yaml.label, + # TODO: this loop over parameter needs to be a context parameter + loop_over=loop_for_context_parameter, + loop_blocks=[task_in_loop_block], + output_parameter=output_parameter, + ), + [extraction_block_yaml, for_loop_yaml], + for_loop_parameter_yaml_list, + output_value_obj, + { + "inner_task_label": task_in_loop_block.label, + "inner_task_navigation_goal": navigation_goal, + "inner_task_data_extraction_goal": data_extraction_goal, + }, + ) + + +async def _generate_extraction_task( + observer_cruise: ObserverCruise, + workflow_id: str, + current_url: str, + element_tree_in_prompt: str, + data_extraction_goal: str, + task_history: list[dict] | None = None, +) -> tuple[ExtractionBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES]]: + LOG.info("Generating extraction task", data_extraction_goal=data_extraction_goal, current_url=current_url) + # extract the data + context = skyvern_context.ensure_context() + generate_extraction_task_prompt = prompt_engine.load_prompt( + "observer_generate_extraction_task", + current_url=current_url, + elements=element_tree_in_prompt, + data_extraction_goal=data_extraction_goal, + local_datetime=datetime.now(context.tz_info).isoformat(), + ) + generate_extraction_task_response = await app.LLM_API_HANDLER( + generate_extraction_task_prompt, + observer_cruise=observer_cruise, + ) + LOG.info("Data extraction response", data_extraction_response=generate_extraction_task_response) + + # create OutputParameter for the data_extraction block + data_schema: dict[str, Any] | list | None = generate_extraction_task_response.get("schema") + label = f"data_extraction_{_generate_random_string()}" + url: str | None = None + if not task_history: + # data extraction is the very first block + url = current_url + extraction_block_yaml = ExtractionBlockYAML( + label=label, + data_extraction_goal=data_extraction_goal, + data_schema=data_schema, + url=url, + ) + output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block( + workflow_id=workflow_id, + block_yaml=extraction_block_yaml, + ) + # create ExtractionBlock + return ( + ExtractionBlock( + label=label, + url=url, + data_extraction_goal=data_extraction_goal, + data_schema=data_schema, + output_parameter=output_parameter, + ), + [extraction_block_yaml], + [], + ) + + +async def _generate_navigation_task( + workflow_id: str, + navigation_goal: str, + original_url: str | None = None, +) -> tuple[NavigationBlock, list[BLOCK_YAML_TYPES], list[PARAMETER_YAML_TYPES]]: + LOG.info("Generating navigation task", navigation_goal=navigation_goal, original_url=original_url) + label = f"navigation_{_generate_random_string()}" + navigation_block_yaml = NavigationBlockYAML( + label=label, + url=original_url, + navigation_goal=navigation_goal, + ) + output_parameter = await app.WORKFLOW_SERVICE.create_output_parameter_for_block( + workflow_id=workflow_id, + block_yaml=navigation_block_yaml, + ) + return ( + NavigationBlock( + label=label, + url=original_url, + navigation_goal=navigation_goal, + output_parameter=output_parameter, + ), + [navigation_block_yaml], + [], + ) + + +def _generate_random_string(length: int = 5) -> str: + # Use the current timestamp as the seed + random.seed(os.urandom(16)) + return "".join(random.choices(RANDOM_STRING_POOL, k=length))