Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

observer summary #1532

Merged
merged 4 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""observer summary and output

Revision ID: 6a947c379c02
Revises: d5640aa644b9
Create Date: 2025-01-10 22:46:41.757862+00:00

"""

from typing import Sequence, Union

import sqlalchemy as sa

from alembic import op

# revision identifiers, used by Alembic.
# `revision` is this migration's own ID and `down_revision` is the ID of the
# migration it revises; both match the "Revision ID" / "Revises" lines in the
# module docstring.
revision: str = "6a947c379c02"
down_revision: Union[str, None] = "d5640aa644b9"
# This migration declares no branch labels and no dependencies on other revisions.
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Add the nullable ``summary`` and ``output`` columns to ``observer_cruises``."""
    # (column name, SQLAlchemy type) pairs, added in this order.
    new_columns = (
        ("summary", sa.String),
        ("output", sa.JSON),
    )
    for column_name, column_type in new_columns:
        op.add_column("observer_cruises", sa.Column(column_name, column_type(), nullable=True))


def downgrade() -> None:
    """Drop the ``output`` and ``summary`` columns from ``observer_cruises``."""
    # Columns are removed in the reverse of the order in which upgrade() adds them.
    for column_name in ("output", "summary"):
        op.drop_column("observer_cruises", column_name)
24 changes: 24 additions & 0 deletions skyvern/forge/prompts/skyvern/observer_summary.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
The AI assistant has helped the user achieve the user goal on the web.
Given the user goal, the latest screenshot of the page and the mini tasks that have been completed by the user along the way, summarize what has been achieved and output structured data related to the user goal in json format.
You want to present the response in a clear way so that the user can clearly understand what has been achieved.

Reply in JSON format with the following keys:
{
"description": str, // Summarize what has been achieved and describe the information extracted related to the user goal if any. Be precise and concise.
"output": json, // Structured data related to the user goal if any.
}

User goal:
```
{{ user_goal }}
```

Task history (the earliest task is the first in the list and the latest is the last in the list):
```
{{ task_history }}
```

Current datetime, ISO format:
```
{{ local_datetime }}
```
6 changes: 6 additions & 0 deletions skyvern/forge/sdk/db/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2093,6 +2093,8 @@ async def update_observer_cruise(
workflow_permanent_id: str | None = None,
url: str | None = None,
prompt: str | None = None,
summary: str | None = None,
output: dict[str, Any] | None = None,
organization_id: str | None = None,
) -> ObserverCruise:
async with self.Session() as session:
Expand All @@ -2116,6 +2118,10 @@ async def update_observer_cruise(
observer_cruise.url = url
if prompt:
observer_cruise.prompt = prompt
if summary:
observer_cruise.summary = summary
if output:
observer_cruise.output = output
await session.commit()
await session.refresh(observer_cruise)
return ObserverCruise.model_validate(observer_cruise)
Expand Down
2 changes: 2 additions & 0 deletions skyvern/forge/sdk/db/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,8 @@ class ObserverCruiseModel(Base):
workflow_permanent_id = Column(String, nullable=True)
prompt = Column(UnicodeText, nullable=True)
url = Column(String, nullable=True)
summary = Column(String, nullable=True)
output = Column(JSON, nullable=True)

created_at = Column(DateTime, default=datetime.datetime.utcnow, nullable=False)
modified_at = Column(DateTime, default=datetime.datetime.utcnow, onupdate=datetime.datetime.utcnow, nullable=False)
Expand Down
3 changes: 3 additions & 0 deletions skyvern/forge/sdk/schemas/observers.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ class ObserverCruise(BaseModel):
workflow_permanent_id: str | None = None
prompt: str | None = None
url: HttpUrl | None = None
summary: str | None = None
output: dict[str, Any] | list | str | None = None

created_at: datetime
modified_at: datetime
Expand All @@ -46,6 +48,7 @@ class ObserverThoughtType(StrEnum):
class ObserverThoughtScenario(StrEnum):
generate_plan = "generate_plan"
user_goal_check = "user_goal_check"
summarization = "summarization"
generate_metadata = "generate_metadata"
extract_loop_values = "extract_loop_values"
generate_task_in_loop = "generate_task_in_loop"
Expand Down
86 changes: 62 additions & 24 deletions skyvern/forge/sdk/services/observer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
ObserverCruise,
ObserverCruiseStatus,
ObserverMetadata,
ObserverThought,
ObserverThoughtScenario,
ObserverThoughtType,
)
Expand Down Expand Up @@ -395,7 +394,12 @@ async def run_observer_cruise_helper(
iteration=i,
workflow_run_id=workflow_run_id,
)
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id=workflow_run_id)
await _summarize_observer_cruise(
observer_cruise=observer_cruise,
task_history=task_history,
context=context,
screenshots=scraped_page.screenshots,
)
break

