416 lines
17 KiB
Python
416 lines
17 KiB
Python
import time
|
||
import asyncio
|
||
import json
|
||
import requests
|
||
from typing import List, Tuple, Type, Dict, Any
|
||
from openai import AsyncOpenAI, OpenAI
|
||
from tenacity import retry, stop_after_attempt, wait_exponential # 重试机制
|
||
from src.common.logger import get_logger
|
||
from src.plugin_system import (
|
||
BasePlugin,
|
||
register_plugin,
|
||
BaseTool,
|
||
ComponentInfo,
|
||
ConfigField,
|
||
ToolParamType,
|
||
message_api
|
||
)
|
||
|
||
logger = get_logger("web_search_plugin")
|
||
|
||
class AISearchTool(BaseTool):
    """AI search tool that answers a question through multi-round LLM tool calling.

    The configured chat model is offered two internal tools, ``bing_search``
    and ``get_url_mkd`` (both served by the configured edge API), and is
    queried repeatedly until it replies without requesting another tool call.
    """

    name = "ai_search"
    description = "使用AI模型进行智能搜索,处理复杂查询。单独查询url链接请使用get_url_content"
    parameters = [
        ("question", ToolParamType.STRING, "要查询的问题", True, None)
    ]
    available_for_llm = True
    # Created lazily in _ai_search() from `search.max_concurrency`.
    # NOTE(review): the semaphore is stored on the instance, so it only limits
    # concurrency if tool instances are reused across calls - confirm against
    # the plugin system.
    _semaphore = None
    # Safety cap on tool-calling rounds so a model that keeps requesting tools
    # cannot loop (and bill) forever. The default system prompt asks the model
    # to stop after 4 tool calls; this leaves headroom.
    _max_tool_rounds = 8

    async def execute(self, function_args) -> dict[str, str]:
        """Run an AI search for ``function_args["question"]``.

        Returns:
            A dict with the tool ``name`` and, under ``content``, either the
            formatted answer or an error message. Exceptions are caught,
            logged and reported through ``content``.
        """
        try:
            question = function_args.get("question")
            if not question:
                # Avoid sending an empty user message to the model (and retrying it).
                return {"name": self.name, "content": "AI搜索失败: 查询参数不能为空"}
            search_results = await self._ai_search(question)
            result = self._format_search_results(question, search_results)
            return {"name": self.name, "content": result}
        except Exception as e:
            logger.error(f"AI搜索执行失败: {str(e)}")
            return {"name": self.name, "content": f"AI搜索失败: {str(e)}"}

    async def direct_execute(self, **function_args) -> str:
        """Call the AI search directly with keyword arguments.

        Raises:
            ValueError: if a parameter marked as required in ``parameters``
                is missing from ``function_args``.

        Returns:
            The raw (unformatted) answer, or a generic failure message if the
            search raised.
        """
        required_params = [p[0] for p in self.parameters if p[3]]
        missing = [p for p in required_params if p not in function_args]
        if missing:
            raise ValueError(f"工具类 {self.__class__.__name__} 缺少必要参数: {', '.join(missing)}")

        try:
            question = function_args.get("question")
            return await self._ai_search(question)
        except Exception as e:
            logger.error(f"执行AI搜索时发生异常: {e}")
            return "AI搜索失败,请稍后再试"

    async def _ai_search(self, question: str) -> str:
        """Run the search under the concurrency limit with exponential-backoff retries.

        Returns the model's answer, or a user-facing error message once all
        attempts have failed.
        """
        # Lazily create the semaphore.
        if self._semaphore is None:
            max_concurrency = self.get_config('search.max_concurrency', 5)
            self._semaphore = asyncio.Semaphore(max_concurrency)

        # Retry configuration. At least one attempt is always made, otherwise a
        # misconfigured value <= 0 would make this method return None.
        retry_attempts = max(1, self.get_config('search.retry_attempts', 3))
        retry_wait_min = self.get_config('search.retry_wait_min', 2.0)
        retry_wait_max = self.get_config('search.retry_wait_max', 10.0)
        timeout = self.get_config('search.timeout', 20.0)

        for attempt in range(1, retry_attempts + 1):
            try:
                async with self._semaphore:
                    return await self._execute_ai_search(question, timeout)
            except Exception as e:
                if attempt < retry_attempts:
                    wait_time = min(retry_wait_min * (2 ** (attempt - 1)), retry_wait_max)
                    logger.warning(f"AI搜索失败({attempt}/{retry_attempts}),等待{wait_time:.1f}秒后重试: {e}")
                    await asyncio.sleep(wait_time)
                    continue
                logger.error(f"AI搜索最终失败: {e}")
                if self._is_timeout_error(e):
                    return "AI搜索请求超时,请稍后再试"
                return "AI搜索服务暂时不可用"

        # Unreachable (the loop always returns), kept as a defensive fallback.
        return "AI搜索服务暂时不可用"

    @staticmethod
    def _is_timeout_error(error: Exception) -> bool:
        """Return True if *error* represents a timeout.

        The synchronous OpenAI client reports timeouts as
        ``openai.APITimeoutError`` rather than ``asyncio.TimeoutError``, so
        both families are checked.
        """
        if isinstance(error, (asyncio.TimeoutError, TimeoutError)):
            return True
        try:
            from openai import APITimeoutError
        except ImportError:
            return False
        return isinstance(error, APITimeoutError)

    @staticmethod
    def _format_elapsed(start: float) -> str:
        """Format the time elapsed since *start* (a ``time.time()`` value) as ``MM:SS``."""
        minutes, seconds = divmod(int(time.time() - start), 60)
        return f"{minutes:02d}:{seconds:02d}"

    async def _execute_ai_search(self, question: str, timeout: float) -> str:
        """Perform one complete multi-round conversation and return the final answer.

        Args:
            question: The user question sent to the model.
            timeout: Per-request timeout (seconds) passed to the model API.

        Raises:
            Exception: whatever the model API raises; the caller handles retries.
        """
        logger.info(f"正在执行AI搜索,搜索内容:{question}")

        # Read parameters from the plugin configuration.
        system_prompt = self.get_config("model.model_system", "")
        model_name = self.get_config("model.model", "Qwen/Qwen3-8B")
        base_url = self.get_config("model.base_url", "https://api.siliconflow.cn/v1")
        api_key = self.get_config("model.api_key", "")
        edge_api = self.get_config('search.edge_api', 'https://edge.liulikeji.cn')

        client = OpenAI(
            api_key=api_key,
            base_url=base_url,
        )
        try:
            return await self._run_conversation(
                client, question, system_prompt, model_name, edge_api, timeout
            )
        finally:
            # A new client is created per search; release its HTTP connections.
            client.close()

    async def _run_conversation(self, client, question, system_prompt, model_name, edge_api, timeout) -> str:
        """Drive the model/tool loop until the model answers without tool calls."""
        time_now = time.time()
        logger.info(f"AI搜索开始时间:{time_now}")

        messages = [
            {
                "content": system_prompt + "\n\n当前时间: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                "role": "system",
            },
            {
                "content": question,
                "role": "user",
            },
        ]

        logger.debug("-" * 60)

        # First model call. The client is synchronous, so it runs in a worker
        # thread to keep the event loop responsive.
        i = 1
        first_response = await asyncio.to_thread(
            self._get_response, client, messages, model_name, timeout
        )
        logger.info(f"距离开始耗时:{self._format_elapsed(time_now)}")
        logger.debug(f"第{i}轮大模型输出信息:{first_response}")

        assistant_output = first_response.choices[0].message
        if assistant_output.content is None:
            assistant_output.content = ""

        # No tool requested: this is already the final answer.
        if not assistant_output.tool_calls:
            logger.info(f"无需调用工具,直接回复:{assistant_output.content}")
            return assistant_output.content

        # The assistant message that carries the tool calls must be part of the
        # history before the matching tool replies are appended.
        messages.append(assistant_output)

        tool_rounds = 0
        while assistant_output.tool_calls:
            if tool_rounds >= self._max_tool_rounds:
                logger.warning(f"AI搜索工具调用轮数达到上限({self._max_tool_rounds}),停止继续调用")
                return assistant_output.content or "AI搜索未能在限定的工具调用次数内得出答案"
            tool_rounds += 1

            # Every requested tool call needs its own tool reply.
            for tool_call in assistant_output.tool_calls:
                tool_info = await self._run_tool_call(tool_call, edge_api)
                logger.debug(f"工具输出信息字数:{len(tool_info['content'])}")
                logger.debug("-" * 60)
                messages.append(tool_info)

            response = await asyncio.to_thread(
                self._get_response, client, messages, model_name, timeout
            )
            assistant_output = response.choices[0].message
            if assistant_output.content is None:
                assistant_output.content = ""

            messages.append(assistant_output)
            i += 1

            logger.info(f"距离开始耗时:{self._format_elapsed(time_now)}")
            logger.debug(f"第{i}轮大模型输出信息:{assistant_output}")

        logger.info(f"最终答案:{assistant_output.content}")
        return assistant_output.content

    async def _run_tool_call(self, tool_call, edge_api) -> dict:
        """Execute one model-requested tool call and build its ``tool`` reply message.

        Failures (unknown tool, invalid JSON arguments, handler errors) are
        reported to the model through the message content instead of raising.
        """
        tools_map = {
            "bing_search": self._bing_search_sync,
            "get_url_mkd": self._get_url_content_sync,
        }
        tool_name = tool_call.function.name
        tool_info = {
            "content": "",
            "role": "tool",
            "tool_call_id": tool_call.id,
        }

        handler = tools_map.get(tool_name)
        if handler is None:
            logger.error(f"工具调用失败: {tool_name} - 未知工具")
            tool_info["content"] = f"工具调用失败: 未知工具 {tool_name}"
            return tool_info

        try:
            args = json.loads(tool_call.function.arguments)
            # The handlers use blocking `requests`; run them off the event loop.
            tool_info["content"] = await asyncio.to_thread(handler, args, edge_api)
        except Exception as e:
            logger.error(f"工具调用失败: {tool_name} - {str(e)}")
            tool_info["content"] = f"工具调用失败: {str(e)}"
        return tool_info

    def _get_response(self, client, messages, model_name, timeout):
        """Request one chat completion (blocking); log and re-raise on failure."""
        try:
            completion = client.chat.completions.create(
                model=model_name,
                messages=messages,
                tools=self._get_tools(),
                extra_body={"enable_thinking": False},
                timeout=timeout
            )
            return completion
        except Exception as e:
            logger.error(f"获取模型响应失败: {str(e)}")
            raise

    def _get_tools(self):
        """Return the function-calling schema of the tools offered to the model."""
        return [
            {
                "type": "function",
                "function": {
                    "name": "get_url_mkd",
                    "description": "获取指定url的网页内容,并转换为markdown格式,便于阅读",
                    "parameters": {
                        "type": "object",
                        "properties": {"url": {"type": "string", "description": "网页链接"}},
                        "required": ["url"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "bing_search",
                    "description": "调用bing进行网络搜索,只能提供基础链接",
                    "parameters": {
                        "type": "object",
                        "properties": {"query": {"type": "string", "description": "搜索内容"}},
                        "required": ["query"],
                    },
                },
            },
        ]

    def _bing_search_sync(self, arguments, edge_api):
        """Run a Bing search through the edge API (blocking).

        Returns the ``result`` field of the JSON response, or an error string;
        never raises.
        """
        query = arguments.get("query")
        if not query:
            return "错误:查询参数不能为空"

        try:
            # NOTE(review): the query is interpolated without URL-encoding; the
            # default system prompt forbids encoded URLs, so this looks
            # intentional for the edge API - confirm before changing.
            response = requests.post(
                f'{edge_api}/bing_search',
                json={"url": f"https://www.bing.com/search?q={query}"},
                timeout=30
            )
            response.raise_for_status()
            result = response.json()
            return result.get("result", "未找到相关结果")
        except Exception as e:
            logger.error(f"Bing搜索失败: {str(e)}")
            return f"搜索失败: {str(e)}"

    def _get_url_content_sync(self, arguments, edge_api):
        """Fetch a page's extracted content through the edge API (blocking).

        Returns the ``result`` field of the JSON response, or an error string;
        never raises.
        """
        url = arguments.get("url")
        if not url:
            return "错误:URL参数不能为空"

        try:
            response = requests.post(
                f'{edge_api}/extract_content',
                json={"url": url},
                timeout=30
            )
            response.raise_for_status()
            result = response.json()
            return result.get("result", "未能提取到内容")
        except Exception as e:
            logger.error(f"URL内容提取失败: {str(e)}")
            return f"内容提取失败: {str(e)}"

    def _format_search_results(self, question: str, result: str) -> str:
        """Prefix the answer with a header naming the question."""
        return f"🤖 AI搜索 '{question}' 的结果:\n\n{result}"
|
||
|
||
|
||
class BingSearchTool(BaseTool):
    """Bing search tool: runs a web search through the configured edge API."""

    name = "bing_search"
    description = "调用Bing搜索引擎,只能提供参考链接并没有实际答案"
    parameters = [
        ("query", ToolParamType.STRING, "搜索内容", True, None)
    ]
    available_for_llm = True

    async def execute(self, function_args) -> dict[str, str]:
        """Search Bing for ``function_args["query"]``.

        Returns:
            A dict with the tool ``name`` and, under ``content``, the search
            result text or an error description. Exceptions are caught and
            reported through ``content``.
        """
        # Bound before the try block so the error handler can always reference it.
        query = None
        try:
            query = function_args.get("query")
            if not query:
                return {"name": self.name, "content": "错误:查询参数不能为空"}

            edge_api = self.get_config('search.edge_api', 'https://edge.liulikeji.cn')
            # `requests` is blocking; run it in a worker thread so the event
            # loop is not stalled for up to the full 30 second timeout.
            response = await asyncio.to_thread(
                requests.post,
                f'{edge_api}/bing_search',
                json={"url": f"https://www.bing.com/search?q={query}"},
                timeout=30,
            )
            response.raise_for_status()
            result = response.json()
            # f-string instead of `+` so a non-string "result" cannot raise TypeError.
            content = result.get("result", "未能提取到内容")
            return {"name": self.name, "content": f"🔍 bing_search '{query}' 的结果:{content}"}
        except Exception as e:
            logger.error(f"Bing搜索失败: {str(e)}")
            return {"name": self.name, "content": f"🔍 bing_search '{query}' 的结果: error: {str(e)}"}
|
||
|
||
|
||
class URLContentTool(BaseTool):
    """URL content tool: fetches a web page through the edge API's ``extract_content`` endpoint."""

    name = "get_url_content"
    description = "获取指定URL链接的网页内容,明确知道url链接时可以直接使用, 避免询问ai_search单独url"
    parameters = [
        ("url", ToolParamType.STRING, "网页链接", True, None)
    ]
    # Fixed: the attribute name contained a stray character
    # (`available_for极llm`), so `available_for_llm` was never set on this
    # class, unlike the sibling tools.
    available_for_llm = True

    async def execute(self, function_args) -> dict[str, str]:
        """Extract the content of ``function_args["url"]``.

        Returns:
            A dict with the tool ``name`` and, under ``content``, the extracted
            text or an error description. Exceptions are caught and reported
            through ``content``.
        """
        # Bound before the try block so the error handler can always reference it.
        url = None
        try:
            url = function_args.get("url")
            if not url:
                return {"name": self.name, "content": "错误:URL参数不能为空"}

            edge_api = self.get_config('search.edge_api', 'https://edge.liulikeji.cn')
            # `requests` is blocking; run it in a worker thread so the event
            # loop is not stalled for up to the full 30 second timeout.
            response = await asyncio.to_thread(
                requests.post,
                f'{edge_api}/extract_content',
                json={"url": url},
                timeout=30,
            )
            response.raise_for_status()
            result = response.json()
            # f-string instead of `+` so a non-string "result" cannot raise TypeError.
            content = result.get("result", "未能提取到内容")
            return {"name": self.name, "content": f"🔍 get_url_content '{url}' 的结果:{content}"}
        except Exception as e:
            logger.error(f"URL内容提取失败: {str(e)}")
            return {"name": self.name, "content": f"🔍 get_url_content '{url}' 的结果: error: {str(e)}"}
|
||
|
||
|
||
@register_plugin
class WebSearchPlugin(BasePlugin):
    """web_search_plugin: registers the Bing search, URL content and AI search tools."""

    # Basic plugin information
    plugin_name: str = "web_search_plugin"
    enable_plugin: bool = True
    dependencies: List[str] = []
    python_dependencies: List[str] = ["requests", "openai"]
    config_file_name: str = "config.toml"

    # Descriptions of the configuration sections
    config_section_descriptions = {
        "plugin": "插件基本信息",
        "model": "大模型设置",
        "search": "搜索设置",
    }

    # Configuration schema
    config_schema: dict = {
        "plugin": {
            "name": ConfigField(
                type=str, default="web_search_plugin", description="插件名称"
            ),
            "version": ConfigField(type=str, default="1.1.0", description="插件版本"),
            # Fixed: the default was the corrupted string "1.2.极0".
            "config_version": ConfigField(type=str, default="1.2.0", description="配置文件版本"),
            "enabled": ConfigField(
                type=bool, default=False, description="是否启用插件"
            ),
        },
        "model": {
            "base_url": ConfigField(
                type=str,
                default="https://api.siliconflow.cn/v1",
                description="模型API基础URL",
            ),
            "api_key": ConfigField(
                type=str, default="", description="你的apikey"
            ),
            "model": ConfigField(type=str, default="Qwen/Qwen3-8B", description="使用的模型名称"),
            # Fixed: removed a stray character from the prompt ("访问极的url" -> "访问的url").
            "model_system": ConfigField(
                type=str, default="你是一个搜索模型助手,能够调用工具获取当前时间和网页内容。如果你需要获取最新的信息或数据,请直接使用你的工具箱中的功能。使用中文url时直接输出中文例如:(https://www.bing.com/search?q=你好),禁止使用编码过的url。如果用户未提供合适url请自行查找,如果你访问的url出错请换一个。你可以进行多次搜索来获取更准确的信息,直到你确信你有足够的信息来回答我的问题为止。如果一次查询没用得到最终答案,你可以继续调用工具进行多次查询,直到答案足够准确为止。用户不会回复你的消息,你要做的是尽可能找出准确答案,不要询问用户任何问题。你最多使用4次工具调用来获取信息,之后你必须给出答案。",
                description="模型系统提示词"
            )
        },
        "search": {
            "edge_api": ConfigField(
                type=str,
                default="https://edge.liulikeji.cn",
                description="edge API的URL",
            ),
            "timeout": ConfigField(
                type=float,
                default=20.0,
                description="API调用超时时间(秒)",
            ),
            "max_concurrency": ConfigField(
                type=int,
                default=5,
                description="最大并发搜索请求数",
            ),
            "retry_attempts": ConfigField(
                type=int,
                default=3,
                description="搜索失败时的重试次数",
            ),
            "retry_wait_min": ConfigField(
                type=float,
                default=2.0,
                description="重试最小等待时间(秒)",
            ),
            "retry_wait_max": ConfigField(
                type=float,
                default=10.0,
                description="重试最大等待时间(秒)",
            ),
        }
    }

    def get_plugin_components(self) -> List[Tuple[ComponentInfo, Type]]:
        """Return the (component info, class) pairs for the three tools this plugin provides."""
        return [
            (BingSearchTool.get_tool_info(), BingSearchTool),
            (URLContentTool.get_tool_info(), URLContentTool),
            (AISearchTool.get_tool_info(), AISearchTool),
        ]