import time
import asyncio
import json
import requests
from typing import List, Tuple, Type, Dict, Any
from openai import AsyncOpenAI, OpenAI, APITimeoutError
from tenacity import retry, stop_after_attempt, wait_exponential  # retry helpers (retries below are hand-rolled)

from src.common.logger import get_logger
from src.plugin_system import (
    BasePlugin,
    register_plugin,
    BaseTool,
    ComponentInfo,
    ConfigField,
    ToolParamType,
    message_api,
)

logger = get_logger("web_search_plugin")

# Default edge service used for Bing searches and page-to-markdown extraction.
DEFAULT_EDGE_API = "https://edge.liulikeji.cn"
# Per-request timeout (seconds) for calls to the edge service.
EDGE_REQUEST_TIMEOUT = 30


def _edge_fetch(edge_api: str, endpoint: str, target_url: str, default: str) -> str:
    """POST ``{"url": target_url}`` to ``{edge_api}/{endpoint}`` and return the ``result`` field.

    Args:
        edge_api: Base URL of the edge service.
        endpoint: Endpoint name, e.g. ``"bing_search"`` or ``"extract_content"``.
        target_url: The page URL the edge service should fetch.
        default: Text returned when the response has no (or a null) ``result``.

    Raises:
        requests.RequestException: On network errors or non-2xx responses.
        ValueError: If the body is not valid JSON.
        AttributeError: If the JSON body is not an object.
    """
    response = requests.post(
        f"{edge_api}/{endpoint}",
        json={"url": target_url},
        timeout=EDGE_REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    result = response.json().get("result")
    return default if result is None else str(result)


class AISearchTool(BaseTool):
    """AI search tool: lets an LLM answer a question through multi-round web-search tool calls."""

    name = "ai_search"
    description = "使用AI模型进行智能搜索,处理复杂查询。单独查询url链接请使用get_url_content"
    parameters = [
        ("question", ToolParamType.STRING, "要查询的问题", True, None)
    ]
    available_for_llm = True

    # Stored on the class (not the instance) so ``search.max_concurrency`` limits all
    # searches even if the framework creates a new tool instance for every call.
    _semaphore = None

    # Hard cap on tool-calling rounds per search. The system prompt asks the model for at
    # most 4; this guard stops a model that ignores the prompt from looping forever.
    _max_tool_rounds = 8

    async def execute(self, function_args) -> dict[str, str]:
        """Run an AI search for ``function_args["question"]`` and wrap it as a tool result."""
        try:
            question = function_args.get("question")
            search_results = await self._ai_search(question)
            result = self._format_search_results(question, search_results)
            return {"name": self.name, "content": result}
        except Exception as e:
            logger.error(f"AI搜索执行失败: {str(e)}")
            return {"name": self.name, "content": f"AI搜索失败: {str(e)}"}

    async def direct_execute(self, **function_args) -> str:
        """Call the AI search directly with keyword arguments.

        Raises:
            ValueError: If a required parameter is missing.
        """
        required_params = [p[0] for p in self.parameters if p[3]]
        missing = [p for p in required_params if p not in function_args]
        if missing:
            raise ValueError(f"工具类 {self.__class__.__name__} 缺少必要参数: {', '.join(missing)}")

        try:
            question = function_args.get("question")
            return await self._ai_search(question)
        except Exception as e:
            logger.error(f"执行AI搜索时发生异常: {e}")
            return "AI搜索失败,请稍后再试"

    async def _ai_search(self, question: str) -> str:
        """Run one search under the shared concurrency limit, retrying with exponential backoff.

        Search failures do not raise; after the last attempt a user-facing error string
        is returned instead.
        """
        cls = type(self)
        if cls._semaphore is None:
            max_concurrency = self.get_config('search.max_concurrency', 5)
            cls._semaphore = asyncio.Semaphore(max_concurrency)

        retry_attempts = self.get_config('search.retry_attempts', 3)
        retry_wait_min = self.get_config('search.retry_wait_min', 2.0)
        retry_wait_max = self.get_config('search.retry_wait_max', 10.0)
        timeout = self.get_config('search.timeout', 20.0)

        # Always make at least one attempt; a configured 0 previously returned None.
        attempts = max(1, int(retry_attempts))

        for attempt in range(1, attempts + 1):
            try:
                async with cls._semaphore:
                    return await self._execute_ai_search(question, timeout)
            except Exception as e:
                if attempt < attempts:
                    wait_time = min(retry_wait_min * (2 ** (attempt - 1)), retry_wait_max)
                    logger.warning(f"AI搜索失败({attempt}/{attempts}),等待{wait_time:.1f}秒后重试: {e}")
                    # Sleep outside the semaphore so waiting retries don't hold a slot.
                    await asyncio.sleep(wait_time)
                elif isinstance(e, (asyncio.TimeoutError, TimeoutError, APITimeoutError)):
                    return "AI搜索请求超时,请稍后再试"
                else:
                    return "AI搜索服务暂时不可用"
        return "AI搜索服务暂时不可用"  # not reached: the last attempt always returns

    async def _execute_ai_search(self, question: str, timeout: float) -> str:
        """Read model/search config, then run the blocking conversation in a worker thread."""
        logger.info(f"正在执行AI搜索,搜索内容:{question}")

        system_prompt = self.get_config("model.model_system", "")
        model_name = self.get_config("model.model", "Qwen/Qwen3-8B")
        base_url = self.get_config("model.base_url", "https://api.siliconflow.cn/v1")
        api_key = self.get_config("model.api_key", "")
        edge_api = self.get_config('search.edge_api', DEFAULT_EDGE_API)

        # The OpenAI client and `requests` are synchronous. Running them in a thread keeps
        # the event loop responsive and lets the semaphore actually bound concurrency.
        return await asyncio.to_thread(
            self._run_search_conversation,
            question,
            system_prompt,
            model_name,
            base_url,
            api_key,
            edge_api,
            timeout,
        )

    def _run_search_conversation(
        self,
        question: str,
        system_prompt: str,
        model_name: str,
        base_url: str,
        api_key: str,
        edge_api: str,
        timeout: float,
    ) -> str:
        """Blocking multi-round tool-calling conversation; returns the model's final answer."""
        tools_map = {
            "bing_search": self._bing_search_sync,
            "get_url_mkd": self._get_url_content_sync,
        }
        client = OpenAI(api_key=api_key, base_url=base_url)
        try:
            start = time.time()
            logger.info(f"AI搜索开始时间:{start}")
            messages = [
                {
                    "content": system_prompt + "\n\n当前时间: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                    "role": "system",
                },
                {
                    "content": question,
                    "role": "user",
                },
            ]
            logger.debug("-" * 60)

            round_no = 1
            assistant_output = self._next_message(client, messages, model_name, timeout)
            logger.info(f"距离开始耗时:{self._elapsed(start)}")
            logger.debug(f"第{round_no}轮大模型输出信息:{assistant_output}")

            if not assistant_output.tool_calls:
                logger.info(f"无需调用工具,直接回复:{assistant_output.content}")
                return assistant_output.content

            while assistant_output.tool_calls:
                # The assistant turn that requested tools must precede the tool results,
                # otherwise OpenAI-compatible APIs reject the conversation.
                messages.append(assistant_output)
                # Each tool_call_id needs its own "tool" reply (models may call tools in parallel).
                for tool_call in assistant_output.tool_calls:
                    messages.append(self._run_tool_call(tool_call, tools_map, edge_api))

                # round_no equals the number of completed tool rounds at this point.
                allow_tools = round_no < self._max_tool_rounds
                if not allow_tools:
                    logger.warning(f"工具调用轮数已达上限({self._max_tool_rounds}),要求模型直接给出答案")

                round_no += 1
                assistant_output = self._next_message(
                    client, messages, model_name, timeout, allow_tools=allow_tools
                )
                logger.info(f"距离开始耗时:{self._elapsed(start)}")
                logger.debug(f"第{round_no}轮大模型输出信息:{assistant_output}")

                if not allow_tools:
                    break

            logger.info(f"最终答案:{assistant_output.content}")
            return assistant_output.content
        finally:
            client.close()

    def _run_tool_call(self, tool_call, tools_map, edge_api) -> dict:
        """Execute one model-requested tool call and build the matching ``tool`` message."""
        tool_name = tool_call.function.name
        handler = tools_map.get(tool_name)
        if handler is None:
            content = f"工具调用失败: 未知工具 {tool_name}"
            logger.error(content)
        else:
            try:
                args = json.loads(tool_call.function.arguments or "{}")
                content = handler(args, edge_api)
            except Exception as e:
                logger.error(f"工具调用失败: {tool_name} - {str(e)}")
                content = f"工具调用失败: {str(e)}"

        logger.debug(f"工具输出信息字数:{len(content)}")
        logger.debug("-" * 60)
        return {
            "content": content,
            "role": "tool",
            "tool_call_id": tool_call.id,
        }

    def _next_message(self, client, messages, model_name, timeout, allow_tools: bool = True):
        """Request the next completion and return its message with ``content`` never None."""
        message = self._get_response(client, messages, model_name, timeout, allow_tools).choices[0].message
        if message.content is None:
            message.content = ""
        return message

    @staticmethod
    def _elapsed(start: float) -> str:
        """Format the time elapsed since ``start`` (epoch seconds) as ``MM:SS``."""
        delta = time.time() - start
        return f"{int(delta // 60):02d}:{int(delta % 60):02d}"

    def _get_response(self, client, messages, model_name, timeout, allow_tools: bool = True):
        """Call the chat-completions API with the search tools attached.

        When ``allow_tools`` is False, ``tool_choice="none"`` is sent so the model must
        answer directly instead of requesting more tools.

        Raises:
            Exception: Any client error is logged and re-raised.
        """
        request = {
            "model": model_name,
            "messages": messages,
            "tools": self._get_tools(),
            "extra_body": {"enable_thinking": False},
            "timeout": timeout,
        }
        if not allow_tools:
            request["tool_choice"] = "none"
        try:
            return client.chat.completions.create(**request)
        except Exception as e:
            logger.error(f"获取模型响应失败: {str(e)}")
            raise

    def _get_tools(self):
        """Return the OpenAI function-tool schema the search model may call."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "get_url_mkd",
                    "description": "获取指定url的网页内容,并转换为markdown格式,便于阅读",
                    "parameters": {
                        "type": "object",
                        "properties": {"url": {"type": "string", "description": "网页链接"}},
                        "required": ["url"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "bing_search",
                    "description": "调用bing进行网络搜索,只能提供基础链接",
                    "parameters": {
                        "type": "object",
                        "properties": {"query": {"type": "string", "description": "搜索内容"}},
                        "required": ["query"],
                    },
                },
            },
        ]

    def _bing_search_sync(self, arguments, edge_api):
        """Blocking Bing search through the edge service; returns result text or an error string."""
        query = arguments.get("query")
        if not query:
            return "错误:查询参数不能为空"
        try:
            return _edge_fetch(
                edge_api, "bing_search", f"https://www.bing.com/search?q={query}", "未找到相关结果"
            )
        except Exception as e:
            logger.error(f"Bing搜索失败: {str(e)}")
            return f"搜索失败: {str(e)}"

    def _get_url_content_sync(self, arguments, edge_api):
        """Blocking page extraction through the edge service; returns content or an error string."""
        url = arguments.get("url")
        if not url:
            return "错误:URL参数不能为空"
        try:
            return _edge_fetch(edge_api, "extract_content", url, "未能提取到内容")
        except Exception as e:
            logger.error(f"URL内容提取失败: {str(e)}")
            return f"内容提取失败: {str(e)}"

    def _format_search_results(self, question: str, result: str) -> str:
        """Prefix the AI search answer with a header naming the question."""
        return f"🤖 AI搜索 '{question}' 的结果:\n\n{result}"


class BingSearchTool(BaseTool):
    """Bing search tool: returns reference links from a web search (no synthesized answer)."""

    name = "bing_search"
    description = "调用Bing搜索引擎,只能提供参考链接并没有实际答案"
    parameters = [
        ("query", ToolParamType.STRING, "搜索内容", True, None)
    ]
    available_for_llm = True

    async def execute(self, function_args) -> dict[str, str]:
        """Run a Bing search via the edge service without blocking the event loop."""
        query = function_args.get("query")
        try:
            edge_api = self.get_config('search.edge_api', DEFAULT_EDGE_API)
            result = await asyncio.to_thread(
                _edge_fetch,
                edge_api,
                "bing_search",
                f"https://www.bing.com/search?q={query}",
                "未能提取到内容",
            )
            return {"name": self.name, "content": f"🔍 bing_search '{query}' 的结果:" + result}
        except Exception as e:
            logger.error(f"Bing搜索失败: {str(e)}")
            return {"name": self.name, "content": f"🔍 bing_search '{query}' 的结果: error: {str(e)}"}


class URLContentTool(BaseTool):
    """URL content tool: fetches a web page and returns it converted to Markdown."""

    name = "get_url_content"
    description = "获取指定URL链接的网页内容,明确知道url链接时可以直接使用,避免询问ai_search单独url"
    parameters = [
        ("url", ToolParamType.STRING, "网页链接", True, None)
    ]
    # Was garbled as `available_for极llm`, which left the tool hidden from the LLM.
    available_for_llm = True

    async def execute(self, function_args) -> dict[str, str]:
        """Extract a page's content via the edge service without blocking the event loop."""
        url = function_args.get("url")
        try:
            edge_api = self.get_config('search.edge_api', DEFAULT_EDGE_API)
            result = await asyncio.to_thread(
                _edge_fetch, edge_api, "extract_content", url, "未能提取到内容"
            )
            return {"name": self.name, "content": f"🔍 get_url_content '{url}' 的结果:" + result}
        except Exception as e:
            logger.error(f"URL内容提取失败: {str(e)}")
            return {"name": self.name, "content": f"🔍 get_url_content '{url}' 的结果: error: {str(e)}"}


@register_plugin
class WebSearchPlugin(BasePlugin):
    """web_search_plugin: registers the Bing search, URL content and AI search tools."""

    # Plugin metadata
    plugin_name: str = "web_search_plugin"
    enable_plugin: bool = True
    dependencies: List[str] = []
    # tenacity is imported at module level, so it is declared here as well.
    python_dependencies: List[str] = ["requests", "openai", "tenacity"]
    config_file_name: str = "config.toml"

    # Descriptions of the config sections
    config_section_descriptions = {
        "plugin": "插件基本信息",
        "model": "大模型设置",
        "search": "搜索设置",
    }

    # Config schema
    config_schema: dict = {
        "plugin": {
            "name": ConfigField(
                type=str, default="web_search_plugin", description="插件名称"
            ),
            "version": ConfigField(type=str, default="1.1.0", description="插件版本"),
            "config_version": ConfigField(type=str, default="1.2.0", description="配置文件版本"),
            "enabled": ConfigField(
                type=bool, default=False, description="是否启用插件"
            ),
        },
        "model": {
            "base_url": ConfigField(
                type=str,
                default="https://api.siliconflow.cn/v1",
                description="模型API基础URL",
            ),
            "api_key": ConfigField(
                type=str, default="", description="你的apikey"
            ),
            "model": ConfigField(type=str, default="Qwen/Qwen3-8B", description="使用的模型名称"),
            "model_system": ConfigField(
                type=str,
                default="你是一个搜索模型助手,能够调用工具获取当前时间和网页内容。如果你需要获取最新的信息或数据,请直接使用你的工具箱中的功能。使用中文url时直接输出中文例如:(https://www.bing.com/search?q=你好),禁止使用编码过的url。如果用户未提供合适url请自行查找,如果你访问的url出错请换一个。你可以进行多次搜索来获取更准确的信息,直到你确信你有足够的信息来回答我的问题为止。如果一次查询没用得到最终答案,你可以继续调用工具进行多次查询,直到答案足够准确为止。用户不会回复你的消息,你要做的是尽可能找出准确答案,不要询问用户任何问题。你最多使用4次工具调用来获取信息,之后你必须给出答案。",
                description="模型系统提示词",
            ),
        },
        "search": {
            "edge_api": ConfigField(
                type=str,
                default=DEFAULT_EDGE_API,
                description="edge API的URL",
            ),
            "timeout": ConfigField(
                type=float,
                default=20.0,
                description="API调用超时时间(秒)",
            ),
            "max_concurrency": ConfigField(
                type=int,
                default=5,
                description="最大并发搜索请求数",
            ),
            "retry_attempts": ConfigField(
                type=int,
                default=3,
                description="搜索失败时的重试次数",
            ),
            "retry_wait_min": ConfigField(
                type=float,
                default=2.0,
                description="重试最小等待时间(秒)",
            ),
            "retry_wait_max": ConfigField(
                type=float,
                default=10.0,
                description="重试最大等待时间(秒)",
            ),
        },
    }

    def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
        """Return (component info, class) pairs for every tool this plugin provides."""
        return [
            (BingSearchTool.get_tool_info(), BingSearchTool),
            (URLContentTool.get_tool_info(), URLContentTool),
            (AISearchTool.get_tool_info(), AISearchTool),
        ]