Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/actions/spelling/allow.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ JPY
JSONRPCt
jwk
jwks
JWS
jws
JWS
kid
kwarg
langgraph
Expand Down Expand Up @@ -83,6 +83,8 @@ RUF
SLF
socio
sse
sut
SUT
tagwords
taskupdate
testuuid
Expand Down
106 changes: 106 additions & 0 deletions .github/workflows/run-tck.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
name: Run TCK

on:
push:
branches: [ "main" ]
pull_request:
branches: [ "main" ]
paths-ignore:
- '**.md'
- 'LICENSE'
- '.github/CODEOWNERS'

permissions:
contents: read

env:
TCK_VERSION: 0.3.0.beta3
SUT_BASE_URL: http://localhost:41241
SUT_JSONRPC_URL: http://localhost:41241/a2a/jsonrpc
UV_SYSTEM_PYTHON: 1
TCK_STREAMING_TIMEOUT: 5.0

concurrency:
group: '${{ github.workflow }} @ ${{ github.head_ref || github.ref }}'
cancel-in-progress: true

jobs:
tck-test:
name: Run TCK with Python ${{ matrix.python-version }}
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ['3.10', '3.13']
steps:
- name: Checkout a2a-python
uses: actions/checkout@v6

- name: Install uv
uses: astral-sh/setup-uv@v7
with:
enable-cache: true
cache-dependency-glob: "uv.lock"

- name: Set up Python ${{ matrix.python-version }}
run: uv python install ${{ matrix.python-version }}

- name: Install Dependencies
run: uv sync --locked --all-extras

- name: Checkout a2a-tck
uses: actions/checkout@v6
with:
repository: a2aproject/a2a-tck
path: tck/a2a-tck
ref: ${{ env.TCK_VERSION }}

- name: Start SUT
run: |
uv run tck/sut_agent.py &

- name: Wait for SUT to start
run: |
URL="${{ env.SUT_BASE_URL }}/.well-known/agent-card.json"
EXPECTED_STATUS=200
TIMEOUT=120
RETRY_INTERVAL=2
START_TIME=$(date +%s)

while true; do
CURRENT_TIME=$(date +%s)
ELAPSED_TIME=$((CURRENT_TIME - START_TIME))

if [ "$ELAPSED_TIME" -ge "$TIMEOUT" ]; then
echo "❌ Timeout: Server did not respond with status $EXPECTED_STATUS within $TIMEOUT seconds."
exit 1
fi

HTTP_STATUS=$(curl --output /dev/null --silent --write-out "%{http_code}" "$URL") || true
echo "STATUS: ${HTTP_STATUS}"

if [ "$HTTP_STATUS" -eq "$EXPECTED_STATUS" ]; then
echo "✅ Server is up! Received status $HTTP_STATUS after $ELAPSED_TIME seconds."
break;
fi

echo "⏳ Server not ready (status: $HTTP_STATUS). Retrying in $RETRY_INTERVAL seconds..."
sleep "$RETRY_INTERVAL"
done

- name: Run TCK (mandatory)
id: run-tck-mandatory
run: |
uv run run_tck.py --sut-url ${{ env.SUT_JSONRPC_URL }} --category mandatory --transports jsonrpc
working-directory: tck/a2a-tck

- name: Run TCK (capabilities)
id: run-tck-capabilities
run: |
uv run run_tck.py --sut-url ${{ env.SUT_JSONRPC_URL }} --category capabilities --transports jsonrpc
working-directory: tck/a2a-tck

- name: Stop SUT
if: always()
run: |
pkill -f sut_agent.py || true
sleep 2
Empty file added tck/__init__.py
Empty file.
186 changes: 186 additions & 0 deletions tck/sut_agent.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
import asyncio
import logging
import os
import uuid

from datetime import datetime, timezone

import uvicorn

from a2a.server.agent_execution.agent_executor import AgentExecutor
from a2a.server.agent_execution.context import RequestContext
from a2a.server.apps import A2AStarletteApplication
from a2a.server.events.event_queue import EventQueue
from a2a.server.request_handlers.default_request_handler import (
DefaultRequestHandler,
)
from a2a.server.tasks.inmemory_task_store import InMemoryTaskStore
from a2a.types import (
AgentCapabilities,
AgentCard,
AgentProvider,
Message,
TaskState,
TaskStatus,
TaskStatusUpdateEvent,
TextPart,
)


