diff --git a/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/ReadME.md b/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/ReadME.md new file mode 100644 index 00000000..4f2e17a2 --- /dev/null +++ b/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/ReadME.md @@ -0,0 +1,97 @@ +# Chainlit + OpenAI Responses Demo (Multimodal • Reasoning • Streaming • Functions) + +A lean Chainlit app that wires the **OpenAI Responses API** to a real **local Python executor** with **multimodal I/O (text, images, files)**, **reasoning traces**, **token+function streaming**, and **tool functions** (web search preview, image generation, Python runner, file ops, optional file-search). + +--- + +## Features (at a glance) + +* **Multimodal input**: accepts text, images, and file uploads in one message. +* **Multimodal output**: previews **images** inline and **CSV DataFrames** (with a download link) or falls back to readable text. +* **Reasoning**: configurable effort + **streamed reasoning summary** you can show in the UI when available. +* **Streaming**: assistant text streams token-by-token; **function call metadata & arguments stream** too, so you can live-render generated Python code. +* **Functions / Tools**: registered tools include `execute_python_code`, `upload_file_to_workspace`, `list_workspace_files`, simple calculator, **web\_search\_preview**, **image\_generation**, and optional **file\_search** when a vector store is present. +* **Per-chat workspace**: each chat gets its own `.files/{session_id}/pyws` folder; Python runs there and files persist for the chat’s lifetime. +* **Nice UX**: progress steps for web search, image gen, and Python runs; optional full conversation/“reasoning summary” panels. 
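
Function tools follow the Responses API function-tool schema. A minimal, self-contained sketch of registration plus local dispatch (mirroring the `simple_calculator` tool registered in `tools.py`; the `dispatch` helper here is illustrative, not part of the app):

```python
# Minimal sketch: a function tool definition in the Responses API schema
# (mirrors the simple_calculator tool in tools.py) plus a local dispatcher.
import json

CALCULATOR_TOOL = {
    "type": "function",
    "name": "simple_calculator",
    "description": "Perform basic math operations",
    "parameters": {
        "type": "object",
        "properties": {
            "operation": {"type": "string",
                          "enum": ["add", "subtract", "multiply", "divide"]},
            "a": {"type": "number"},
            "b": {"type": "number"},
        },
        "required": ["operation", "a", "b"],
        "additionalProperties": False,
    },
    "strict": True,
}

def dispatch(name: str, arguments: str) -> str:
    """Decode the streamed JSON argument string and run the local function."""
    args = json.loads(arguments)
    if name == "simple_calculator":
        ops = {"add": lambda a, b: a + b, "subtract": lambda a, b: a - b,
               "multiply": lambda a, b: a * b, "divide": lambda a, b: a / b}
        result = ops[args["operation"]](args["a"], args["b"])
        return json.dumps({"result": result})
    return json.dumps({"error": f"unknown tool: {name}"})

print(dispatch("simple_calculator", '{"operation": "add", "a": 2, "b": 3}'))
# → {"result": 5}
```

The result string is what gets sent back to the model as a `function_call_output` item.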
+ +--- + +## What’s in the repo + +* **`app.py`** — Chainlit lifecycle, dev-prompt injection on first turn, vector-store hookup on upload, Responses **streaming loop** (text + function args), live “Python Code Being Generated” pane, and a multi-iteration tool loop. +* **`tools.py`** — Tool registry and implementations: + + * `execute_python_code` (**persistent workspace**, returns stdout/stderr + collected files). + * `upload_file_to_workspace` / `list_workspace_files` helpers. + * File renderer that shows **images inline**, **CSV as DataFrame**, and **text previews** with download buttons. + +--- + +## Quick start + +```bash +python -m venv .venv && source .venv/bin/activate # Windows: .venv\Scripts\activate +pip install chainlit openai pandas matplotlib +export OPENAI_API_KEY=sk-... # Windows PowerShell: $env:OPENAI_API_KEY="sk-..." +chainlit run app.py -w +``` + +Open [http://localhost:8000](http://localhost:8000). + +--- + +## How it works + +1. **Dev prompt + settings** + First turn injects your developer instructions; reasoning effort and summary are toggled in settings. + +2. **Multimodal in** + You can send text, images, and files together; non-image files are converted to **function calls** to place them in the workspace. + +3. **Streaming loop** + The app calls `responses.create(stream=True)` and processes an event stream: + + * assistant text tokens, + * function call *creation* events, + * **function argument deltas** (used to live-render code for `execute_python_code`). + +4. **Tool execution** + Detected calls are executed, then their outputs are returned via `function_call_output` and the loop continues until the model is done. + +5. **Results rendering** + Python runs can emit images/CSVs/text; these are previewed inline and attached as downloads. + +6. **Optional RAG** + If you upload docs at start, the app creates a vector store and enables the **file\_search** tool for the model. 
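
The streaming loop in step 3 boils down to a small event dispatch. A simplified sketch using plain dicts in place of the SDK's typed event objects (the event `type` names match those handled in `app.py`; the dict shapes here are assumptions for illustration):

```python
# Simplified sketch of the event-driven streaming loop (step 3): accumulate
# assistant text deltas and per-call function-argument deltas.
def consume(events):
    text = ""          # assistant text, streamed token-by-token
    calls = {}         # function calls keyed by output-item id
    for ev in events:
        if ev["type"] == "response.output_text.delta":
            text += ev["delta"]
        elif (ev["type"] == "response.output_item.added"
              and ev["item"]["type"] == "function_call"):
            calls[ev["item"]["id"]] = {"name": ev["item"]["name"], "arguments": ""}
        elif ev["type"] == "response.function_call_arguments.delta":
            # Argument deltas accumulate; for execute_python_code this is the
            # string the app live-renders as "Python Code Being Generated".
            calls[ev["item_id"]]["arguments"] += ev["delta"]
    return text, calls

text, calls = consume([
    {"type": "response.output_item.added",
     "item": {"type": "function_call", "id": "fc_1", "name": "execute_python_code"}},
    {"type": "response.function_call_arguments.delta", "item_id": "fc_1", "delta": '{"code": "prin'},
    {"type": "response.function_call_arguments.delta", "item_id": "fc_1", "delta": 't(1)"}'},
    {"type": "response.output_text.delta", "delta": "Running your code..."},
])
```

Once the stream ends, each accumulated `arguments` string is parsed as JSON, the tool is executed, and the result goes back as a `function_call_output` item (step 4).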
+ +--- + +## Configuration + +* **Model**: change the model in `_ask_gpt`. +* **Reasoning**: set effort, enable summary. +* **Workspace**: path = `.files/{session_id}/pyws`; auto-cleaned on chat end. +* **Tools**: edit `build_tools(...)` to add/remove tools; file-search is gated on vector store presence. + +--- + +## Troubleshooting + +* **No code preview while functions run** → ensure you’re calling Responses with `stream=True` and watch for `function_call_arguments.delta` events. +* **CSV not rendering as a table** → pandas must be importable; otherwise it falls back to a text preview. +* **Files didn’t persist** → they persist for the chat; cleanup runs on `@cl.on_chat_end`. + +--- + +## Security + +Python executes **locally** inside the per-chat workspace. Treat generated code as untrusted; time limits are applied in the executor and files are confined to that directory. (See the Python tool descriptions and file-handling utilities.) + +--- + +## File map + +* `app.py` — chat lifecycle, dev prompt, vector store & **file\_search**, event-driven streaming, multi-iteration function loop, code preview. +* `tools.py` — tool registry, Python executor, upload/list helpers, image/CSV/text previewers, progress + reasoning summary steps. + +Add a license (MIT recommended) and you’re set. 
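
The per-chat workspace described under **Configuration** can be sketched as two small helpers (paths and cleanup only; `session_id` stands in for Chainlit's per-chat session id, and the real app wires these into `@cl.on_chat_start` / `@cl.on_chat_end`):

```python
# Sketch of the per-chat workspace lifecycle: create .files/{session_id}/pyws
# on chat start, delete the whole tree on chat end. Stand-alone, no Chainlit.
import os
import shutil
import tempfile
import uuid

def make_workspace(base_dir: str, session_id: str) -> str:
    """Create (idempotently) and return the per-chat Python workspace."""
    ws_dir = os.path.join(base_dir, ".files", session_id, "pyws")
    os.makedirs(ws_dir, exist_ok=True)
    return ws_dir

def cleanup_workspace(ws_dir: str) -> None:
    """Remove the workspace tree; mirrors the @cl.on_chat_end cleanup."""
    shutil.rmtree(ws_dir, ignore_errors=True)

base = tempfile.mkdtemp()               # stand-in for the app's working dir
ws = make_workspace(base, uuid.uuid4().hex)
print(os.path.isdir(ws))                # True while the chat is live
cleanup_workspace(ws)
print(os.path.isdir(ws))                # False after chat end
```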
diff --git a/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/app.py b/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/app.py new file mode 100644 index 00000000..1c19a6d9 --- /dev/null +++ b/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/app.py @@ -0,0 +1,556 @@ +""" +Chainlit + OpenAI Integration with Local Python Execution +Cleaned up version with efficient file upload handling +""" + +import os +import json +import base64 +from typing import List, Dict, Any, Optional +import pathlib +import shutil + +from openai import AsyncOpenAI +import chainlit as cl +from chainlit.input_widget import Select, Switch + +# Import our custom tools +from tools import ( + build_tools, + call_function_tool, + show_tool_progress, + show_reasoning_summary, + upload_file_to_workspace, + list_workspace_files +) + +# ─────────────────── OpenAI Client Setup ──────────────────────────────────── +cl.instrument_openai() +client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY")) + +MAX_ITER = 20 +DEV_PROMPT = "Talk in Ned Flanders style. You are a helpful assistant with access to local Python execution, web search, image generation, file search, and file upload capabilities. You can upload files to a persistent Python workspace and analyze them with code. Use tools when needed." 
+ +# ─────────────────── Session Management ───────────────────────────────────── +@cl.on_chat_start +async def _start(): + """Initialize chat session with settings and optional file upload""" + settings = await cl.ChatSettings([ + Select( + id="reasoning_effort", + label="Reasoning Level", + values=["minimal", "low", "medium", "high"], + initial_index=1 + ), + Switch(id="show_reasoning", label="Show Reasoning Summary", initial=True), + Switch(id="show_conversation_history", label="Show Conversation History", initial=True), + Switch(id="show_tool_execution", label="Show Tool Execution Details", initial=True), + ]).send() + + cl.user_session.set("settings", settings) + cl.user_session.set("full_conversation_history", []) + cl.user_session.set("previous_response_id", None) + cl.user_session.set("tool_results", {}) + cl.user_session.set("vector_store_id", None) + cl.user_session.set("dev_prompt", DEV_PROMPT) + + # Ask about file upload for search capabilities + upload_choice = await cl.AskActionMessage( + content="Would you like to upload files for search capabilities?", + actions=[ + cl.Action(name="upload", label="Upload files", payload={"value": "upload"}), + cl.Action(name="skip", label="Skip", payload={"value": "skip"}), + ], + ).send() + + if upload_choice and upload_choice.get("value") == "upload": + files = await cl.AskFileMessage( + content="Upload files to search through:", + accept=["text/plain", "application/pdf", ".txt", ".md", ".py", ".js", ".html", ".css"], + max_files=10, + timeout=180, + ).send() + + if files and len(files) > 0: + try: + # Create vector store for file search + vector_store = await client.vector_stores.create(name="Chat Files") + uploaded_files = [] + + for file in files: + uploaded_file = await client.files.create( + file=file.content, + purpose="file-search" + ) + uploaded_files.append(uploaded_file.id) + + await client.vector_stores.files.create_many( + vector_store_id=vector_store.id, + file_ids=uploaded_files + ) + + 
cl.user_session.set("vector_store_id", vector_store.id) + await cl.Message( + f"Uploaded {len(files)} files for search!", + author="System" + ).send() + + except Exception as e: + await cl.Message(f"Error uploading files: {e}", author="System").send() + else: + await cl.Message( + "Continuing without file upload. All features are ready including local Python execution!", + author="System" + ).send() + + # Build a per-chat workspace under .files/{session_id}/pyws + session_id = cl.user_session.get("id") # unique per chat + base_files_dir = os.path.join(os.getcwd(), ".files", session_id) + ws_dir = os.path.join(base_files_dir, "pyws") + os.makedirs(ws_dir, exist_ok=True) + + cl.user_session.set("python_workspace_dir", ws_dir) + await cl.Message(f"Using per-chat workspace: `{ws_dir}`", author="System").send() + + +@cl.on_settings_update +async def setup_agent(settings): + """Handle settings updates""" + cl.user_session.set("settings", settings) + + +@cl.on_chat_end +def _cleanup(): + """Clean up workspace when chat ends""" + ws_dir = cl.user_session.get("python_workspace_dir") + if ws_dir and os.path.isdir(ws_dir): + try: + shutil.rmtree(ws_dir) + except Exception: + pass + + +# ─────────────────── File Upload Helper ───────────────────────────────────── +async def handle_file_uploads(uploaded_files: List[Dict[str, Any]]) -> str: + """ + Handle file uploads and return context message for LLM + + Args: + uploaded_files: List of file info dicts + + Returns: + Context message about uploads and current workspace state + """ + if not uploaded_files: + return "" + + workspace_dir = cl.user_session.get("python_workspace_dir") + context_parts = [] + + # Process uploads + for file_info in uploaded_files: + try: + # Decode and upload file + content_bytes = base64.b64decode(file_info['content_b64']) + upload_result_json = upload_file_to_workspace( + file_info['filename'], + content_bytes, + workspace_dir + ) + upload_result = json.loads(upload_result_json) + + if 
upload_result['success']: + # Show success message to user + await cl.Message( + content=f"**File uploaded successfully!**\n\n" + f"**{file_info['filename']}** has been saved to the Python workspace.\n" + f"Size: {file_info['size']:,} bytes\n" + f"You can now analyze this file using Python code.", + author="File Manager" + ).send() + else: + await cl.Message( + content=f"**Upload failed:** {upload_result.get('message', 'Unknown error')}", + author="File Manager" + ).send() + + except Exception as e: + await cl.Message( + content=f"**Upload failed:** Error processing {file_info['filename']}: {str(e)}", + author="File Manager" + ).send() + + # Get current workspace state + workspace_result = list_workspace_files(workspace_dir) + try: + workspace_data = json.loads(workspace_result) + if workspace_data['success'] and workspace_data['files']: + file_list = [f"{f['filename']} ({f['size']:,} bytes)" for f in workspace_data['files']] + + # Build context message + uploaded_names = [f['filename'] for f in uploaded_files] + context_parts.append(f"I just uploaded {len(uploaded_files)} file(s): {', '.join(uploaded_names)}.") + context_parts.append(f"Current workspace contains {len(file_list)} files: {', '.join(file_list)}.") + else: + context_parts.append(f"I just uploaded {len(uploaded_files)} file(s) but workspace appears empty.") + except: + context_parts.append(f"I just uploaded {len(uploaded_files)} file(s) to your workspace.") + + return " ".join(context_parts) + + +# ─────────────────── Core GPT Interaction ─────────────────────────────────── +async def _ask_gpt(input_data, prev_id=None): + """ + Send request to OpenAI API and handle streaming response + + Args: + input_data: Input for the API call + prev_id: Previous response ID for continuation + + Returns: + Tuple of (response_id, function_calls) + """ + # Get user settings + settings = cl.user_session.get("settings", {}) + vector_store_id = cl.user_session.get("vector_store_id") + tools = build_tools(vector_store_id) 
+ + reasoning_effort = settings.get("reasoning_effort", "low") + show_reasoning = settings.get("show_reasoning", True) + + # Configure reasoning + reasoning_config = {"effort": reasoning_effort} + if show_reasoning: + reasoning_config["summary"] = "auto" + + # Handle different input types + if isinstance(input_data, list) and len(input_data) > 0 and isinstance(input_data[0], dict): + api_input = input_data + else: + api_input = input_data + + dev_input = [] + if not prev_id: + dev_input.append({ + "role": "developer", + "content": cl.user_session.get("dev_prompt") or DEV_PROMPT, + }) + + print(dev_input + api_input) + + # Make API call with streaming + stream = await client.responses.create( + model="gpt-5-mini", + reasoning=reasoning_config, + input=dev_input + api_input, + instructions=( + "You are a helpful, neutral assistant. Use the execute_python_code function " + "when you need to run Python code, create visualizations, or perform data analysis. " + "The function runs code locally and can generate files like plots, CSVs, etc. " + "Always explain what the code does before executing it. " + "Pay attention to any function call results in the conversation history - these show what actions have been taken." 
+ ), + stream=True, + store=True, + tools=tools, + tool_choice="auto", + **({"previous_response_id": prev_id} if prev_id else {}), + ) + + # Initialize response tracking + ans = cl.Message(author="Assistant", content="") + await ans.send() + + calls = [] + resp_id = None + assistant_text = "" + reasoning_text = "" + + # Track function call streaming + streaming_code_message = None + current_code = "" + + # Process streaming response + async for ev in stream: + if ev.type == "response.created": + resp_id = ev.response.id + + elif ev.type == "response.completed": + pass + + # Tool progress tracking + elif ev.type == "response.web_search_call.in_progress": + if settings.get("show_tool_execution", True): + await show_tool_progress("web_search_call", "in_progress") + elif ev.type == "response.web_search_call.searching": + if settings.get("show_tool_execution", True): + await show_tool_progress("web_search_call", "searching") + elif ev.type == "response.web_search_call.completed": + if settings.get("show_tool_execution", True): + await show_tool_progress("web_search_call", "completed") + + elif ev.type == "response.image_generation_call.in_progress": + if settings.get("show_tool_execution", True): + await show_tool_progress("image_generation_call", "in_progress") + elif ev.type == "response.image_generation_call.generating": + if settings.get("show_tool_execution", True): + await show_tool_progress("image_generation_call", "generating") + elif ev.type == "response.image_generation_call.completed": + if settings.get("show_tool_execution", True): + await show_tool_progress("image_generation_call", "completed") + elif ev.type == "response.image_generation_call.partial_image": + # Display the generated image using base64 data + if hasattr(ev, 'partial_image_b64'): + image_data = f"data:image/png;base64,{ev.partial_image_b64}" + image_msg = cl.Message( + content="Here's your generated image:", + author="Assistant", + elements=[cl.Image(url=image_data, name="Generated Image", 
display="inline")] + ) + await image_msg.send() + # Reasoning summary streaming + elif ev.type == "response.reasoning_summary_text.delta": + reasoning_text += ev.delta + + # Function calls + elif ev.type == "response.output_item.added" and getattr(ev.item, "type", "") == "function_call": + calls.append({ + "id": ev.item.id, + "call_id": ev.item.call_id, + "name": ev.item.name, + "arguments": "" + }) + + # If this is a Python code execution call, start streaming display + if ev.item.name == "execute_python_code": + streaming_code_message = cl.Message( + author="Code Generator", + content="**Python Code Being Generated:**\n```python\n\n```" + ) + await streaming_code_message.send() + current_code = "" + + elif ev.type == "response.function_call_arguments.delta": + # Find the call and append arguments + for call in calls: + if call["id"] == ev.item_id: + call["arguments"] += ev.delta + + # If this is Python code, stream the code generation + if call["name"] == "execute_python_code" and streaming_code_message: + try: + current_args_str = call["arguments"] + + if '"code":"' in current_args_str: + code_start = current_args_str.find('"code":"') + 8 + code_part = current_args_str[code_start:] + + code_end = len(code_part) + if '"}' in code_part: + code_end = code_part.find('"}') + elif '",' in code_part: + code_end = code_part.find('",') + + new_code = code_part[:code_end] + new_code = new_code.replace('\\"', '"').replace('\\n', '\n').replace('\\t', '\t') + + if new_code != current_code: + current_code = new_code + streaming_code_message.content = f"**Python Code Being Generated:**\n```python\n{current_code}\n```" + await streaming_code_message.update() + except Exception: + pass + break + + # Assistant text streaming + elif ev.type == "response.output_text.delta": + assistant_text += ev.delta + await ans.stream_token(ev.delta) + + await ans.update() + + # Display reasoning summary if available + if reasoning_text.strip() and settings.get("show_reasoning", True): + await 
show_reasoning_summary(reasoning_text) + + # Update conversation history + if assistant_text.strip(): + full_history = cl.user_session.get("full_conversation_history", []) + full_history.append({"role": "assistant", "content": assistant_text}) + cl.user_session.set("full_conversation_history", full_history) + + return resp_id, calls + + +# ─────────────────── Message Handling ─────────────────────────────────────── +@cl.on_message +async def _on_msg(m: cl.Message): + """ + Handle incoming messages with support for text, images, and file uploads + + Args: + m: Chainlit message object + """ + full_history = cl.user_session.get("full_conversation_history", []) + + # Initialize conversation if empty + if not full_history: + full_history.append({ + "role": "developer", + "content": cl.user_session.get("dev_prompt") or DEV_PROMPT + }) + + # Handle file uploads + uploaded_files = [] + if m.elements: + for element in m.elements: + if (hasattr(element, 'path') and hasattr(element, 'name') and + not (element.mime and element.mime.startswith("image/"))): + try: + with open(element.path, 'rb') as f: + file_content = f.read() + + content_b64 = base64.b64encode(file_content).decode('utf-8') + uploaded_files.append({ + 'filename': element.name, + 'content_b64': content_b64, + 'size': len(file_content) + }) + except Exception as e: + await cl.Message( + content=f"Failed to process uploaded file {element.name}: {str(e)}", + author="System" + ).send() + + # Handle multimodal input (text + images) + image_count = 0 + content = [{"type": "input_text", "text": m.content}] + + if m.elements: + for element in m.elements: + if element.mime and element.mime.startswith("image/"): + if hasattr(element, "path") and element.path: + with open(element.path, "rb") as image_file: + encoded_string = base64.b64encode(image_file.read()).decode() + content.append({ + "type": "input_image", + "image_url": f"data:{element.mime};base64,{encoded_string}" + }) + image_count += 1 + elif element.url: + 
content.append({"type": "input_image", "image_url": element.url}) + image_count += 1 + + # Process file uploads and build context + upload_context = "" + if uploaded_files: + upload_context = await handle_file_uploads(uploaded_files) + + # Build final message content + enhanced_content = m.content + if upload_context: + enhanced_content = f"{m.content}\nNote: {upload_context}" + + # Update conversation input + if image_count > 0: + content[0]["text"] = enhanced_content + current_input = [{"role": "user", "content": content}] + history_content = f"{enhanced_content} [+ {image_count} image(s)]" + else: + current_input = [{"role": "user", "content": enhanced_content}] + history_content = enhanced_content + + # Update conversation history + full_history.append({"role": "user", "content": history_content}) + cl.user_session.set("full_conversation_history", full_history) + + # Get previous response ID for continuation + prev_response_id = cl.user_session.get("previous_response_id") + + # Handle multiple iterations for function calls + for iteration in range(MAX_ITER): + resp_id, calls = await _ask_gpt(current_input, prev_response_id) + + if not calls: + break + + # Get settings for this iteration + settings = cl.user_session.get("settings", {}) + + # Process function calls + for call in calls: + if settings.get("show_tool_execution", True): + await show_tool_progress("python_execution", "in_progress") + + await call_function_tool(call, full_history) + + if settings.get("show_tool_execution", True): + await show_tool_progress("python_execution", "completed") + + # Prepare function results for next iteration + tool_results = cl.user_session.get("tool_results", {}) + current_input = [ + { + "type": "function_call_output", + "call_id": call["call_id"], + "output": tool_results[call["call_id"]] + } + for call in calls + ] + prev_response_id = resp_id + + # Update session state + cl.user_session.set("full_conversation_history", full_history) + 
cl.user_session.set("previous_response_id", resp_id) + + # Show conversation history if enabled + settings = cl.user_session.get("settings", {}) + if settings.get("show_conversation_history", True): + await show_full_conversation([{"role": "user", "content": m.content}], full_history) + + +# ─────────────────── Debug and Utilities ──────────────────────────────────── +@cl.step(type="tool") +async def show_full_conversation(current_message: List[Dict], full_history: List[Dict]): + """ + Display full conversation history for debugging + + Args: + current_message: Current message that triggered this + full_history: Complete conversation history + """ + s = cl.context.current_step + s.name = "Full Conversation History" + s.input = current_message + + # Format history for display + formatted_history = [] + for msg in full_history: + if isinstance(msg, dict): + role = msg.get("role") + if role in ("system", "user", "assistant", "developer"): + formatted_history.append({ + "role": role, + "content": msg["content"] + }) + elif role == "function" or msg.get("name"): + formatted_history.append({ + "role": "function", + "name": msg.get("name", "unknown_function"), + "content": msg["content"], + "tool_call_id": msg.get("tool_call_id", "unknown_id"), + }) + else: + formatted_history.append(msg) + else: + formatted_history.append(msg) + + s.output = formatted_history + s.language = "json" + return formatted_history + + +if __name__ == "__main__": + import chainlit as cl + cl.run() \ No newline at end of file diff --git a/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/tools.py b/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/tools.py new file mode 100644 index 00000000..8caf8861 --- /dev/null +++ b/openai-responses-gpt5-functions-streaming-multi-modal-reasoning-super-advanced/tools.py @@ -0,0 +1,753 @@ +""" +Custom tools and functions for Chainlit + OpenAI integration +Includes local Python executor and other utility 
functions +""" + +import json +import subprocess +import sys +import tempfile +import os +import pathlib +import base64 +from typing import List, Dict, Any, Optional, Tuple +import chainlit as cl + + +class LocalPythonExecutor: + """Execute Python code locally and return results with file attachments""" + + def __init__(self, workspace_dir=None): + # Use provided workspace directory or fall back to global one + if workspace_dir: + self.workspace_dir = workspace_dir + else: + self.workspace_dir = os.path.join(os.getcwd(), "python_workspace") + os.makedirs(self.workspace_dir, exist_ok=True) + self.output_files = [] + + def execute_code(self, code: str) -> Dict[str, Any]: + """ + Execute Python code locally and capture output, errors, and generated files + + Args: + code: Python code to execute + + Returns: + Dict containing stdout, stderr, return_code, and generated files + """ + # Create a temporary Python file in the workspace + temp_file = tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, dir=self.workspace_dir) + + # Track files before execution (in the persistent workspace) + initial_files = set(os.listdir(self.workspace_dir)) + + # Modify the code to work in the persistent workspace and list existing files + enhanced_code = f""" +import sys +import os +import matplotlib +matplotlib.use('Agg') # Use non-interactive backend +import matplotlib.pyplot as plt + +# Change to workspace directory (using raw string to handle Windows paths) +os.chdir(r'{self.workspace_dir}') + +# Show existing files at start +existing_files = [f for f in os.listdir('.') if not f.endswith('.py')] +if existing_files: + print(f"Existing files in workspace: {{existing_files}}") + +# Track files before execution +_initial_files = set(os.listdir('.')) + +# Original code +{code} + +# Save any matplotlib figures +if plt.get_fignums(): + for i, fig_num in enumerate(plt.get_fignums()): + fig = plt.figure(fig_num) + filename = f'figure_{{i+1}}.png' + fig.savefig(filename, dpi=150, 
bbox_inches='tight')
+        print(f"Saved figure: {{filename}}")
+
+# Post-process CSV files to ensure they have proper headers
+import glob
+for csv_file in glob.glob('*.csv'):
+    try:
+        import pandas as pd
+        # Try to read the CSV
+        df = pd.read_csv(csv_file, header=None)
+        # If it has no headers and looks like numeric data, add generic headers
+        if len(df.columns) > 1 and df.dtypes.apply(lambda x: x.name in ['float64', 'int64']).all():
+            # Add column headers for better display
+            df.columns = [f'col_{{i}}' for i in range(len(df.columns))]
+            # Save back with headers
+            df.to_csv(csv_file, index=False)
+            print(f"Added headers to {{csv_file}}")
+    except Exception as e:
+        # If pandas processing fails, leave the CSV as is
+        pass
+
+# Track new files created
+_final_files = set(os.listdir('.'))
+_new_files = _final_files - _initial_files
+if _new_files:
+    print(f"Generated files: {{list(_new_files)}}")
+"""
+
+        temp_file.write(enhanced_code)
+        temp_file.close()
+
+        try:
+            # Execute the code in the persistent workspace
+            result = subprocess.run(
+                [sys.executable, temp_file.name],
+                capture_output=True,
+                text=True,
+                cwd=self.workspace_dir,
+                timeout=30  # 30 second timeout
+            )
+
+            # Find newly generated files
+            final_files = set(os.listdir(self.workspace_dir))
+            new_files = final_files - initial_files - {os.path.basename(temp_file.name)}
+
+            generated_files = []
+            for file in new_files:
+                file_path = os.path.join(self.workspace_dir, file)
+                if os.path.isfile(file_path):
+                    generated_files.append(file_path)
+
+            return {
+                'stdout': result.stdout,
+                'stderr': result.stderr,
+                'return_code': result.returncode,
+                'generated_files': generated_files,
+                'success': result.returncode == 0,
+                'workspace_dir': self.workspace_dir
+            }
+
+        except subprocess.TimeoutExpired:
+            return {
+                'stdout': '',
+                'stderr': 'Code execution timed out after 30 seconds',
+                'return_code': -1,
+                'generated_files': [],
+                'success': False,
+                'workspace_dir': self.workspace_dir
+            }
+        except Exception as e:
return { + 'stdout': '', + 'stderr': f'Execution error: {str(e)}', + 'return_code': -1, + 'generated_files': [], + 'success': False, + 'workspace_dir': self.workspace_dir + } + finally: + # Clean up temp Python file but keep generated files + try: + os.unlink(temp_file.name) + except: + pass + + def cleanup(self): + """Clean up - but now this just removes temp files, not the workspace""" + # Don't remove the workspace directory - let files persist + pass + + +def simple_calculator(operation: str, a: float, b: float) -> str: + """Simple calculator for testing function calls""" + try: + a, b = float(a), float(b) + if operation == "add": + result = a + b + elif operation == "subtract": + result = a - b + elif operation == "multiply": + result = a * b + elif operation == "divide": + result = a / b if b != 0 else "Error: Division by zero" + else: + result = "Error: Unknown operation" + return json.dumps({"operation": operation, "operands": [a, b], "result": result}) + except ValueError: + return json.dumps({"error": "Invalid numbers provided"}) + + +def list_workspace_files(workspace_dir=None) -> str: + """ + List all files currently in the Python workspace + + Args: + workspace_dir: Optional specific workspace directory + + Returns: + JSON string with list of files and their info + """ + if workspace_dir is None: + workspace_dir = os.path.join(os.getcwd(), "python_workspace") + + try: + if not os.path.exists(workspace_dir): + return json.dumps({ + 'success': True, + 'files': [], + 'message': 'Workspace directory does not exist yet' + }) + + files_info = [] + for filename in os.listdir(workspace_dir): + file_path = os.path.join(workspace_dir, filename) + if os.path.isfile(file_path) and not filename.endswith('.py'): # Exclude temp Python files + file_stat = os.stat(file_path) + file_info = { + 'filename': filename, + 'size': file_stat.st_size, + 'extension': pathlib.Path(filename).suffix.lower(), + 'modified_time': file_stat.st_mtime + } + files_info.append(file_info) + 
+        return json.dumps({
+            'success': True,
+            'files': files_info,
+            'count': len(files_info),
+            'message': f'Found {len(files_info)} files in workspace'
+        })
+
+    except Exception as e:
+        return json.dumps({
+            'success': False,
+            'error': str(e),
+            'message': f'Failed to list workspace files: {str(e)}'
+        })
+
+
+def upload_file_to_workspace(filename: str, content: bytes, workspace_dir=None) -> str:
+    """
+    Upload a file to the persistent workspace
+
+    Args:
+        filename: Name of the file to save
+        content: File content as bytes
+        workspace_dir: Optional specific workspace directory
+
+    Returns:
+        JSON string with upload result
+    """
+    try:
+        # Create workspace if it doesn't exist
+        if workspace_dir is None:
+            workspace_dir = os.path.join(os.getcwd(), "python_workspace")
+        os.makedirs(workspace_dir, exist_ok=True)
+
+        # Save file to workspace
+        file_path = os.path.join(workspace_dir, filename)
+        with open(file_path, 'wb') as f:
+            f.write(content)
+
+        # Get file info
+        file_size = len(content)
+        file_ext = pathlib.Path(filename).suffix.lower()
+
+        result = {
+            'success': True,
+            'filename': filename,
+            'file_path': file_path,
+            'file_size': file_size,
+            'file_extension': file_ext,
+            'message': f'File "{filename}" uploaded successfully to workspace'
+        }
+
+        return json.dumps(result)
+
+    except Exception as e:
+        result = {
+            'success': False,
+            'filename': filename,
+            'error': str(e),
+            'message': f'Failed to upload file "{filename}": {str(e)}'
+        }
+        return json.dumps(result)
+
+
+def execute_python_code(code: str, workspace_dir=None) -> Tuple[str, List[Dict[str, Any]]]:
+    """
+    Execute Python code locally and return formatted results
+
+    Args:
+        code: Python code to execute
+        workspace_dir: Optional specific workspace directory
+
+    Returns:
+        Tuple of (conversation_result_json, files_for_display)
+    """
+    executor = LocalPythonExecutor(workspace_dir)
+
+    try:
+        result = executor.execute_code(code)
+
+        # Process files BEFORE cleanup
+        processed_files = []
+        for file_path in
result['generated_files']: + try: + file_info = process_generated_file(file_path) + if file_info: + processed_files.append(file_info) + except Exception as e: + print(f"Error processing file {file_path}: {e}") + + # Create a minimal response for the conversation history (no file content) + conversation_response = { + 'success': result['success'], + 'stdout': result['stdout'][:500] + "..." if len(result['stdout']) > 500 else result['stdout'], + 'stderr': result['stderr'][:500] + "..." if len(result['stderr']) > 500 else result['stderr'], + 'return_code': result['return_code'], + 'files_generated': len(processed_files) + } + + return json.dumps(conversation_response), processed_files + + finally: + executor.cleanup() + + +def process_generated_file(file_path: str) -> Optional[Dict[str, Any]]: + """ + Process a generated file and prepare it for attachment + + Args: + file_path: Path to the generated file + + Returns: + Dict with file information or None if processing failed + """ + try: + filename = os.path.basename(file_path) + file_size = os.path.getsize(file_path) + + # Determine file type + suffix = pathlib.Path(filename).suffix.lower() + + # Read file content + if suffix in ['.png', '.jpg', '.jpeg', '.gif', '.webp']: + # Image file - encode as base64 + with open(file_path, 'rb') as f: + content = base64.b64encode(f.read()).decode('utf-8') + return { + 'filename': filename, + 'type': 'image', + 'size': file_size, + 'content': content, + 'mime_type': f'image/{suffix[1:]}' + } + elif suffix == '.csv': + # CSV file - read as text + with open(file_path, 'r', encoding='utf-8') as f: + content = f.read() + return { + 'filename': filename, + 'type': 'csv', + 'size': file_size, + 'content': content, + 'mime_type': 'text/csv' + } + else: + # Text file - read as text + with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + content = f.read() + return { + 'filename': filename, + 'type': 'text', + 'size': file_size, + 'content': content, + 'mime_type': 
'text/plain' + } + + except Exception as e: + print(f"Error processing file {file_path}: {e}") + return None + + +def build_tools(vector_store_id: Optional[str] = None) -> List[Dict[str, Any]]: + """ + Build the tools list for OpenAI API + + Args: + vector_store_id: Optional vector store ID for file search + + Returns: + List of tool definitions + """ + tools: List[Dict[str, Any]] = [ + {"type": "web_search_preview"}, + {"type": "image_generation"}, + { + "type": "function", + "name": "simple_calculator", + "description": "Perform basic math operations", + "parameters": { + "type": "object", + "properties": { + "operation": {"type": "string", "enum": ["add", "subtract", "multiply", "divide"]}, + "a": {"type": "number"}, + "b": {"type": "number"}, + }, + "required": ["operation", "a", "b"], + "additionalProperties": False, + }, + "strict": True, + }, + { + "type": "function", + "name": "execute_python_code", + "description": "Execute Python code locally and return results with any generated files. Has access to a persistent workspace where files are saved between executions.", + "parameters": { + "type": "object", + "properties": { + "code": { + "type": "string", + "description": "Python code to execute. Can use matplotlib, pandas, numpy, and other common libraries. Files saved here persist between executions." 
+                    }
+                },
+                "required": ["code"],
+                "additionalProperties": False,
+            },
+            "strict": True,
+        },
+        {
+            "type": "function",
+            "name": "list_workspace_files",
+            "description": "List all files currently available in the Python workspace directory",
+            "parameters": {
+                "type": "object",
+                "properties": {},
+                "required": [],
+                "additionalProperties": False,
+            },
+            "strict": True,
+        },
+        {
+            "type": "function",
+            "name": "upload_file_to_workspace",
+            "description": "Upload a file to the persistent Python workspace so it can be accessed by Python code",
+            "parameters": {
+                "type": "object",
+                "properties": {
+                    "filename": {
+                        "type": "string",
+                        "description": "Name for the file to be saved in the workspace"
+                    },
+                    "content": {
+                        "type": "string",
+                        "description": "Base64 encoded file content"
+                    }
+                },
+                "required": ["filename", "content"],
+                "additionalProperties": False,
+            },
+            "strict": True,
+        }
+    ]
+
+    if vector_store_id:
+        tools.append({"type": "file_search", "vector_store_ids": [vector_store_id]})
+
+    return tools
+
+
+@cl.step(type="tool")
+async def call_function_tool(call: Dict[str, Any], full_history: List[Dict[str, Any]]) -> str:
+    """
+    Handle function tool calls.
+
+    Args:
+        call: Function call information
+        full_history: Conversation history to update
+
+    Returns:
+        Function call result
+    """
+    s = cl.context.current_step
+    s.name = call["name"]
+
+    try:
+        args = json.loads(call.get("arguments") or "{}")
+    except json.JSONDecodeError:
+        args = {}
+        out = json.dumps({"error": "Invalid function arguments"})
+        full_history.append({
+            "role": "function",
+            "name": call["name"],
+            "content": out,
+            "tool_call_id": call["call_id"]
+        })
+
+        tool_results = cl.user_session.get("tool_results", {})
+        tool_results[call["call_id"]] = out
+        cl.user_session.set("tool_results", tool_results)
+
+        s.input = {"function": call["name"], "arguments": "invalid_json"}
+        s.output = out
+        s.language = "json"
+        return out
+
+    # Get the workspace directory from the session
+    workspace_dir = cl.user_session.get("python_workspace_dir")
+
+    # Execute the appropriate function
+    if call["name"] == "simple_calculator":
+        out = simple_calculator(args.get("operation"), args.get("a"), args.get("b"))
+
+    elif call["name"] == "execute_python_code":
+        code = args.get("code", "")
+
+        # Execute the code and get both the conversation result and files
+        conversation_result, processed_files = execute_python_code(code, workspace_dir)
+        out = conversation_result
+
+        # Display execution results and files
+        try:
+            result_data = json.loads(conversation_result)
+            await display_execution_results_simple(result_data, processed_files)
+        except Exception as e:
+            print(f"Error displaying execution results: {e}")
+
+    elif call["name"] == "list_workspace_files":
+        out = list_workspace_files(workspace_dir)
+
+        # Display the file list
+        try:
+            result_data = json.loads(out)
+            if result_data['success'] and result_data['files']:
+                file_list = []
+                for file_info in result_data['files']:
+                    size_mb = file_info['size'] / (1024 * 1024)
+                    if size_mb >= 1:
+                        size_str = f"{size_mb:.1f} MB"
+                    else:
+                        size_kb = file_info['size'] / 1024
+                        if size_kb >= 1:
+                            size_str = f"{size_kb:.1f} KB"
+                        else:
+                            size_str = f"{file_info['size']} bytes"
+
+                    file_list.append(f"- **{file_info['filename']}** ({size_str})")
+
+                await cl.Message(
+                    content=f"**Workspace Files ({result_data['count']} files):**\n\n" + "\n".join(file_list),
+                    author="File Manager"
+                ).send()
+            else:
+                await cl.Message(
+                    content="**Workspace is empty** - no files found",
+                    author="File Manager"
+                ).send()
+        except Exception as e:
+            print(f"Error displaying file list: {e}")
+
+    elif call["name"] == "upload_file_to_workspace":
+        filename = args.get("filename", "")
+        content_b64 = args.get("content", "")
+
+        try:
+            # Decode the base64 content
+            content_bytes = base64.b64decode(content_b64)
+            out = upload_file_to_workspace(filename, content_bytes, workspace_dir)
+
+            # Display the upload confirmation
+            result_data = json.loads(out)
+            if result_data['success']:
+                await cl.Message(
+                    content=f"**File uploaded successfully!**\n\n"
+                            f"**{result_data['filename']}** has been saved to the Python workspace.\n"
+                            f"Size: {result_data['file_size']:,} bytes\n"
+                            f"You can now analyze this file using Python code.",
+                    author="File Manager"
+                ).send()
+            else:
+                await cl.Message(
+                    content=f"**Upload failed:** {result_data['message']}",
+                    author="File Manager"
+                ).send()
+        except Exception as e:
+            out = json.dumps({"success": False, "error": f"Failed to process upload: {str(e)}"})
+            await cl.Message(
+                content="**Upload failed:** Error processing file upload",
+                author="File Manager"
+            ).send()
+
+    else:
+        out = json.dumps({"error": f"Unknown function {call['name']}"})
+
+    # Update tool results and history
+    tool_results = cl.user_session.get("tool_results", {})
+    tool_results[call["call_id"]] = out
+    cl.user_session.set("tool_results", tool_results)
+
+    full_history.append({
+        "role": "function",
+        "name": call["name"],
+        "content": out,
+        "tool_call_id": call["call_id"]
+    })
+
+    s.input = {"function": call["name"], "arguments": args}
+    s.output = out
+    s.language = "json"
+    return out
+
+
+async def display_execution_results_simple(result_data: Dict[str, Any], processed_files: List[Dict[str, Any]]):
+    """
+    Display Python execution results with minimal overhead.
+
+    Args:
+        result_data: Execution result data
+        processed_files: List of processed file information
+    """
+    # Display stdout if present
+    if result_data.get('stdout'):
+        await cl.Message(
+            content=f"**Output:**\n```\n{result_data['stdout']}\n```",
+            author="Python Executor"
+        ).send()
+
+    # Display stderr if present
+    if result_data.get('stderr'):
+        await cl.Message(
+            content=f"**Error:**\n```\n{result_data['stderr']}\n```",
+            author="Python Executor"
+        ).send()
+
+    # Display the processed files
+    for file_info in processed_files:
+        try:
+            await display_generated_file(file_info)
+        except Exception as e:
+            print(f"Error displaying file: {e}")
+
+
+async def display_generated_file(file_info: Dict[str, Any]):
+    """
+    Display a generated file with an appropriate preview.
+
+    Args:
+        file_info: File information dictionary
+    """
+    filename = file_info['filename']
+    file_type = file_info['type']
+    content = file_info['content']
+
+    # Create a temporary file for Chainlit
+    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=pathlib.Path(filename).suffix)
+
+    try:
+        if file_type == 'image':
+            # Decode the base64 image
+            image_data = base64.b64decode(content)
+            temp_file.write(image_data)
+            temp_file.flush()
+
+            await cl.Message(
+                content=f"**{filename}** — Generated image",
+                author="Python Executor",
+                elements=[
+                    cl.Image(path=temp_file.name, name=filename, display="inline"),
+                    cl.File(name=filename, path=temp_file.name)
+                ]
+            ).send()
+
+        elif file_type == 'csv':
+            # Write the CSV content
+            temp_file.write(content.encode('utf-8'))
+            temp_file.flush()
+
+            # Use Chainlit's Dataframe component
+            try:
+                import pandas as pd
+                import io
+                df = pd.read_csv(io.StringIO(content))
+
+                await cl.Message(
+                    content=f"**{filename}** — Generated CSV file with {len(df)} rows and {len(df.columns)} columns",
+                    author="Python Executor",
+                    elements=[
+                        cl.Dataframe(data=df, name=filename, display="inline"),
+                        cl.File(name=filename, path=temp_file.name)
+                    ]
+                ).send()
+
+            except Exception as e:
+                # Fall back to a text preview if the dataframe fails
+                print(f"Dataframe display error: {e}")
+                lines = content.split('\n')[:11]  # Header + first 10 lines
+                preview_md = "```csv\n" + "\n".join(lines) + "\n```"
+
+                await cl.Message(
+                    content=f"**{filename}** — Generated CSV file. Preview:\n\n{preview_md}",
+                    author="Python Executor",
+                    elements=[cl.File(name=filename, path=temp_file.name)]
+                ).send()
+
+        else:
+            # Text file
+            temp_file.write(content.encode('utf-8'))
+            temp_file.flush()
+
+            # Show a preview of the text content (first 1000 chars)
+            preview = content[:1000] + ("..." if len(content) > 1000 else "")
+
+            await cl.Message(
+                content=f"**{filename}** — Generated text file. Preview:\n\n```\n{preview}\n```",
+                author="Python Executor",
+                elements=[cl.File(name=filename, path=temp_file.name)]
+            ).send()
+
+    finally:
+        temp_file.close()
+
+
+@cl.step(type="tool")
+async def show_tool_progress(tool_type: str, status: str):
+    """Display tool execution progress."""
+    s = cl.context.current_step
+    status_messages = {
+        "web_search_call.in_progress": "Starting web search...",
+        "web_search_call.searching": "Searching the web...",
+        "web_search_call.completed": "Web search completed",
+        "python_execution.in_progress": "Starting Python code execution...",
+        "python_execution.running": "Running Python code...",
+        "python_execution.completed": "Python code execution completed",
+        "image_generation_call.in_progress": "Starting image generation...",
+        "image_generation_call.generating": "Generating image...",
+        "image_generation_call.completed": "Image generation completed",
+    }
+    s.name = "Tool Progress"
+    s.input = tool_type
+    s.output = status_messages.get(f"{tool_type}.{status}", f"{tool_type}: {status}")
+
+
+@cl.step(type="tool")
+async def show_reasoning_summary(summary_text: str):
+    """Display the model's reasoning summary."""
+    s = cl.context.current_step
+    s.name = "Reasoning Summary"
+    s.input = "Model's reasoning process"
+    s.output = summary_text
+    s.language = "markdown"
+
+
+@cl.step(type="tool")
+async def show_python_code(code: str):
+    """Display the Python code that will be executed."""
+    s = cl.context.current_step
+    s.name = "Python Code to Execute"
+    s.input = "Code submitted for execution"
+    s.output = code
+    s.language = "python"
\ No newline at end of file
diff --git a/openai-responses-gpt5-functions-streaming/app.py b/openai-responses-gpt5-functions-streaming/app.py
index 69426b01..d696ebdd 100644
--- a/openai-responses-gpt5-functions-streaming/app.py
+++ b/openai-responses-gpt5-functions-streaming/app.py
@@ -65,6 +65,10 @@ def search_web(query):
     },
 ]
 MAX_ITER = 20
+DEV_PROMPT = (
+    "You are a helpful assistant. Use tools when needed. "
+    "Talk in Ned Flanders Simpsons style always."
+)
 
 # ─────────────────── 3. conversation helpers ────────────────────────────────
 @cl.on_chat_start
@@ -72,6 +76,7 @@ def _start():
     cl.user_session.set("full_conversation_history", [])
     cl.user_session.set("previous_response_id", None)
     cl.user_session.set("tool_results", {})
+    cl.user_session.set("dev_prompt", DEV_PROMPT)
 
 
 # Enhanced debug view with full conversation history
@@ -158,11 +163,18 @@ async def call_function_tool(call, full_history):
 
 
 async def _ask_gpt5(input_messages, prev_id=None):
+    dev_input = []
+    if not prev_id:
+        dev_input.append({
+            "role": "developer",  # or "system"
+            "content": cl.user_session.get("dev_prompt") or DEV_PROMPT,
+        })
+    print(dev_input + input_messages)
     stream = await client.responses.create(
         model="gpt-5-mini",
         reasoning={"effort": "minimal"},
-        input=input_messages,  # Only current turn messages
-        instructions="Never ask clarifying questions. Talk in Ned Flanders Simpsons style.",
+        input=dev_input + input_messages,  # dev prompt (first turn only) + current turn messages
+        instructions="Never ask clarifying questions. Use the tools when needed.",
         stream=True,
         store=True,
         tools=tools,
@@ -223,7 +235,7 @@ async def _on_msg(m: cl.Message):
 
     if not full_history:
         full_history.append(
-            {"role": "developer", "content": "You are a helpful assistant. Use tools when needed."}
+            {"role": "developer", "content": cl.user_session.get("dev_prompt") or DEV_PROMPT}
         )
     full_history.append({"role": "user", "content": m.content})
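
The `upload_file_to_workspace` tool above accepts file bytes as a base64 string, because function-call arguments travel as JSON and cannot carry raw bytes. Callers therefore have to encode before building the arguments, and the tool handler decodes with `base64.b64decode` exactly as `call_function_tool` does. A minimal sketch of that round trip (the `build_upload_args` helper name is ours, not part of the repo):

```python
import base64
import json


def build_upload_args(filename: str, raw: bytes) -> str:
    """Serialize a file into the JSON arguments shape expected by the
    upload_file_to_workspace function tool (hypothetical helper)."""
    return json.dumps({
        "filename": filename,
        "content": base64.b64encode(raw).decode("ascii"),
    })


# The tool side parses the arguments and decodes the payload back to bytes,
# mirroring what call_function_tool does before writing to the workspace.
args = json.loads(build_upload_args("data.csv", b"a,b\n1,2\n"))
restored = base64.b64decode(args["content"])
assert restored == b"a,b\n1,2\n"
```

Keeping the payload as a plain base64 string also means the same arguments can be streamed incrementally as argument deltas, like any other function call in this app.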