Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 82 additions & 0 deletions python/openai/sample-agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@
from local_authentication_options import LocalAuthenticationOptions
from microsoft_agents.hosting.core import Authorization, TurnContext

# Notifications
from microsoft_agents_a365.notifications.agent_notification import (
AgentNotificationActivity,
NotificationTypes,
)

# Observability Components
from microsoft_agents_a365.observability.core.config import configure
from microsoft_agents_a365.observability.extensions.openai import OpenAIAgentsTraceInstrumentor
Expand Down Expand Up @@ -342,6 +348,82 @@ async def process_user_message(

# </MessageProcessing>

# =========================================================================
# NOTIFICATION HANDLING
# =========================================================================
# <NotificationHandling>

async def handle_agent_notification_activity(
self, notification_activity: "AgentNotificationActivity", auth: Authorization, auth_handler_name: str, context: TurnContext
) -> str:
"""Handle agent notification activities (email, Word mentions, etc.)"""
try:
notification_type = notification_activity.notification_type
logger.info(f"📬 Processing notification: {notification_type}")

# Setup MCP servers on first call
await self.setup_mcp_servers(auth, auth_handler_name, context)

# Handle Email Notifications
if notification_type == NotificationTypes.EMAIL_NOTIFICATION:
if not hasattr(notification_activity, "email") or not notification_activity.email:
return "I could not find the email notification details."

email = notification_activity.email
email_body = getattr(email, "html_body", "") or getattr(email, "body", "")
message = f"You have received the following email. Please follow any instructions in it. {email_body}"

result = await Runner.run(starting_agent=self.agent, input=message, context=context)
return self._extract_result(result) or "Email notification processed."

# Handle Word Comment Notifications
elif notification_type == NotificationTypes.WPX_COMMENT:
if not hasattr(notification_activity, "wpx_comment") or not notification_activity.wpx_comment:
return "I could not find the Word notification details."

wpx = notification_activity.wpx_comment
doc_id = getattr(wpx, "document_id", "")
comment_id = getattr(wpx, "initiating_comment_id", "")
drive_id = "default"

# Get Word document content
doc_message = f"You have a new comment on the Word document with id '{doc_id}', comment id '{comment_id}', drive id '{drive_id}'. Please retrieve the Word document as well as the comments and return it in text format."
doc_result = await Runner.run(starting_agent=self.agent, input=doc_message, context=context)
word_content = self._extract_result(doc_result)

# Process the comment with document context
comment_text = notification_activity.text or ""
response_message = f"You have received the following Word document content and comments. Please refer to these when responding to comment '{comment_text}'. {word_content}"
result = await Runner.run(starting_agent=self.agent, input=response_message, context=context)
return self._extract_result(result) or "Word notification processed."

# Generic notification handling
else:
notification_message = notification_activity.text or f"Notification received: {notification_type}"
result = await Runner.run(starting_agent=self.agent, input=notification_message, context=context)
return self._extract_result(result) or "Notification processed successfully."

except Exception as e:
logger.error(f"Error processing notification: {e}")
return f"Sorry, I encountered an error processing the notification: {str(e)}"

def _extract_result(self, result) -> str:
"""Extract text content from agent result"""
if not result:
return ""
if hasattr(result, "final_output") and result.final_output:
return str(result.final_output)
elif hasattr(result, "contents"):
return str(result.contents)
elif hasattr(result, "text"):
return str(result.text)
elif hasattr(result, "content"):
return str(result.content)
else:
return str(result)

# </NotificationHandling>

# =========================================================================
# CLEANUP
# =========================================================================
Expand Down
188 changes: 188 additions & 0 deletions python/openai/sample-agent/host_agent_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

# Import our agent base class
from agent_interface import AgentInterface, check_agent_inheritance
from aiohttp.client_exceptions import ClientConnectorError, ClientResponseError
from aiohttp.web import Application, Request, Response, json_response, run_app
from aiohttp.web_middlewares import middleware as web_middleware
from dotenv import load_dotenv
Expand All @@ -34,6 +35,14 @@
TurnContext,
TurnState,
)
from microsoft_agents.activity import InvokeResponse
from microsoft_agents_a365.notifications.agent_notification import (
AgentNotification,
NotificationTypes,
AgentNotificationActivity,
ChannelId,
)
from microsoft_agents_a365.notifications import EmailResponse
from microsoft_agents_a365.observability.core.middleware.baggage_builder import BaggageBuilder
from microsoft_agents_a365.runtime.environment_utils import (
get_observability_authentication_scope,
Expand All @@ -52,6 +61,81 @@
agents_sdk_config = load_configuration_from_env(environ)


class SafeAgentNotification(AgentNotification):
"""
Extended AgentNotification that filters out invalid invoke activities.
The SDK's AgentNotification will throw a ValueError if an invoke activity
is received without a valid 'name' field. This wrapper adds a pre-check
to prevent the error when Playground sends activities with name=None.
"""

def on_agent_notification(self, channel_id: ChannelId, **kwargs):
"""
Override to add name validation before creating AgentNotificationActivity.
We completely bypass the parent's decorator to avoid the SDK creating
AgentNotificationActivity with an invalid name, which throws ValueError.
"""
registered_channel = channel_id.channel.lower()
registered_subchannel = (channel_id.sub_channel or "*").lower()

def route_selector(context: TurnContext) -> bool:
"""Check if this activity should be handled by this notification handler"""
# First check: activity must have a valid name for notifications
activity_name = context.activity.name
if not activity_name:
logger.debug("⏭️ Skipping invoke activity with no name")
return False

# Only handle agent/notification invoke activities
if activity_name != "agent/notification":
logger.debug(f"⏭️ Skipping invoke with non-notification name: {activity_name}")
return False

# Check channel matching (from parent implementation)
ch = context.activity.channel_id
received_channel = (ch.channel if ch else "").lower()
received_subchannel = (ch.sub_channel if ch and ch.sub_channel else "").lower()

if received_channel != registered_channel:
return False
if registered_subchannel == "*":
return True
if registered_subchannel not in self._known_subchannels:
return False
return received_subchannel == registered_subchannel

def decorator(handler):
async def safe_route_handler(context: TurnContext, state: TurnState):
"""Safely create AgentNotificationActivity and call handler"""
try:
ana = AgentNotificationActivity(context.activity)
await handler(context, state, ana)
# Set invoke response to 200 to prevent 501 Not Implemented
context.turn_state[TurnContext._INVOKE_RESPONSE_KEY] = InvokeResponse(
status=200, body={"status": "ok"}
)
except ValueError as e:
# Log but don't crash on invalid notification types
logger.warning(f"⚠️ Invalid notification activity: {e}")
context.turn_state[TurnContext._INVOKE_RESPONSE_KEY] = InvokeResponse(
status=200, body={"status": "skipped", "reason": str(e)}
)
except Exception as e:
logger.error(f"❌ Error in notification handler: {e}")
context.turn_state[TurnContext._INVOKE_RESPONSE_KEY] = InvokeResponse(
status=500, body={"status": "error", "message": str(e)}
)
raise

# Register this route with the app
self._app.add_route(route_selector, safe_route_handler, **kwargs)
return safe_route_handler

return decorator


class GenericAgentHost:
"""Generic host that can host any agent implementing the AgentInterface"""

Expand Down Expand Up @@ -94,6 +178,9 @@ def __init__(self, agent_class: type[AgentInterface], *agent_args, **agent_kwarg
authorization=self.authorization,
**agents_sdk_config,
)
# Use SafeAgentNotification to filter out invalid invoke activities
# that would cause SDK ValueError when name is None
self.agent_notification = SafeAgentNotification(self.agent_app)

# Setup message handlers
self._setup_handlers()
Expand Down Expand Up @@ -177,6 +264,107 @@ async def on_message(context: TurnContext, _: TurnState):
logger.error(f"❌ Error processing message: {e}")
await context.send_activity(error_msg)

# Handle invoke activities (notifications) with proper InvokeResponse
@self.agent_app.activity("invoke", **handler_config)
async def on_invoke(context: TurnContext, state: TurnState):
"""Handle invoke activities including agent/notification"""
activity_name = context.activity.name

# Skip invoke activities without a name
if not activity_name:
logger.debug("⏭️ Skipping invoke activity with no name")
return InvokeResponse(status=200, body={"status": "skipped", "reason": "no name"})

# Handle agent/notification invoke activities
if activity_name == "agent/notification":
try:
ana = AgentNotificationActivity(context.activity)
await handle_notification_internal(context, state, ana)
return InvokeResponse(status=200, body={"status": "ok"})
except ValueError as e:
logger.warning(f"⚠️ Invalid notification: {e}")
return InvokeResponse(status=200, body={"status": "skipped", "reason": str(e)})
except Exception as e:
logger.error(f"❌ Notification error: {e}")
return InvokeResponse(status=500, body={"status": "error", "message": str(e)})

# Unknown invoke type - return 501 Not Implemented
logger.debug(f"⏭️ Unknown invoke name: {activity_name}")
return InvokeResponse(status=501, body={"status": "not implemented", "name": activity_name})

# Shared notification handler logic
async def handle_notification_internal(
context: TurnContext,
state: TurnState,
notification_activity: AgentNotificationActivity,
):
try:
tenant_id = context.activity.recipient.tenant_id if context.activity.recipient else None
agent_id = context.activity.recipient.agentic_app_id if context.activity.recipient else None

with BaggageBuilder().tenant_id(tenant_id).agent_id(agent_id).build():
# Ensure the agent is available
if not self.agent_instance:
logger.error("Agent not available")
await context.send_activity("❌ Sorry, the agent is not available.")
return

# Exchange token for observability if auth handler is configured
if self.auth_handler_name and tenant_id and agent_id:
exaau_token = await self.agent_app.auth.exchange_token(
context,
scopes=get_observability_authentication_scope(),
auth_handler_id=self.auth_handler_name,
)
cache_agentic_token(tenant_id, agent_id, exaau_token.token)

logger.info(f"📬 Processing notification: {notification_activity.notification_type}")

if not hasattr(self.agent_instance, "handle_agent_notification_activity"):
logger.warning("⚠️ Agent doesn't support notifications")
await context.send_activity(
"This agent doesn't support notification handling yet."
)
return

response = await self.agent_instance.handle_agent_notification_activity(
notification_activity, self.agent_app.auth, self.auth_handler_name, context
)

if notification_activity.notification_type == NotificationTypes.EMAIL_NOTIFICATION:
response_activity = EmailResponse.create_email_response_activity(response)
# Set text field for channels that require it (like Playground mock connector)
response_activity.text = response
try:
await context.send_activity(response_activity)
except (ClientConnectorError, ClientResponseError) as conn_err:
# Playground may close connection before we can reply - log and continue
logger.debug(f"⚠️ Could not send response (Playground limitation): {conn_err}")
return

try:
await context.send_activity(response)
except (ClientConnectorError, ClientResponseError) as conn_err:
# Playground may close connection before we can reply - log and continue
logger.debug(f"⚠️ Could not send response (Playground limitation): {conn_err}")

except (ClientConnectorError, ClientResponseError) as conn_err:
# Connection errors are expected with Playground - just log them
logger.debug(f"⚠️ Connection error (Playground limitation): {conn_err}")
except Exception as e:
logger.error(f"❌ Notification error: {e}")
try:
await context.send_activity(
f"Sorry, I encountered an error processing the notification: {str(e)}"
)
except (ClientConnectorError, ClientResponseError):
# Can't send error message either - just log
pass

# Note: Notification handling is done via the on_invoke handler above
# The SafeAgentNotification handlers below are kept for production 'agents' channel
# which may use a different routing mechanism than Playground's 'msteams' channel

async def initialize_agent(self):
"""Initialize the hosted agent instance"""
if self.agent_instance is None:
Expand Down
Loading