JSONRPC_URL = '/a2a/jsonrpc'

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('SUTAgent')


class SUTAgentExecutor(AgentExecutor):
"""Execution logic for the SUT agent."""

def __init__(self) -> None:
"""Initializes the SUT agent executor."""
self.running_tasks = set()

async def cancel(
self, context: RequestContext, event_queue: EventQueue
) -> None:
"""Cancels a task."""
api_task_id = context.task_id
if api_task_id in self.running_tasks:
self.running_tasks.remove(api_task_id)

status_update = TaskStatusUpdateEvent(
task_id=api_task_id,
context_id=context.context_id or str(uuid.uuid4()),
status=TaskStatus(
state=TaskState.canceled,
timestamp=datetime.now(timezone.utc).isoformat(),
),
final=True,
)
await event_queue.enqueue_event(status_update)

async def execute(
self, context: RequestContext, event_queue: EventQueue
) -> None:
"""Executes a task."""
user_message = context.message
task_id = context.task_id
context_id = context.context_id

self.running_tasks.add(task_id)

logger.info(
'[SUTAgentExecutor] Processing message %s for task %s (context: %s)',
user_message.message_id,
task_id,
context_id,
)

working_status = TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(
state=TaskState.working,
message=Message(
role='agent',
message_id=str(uuid.uuid4()),
parts=[TextPart(text='Processing your question')],
task_id=task_id,
context_id=context_id,
),
timestamp=datetime.now(timezone.utc).isoformat(),
),
final=False,
)
await event_queue.enqueue_event(working_status)

agent_reply_text = 'Hello world!'
await asyncio.sleep(3) # Simulate processing delay

if task_id not in self.running_tasks:
logger.info('Task %s was cancelled.', task_id)
return

logger.info('[SUTAgentExecutor] Response: %s', agent_reply_text)

agent_message = Message(
role='agent',
message_id=str(uuid.uuid4()),
parts=[TextPart(text=agent_reply_text)],
task_id=task_id,
context_id=context_id,
)

final_update = TaskStatusUpdateEvent(
task_id=task_id,
context_id=context_id,
status=TaskStatus(
state=TaskState.input_required,
message=agent_message,
timestamp=datetime.now(timezone.utc).isoformat(),
),
final=True,
)
await event_queue.enqueue_event(final_update)


def main() -> None:
"""Main entrypoint."""
http_port = int(os.environ.get('HTTP_PORT', '41241'))

agent_card = AgentCard(
name='SUT Agent',
description='An agent to be used as SUT against TCK tests.',
url=f'http://localhost:{http_port}{JSONRPC_URL}',
provider=AgentProvider(
organization='A2A Samples',
url='https://example.com/a2a-samples',
),
version='1.0.0',
protocol_version='0.3.0',
capabilities=AgentCapabilities(
streaming=True,
push_notifications=False,
state_transition_history=True,
),
default_input_modes=['text'],
default_output_modes=['text', 'task-status'],
skills=[
{
'id': 'sut_agent',
'name': 'SUT Agent',
'description': 'Simulate the general flow of a streaming agent.',
'tags': ['sut'],
'examples': ['hi', 'hello world', 'how are you', 'goodbye'],
'input_modes': ['text'],
'output_modes': ['text', 'task-status'],
}
],
supports_authenticated_extended_card=False,
preferred_transport='JSONRPC',
additional_interfaces=[
{
'url': f'http://localhost:{http_port}{JSONRPC_URL}',
'transport': 'JSONRPC',
},
],
)

request_handler = DefaultRequestHandler(
agent_executor=SUTAgentExecutor(),
task_store=InMemoryTaskStore(),
)

server = A2AStarletteApplication(
agent_card=agent_card,
http_handler=request_handler,
)

app = server.build(rpc_url=JSONRPC_URL)

logger.info('Starting HTTP server on port %s...', http_port)
uvicorn.run(app, host='127.0.0.1', port=http_port, log_level='info')


if __name__ == '__main__':
main()
Loading