diff --git a/python/openai/sample-agent/agent.py b/python/openai/sample-agent/agent.py index 6429020a..2e6e6369 100644 --- a/python/openai/sample-agent/agent.py +++ b/python/openai/sample-agent/agent.py @@ -43,6 +43,12 @@ from local_authentication_options import LocalAuthenticationOptions from microsoft_agents.hosting.core import Authorization, TurnContext +# Notifications +from microsoft_agents_a365.notifications.agent_notification import ( + AgentNotificationActivity, + NotificationTypes, +) + # Observability Components from microsoft_agents_a365.observability.core.config import configure from microsoft_agents_a365.observability.extensions.openai import OpenAIAgentsTraceInstrumentor @@ -342,6 +348,82 @@ async def process_user_message( # + # ========================================================================= + # NOTIFICATION HANDLING + # ========================================================================= + # + + async def handle_agent_notification_activity( + self, notification_activity: "AgentNotificationActivity", auth: Authorization, auth_handler_name: str, context: TurnContext + ) -> str: + """Handle agent notification activities (email, Word mentions, etc.)""" + try: + notification_type = notification_activity.notification_type + logger.info(f"📬 Processing notification: {notification_type}") + + # Setup MCP servers on first call + await self.setup_mcp_servers(auth, auth_handler_name, context) + + # Handle Email Notifications + if notification_type == NotificationTypes.EMAIL_NOTIFICATION: + if not hasattr(notification_activity, "email") or not notification_activity.email: + return "I could not find the email notification details." + + email = notification_activity.email + email_body = getattr(email, "html_body", "") or getattr(email, "body", "") + message = f"You have received the following email. Please follow any instructions in it. {email_body}" + + result = await Runner.run(starting_agent=self.agent, input=message, context=context) + return self._extract_result(result) or "Email notification processed." + + # Handle Word Comment Notifications + elif notification_type == NotificationTypes.WPX_COMMENT: + if not hasattr(notification_activity, "wpx_comment") or not notification_activity.wpx_comment: + return "I could not find the Word notification details." + + wpx = notification_activity.wpx_comment + doc_id = getattr(wpx, "document_id", "") + comment_id = getattr(wpx, "initiating_comment_id", "") + drive_id = "default" + + # Get Word document content + doc_message = f"You have a new comment on the Word document with id '{doc_id}', comment id '{comment_id}', drive id '{drive_id}'. Please retrieve the Word document as well as the comments and return it in text format." + doc_result = await Runner.run(starting_agent=self.agent, input=doc_message, context=context) + word_content = self._extract_result(doc_result) + + # Process the comment with document context + comment_text = notification_activity.text or "" + response_message = f"You have received the following Word document content and comments. Please refer to these when responding to comment '{comment_text}'. {word_content}" + result = await Runner.run(starting_agent=self.agent, input=response_message, context=context) + return self._extract_result(result) or "Word notification processed." + + # Generic notification handling + else: + notification_message = notification_activity.text or f"Notification received: {notification_type}" + result = await Runner.run(starting_agent=self.agent, input=notification_message, context=context) + return self._extract_result(result) or "Notification processed successfully." + + except Exception as e: + logger.error(f"Error processing notification: {e}") + return f"Sorry, I encountered an error processing the notification: {str(e)}" + + def _extract_result(self, result) -> str: + """Extract text content from agent result""" + if not result: + return "" + if hasattr(result, "final_output") and result.final_output: + return str(result.final_output) + elif hasattr(result, "contents"): + return str(result.contents) + elif hasattr(result, "text"): + return str(result.text) + elif hasattr(result, "content"): + return str(result.content) + else: + return str(result) + + # + # ========================================================================= # CLEANUP # ========================================================================= diff --git a/python/openai/sample-agent/host_agent_server.py b/python/openai/sample-agent/host_agent_server.py index d5125822..f10d9469 100644 --- a/python/openai/sample-agent/host_agent_server.py +++ b/python/openai/sample-agent/host_agent_server.py @@ -12,6 +12,7 @@ # Import our agent base class from agent_interface import AgentInterface, check_agent_inheritance +from aiohttp.client_exceptions import ClientConnectorError, ClientResponseError from aiohttp.web import Application, Request, Response, json_response, run_app from aiohttp.web_middlewares import middleware as web_middleware from dotenv import load_dotenv @@ -34,6 +35,14 @@ TurnContext, TurnState, ) +from microsoft_agents.activity import InvokeResponse +from microsoft_agents_a365.notifications.agent_notification import ( + AgentNotification, + NotificationTypes, + AgentNotificationActivity, + ChannelId, +) +from microsoft_agents_a365.notifications import EmailResponse from microsoft_agents_a365.observability.core.middleware.baggage_builder import BaggageBuilder from microsoft_agents_a365.runtime.environment_utils import ( get_observability_authentication_scope, @@ -52,6 +61,81 @@ agents_sdk_config = load_configuration_from_env(environ) +class SafeAgentNotification(AgentNotification): + """ + Extended AgentNotification that filters out invalid invoke activities. + + The SDK's AgentNotification will throw a ValueError if an invoke activity + is received without a valid 'name' field. This wrapper adds a pre-check + to prevent the error when Playground sends activities with name=None. + """ + + def on_agent_notification(self, channel_id: ChannelId, **kwargs): + """ + Override to add name validation before creating AgentNotificationActivity. + + We completely bypass the parent's decorator to avoid the SDK creating + AgentNotificationActivity with an invalid name, which throws ValueError. + """ + registered_channel = channel_id.channel.lower() + registered_subchannel = (channel_id.sub_channel or "*").lower() + + def route_selector(context: TurnContext) -> bool: + """Check if this activity should be handled by this notification handler""" + # First check: activity must have a valid name for notifications + activity_name = context.activity.name + if not activity_name: + logger.debug("⏭️ Skipping invoke activity with no name") + return False + + # Only handle agent/notification invoke activities + if activity_name != "agent/notification": + logger.debug(f"⏭️ Skipping invoke with non-notification name: {activity_name}") + return False + + # Check channel matching (from parent implementation) + ch = context.activity.channel_id + received_channel = (ch.channel if ch else "").lower() + received_subchannel = (ch.sub_channel if ch and ch.sub_channel else "").lower() + + if received_channel != registered_channel: + return False + if registered_subchannel == "*": + return True + if registered_subchannel not in self._known_subchannels: + return False + return received_subchannel == registered_subchannel + + def decorator(handler): + async def safe_route_handler(context: TurnContext, state: TurnState): + """Safely create AgentNotificationActivity and call handler""" + try: + ana = AgentNotificationActivity(context.activity) + await handler(context, state, ana) + # Set invoke response to 200 to prevent 501 Not Implemented + context.turn_state[TurnContext._INVOKE_RESPONSE_KEY] = InvokeResponse( + status=200, body={"status": "ok"} + ) + except ValueError as e: + # Log but don't crash on invalid notification types + logger.warning(f"⚠️ Invalid notification activity: {e}") + context.turn_state[TurnContext._INVOKE_RESPONSE_KEY] = InvokeResponse( + status=200, body={"status": "skipped", "reason": str(e)} + ) + except Exception as e: + logger.error(f"❌ Error in notification handler: {e}") + context.turn_state[TurnContext._INVOKE_RESPONSE_KEY] = InvokeResponse( + status=500, body={"status": "error", "message": str(e)} + ) + raise + + # Register this route with the app + self._app.add_route(route_selector, safe_route_handler, **kwargs) + return safe_route_handler + + return decorator + + class GenericAgentHost: """Generic host that can host any agent implementing the AgentInterface""" @@ -94,6 +178,9 @@ def __init__(self, agent_class: type[AgentInterface], *agent_args, **agent_kwarg authorization=self.authorization, **agents_sdk_config, ) + # Use SafeAgentNotification to filter out invalid invoke activities + # that would cause SDK ValueError when name is None + self.agent_notification = SafeAgentNotification(self.agent_app) # Setup message handlers self._setup_handlers() @@ -177,6 +264,107 @@ async def on_message(context: TurnContext, _: TurnState): logger.error(f"❌ Error processing message: {e}") await context.send_activity(error_msg) + # Handle invoke activities (notifications) with proper InvokeResponse + @self.agent_app.activity("invoke", **handler_config) + async def on_invoke(context: TurnContext, state: TurnState): + """Handle invoke activities including agent/notification""" + activity_name = context.activity.name + + # Skip invoke activities without a name + if not activity_name: + logger.debug("⏭️ Skipping invoke activity with no name") + return InvokeResponse(status=200, body={"status": "skipped", "reason": "no name"}) + + # Handle agent/notification invoke activities + if activity_name == "agent/notification": + try: + ana = AgentNotificationActivity(context.activity) + await handle_notification_internal(context, state, ana) + return InvokeResponse(status=200, body={"status": "ok"}) + except ValueError as e: + logger.warning(f"⚠️ Invalid notification: {e}") + return InvokeResponse(status=200, body={"status": "skipped", "reason": str(e)}) + except Exception as e: + logger.error(f"❌ Notification error: {e}") + return InvokeResponse(status=500, body={"status": "error", "message": str(e)}) + + # Unknown invoke type - return 501 Not Implemented + logger.debug(f"⏭️ Unknown invoke name: {activity_name}") + return InvokeResponse(status=501, body={"status": "not implemented", "name": activity_name}) + + # Shared notification handler logic + async def handle_notification_internal( + context: TurnContext, + state: TurnState, + notification_activity: AgentNotificationActivity, + ): + try: + tenant_id = context.activity.recipient.tenant_id if context.activity.recipient else None + agent_id = context.activity.recipient.agentic_app_id if context.activity.recipient else None + + with BaggageBuilder().tenant_id(tenant_id).agent_id(agent_id).build(): + # Ensure the agent is available + if not self.agent_instance: + logger.error("Agent not available") + await context.send_activity("❌ Sorry, the agent is not available.") + return + + # Exchange token for observability if auth handler is configured + if self.auth_handler_name and tenant_id and agent_id: + exaau_token = await self.agent_app.auth.exchange_token( + context, + scopes=get_observability_authentication_scope(), + auth_handler_id=self.auth_handler_name, + ) + cache_agentic_token(tenant_id, agent_id, exaau_token.token) + + logger.info(f"📬 Processing notification: {notification_activity.notification_type}") + + if not hasattr(self.agent_instance, "handle_agent_notification_activity"): + logger.warning("⚠️ Agent doesn't support notifications") + await context.send_activity( + "This agent doesn't support notification handling yet." + ) + return + + response = await self.agent_instance.handle_agent_notification_activity( + notification_activity, self.agent_app.auth, self.auth_handler_name, context + ) + + if notification_activity.notification_type == NotificationTypes.EMAIL_NOTIFICATION: + response_activity = EmailResponse.create_email_response_activity(response) + # Set text field for channels that require it (like Playground mock connector) + response_activity.text = response + try: + await context.send_activity(response_activity) + except (ClientConnectorError, ClientResponseError) as conn_err: + # Playground may close connection before we can reply - log and continue + logger.debug(f"⚠️ Could not send response (Playground limitation): {conn_err}") + return + + try: + await context.send_activity(response) + except (ClientConnectorError, ClientResponseError) as conn_err: + # Playground may close connection before we can reply - log and continue + logger.debug(f"⚠️ Could not send response (Playground limitation): {conn_err}") + + except (ClientConnectorError, ClientResponseError) as conn_err: + # Connection errors are expected with Playground - just log them + logger.debug(f"⚠️ Connection error (Playground limitation): {conn_err}") + except Exception as e: + logger.error(f"❌ Notification error: {e}") + try: + await context.send_activity( + f"Sorry, I encountered an error processing the notification: {str(e)}" + ) + except (ClientConnectorError, ClientResponseError): + # Can't send error message either - just log + pass + + # Note: Notification handling is done via the on_invoke handler above + # The SafeAgentNotification handlers below are kept for production 'agents' channel + # which may use a different routing mechanism than Playground's 'msteams' channel + async def initialize_agent(self): """Initialize the hosted agent instance""" if self.agent_instance is None: