Files
computer-craft-web-file/PyServer/main.py
2025-12-12 19:18:40 +08:00

487 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import uuid
import time
import os
import asyncio
import aiohttp
from aiohttp import web
from datetime import datetime
from typing import Dict, Any, List
from urllib.parse import parse_qs, urlparse
import mimetypes
import re
# Configure process-wide logging once, at import time.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Registry of Room objects, keyed by room id.
rooms = {}

# Per-room message queues, frontend -> client, keyed by room id.
frontend_to_client_queues = {}

# Per-room message queues, client -> frontend, keyed by room id.
client_to_frontend_queues = {}

# Book-keeping for suspended long-poll requests. Two kinds of entries share
# this dict:
#   * request id -> {'room_id', 'event', 'type', 'timestamp'}
#   * room id    -> {'frontend': request id, 'client': request id}
pending_requests = {}

# Directory that static frontend files are served from; created on first run.
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
if not os.path.exists(STATIC_DIR):
    os.makedirs(STATIC_DIR)
    logger.info(f"创建静态文件目录: {STATIC_DIR}")
class Room:
    """A relay room identified by ``room_id``.

    Attributes:
        room_id: Identifier of the room.
        created_at: Local (naive) timestamp taken when the room was created.
        frontend_url: URL of the frontend page for this room.
    """

    def __init__(self, room_id: str, server_host: str):
        """Create a room.

        Args:
            room_id: Identifier of the room.
            server_host: Host the server was reached on; a trailing
                ``:<port>`` suffix, if present, is removed before the
                frontend URL is built.
        """
        self.room_id = room_id
        self.created_at = datetime.now()
        # Strip a trailing port so the URL points at the default HTTP port.
        # The stripped value was previously computed but never used.
        host_without_port = re.sub(r':\d+$', '', server_host)
        self.frontend_url = f"http://{host_without_port}/?id={room_id}"

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable description of the room."""
        return {
            'room_id': self.room_id,
            'frontend_url': self.frontend_url,
            'created_at': self.created_at.isoformat(),
        }
def safe_decode_json(raw_body: bytes) -> Dict[Any, Any]:
    """Decode a request body into a JSON object.

    The body is decoded with UTF-8 first, then GB18030 (a superset of
    GBK/GB2312), then latin1 as a last resort; the first encoding that both
    decodes and parses as JSON wins.

    Args:
        raw_body: Raw request bytes.

    Returns:
        The parsed JSON object.

    Raises:
        ValueError: If no encoding yields valid JSON, or if the JSON value is
            not an object (callers rely on ``dict`` methods such as ``get``).
    """
    for encoding in ('utf-8', 'gb18030', 'latin1'):
        try:
            parsed = json.loads(raw_body.decode(encoding))
        except (UnicodeDecodeError, json.JSONDecodeError):
            continue
        # Valid JSON that is not an object (list, string, number, ...) is a
        # client error; report it as such instead of letting callers crash.
        if not isinstance(parsed, dict):
            raise ValueError("请求体必须是 JSON 对象")
        return parsed
    raise ValueError("无法解码请求体为有效 JSON")
def get_frontend_to_client_queue(room_id: str) -> List[Dict[str, Any]]:
    """Return the frontend-to-client queue for ``room_id``, creating an empty one on first use."""
    return frontend_to_client_queues.setdefault(room_id, [])
def get_client_to_frontend_queue(room_id: str) -> List[Dict[str, Any]]:
    """Return the client-to-frontend queue for ``room_id``, creating an empty one on first use."""
    return client_to_frontend_queues.setdefault(room_id, [])
class HTTPHandler:
    """aiohttp request handlers for the room relay API and the static frontend.

    Messages travel between a "frontend" and a "client" through per-room
    in-memory queues. The receive endpoints are long-polls: when the queue is
    empty the request is parked on an ``asyncio.Event`` registered in
    ``pending_requests`` and is woken by the matching send endpoint.
    """

    # Maximum time, in seconds, a long-poll request is held open.
    FRONTEND_POLL_TIMEOUT = 295
    CLIENT_POLL_TIMEOUT = 58  # slightly below a 60 second client/proxy timeout

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _require_json_object(data):
        """Return ``data`` if it is a dict, otherwise raise ``ValueError``."""
        if not isinstance(data, dict):
            raise ValueError("请求体必须是 JSON 对象")
        return data

    @staticmethod
    def _create_room(request) -> Room:
        """Create and register a new room for the host named in ``request``."""
        room_id = str(uuid.uuid4())[:8]
        # Only 8 hex characters are kept, so guard against reusing a live id.
        while room_id in rooms:
            room_id = str(uuid.uuid4())[:8]
        host = request.headers.get('Host', 'localhost')
        host_without_port = re.sub(r':\d+$', '', host)
        room = Room(room_id, host_without_port)
        rooms[room_id] = room
        return room

    @staticmethod
    def _wake_waiter(room_id, waiter_kind):
        """Wake the parked ``waiter_kind`` long-poll of ``room_id``, if any.

        Returns:
            The request id that was woken, or ``None`` when nobody is waiting.
        """
        req_id = pending_requests.get(room_id, {}).get(waiter_kind)
        waiter = pending_requests.get(req_id) if req_id else None
        if waiter is None:
            return None
        waiter['event'].set()
        return req_id

    @staticmethod
    async def _poll_queue(room_id, get_queue, waiter_kind, timeout):
        """Pop the next message for ``room_id``, waiting up to ``timeout`` seconds.

        Args:
            room_id: Room whose queue is polled.
            get_queue: Function returning the queue (a list) for a room id.
            waiter_kind: ``'frontend'`` or ``'client'``; the key under which
                the parked request is registered for the room.
            timeout: Maximum number of seconds to wait for a message.

        Returns:
            The oldest queued message, or ``None`` if none arrived in time.
        """
        queue = get_queue(room_id)
        if queue:
            return queue.pop(0)

        # Nothing queued: park this request until a sender sets the event.
        req_id = str(uuid.uuid4())
        event = asyncio.Event()
        pending_requests[req_id] = {
            'room_id': room_id,
            'event': event,
            'type': waiter_kind,
            'timestamp': time.time(),
        }
        pending_requests.setdefault(room_id, {})[waiter_kind] = req_id
        try:
            await asyncio.wait_for(event.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            return None
        finally:
            # ``finally`` also runs when the handler is cancelled (which is
            # not an ``Exception``), so the registration never leaks.
            pending_requests.pop(req_id, None)
            room_waiters = pending_requests.get(room_id)
            # Only drop the room pointer if it is still ours; a newer poll
            # for the same room may have replaced it in the meantime.
            if room_waiters is not None and room_waiters.get(waiter_kind) == req_id:
                del room_waiters[waiter_kind]

        queue = get_queue(room_id)
        return queue.pop(0) if queue else None

    @staticmethod
    def _resolve_static_path(path):
        """Map a request path to a real path inside ``STATIC_DIR``.

        Returns:
            The resolved absolute path, or ``None`` if it would fall outside
            the static directory (absolute request path, symlink escape,
            different drive) or cannot be resolved at all.
        """
        static_root = os.path.realpath(STATIC_DIR)
        try:
            candidate = os.path.realpath(os.path.join(static_root, path))
            common = os.path.commonpath([static_root, candidate])
        except ValueError:
            # Raised e.g. for embedded NUL bytes or paths on another drive.
            return None
        if os.path.normcase(common) != os.path.normcase(static_root):
            return None
        return candidate

    # ------------------------------------------------------------------
    # API handlers
    # ------------------------------------------------------------------

    async def handle_create_room(self, request):
        """Create a new room and return its id and frontend URL as JSON."""
        try:
            room = self._create_room(request)
            logger.info(f"创建新房间: {room.room_id}")
            return web.json_response({
                'success': True,
                'room_id': room.room_id,
                'frontend_url': room.frontend_url,
            })
        except Exception as e:
            logger.error(f"创建房间失败: {e}")
            return web.json_response({'error': str(e)}, status=500)

    async def handle_frontend_send_message(self, request):
        """Queue a frontend message for the client and wake a waiting client poll.

        Expects a JSON object with ``room_id`` and ``message``. Responds 400
        when either is missing/falsy or the body is not a decodable JSON object.
        """
        try:
            data = self._require_json_object(safe_decode_json(await request.read()))
            room_id = data.get('room_id')
            message_data = data.get('message')
            if not room_id or not message_data:
                return web.json_response({'error': '需要room_id和message参数'}, status=400)

            get_frontend_to_client_queue(room_id).append(message_data)
            woken = self._wake_waiter(room_id, 'client')
            if woken:
                logger.info(f"立即响应挂起的客户端请求: {woken}")
            return web.json_response({
                'success': True,
                'message': '消息已发送到客户端队列',
            })
        except ValueError as e:
            logger.error(f"前端发送消息失败: 无效的JSON或编码错误 - {e}")
            return web.json_response({'error': '无效的JSON数据或不支持的文本编码'}, status=400)
        except Exception as e:
            logger.error(f"前端发送消息失败: {e}")
            return web.json_response({'error': str(e)}, status=500)

    async def handle_frontend_receive_message(self, request):
        """Long-poll: deliver the next client message to the frontend.

        Expects a JSON object with ``room_id``. Responds immediately when a
        message is queued; otherwise waits up to ``FRONTEND_POLL_TIMEOUT``
        seconds and responds with ``message: null`` if nothing arrives.
        """
        try:
            data = self._require_json_object(await request.json())
            room_id = data.get('room_id')
            if not room_id:
                return web.json_response({'error': '需要room_id参数'}, status=400)

            message = await self._poll_queue(
                room_id,
                get_client_to_frontend_queue,
                'frontend',
                self.FRONTEND_POLL_TIMEOUT,
            )
            return web.json_response({'success': True, 'message': message})
        except ValueError:
            # Covers json.JSONDecodeError, undecodable bytes and non-object bodies.
            logger.error("JSON解析失败")
            return web.json_response({'error': '无效的JSON数据'}, status=400)
        except Exception as e:
            logger.error(f"前端接收消息失败: {e}")
            return web.json_response({'error': str(e)}, status=500)

    async def handle_client_send_message(self, request):
        """Queue a client message for the frontend and wake a waiting frontend poll.

        Expects a JSON object with ``room_id`` and ``message``. Responds 400
        when either is missing/falsy or the body is not a decodable JSON object.
        """
        try:
            data = self._require_json_object(safe_decode_json(await request.read()))
            room_id = data.get('room_id')
            message_data = data.get('message')
            if not room_id or not message_data:
                return web.json_response({'error': '需要room_id和message参数'}, status=400)

            get_client_to_frontend_queue(room_id).append(message_data)
            woken = self._wake_waiter(room_id, 'frontend')
            if woken:
                logger.info(f"立即响应挂起的前端请求: {woken}")
            return web.json_response({
                'success': True,
                'message': '消息已发送到前端队列',
            })
        except ValueError as e:
            logger.error(f"客户端发送消息失败: 无效的JSON或编码错误 - {e}")
            return web.json_response({'error': '无效的JSON数据或不支持的文本编码'}, status=400)
        except Exception as e:
            logger.error(f"客户端发送消息失败: {e}")
            return web.json_response({'error': str(e)}, status=500)

    async def handle_client_receive_message(self, request):
        """Long-poll: deliver the next frontend message to the client.

        Expects a JSON object with ``room_id``. Responds immediately when a
        message is queued; otherwise waits up to ``CLIENT_POLL_TIMEOUT``
        seconds and responds with ``message: null`` if nothing arrives.
        """
        try:
            data = self._require_json_object(safe_decode_json(await request.read()))
            room_id = data.get('room_id')
            if not room_id:
                return web.json_response({'error': '需要room_id参数'}, status=400)

            message = await self._poll_queue(
                room_id,
                get_frontend_to_client_queue,
                'client',
                self.CLIENT_POLL_TIMEOUT,
            )
            return web.json_response({'success': True, 'message': message})
        except ValueError as e:
            logger.error(f"客户端接收消息失败: 无效JSON或编码 - {e}")
            return web.json_response({'error': '无效的JSON数据'}, status=400)
        except Exception as e:
            logger.error(f"客户端接收消息失败: {e}")
            return web.json_response({'error': str(e)}, status=500)

    # ------------------------------------------------------------------
    # Static content
    # ------------------------------------------------------------------

    async def handle_static_file(self, request):
        """Serve a file from ``STATIC_DIR``.

        An empty path (or ``/``) maps to ``index.html``; a directory maps to
        its ``index.html``. Responds 403 for paths containing ``..`` or
        resolving outside ``STATIC_DIR``, 404 when the file does not exist and
        500 when it cannot be read.
        """
        path = request.match_info.get('path', '')

        # Cheap first-line rejection of obvious traversal attempts.
        if '..' in path:
            return web.Response(text="Forbidden: Path traversal not allowed", status=403)

        if path in ('', '/'):
            path = 'index.html'

        # os.path.join() discards STATIC_DIR when ``path`` is absolute (e.g. a
        # request for "//etc/passwd"), so verify the resolved location.
        full_path = self._resolve_static_path(path)
        if full_path is None:
            return web.Response(text="Forbidden: Path traversal not allowed", status=403)

        if os.path.isdir(full_path):
            index_path = os.path.join(full_path, 'index.html')
            if not os.path.exists(index_path):
                return web.Response(text="Directory index not found", status=404)
            full_path = index_path

        if not os.path.isfile(full_path):
            return web.Response(text=f"File not found: {path}", status=404)

        try:
            with open(full_path, 'rb') as f:
                content = f.read()
            mime_type, _ = mimetypes.guess_type(full_path)
            if mime_type is None:
                mime_type = 'application/octet-stream'
            return web.Response(
                body=content,
                content_type=mime_type,
                headers={
                    'Cache-Control': 'public, max-age=3600',
                },
            )
        except Exception as e:
            logger.error(f"读取或发送文件失败: {e}")
            return web.Response(text=f"Error reading file: {str(e)}", status=500)

    async def handle_root_path(self, request):
        """Serve the frontend for ``/?id=<room>``, or create a room and redirect to it.

        Raises:
            web.HTTPFound: Redirect to ``/?id=<new room id>`` when the request
                carries no ``id`` query parameter.
        """
        room_id = request.query.get('id')
        if room_id:
            logger.info(f"请求根路径有房间ID: {room_id}")
            return await self.handle_static_file(request)

        try:
            room = self._create_room(request)
            logger.info(f"通过根路径创建新房间: {room.room_id}")
        except Exception as e:
            logger.error(f"创建房间失败: {e}")
            return web.Response(text=str(e), status=500)

        # Raise rather than return: returning an HTTPException is deprecated
        # in aiohttp. This sits outside the try block so that the broad
        # ``except Exception`` above cannot swallow the redirect.
        raise web.HTTPFound(f'/?id={room.room_id}')
async def create_app():
    """Build the aiohttp application and register every route on it."""
    handler = HTTPHandler()
    app = web.Application()
    app.add_routes([
        # JSON API endpoints
        web.post('/api/room', handler.handle_create_room),
        web.post('/api/frontend/send', handler.handle_frontend_send_message),
        web.post('/api/frontend/receive', handler.handle_frontend_receive_message),
        web.post('/api/client/send', handler.handle_client_send_message),
        web.post('/api/client/receive', handler.handle_client_receive_message),
        # Root page, then a catch-all for static files
        web.get('/', handler.handle_root_path),
        web.get('/{path:.*}', handler.handle_static_file),
    ])
    return app
async def cleanup_pending_requests():
    """Periodically drop long-poll registrations older than five minutes.

    Runs forever, waking once a minute. ``pending_requests`` holds two kinds
    of entries: per-request records (which carry a ``'timestamp'``) and
    per-room index dicts (which do not). Only per-request records can expire;
    room index dicts are removed once they are empty.
    """
    while True:
        await asyncio.sleep(60)  # sweep once a minute
        try:
            current_time = time.time()
            # Room index entries have no 'timestamp'; indexing them directly
            # raised KeyError and silently killed this task.
            expired_reqs = [
                req_id
                for req_id, req_info in pending_requests.items()
                if 'timestamp' in req_info
                and current_time - req_info['timestamp'] > 300  # older than 5 minutes
            ]
            for req_id in expired_reqs:
                req_info = pending_requests.pop(req_id, None)
                if req_info is None:
                    continue
                # Wake the parked handler so it can finish its response.
                event = req_info.get('event')
                if event is not None:
                    event.set()

            # Drop room index dicts that no longer reference any waiter.
            empty_rooms = [key for key, value in pending_requests.items() if not value]
            for key in empty_rooms:
                del pending_requests[key]

            if expired_reqs:
                logger.info(f"清理了 {len(expired_reqs)} 个过期的挂起请求")
        except Exception:
            # Keep the sweeper alive; one bad entry must not end all cleanup.
            logger.exception("清理挂起请求时出错")
async def start_background_tasks(app):
    """Startup hook: launch the pending-request sweeper and keep its task on ``app``."""
    cleanup_task = asyncio.create_task(cleanup_pending_requests())
    app['cleanup_task'] = cleanup_task
async def cleanup_background_tasks(app):
    """Cleanup hook: cancel the task stored under ``'cleanup_task'`` and wait for it to end."""
    if 'cleanup_task' not in app:
        return
    task = app['cleanup_task']
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of the cancel() above.
        pass
def run_http_server():
    """Run the HTTP server on 0.0.0.0:80 until it is stopped.

    Any exception raised while building or running the application is logged
    and swallowed, so this function returns normally in that case.
    """

    async def build_app():
        # Build the app on the loop that run_app() itself drives, instead of
        # on a separate hand-made loop that would never be closed.
        app = await create_app()
        app.on_startup.append(start_background_tasks)
        app.on_cleanup.append(cleanup_background_tasks)
        return app

    try:
        logger.info("HTTP服务器启动在端口 80")
        web.run_app(build_app(), host='0.0.0.0', port=80)
    except Exception as e:
        logger.error(f"HTTP服务器启动失败: {e}")
# Script entry point: start the server only when executed directly.
if __name__ == "__main__":
    logger.info("启动服务器...")
    run_http_server()