diff --git a/MCAgent/Readme.md b/MCAgent/Readme.md new file mode 100644 index 00000000..fa08eda5 --- /dev/null +++ b/MCAgent/Readme.md @@ -0,0 +1,203 @@ +# MCAgent 使用文档 + +MCAgent 可以通过自然语言让AI调用各种游戏工具完成一系列操作 目前已支持聊天栏菜单调用 + + + +首先你需要访问DeepSeek官网或硅基流动官网获取API密匙(网上有教程) + +将密匙添加进配置文件中的'Api key' 并更改模型提供商 + +在游戏中输入以下触发词之一即可启动 AI 助手: +- `助手` +- `ai` + +然后输入你的需求,例如: +``` +给我10个钻石 +传送我到坐标 100 64 200 +查看在线玩家列表 +帮我举报xxx玩家(已安装举报系统) +``` + +### 其他命令 + +- **清除对话历史**:`清除对话` 或 `clear` 清除你与ai的对话历史 +- **取消当前请求**:`退出助手`、`取消` 或 `cancel` 防止请求卡住或循环 + +## 权限等级说明 + +MCAgent 采用权限系统,不同权限等级可以使用不同的工具: + +### 1. 普通玩家 (NONE) +- 无法使用 AI 助手功能 + +### 2. 创造模式玩家 (CREATIVE) +可使用以下工具: +- ✅ 传送玩家 (`teleport_player`) +- ✅ 给予物品 (`give_item`) +- ✅ 设置方块 (`set_block`) +- ✅ 填充方块 (`fill_blocks`) +- ✅ 广播消息 (`broadcast_message`) +- ✅ 获取玩家信息 (`get_player_info`) +- ✅ 获取在线玩家 (`get_online_players`) +- ✅ 获取玩家位置 (`get_player_position`) +- ✅ 获取玩家背包 (`get_player_inventory`) +- ✅ 获取玩家标签 (`get_player_tags`) +- ✅ 获取玩家分数 (`get_player_score`) +- ✅ 发送消息 (`send_message`) +- ✅ 获取菜单触发词 (`get_chatbar_menu_triggers`) +- ✅ 与菜单交互 (`interact_with_menu`) + +### 3. OP 权限 (OP) +除创造模式权限外,还可使用: +- ✅ 执行命令 (`execute_command`) - 受危险命令限制 +- ✅ 获取游戏规则 (`get_game_rule`) +- ✅ 放置命令方块 (`place_command_block`) - 受安全限制 + +### 4. 完全权限 (FULL) +- ✅ 所有工具无限制使用 +- ✅ 可以执行危险命令 +- ✅ 可以创建循环命令方块和保持激活的命令方块 +- ✅ 可以查看工具调用的详细参数 + +## 🛡️ 安全机制 + +### 危险命令拦截 +非完全权限玩家执行命令时,系统会自动拦截以下危险操作: +- `op` / `deop` - 权限管理 +- `stop` - 服务器关闭 +- 配置中定义的危险命令 + +### 命令方块安全限制 +为了安全考虑 非完全权限玩家放置命令方块时: +- ❌ 禁止创建循环命令方块 +- ⚠️ 强制设置为需要红石激活 +- ⚠️ 不在第一刻自动执行 + +## 📚 使用示例 + +### 示例 1:基础操作 +``` +玩家:给我10个钻石 +AI:§a✓ §f已给予§e Steve §f物品§b minecraft:diamond ×10 +``` + +### 示例 2:传送操作 +``` +玩家:传送我到主城 +AI:首先让我查看在线玩家... + §a✓ §f已将玩家§e Steve §f传送到§b (100, 64, 200) +``` + +### 示例 3:查询信息 +``` +玩家:查看我的背包 +AI:§e Steve §f的背包: + §b• minecraft:diamond ×10 + §b• minecraft:iron_ingot ×64 + §b• minecraft:gold_ingot ×32 + §f等共15种物品,剩余21个空槽位 +``` + +### 示例 4:菜单交互 +``` +玩家:帮我把职业升到40级 (这只是个例子 实际上插件不附带职业系统) +AI:正在为您操作职业系统... + [工具调用 #1] interact_with_menu + • 触发词: 职业 + • 操作: 开始会话 + + [工具调用 #2] interact_with_menu + • 输入: 4 (选择提升等级) + + [工具调用 #3] interact_with_menu + • 输入: 39 (提升39级) + + [工具调用 #4] interact_with_menu + • 输入: 确认 + + §a✓ §f职业等级已提升至40级! +``` + +### 示例 5:建筑操作 +``` +玩家:在我脚下建一个5x5的石头平台 +AI:[工具调用 #1] get_player_position + 获取到您的位置:(150, 65, 200) + + [工具调用 #2] fill_blocks + §a✓ §f已在 (148, 64, 198) 到 (152, 64, 202) 填充石头 +``` + +## ⚙️ 配置说明 + +### 白名单配置 +在插件配置文件中设置: + +```json +{ + "白名单": ["player1", "player2"], + "完全权限白名单": ["admin"], + "一级权限白名单": ["builder1", "builder2"], + "危险命令列表": ["op ", "deop ", "stop"] +} +``` + +### AI 配置 +```json +{ + "AI配置": { + "API提供商": "siliconflow", + "APIkey": "your-api-key-here", + "模型名称": "deepseek-chat", + "硅基流动模型名称": "deepseek-ai/DeepSeek-V3.2", + "最大历史长度": 3, + "最大工具调用次数": 24, + "API请求超时秒数": 90 + } +} +``` + +每次 AI 响应后会显示: +- 🔧 工具调用次数 +- 📝 Token 使用量 +- 💰 预估费用 注意 Cost仅为预期消耗费用 实际产生费用以模型提供方的每百万输入/输出token为主 + +## ⚠️ 注意事项 + +请妥善保管您的 API 密钥 +合理分配玩家权限,避免滥用 +注意 Token 消耗,合理设置最大工具调用次数 +如果 AI 执行了错误操作,立即使用取消命令 + +## 🐛 常见问题 + +### Q: AI 没有响应? +A: 检查以下几点: +- 查看面板输出返回的错误码 询问AI解决问题 +- 是否有权限使用 AI 助手 +- API 密钥是否正确配置 +- 网络连接是否正常 +- 是否有其他请求正在处理中 + +### Q: 工具调用失败? +A: 可能原因: +- 权限不足 +- 参数格式错误 +- 目标玩家不在线 + +### Q: 如何查看操作日志? +A: 关键工具调用会自动记录在插件数据目录的日志文件中,文件名格式为 `tool_calls_YYYY-MM-DD.json` + +### Q: 对话历史如何管理? +A: +- 对话历史自动保存,支持上下文理解 +- 使用 `清除对话` 命令可以清空历史 +- 历史长度由配置文件中的 `最大历史长度` 控制 + +## 广告 + +-定制插件可联系3340903371 (*^_^*) + +--- diff --git a/MCAgent/__init__.py b/MCAgent/__init__.py new file mode 100644 index 00000000..28946296 --- /dev/null +++ b/MCAgent/__init__.py @@ -0,0 +1,59 @@ +from tooldelta import Plugin, plugin_entry, cfg +from .config import CONFIG_DEFAULT, CONFIG_STD +from .core import Core +from .agent import AIAgent +from .permission import PermissionManager +from .tool_logger import ToolLogger +from . import utils +# Author 3340903371 定制插件dd + +class MCAgent(Plugin): + name = "MCAgent" + author = "果_k" + version = (1, 0, 0) + + def __init__(self, frame): + """Initialize MCAgent plugin.""" + super().__init__(frame) + self.players = self.game_ctrl.players + self.ListenPreload(self.on_def) + config, _ = cfg.get_plugin_config_and_version( + self.name, CONFIG_STD, CONFIG_DEFAULT, self.version + ) + self.config = config + self.whitelist = config["白名单"] + self.full_permission_whitelist = config.get("完全权限白名单", []) + self.level1_permission_whitelist = config.get("一级权限白名单", []) + self.dangerous_commands = config.get("危险命令列表", ["/op", "/deop", "op ", "deop ", "stop"]) + self.Info = config["Info"] + self.ui_texts = config["UI文本"] + self.agent_config = config["AI配置"] + self.utils = utils.Utils(self) + self.permission_manager = PermissionManager(self) + self.tool_logger = ToolLogger(self) + self.core = Core(self) + self.agent = AIAgent(self) + + def on_def(self): + """Register chatbar menu triggers.""" + self.chatbar = self.GetPluginAPI("聊天栏菜单") + + always_registered = [ + ([ "助手", "ai"], ..., "MC Agent(支持工具调用)", self.core.AIAssistant), + (["清除对话", "clear"], [], "清除AI对话历史", self.core.ClearChat), + (["退出助手", "取消", "cancel"], [], "取消当前AI请求", self.core.CancelAI), + ] + + for trigger in always_registered: + self.chatbar.add_new_trigger(*trigger) + + def get_core(self): + return self.core + + def get_agent(self): + return self.agent + + def get_utils(self): + return utils + +entry = plugin_entry(MCAgent, "GetMCAgentAPI", (1, 0, 0)) diff --git a/MCAgent/agent.py b/MCAgent/agent.py new file mode 100644 index 00000000..45cf6ac1 --- /dev/null +++ b/MCAgent/agent.py @@ -0,0 +1,902 @@ +import json +import time +from typing import TYPE_CHECKING, Optional, Dict, Any +import requests +from tooldelta import Player, fmts, utils +from .utils import Utils +from .mctools import MinecraftAITool +from .permission import PermissionLevel +if TYPE_CHECKING: + from . import MCAgent + +class AIAgent: + API_PROVIDERS = { + "deepseek": { + "base_url": "https://api.deepseek.com", + "chat_endpoint": "/v1/chat/completions", + "format": "openai" + }, + "siliconflow": { + "base_url": "https://api.siliconflow.cn", + "chat_endpoint": "/v1/chat/completions", + "format": "openai" + } + } + + def __init__(self, plugin: "MCAgent"): + self.plugin = plugin + self.conversations = {} + self.active_requests = set() + self.cancel_requests = set() # 存储需要取消的请求 + self.mc_tools = MinecraftAITool(plugin) + + def get_api_config(self, provider: str = "deepseek") -> Dict[str, str]: + """Get API configuration for specified provider""" + provider = provider.lower() + if provider not in self.API_PROVIDERS: + fmts.print_wrn(f"未知的API提供商: {provider},使用默认的deepseek") + provider = "deepseek" + return self.API_PROVIDERS[provider] + + def extract_content_from_response(self, response_data: Dict[str, Any]) -> Optional[str]: + """Extract content from API response""" + try: + if ( + isinstance(response_data, dict) + and "choices" in response_data + and len(response_data["choices"]) > 0 + and "message" in response_data["choices"][0] + and "content" in response_data["choices"][0]["message"] + ): + return response_data["choices"][0]["message"]["content"] + return None + except (TypeError, KeyError, IndexError) as e: + fmts.print_err(f"从API响应中提取内容时出错: {e}") + return None + + def extract_content_from_siliconflow_response(self, response_data: Dict[str, Any]) -> Optional[str]: + """Extract content from SiliconFlow API response""" + try: + if isinstance(response_data, dict) and "content" in response_data: + content = response_data["content"] + if isinstance(content, list) and len(content) > 0: + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + return block.get("text") + elif isinstance(content, str): + return content + + return self.extract_content_from_response(response_data) + except (TypeError, KeyError, IndexError) as e: + fmts.print_err(f"从硅基流动API响应中提取内容时出错: {e}") + return None + + def calculate_cost(self, total_tokens: int, cost_per_million: float = 8.0) -> float: + """Calculate API usage cost based on token count""" + return (total_tokens / 1_000_000) * cost_per_million + + def load_conversation_history(self, player_name: str, conversation_type: str = "default") -> list: + """Load conversation history from disk""" + file_path = Utils.make_data_file(self.plugin, f"agent_conversations_{conversation_type}.json") + history_data = Utils.disk_read(file_path) + + if player_name in history_data: + return history_data[player_name] + return [] + + def clean_orphaned_tool_messages(self, messages: list) -> list: + """Clean orphaned tool messages that don't have corresponding tool_calls""" + cleaned_messages = [] + + for i, msg in enumerate(messages): + # 如果是tool消息,检查前面是否有对应的assistant tool_calls + if msg.get("role") == "tool": + tool_call_id = msg.get("tool_call_id") + has_valid_tool_call = False + + # 向前查找最近的assistant消息 + for j in range(len(cleaned_messages) - 1, -1, -1): + if cleaned_messages[j].get("role") == "assistant": + # 检查是否有匹配的tool_call + for tc in cleaned_messages[j].get("tool_calls", []): + if tc.get("id") == tool_call_id: + has_valid_tool_call = True + break + break + + # 只保留有效的tool消息 + if has_valid_tool_call: + cleaned_messages.append(msg) + # else: skip orphaned tool messages + else: + cleaned_messages.append(msg) + + return cleaned_messages + + def save_conversation_history(self, player_name: str, messages: list, conversation_type: str = "default", max_history_length: int = 15) -> None: + """Save conversation history to disk""" + cleaned_messages = self.clean_orphaned_tool_messages(messages) + limited_messages = self._limit_conversation_history(cleaned_messages, max_history_length, player_name) + + file_path = Utils.make_data_file(self.plugin, f"agent_conversations_{conversation_type}.json") + history_data = Utils.disk_read(file_path) + history_data[player_name] = limited_messages + Utils.disk_write(file_path, history_data) + + def _limit_conversation_history(self, messages: list, max_history_length: int, player_name: str = "未知玩家") -> list: + """Limit conversation history to specified length""" + if not messages: + return messages + + system_msg = messages[0] if messages and messages[0].get("role") == "system" else None + non_system_messages = messages[1:] if system_msg else messages + + user_message_count = sum(1 for msg in non_system_messages if msg.get("role") == "user") + + if user_message_count <= max_history_length: + return messages + + users_to_remove = user_message_count - max_history_length + user_count = 0 + cutoff_index = 0 + + for i, msg in enumerate(non_system_messages): + if msg.get("role") == "user": + user_count += 1 + if user_count > users_to_remove: + cutoff_index = i + break + + recent_messages = non_system_messages[cutoff_index:] + + if system_msg: + limited_messages = [system_msg] + recent_messages + else: + limited_messages = recent_messages + + return limited_messages + + def get_conversation_key(self, player_name: str, conversation_type: str = "default") -> str: + """Generate conversation key for player""" + return f"{player_name}_{conversation_type}" + + def send_request( + self, + player_name: str, + message: str, + system_prompt: str, + api_key: str, + model: str = "deepseek-chat", + max_history_length: int = 15, + stream: bool = False, + conversation_type: str = "default", + api_provider: str = "deepseek" + ) -> Any: + """ + Send request to AI API + + Args: + player_name (str): 玩家名称 + message (str): 用户消息 + system_prompt (str): 系统提示词 + api_key (str): API密钥 + model (str): 模型名称 + max_history_length (int): 最大历史记录长度 + stream (bool): 是否使用流式输出 + conversation_type (str): 对话类型 + api_provider (str): API提供商 (deepseek/siliconflow) + + Returns: + dict/requests.Response/str: API响应结果或错误信息 + """ + if not api_key: + return "错误:未提供 API 密钥" + + # 获取API配置 + api_config = self.get_api_config(api_provider) + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + # 获取或初始化对话历史 + conversation_key = self.get_conversation_key(player_name, conversation_type) + if conversation_key not in self.conversations: + # 尝试从文件加载 + self.conversations[conversation_key] = self.load_conversation_history(player_name, conversation_type) + + messages = self.conversations[conversation_key] + + # 确保系统提示词在第一条 + if not messages or messages[0]["role"] != "system": + messages.insert(0, { + "role": "system", + "content": system_prompt + }) + + # 添加用户消息 + messages.append({ + "role": "user", + "content": f"{player_name}: {message}" + }) + + payload = { + "model": model, + "messages": messages, + "stream": stream + } + + try: + url = f"{api_config['base_url']}{api_config['chat_endpoint']}" + response = requests.post( + url, + headers=headers, + json=payload, + stream=stream + ) + + if response.status_code != 200: + return f"请求失败,状态码:{response.status_code},响应内容:{response.text}" + + # 如果是流式输出,直接返回响应对象 + if stream: + return response + + # 非流式输出,解析响应 + result = response.json() + assistant_response = self.extract_content_from_response(result) + + if assistant_response is None: + return "错误:无法从API响应中提取内容" + + # 添加AI回复到对话历史 + messages.append({ + "role": "assistant", + "content": assistant_response + }) + + # 限制对话历史轮数 + # 计算对话轮数:每个user消息算一轮 + system_msg = messages[0] if messages and messages[0]["role"] == "system" else None + non_system_messages = messages[1:] if system_msg else messages + + # 统计user消息数量(轮数) + user_message_count = sum(1 for msg in non_system_messages if msg.get("role") == "user") + + # 如果超过max_history_length轮,保留最近的轮次 + if user_message_count > max_history_length: + # 找到第(max_history_length+1)轮的user消息位置 + user_count = 0 + cutoff_index = len(non_system_messages) + + for i in range(len(non_system_messages) - 1, -1, -1): + if non_system_messages[i].get("role") == "user": + user_count += 1 + if user_count > max_history_length: + cutoff_index = i + 1 + break + + # 截取最近的消息 + recent_messages = non_system_messages[cutoff_index:] + + # 重新组装消息列表(保留系统提示词) + if system_msg: + messages = [system_msg] + recent_messages + else: + messages = recent_messages + + self.conversations[conversation_key] = messages + + # 保存对话历史到文件 + self.save_conversation_history(player_name, messages, conversation_type, max_history_length) + + return result + + except requests.exceptions.RequestException as e: + return f"网络请求异常:{str(e)}" + except UnicodeError as e: + return f"编码异常:{str(e)}" + except Exception as e: + return f"未知错误:{str(e)}" + + def handle_stream_response( + self, + player: Player, + response: requests.Response, + player_name: str, + conversation_type: str = "default", + display_interval: int = 100, + max_history_length: int = 15 + ) -> str: + """ + Handle streaming response from AI API + + Args: + player (Player): 玩家对象 + response (requests.Response): 流式响应对象 + player_name (str): 玩家名称 + conversation_type (str): 对话类型 + display_interval (int): 显示更新间隔(毫秒) + max_history_length (int): 最大历史长度 + + Returns: + str: 完整的响应内容 + """ + full_response = "" + last_update_time = time.time() + + try: + for line in response.iter_lines(): + if line: + decoded_line = line.decode('utf-8') + if decoded_line.startswith("data: "): + data = decoded_line[6:] # 移除 "data: " 前缀 + + if data == "[DONE]": + break + + try: + chunk_data = json.loads(data) + if 'choices' in chunk_data and len(chunk_data['choices']) > 0: + delta = chunk_data['choices'][0].get('delta', {}) + content = delta.get('content', '') + + if content: + full_response += content + + # 控制更新频率 + current_time = time.time() + if (current_time - last_update_time) * 1000 >= display_interval: + # 使用ActionBar显示流式输出 + player.setActionbar(f"§a: §f{full_response}") + last_update_time = current_time + except json.JSONDecodeError: + pass + + # 最后一次更新 + if full_response: + player.setActionbar(f"§a: §f{full_response}") + + # 保存完整响应到对话历史 + conversation_key = self.get_conversation_key(player_name, conversation_type) + if conversation_key in self.conversations: + messages = self.conversations[conversation_key] + messages.append({ + "role": "assistant", + "content": full_response + }) + self.save_conversation_history(player_name, messages, conversation_type, max_history_length) + + return full_response + + except Exception as e: + fmts.print_err(f"流式传输过程中发生错误: {str(e)}") + return full_response + + def clear_conversation_history(self, player_name: str, conversation_type: str = "default") -> bool: + """Clear conversation history for player""" + try: + conversation_key = self.get_conversation_key(player_name, conversation_type) + + if conversation_key in self.conversations: + del self.conversations[conversation_key] + + file_path = Utils.make_data_file(self.plugin, f"agent_conversations_{conversation_type}.json") + history_data = Utils.disk_read(file_path) + + if player_name in history_data: + del history_data[player_name] + Utils.disk_write(file_path, history_data) + + return True + except Exception as e: + fmts.print_err(f"清除对话历史时出错: {str(e)}") + return False + + def get_conversation_history(self, player_name: str, conversation_type: str = "default") -> list: + """Get conversation history for player""" + conversation_key = self.get_conversation_key(player_name, conversation_type) + + if conversation_key in self.conversations: + return self.conversations[conversation_key] + + return self.load_conversation_history(player_name, conversation_type) + + @utils.thread_func("AI_Agent_Chat") + def chat( + self, + player: Player, + message: str, + system_prompt: str = "你是一个友好的AI助手", + api_key: str = "", + model: str = "deepseek-chat", + max_history_length: int = 15, + stream: bool = False, + conversation_type: str = "default", + api_provider: str = "deepseek" + ) -> bool: + """ + Handle player chat request (thread-safe) + + Args: + player (Player): 玩家对象 + message (str): 用户消息 + system_prompt (str): 系统提示词 + api_key (str): API密钥 + model (str): 模型名称 + max_history_length (int): 最大历史记录长度 + stream (bool): 是否使用流式输出 + conversation_type (str): 对话类型 + api_provider (str): API提供商 (deepseek/siliconflow) + + Returns: + bool: 执行结果 + """ + ui = self.plugin.ui_texts['基础对话'] + + if not message: + player.show(ui['输入为空']) + return False + + player.show(ui['处理中框']) + player.show(ui['处理中标题']) + player.show(ui['处理中框底']) + player.show(f"§f{message}") + + # 发送请求 + result = self.send_request( + player.name, + message, + system_prompt, + api_key, + model, + max_history_length, + stream, + conversation_type, + api_provider + ) + + # 处理响应 + if isinstance(result, str): + # 错误消息 + player.show(ui['错误框']) + player.show(ui['错误标题']) + player.show(ui['错误框底']) + player.show(f"§c{result}") + return False + elif isinstance(result, requests.Response): + # 流式响应 + full_response = self.handle_stream_response(player, result, player.name, conversation_type, 100, max_history_length) + player.show(ui['响应框']) + player.show(ui['响应标题']) + player.show(ui['响应框底']) + player.show(f"§f{full_response}") + fmts.print_inf(f"Player {player.name} request: {message}, response: {full_response}") + return True + elif isinstance(result, dict): + # 非流式响应 + content = self.extract_content_from_response(result) + + if content is None: + player.show(ui['提取内容失败']) + return False + + # 显示回复和token消耗 + player.show(ui['响应框']) + player.show(ui['响应标题']) + player.show(ui['响应框底']) + player.show(f"§f{content}") + + if 'usage' in result and 'total_tokens' in result['usage']: + total_tokens = result['usage']['total_tokens'] + cost = self.calculate_cost(total_tokens) + player.show(ui['统计分隔线']) + player.show(ui['统计信息'].format(tokens=total_tokens, cost=f"{cost:.6f}")) + player.show(ui['统计分隔线']) + fmts.print_inf(f"Player {player.name} request: {message}, response: {content}, tokens: {total_tokens}") + else: + fmts.print_inf(f"Player {player.name} request: {message}, response: {content}") + + return True + else: + player.show(ui['响应格式无效']) + return False + + + def cancel_request(self, player_name: str) -> bool: + """ + Cancel player's AI request + + Args: + player_name (str): 玩家名称 + + Returns: + bool: 是否成功标记为取消 + """ + if player_name in self.active_requests: + self.cancel_requests.add(player_name) + return True + return False + + def send_request_with_tools( + self, + player: Player, + message: str, + system_prompt: str, + api_key: str, + model: str = "deepseek-chat", + max_history_length: int = 15, + conversation_type: str = "default", + max_tool_calls: int = 10, + api_provider: str = "deepseek", + timeout: int = 90 + ) -> Dict[str, Any]: + """ + Send request with tool calling support to AI API + + Args: + player (Player): 玩家对象 + message (str): 用户消息 + system_prompt (str): 系统提示词 + api_key (str): API密钥 + model (str): 模型名称 + max_history_length (int): 最大历史记录长度 + conversation_type (str): 对话类型 + max_tool_calls (int): 最大工具调用次数 + api_provider (str): API提供商 (deepseek/siliconflow) + timeout (int): API请求超时时间(秒) + + Returns: + Dict[str, Any]: 包含响应和执行信息的字典 + """ + if not api_key: + return {"success": False, "error": "错误:未提供 API 密钥"} + + # 检查是否被取消 + if player.name in self.cancel_requests: + self.cancel_requests.remove(player.name) + return {"success": False, "error": "请求已被用户取消", "cancelled": True} + + # 获取API配置 + api_config = self.get_api_config(api_provider) + + headers = { + "Content-Type": "application/json", + "Authorization": f"Bearer {api_key}" + } + + # 获取或初始化对话历史 + conversation_key = self.get_conversation_key(player.name, conversation_type) + if conversation_key not in self.conversations: + self.conversations[conversation_key] = self.load_conversation_history(player.name, conversation_type) + + messages = self.conversations[conversation_key] + + # 确保系统提示词在第一条 + if not messages or messages[0]["role"] != "system": + messages.insert(0, { + "role": "system", + "content": system_prompt + }) + + # 添加用户消息 + messages.append({ + "role": "user", + "content": f"{player.name}: {message}" + }) + + # 获取工具定义(根据玩家权限过滤) + tools = self.mc_tools.get_tools_schema(player) + + tool_call_count = 0 + total_tokens = 0 + tool_execution_logs = [] + + try: + # 循环处理工具调用 + while tool_call_count < max_tool_calls: + # 检查是否被取消 + if player.name in self.cancel_requests: + self.cancel_requests.remove(player.name) + return { + "success": False, + "error": "请求已被用户取消", + "cancelled": True, + "total_tokens": total_tokens, + "tool_calls_count": tool_call_count, + "tool_execution_logs": tool_execution_logs + } + + payload = { + "model": model, + "messages": messages, + "tools": tools, + "tool_choice": "auto" + } + + ui = self.plugin.ui_texts['AI助手'] + player.show(ui['思考中'].format(current=tool_call_count + 1, max=max_tool_calls)) + player.show("§7§o提示: 输入 §e取消 §7可以中止AI处理") + + url = f"{api_config['base_url']}{api_config['chat_endpoint']}" + response = requests.post( + url, + headers=headers, + json=payload, + timeout=timeout # 使用配置的超时时间 + ) + + if response.status_code != 200: + return { + "success": False, + "error": f"请求失败,状态码:{response.status_code},响应内容:{response.text}" + } + + result = response.json() + + # 累计token消耗 + if 'usage' in result and 'total_tokens' in result['usage']: + total_tokens += result['usage']['total_tokens'] + + # 获取AI响应 + if 'choices' not in result or len(result['choices']) == 0: + return {"success": False, "error": "API响应格式错误"} + + choice = result['choices'][0] + assistant_message = choice['message'] + + # 添加助手消息到历史 + messages.append(assistant_message) + + # 检查是否有工具调用 + if assistant_message.get('tool_calls'): + tool_call_count += 1 + + # 处理每个工具调用 + for tool_call in assistant_message['tool_calls']: + tool_name = tool_call['function']['name'] + tool_args = json.loads(tool_call['function']['arguments']) + tool_id = tool_call['id'] + + # 检查玩家权限等级 + player_permission = self.plugin.permission_manager.get_player_permission_level(player) + is_full_permission = (player_permission == PermissionLevel.FULL) + + # 记录工具调用 - 使用配置的UI文本 + log_msg = ui['工具调用框顶'].format(count=tool_call_count) + player.show(log_msg) + tool_execution_logs.append(log_msg) + + tool_info = ui['工具名称'].format(tool=tool_name) + player.show(tool_info) + tool_execution_logs.append(tool_info) + + # 显示参数信息(仅完全权限用户) + args_info = ui['工具参数'].format(args=json.dumps(tool_args, ensure_ascii=False)) + if is_full_permission: + player.show(args_info) + tool_execution_logs.append(args_info) + + # 执行工具 + tool_result = self.mc_tools.execute_tool(tool_name, tool_args, player) + + # 显示结果信息 + if tool_result.get('success'): + result_msg = ui['工具结果成功'].format(message=tool_result.get('message', 'Success')) + else: + result_msg = ui['工具结果失败'].format(error=tool_result.get('error', 'Unknown error')) + + player.show(result_msg) + tool_execution_logs.append(result_msg) + + footer = ui['工具调用框底'] + player.show(footer) + tool_execution_logs.append(footer) + + # 添加工具结果到消息历史 + messages.append({ + "role": "tool", + "tool_call_id": tool_id, + "content": json.dumps(tool_result, ensure_ascii=False) + }) + + # 继续循环,让AI处理工具结果 + continue + + # 没有工具调用,获取最终回复 + final_content = assistant_message.get('content', '') + + if not final_content: + return {"success": False, "error": "AI未返回有效内容"} + + # 检测并处理格式化指令 + formatted_content = self._process_format_instruction(final_content, player) + + # 限制对话历史(使用max_history_length参数) + # 直接使用_limit_conversation_history方法来统一处理 + messages = self._limit_conversation_history(messages, max_history_length, player.name) + self.conversations[conversation_key] = messages + + # 保存对话历史 + self.save_conversation_history(player.name, messages, conversation_type, max_history_length) + + return { + "success": True, + "content": formatted_content, # 返回处理后的内容 + "total_tokens": total_tokens, + "tool_calls_count": tool_call_count, + "tool_execution_logs": tool_execution_logs + } + + # 达到最大调用次数 + return { + "success": False, + "error": f"已达到最大工具调用次数限制 ({max_tool_calls})", + "total_tokens": total_tokens, + "tool_calls_count": tool_call_count, + "tool_execution_logs": tool_execution_logs + } + + except requests.exceptions.RequestException as e: + return {"success": False, "error": f"网络请求异常:{str(e)}"} + except json.JSONDecodeError as e: + return {"success": False, "error": f"JSON解析错误:{str(e)}"} + except Exception as e: + return {"success": False, "error": f"未知错误:{str(e)}"} + + def _process_format_instruction(self, content: str, player: Player) -> str: + """AI now directly uses Minecraft color codes, no additional processing needed. + + Args: + content: The content string from AI + player: The player object (unused but kept for interface compatibility) + + Returns: + The unmodified content string + """ + return content + + @utils.thread_func("AI_Agent_Chat_With_Tools") + def chat_with_tools( + self, + player: Player, + message: str, + system_prompt: str = """你是Minecraft基岩版AI助手,帮助玩家执行游戏操作。 + +【核心规则】 +1. 操作玩家前必须先调用get_online_players获取在线列表 +2. 支持模糊匹配玩家名(中英文、大小写不敏感) +3. 找不到玩家时列出在线玩家供选择 + +【ID格式】 +- 物品/方块:minecraft:item_name 或 item_name +- 坐标:x y z(如:100 64 200) + +【回复格式】 +使用Minecraft颜色代码直接格式化文本: +- §a = 绿色(成功) +- §c = 红色(错误) +- §6 = 金色(警告) +- §b = 蓝色(信息) +- §e = 黄色(高亮) +- §f = 白色(普通) + +示例: +- 传送成功:§a✓ §f已将玩家§e Steve §f传送到§b (100, 64, 200) +- 给予物品:§a✓ §f已给予§e Steve §f物品§b minecraft:diamond ×10 +- 查询背包:§e Steve §f的背包:§b 钻石×10 铁锭×64 §f等共15种物品 + +保持简洁""", + api_key: str = "", + model: str = "deepseek-chat", + max_history_length: int = 15, + conversation_type: str = "default", + max_tool_calls: int = 10, + api_provider: str = "deepseek" + ) -> bool: + """ + Handle player chat request with tool calling support (thread-safe) + + Args: + player (Player): 玩家对象 + message (str): 用户消息 + system_prompt (str): 系统提示词 + api_key (str): API密钥 + model (str): 模型名称 + max_history_length (int): 最大历史记录长度 + conversation_type (str): 对话类型 + max_tool_calls (int): 最大工具调用次数 + api_provider (str): API提供商 (deepseek/siliconflow) + + Returns: + bool: 执行结果 + """ + ui = self.plugin.ui_texts['AI助手'] + + if not message: + player.show(ui['输入为空']) + return False + + # 检查是否有正在进行的请求 + if player.name in self.active_requests: + player.show("§c╔═══════════════════════╗") + player.show("§c║ §4§l请求被拒绝 §c║") + player.show("§c╚═══════════════════════╝") + player.show("§c您有一个AI请求正在处理中") + player.show("§7请等待当前请求完成后再试") + return False + + # 添加请求锁定 + self.active_requests.add(player.name) + + try: + player.show(ui['处理中框']) + player.show(ui['处理中标题']) + player.show(ui['处理中框底']) + player.show(f"§f{message}") + player.show(ui['分隔线']) + player.show(ui['工具说明']) + player.show(ui['分隔线']) + + # 发送请求 + result = self.send_request_with_tools( + player, + message, + system_prompt, + api_key, + model, + max_history_length, + conversation_type, + max_tool_calls, + api_provider, + self.plugin.agent_config.get('API请求超时秒数', 90) # 从配置读取超时时间 + ) + + # 处理响应 并检查取消 + if not result.get('success'): + if result.get('cancelled'): + player.show("§6╔═══════════════════════╗") + player.show("§6║ §e§l操作已取消 §6║") + player.show("§6╚═══════════════════════╝") + player.show("§e✓ AI请求已成功取消") + return False + + player.show(ui['错误框']) + player.show(ui['错误标题']) + player.show(ui['错误框底']) + player.show(f"§c{result.get('error', 'Unknown error')}") + + return False + + content = result.get('content', '') + + if content: + player.show(ui['响应框']) + player.show(ui['响应标题']) + player.show(ui['响应框底']) + player.show(content) + + total_tokens = result.get('total_tokens', 0) + tool_calls_count = result.get('tool_calls_count', 0) + cost = self.calculate_cost(total_tokens) + + player.show("") + player.show(ui['统计标题']) + player.show(ui['工具调用次数'].format(count=tool_calls_count)) + player.show(ui['Token使用'].format(tokens=total_tokens)) + player.show(ui['费用'].format(cost=f"{cost:.6f}")) + + fmts.print_inf( + f"玩家 {player.name} 请求: {message}, " + f"工具调用: {tool_calls_count}次, " + f"Token: {total_tokens}, " + f"回复: {content}" + ) + + return True + + finally: + # 无论成功还是失败,都要释放请求锁定 + if player.name in self.active_requests: + self.active_requests.remove(player.name) diff --git a/MCAgent/command_block_tool.py b/MCAgent/command_block_tool.py new file mode 100644 index 00000000..415b51aa --- /dev/null +++ b/MCAgent/command_block_tool.py @@ -0,0 +1,194 @@ +import time +from typing import TYPE_CHECKING, Dict, Any, Tuple +if TYPE_CHECKING: + from . import MCAgent + +class CommandBlockTool: + def __init__(self, plugin: "MCAgent"): + self.plugin = plugin + self.game_ctrl = plugin.game_ctrl + + @staticmethod + def make_packet_command_block_update( + position: Tuple[int, int, int], + command: str, + mode: int = 0, + need_redstone: bool = False, + tick_delay: int = 0, + conditional: bool = False, + name: str = "", + should_track_output: bool = True, + execute_on_first_tick: bool = True, + ) -> Dict[str, Any]: + return { + "Block": True, + "Position": list(position), + "Mode": mode, + "NeedsRedstone": need_redstone, + "Conditional": conditional, + "MinecartEntityRuntimeID": 0, + "Command": command, + "LastOutput": "", + "Name": name, + "ShouldTrackOutput": should_track_output, + "TickDelay": tick_delay, + "ExecuteOnFirstTick": execute_on_first_tick, + } + + def _place_command_block_internal( + self, + command_block_update_packet: Dict[str, Any], + facing: int = 0, + limit_seconds: float = 0.0, + limit_seconds2: float = 0.0, + in_dim: str = "overworld", + ): + mode_prefix = ["", "repeating_", "chain_"][command_block_update_packet["Mode"]] + position = command_block_update_packet["Position"] + cmd = f"/setblock {position[0]} {position[1]} {position[2]} {mode_prefix}command_block {facing}" + + tp_cmd = f"/execute at @s in {in_dim} run tp {position[0]} {position[1]} {position[2]}" + + if limit_seconds > 0: + self.game_ctrl.sendcmd(tp_cmd) + self.game_ctrl.sendcmd(cmd) + time.sleep(limit_seconds) + else: + tp_result = self.game_ctrl.sendwscmd_with_resp(tp_cmd) + if tp_result.SuccessCount == 0: + raise ValueError("无法tp至对应坐标") + + setblock_result = self.game_ctrl.sendwscmd_with_resp(cmd) + if setblock_result.SuccessCount == 0 and "noChange" not in setblock_result.OutputMessages[0].Message: + raise ValueError(f"无法放置命令方块: {setblock_result.OutputMessages[0].Message}") + + time.sleep(limit_seconds2) + self.game_ctrl.sendPacket(78, command_block_update_packet) + + def place_command_block( + self, + x: int, + y: int, + z: int, + command: str, + mode: int = 0, + facing: int = 0, + need_redstone: bool = False, + tick_delay: int = 0, + conditional: bool = False, + name: str = "", + should_track_output: bool = True, + execute_on_first_tick: bool = True, + in_dim: str = "overworld" + ) -> Dict[str, Any]: + try: + packet = self.make_packet_command_block_update( + position=(x, y, z), + command=command, + mode=mode, + need_redstone=need_redstone, + tick_delay=tick_delay, + conditional=conditional, + name=name, + should_track_output=should_track_output, + execute_on_first_tick=execute_on_first_tick + ) + + self._place_command_block_internal( + command_block_update_packet=packet, + facing=facing, + limit_seconds=0.0, + limit_seconds2=0.0, + in_dim=in_dim + ) + + mode_names = {0: "脉冲", 1: "循环", 2: "连锁"} + mode_name = mode_names.get(mode, "未知") + + return { + "success": True, + "message": f"已在 ({x}, {y}, {z}) 放置{mode_name}命令方块", + "position": {"x": x, "y": y, "z": z}, + "command": command, + "mode": mode_name, + "facing": facing, + "dimension": in_dim + } + + except ValueError as e: + return { + "success": False, + "error": f"放置命令方块失败: {str(e)}" + } + except Exception as e: + return { + "success": False, + "error": f"放置命令方块异常: {str(e)}" + } + + def place_impulse_command_block( + self, + x: int, + y: int, + z: int, + command: str, + facing: int = 0, + need_redstone: bool = False, + name: str = "", + in_dim: str = "overworld" + ) -> Dict[str, Any]: + return self.place_command_block( + x=x, y=y, z=z, + command=command, + mode=0, + facing=facing, + need_redstone=need_redstone, + name=name, + in_dim=in_dim + ) + + def place_repeating_command_block( + self, + x: int, + y: int, + z: int, + command: str, + facing: int = 0, + need_redstone: bool = False, + tick_delay: int = 0, + name: str = "", + in_dim: str = "overworld" + ) -> Dict[str, Any]: + return self.place_command_block( + x=x, y=y, z=z, + command=command, + mode=1, + facing=facing, + need_redstone=need_redstone, + tick_delay=tick_delay, + name=name, + in_dim=in_dim + ) + + def place_chain_command_block( + self, + x: int, + y: int, + z: int, + command: str, + facing: int = 0, + conditional: bool = False, + tick_delay: int = 0, + name: str = "", + in_dim: str = "overworld" + ) -> Dict[str, Any]: + return self.place_command_block( + x=x, y=y, z=z, + command=command, + mode=2, + facing=facing, + conditional=conditional, + tick_delay=tick_delay, + name=name, + in_dim=in_dim + ) diff --git a/MCAgent/config.py b/MCAgent/config.py new file mode 100644 index 00000000..6503ad3b --- /dev/null +++ b/MCAgent/config.py @@ -0,0 +1,130 @@ +from tooldelta import cfg + +CONFIG_DEFAULT = { + "白名单": [ + "player1", + "player2", + "MagiCow" + ], + "完全权限白名单": [ + "player1" + ], + "一级权限白名单": [ + "player1", + "player2" + ], + "危险命令列表": [ + "op ", + "deop ", + "stop" + ], + "Info": { + "无权限提示": "§c§l您没有权限使用此功能", + }, + "UI文本": { + "AI助手": { + "输入为空": "§c§l✗ Input is empty", + "消息不能为空": "§c✗ Message cannot be empty", + "输入提示": "§e§lEnter your request: ", + "参数格式错误": "§c✗ Invalid argument format: {error}", + "输入错误": "§c✗ Invalid input: {error}", + + "处理中框": "§d╔═══════════════════════╗", + "处理中标题": "§d║ §6§lProcessing Request §d║", + "处理中框底": "§d╚═══════════════════════╝", + "分隔线": "§8━━━━━━━━━━━━━━━━━━━━━━━", + "工具说明": "§7§oAI can call game tools to complete your request...", + + "思考中": "§d┃ §fAI Thinking... §7(Call: {current}/{max})", + + "工具调用框顶": "§b┌─ §6[Tool Call #{count}] §b─┐", + "工具名称": "§b│ §fTool: §e{tool}", + "工具参数": "§b│ §fArgs: §7{args}", + "工具结果成功": "§b│ §fResult: §a✓ {message}", + "工具结果失败": "§b│ §fResult: §c✗ {error}", + "工具调用框底": "§b└─────────────────┘", + + "错误框": "§c╔═══════════════════════╗", + "错误标题": "§c║ §4§lError Occurred §c║", + "错误框底": "§c╚═══════════════════════╝", + + "响应框": "§d╔═══════════════════════╗", + "响应标题": "§d║ §b§lAI Agent Response §d║", + "响应框底": "§d╚═══════════════════════╝", + + "统计标题": "§7§l Statistics", + "工具调用次数": "§7• §fTools Called: §e{count}", + "Token使用": "§7• §fTokens Used: §e{tokens}", + "费用": "§7• §fCost: §e¥{cost}" + # 注意 Cost仅为预期消耗费用 实际产生费用以模型提供方的每百万输入/输出token为主 + }, + + "清除对话": { + "成功框": "§a╔═══════════════════════╗", + "成功标题": "§a║ §2§lSuccess §a║", + "成功框底": "§a╚═══════════════════════╝", + "成功消息": "§a✓ Chat history cleared", + + "失败框": "§c╔═══════════════════════╗", + "失败标题": "§c║ §4§lFailed §c║", + "失败框底": "§c╚═══════════════════════╝", + "失败消息": "§c✗ Failed to clear chat history" + }, + + "取消AI": { + "成功框": "§a╔═══════════════════════╗", + "成功标题": "§a║ §2§l请求已取消 §a║", + "成功框底": "§a╚═══════════════════════╝", + "成功消息": "§a✓ 您的AI请求已被标记为取消", + "提示": "§7AI将在当前操作完成后停止", + + "无请求框": "§c╔═══════════════════════╗", + "无请求标题": "§c║ §4§l无活动请求 §c║", + "无请求框底": "§c╚═══════════════════════╝", + "无请求消息": "§c✗ 您当前没有正在进行的AI请求" + } + }, + "AI配置": { + "API提供商": "siliconflow", + "APIkey": "Api key", + "工具系统提示词": """你是Minecraft基岩版AI助手,帮助玩家执行游戏操作。 + +1. 操作玩家相关操作前必须先调用get_online_players获取在线列表 +2. 支持模糊匹配玩家名(中英文、大小写不敏感) +3. 找不到玩家时列出在线玩家供选择 +【ID格式】 +- 物品/方块:minecraft:item_name 或 item_name +- 坐标:x y z(如:100 64 200) +【工作流程】 +涉及玩家操作 → get_online_players → 匹配名称 → 执行工具 → 格式化回复 +【回复格式】 +使用Minecraft颜色代码直接格式化文本: +- §a = 绿色(成功) +- §c = 红色(错误) +- §6 = 金色(警告) +- §b = 蓝色(数据信息) +- §e = 黄色(高亮) +- §f = 白色(普通) +示例: +- 传送成功:§a✓ §f已将玩家§e Steve §f传送到§b (100, 64, 200) +- 给予物品:§a✓ §f已给予§e Steve §f物品§b minecraft:diamond ×10 +- 查询背包:§e Steve §f的背包:§b 钻石×10 铁锭×64 §f等共15种物品 +保持语言简洁 逻辑清晰 +""", + "模型名称": "deepseek-chat", + "硅基流动模型名称": "deepseek-ai/DeepSeek-V3.2", + "最大历史长度": 3, + "最大工具调用次数": 24, + "API请求超时秒数": 90 + }, +} + +CONFIG_STD = { + "白名单": cfg.JsonList((int, str)), + "完全权限白名单": cfg.JsonList((int, str)), + "一级权限白名单": cfg.JsonList((int, str)), + "危险命令列表": cfg.JsonList(str), + "Info": dict, + "UI文本": dict, + "AI配置": dict, +} diff --git a/MCAgent/core.py b/MCAgent/core.py new file mode 100644 index 00000000..15d1f504 --- /dev/null +++ b/MCAgent/core.py @@ -0,0 +1,185 @@ +from typing import TYPE_CHECKING +from tooldelta import Player +from tooldelta.internal.launch_cli import FrameNeOmgAccessPoint +if TYPE_CHECKING: + from . import MCAgent + +class Core: + def __init__(self, plugin: "MCAgent"): + self.plugin = plugin + self.game_ctrl = plugin.game_ctrl + self.players = plugin.players + self.neomega = self.get_neomega() + + def get_neomega(self): + if isinstance(self.plugin.frame.launcher, FrameNeOmgAccessPoint): + return self.plugin.frame.launcher.omega + else: + raise ValueError("此启动框架无法使用 NeOmega API") + + def ClearChat(self, player: Player, args: tuple): + from .permission import PermissionLevel + player_permission = self.plugin.permission_manager.get_player_permission_level(player) + + if player_permission.value < PermissionLevel.CREATIVE.value: + player.show(self.plugin.Info['无权限提示']) + return True + + self.clear_chat_history(player, "tools") + return True + + def CancelAI(self, player: Player, args: tuple): + """取消当前正在进行的AI请求""" + from .permission import PermissionLevel + player_permission = self.plugin.permission_manager.get_player_permission_level(player) + + if player_permission.value < PermissionLevel.CREATIVE.value: + player.show(self.plugin.Info['无权限提示']) + return True + + # 尝试取消请求 + if self.plugin.agent.cancel_request(player.name): + player.show("§a╔═══════════════════════╗") + player.show("§a║ §2§l请求已取消 §a║") + player.show("§a╚═══════════════════════╝") + player.show("§a✓ 您的AI请求已被标记为取消") + player.show("§7AI将在当前操作完成后停止") + else: + player.show("§c╔═══════════════════════╗") + player.show("§c║ §4§l无活动请求 §c║") + player.show("§c╚═══════════════════════╝") + player.show("§c✗ 您当前没有正在进行的AI请求") + + return True + + def clear_chat_history(self, player: Player, conversation_type: str = "tools"): + ui = self.plugin.ui_texts['清除对话'] + + success = self.plugin.agent.clear_conversation_history(player.name, conversation_type) + if success: + player.show(ui['成功框']) + player.show(ui['成功标题']) + player.show(ui['成功框底']) + player.show(ui['成功消息']) + else: + player.show(ui['失败框']) + player.show(ui['失败标题']) + player.show(ui['失败框底']) + player.show(ui['失败消息']) + + + def AIAssistant(self, player: Player, args: tuple): + ui = self.plugin.ui_texts['AI助手'] + + from .permission import PermissionLevel + player_permission = self.plugin.permission_manager.get_player_permission_level(player) + + if player_permission.value < PermissionLevel.CREATIVE.value: + player.show(self.plugin.Info['无权限提示']) + return True + + if args: + try: + if isinstance(args[0], list): + message = ' '.join(args[0]) + else: + message = ' '.join(args) + except Exception as e: + player.show(ui['参数格式错误'].format(error=str(e))) + return True + else: + try: + player.show("§7§l提示: 输入消息时请勿使用其他菜单命令") + player.show("§7如需退出,请输入 §e退出 §7或 §ecancel") + message = player.input(ui['输入提示'], 120) + except TimeoutError: + player.show("§c输入超时,已自动退出") + return True + except Exception as e: + player.show(ui['输入错误'].format(error=str(e))) + return True + + # 检查是否是退出命令 + if message and message.lower() in ['退出', 'exit', 'quit', 'cancel', 'q']: + player.show("§a已取消操作") + return True + + if not message: + player.show(ui['消息不能为空']) + return True + + self.chat_with_tools(player, message) + return True + + def chat_with_tools(self, player: Player, message: str): + config = self.plugin.agent_config + + api_provider = config.get('API提供商', 'deepseek').lower() + + if api_provider == 'siliconflow': + model = config.get('硅基流动模型名称', 'deepseek/deepseek-chat') + else: + model = config.get('模型名称', 'deepseek-chat') + + default_system_prompt = """你是Minecraft基岩版AI助手,帮助玩家执行游戏操作。 +】 +1. 操作玩家前必须先调用get_online_players获取在线列表 +2. 支持模糊匹配玩家名(中英文、大小写不敏感) +3. 找不到玩家时列出在线玩家供选择 +4. 操作后简洁告知结果 + +【ID格式】 +- 物品/方块:minecraft:item_name 或 item_name +- 坐标:x y z(如:100 64 200) + +【工作流程】 +涉及玩家操作 → get_online_players → 匹配名称 → 执行工具 → 格式化回复 + +【交互式菜单工具使用规则 】 +使用interact_with_menu工具与菜单系统交互,支持会话管理: + +1. 开始新会话: + interact_with_menu(player_name, trigger_word, action="start") + # 创建新会话并获取菜单选项 + +2. 读取返回信息: + - new_messages: 本次新增的消息 + - menu_options: 提取的选项列表 + - input_prompts: 输入提示 + - has_more_interaction: 是否需要继续 + - session_active: 会话是否活跃 + +3. 继续会话(提供输入): + interact_with_menu(player_name, trigger_word, user_input="4") + # action默认为"continue",在同一会话中输入 + +4. 循环交互: + 重复步骤2-3直到has_more_interaction为false + +5. 会话自动结束: + 菜单完成后会话自动清理,无需手动结束 + +示例流程(提升职业等级): +玩家:"帮我把职业升到40级" +AI: interact_with_menu("Steve", "职业", action="start") +返回: ["1. 创建职业", "2. 职业晋升", "3. 重置职业", "4. 提升等级", ...] +AI: interact_with_menu("Steve", "职业", user_input="4") # 选择提升等级 +返回: [输入提示] 请输入想要提升的等级数量: +AI: interact_with_menu("Steve", "职业", user_input="39") # 输入39级 +返回: [输入提示] 确定要进行升级吗? +AI: interact_with_menu("Steve", "职业", user_input="确认") # 确认 +完成! + +保持友好、简洁、专业""" + + self.plugin.agent.chat_with_tools( + player=player, + message=message, + system_prompt=config.get('工具系统提示词', default_system_prompt), + api_key=config['APIkey'], + model=model, + max_history_length=config['最大历史长度'], + conversation_type="tools", + max_tool_calls=config.get('最大工具调用次数', 10), + api_provider=api_provider + ) diff --git a/MCAgent/datas.json b/MCAgent/datas.json new file mode 100644 index 00000000..df7ae885 --- /dev/null +++ b/MCAgent/datas.json @@ -0,0 +1,8 @@ +{ + "author": "果_k", + "version": "1.0.0", + "plugin-type": "classic", + "description": "MCAgent插件", + "pre-plugins": {}, + "plugin-id": "MCAgent" +} diff --git a/MCAgent/mctools.py b/MCAgent/mctools.py new file mode 100644 index 00000000..ee91633f --- /dev/null +++ b/MCAgent/mctools.py @@ -0,0 +1,1074 @@ +import json +import queue +import threading +import time +from typing import TYPE_CHECKING, Dict, List, Any, Optional, Callable +from tooldelta import Player, game_utils, fmts +from .tool_schemas import get_all_tool_schemas +from .permission import PermissionManager +from .command_block_tool import CommandBlockTool +if TYPE_CHECKING: + from . import MCAgent + +class MenuSession: + def __init__(self, player: Player, trigger_word: str): + self.player = player + self.trigger_word = trigger_word + self.input_queue = queue.Queue() + self.output_queue = queue.Queue() + self.is_active = True + self.captured_messages = [] + self.original_show = player.show + self.original_input = player.input + + def mock_show(self, message): + """捕获显示的消息""" + self.captured_messages.append(str(message)) + self.original_show(message) + self.output_queue.put(("message", str(message))) + + def mock_input(self, prompt="", timeout=None): + """从队列获取输入""" + self.captured_messages.append(f"[输入提示] {prompt}") + self.output_queue.put(("input_request", prompt)) + + # 等待AI提供输入 + try: + user_input = self.input_queue.get(timeout=timeout or 300) + return user_input + except queue.Empty: + raise TimeoutError("等待AI输入超时") + + def provide_input(self, user_input: str): + """AI提供输入""" + self.input_queue.put(user_input) + + def start(self): + """启动会话 替换player的方法""" + self.player.show = self.mock_show + self.player.input = self.mock_input + + def stop(self): + """停止会话 恢复player的方法""" + self.player.show = self.original_show + self.player.input = self.original_input + self.is_active = False + + +class MinecraftAITool: + def __init__(self, plugin: "MCAgent"): + self.plugin = plugin + self.game_ctrl = plugin.game_ctrl + self.players = plugin.players + self.permission_manager = PermissionManager(plugin) + self.tool_logger = plugin.tool_logger + self.command_block_tool = CommandBlockTool(plugin) + self.tools: Dict[str, Callable] = {} + self._register_tools() + self._current_caller: Optional[Player] = None + self.menu_sessions: Dict[str, MenuSession] = {} # 存储活动的菜单会话 + + def _register_tools(self): + self.tools = { + "execute_command": self.execute_command, + "teleport_player": self.teleport_player, + "give_item": self.give_item, + "get_player_info": self.get_player_info, + "get_online_players": self.get_online_players, + "get_player_position": self.get_player_position, + "get_player_inventory": self.get_player_inventory, + "get_player_tags": self.get_player_tags, + "get_player_score": self.get_player_score, + "set_block": self.set_block, + "fill_blocks": self.fill_blocks, + "send_message": self.send_message, + "broadcast_message": self.broadcast_message, + "get_game_rule": self.get_game_rule, + "place_command_block": self.place_command_block, + "get_chatbar_menu_triggers": self.get_chatbar_menu_triggers, + "interact_with_menu": self.interact_with_menu, + } + + def get_tools_schema(self, player: Optional[Player] = None) -> List[Dict[str, Any]]: + all_schemas = get_all_tool_schemas() + + if player is None: + return all_schemas + + available_tools = self.permission_manager.get_available_tools_for_player(player) + + filtered_schemas = [ + schema for schema in all_schemas + if schema.get("function", {}).get("name") in available_tools + ] + + return filtered_schemas + + def execute_tool(self, tool_name: str, parameters: Dict[str, Any], player: Optional[Player] = None) -> Dict[str, Any]: + if tool_name not in self.tools: + return { + "success": False, + "error": f"未知的工具: {tool_name}", + "available_tools": list(self.tools.keys()) + } + + if player is not None: + has_permission, error_msg = self.permission_manager.check_tool_permission(player, tool_name) + if not has_permission: + return { + "success": False, + "error": error_msg, + "tool_name": tool_name, + "permission_denied": True + } + + try: + if tool_name == "execute_command" and player is not None: + command = parameters.get("command", "") + is_safe, error_msg = self.permission_manager.check_command_safety(player, command) + if not is_safe: + return { + "success": False, + "error": error_msg, + "tool_name": tool_name, + "command_blocked": True + } + + self._current_caller = player + result = self.tools[tool_name](**parameters) + self.tool_logger.log_tool_call(tool_name, player, parameters, result) + + return result + except Exception as e: + return { + "success": False, + "error": f"工具执行失败: {str(e)}", + "tool_name": tool_name, + "parameters": parameters + } + finally: + self._current_caller = None + + def execute_command(self, command: str) -> Dict[str, Any]: + try: + if not command.startswith("/"): + command = "/" + command + + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0: + return { + "success": True, + "message": "命令执行成功", + "output": result.OutputMessages[0].Message if result.OutputMessages else "", + "success_count": result.SuccessCount + } + else: + return { + "success": False, + "error": result.OutputMessages[0].Message if result.OutputMessages else "命令执行失败", + "command": command + } + except Exception as e: + return { + "success": False, + "error": f"命令执行异常: {str(e)}", + "command": command + } + + def teleport_player(self, player_name: str, x: float, y: float, z: float) -> Dict[str, Any]: + try: + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + command = f"/tp {player_name} {x} {y} {z}" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0: + return { + "success": True, + "message": f"已将玩家 {player_name} 传送到 ({x}, {y}, {z})", + "player": player_name, + "position": {"x": x, "y": y, "z": z} + } + else: + return { + "success": False, + "error": "传送失败", + "details": result.OutputMessages[0].Message if result.OutputMessages else "" + } + except Exception as e: + return { + "success": False, + "error": f"传送异常: {str(e)}" + } + + def give_item(self, player_name: str, item_id: str, amount: int = 1) -> Dict[str, Any]: + try: + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + if ":" not in item_id: + item_id = f"minecraft:{item_id}" + + command = f"/give {player_name} {item_id} {amount}" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0: + return { + "success": True, + "message": f"已给予玩家 {player_name} {amount}个 {item_id}", + "player": player_name, + "item": item_id, + "amount": amount + } + else: + return { + "success": False, + "error": "给予物品失败", + "details": result.OutputMessages[0].Message if result.OutputMessages else "" + } + except Exception as e: + return { + "success": False, + "error": f"给予物品异常: {str(e)}" + } + + def get_player_info(self, player_name: str) -> Dict[str, Any]: + try: + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + # 获取玩家对象 + player = self.plugin.frame.players_maintainer.getPlayerByName(player_name) + if player is None: + return { + "success": False, + "error": f"无法获取玩家 {player_name} 的信息" + } + + try: + pos_data = game_utils.getPos(player_name, timeout=5) + position = pos_data["position"] + dimension = pos_data["dimension"] + except Exception: + position = None + dimension = None + + return { + "success": True, + "player": { + "name": player.name, + "uuid": player.uuid, + "xuid": player.xuid, + "online": player.online, + "is_op": player.is_op(), + "position": position, + "dimension": dimension + } + } + except Exception as e: + return { + "success": False, + "error": f"获取玩家信息异常: {str(e)}" + } + + def get_online_players(self) -> Dict[str, Any]: + try: + players = game_utils.get_all_player() + return { + "success": True, + "players": players, + "count": len(players) + } + except Exception as e: + return { + "success": False, + "error": f"获取在线玩家列表异常: {str(e)}" + } + + def get_player_position(self, player_name: str) -> Dict[str, Any]: + try: + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + pos_data = game_utils.getPos(player_name, timeout=5) + position = pos_data["position"] + dimension = pos_data["dimension"] + + return { + "success": True, + "player": player_name, + "position": { + "x": position["x"], + "y": position["y"], + "z": position["z"] + }, + "dimension": dimension + } + except Exception as e: + return { + "success": False, + "error": f"获取玩家位置异常: {str(e)}" + } + + def get_player_inventory(self, player_name: str) -> Dict[str, Any]: + try: + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + # 获取玩家对象 + player = self.plugin.frame.players_maintainer.getPlayerByName(player_name) + if player is None: + return { + "success": False, + "error": f"无法获取玩家 {player_name} 的信息" + } + + inventory = player.queryInventory() + items_list = [] + item_summary = {} + + for i in range(36): + slot = inventory.slots[i] if i < len(inventory.slots) else None + if slot is not None: + item_id = getattr(slot, 'id', 'unknown') + stack_size = getattr(slot, 'stackSize', 0) + + if stack_size > 0: + item_info = { + "slot": i, + "item_id": item_id, + "count": stack_size + } + items_list.append(item_info) + + if item_id in item_summary: + item_summary[item_id] += stack_size + else: + item_summary[item_id] = stack_size + + return { + "success": True, + "player": player_name, + "inventory": { + "items": items_list, + "summary": item_summary, + "total_slots": len(items_list), + "empty_slots": 36 - len(items_list) + } + } + except Exception as e: + return { + "success": False, + "error": f"获取玩家背包异常: {str(e)}" + } + + def get_player_tags(self, player_name: str) -> Dict[str, Any]: + try: + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + command = f"/tag \"{player_name}\" list" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0 and result.OutputMessages: + message = result.OutputMessages[0].Message + fmts.print_inf(f"[get_player_tags] 原始消息: {message}") + + tags = [] + message_lower = message.lower() + + if "has no tags" in message_lower or "no tags" in message_lower: + return { + "success": True, + "player": player_name, + "tags": [], + "count": 0, + "raw_message": message + } + + if hasattr(result.OutputMessages[0], 'Parameters') and result.OutputMessages[0].Parameters: + params = result.OutputMessages[0].Parameters + fmts.print_inf(f"[get_player_tags] Parameters: {params}") + + if len(params) > 1: + potential_tags = params[1:] + tags = [str(tag).strip() for tag in potential_tags if tag and str(tag).strip()] + + if not tags: + if ":" in message: + tags_part = message.split(":", 1)[1].strip() + if "," in tags_part: + tags = [tag.strip() for tag in tags_part.split(",") if tag.strip()] + elif " " in tags_part: + tags = [tag.strip() for tag in tags_part.split() if tag.strip()] + else: + if tags_part: + tags = [tags_part] + + # 尝试查找方括号内的内容 [tag1, tag2] + elif "[" in message and "]" in message: + import re + match = re.search(r'\[(.*?)\]', message) + if match: + tags_str = match.group(1) + tags = [tag.strip() for tag in tags_str.split(",") if tag.strip()] + + fmts.print_inf(f"[get_player_tags] 解析结果: {tags}") + + return { + "success": True, + "player": player_name, + "tags": tags, + "count": len(tags), + "raw_message": message + } + else: + error_msg = result.OutputMessages[0].Message if result.OutputMessages else "未知错误" + return { + "success": False, + "error": "获取玩家标签失败", + "details": error_msg, + "success_count": result.SuccessCount + } + except Exception as e: + import traceback + return { + "success": False, + "error": f"获取玩家标签异常: {str(e)}", + "traceback": traceback.format_exc() + } + + def get_player_score(self, player_name: str, objective: str) -> Dict[str, Any]: + """ + 获取玩家在指定计分板上的分数 + + Args: + player_name (str): 玩家名称 + objective (str): 计分板名称 + + Returns: + Dict[str, Any]: 玩家分数信息 + """ + try: + # 检查玩家是否在线 + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + # 执行scoreboard命令查询分数 + command = f"/scoreboard players test {player_name} {objective} * *" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0 and result.OutputMessages: + message = result.OutputMessages[0].Message + parameters = result.OutputMessages[0].Parameters if hasattr(result.OutputMessages[0], 'Parameters') else [] + + # 尝试从参数中提取分数 + score = None + if parameters and len(parameters) > 0: + try: + score = int(parameters[0]) + except (ValueError, IndexError): + # 如果参数解析失败,尝试从消息中提取 + import re + match = re.search(r'(\d+)', message) + if match: + score = int(match.group(1)) + + if score is not None: + return { + "success": True, + "player": player_name, + "objective": objective, + "score": score + } + else: + return { + "success": False, + "error": "无法解析分数值", + "details": message + } + else: + # 如果test命令失败,尝试使用list命令 + command = f"/scoreboard players list {player_name}" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0 and result.OutputMessages: + message = result.OutputMessages[0].Message + # 尝试从消息中提取指定计分板的分数 + import re + pattern = rf"{objective}:\s*(\d+)" + match = re.search(pattern, message) + if match: + score = int(match.group(1)) + return { + "success": True, + "player": player_name, + "objective": objective, + "score": score + } + + return { + "success": False, + "error": f"玩家 {player_name} 在计分板 {objective} 上没有分数或计分板不存在", + "details": result.OutputMessages[0].Message if result.OutputMessages else "" + } + except Exception as e: + return { + "success": False, + "error": f"获取玩家分数异常: {str(e)}" + } + + def set_block(self, x: int, y: int, z: int, block_id: str) -> Dict[str, Any]: + """ + 设置方块 + + Args: + x (int): X坐标 + y (int): Y坐标 + z (int): Z坐标 + block_id (str): 方块ID + + Returns: + Dict[str, Any]: 设置结果 + """ + try: + if ":" not in block_id: + block_id = f"minecraft:{block_id}" + + command = f"/setblock {x} {y} {z} {block_id}" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0: + return { + "success": True, + "message": f"已在 ({x}, {y}, {z}) 设置方块 {block_id}", + "position": {"x": x, "y": y, "z": z}, + "block": block_id + } + else: + return { + "success": False, + "error": "设置方块失败", + "details": result.OutputMessages[0].Message if result.OutputMessages else "" + } + except Exception as e: + return { + "success": False, + "error": f"设置方块异常: {str(e)}" + } + + def fill_blocks(self, x1: int, y1: int, z1: int, x2: int, y2: int, z2: int, block_id: str) -> Dict[str, Any]: + """ + 填充方块区域 + + Args: + x1, y1, z1 (int): 起始坐标 + x2, y2, z2 (int): 结束坐标 + block_id (str): 方块ID + + Returns: + Dict[str, Any]: 填充结果 + """ + try: + # 确保方块ID格式正确 + if ":" not in block_id: + block_id = f"minecraft:{block_id}" + + # 执行fill命令 + command = f"/fill {x1} {y1} {z1} {x2} {y2} {z2} {block_id}" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=10) + + if result.SuccessCount > 0: + return { + "success": True, + "message": f"已填充区域 ({x1},{y1},{z1}) 到 ({x2},{y2},{z2}) 为 {block_id}", + "start": {"x": x1, "y": y1, "z": z1}, + "end": {"x": x2, "y": y2, "z": z2}, + "block": block_id + } + else: + return { + "success": False, + "error": "填充方块失败", + "details": result.OutputMessages[0].Message if result.OutputMessages else "" + } + except Exception as e: + return { + "success": False, + "error": f"填充方块异常: {str(e)}" + } + + def send_message(self, player_name: str, message: str) -> Dict[str, Any]: + """ + 向玩家发送消息 + + Args: + player_name (str): 玩家名称 + message (str): 消息内容 + + Returns: + Dict[str, Any]: 发送结果 + """ + try: + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + self.game_ctrl.say_to(player_name, message) + + return { + "success": True, + "message": f"已向玩家 {player_name} 发送消息", + "player": player_name, + "content": message + } + except Exception as e: + return { + "success": False, + "error": f"发送消息异常: {str(e)}" + } + + def broadcast_message(self, message: str) -> Dict[str, Any]: + """ + 向所有玩家广播消息 + + Args: + message (str): 消息内容 + + Returns: + Dict[str, Any]: 广播结果 + """ + try: + command = f"/say {message}" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0: + return { + "success": True, + "message": "消息已广播", + "content": message, + "recipients": len(self.game_ctrl.allplayers) + } + else: + return { + "success": False, + "error": "广播消息失败", + "details": result.OutputMessages[0].Message if result.OutputMessages else "" + } + except Exception as e: + return { + "success": False, + "error": f"广播消息异常: {str(e)}" + } + + def get_game_rule(self, rule_name: str) -> Dict[str, Any]: + """ + 获取游戏规则 + + Args: + rule_name (str): 规则名称 + + Returns: + Dict[str, Any]: 规则值 + """ + try: + # 执行gamerule查询命令 + command = f"/gamerule {rule_name}" + result = self.game_ctrl.sendwscmd_with_resp(command, timeout=5) + + if result.SuccessCount > 0 and result.OutputMessages: + # 解析返回的规则值 + message = result.OutputMessages[0].Message + parameters = result.OutputMessages[0].Parameters if hasattr(result.OutputMessages[0], 'Parameters') else [] + + return { + "success": True, + "rule": rule_name, + "value": parameters[0] if parameters else message + } + else: + return { + "success": False, + "error": "获取游戏规则失败", + "details": result.OutputMessages[0].Message if result.OutputMessages else "" + } + except Exception as e: + return { + "success": False, + "error": f"获取游戏规则异常: {str(e)}" + } + + def get_chatbar_menu_triggers(self) -> Dict[str, Any]: + """ + 获取前置_聊天栏菜单中注册的所有触发词 + + Returns: + Dict[str, Any]: 所有触发词信息 + """ + try: + chatbar_api = self.plugin.chatbar + if chatbar_api is None: + return { + "success": False, + "error": "聊天栏菜单API未加载" + } + + triggers_info = [] + for trigger in chatbar_api.chatbar_triggers: + # 只处理StandardChatbarTriggers类型 + if hasattr(trigger, 'triggers') and hasattr(trigger, 'usage'): + trigger_data = { + "triggers": trigger.triggers, + "usage": trigger.usage, + "op_only": trigger.op_only if hasattr(trigger, 'op_only') else False, + } + + # 获取参数提示信息 + if hasattr(trigger, 'argument_hints'): + if trigger.argument_hints == ...: + trigger_data["argument_hints"] = "任意参数" + else: + args_info = [] + for hint_name, hint_type, default_val in trigger.argument_hints: + arg_info = { + "name": hint_name, + "type": hint_type.__name__ if hasattr(hint_type, '__name__') else str(hint_type), + "default": default_val + } + args_info.append(arg_info) + trigger_data["argument_hints"] = args_info + + triggers_info.append(trigger_data) + + return { + "success": True, + "triggers": triggers_info, + "count": len(triggers_info) + } + except Exception as e: + import traceback + return { + "success": False, + "error": f"获取聊天栏菜单触发词异常: {str(e)}", + "traceback": traceback.format_exc() + } + + def interact_with_menu(self, player_name: str, trigger_word: str, user_input: str = None, action: str = "continue") -> Dict[str, Any]: + """ + 交互式菜单工具 - AI可以触发菜单、读取返回信息、并自动输入选择 + + 支持会话管理,AI可以在同一个菜单会话中连续输入多次 + + Args: + player_name (str): 玩家名称 + trigger_word (str): 触发词 + user_input (str): AI提供的输入(如选项编号、参数等) + action (str): 操作类型 + - "start": 开始新会话 + - "continue": 继续当前会话(默认) + - "end": 结束会话 + + Returns: + Dict[str, Any]: 包含菜单返回的信息和状态 + """ + try: + session_key = f"{player_name}_{trigger_word}" + + # 检查玩家是否在线 + if player_name not in self.game_ctrl.allplayers: + return { + "success": False, + "error": f"玩家 {player_name} 不在线" + } + + # 获取玩家对象 + player = self.plugin.frame.players_maintainer.getPlayerByName(player_name) + if player is None: + return { + "success": False, + "error": f"无法获取玩家 {player_name} 的信息" + } + + # 处理结束会话 + if action == "end": + if session_key in self.menu_sessions: + session = self.menu_sessions[session_key] + session.stop() + del self.menu_sessions[session_key] + return { + "success": True, + "message": "会话已结束", + "session_ended": True + } + else: + return { + "success": False, + "error": "没有活动的会话" + } + + # 获取或创建会话 + if action == "start" or session_key not in self.menu_sessions: + # 清理旧会话 + if session_key in self.menu_sessions: + old_session = self.menu_sessions[session_key] + old_session.stop() + + # 获取聊天栏菜单API + chatbar_api = self.plugin.chatbar + if chatbar_api is None: + return { + "success": False, + "error": "聊天栏菜单API未加载" + } + + # 查找匹配的触发词 + matched_trigger = None + for trigger in chatbar_api.chatbar_triggers: + if hasattr(trigger, 'triggers'): + if trigger_word in trigger.triggers: + matched_trigger = trigger + break + + if matched_trigger is None: + return { + "success": False, + "error": f"未找到触发词: {trigger_word}", + "available_triggers": [ + t.triggers[0] for t in chatbar_api.chatbar_triggers + if hasattr(t, 'triggers') and t.triggers + ] + } + + # 检查权限 + if hasattr(matched_trigger, 'op_only') and matched_trigger.op_only: + if not player.is_op(): + return { + "success": False, + "error": f"触发词 {trigger_word} 需要OP权限", + "permission_denied": True + } + + # 创建新会话 + session = MenuSession(player, trigger_word) + self.menu_sessions[session_key] = session + session.start() + + # 在新线程中执行菜单 + def run_menu(): + try: + if hasattr(matched_trigger, 'execute_with_no_args'): + matched_trigger.execute_with_no_args(player, trigger_word) + elif hasattr(matched_trigger, 'func'): + matched_trigger.func(player.name, []) + except Exception as e: + session.output_queue.put(("error", str(e))) + finally: + session.output_queue.put(("completed", None)) + + menu_thread = threading.Thread(target=run_menu, daemon=True) + menu_thread.start() + + # 等待第一个输出 + time.sleep(0.5) + + # 获取当前会话 + if session_key not in self.menu_sessions: + return { + "success": False, + "error": "会话不存在,请使用 action='start' 开始新会话" + } + + session = self.menu_sessions[session_key] + + # 如果提供了输入,发送给会话 + if user_input is not None: + session.provide_input(user_input) + time.sleep(0.3) # 等待处理 + + # 收集输出 + messages = [] + input_requests = [] + has_input_request = False + + while not session.output_queue.empty(): + try: + msg_type, content = session.output_queue.get_nowait() + if msg_type == "message": + messages.append(content) + elif msg_type == "input_request": + input_requests.append(content) + has_input_request = True + elif msg_type == "completed": + session.stop() + if session_key in self.menu_sessions: + del self.menu_sessions[session_key] + elif msg_type == "error": + return { + "success": False, + "error": f"菜单执行错误: {content}" + } + except queue.Empty: + break + + # 解析菜单选项 + menu_options = [] + for msg in messages: + if any(f"{i}." in msg or f"{i}、" in msg for i in range(1, 20)): + menu_options.append(msg) + + return { + "success": True, + "trigger": trigger_word, + "user_input_provided": user_input, + "captured_messages": session.captured_messages[-20:], # 最近20条消息 + "new_messages": messages, + "menu_options": menu_options, + "input_prompts": input_requests, + "has_more_interaction": has_input_request, + "session_active": session_key in self.menu_sessions, + "note": "使用action='start'开始新会话,action='continue'继续会话,action='end'结束会话" + } + + except Exception as e: + import traceback + # 清理会话 + session_key = f"{player_name}_{trigger_word}" + if session_key in self.menu_sessions: + try: + self.menu_sessions[session_key].stop() + del self.menu_sessions[session_key] + except: + pass + + return { + "success": False, + "error": f"交互式菜单异常: {str(e)}", + "traceback": traceback.format_exc() + } + + def place_command_block( + self, + x: int, + y: int, + z: int, + command: str, + mode: int = 0, + facing: int = 0, + need_redstone: bool = False, + tick_delay: int = 0, + conditional: bool = False, + name: str = "", + in_dim: str = "overworld" + ) -> Dict[str, Any]: + """ + 放置命令方块并配置命令 + + Args: + x (int): X坐标 + y (int): Y坐标 + z (int): Z坐标 + command (str): 命令方块中的命令 + mode (int): 命令方块模式 (0: 脉冲, 1: 循环, 2: 连锁) + facing (int): 朝向 (0-5: 下/上/北/南/西/东) + need_redstone (bool): 是否需要红石激活 + tick_delay (int): 刻度延迟 + conditional (bool): 是否为条件模式 + name (str): 命令方块名称 + in_dim (str): 维度 (overworld/nether/the_end) + + Returns: + Dict[str, Any]: 放置结果 + """ + try: + # 安全检查:非完全权限玩家不能创建循环命令方块或保持激活的命令方块 + if self._current_caller is not None: + from .permission import PermissionLevel + player_permission = self.permission_manager.get_player_permission_level(self._current_caller) + + # 如果不是完全权限玩家 + if player_permission != PermissionLevel.FULL: + # 强制设置为需要红石激活 + original_need_redstone = need_redstone + need_redstone = True + + # 禁止循环模式 + if mode == 1: + return { + "success": False, + "error": "§c§l安全限制: 非完全权限玩家不能创建循环命令方块。请使用脉冲模式(mode=0)或连锁模式(mode=2)。", + "security_blocked": True + } + + # 如果原本请求的是保持激活状态,返回提示信息 + result = self.command_block_tool.place_command_block( + x=x, y=y, z=z, + command=command, + mode=mode, + facing=facing, + need_redstone=need_redstone, + tick_delay=tick_delay, + conditional=conditional, + name=name, + should_track_output=True, + execute_on_first_tick=False, # 非完全权限玩家的命令方块不在第一刻执行 + in_dim=in_dim + ) + + # 如果成功放置,添加安全提示 + if result.get("success"): + mode_names = {0: "脉冲", 1: "循环", 2: "连锁"} + mode_name = mode_names.get(mode, "未知") + + result["message"] = f"§e§l[安全模式] §f已在 ({x}, {y}, {z}) 放置{mode_name}命令方块\n§6§l注意: §f命令方块已设置为需要红石激活,请检查命令后手动激活" + result["security_mode"] = True + result["need_manual_activation"] = True + + if not original_need_redstone: + result["message"] += "\n§7(原请求为保持激活状态,已被安全系统修改为需要红石激活)" + + return result + + # 完全权限玩家或系统调用,正常执行 + return self.command_block_tool.place_command_block( + x=x, y=y, z=z, + command=command, + mode=mode, + facing=facing, + need_redstone=need_redstone, + tick_delay=tick_delay, + conditional=conditional, + name=name, + should_track_output=True, + execute_on_first_tick=True, + in_dim=in_dim + ) + except Exception as e: + return { + "success": False, + "error": f"放置命令方块异常: {str(e)}" + } diff --git a/MCAgent/permission.py b/MCAgent/permission.py new file mode 100644 index 00000000..0f562c31 --- /dev/null +++ b/MCAgent/permission.py @@ -0,0 +1,133 @@ +from enum import Enum +from typing import TYPE_CHECKING, Optional, Tuple, List +from tooldelta import Player +if TYPE_CHECKING: + from . import MCAgent + +class PermissionLevel(Enum): + """Permission level enumeration""" + NONE = 0 + CREATIVE = 1 + OP = 2 + FULL = 3 + +class PermissionManager: + """Permission manager for tool access control""" + # 若想添加新的tool 需在此处进行注册并划分权限 + TOOL_PERMISSIONS = { + "execute_command": PermissionLevel.OP, + "get_game_rule": PermissionLevel.OP, + "place_command_block": PermissionLevel.OP, + + "teleport_player": PermissionLevel.CREATIVE, + "give_item": PermissionLevel.CREATIVE, + "set_block": PermissionLevel.CREATIVE, + "fill_blocks": PermissionLevel.CREATIVE, + "broadcast_message": PermissionLevel.CREATIVE, + "get_player_info": PermissionLevel.CREATIVE, + "get_online_players": PermissionLevel.CREATIVE, + "get_player_position": PermissionLevel.CREATIVE, + "get_player_inventory": PermissionLevel.CREATIVE, + "get_player_tags": PermissionLevel.CREATIVE, + "get_player_score": PermissionLevel.CREATIVE, + "send_message": PermissionLevel.CREATIVE, + "get_chatbar_menu_triggers": PermissionLevel.CREATIVE, + "interact_with_menu": PermissionLevel.CREATIVE, + } + + def __init__(self, plugin: "MCAgent"): + self.plugin = plugin + self.full_permission_whitelist = plugin.full_permission_whitelist + self.level1_permission_whitelist = plugin.level1_permission_whitelist + self.dangerous_commands = plugin.dangerous_commands + + def get_player_permission_level(self, player: Player) -> PermissionLevel: + if player.name in self.full_permission_whitelist: + return PermissionLevel.FULL + + if player.name in self.plugin.whitelist: + return PermissionLevel.OP + + try: + if player.is_op(): + return PermissionLevel.OP + except Exception: + pass + + # 一级权限白名单和创造模式拥有相同的权限等级 + if player.name in self.level1_permission_whitelist: + return PermissionLevel.CREATIVE + + if self._is_creative_mode(player): + return PermissionLevel.CREATIVE + + return PermissionLevel.NONE + + def _is_creative_mode(self, player: Player) -> bool: + try: + command = f"/testfor @a[name={player.name},m=creative]" + result = self.plugin.game_ctrl.sendwscmd_with_resp(command, timeout=3) + if result.SuccessCount > 0: + return True + + command2 = f"/testfor @a[name={player.name},m=1]" + result2 = self.plugin.game_ctrl.sendwscmd_with_resp(command2, timeout=3) + if result2.SuccessCount > 0: + return True + + return False + except Exception: + return False + + def check_tool_permission(self, player: Player, tool_name: str) -> Tuple[bool, Optional[str]]: + player_level = self.get_player_permission_level(player) + + if player_level == PermissionLevel.FULL: + return True, None + + required_level = self.TOOL_PERMISSIONS.get(tool_name, PermissionLevel.CREATIVE) + + if player_level.value < required_level.value: + error_msg = self._get_permission_error_message(player_level, required_level, tool_name) + return False, error_msg + + return True, None + + def check_command_safety(self, player: Player, command: str) -> Tuple[bool, Optional[str]]: + player_level = self.get_player_permission_level(player) + if player_level == PermissionLevel.FULL: + return True, None + + command_lower = command.lower().strip() + for dangerous_cmd in self.dangerous_commands: + if dangerous_cmd.lower() in command_lower: + error_msg = f"§c§l拦截: 命令 '{command}' 包含危险操作 '{dangerous_cmd}',已被阻止执行" + return False, error_msg + + return True, None + + def _get_permission_error_message(self, player_level: PermissionLevel, required_level: PermissionLevel, tool_name: str) -> str: + level_names = { + PermissionLevel.NONE: "普通玩家", + PermissionLevel.CREATIVE: "创造模式", + PermissionLevel.OP: "OP权限", + PermissionLevel.FULL: "完全权限" + } + + current = level_names.get(player_level, "未知") + required = level_names.get(required_level, "未知") + + return f"§c§l权限不足: 工具 '{tool_name}' 需要 {required} 权限,您当前为 {current}" + + def get_available_tools_for_player(self, player: Player) -> List[str]: + player_level = self.get_player_permission_level(player) + + if player_level == PermissionLevel.FULL: + return list(self.TOOL_PERMISSIONS.keys()) + + available_tools = [] + for tool_name, required_level in self.TOOL_PERMISSIONS.items(): + if player_level.value >= required_level.value: + available_tools.append(tool_name) + + return available_tools diff --git a/MCAgent/tool_logger.py b/MCAgent/tool_logger.py new file mode 100644 index 00000000..a79cef0c --- /dev/null +++ b/MCAgent/tool_logger.py @@ -0,0 +1,99 @@ +import json +import os +import time +from typing import TYPE_CHECKING, Dict, Any, Optional, List +from tooldelta import Player +if TYPE_CHECKING: + from . import MCAgent + +class ToolLogger: + def __init__(self, plugin: "MCAgent"): + self.plugin = plugin + self.log_file_path = self._get_log_file_path() + self.logged_tools = [ + "execute_command", + "fill_blocks", + "place_command_block" + ] + + def _get_log_file_path(self) -> str: + self.plugin.make_data_path() + date_str = time.strftime("%Y-%m-%d", time.localtime()) + log_filename = f"tool_calls_{date_str}.json" + return os.path.join(self.plugin.data_path, log_filename) + + def _load_logs(self) -> List[Dict[str, Any]]: + if not os.path.exists(self.log_file_path): + return [] + + try: + with open(self.log_file_path, 'r', encoding='utf-8') as f: + return json.load(f) + except (json.JSONDecodeError, IOError): + return [] + + def _save_logs(self, logs: List[Dict[str, Any]]) -> None: + try: + with open(self.log_file_path, 'w', encoding='utf-8') as f: + json.dump(logs, f, ensure_ascii=False, indent=2) + except IOError as e: + print(f"保存日志失败: {e}") + + def log_tool_call( + self, + tool_name: str, + caller: Optional[Player], + parameters: Dict[str, Any], + result: Dict[str, Any] + ) -> None: + if tool_name not in self.logged_tools: + return + + timestamp = int(time.time()) + date_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp)) + + log_entry = { + "timestamp": timestamp, + "datetime": date_time, + "tool_name": tool_name, + "caller": { + "name": caller.name if caller else "System", + "uuid": caller.uuid if caller else None, + "xuid": caller.xuid if caller else None + }, + "parameters": parameters, + "result": { + "success": result.get("success", False), + "message": result.get("message", ""), + "error": result.get("error", None) + } + } + + logs = self._load_logs() + logs.append(log_entry) + self._save_logs(logs) + + def get_logs_by_tool(self, tool_name: str) -> List[Dict[str, Any]]: + logs = self._load_logs() + return [log for log in logs if log.get("tool_name") == tool_name] + + def get_logs_by_caller(self, caller_name: str) -> List[Dict[str, Any]]: + logs = self._load_logs() + return [log for log in logs if log.get("caller", {}).get("name") == caller_name] + + def get_logs_by_date(self, date_str: str) -> List[Dict[str, Any]]: + logs = self._load_logs() + return [log for log in logs if log.get("datetime", "").startswith(date_str)] + + def clear_old_logs(self, days: int = 7) -> int: + logs = self._load_logs() + current_time = int(time.time()) + cutoff_time = current_time - (days * 24 * 60 * 60) + + filtered_logs = [log for log in logs if log.get("timestamp", 0) >= cutoff_time] + removed_count = len(logs) - len(filtered_logs) + + if removed_count > 0: + self._save_logs(filtered_logs) + + return removed_count diff --git a/MCAgent/tool_schemas.py b/MCAgent/tool_schemas.py new file mode 100644 index 00000000..9f4655e6 --- /dev/null +++ b/MCAgent/tool_schemas.py @@ -0,0 +1,313 @@ +from typing import List, Dict, Any + +def get_all_tool_schemas() -> List[Dict[str, Any]]: + """Get all tool schema definitions for AI model""" + return [ + { + "type": "function", + "function": { + "name": "execute_command", + "description": "执行Minecraft命令", + "parameters": { + "type": "object", + "properties": { + "command": { + "type": "string", + "description": "要执行的Minecraft命令(不需要前缀/)" + } + }, + "required": ["command"] + } + } + }, + { + "type": "function", + "function": { + "name": "teleport_player", + "description": "传送玩家到指定坐标", + "parameters": { + "type": "object", + "properties": { + "player_name": {"type": "string", "description": "玩家名称"}, + "x": {"type": "number", "description": "X坐标"}, + "y": {"type": "number", "description": "Y坐标"}, + "z": {"type": "number", "description": "Z坐标"} + }, + "required": ["player_name", "x", "y", "z"] + } + } + }, + { + "type": "function", + "function": { + "name": "give_item", + "description": "给予玩家物品", + "parameters": { + "type": "object", + "properties": { + "player_name": {"type": "string", "description": "玩家名称"}, + "item_id": {"type": "string", "description": "物品ID,如minecraft:diamond"}, + "amount": {"type": "integer", "description": "物品数量", "default": 1} + }, + "required": ["player_name", "item_id"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_player_info", + "description": "获取玩家详细信息", + "parameters": { + "type": "object", + "properties": { + "player_name": {"type": "string", "description": "玩家名称"} + }, + "required": ["player_name"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_online_players", + "description": "获取当前在线玩家列表", + "parameters": {"type": "object", "properties": {}} + } + }, + { + "type": "function", + "function": { + "name": "get_player_position", + "description": "获取玩家当前位置坐标", + "parameters": { + "type": "object", + "properties": { + "player_name": {"type": "string", "description": "玩家名称"} + }, + "required": ["player_name"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_player_inventory", + "description": "获取玩家背包物品列表,包括物品ID、数量、槽位等信息", + "parameters": { + "type": "object", + "properties": { + "player_name": {"type": "string", "description": "玩家名称"} + }, + "required": ["player_name"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_player_tags", + "description": "获取玩家的所有标签(tag)列表。标签是用于标记玩家的自定义文本,返回玩家当前拥有的所有标签。", + "parameters": { + "type": "object", + "properties": { + "player_name": {"type": "string", "description": "玩家名称"} + }, + "required": ["player_name"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_player_score", + "description": "获取玩家在指定计分板上的分数", + "parameters": { + "type": "object", + "properties": { + "player_name": {"type": "string", "description": "玩家名称"}, + "objective": {"type": "string", "description": "计分板名称"} + }, + "required": ["player_name", "objective"] + } + } + }, + { + "type": "function", + "function": { + "name": "set_block", + "description": "在指定坐标设置方块", + "parameters": { + "type": "object", + "properties": { + "x": {"type": "integer", "description": "X坐标"}, + "y": {"type": "integer", "description": "Y坐标"}, + "z": {"type": "integer", "description": "Z坐标"}, + "block_id": {"type": "string", "description": "方块ID,如minecraft:stone"} + }, + "required": ["x", "y", "z", "block_id"] + } + } + }, + { + "type": "function", + "function": { + "name": "fill_blocks", + "description": "填充方块区域", + "parameters": { + "type": "object", + "properties": { + "x1": {"type": "integer", "description": "起始X坐标"}, + "y1": {"type": "integer", "description": "起始Y坐标"}, + "z1": {"type": "integer", "description": "起始Z坐标"}, + "x2": {"type": "integer", "description": "结束X坐标"}, + "y2": {"type": "integer", "description": "结束Y坐标"}, + "z2": {"type": "integer", "description": "结束Z坐标"}, + "block_id": {"type": "string", "description": "方块ID"} + }, + "required": ["x1", "y1", "z1", "x2", "y2", "z2", "block_id"] + } + } + }, + { + "type": "function", + "function": { + "name": "send_message", + "description": "向指定玩家发送消息", + "parameters": { + "type": "object", + "properties": { + "player_name": {"type": "string", "description": "玩家名称"}, + "message": {"type": "string", "description": "消息内容"} + }, + "required": ["player_name", "message"] + } + } + }, + { + "type": "function", + "function": { + "name": "broadcast_message", + "description": "向所有玩家广播消息", + "parameters": { + "type": "object", + "properties": { + "message": {"type": "string", "description": "消息内容"} + }, + "required": ["message"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_game_rule", + "description": "查询游戏规则", + "parameters": { + "type": "object", + "properties": { + "rule_name": {"type": "string", "description": "规则名称"} + }, + "required": ["rule_name"] + } + } + }, + { + "type": "function", + "function": { + "name": "place_command_block", + "description": "在指定坐标放置命令方块并配置命令。命令方块可以自动执行Minecraft命令。", + "parameters": { + "type": "object", + "properties": { + "x": {"type": "integer", "description": "X坐标"}, + "y": {"type": "integer", "description": "Y坐标"}, + "z": {"type": "integer", "description": "Z坐标"}, + "command": {"type": "string", "description": "命令方块中要执行的命令(不需要前缀/)"}, + "mode": { + "type": "integer", + "description": "命令方块模式:0=脉冲(需要激活一次执行一次),1=循环(持续执行),2=连锁(前一个命令方块执行后触发)", + "enum": [0, 1, 2], + "default": 0 + }, + "facing": { + "type": "integer", + "description": "命令方块朝向:0=下,1=上,2=北,3=南,4=西,5=东", + "enum": [0, 1, 2, 3, 4, 5], + "default": 0 + }, + "need_redstone": { + "type": "boolean", + "description": "是否需要红石信号激活(true=需要红石,false=保持激活)", + "default": False + }, + "tick_delay": { + "type": "integer", + "description": "执行延迟(游戏刻,20刻=1秒)", + "default": 0 + }, + "conditional": { + "type": "boolean", + "description": "是否为条件模式(仅当前一个命令方块成功执行时才执行)", + "default": False + }, + "name": { + "type": "string", + "description": "命令方块的名称(可选)", + "default": "" + }, + "in_dim": { + "type": "string", + "description": "放置的维度", + "enum": ["overworld", "nether", "the_end"], + "default": "overworld" + } + }, + "required": ["x", "y", "z", "command"] + } + } + }, + { + "type": "function", + "function": { + "name": "get_chatbar_menu_triggers", + "description": "获取聊天栏菜单中注册的所有触发词列表,包括触发词名称、功能说明、参数提示等信息。可以用来查看服务器中有哪些可用的菜单命令。", + "parameters": { + "type": "object", + "properties": {} + } + } + }, + { + "type": "function", + "function": { + "name": "interact_with_menu", + "description": "交互式菜单工具 - 支持会话管理,AI可以在同一个菜单会话中连续输入多次。工作流程:1. action='start'开始新会话并获取菜单;2. 读取返回的menu_options;3. 提供user_input继续交互;4. 重复步骤2-3直到完成;5. action='end'结束会话(可选)。", + "parameters": { + "type": "object", + "properties": { + "player_name": { + "type": "string", + "description": "玩家名称" + }, + "trigger_word": { + "type": "string", + "description": "要触发的触发词(从get_chatbar_menu_triggers获取)" + }, + "user_input": { + "type": "string", + "description": "AI提供的输入(如选项编号'1'、'2',或其他参数)。首次调用时不提供,后续调用时提供。", + "default": None + }, + "action": { + "type": "string", + "enum": ["start", "continue", "end"], + "description": "操作类型:'start'=开始新会话,'continue'=继续当前会话(默认),'end'=结束会话", + "default": "continue" + } + }, + "required": ["player_name", "trigger_word"] + } + } + } + ] diff --git a/MCAgent/utils.py b/MCAgent/utils.py new file mode 100644 index 00000000..f6f1f3f6 --- /dev/null +++ b/MCAgent/utils.py @@ -0,0 +1,80 @@ +import json +import os +import time +import uuid +from typing import Any, Dict, Tuple, TYPE_CHECKING +from tooldelta.utils import tempjson +from tooldelta import constants +from tooldelta.constants.netease import PYRPC_OP_SEND +if TYPE_CHECKING: + from . import MCAgent + +class Utils: + def __init__(self, plugin: "MCAgent"): + self.file_path = os.path.join(os.path.dirname(__file__), "data.json") + self.plugin = plugin + + @staticmethod + def disk_read(path: str) -> Dict[Any, Any]: + data = tempjson.load_and_read( + path, need_file_exists=False, default={}, timeout=2 + ) + tempjson.unload_to_path(path) + return data + + @staticmethod + def disk_read_need_exists(path: str) -> Dict[Any, Any]: + data = tempjson.load_and_read( + path, need_file_exists=True, default={}, timeout=2 + ) + tempjson.unload_to_path(path) + return data + + @staticmethod + def disk_write(path: str, data: Dict[Any, Any]) -> None: + tempjson.load_and_write( + path, + data, + need_file_exists=False, + timeout=2, + ) + tempjson.flush(path) + tempjson.unload_to_path(path) + + @staticmethod + def now() -> Tuple[int, str]: + timestamp_now = int(time.time()) + date_now = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(timestamp_now)) + return (timestamp_now, date_now) + + @staticmethod + def make_data_file(plugin, filename: str) -> str: + plugin.make_data_path() + data_file_path = os.path.join(plugin.data_path, filename) + + if not os.path.exists(data_file_path): + with open(data_file_path, 'w', encoding='utf-8') as f: + json.dump({}, f, ensure_ascii=False, indent=4) + + return data_file_path + + def sendaicmd(self, cmd: str): + my_runtimeid = self.plugin.game_ctrl.players.getBotInfo().runtime_id + pk = { + "Value": [ + "ModEventC2S", + [ + "Minecraft", + "aiCommand", + "ExecuteCommandEvent", + { + "playerId": str(my_runtimeid), + "cmd": cmd, + "uuid": str(uuid.uuid4()), + }, + ], + None, + ], + "OperationType": PYRPC_OP_SEND, + } + self.plugin.game_ctrl.sendPacket(constants.PacketIDS.PyRpc, pk) diff --git "a/\346\255\273\344\272\241\347\202\271\350\277\224\345\233\236/__init__.py" "b/\346\255\273\344\272\241\347\202\271\350\277\224\345\233\236/__init__.py" index 93407459..1f972533 100644 --- "a/\346\255\273\344\272\241\347\202\271\350\277\224\345\233\236/__init__.py" +++ "b/\346\255\273\344\272\241\347\202\271\350\277\224\345\233\236/__init__.py" @@ -1,65 +1,106 @@ -from tooldelta import Plugin, plugin_entry, Player, Chat, FrameExit, cfg, game_utils, utils, Config, fmts, TYPE_CHECKING +from tooldelta import Plugin, plugin_entry, Player, utils, Config from tooldelta.constants import PacketIDS + class DeathBack(Plugin): name = "死亡点返回" author = "果_k" - version = (0, 0, 2) + version = (0, 0, 3) def __init__(self, frame): super().__init__(frame) CONFIG_DEFAULT = { "取消返回": "§c§l已自动取消返回", + "成功返回提示词": "§a§l已返回死亡点", "等待输入时间": 20, "是否启用标签限制": False, "标签名称": "backdeath", + "返回询问消息": "§a§l你死于 {dim} 维度的 {x} {y} {z} 是否返回?§b输入y同意 输入n拒绝" } CONFIG_STD = { "取消返回": str, + "成功返回提示词": str, "等待输入时间": Config.NNInt, "是否启用标签限制": bool, "标签名称": str, + "返回询问消息": str } - cfg, cfg_version = Config.get_plugin_config_and_version( + cfg, _ = Config.get_plugin_config_and_version( self.name, CONFIG_STD, CONFIG_DEFAULT, self.version ) self.BackInfo = cfg["取消返回"] + self.SuccessInfo = cfg["成功返回提示词"] self.TimeOut = cfg["等待输入时间"] self.TagCheck = cfg["是否启用标签限制"] - self.PlayerTag= cfg["标签名称"] - self.ListenPacket(PacketIDS.Text, self.TextDeath) + self.PlayerTag = cfg["标签名称"] + self.DeathMessage = cfg["返回询问消息"] + self.ListenPacket(PacketIDS.PyRpc, self.on_pyrpc) - def TextDeath(self,packet): - Message = packet['Message'] - if isinstance(Message, str) and "death" in Message.lower(): #筛选触发条件 - self.GetPos(packet['Parameters'][0]) + def on_pyrpc(self, packet): + """使用PyRpc监听死亡事件""" + try: + self._test_die(packet) + except: + pass + return False - def GetPos(self,playername): - players = self.frame.get_players() - player = players.getPlayerByName(playername) - if not player: + def _test_die(self, pk: dict): + values = pk["Value"] + if len(values) != 3: return - dim, x, y, z = player.getPos() - if self.TagCheck: #标签筛选玩家 - backCmd = self.game_ctrl.sendcmd(f"/testfor @a[name={player.name},tag={self.PlayerTag}]",waitForResp=True) - Check = backCmd.OutputMessages[0].Success - if Check == True: - self.AskPlayer(player,dim,x,y,z) - else: + eventType, contents = values[0:2] + if eventType != "ModEventS2C": + return + elif len(contents) != 4: + return + eventName, eventData = contents[2:4] + if eventName != "OnPlayerDie": + return + die = eventData["die"] + # 死亡时为True,重生时为False + if die: + playerUniqueID = int(eventData["pid"]) + player = self.game_ctrl.players.getPlayerByUniqueID(playerUniqueID) + if player is None: return - else: - self.AskPlayer(player,dim,x,y,z) - - def AskPlayer(self,player,dim,x,y,z): - reply = player.input(f"§a§l你死于 {dim} 维度的 {x} {y} {z} 是否返回?§b输入y同意 输入n拒绝",self.TimeOut) - if reply == "y": - player.show("返回") - if dim == 0: #不同维度的判断 - self.game_ctrl.sendwscmd(f"/execute in overworld run tp {player.name} {x} {y} {z}") - elif dim == 1: - self.game_ctrl.sendwscmd(f"/execute in nether run tp {player.name} {x} {y} {z}") - elif dim == 2: - self.game_ctrl.sendwscmd(f"/execute in the_end run tp {player.name} {x} {y} {z}") - else: - player.show(self.BackInfo) + self.on_player_die(player) + + def on_player_die(self, player: Player): + """当玩家死亡时调用""" + try: + dim, x, y, z = player.getPos() + if self.TagCheck: + backCmd = self.game_ctrl.sendcmd(f"/testfor @a[name={player.name},tag={self.PlayerTag}]", waitForResp=True) + Check = backCmd.OutputMessages[0].Success + if Check == True: + self.AskPlayer(player, dim, x, y, z) + else: + self.AskPlayer(player, dim, x, y, z) + except: + pass + + def AskPlayer(self, player, dim, x, y, z): + try: + message = utils.simple_fmt({ + "{dim}": dim, + "{x}": round(x, 2), + "{y}": round(y, 2), + "{z}": round(z, 2) + }, self.DeathMessage) + + reply = player.input(message, self.TimeOut) + if reply == "y": + player.show(self.SuccessInfo) + if dim == 0: + self.game_ctrl.sendwscmd(f"/execute in overworld run tp {player.name} {x} {y} {z}") + elif dim == 1: + self.game_ctrl.sendwscmd(f"/execute in nether run tp {player.name} {x} {y} {z}") + elif dim == 2: + self.game_ctrl.sendwscmd(f"/execute in the_end run tp {player.name} {x} {y} {z}") + else: + self.game_ctrl.sendwscmd(f"/execute in dm{dim} run tp {player.name} {x} {y} {z}") + else: + player.show(self.BackInfo) + except: + pass entry = plugin_entry(DeathBack) diff --git "a/\346\255\273\344\272\241\347\202\271\350\277\224\345\233\236/datas.json" "b/\346\255\273\344\272\241\347\202\271\350\277\224\345\233\236/datas.json" index 8b0f32ea..64e41cc2 100644 --- "a/\346\255\273\344\272\241\347\202\271\350\277\224\345\233\236/datas.json" +++ "b/\346\255\273\344\272\241\347\202\271\350\277\224\345\233\236/datas.json" @@ -1,6 +1,6 @@ { "author": "果_k", - "version": "0.0.2", + "version": "0.0.3", "plugin-type": "classic", "description": "支持多维度的返回死亡点", "pre-plugins": {},