# parse observer response and run the next task
Expand Down Expand Up @@ -573,10 +577,11 @@ async def run_observer_cruise_helper(
workflow_run_id=workflow_run_id,
completion_resp=completion_resp,
)
await mark_observer_cruise_as_completed(
observer_cruise_id=observer_cruise_id,
workflow_run_id=workflow_run_id,
organization_id=organization_id,
await _summarize_observer_cruise(
observer_cruise=observer_cruise,
task_history=task_history,
context=context,
screenshots=completion_screenshots,
)
break
else:
Expand Down Expand Up @@ -1039,24 +1044,6 @@ async def get_observer_thought_timelines(
]


async def _record_thought_screenshot(observer_thought: ObserverThought, workflow_run_id: str) -> None:
    """Take a full-page screenshot for a workflow run and store it as a thought artifact.

    This is best-effort: if no browser state exists for the workflow run, or if
    taking/storing the screenshot fails, a warning is logged and nothing is raised.

    Args:
        observer_thought: The thought the screenshot artifact is attached to.
        workflow_run_id: The workflow run whose browser state is looked up.
    """
    # get the browser state for the workflow run
    browser_state = app.BROWSER_MANAGER.get_for_workflow_run(workflow_run_id=workflow_run_id)
    if not browser_state:
        LOG.warning("No browser state found for the workflow run", workflow_run_id=workflow_run_id)
        return

    # get the screenshot for the workflow run
    try:
        screenshot = await browser_state.take_screenshot(full_page=True)
        await app.ARTIFACT_MANAGER.create_observer_thought_artifact(
            observer_thought=observer_thought,
            artifact_type=ArtifactType.SCREENSHOT_LLM,
            data=screenshot,
        )
    except Exception:
        # Deliberately swallowed so a screenshot failure never breaks the caller,
        # but keep the exception details in the log so the failure is diagnosable.
        LOG.warning(
            "Failed to take screenshot for the observer thought",
            observer_thought=observer_thought,
            exc_info=True,
        )


async def get_observer_cruise(observer_cruise_id: str, organization_id: str | None = None) -> ObserverCruise | None:
    """Return whatever the database layer yields for the given observer cruise ID.

    Args:
        observer_cruise_id: ID of the observer cruise to look up.
        organization_id: Optional organization ID forwarded to the database lookup.
    """
    cruise = await app.DATABASE.get_observer_cruise(
        observer_cruise_id,
        organization_id=organization_id,
    )
    return cruise

Expand All @@ -1080,11 +1067,15 @@ async def mark_observer_cruise_as_completed(
observer_cruise_id: str,
workflow_run_id: str | None = None,
organization_id: str | None = None,
summary: str | None = None,
output: dict[str, Any] | None = None,
) -> None:
await app.DATABASE.update_observer_cruise(
observer_cruise_id,
organization_id=organization_id,
status=ObserverCruiseStatus.completed,
summary=summary,
output=output,
)
if workflow_run_id:
await app.WORKFLOW_SERVICE.mark_workflow_run_as_completed(workflow_run_id)
Expand Down Expand Up @@ -1157,3 +1148,50 @@ def _get_extracted_data_from_block_result(
loop_output_overall.append(inner_loop_output_overall)
return loop_output_overall if loop_output_overall else None
return None


async def _summarize_observer_cruise(
    observer_cruise: ObserverCruise,
    task_history: list[dict],
    context: SkyvernContext,
    screenshots: list[bytes] | None = None,
) -> None:
    """Ask the LLM to summarize an observer cruise, then mark the cruise as completed.

    Steps:
      1. Create an observer thought with the ``summarization`` scenario.
      2. Render the ``observer_summary`` prompt from the cruise's prompt (the
         user goal), the task history and the current datetime in the
         context's timezone.
      3. Call the LLM with the prompt and the given screenshots.
      4. Store the response's ``description`` as the thought text and the whole
         response as the thought output.
      5. Mark the cruise as completed with ``description`` as the summary and
         the response's ``output`` value as the cruise output.

    Args:
        observer_cruise: The cruise being summarized.
        task_history: Task records passed to the prompt template as-is.
        context: Supplies ``tz_info`` for the datetime rendered into the prompt.
        screenshots: Optional screenshots forwarded to the LLM call.
    """
    observer_thought = await app.DATABASE.create_observer_thought(
        observer_cruise_id=observer_cruise.observer_cruise_id,
        organization_id=observer_cruise.organization_id,
        workflow_run_id=observer_cruise.workflow_run_id,
        workflow_id=observer_cruise.workflow_id,
        workflow_permanent_id=observer_cruise.workflow_permanent_id,
        observer_thought_type=ObserverThoughtType.user_goal_check,
        observer_thought_scenario=ObserverThoughtScenario.summarization,
    )
    # summarize the observer cruise and format the output
    observer_summary_prompt = prompt_engine.load_prompt(
        "observer_summary",
        # The ObserverCruise schema stores the user's goal in `prompt`.
        user_goal=observer_cruise.prompt,
        task_history=task_history,
        local_datetime=datetime.now(context.tz_info).isoformat(),
    )
    observer_summary_resp = await app.LLM_API_HANDLER(
        prompt=observer_summary_prompt,
        screenshots=screenshots,
        observer_thought=observer_thought,
    )
    LOG.info("Observer summary response", observer_summary_resp=observer_summary_resp)

    # Either key may be absent from the LLM response; .get() then yields None.
    thought = observer_summary_resp.get("description")
    summarized_output = observer_summary_resp.get("output")
    await app.DATABASE.update_observer_thought(
        observer_thought_id=observer_thought.observer_thought_id,
        organization_id=observer_cruise.organization_id,
        thought=thought,
        output=observer_summary_resp,
    )

    await mark_observer_cruise_as_completed(
        observer_cruise_id=observer_cruise.observer_cruise_id,
        workflow_run_id=observer_cruise.workflow_run_id,
        organization_id=observer_cruise.organization_id,
        summary=thought,
        output=summarized_output,
    )
Loading