diff --git a/python/samples/demos/copilot_studio_agent/.env.sample b/python/samples/demos/copilot_studio_agent/.env.sample index 652168b135e7..4d8821341fc2 100644 --- a/python/samples/demos/copilot_studio_agent/.env.sample +++ b/python/samples/demos/copilot_studio_agent/.env.sample @@ -1,2 +1,3 @@ -BOT_SECRET="copy from Copilot Studio Agent, under Settings > Security > Web Channel" -BOT_ENDPOINT="https://europe.directline.botframework.com/v3/directline" \ No newline at end of file +DIRECTLINE_ENDPOINT="https://europe.directline.botframework.com/v3/directline" +AUDITOR_AGENT_SECRET="Copy from Copilot Studio Agent > Settings > Security > Web channel security > Secrets and tokens" +TAGLINE_AGENT_SECRET="Copy from Copilot Studio Agent > Settings > Security > Web channel security > Secrets and tokens" \ No newline at end of file diff --git a/python/samples/demos/copilot_studio_agent/.gitignore b/python/samples/demos/copilot_studio_agent/.gitignore new file mode 100644 index 000000000000..84224b5312fc --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/.gitignore @@ -0,0 +1 @@ +.chainlit/ \ No newline at end of file diff --git a/python/samples/demos/copilot_studio_agent/README.md b/python/samples/demos/copilot_studio_agent/README.md index 187396158e5b..131fa7c65150 100644 --- a/python/samples/demos/copilot_studio_agent/README.md +++ b/python/samples/demos/copilot_studio_agent/README.md @@ -14,7 +14,17 @@ This way, you can create any amount of agents in Copilot Studio and interact wit ## Implementation -The implementation is quite simple, since Copilot Studio can publish agents over DirectLine API, which we can use in Semantic Kernel to define a new subclass of `Agent` named [`DirectLineAgent`](src/direct_line_agent.py). +The implementation enables seamless integration with Copilot Studio agents via the DirectLine API. 
Several key components work together to provide this functionality: + +- [`DirectLineClient`](src/agents/copilot_studio/directline_client.py): A utility module that handles all Direct Line API operations including authentication, conversation management, posting user activities, and retrieving bot responses using watermark-based polling. + +- [`CopilotAgent`](src/agents/copilot_studio/copilot_agent.py): Implements `CopilotAgent`, which orchestrates interactions with a Copilot Studio bot. It serializes user messages, handles asynchronous polling for responses, and converts bot activities into structured message content. + +- [`CopilotAgentThread`](src/agents/copilot_studio/copilot_agent_thread.py): Provides a specialized thread implementation for Copilot Studio conversations, managing Direct Line-specific context such as conversation ID and watermark. + +- [`CopilotStudioAgentChannel`](src/agents/copilot_studio/copilot_agent_channel.py): Adds `CopilotStudioAgentChannel`, allowing Copilot Studio agents to participate in multi-agent group chats via the channel-based invocation system. + +- [`CopilotMessageContent`](src/agents/copilot_studio/copilot_message_content.py): Introduces `CopilotMessageContent`, an extension of `ChatMessageContent` that can represent rich message types from Copilot Studio—including plain text, adaptive cards, and suggested actions. Additionally, we do enforce [authentication to the DirectLine API](https://learn.microsoft.com/en-us/microsoft-copilot-studio/configure-web-security). @@ -23,18 +33,28 @@ Additionally, we do enforce [authentication to the DirectLine API](https://learn > [!NOTE] > Working with Copilot Studio Agents requires a [subscription](https://learn.microsoft.com/en-us/microsoft-copilot-studio/requirements-licensing-subscriptions) to Microsoft Copilot Studio. -> [!TIP] -> In this case, we suggest to start with a simple Q&A Agent and supply a PDF to answer some questions. 
You can find a free sample like [Microsoft Surface Pro 4 User Guide](https://download.microsoft.com/download/2/9/B/29B20383-302C-4517-A006-B0186F04BE28/surface-pro-4-user-guide-EN.pdf) +For this sample, we have created two agents in Copilot Studio: +- The **TaglineGenerator agent** creates taglines for products based on descriptions +- The **BrandAuditor agent** evaluates and approves/rejects taglines based on brand guidelines + +The TaglineGenerator is used in the single agent chat example, allowing you to interact with it directly. In the group chat example, both the TaglineGenerator and the BrandAuditor agents collaborate to create and refine taglines that meet brand requirements. + +### Setting Up Copilot Studio Agents +Follow these steps to set up your Copilot Studio agents: 1. [Create a new agent](https://learn.microsoft.com/en-us/microsoft-copilot-studio/fundamentals-get-started?tabs=web) in Copilot Studio 2. [Publish the agent](https://learn.microsoft.com/en-us/microsoft-copilot-studio/publication-fundamentals-publish-channels?tabs=web) 3. Turn off default authentication under the agent Settings > Security 4. [Setup web channel security](https://learn.microsoft.com/en-us/microsoft-copilot-studio/configure-web-security) and copy the secret value -Once you're done with the above steps, you can use the following code to interact with the Copilot Studio Agent: +### Setting Up Environment -1. Copy the `.env.sample` file to `.env` and set the `BOT_SECRET` environment variable to the secret value -2. Run the following code: +1. Copy the `.env.sample` file to `.env` and add the agent secrets to your `.env` file: +``` +TAGLINE_AGENT_SECRET=your_tagline_agent_secret +AUDITOR_AGENT_SECRET=your_brand_auditor_agent_secret +``` +2. 
Set up your environment: ```bash python -m venv .venv @@ -45,6 +65,20 @@ source .venv/bin/activate .venv\Scripts\Activate.ps1 pip install -r requirements.txt +``` + +### Running the Single Agent Chat +```bash chainlit run --port 8081 .\chat.py ``` + +The chat.py file demonstrates a web-based chat interface that allows for multi-turn conversations with a single agent. + +### Running the Agent Group Chat + +```bash +python group_chat.py +``` + +The agents will collaborate automatically, with the TaglineGenerator creating taglines and the BrandAuditor providing feedback until a satisfactory tagline is approved. diff --git a/python/samples/demos/copilot_studio_agent/src/agents/auditor_agent.py b/python/samples/demos/copilot_studio_agent/src/agents/auditor_agent.py new file mode 100644 index 000000000000..8ab3524cdd6b --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/src/agents/auditor_agent.py @@ -0,0 +1,39 @@ +# Copyright (c) Microsoft. All rights reserved. + +import os + +from agents.copilot_studio.copilot_agent import CopilotAgent +from agents.copilot_studio.directline_client import DirectLineClient +from dotenv import load_dotenv + +load_dotenv(override=True) + + +class BrandAuditor(CopilotAgent): + """ + Brand auditor agent that ensures all messaging aligns with the brand's identity. + Evaluates taglines for alignment with brand voice, values and target audience. + Initializes a DirectLine client configured for the agent instance. 
+ """ + + def __init__(self): + directline_endpoint = os.getenv("DIRECTLINE_ENDPOINT") + copilot_agent_secret = os.getenv("AUDITOR_AGENT_SECRET") + + if not directline_endpoint or not copilot_agent_secret: + raise ValueError("DIRECTLINE_ENDPOINT and AUDITOR_AGENT_SECRET must be set in environment variables.") + + directline_client = DirectLineClient( + directline_endpoint=directline_endpoint, + copilot_agent_secret=copilot_agent_secret, + ) + + super().__init__( + id="brand_auditor", + name="brand_auditor", + description=( + "Brand compliance specialist ensuring messaging aligns with a modern wellness " + "company's identity, values, and audience." + ), + directline_client=directline_client, + ) diff --git a/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent.py b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent.py new file mode 100644 index 000000000000..c857ac8e1263 --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent.py @@ -0,0 +1,268 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +import asyncio +import logging +import sys +from collections.abc import AsyncIterable +from typing import Any, ClassVar + +if sys.version_info >= (3, 12): + from typing import override # pragma: no cover +else: + from typing_extensions import override # pragma: no cover + +from agents.copilot_studio.copilot_agent_channel import CopilotStudioAgentChannel +from agents.copilot_studio.copilot_agent_thread import CopilotAgentThread +from agents.copilot_studio.copilot_message_content import CopilotMessageContent +from agents.copilot_studio.directline_client import DirectLineClient + +from semantic_kernel.agents import Agent +from semantic_kernel.agents.agent import AgentResponseItem, AgentThread +from semantic_kernel.agents.channels.agent_channel import AgentChannel +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.contents.utils.author_role import AuthorRole +from semantic_kernel.exceptions.agent_exceptions import AgentInvokeException +from semantic_kernel.utils.telemetry.agent_diagnostics.decorators import ( + trace_agent_get_response, + trace_agent_invocation, +) + +logger: logging.Logger = logging.getLogger(__name__) + + +class CopilotAgent(Agent): + """ + An agent that facilitates communication with a Microsoft Copilot Studio bot via the Direct Line API. + It serializes user inputs into Direct Line payloads, handles asynchronous response polling, and + transforms bot activities into structured message content. + Conversation state such as conversation ID and watermark is externally managed by CopilotAgentThread. + """ + + directline_client: DirectLineClient | None = None + + channel_type: ClassVar[type[AgentChannel]] = CopilotStudioAgentChannel + + def __init__( + self, + id: str, + name: str, + description: str, + directline_client: DirectLineClient, + ) -> None: + """ + Initialize the CopilotAgent. 
+ """ + super().__init__(id=id, name=name, description=description) + self.directline_client = directline_client + + @override + def get_channel_keys(self) -> list[str]: + """ + Override to return agent ID instead of channel_type for Copilot agents. + + This is particularly important for CopilotAgent because each agent instance + maintains its own conversation with a unique thread ID in the DirectLine API. + Without this override, multiple CopilotAgent instances in a group chat would + share the same channel, causing thread ID conflicts and message routing issues. + + Returns: + A list containing the agent ID as the unique channel key, ensuring each + CopilotAgent gets its own dedicated channel and thread. + """ + return [self.id] + + @trace_agent_get_response + @override + async def get_response( + self, + *, + messages: str | ChatMessageContent | list[str | ChatMessageContent], + thread: AgentThread | None = None, + **kwargs, + ) -> AgentResponseItem[CopilotMessageContent]: + """ + Get a response from the agent on a thread. + + Args: + messages: The input chat message content either as a string, ChatMessageContent or + a list of strings or ChatMessageContent. + thread: The thread to use for the agent. + kwargs: Additional keyword arguments. + + Returns: + AgentResponseItem[ChatMessageContent]: The response from the agent. 
+ """ + thread = await self._ensure_thread_exists_with_messages( + messages=messages, + thread=thread, + construct_thread=lambda: CopilotAgentThread(directline_client=self.directline_client), + expected_type=CopilotAgentThread, + ) + assert thread.id is not None # nosec + + response_items = [] + async for response_item in self.invoke( + messages=messages, + thread=thread, + **kwargs, + ): + response_items.append(response_item) + + if not response_items: + raise AgentInvokeException("No response messages were returned from the agent.") + + return response_items[-1] + + @trace_agent_invocation + @override + async def invoke( + self, + *, + messages: str | ChatMessageContent | list[str | ChatMessageContent], + thread: AgentThread | None = None, + message_data: dict[str, Any] | None = None, + **kwargs, + ) -> AsyncIterable[AgentResponseItem[CopilotMessageContent]]: + """Invoke the agent on the specified thread. + + Args: + messages: The input chat message content either as a string, ChatMessageContent or + a list of strings or ChatMessageContent. + thread: The thread to use for the agent. + message_data: Optional dict that will be sent as the "value" field in the payload + for adaptive card responses. + kwargs: Additional keyword arguments. + + Yields: + AgentResponseItem[ChatMessageContent]: The response from the agent. 
+ """ + logger.debug("Received messages: %s", messages) + if not isinstance(messages, str) and not isinstance(messages, ChatMessageContent): + raise AgentInvokeException("Messages must be a string or a ChatMessageContent for Copilot Agent.") + + # Ensure DirectLine client is initialized + if self.directline_client is None: + raise AgentInvokeException("DirectLine client is not initialized.") + + thread = await self._ensure_thread_exists_with_messages( + messages=messages, + thread=thread, + construct_thread=lambda: CopilotAgentThread(directline_client=self.directline_client), + expected_type=CopilotAgentThread, + ) + assert thread.id is not None # nosec + + normalized_message = ( + ChatMessageContent(role=AuthorRole.USER, content=messages) if isinstance(messages, str) else messages + ) + + payload = self._build_payload(normalized_message, message_data, thread.id) + response_data = await self._send_message(payload, thread) + if response_data is None or "activities" not in response_data: + raise AgentInvokeException(f"Invalid response from DirectLine Bot.\n{response_data}") + + # Process DirectLine activities and convert them to appropriate message content + for activity in response_data["activities"]: + if activity.get("type") != "message" or activity.get("from", {}).get("id") == "user": + continue + + # Create a CopilotMessageContent instance from the activity + message = CopilotMessageContent.from_bot_activity(activity, name=self.name) + + logger.debug("Response message: %s", message.content) + + yield AgentResponseItem(message=message, thread=thread) + + def _build_payload( + self, + message: ChatMessageContent, + message_data: dict[str, Any] | None = None, + thread_id: str | None = None, + ) -> dict[str, Any]: + """Build the message payload for the DirectLine Bot. + + Args: + message: The message content to send. + message_data: Optional dict that will be sent as the "value" field in the payload + for adaptive card responses. 
+ thread_id: The thread ID (conversation ID). + + Returns: + A dictionary representing the payload to be sent to the DirectLine Bot. + """ + payload = { + "type": "message", + "from": {"id": "user"}, + } + + if message_data and "adaptive_card_response" in message_data: + payload["value"] = message_data["adaptive_card_response"] + else: + payload["text"] = message.content + + payload["conversationId"] = thread_id + return payload + + async def _send_message(self, payload: dict[str, Any], thread: CopilotAgentThread) -> dict[str, Any] | None: + """ + Post the payload to the conversation and poll for responses. + """ + if self.directline_client is None: + raise AgentInvokeException("DirectLine client is not initialized.") + + # Post the message payload + await thread.post_message(payload) + + # Poll for new activities using watermark until DynamicPlanFinished event is found + finished = False + collected_data = None + while not finished: + data = await thread.get_messages() + activities = data.get("activities", []) + + # Check for either DynamicPlanFinished event or message from bot + if any( + (activity.get("type") == "event" and activity.get("name") == "DynamicPlanFinished") + or (activity.get("type") == "message" and activity.get("from", {}).get("role") == "bot") + for activity in activities + ): + collected_data = data + finished = True + break + + await asyncio.sleep(1) + + return collected_data + + async def close(self) -> None: + """ + Clean up resources. + """ + if self.directline_client: + await self.directline_client.close() + + @trace_agent_invocation + @override + async def invoke_stream(self, *args, **kwargs): + return super().invoke_stream(*args, **kwargs) + + async def create_channel(self, thread_id: str | None = None) -> AgentChannel: + """Create a Copilot Agent channel. + + Args: + thread_id: The ID of the thread. If None, a new thread will be created. + + Returns: + An instance of AgentChannel. 
+ """ + from agents.copilot_studio.copilot_agent_channel import CopilotStudioAgentChannel + + if self.directline_client is None: + raise AgentInvokeException("DirectLine client is not initialized.") + + thread = CopilotAgentThread(directline_client=self.directline_client, conversation_id=thread_id) + + if thread.id is None: + await thread.create() + + return CopilotStudioAgentChannel(thread=thread) diff --git a/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent_channel.py b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent_channel.py new file mode 100644 index 000000000000..aee9c9518cb6 --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent_channel.py @@ -0,0 +1,114 @@ +# Copyright (c) Microsoft. All rights reserved. + +import logging +import sys +from collections.abc import AsyncIterable +from typing import TYPE_CHECKING, Any + +if sys.version_info >= (3, 12): + from typing import override # pragma: no cover +else: + from typing_extensions import override # pragma: no cover + +from semantic_kernel.agents.channels.agent_channel import AgentChannel +from semantic_kernel.contents.chat_history import ChatHistory +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.contents.streaming_chat_message_content import StreamingChatMessageContent +from semantic_kernel.exceptions.agent_exceptions import AgentChatException, AgentInvokeException + +if TYPE_CHECKING: + from semantic_kernel.agents.agent import Agent + +from agents.copilot_studio.copilot_agent_thread import CopilotAgentThread + +logger = logging.getLogger(__name__) + + +class CopilotStudioAgentChannel(AgentChannel, ChatHistory): + """A channel for interacting with Copilot Studio Agent.""" + + thread: "CopilotAgentThread" + + @override + async def receive(self, history: list[ChatMessageContent]) -> None: + """Receive the conversation messages. 
+ + Args: + history: The history of messages in the conversation. + """ + for incoming_message in history: + self.messages.append(incoming_message) + + @override + async def invoke( + self, + agent: "Agent", + **kwargs: Any, + ) -> AsyncIterable[tuple[bool, ChatMessageContent]]: + """Perform a discrete incremental interaction between a single Agent and AgentChat. + + Args: + agent: The agent to interact with. + kwargs: Additional keyword arguments. + + Returns: + An async iterable of ChatMessageContent with a boolean indicating if the + message should be visible external to the agent. + """ + from agents.copilot_studio.copilot_agent import CopilotAgent + + if not isinstance(agent, CopilotAgent): + raise ValueError("Agent must be an instance of CopilotAgent.") + if not self.messages: + # This is not supposed to happen, as the channel won't get invoked + # before it has received messages. This is just extra safety. + raise AgentChatException("No chat history available.") + + try: + # Pass thread object instead of just the ID + logger.debug(f"Invoking Copilot Studio agent: {agent.name} with thread ID: {self.thread.id}") + async for response in agent.invoke( + messages=self.messages[-1], + thread=self.thread, + **kwargs, + ): + # Append the response to the chat history + self.messages.append(response.message) + yield True, response.message + except Exception as e: + logger.error(f"Error invoking Copilot Studio agent: {e}") + raise AgentInvokeException(f"Error invoking Copilot Studio agent: {e}") + + @override + async def invoke_stream( + self, + agent: "Agent", + messages: list[ChatMessageContent], + **kwargs: Any, + ) -> AsyncIterable[StreamingChatMessageContent]: + """Perform a streaming interaction between a single Agent and AgentChat. + + Args: + agent: The agent to interact with. + messages: The history of messages in the conversation. + kwargs: Additional keyword arguments. + + Returns: + An async iterable of StreamingChatMessageContent. 
+ """ + # For now, just implement a placeholder that raises NotImplementedError + raise NotImplementedError("Streaming is not supported by CopilotStudioAgentChannel yet") + + @override + async def get_history(self) -> AsyncIterable[ChatMessageContent]: + """Retrieve the message history specific to this channel. + + Returns: + An async iterable of ChatMessageContent. + """ + for message in reversed(self.messages): + yield message + + async def reset(self) -> None: + """Reset the channel state.""" + self.messages.clear() diff --git a/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent_thread.py b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent_thread.py new file mode 100644 index 000000000000..c5b651571d64 --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_agent_thread.py @@ -0,0 +1,120 @@ +# Copyright (c) Microsoft. All rights reserved. + +import logging +import sys +from typing import Any + +if sys.version_info >= (3, 12): + from typing import override # pragma: no cover +else: + from typing_extensions import override # pragma: no cover + +from agents.copilot_studio.directline_client import DirectLineClient + +from semantic_kernel.agents.agent import AgentThread +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.exceptions.agent_exceptions import AgentInvokeException + +logger = logging.getLogger(__name__) + + +class CopilotAgentThread(AgentThread): + """ + Thread implementation for Copilot Studio conversations via DirectLine API. + Manages conversation IDs and watermarks for tracking conversation state. + """ + + def __init__( + self, + directline_client: DirectLineClient, + conversation_id: str | None = None, + watermark: str | None = None, + ) -> None: + """Initialize the Copilot Agent Thread. + + Args: + directline_client: The DirectLine client for API communication. 
+ conversation_id: The conversation ID (optional). + watermark: The watermark for tracking conversation state (optional). + """ + super().__init__() + self._directline_client = directline_client + self._id = conversation_id + self.watermark = watermark + + @override + async def _create(self) -> str: + """Starts the thread and returns the underlying Copilot Studio Agent conversation ID.""" + self._id = await self._directline_client.start_conversation() + return self._id + + @override + async def _delete(self) -> None: + """Ends the current thread. + + This will end the underlying DirectLine conversation but not delete it permanently + from the service, as DirectLine API doesn't provide a specific endpoint to delete conversations. + """ + if self._id: + try: + await self._directline_client.end_conversation(self._id) + logger.debug(f"Conversation {self._id} has been ended") + except Exception as e: + logger.error(f"Failed to end conversation {self._id}: {str(e)}") + + @override + async def _on_new_message(self, new_message: str | ChatMessageContent) -> None: + """Called when a new message has been contributed to the chat.""" + # Not implemented for DirectLine + pass + + async def update_watermark(self, watermark: str) -> None: + """Update the watermark for the conversation. + + Args: + watermark: The new watermark. + """ + self.watermark = watermark + + async def post_message(self, payload: dict[str, Any]) -> dict[str, Any]: + """Post a message to the DirectLine conversation. + + Args: + payload: The message payload to post. + + Returns: + The response from the DirectLine API. + + Raises: + AgentInvokeException: If posting the message fails or the thread ID is not set. + """ + if not self._id: + raise AgentInvokeException("Thread ID (conversation ID) is not set. 
Create the thread first.") + + try: + return await self._directline_client.post_activity(self._id, payload) + except Exception as e: + logger.error(f"Failed to post message to thread {self._id}: {str(e)}") + raise AgentInvokeException(f"Failed to post message: {str(e)}") + + async def get_messages(self) -> dict[str, Any]: + """Get messages from the DirectLine conversation using the current watermark. + + Returns: + The activities data from the DirectLine API. + + Raises: + AgentInvokeException: If getting messages fails or the thread ID is not set. + """ + if not self._id: + raise AgentInvokeException("Thread ID (conversation ID) is not set. Create the thread first.") + + try: + data = await self._directline_client.get_activities(self._id, self.watermark) + watermark = data.get("watermark") + if watermark: + await self.update_watermark(watermark) + return data + except Exception as e: + logger.error(f"Failed to get messages from thread {self._id}: {str(e)}") + raise AgentInvokeException(f"Failed to get messages: {str(e)}") diff --git a/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_message_content.py b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_message_content.py new file mode 100644 index 000000000000..e27d2fb054ed --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/copilot_message_content.py @@ -0,0 +1,100 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +from enum import Enum +from typing import Any + +from pydantic import Field + +from semantic_kernel.contents.chat_message_content import ChatMessageContent +from semantic_kernel.contents.utils.author_role import AuthorRole + + +class CopilotContentType(str, Enum): + TEXT = "text" + ADAPTIVE_CARD = "adaptiveCard" + SUGGESTED_ACTIONS = "suggestedActions" + + +class CopilotMessageContent(ChatMessageContent): + """ + Extended ChatMessageContent that supports various content types from Copilot Studio + including text, adaptive cards, and suggested actions. + """ + + copilot_content_type: CopilotContentType = Field(default=CopilotContentType.TEXT) + adaptive_card: dict[str, Any] | None = Field(default=None) + suggested_actions: list[dict[str, Any]] | None = Field(default=None) + + def __init__( + self, + role: AuthorRole, + content: str = "", + name: str | None = None, + copilot_content_type: CopilotContentType = CopilotContentType.TEXT, + adaptive_card: dict[str, Any] | None = None, + suggested_actions: list[dict[str, Any]] | None = None, + **kwargs, + ): + super().__init__(role=role, content=content, name=name, **kwargs) + + self.copilot_content_type = copilot_content_type + self.adaptive_card = adaptive_card + self.suggested_actions = suggested_actions + + # Store rich content in metadata for preservation + if adaptive_card: + self.metadata["adaptive_card"] = adaptive_card + if suggested_actions: + self.metadata["suggested_actions"] = suggested_actions + + @classmethod + def from_bot_activity(cls, activity: dict[str, Any], name: str = None) -> "CopilotMessageContent": + """ + Create a CopilotMessageContent instance from a DirectLine activity. 
+ + Args: + activity: The DirectLine activity object + name: Optional name for the copilot agent sending the message + + Returns: + A CopilotMessageContent instance with the appropriate content type + """ + role = activity.get("from", {}).get("role", "assistant") + if role == "bot": + role = "assistant" + + # Get the base text content + content = activity.get("text", "") + name = name or activity.get("from", {}).get("name") + + # Check for suggested actions + suggested_actions = activity.get("suggestedActions", {}).get("actions", []) + + # Check for adaptive card attachments + attachments = activity.get("attachments", []) + adaptive_card = None + + if attachments and attachments[0].get("contentType") == "application/vnd.microsoft.card.adaptive": + adaptive_card = attachments[0].get("content", {}) + return cls( + role=role, + content=content, + name=name, + copilot_content_type=CopilotContentType.ADAPTIVE_CARD, + adaptive_card=adaptive_card, + suggested_actions=suggested_actions if suggested_actions else None, + ) + if suggested_actions: + return cls( + role=role, + content=content, + name=name, + copilot_content_type=CopilotContentType.SUGGESTED_ACTIONS, + suggested_actions=suggested_actions, + ) + return cls( + role=role, + content=content, + name=name, + copilot_content_type=CopilotContentType.TEXT, + ) diff --git a/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/directline_client.py b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/directline_client.py new file mode 100644 index 000000000000..53be43b67b92 --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/src/agents/copilot_studio/directline_client.py @@ -0,0 +1,167 @@ +# Copyright (c) Microsoft. All rights reserved. + +import logging +from typing import Any + +import aiohttp + +logger = logging.getLogger(__name__) + + +class DirectLineClient: + """ + Handles Direct Line API interactions for Copilot Studio agents. 
+ Provides methods for authentication, starting conversations, posting activities, + and polling responses, including support for watermark-based activity retrieval. + """ + + def __init__( + self, + directline_endpoint: str, + copilot_agent_secret: str, + ) -> None: + """ + Initialize the DirectLine Client. + + Args: + directline_endpoint: The endpoint for the DirectLine API. + copilot_agent_secret: The secret used to authenticate with DirectLine API. + + """ + self.directline_endpoint = directline_endpoint + self.copilot_agent_secret = copilot_agent_secret + self._session: aiohttp.ClientSession | None = None + + async def get_session(self) -> aiohttp.ClientSession: + """ + Get an authenticated aiohttp ClientSession using the bot secret. + Creates a new session if one doesn't exist already. + + Returns: + An authenticated aiohttp ClientSession. + """ + # Create a session with the bot secret for authorization + if self._session is None or self._session.closed: + headers = { + "Authorization": f"Bearer {self.copilot_agent_secret}", + "Content-Type": "application/json", + } + self._session = aiohttp.ClientSession(headers=headers) + + return self._session + + async def start_conversation(self) -> str: + """ + Start a new DirectLine conversation. + + Returns: + The conversation ID. + + Raises: + Exception: If starting the conversation fails. + """ + # Use the session with the bot secret for authorization + session = await self.get_session() + + async with session.post(f"{self.directline_endpoint}/conversations") as resp: + if resp.status not in (200, 201): + raise Exception(f"Failed to create DirectLine conversation. 
Status: {resp.status}") + + data = await resp.json() + conversation_id = data.get("conversationId") + + if not conversation_id: + logger.error("Conversation creation response missing conversationId: %s", data) + raise Exception("No conversation ID received from conversation creation.") + + logger.debug(f"Created conversation {conversation_id}") + + return conversation_id + + async def post_activity(self, conversation_id: str, payload: dict[str, Any]) -> dict[str, Any]: + """ + Post an activity to a DirectLine conversation. + + Args: + conversation_id: The conversation ID. + payload: The activity payload to post. + + Returns: + The response from the API. + + Raises: + Exception: If posting the activity fails. + """ + session = await self.get_session() + activities_url = f"{self.directline_endpoint}/conversations/{conversation_id}/activities" + + logger.debug(f"Posting activity to {activities_url}") + async with session.post(activities_url, json=payload) as resp: + if resp.status != 200: + logger.error("Failed to post activity. Status: %s", resp.status) + raise Exception(f"Failed to post activity. Status: {resp.status}") + + return await resp.json() + + async def get_activities(self, conversation_id: str, watermark: str | None = None) -> dict[str, Any]: + """ + Get activities from a DirectLine conversation. + Use watermark to retrieve new activities since the last retrieved activity. + + Args: + conversation_id: The conversation ID. + watermark: The watermark for retrieving new activities. + + Returns: + The activities data. + + Raises: + Exception: If retrieving activities fails. + """ + session = await self.get_session() + activities_url = f"{self.directline_endpoint}/conversations/{conversation_id}/activities" + + if watermark: + activities_url = f"{activities_url}?watermark={watermark}" + + async with session.get(activities_url) as resp: + if resp.status != 200: + logger.error("Error polling activities. 
Status: %s", resp.status) + raise Exception(f"Error polling activities. Status: {resp.status}") + + return await resp.json() + + async def end_conversation(self, conversation_id: str, user_id: str = "user1") -> dict[str, Any]: + """ + End a DirectLine conversation by sending an endOfConversation activity. + + Args: + conversation_id: The conversation ID to end. + user_id: The user ID to use in the 'from' field (defaults to "user1"). + + Returns: + The response from the API. + + Raises: + Exception: If ending the conversation fails. + """ + payload = {"type": "endOfConversation", "from": {"id": user_id}} + + session = await self.get_session() + activities_url = f"{self.directline_endpoint}/conversations/{conversation_id}/activities" + + async with session.post(activities_url, json=payload) as resp: + if resp.status != 200: + logger.error("Failed to end conversation. Status: %s", resp.status) + raise Exception(f"Failed to end conversation. Status: {resp.status}") + + logger.debug(f"Successfully ended conversation {conversation_id}") + return await resp.json() + + async def close(self) -> None: + """ + Close the aiohttp session. + """ + if self._session and not self._session.closed: + await self._session.close() + logger.debug("DirectLine session closed") diff --git a/python/samples/demos/copilot_studio_agent/src/agents/tagline_agent.py b/python/samples/demos/copilot_studio_agent/src/agents/tagline_agent.py new file mode 100644 index 000000000000..aed7428b1c0f --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/src/agents/tagline_agent.py @@ -0,0 +1,35 @@ +# Copyright (c) Microsoft. All rights reserved. + +import os + +from agents.copilot_studio.copilot_agent import CopilotAgent +from agents.copilot_studio.directline_client import DirectLineClient +from dotenv import load_dotenv + +load_dotenv(override=True) + + +class TaglineGenerator(CopilotAgent): + """ + Provides a single tagline at a time based on product descriptions or feedback. 
+ Initializes a DirectLine client configured for the agent instance. + """ + + def __init__(self): + directline_endpoint = os.getenv("DIRECTLINE_ENDPOINT") + copilot_agent_secret = os.getenv("TAGLINE_AGENT_SECRET") + + if not directline_endpoint or not copilot_agent_secret: + raise ValueError("DIRECTLINE_ENDPOINT and TAGLINE_AGENT_SECRET must be set in environment variables.") + + directline_client = DirectLineClient( + directline_endpoint=directline_endpoint, + copilot_agent_secret=copilot_agent_secret, + ) + + super().__init__( + id="tagline_generator", + name="tagline_generator", + description="Creative copywriter that generates witty, impactful taglines.", + directline_client=directline_client, + ) diff --git a/python/samples/demos/copilot_studio_agent/src/chat.py b/python/samples/demos/copilot_studio_agent/src/chat.py index 39c0cabd0739..1702316663fe 100644 --- a/python/samples/demos/copilot_studio_agent/src/chat.py +++ b/python/samples/demos/copilot_studio_agent/src/chat.py @@ -1,44 +1,46 @@ # Copyright (c) Microsoft. All rights reserved. 
import logging -import os +import sys import chainlit as cl -from direct_line_agent import DirectLineAgent +from agents.tagline_agent import TaglineGenerator from dotenv import load_dotenv -from semantic_kernel.contents.chat_history import ChatHistory - load_dotenv(override=True) -logging.basicConfig(level=logging.INFO) -logging.getLogger("direct_line_agent").setLevel(logging.DEBUG) +logging.basicConfig( + level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", stream=sys.stdout, force=True +) + +# Set log levels for specific libraries that might be too verbose +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("aiohttp").setLevel(logging.WARNING) + logger = logging.getLogger(__name__) -agent = DirectLineAgent( - id="copilot_studio", - name="copilot_studio", - description="copilot_studio", - bot_secret=os.getenv("BOT_SECRET"), - bot_endpoint=os.getenv("BOT_ENDPOINT"), -) +agent = TaglineGenerator() @cl.on_chat_start async def on_chat_start(): - cl.user_session.set("chat_history", ChatHistory()) + cl.user_session.set("agent_threads", {}) @cl.on_message async def on_message(message: cl.Message): - chat_history: ChatHistory = cl.user_session.get("chat_history") - - chat_history.add_user_message(message.content) - - response = await agent.get_response(history=chat_history) - - cl.user_session.set("chat_history", chat_history) - - logger.info(f"Response: {response}") - - await cl.Message(content=response.content, author=agent.name).send() + # Get threads from session + agent_threads = cl.user_session.get("agent_threads", {}) + thread = agent_threads.get(agent.id) + + final_response = None + async for response in agent.invoke(messages=message.content, thread=thread): + if response: + # Send each message as it comes in + await cl.Message(content=response.message.content, author=agent.name).send() + final_response = response + + # Update thread in session + if final_response is not None: + agent_threads[agent.id] = 
final_response.thread + cl.user_session.set("agent_threads", agent_threads) diff --git a/python/samples/demos/copilot_studio_agent/src/direct_line_agent.py b/python/samples/demos/copilot_studio_agent/src/direct_line_agent.py deleted file mode 100644 index 718610492cea..000000000000 --- a/python/samples/demos/copilot_studio_agent/src/direct_line_agent.py +++ /dev/null @@ -1,236 +0,0 @@ -# Copyright (c) Microsoft. All rights reserved. - -import asyncio -import logging -import sys -from collections.abc import AsyncIterable -from typing import Any - -if sys.version_info >= (3, 12): - from typing import override # pragma: no cover -else: - from typing_extensions import override # pragma: no cover -import aiohttp - -from semantic_kernel.agents import Agent -from semantic_kernel.contents.chat_history import ChatHistory -from semantic_kernel.contents.chat_message_content import ChatMessageContent -from semantic_kernel.exceptions.agent_exceptions import AgentInvokeException -from semantic_kernel.utils.telemetry.agent_diagnostics.decorators import ( - trace_agent_get_response, - trace_agent_invocation, -) - -logger = logging.getLogger(__name__) - - -class DirectLineAgent(Agent): - """ - An Agent subclass that connects to a DirectLine Bot from Microsoft Bot Framework. - Instead of directly supplying a secret and conversation ID, the agent queries a token_endpoint - to retrieve the token and then starts a conversation. - """ - - token_endpoint: str | None = None - bot_secret: str | None = None - bot_endpoint: str - conversation_id: str | None = None - directline_token: str | None = None - session: aiohttp.ClientSession = None - - async def _ensure_session(self) -> None: - """ - Lazily initialize the aiohttp ClientSession. - """ - if self.session is None: - self.session = aiohttp.ClientSession() - - async def _fetch_token_and_conversation(self) -> None: - """ - Retrieve the DirectLine token either by using the bot_secret or by querying the token_endpoint. 
- If bot_secret is provided, it posts to "https://directline.botframework.com/v3/directline/tokens/generate". - """ - await self._ensure_session() - try: - if self.bot_secret: - url = f"{self.bot_endpoint}/tokens/generate" - headers = {"Authorization": f"Bearer {self.bot_secret}"} - async with self.session.post(url, headers=headers) as resp: - if resp.status == 200: - data = await resp.json() - self.directline_token = data.get("token") - if not self.directline_token: - logger.error("Token generation response missing token: %s", data) - raise AgentInvokeException("No token received from token generation.") - else: - logger.error("Token generation endpoint error status: %s", resp.status) - raise AgentInvokeException("Failed to generate token using bot_secret.") - else: - async with self.session.get(self.token_endpoint) as resp: - if resp.status == 200: - data = await resp.json() - self.directline_token = data.get("token") - if not self.directline_token: - logger.error("Token endpoint returned no token: %s", data) - raise AgentInvokeException("No token received.") - else: - logger.error("Token endpoint error status: %s", resp.status) - raise AgentInvokeException("Failed to fetch token from token endpoint.") - except Exception as ex: - logger.exception("Exception fetching token: %s", ex) - raise AgentInvokeException("Exception occurred while fetching token.") from ex - - @trace_agent_get_response - @override - async def get_response( - self, - history: ChatHistory, - arguments: dict[str, Any] | None = None, - **kwargs: Any, - ) -> ChatMessageContent: - """ - Get a response from the DirectLine Bot. 
- """ - responses = [] - async for response in self.invoke(history, arguments, **kwargs): - responses.append(response) - - if not responses: - raise AgentInvokeException("No response from DirectLine Bot.") - - return responses[0] - - @trace_agent_invocation - @override - async def invoke( - self, - history: ChatHistory, - arguments: dict[str, Any] | None = None, - **kwargs: Any, - ) -> AsyncIterable[ChatMessageContent]: - """ - Send the latest message from the chat history to the DirectLine Bot - and yield responses. This sends the payload after ensuring that: - 1. The token is fetched. - 2. A conversation is started. - 3. The activity payload is posted. - 4. Activities are polled until an event "DynamicPlanFinished" is received. - """ - payload = self._build_payload(history, arguments, **kwargs) - response_data = await self._send_message(payload) - if response_data is None or "activities" not in response_data: - raise AgentInvokeException(f"Invalid response from DirectLine Bot.\n{response_data}") - - logger.debug("DirectLine Bot response: %s", response_data) - - # NOTE DirectLine Activities have different formats - # than ChatMessageContent. We need to convert them and - # remove unsupported activities. - for activity in response_data["activities"]: - if activity.get("type") != "message" or activity.get("from", {}).get("role") == "user": - continue - role = activity.get("from", {}).get("role", "assistant") - if role == "bot": - role = "assistant" - message = ChatMessageContent( - role=role, - content=activity.get("text", ""), - name=activity.get("from", {}).get("name", self.name), - ) - yield message - - def _build_payload( - self, - history: ChatHistory, - arguments: dict[str, Any] | None = None, - **kwargs: Any, - ) -> dict[str, Any]: - """ - Build the message payload for the DirectLine Bot. - Uses the latest message from the chat history. 
- """ - latest_message = history.messages[-1] if history.messages else None - text = latest_message.content if latest_message else "Hello" - payload = { - "type": "message", - "from": {"id": "user"}, - "text": text, - } - # Optionally include conversationId if available. - if self.conversation_id: - payload["conversationId"] = self.conversation_id - return payload - - async def _send_message(self, payload: dict[str, Any]) -> dict[str, Any] | None: - """ - 1. Ensure the token is fetched. - 2. Start a conversation by posting to the bot_endpoint /conversations endpoint (without a payload) - 3. Post the payload to /conversations/{conversationId}/activities - 4. Poll GET /conversations/{conversationId}/activities every 1s using a watermark - to fetch only the latest messages until an activity with type="event" - and name="DynamicPlanFinished" is found. - """ - await self._ensure_session() - if not self.directline_token: - await self._fetch_token_and_conversation() - - headers = { - "Authorization": f"Bearer {self.directline_token}", - "Content-Type": "application/json", - } - - # Step 2: Start a conversation if one hasn't already been started. - if not self.conversation_id: - start_conv_url = f"{self.bot_endpoint}/conversations" - async with self.session.post(start_conv_url, headers=headers) as resp: - if resp.status not in (200, 201): - logger.error("Failed to start conversation. Status: %s", resp.status) - raise AgentInvokeException("Failed to start conversation.") - conv_data = await resp.json() - self.conversation_id = conv_data.get("conversationId") - if not self.conversation_id: - raise AgentInvokeException("Conversation ID not found in start response.") - - # Step 3: Post the message payload. - activities_url = f"{self.bot_endpoint}/conversations/{self.conversation_id}/activities" - async with self.session.post(activities_url, json=payload, headers=headers) as resp: - if resp.status != 200: - logger.error("Failed to post activity. 
Status: %s", resp.status) - raise AgentInvokeException("Failed to post activity.") - _ = await resp.json() # Response from posting activity is ignored. - - # Step 4: Poll for new activities using watermark until DynamicPlanFinished event is found. - finished = False - collected_data = None - watermark = None - while not finished: - url = activities_url if watermark is None else f"{activities_url}?watermark={watermark}" - async with self.session.get(url, headers=headers) as resp: - if resp.status == 200: - data = await resp.json() - watermark = data.get("watermark", watermark) - activities = data.get("activities", []) - if any( - activity.get("type") == "event" and activity.get("name") == "DynamicPlanFinished" - for activity in activities - ): - collected_data = data - finished = True - break - else: - logger.error("Error polling activities. Status: %s", resp.status) - await asyncio.sleep(0.3) - - return collected_data - - async def close(self) -> None: - """ - Clean up the aiohttp session. - """ - await self.session.close() - - # NOTE not implemented yet, possibly use websockets - @trace_agent_invocation - @override - async def invoke_stream(self, *args, **kwargs): - return super().invoke_stream(*args, **kwargs) diff --git a/python/samples/demos/copilot_studio_agent/src/group_chat.py b/python/samples/demos/copilot_studio_agent/src/group_chat.py new file mode 100644 index 000000000000..f5900f81053c --- /dev/null +++ b/python/samples/demos/copilot_studio_agent/src/group_chat.py @@ -0,0 +1,92 @@ +# Copyright (c) Microsoft. All rights reserved. 
+ +import asyncio +import logging +import sys + +from agents.auditor_agent import BrandAuditor +from agents.tagline_agent import TaglineGenerator +from dotenv import load_dotenv + +from semantic_kernel.agents import AgentGroupChat +from semantic_kernel.agents.strategies import TerminationStrategy +from semantic_kernel.contents import AuthorRole + +""" +The following sample demonstrates how to create a group chat with Copilot Studio agents +to generate and evaluate taglines according to brand guidelines. +""" + +# Load environment variables from .env file +load_dotenv() + +# Configure the root logger to capture logs from all modules +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + stream=sys.stdout, # Explicitly set output to stdout + force=True, # Force reconfiguration of the root logger +) + +# Set log levels for specific libraries that might be too verbose +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("aiohttp").setLevel(logging.WARNING) + +logger = logging.getLogger(__name__) + + +class ApprovalTerminationStrategy(TerminationStrategy): + """A strategy for determining when an agent should terminate based on brand approval.""" + + async def should_agent_terminate(self, agent, history): + """Check if the agent should terminate.""" + # Terminate if the brand auditor approves the tagline + content = history[-1].content.lower() + return "approved" in content and "not approved" not in content and "rejected" not in content + + +USER_INPUT = "Suggest a thrilling tagline for our energy drink that helps users crush the day." + + +async def main(): + # 1. Create the tagline generator agent + tagline_generator = TaglineGenerator() + + # 2. Create the brand auditor agent + brand_auditor = BrandAuditor() + + # 3. 
Place the agents in a group chat with a custom termination strategy + chat = AgentGroupChat( + agents=[tagline_generator, brand_auditor], + termination_strategy=ApprovalTerminationStrategy( + agents=[brand_auditor], + maximum_iterations=10, + ), + ) + + try: + # 4. Add the user input to the chat + await chat.add_chat_message(USER_INPUT) + print(f"# {AuthorRole.USER}: '{USER_INPUT}'") + + # 5. Invoke the chat and print responses + async for content in chat.invoke(): + print(f"# {content.role} - {content.name or '*'}: '{content.content}'") + finally: + # 6. Reset the chat (cleanup) + await chat.reset() + + """ + Sample Output: + # AuthorRole.USER: Suggest a thrilling tagline for our energy drink that helps users crush the day. + # AuthorRole.ASSISTANT - tagline_generator: "Fuel Your Fire, Crush the Day!" + # AuthorRole.ASSISTANT - brand_auditor: "The tagline does not align with the brand's calm, confident, + # and sincere voice..." + # AuthorRole.ASSISTANT - tagline_generator: "Empower Your Day with Natural Energy." + # AuthorRole.ASSISTANT - brand_auditor: "The tagline aligns well with the brand's calm, confident, + # and sincere voice..." + """ + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/python/samples/demos/copilot_studio_agent/src/requirements.txt b/python/samples/demos/copilot_studio_agent/src/requirements.txt index 5b17fe631d9d..d9f2e3186bc5 100644 --- a/python/samples/demos/copilot_studio_agent/src/requirements.txt +++ b/python/samples/demos/copilot_studio_agent/src/requirements.txt @@ -1,4 +1,4 @@ chainlit>=2.0.1 python-dotenv>=1.0.1 aiohttp>=3.10.5 -semantic-kernel>=1.22.0 \ No newline at end of file +semantic-kernel>=1.26.1 \ No newline at end of file