diff --git a/examples/email_client/README.md b/examples/email_client/README.md new file mode 100644 index 00000000..04f89f55 --- /dev/null +++ b/examples/email_client/README.md @@ -0,0 +1,142 @@ +# A Llama and Llama Stack Powered Email Agent + +This is a Llama Stack port of the [Llama Powered Email Agent](https://github.com/meta-llama/llama-recipes/tree/main/recipes/use_cases/email_agent) app that shows how to build an email agent app powered by Llama 3.1 8B and Llama Stack, using Llama Stack custom tool and agent APIs. + +Currently implemented features of the agent include: +* search for emails and attachments +* get email detail +* reply to a specific email +* forward an email +* get summary of a PDF attachment +* draft and send an email + +We'll mainly cover here how to port a Llama app using native custom tools supported in Llama 3.1 (and later) and an agent implementation from scratch to using Llama Stack APIs. See the link above for a comprehensive overview, definition, and resources of LLM agents, and a detailed list of TODOs for the email agent. + +# Setup and Installation + +See the link above for Enable Gmail API and Install Ollama with Llama 3.1 8B. + +## Install required packages +First, create a Conda or virtual env, then activate it and install the required Python libraries (slightly different from the original app because here we'll also install the `llama-stack-client` package): +``` +git clone https://github.com/meta-llama/llama-stack +cd llama-stack/docs/zero_to_hero_guide/email_agent +pip install -r requirements.txt +``` + +# Run Email Agent + +The steps are also the same as the [original app](https://github.com/meta-llama/llama-recipes/tree/main/recipes/use_cases/email_agent): + +``` +python main.py --gmail +``` + +# Implementation Notes +Notes here mainly cover how custom tools (functions) are defined and how the Llama Stack Agent class is used with the custom tools. + +## Available Custom Tool Definition +The `functions_prompt.py` defines the following six custom tools (functions), each as a subclass of Llama Stack's `CustomTool`, along with examples for each function call spec that Llama should return): + +* ListEmailsTool +* GetEmailDetailTool +* SendEmailTool +* GetPDFSummaryTool +* CreateDraftTool +* SendDraftTool + +Below is an example custom tool call spec in JSON format, for the user asks such as "do i have emails with attachments larger than 5mb", "any attachments larger than 5mb" or "let me know if i have large attachments over 5mb": +``` +{"name": "list_emails", "parameters": {"query": "has:attachment larger:5mb"}} +``` + +Porting the custom function definition in the original app to Llama Stack's CustomTool subclass is straightforward. Below is an example of the original custom function definition: +``` +list_emails_function = """ +{ + "type": "function", + "function": { + "name": "list_emails", + "description": "Return a list of emails matching an optionally specified query.", + "parameters": { + "type": "dic", + "properties": [ + { + "maxResults": { + "type": "integer", + "description": "The default maximum number of emails to return is 100; the maximum allowed value for this field is 500." + } + }, + { + "query": { + "type": "string", + "description": "One or more keywords in the email subject and body, or one or more filters. There can be 6 types of filters: 1) Field-specific Filters: from, to, cc, bcc, subject; 2) Date Filters: before, after, older than, newer than); 3) Status Filters: read, unread, starred, importatant; 4) Attachment Filters: has, filename or type; 5) Size Filters: larger, smaller; 6) logical operators (or, and, not)." + } + } + ], + "required": [] + } + } +} +""" +``` + +And its Llama Stack CustomTool subclass implementation is: +``` +class ListEmailsTool(CustomTool): + """Custom tool for List Emails.""" + + def get_name(self) -> str: + return "list_emails" + + def get_description(self) -> str: + return "Return a list of emails matching an optionally specified query." + + def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]: + return { + "maxResults": ToolParamDefinitionParam( + param_type="int", + description="The default maximum number of emails to return is 100; the maximum allowed value for this field is 500.", + required=False + ), + "query": ToolParamDefinitionParam( + param_type="str", + description="One or more keywords in the email subject and body, or one or more filters. There can be 6 types of filters: 1) Field-specific Filters: from, to, cc, bcc, subject; 2) Date Filters: before, after, older than, newer than); 3) Status Filters: read, unread, starred, importatant; 4) Attachment Filters: has, filename or type; 5) Size Filters: larger, smaller; 6) logical operators (or, and, not).", + required=False + ) + } + async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]: + assert len(messages) == 1, "Expected single message" + + message = messages[0] + + tool_call = message.tool_calls[0] + try: + response = await self.run_impl(**tool_call.arguments) + response_str = json.dumps(response, ensure_ascii=False) + except Exception as e: + response_str = f"Error when running tool: {e}" + + message = ToolResponseMessage( + call_id=tool_call.call_id, + tool_name=tool_call.tool_name, + content=response_str, + role="ipython", + ) + return [message] + + async def run_impl(self, query: str, maxResults: int = 100) -> Dict[str, Any]: + """Query to get a list of emails matching the query.""" + emails = list_emails(query) + return {"name": self.get_name(), "result": emails} +``` + +Each CustomTool subclass has a `run_impl` method that calls actual Gmail API-based tool call implementation (same as the original app), which, in the example above, is `list_emails`. + +## The Llama Stack Agent class + +The `create_email_agent` in main.py creates a Llama Stack Agent with 6 custom tools using a `LlamaStackClient` instance that connects to Together.ai's Llama Stack server. The agent then creates a session, uses the same session in a loop to create a turn for each user ask. Inside each turn, a tool call spec is generated based on the user ask and, if needed after processing of the tool call spec to match what the actual Gmail API expects (e.g. get_email_detail requires an email id but the tool call spec generated by Llama doesn't have the id), actual tool calling happens. After post-processing of the tool call result, a user-friendly message is generated to respond to the user's original ask. + +## Memory + +In `shared.py` we define a simple dictionary `memory`, used to hold short-term results such as a list of found emails based on the user ask, or the draft id of a created email draft. They're needed to answer follow up user asks such as "what attachments does the email with subject xxx have" or "send the draft". diff --git a/examples/email_client/email_agent.py b/examples/email_client/email_agent.py new file mode 100644 index 00000000..9e372fb2 --- /dev/null +++ b/examples/email_client/email_agent.py @@ -0,0 +1,513 @@ +from google.auth.transport.requests import Request +from google_auth_oauthlib.flow import InstalledAppFlow +from googleapiclient.discovery import build +from email.mime.text import MIMEText +from email.mime.multipart import MIMEMultipart +from email.mime.base import MIMEBase +from email import encoders + +from bs4 import BeautifulSoup +import os +import pytz +import base64 +import pickle +from datetime import datetime, timezone +import ollama +from pypdf import PdfReader +from pathlib import Path +from shared import memory + +SCOPES = ['https://www.googleapis.com/auth/gmail.readonly', 'https://www.googleapis.com/auth/gmail.compose'] +user_email = None +service = None +user_id = 'me' + +def authenticate_gmail(user_email): + creds = None + token_file = f'token_{user_email}.pickle' # Unique token file for each user + + # Load the user's token if it exists + if os.path.exists(token_file): + with open(token_file, 'rb') as token: + creds = pickle.load(token) + + # If no valid credentials, prompt the user to log in + if not creds or not creds.valid: + if creds and creds.expired and creds.refresh_token: + creds.refresh(Request()) + else: + flow = InstalledAppFlow.from_client_secrets_file('credentials.json', SCOPES) + creds = flow.run_console() + + # Save the new credentials to a user-specific token file + with open(token_file, 'wb') as token: + pickle.dump(creds, token) + + # Build the Gmail API service + service = build('gmail', 'v1', credentials=creds) + return service + +def num_of_emails(query=''): + response = service.users().messages().list( + userId='me', + q=query, + maxResults=1).execute() + return response.get('resultSizeEstimate', 0) + +def list_emails(query='', max_results=100): + emails = [] + next_page_token = None + + while True: + response = service.users().messages().list( + userId=user_id, + maxResults=max_results, + pageToken=next_page_token, + q=query + ).execute() + + if 'messages' in response: + for msg in response['messages']: + sender, subject, received_time = get_email_info(msg['id']) + emails.append( + { + "message_id": msg['id'], + "sender": sender, + "subject": subject, + "received_time": received_time + } + ) + + next_page_token = response.get('nextPageToken') + + if not next_page_token: + break + + return emails + +def get_email_detail(detail, which): + message_id = None + # pre-processing + if 'from ' in which: + sender = which.split('from ')[-1] + for email in memory['emails']: + if email['sender'].find(sender) != -1: + message_id = email['message_id'] + break + elif 'subject:' in which: + subject = which.split('subject:')[-1] + # exact match beats substring + for email in memory['emails']: + if email['subject'].upper() == subject.upper(): + message_id = email['message_id'] + break + elif email['subject'].upper().find(subject.upper()) != -1: + message_id = email['message_id'] + + elif 'id_' in which: + message_id = which.split('id_')[-1] + else: + message_id = memory['emails'][-1]['message_id'] + + if detail == 'body': + return get_email_body(message_id) + elif detail == 'attachment': + return get_email_attachments(message_id) + +def get_email_body(message_id): + try: + message = service.users().messages().get( + userId=user_id, + id=message_id, + format='full').execute() + + # Recursive function to extract the parts + def extract_parts(payload): + text_body = "" + if 'parts' in payload: + for part in payload['parts']: + return extract_parts(part) + else: + mime_type = payload.get('mimeType') + body = payload.get('body', {}).get('data') + if mime_type == 'text/html': + decoded_body = base64.urlsafe_b64decode(body).decode('utf-8') + soup = BeautifulSoup(decoded_body, 'html.parser') + text_body = soup.get_text().strip() + elif mime_type == 'text/plain': + decoded_body = base64.urlsafe_b64decode(body).decode('utf-8') + text_body = decoded_body + + return text_body + + return extract_parts(message['payload']) + + except Exception as e: + print(f"An error occurred: {e}") + return None + +def parse_message(message): + payload = message['payload'] + headers = payload.get("headers") + + subject = None + sender = None + for header in headers: + if header['name'] == 'Subject': + subject = header['value'] + elif header['name'] == 'From': + sender = header['value'] + + internal_date = message.get('internalDate') + utc_time = datetime.fromtimestamp(int(internal_date) / 1000, tz=timezone.utc) + + # Convert UTC to the specified timezone + local_timezone = pytz.timezone("America/Los_Angeles") + local_time = utc_time.astimezone(local_timezone) + + # Format the local time as a string + received_time = local_time.strftime('%Y-%m-%d %H:%M:%S %Z') + + # Check if the email is plain text or multipart + if 'parts' in payload: + # Multipart message - find the text/plain or text/html part + for part in payload['parts']: + if part['mimeType'] == 'text/plain' or part['mimeType'] == 'text/html': # You can also look for 'text/html' + data = part['body']['data'] + body = base64.urlsafe_b64decode(data).decode('utf-8') + return sender, subject, received_time, body + elif part['mimeType'] in ['multipart/related', 'multipart/mixed', 'multipart/alternative']: + return sender, subject, received_time, get_email_body(message.get('id')) + else: + # Single part message + data = payload['body']['data'] + body = base64.urlsafe_b64decode(data).decode('utf-8') + return sender, subject, received_time, body + +def get_email_info(msg_id): + message = service.users().messages().get( + userId=user_id, + id=msg_id, + format='full').execute() + + sender, subject, received_time, body = parse_message(message) + + return sender, subject, received_time + +def reply_email(message_id, reply_text): + # Fetch the original message + original_message = service.users().messages().get( + userId=user_id, + id=message_id, + format='full').execute() + + # Get headers + headers = original_message['payload']['headers'] + subject = None + to = None + for header in headers: + if header['name'] == 'Subject': + subject = header['value'] + if header['name'] == 'From': + to = header['value'] + + # Create the reply subject + if not subject.startswith("Re: "): + subject = "Re: " + subject + + # Compose the reply message + reply_message = MIMEText(reply_text) + reply_message['to'] = to + reply_message['from'] = user_id + reply_message['subject'] = subject + reply_message['In-Reply-To'] = message_id + + # Encode and send the message + raw_message = base64.urlsafe_b64encode(reply_message.as_bytes()).decode("utf-8") + body = {'raw': raw_message, + 'threadId': original_message['threadId']} + sent_message = service.users().messages().send( + userId=user_id, + body=body).execute() + print("Reply sent. Message ID:", sent_message['id']) + +def forward_email(message_id, forward_to, email_body=None): + """ + Forwards an email, preserving the original MIME type, including multipart/related. + """ + # Get the original message in 'full' format + original_message = service.users().messages().get( + userId=user_id, + id=message_id, + format='full').execute() + + # Extract the payload and headers + payload = original_message.get('payload', {}) + headers = payload.get('headers', []) + parts = payload.get('parts', []) + # Get the Subject + subject = next((header['value'] for header in headers if header['name'].lower() == 'subject'), 'No Subject') + + # Create a new MIME message for forwarding + mime_message = MIMEMultipart(payload.get('mimeType', 'mixed').split('/')[-1]) + mime_message['To'] = forward_to + mime_message['Subject'] = f"Fwd: {subject}" + + # Add the optional custom email body + if email_body: + mime_message.attach(MIMEText(email_body, 'plain')) + + # Function to fetch attachment data by attachmentId + def fetch_attachment_data(attachment_id, message_id): + attachment = service.users().messages().attachments().get( + userId=user_id, messageId=message_id, id=attachment_id + ).execute() + return base64.urlsafe_b64decode(attachment['data']) + + # Rebuild MIME structure + def rebuild_parts(parts): + """ + Recursively rebuild MIME parts. + """ + if not parts: + return None + + for part in parts: + part_mime_type = part.get('mimeType', 'text/plain') + part_body = part.get('body', {}) + part_data = part_body.get('data', '') + part_parts = part.get('parts', []) # Sub-parts for multipart types + filename = part.get('filename') + attachment_id = part_body.get('attachmentId') + + if part_mime_type.startswith('multipart/'): + # Rebuild nested multipart + sub_multipart = MIMEMultipart(part_mime_type.split('/')[-1]) + sub_parts = rebuild_parts(part_parts) + if sub_parts: + for sub_part in sub_parts: + sub_multipart.attach(sub_part) + yield sub_multipart + elif filename and attachment_id: + # Handle attachments + decoded_data = fetch_attachment_data(attachment_id, message_id) + attachment = MIMEBase(*part_mime_type.split('/')) + attachment.set_payload(decoded_data) + encoders.encode_base64(attachment) + attachment.add_header('Content-Disposition', f'attachment; filename="{filename}"') + yield attachment + else: + if part_data: + # Decode and attach non-multipart parts + decoded_data = base64.urlsafe_b64decode(part_data) + + if part_mime_type == 'text/plain': + yield MIMEText(decoded_data.decode('utf-8'), 'plain') + elif part_mime_type == 'text/html': + yield MIMEText(decoded_data.decode('utf-8'), 'html') + + # Rebuild the main MIME structure + rebuilt_parts = rebuild_parts(parts) + if rebuilt_parts: + for rebuilt_part in rebuilt_parts: + mime_message.attach(rebuilt_part) + + # Encode the MIME message to base64 + raw = base64.urlsafe_b64encode(mime_message.as_bytes()).decode('utf-8') + + # Send the email + forward_body = {'raw': raw} + sent_message = service.users().messages().send(userId=user_id, body=forward_body).execute() + + print(f"Message forwarded successfully! Message ID: {sent_message['id']}") + +def send_email(action, to, subject, body="", email_id=""): + if action == "compose": + message = MIMEText(body) + message['to'] = to + message['from'] = user_id + message['subject'] = subject + + # Encode and send the message + raw_message = base64.urlsafe_b64encode(message.as_bytes()).decode("utf-8") + body = {'raw': raw_message} + sent_message = service.users().messages().send( + userId=user_id, + body=body).execute() + return sent_message['id'] + elif action == "reply": # reply or forward; a message id is needed + reply_email(email_id, body) + elif action == "forward": + forward_email(email_id, to, body) + +def create_draft(action, to, subject, body="", email_id=""): + if action == "new": + message = MIMEText(body) + message['to'] = to + message['from'] = user_id + message['subject'] = subject + + encoded_message = base64.urlsafe_b64encode(message.as_bytes()).decode() + draft_body = {'message': {'raw': encoded_message}} + draft = service.users().drafts().create( + userId=user_id, + body=draft_body).execute() + print(f"Draft created with ID: {draft['id']}") + return draft['id'] + elif action == "reply": + return create_reply_draft(email_id, body) + elif action == "forward": + return create_forward_draft(email_id, to, body) + else: + return + +def create_reply_draft(message_id, reply_text): + # Fetch the original message + original_message = service.users().messages().get( + userId=user_id, + id=message_id, + format='full').execute() + + # Get headers + headers = original_message['payload']['headers'] + subject = None + to = None + for header in headers: + if header['name'] == 'Subject': + subject = header['value'] + if header['name'] == 'From': + to = header['value'] + + # Create the reply subject + if not subject.startswith("Re: "): + subject = "Re: " + subject + + # Compose the reply message + reply_message = MIMEText(reply_text) + reply_message['to'] = to + reply_message['from'] = user_id + reply_message['subject'] = subject + reply_message['In-Reply-To'] = message_id + + encoded_message = base64.urlsafe_b64encode(reply_message.as_bytes()).decode() + draft_body = {'message': {'raw': encoded_message, 'threadId': original_message['threadId']}} + draft = service.users().drafts().create(userId=user_id, body=draft_body).execute() + return draft['id'] + +def create_forward_draft(message_id, recipient_email, custom_message=None): + # Get the original message + original_message = service.users().messages().get( + userId=user_id, + id=message_id, + format='raw').execute() + + # Decode the raw message + raw_message = base64.urlsafe_b64decode(original_message['raw'].encode('utf-8')) + + # Prepare the forward header and optional custom message + forward_header = f"----- Forwarded message -----\nFrom: {recipient_email}\n\n" + if custom_message: + forward_header += f"{custom_message}\n\n" + + # Combine the forward header with the original message + new_message = forward_header + raw_message.decode('utf-8') + + # Encode the combined message into base64 format + encoded_message = base64.urlsafe_b64encode(new_message.encode('utf-8')).decode('utf-8') + + draft_body = {'message': {'raw': encoded_message, 'threadId': original_message['threadId']}} + draft = service.users().drafts().create(userId=user_id, body=draft_body).execute() + print(f"Forward draft created with ID: {draft['id']}") + return draft['id'] + +def send_draft(id): + sent_message = service.users().drafts().send( + userId=user_id, + body={'id': memory['draft_id']} + ).execute() + return f"Draft sent with email ID: {sent_message['id']}" + +def get_pdf_summary(file_name): + text = pdf2text(file_name) + print("Calling Llama to generate a summary...") + response = llama31(text, "Generate a summary of the input text in 5 sentences.") + return response + +def get_email_attachments(message_id, mime_type='application/pdf'): + attachments = [] + + # Helper function to process email parts + def process_parts(parts): + for part in parts: + if part['mimeType'] in ['multipart/related', 'multipart/mixed', 'multipart/alternative']: + # Recursively process nested parts + if 'parts' in part: + process_parts(part['parts']) + elif 'filename' in part and part['filename']: + if part['mimeType'] == mime_type: # Check for the desired MIME type + attachment_id = part['body'].get('attachmentId') + if attachment_id: + # Get the attachment data + attachment = service.users().messages().attachments().get( + userId=user_id, + messageId=message_id, + id=attachment_id + ).execute() + + # Decode the attachment content + file_data = base64.urlsafe_b64decode(attachment['data'].encode('UTF-8')) + + with open(part['filename'], "wb") as f: + f.write(file_data) + + # Save the attachment information + attachments.append( + {'filename': part['filename'], + 'data': file_data, + 'size': attachment.get('size', 0) + }) + + # Retrieve the email message + message = service.users().messages().get( + userId=user_id, + id=message_id, + format='full').execute() + payload = message['payload'] + + # Start processing the parts + if 'parts' in payload: + process_parts(payload['parts']) + + rslt = "" + for a in attachments: + rslt += f"{a['filename']} - {a['size']} bytes\n" + return rslt #attachments + +def pdf2text(file): + text = '' + try: + with Path(file).open("rb") as f: + reader = PdfReader(f) + text = "\n\n".join([page.extract_text() for page in reader.pages]) + except Exception as e: + raise f"Error reading the PDF file: {str(e)}" + + print(f"\nPDF text length: {len(text)}\n") + + return text + +def set_email_service(gmail): + global user_email + global service + + user_email = gmail + service = authenticate_gmail(user_email) + +def llama31(user_prompt: str, system_prompt = ""): + response = ollama.chat(model='llama3.1', + messages=[ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_prompt}, + ], + ) + return response['message']['content'] diff --git a/examples/email_client/functions_prompt.py b/examples/email_client/functions_prompt.py new file mode 100644 index 00000000..02f0997a --- /dev/null +++ b/examples/email_client/functions_prompt.py @@ -0,0 +1,359 @@ +from typing import List, Dict, Any +from llama_stack_client.types.tool_param_definition_param import ToolParamDefinitionParam +from llama_stack_client.types import CompletionMessage, ToolResponseMessage +from llama_stack_client.lib.agents.custom_tool import CustomTool +from email_agent import * +import json + +class ListEmailsTool(CustomTool): + """Custom tool for List Emails.""" + + def get_name(self) -> str: + return "list_emails" + + def get_description(self) -> str: + return "Return a list of emails matching an optionally specified query." + + def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]: + return { + "maxResults": ToolParamDefinitionParam( + param_type="int", + description="The default maximum number of emails to return is 100; the maximum allowed value for this field is 500.", + required=False + ), + "query": ToolParamDefinitionParam( + param_type="str", + description="One or more keywords in the email subject and body, or one or more filters. There can be 6 types of filters: 1) Field-specific Filters: from, to, cc, bcc, subject; 2) Date Filters: before, after, older than, newer than); 3) Status Filters: read, unread, starred, importatant; 4) Attachment Filters: has, filename or type; 5) Size Filters: larger, smaller; 6) logical operators (or, and, not).", + required=False + ) + } + async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]: + assert len(messages) == 1, "Expected single message" + + message = messages[0] + + tool_call = message.tool_calls[0] + try: + response = await self.run_impl(**tool_call.arguments) + response_str = json.dumps(response, ensure_ascii=False) + except Exception as e: + response_str = f"Error when running tool: {e}" + + message = ToolResponseMessage( + call_id=tool_call.call_id, + tool_name=tool_call.tool_name, + content=response_str, + role="ipython", + ) + return [message] + + async def run_impl(self, query: str, maxResults: int = 100) -> Dict[str, Any]: + """Query to get a list of emails matching the query.""" + emails = list_emails(query) + return {"name": self.get_name(), "result": emails} + + +class GetEmailDetailTool(CustomTool): + """Custom tool for Get Email Detail.""" + + def get_name(self) -> str: + return "get_email_detail" + + def get_description(self) -> str: + return "Get detailed info about a specific email" + + def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]: + return { + "detail": ToolParamDefinitionParam( + param_type="str", + description="what detail the user wants to know about - two possible values: body or attachment", + required=True + ), + "query": ToolParamDefinitionParam( + param_type="str", + description="One or more keywords in the email subject and body, or one or more filters. There can be 6 types of filters: 1) Field-specific Filters: from, to, cc, bcc, subject; 2) Date Filters: before, after, older than, newer than); 3) Status Filters: read, unread, starred, importatant; 4) Attachment Filters: has, filename or type; 5) Size Filters: larger, smaller; 6) logical operators (or, and, not).", + required=False + ) + } + async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]: + assert len(messages) == 1, "Expected single message" + + message = messages[0] + + tool_call = message.tool_calls[0] + try: + response = await self.run_impl(**tool_call.arguments) + response_str = json.dumps(response, ensure_ascii=False) + except Exception as e: + response_str = f"Error when running tool: {e}" + + message = ToolResponseMessage( + call_id=tool_call.call_id, + tool_name=tool_call.tool_name, + content=response_str, + role="ipython", + ) + return [message] + + async def run_impl(self, detail: str, query: str) -> Dict[str, Any]: + """Query to get the detail of an email.""" + + detail = get_email_detail(detail, query) + return {"name": self.get_name(), "result": detail} + + +class SendEmailTool(CustomTool): + """Compose, reply, or forward email.""" + + def get_name(self) -> str: + return "send_email" + + def get_description(self) -> str: + return "Compose, reply, or forward email" + + def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]: + return { + "action": ToolParamDefinitionParam( + param_type="string", + description="Whether to compose, reply, or forward an email", + required=True + ), + "to": ToolParamDefinitionParam( + param_type="str", + description="The recipient of the email", + required=True + ), + "subject": ToolParamDefinitionParam( + param_type="str", + description="The subject of the email", + required=True + ), + "body": ToolParamDefinitionParam( + param_type="str", + description="The content of the email", + required=True + ), + "email_id": ToolParamDefinitionParam( + param_type="str", + description="The email id to reply or forward to", + required=False + ) + } + + async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]: + assert len(messages) == 1, "Expected single message" + + message = messages[0] + + tool_call = message.tool_calls[0] + try: + response = await self.run_impl(**tool_call.arguments) + response_str = json.dumps(response, ensure_ascii=False) + except Exception as e: + response_str = f"Error when running tool: {e}" + + message = ToolResponseMessage( + call_id=tool_call.call_id, + tool_name=tool_call.tool_name, + content=response_str, + role="ipython", + ) + return [message] + + async def run_impl(self, action, to, subject, body="", email_id="") -> Dict[str, Any]: + """Send an email.""" + + result = send_email(action, to, subject, body, email_id) + return {"name": self.get_name(), "result": result} + + +class GetPDFSummaryTool(CustomTool): + """Get a summary of a PDF attachment.""" + + def get_name(self) -> str: + return "get_pdf_summary" + + def get_description(self) -> str: + return "Get a summary of a PDF attachment" + + def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]: + return { + "file_name": ToolParamDefinitionParam( + param_type="string", + description="The name of the PDF file", + required=True + ) + } + + async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]: + assert len(messages) == 1, "Expected single message" + + message = messages[0] + + tool_call = message.tool_calls[0] + try: + response = await self.run_impl(**tool_call.arguments) + response_str = json.dumps(response, ensure_ascii=False) + except Exception as e: + response_str = f"Error when running tool: {e}" + + message = ToolResponseMessage( + call_id=tool_call.call_id, + tool_name=tool_call.tool_name, + content=response_str, + role="ipython", + ) + return [message] + + async def run_impl(self, file_name: str) -> Dict[str, Any]: + """Get the summary of a PDF file.""" + + summary = get_pdf_summary(file_name) + return {"name": self.get_name(), "result": summary} + + +class CreateDraftTool(CustomTool): + """Create a new, reply, or forward email draft.""" + + def get_name(self) -> str: + return "create_draft" + + def get_description(self) -> str: + return "Create a new, reply, or forward email draft" + + def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]: + return { + "action": ToolParamDefinitionParam( + param_type="string", + description="Whether to compose, reply, or forward an email", + required=True + ), + "to": ToolParamDefinitionParam( + param_type="str", + description="The recipient of the email", + required=True + ), + "subject": ToolParamDefinitionParam( + param_type="str", + description="The subject of the email", + required=True + ), + "body": ToolParamDefinitionParam( + param_type="str", + description="The content of the email", + required=True + ), + "email_id": ToolParamDefinitionParam( + param_type="str", + description="The email id to reply or forward to, or empty if draft a new email.", + required=True + ) + } + + async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]: + assert len(messages) == 1, "Expected single message" + + message = messages[0] + + tool_call = message.tool_calls[0] + try: + response = await self.run_impl(**tool_call.arguments) + response_str = json.dumps(response, ensure_ascii=False) + except Exception as e: + response_str = f"Error when running tool: {e}" + + message = ToolResponseMessage( + call_id=tool_call.call_id, + tool_name=tool_call.tool_name, + content=response_str, + role="ipython", + ) + return [message] + + async def run_impl(self, action, to, subject, body="", email_id="") -> Dict[str, Any]: + """Create an email draft.""" + + result = create_draft(action, to, subject, body, email_id) + return {"name": self.get_name(), "result": result} + + +class SendDraftTool(CustomTool): + """Send a draft email.""" + + def get_name(self) -> str: + return "send_draft" + + def get_description(self) -> str: + return "Send a draft email" + + def get_params_definition(self) -> Dict[str, ToolParamDefinitionParam]: + return { + "id": ToolParamDefinitionParam( + param_type="str", + description="The email draft id.", + required=True + ) + } + + async def run(self, messages: List[CompletionMessage]) -> List[ToolResponseMessage]: + assert len(messages) == 1, "Expected single message" + + message = messages[0] + + tool_call = message.tool_calls[0] + try: + response = await self.run_impl(**tool_call.arguments) + response_str = json.dumps(response, ensure_ascii=False) + except Exception as e: + response_str = f"Error when running tool: {e}" + + message = ToolResponseMessage( + call_id=tool_call.call_id, + tool_name=tool_call.tool_name, + content=response_str, + role="ipython", + ) + return [message] + + async def run_impl(self, id: str) -> Dict[str, Any]: + """Send the last draft email.""" + + result = send_draft(memory['draft_id']) + return {"name": self.get_name(), "result": result} + + +examples = """ +{"name": "list_emails", "parameters": {"query": "has:attachment larger:5mb"}} +{"name": "list_emails", "parameters": {"query": "has:attachment"}} +{"name": "list_emails", "parameters": {"query": "newer_than:1d"}} +{"name": "list_emails", "parameters": {"query": "older_than:1d"}} +{"name": "list_emails", "parameters": {"query": "is:unread"}} +{"name": "list_emails", "parameters": {"query": " is:unread"}} +{"name": "list_emails", "parameters": {"query": " is:read"}} +{"name": "get_email_detail", "parameters": {"detail": "body", "which": "first"}} +{"name": "get_email_detail", "parameters": {"detail": "body", "which": "last"}} +{"name": "get_email_detail", "parameters": {"detail": "body", "which": "second"}} +{"name": "get_email_detail", "parameters": {"detail": "body", "which": "subject "}} +{"name": "get_email_detail", "parameters": {"detail": "attachment", "which": "from "}} +{"name": "get_email_detail", "parameters": {"detail": "attachment", "which": "first"}} +{"name": "get_email_detail", "parameters": {"detail": "attachment", "which": "last"}} +{"name": "get_email_detail", "parameters": {"detail": "attachment", "which": ""}} +{"name": "send_email", "parameters": {"action": "compose", "to": "jeffxtang@meta.com", "subject": "xxxxx", "body": "xxxxx"}} +{"name": "send_email", "parameters": {"action": "reply", "to": "", "subject": "xxxxx", "body": "xxxxx", "email_id": "xxxxx"}} +{"name": "send_email", "parameters": {"action": "forward", "to": "jeffxtang@meta.com", "subject": "xxxxx", "body": "xxxxx", "email_id": "xxxxx"}} +{"name": "create_draft", "parameters": {"action": "new", "to": "jeffxtang@meta.com", "subject": "xxxxx", "body": "xxxxx", "email_id": ""}} +{"name": "create_draft", "parameters": {"action": "reply", "to": "", "subject": "xxxxx", "body": "xxxxx", "email_id": "xxxxx"}} +{"name": "create_draft", "parameters": {"action": "forward", "to": "jeffxtang@meta.com", "subject": "xxxxx", "body": "xxxxx", "email_id": "xxxxx"}} +{"name": "send_draft", "parameters": {"id": "..."}} +{"name": "get_pdf_summary", "parameters": {"file_name": "..."}} +""" + +system_prompt = f""" +Your name is Email Agent, an assistant that can perform all email related tasks for your user. +Respond to the user's ask by making use of the following functions if needed. +If no available functions can be used, just say "I don't know" and don't make up facts. + +Example responses: +{examples} + +""" diff --git a/examples/email_client/main.py b/examples/email_client/main.py new file mode 100644 index 00000000..f4d37ed0 --- /dev/null +++ b/examples/email_client/main.py @@ -0,0 +1,138 @@ +import argparse +import email_agent +import asyncio +import json +from functions_prompt import * + +from llama_stack_client import LlamaStackClient +from llama_stack_client.lib.agents.agent import Agent +from llama_stack_client.lib.agents.event_logger import EventLogger +from llama_stack_client.types.agent_create_params import ( + AgentConfig, +) + +from shared import memory + +LLAMA_STACK_API_TOGETHER_URL="https://llama-stack.together.ai" +LLAMA31_8B_INSTRUCT = "Llama3.1-8B-Instruct" + +async def create_email_agent(client: LlamaStackClient) -> Agent: + """Create an agent with gmail tool capabilities.""" + + listEmailsTool = ListEmailsTool() + getEmailDetailTool = GetEmailDetailTool() + sendEmailTool = SendEmailTool() + getPDFSummaryTool = GetPDFSummaryTool() + createDraftTool = CreateDraftTool() + sendDraftTool = SendDraftTool() + + agent_config = AgentConfig( + model=LLAMA31_8B_INSTRUCT, + instructions=system_prompt, + sampling_params={ + "strategy": "greedy", + "temperature": 0.0, + "top_p": 0.9, + }, + tools = [ + listEmailsTool.get_tool_definition(), + getEmailDetailTool.get_tool_definition(), + sendEmailTool.get_tool_definition(), + getPDFSummaryTool.get_tool_definition(), + createDraftTool.get_tool_definition(), + sendDraftTool.get_tool_definition(), + + ], + tool_choice = "auto", + tool_prompt_format = "json", + input_shields = [], + output_shields = [], + enable_session_persistence = True + ) + + agent = Agent( + client = client, + agent_config = agent_config, + custom_tools = ( + listEmailsTool, + getEmailDetailTool, + sendEmailTool, + getPDFSummaryTool, + createDraftTool, + sendDraftTool + ) + ) + + return agent + +async def main(): + parser = argparse.ArgumentParser(description="Set email address") + parser.add_argument("--email", type=str, required=True, help="Your Gmail address") + args = parser.parse_args() + + email_agent.set_email_service(args.email) + + greeting = llama31("hello", "Your name is Email Agent, an assistant that can perform all email related tasks for your user.") + agent_response = f"{greeting}\n\nYour ask: " + + client = LlamaStackClient(base_url=LLAMA_STACK_API_TOGETHER_URL) + agent = await create_email_agent(client) + session_id = agent.create_session("email-session") + + while True: + ask = input(agent_response) + if ask == "bye": + print(llama31("bye")) + break + print("\n-------------------------\nCalling Llama...") + + response = agent.create_turn( + messages=[{"role": "user", "content": ask}], + session_id=session_id, + ) + + async for log in EventLogger().log(response): + if log.role == "CustomTool": + tool_name = json.loads(log.content)['name'] + result = json.loads(log.content)['result'] + + # post processing + if tool_name == 'list_emails': + memory['emails'] = result + num = len(result) + if num == 0: + output = "I couldn't find any such emails. What else would you like to do?" + elif num <= 5: + output = f"I found {num} email{'s' if num > 1 else ''} matching your query:\n" + for i, email in enumerate(result, start=1): + output += f"{i}. From: {email['sender']}, Subject: {email['subject']}, Received on: {email['received_time']}\n" + else: + output = f"I found {num} emails matching your query. Here are the first 5 emails:\n" + for i in range(1, 6): + output += f"{i}. From: {result[i - 1]['sender']}, Subject: {result[i - 1]['subject']}, Received on: {result[i - 1]['received_time']}\n" + + elif tool_name == "get_email_detail": + output = result + elif tool_name == "create_draft": + output = "Draft created." + memory['draft_id'] = result + elif tool_name == "send_draft": + output = result + elif tool_name == "send_email": + output = "Email sent." + elif tool_name == "get_pdf_summary": + output = result + + print(f"\n-------------------------\n\nAgent: {output}\n") + elif log.role == "inference": + print("Llama returned: ", end="") + else: + print(log, end="") + + agent_response = "Your ask: " + +if __name__ == "__main__": + asyncio.run(main()) + + + diff --git a/examples/email_client/requirements.txt b/examples/email_client/requirements.txt new file mode 100644 index 00000000..da96182d --- /dev/null +++ b/examples/email_client/requirements.txt @@ -0,0 +1,10 @@ +google-auth==2.27.0 +google-auth-oauthlib==0.4.6 +google-auth-httplib2==0.1.0 +google-api-python-client==2.34.0 +llama_stack_client==0.0.50 +pytz +beautifulsoup4 +ollama==0.4.4 +pypdf +termcolor \ No newline at end of file diff --git a/examples/email_client/shared.py b/examples/email_client/shared.py new file mode 100644 index 00000000..ea2b95ce --- /dev/null +++ b/examples/email_client/shared.py @@ -0,0 +1 @@ +memory = {} \ No newline at end of file