import json import logging import uuid import time import os import asyncio import aiohttp from aiohttp import web from datetime import datetime from typing import Dict, Any, List from urllib.parse import parse_qs, urlparse import mimetypes import re # 设置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 存储房间信息 rooms = {} # 前端到客户端的消息队列 frontend_to_client_queues = {} # 客户端到前端的消息队列 client_to_frontend_queues = {} # 挂起的请求管理 pending_requests = {} # 静态文件目录 STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') if not os.path.exists(STATIC_DIR): os.makedirs(STATIC_DIR) logger.info(f"创建静态文件目录: {STATIC_DIR}") class Room: def __init__(self, room_id: str, server_host: str): self.room_id = room_id self.created_at = datetime.now() # 从host中移除端口号 host_without_port = re.sub(r':\d+$', '', server_host) self.frontend_url = f"http://{server_host}/?id={room_id}" def to_dict(self) -> Dict[str, Any]: return { 'room_id': self.room_id, 'frontend_url': self.frontend_url, 'created_at': self.created_at.isoformat() } def get_frontend_to_client_queue(room_id: str) -> List[Dict[str, Any]]: if room_id not in frontend_to_client_queues: frontend_to_client_queues[room_id] = [] return frontend_to_client_queues[room_id] def get_client_to_frontend_queue(room_id: str) -> List[Dict[str, Any]]: if room_id not in client_to_frontend_queues: client_to_frontend_queues[room_id] = [] return client_to_frontend_queues[room_id] class HTTPHandler: async def handle_create_room(self, request): """创建新房间""" try: # 生成唯一房间ID room_id = str(uuid.uuid4())[:8] # 获取服务器主机地址并移除端口号 host = request.headers.get('Host', 'localhost') host_without_port = re.sub(r':\d+$', '', host) # 创建房间 room = Room(room_id, host_without_port) rooms[room_id] = room logger.info(f"创建新房间: {room_id}") response = { 'success': True, 'room_id': room_id, 'frontend_url': room.frontend_url } return web.json_response(response) except Exception as e: logger.error(f"创建房间失败: {e}") return web.json_response({'error': str(e)}, status=500) async def handle_frontend_send_message(self, request): try: raw_body = await request.read() data = safe_decode_json(raw_body) room_id = data.get('room_id') message_data = data.get('message') if not room_id or not message_data: return web.json_response({'error': '需要room_id和message参数'}, status=400) queue = get_frontend_to_client_queue(room_id) queue.append(message_data) if room_id in pending_requests and 'client' in pending_requests[room_id]: client_req_id = pending_requests[room_id]['client'] if client_req_id in pending_requests: pending_requests[client_req_id]['event'].set() logger.info(f"立即响应挂起的客户端请求: {client_req_id}") return web.json_response({ 'success': True, 'message': '消息已发送到客户端队列' }) except ValueError as e: logger.error(f"前端发送消息失败: 无效的JSON或编码错误 - {e}") return web.json_response({'error': '无效的JSON数据或不支持的文本编码'}, status=400) except Exception as e: logger.error(f"前端发送消息失败: {e}") return web.json_response({'error': str(e)}, status=500) async def handle_frontend_receive_message(self, request): """前端接收来自客户端的消息(长轮询)""" try: data = await request.json() room_id = data.get('room_id') if not room_id: return web.json_response({'error': '需要room_id参数'}, status=400) queue = get_client_to_frontend_queue(room_id) # 立即检查是否有消息 if queue: message = queue.pop(0) response = { 'success': True, 'message': message } return web.json_response(response) # 没有消息,设置长轮询 req_id = str(uuid.uuid4()) event = asyncio.Event() # 存储挂起的请求 pending_requests[req_id] = { 'room_id': room_id, 'event': event, 'type': 'frontend', 'timestamp': time.time() } if room_id not in pending_requests: pending_requests[room_id] = {} pending_requests[room_id]['frontend'] = req_id try: # 等待295秒或直到有消息 await asyncio.wait_for(event.wait(), timeout=295) # 检查队列中是否有消息 queue = get_client_to_frontend_queue(room_id) if queue: message = queue.pop(0) response = { 'success': True, 'message': message } else: # 超时返回空消息 response = { 'success': True, 'message': None } except asyncio.TimeoutError: # 超时返回空消息 response = { 'success': True, 'message': None } # 清理挂起的请求 if req_id in pending_requests: del pending_requests[req_id] if room_id in pending_requests and 'frontend' in pending_requests[room_id]: del pending_requests[room_id]['frontend'] return web.json_response(response) except json.JSONDecodeError: logger.error("JSON解析失败") return web.json_response({'error': '无效的JSON数据'}, status=400) except Exception as e: logger.error(f"前端接收消息失败: {e}") # 清理挂起的请求 if 'req_id' in locals() and req_id in pending_requests: del pending_requests[req_id] if room_id in pending_requests and 'frontend' in pending_requests[room_id]: del pending_requests[room_id]['frontend'] return web.json_response({'error': str(e)}, status=500) async def handle_client_send_message(self, request): try: raw_body = await request.read() # 获取原始字节 data = safe_decode_json(raw_body) room_id = data.get('room_id') message_data = data.get('message') if not room_id or not message_data: return web.json_response({'error': '需要room_id和message参数'}, status=400) queue = get_client_to_frontend_queue(room_id) queue.append(message_data) if room_id in pending_requests and 'frontend' in pending_requests[room_id]: frontend_req_id = pending_requests[room_id]['frontend'] if frontend_req_id in pending_requests: pending_requests[frontend_req_id]['event'].set() logger.info(f"立即响应挂起的前端请求: {frontend_req_id}") return web.json_response({ 'success': True, 'message': '消息已发送到前端队列' }) except ValueError as e: logger.error(f"客户端发送消息失败: 无效的JSON或编码错误 - {e}") return web.json_response({'error': '无效的JSON数据或不支持的文本编码'}, status=400) except Exception as e: logger.error(f"客户端发送消息失败: {e}") return web.json_response({'error': str(e)}, status=500) async def handle_client_receive_message(self, request): """客户端接收来自前端的消息(长轮询)""" try: raw_body = await request.read() data = safe_decode_json(raw_body) # 使用你之前添加的 safe_decode_json room_id = data.get('room_id') if not room_id: return web.json_response({'error': '需要room_id参数'}, status=400) queue = get_frontend_to_client_queue(room_id) # 立即检查是否有消息 if queue: message = queue.pop(0) response = { 'success': True, 'message': message } return web.json_response(response) # 没有消息,设置长轮询(最多等待 58 秒) req_id = str(uuid.uuid4()) event = asyncio.Event() # 存储挂起的请求 pending_requests[req_id] = { 'room_id': room_id, 'event': event, 'type': 'client', 'timestamp': time.time() } if room_id not in pending_requests: pending_requests[room_id] = {} pending_requests[room_id]['client'] = req_id try: # ⏱️ 只等待 58 秒(略小于客户端或代理的 60 秒超时) await asyncio.wait_for(event.wait(), timeout=58) # 被唤醒后,检查队列 queue = get_frontend_to_client_queue(room_id) if queue: message = queue.pop(0) response = { 'success': True, 'message': message } else: response = { 'success': True, 'message': None } except asyncio.TimeoutError: # 58秒超时,返回空消息 response = { 'success': True, 'message': None } # 清理挂起的请求 pending_requests.pop(req_id, None) if room_id in pending_requests: pending_requests[room_id].pop('client', None) # 如果 room_id 下已无其他引用,也可以清理整个 room 条目(可选) return web.json_response(response) except ValueError as e: logger.error(f"客户端接收消息失败: 无效JSON或编码 - {e}") return web.json_response({'error': '无效的JSON数据'}, status=400) except Exception as e: logger.error(f"客户端接收消息失败: {e}") # 清理挂起的请求 if 'req_id' in locals(): pending_requests.pop(req_id, None) if 'room_id' in locals() and room_id in pending_requests: pending_requests[room_id].pop('client', None) return web.json_response({'error': str(e)}, status=500) async def handle_static_file(self, request): """处理静态文件请求""" path = request.match_info.get('path', '') # 安全检查:防止路径遍历攻击 if '..' in path: return web.Response(text="Forbidden: Path traversal not allowed", status=403) # 规范化路径 if path == '' or path == '/': path = 'index.html' # 构建完整文件路径 full_path = os.path.join(STATIC_DIR, path) # 如果是目录,尝试查找index.html if os.path.isdir(full_path): index_path = os.path.join(full_path, 'index.html') if os.path.exists(index_path): full_path = index_path else: return web.Response(text="Directory index not found", status=404) # 检查文件是否存在且是普通文件 if not os.path.exists(full_path) or not os.path.isfile(full_path): return web.Response(text=f"File not found: {path}", status=404) try: # 读取文件内容 with open(full_path, 'rb') as f: content = f.read() # 获取MIME类型 mime_type, encoding = mimetypes.guess_type(full_path) if mime_type is None: mime_type = 'application/octet-stream' # 发送响应 return web.Response( body=content, content_type=mime_type, headers={ 'Cache-Control': 'public, max-age=3600' } ) except Exception as e: logger.error(f"读取或发送文件失败: {e}") return web.Response(text=f"Error reading file: {str(e)}", status=500) async def handle_root_path(self, request): """处理根路径请求""" query_params = dict(request.query) room_id = query_params.get('id') if room_id: # 有房间ID参数,直接返回前端页面 logger.info(f"请求根路径,有房间ID: {room_id}") return await self.handle_static_file(request) else: # 没有房间ID,创建新房间并重定向 try: # 生成唯一房间ID room_id = str(uuid.uuid4())[:8] # 获取服务器主机地址并移除端口号 host = request.headers.get('Host', 'localhost') host_without_port = re.sub(r':\d+$', '', host) # 创建房间 room = Room(room_id, host_without_port) rooms[room_id] = room logger.info(f"通过根路径创建新房间: {room_id}") # 重定向到带房间ID的URL redirect_url = f'/?id={room_id}' return web.HTTPFound(redirect_url) except Exception as e: logger.error(f"创建房间失败: {e}") return web.Response(text=str(e), status=500) async def create_app(): """创建Web应用""" handler = HTTPHandler() app = web.Application() # API路由 app.router.add_post('/api/room', handler.handle_create_room) app.router.add_post('/api/frontend/send', handler.handle_frontend_send_message) app.router.add_post('/api/frontend/receive', handler.handle_frontend_receive_message) app.router.add_post('/api/client/send', handler.handle_client_send_message) app.router.add_post('/api/client/receive', handler.handle_client_receive_message) # 静态文件和根路径路由 app.router.add_get('/', handler.handle_root_path) app.router.add_get('/{path:.*}', handler.handle_static_file) return app async def cleanup_pending_requests(): """定期清理过期的挂起请求""" while True: await asyncio.sleep(60) # 每分钟清理一次 current_time = time.time() expired_reqs = [] for req_id, req_info in pending_requests.items(): if current_time - req_info['timestamp'] > 300: # 超过5分钟 expired_reqs.append(req_id) for req_id in expired_reqs: if req_id in pending_requests: # 尝试触发事件以释放挂起的请求 try: pending_requests[req_id]['event'].set() except: pass del pending_requests[req_id] if expired_reqs: logger.info(f"清理了 {len(expired_reqs)} 个过期的挂起请求") async def start_background_tasks(app): """启动后台任务""" app['cleanup_task'] = asyncio.create_task(cleanup_pending_requests()) async def cleanup_background_tasks(app): """清理后台任务""" if 'cleanup_task' in app: app['cleanup_task'].cancel() try: await app['cleanup_task'] except asyncio.CancelledError: pass def run_http_server(): """运行HTTP服务器""" try: # 创建应用 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) app = loop.run_until_complete(create_app()) # 注册启动和清理钩子 app.on_startup.append(start_background_tasks) app.on_cleanup.append(cleanup_background_tasks) # 启动服务器 logger.info("HTTP服务器启动在端口 80") web.run_app(app, host='0.0.0.0', port=80) except Exception as e: logger.error(f"HTTP服务器启动失败: {e}") if __name__ == '__main__': logger.info("启动服务器...") run_http_server()