上传文件至 /

This commit is contained in:
2025-08-27 01:19:59 +08:00
commit dbbd2b3b13

299
1.py Normal file
View File

@@ -0,0 +1,299 @@
from flask import Flask, request, jsonify
from concurrent.futures import ThreadPoolExecutor
from selenium import webdriver
from selenium.webdriver.edge.options import Options
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import html2text
from bs4 import BeautifulSoup
import logging
import time
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler("web_extractor.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger("WebExtractor")
app = Flask(__name__)
# 创建线程池执行器
executor = ThreadPoolExecutor(max_workers=4)
def setup_driver_options():
"""设置浏览器选项的通用函数"""
options = Options()
# 反检测配置
options.add_argument("--disable-blink-features=AutomationControlled")
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
# 设置用户代理和语言
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0"
options.add_argument(f"user-agent={user_agent}")
options.add_argument("--lang=zh-CN")
#设置缩放
options.add_argument("--force-device-scale-factor=0.6")
return options
def get_bing_search_results(url):
"""提取必应搜索结果的函数"""
logger.info(f"开始获取必应搜索结果: {url}")
driver = None
try:
options = setup_driver_options()
driver = webdriver.Edge(options=options)
# 隐藏WebDriver特征
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.navigator.chrome = {runtime: {}, browser: {getVersion: () => '122.0.0.0'}};
navigator.mediaDevices = {getDevices: () => Promise.resolve([])};
"""
})
# 访问目标URL
driver.get(url)
logger.info("页面加载完成,等待搜索结果")
time.sleep(1)
# 等待搜索结果加载
wait = WebDriverWait(driver, 10)
results_element = wait.until(
EC.presence_of_element_located((By.ID, "b_results"))
)
# 获取结果HTML
results_html = results_element.get_attribute("outerHTML")
# 使用BeautifulSoup清理不需要的部分
soup = BeautifulSoup(results_html, 'html.parser')
# 移除不需要的元素
elements_to_remove = [
('div', {'id': 'brsv3'}),
('div', {'id': 'inline_rs'}),
('div', {'id': 'serpvidans'}),
('li', {'class': 'b_msg b_canvas'}),
('li', {'class': 'b_pag'}),
('ol', {'id': 'b_context'}),
]
for tag, attrs in elements_to_remove:
element = soup.find(tag, attrs)
if element:
element.decompose()
logger.debug(f"已移除元素: {tag} {attrs}")
with open("output.html", "w", encoding="utf-8") as md_file:
md_file.write(str(soup))
# 转换为Markdown
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = True
h.body_width = 0
markdown_content = h.handle(str(soup))
logger.info("成功提取必应搜索结果")
return markdown_content
except Exception as e:
logger.error(f"获取必应搜索结果时出错: {str(e)}")
return None
finally:
if driver:
driver.quit()
logger.debug("浏览器已关闭")
def get_full_page_content(url, element_selector=None, remove_selectors=None):
"""提取任何网页完整内容的函数"""
logger.info(f"开始获取网页内容: {url}, 选择器: {element_selector}")
driver = None
try:
options = setup_driver_options()
driver = webdriver.Edge(options=options)
# 隐藏WebDriver特征
driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", {
"source": """
Object.defineProperty(navigator, 'webdriver', {get: () => undefined});
window.navigator.chrome = {runtime: {}, browser: {getVersion: () => '122.0.0.0'}};
navigator.mediaDevices = {getDevices: () => Promise.resolve([])};
"""
})
# 访问目标URL
driver.get(url)
logger.info("页面加载完成,开始提取内容")
# 等待页面完全加载
WebDriverWait(driver, 10).until(
lambda d: d.execute_script("return document.readyState") == "complete"
)
# 获取指定元素或整个页面
if element_selector:
element = driver.find_element(By.CSS_SELECTOR, element_selector)
html_content = element.get_attribute("outerHTML")
logger.debug(f"已提取指定元素: {element_selector}")
else:
html_content = driver.page_source
logger.debug("已提取整个页面内容")
# 移除不需要的元素
if remove_selectors and html_content:
soup = BeautifulSoup(html_content, 'html.parser')
for selector in remove_selectors:
if selector.startswith('tag:'):
tag_name = selector[4:]
for element in soup.find_all(tag_name):
element.decompose()
logger.debug(f"已移除标签: {tag_name}")
elif selector.startswith('id:'):
element_id = selector[3:]
element = soup.find(id=element_id)
if element:
element.decompose()
logger.debug(f"已移除ID: {element_id}")
elif selector.startswith('class:'):
class_name = selector[6:]
for element in soup.find_all(class_=class_name):
element.decompose()
logger.debug(f"已移除类: {class_name}")
else:
for element in soup.select(selector):
element.decompose()
logger.debug(f"已移除选择器: {selector}")
html_content = str(soup)
# 转换为Markdown
h = html2text.HTML2Text()
h.ignore_links = False
h.ignore_images = True
h.body_width = 0
markdown_content = h.handle(html_content)
logger.info("成功提取网页内容")
return markdown_content
except Exception as e:
logger.error(f"获取网页内容时出错: {str(e)}")
return None
finally:
if driver:
driver.quit()
logger.debug("浏览器已关闭")
@app.route('/bing_search', methods=['POST'])
def bing_search():
"""必应搜索API端点"""
data = request.json
if not data or 'url' not in data:
logger.warning("缺少'url'参数")
return jsonify({"error": "Missing 'url' parameter"}), 400
try:
logger.info(f"处理必应搜索请求: {data['url']}")
future = executor.submit(get_bing_search_results, data['url'])
result = future.result(timeout=60)
if result:
logger.info("必应搜索请求处理成功")
return jsonify({"status": "success", "result": result})
else:
logger.warning("提取必应搜索结果失败")
return jsonify({"status": "error", "message": "Failed to extract content"}), 500
except Exception as e:
logger.error(f"处理必应搜索请求时出错: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/extract_content', methods=['POST'])
def extract_content():
"""通用内容提取API端点"""
data = request.json
if not data or 'url' not in data:
logger.warning("缺少'url'参数")
return jsonify({"error": "Missing 'url' parameter"}), 400
element_selector = data.get('element_selector')
remove_selectors = data.get('remove_selectors')
try:
logger.info(f"处理内容提取请求: {data['url']}")
future = executor.submit(
get_full_page_content,
data['url'],
element_selector,
remove_selectors
)
result = future.result(timeout=60)
if result:
logger.info("内容提取请求处理成功")
return jsonify({"status": "success", "result": result})
else:
logger.warning("提取网页内容失败")
return jsonify({"status": "error", "message": "Failed to extract content"}), 500
except Exception as e:
logger.error(f"处理内容提取请求时出错: {str(e)}")
return jsonify({"status": "error", "message": str(e)}), 500
@app.route('/')
def index():
"""首页提供API使用说明"""
return """
<h1>网页内容提取服务</h1>
<p>提供两种内容提取服务:</p>
<ol>
<li>
<strong>必应搜索提取</strong> - POST /bing_search
<p>参数: {"url": "必应搜索URL"}</p>
</li>
<li>
<strong>通用内容提取</strong> - POST /extract_content
<p>参数: {
"url": "目标URL",
"element_selector": "可选CSS选择器",
"remove_selectors": ["可选移除选择器列表"]
}</p>
</li>
</ol>
<h2>移除选择器格式</h2>
<p>可以使用以下格式指定要移除的元素:</p>
<ul>
<li><code>tag:div</code> - 移除所有div标签</li>
<li><code>id:header</code> - 移除id为"header"的元素</li>
<li><code>class:advertisement</code> - 移除class为"advertisement"的元素</li>
<li><code>div#header</code> - 移除id为"header"的div元素标准CSS选择器</li>
<li><code>.sidebar</code> - 移除class为"sidebar"的元素标准CSS选择器</li>
</ul>
<p>特点:</p>
<ul>
<li>直接返回提取结果</li>
<li>支持多线程处理</li>
<li>支持指定要移除的元素</li>
<li>超时自动返回错误</li>
</ul>
"""
if __name__ == "__main__":
logger.info("启动网页内容提取服务")
app.run(host='0.0.0.0', port=5000, threaded=True)