import json import logging import uuid import time import os from datetime import datetime from typing import Dict, Any, List from http.server import HTTPServer, BaseHTTPRequestHandler from urllib.parse import parse_qs, urlparse import mimetypes import re # 设置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 存储房间信息 rooms = {} # 前端到客户端的消息队列 frontend_to_client_queues = {} # 客户端到前端的消息队列 client_to_frontend_queues = {} # 静态文件目录 STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') if not os.path.exists(STATIC_DIR): os.makedirs(STATIC_DIR) logger.info(f"创建静态文件目录: {STATIC_DIR}") class Room: def __init__(self, room_id: str, server_host: str): self.room_id = room_id self.created_at = datetime.now() # 从host中移除端口号 host_without_port = re.sub(r':\d+$', '', server_host) self.frontend_url = f"http://{server_host}/?id={room_id}" def to_dict(self) -> Dict[str, Any]: return { 'room_id': self.room_id, 'frontend_url': self.frontend_url, 'created_at': self.created_at.isoformat() } def get_frontend_to_client_queue(room_id: str) -> List[Dict[str, Any]]: if room_id not in frontend_to_client_queues: frontend_to_client_queues[room_id] = [] return frontend_to_client_queues[room_id] def get_client_to_frontend_queue(room_id: str) -> List[Dict[str, Any]]: if room_id not in client_to_frontend_queues: client_to_frontend_queues[room_id] = [] return client_to_frontend_queues[room_id] class HTTPHandler(BaseHTTPRequestHandler): def do_GET(self): """处理HTTP GET请求""" try: parsed_path = urlparse(self.path) path = parsed_path.path query_params = parse_qs(parsed_path.query) # API路由 if path == '/api/room' and self.command == 'POST': self.handle_create_room() elif path.startswith('/api/room/') and self.command == 'GET': room_id = path.split('/')[-1] self.handle_get_room(room_id) elif path == '/api/rooms' and self.command == 'GET': self.handle_list_rooms() # 根路径处理 elif path == '/': self.handle_root_path(query_params) # 静态文件服务 else: self.handle_static_file(path) except Exception as e: logger.error(f"处理GET请求时发生错误: {e}") self.send_error(500, f"Internal Server Error: {str(e)}") def do_POST(self): """处理HTTP POST请求""" try: content_length = int(self.headers.get('Content-Length', 0)) post_data = self.rfile.read(content_length) if content_length > 0 else b'{}' parsed_path = urlparse(self.path) path = parsed_path.path if path == '/api/room': self.handle_create_room(post_data) elif path == '/api/frontend/send': self.handle_frontend_send_message(post_data) elif path == '/api/frontend/receive': self.handle_frontend_receive_message(post_data) elif path == '/api/client/send': self.handle_client_send_message(post_data) elif path == '/api/client/receive': self.handle_client_receive_message(post_data) else: self.send_error(404, "Not Found") except Exception as e: logger.error(f"处理POST请求时发生错误: {e}") self.send_error(500, f"Internal Server Error: {str(e)}") def handle_root_path(self, query_params: Dict[str, Any]): """处理根路径请求""" room_id = query_params.get('id', [None])[0] if room_id: # 有房间ID参数,直接返回前端页面 logger.info(f"请求根路径,有房间ID: {room_id}") self.serve_static_file('/index.html') else: # 没有房间ID,创建新房间并重定向 try: # 生成唯一房间ID room_id = str(uuid.uuid4())[:8] # 获取服务器主机地址并移除端口号 host = self.headers.get('Host', 'localhost') host_without_port = re.sub(r':\d+$', '', host) # 创建房间 room = Room(room_id, host_without_port) rooms[room_id] = room logger.info(f"通过根路径创建新房间: {room_id}") # 重定向到带房间ID的URL redirect_url = f'/?id={room_id}' self.send_response(302) self.send_header('Location', redirect_url) self.end_headers() except Exception as e: logger.error(f"创建房间失败: {e}") self.send_error(500, str(e)) def handle_static_file(self, path: str): """处理静态文件请求""" # 安全检查:防止路径遍历攻击 if '..' in path: self.send_error(403, "Forbidden: Path traversal not allowed") return # 规范化路径 if path == '/': path = '/index.html' # 移除开头的斜杠 file_path = path.lstrip('/') if not file_path: file_path = 'index.html' # 构建完整文件路径 full_path = os.path.join(STATIC_DIR, file_path) # 如果是目录,尝试查找index.html if os.path.isdir(full_path): index_path = os.path.join(full_path, 'index.html') if os.path.exists(index_path): full_path = index_path else: self.send_error(404, "Directory index not found") return # 检查文件是否存在且是普通文件 if not os.path.exists(full_path) or not os.path.isfile(full_path): self.send_error(404, f"File not found: {path}") return try: # 读取文件内容 with open(full_path, 'rb') as f: content = f.read() # 获取MIME类型 mime_type, encoding = mimetypes.guess_type(full_path) if mime_type is None: mime_type = 'application/octet-stream' # 发送响应头 self.send_response(200) self.send_header('Content-Type', mime_type) self.send_header('Content-Length', str(len(content))) self.send_header('Cache-Control', 'public, max-age=3600') self.end_headers() # 发送文件内容 self.wfile.write(content) except Exception as e: logger.error(f"读取或发送文件失败: {e}") self.send_error(500, f"Error reading file: {str(e)}") def serve_static_file(self, path: str): """服务静态文件(内部方法)""" self.handle_static_file(path) def handle_create_room(self, post_data=None): """创建新房间""" try: # 生成唯一房间ID room_id = str(uuid.uuid4())[:8] # 获取服务器主机地址并移除端口号 host = self.headers.get('Host', 'localhost') host_without_port = re.sub(r':\d+$', '', host) # 创建房间 room = Room(room_id, host_without_port) rooms[room_id] = room logger.info(f"创建新房间: {room_id}") response = { 'success': True, 'room_id': room_id, 'frontend_url': room.frontend_url } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"创建房间失败: {e}") self.send_error(500, str(e)) def handle_frontend_send_message(self, post_data): """前端发送消息到客户端""" try: data = json.loads(post_data.decode('utf-8')) room_id = data.get('room_id') message = data.get('message') if not room_id or not message: self.send_error(400, "需要room_id和message参数") return queue = get_frontend_to_client_queue(room_id) queue.append(message) response = { 'success': True, 'message': '消息已发送到客户端队列' } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"前端发送消息失败: {e}") self.send_error(500, str(e)) def handle_frontend_receive_message(self, post_data): """前端接收来自客户端的消息""" try: data = json.loads(post_data.decode('utf-8')) room_id = data.get('room_id') if not room_id: self.send_error(400, "需要room_id参数") return queue = get_client_to_frontend_queue(room_id) if queue: # 返回队列中的第一个消息 message = queue.pop(0) response = { 'success': True, 'message': message } else: # 没有消息 response = { 'success': True, 'message': None } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"前端接收消息失败: {e}") self.send_error(500, str(e)) def handle_client_send_message(self, post_data): """客户端发送消息到前端""" try: data = json.loads(post_data.decode('utf-8')) room_id = data.get('room_id') message = data.get('message') if not room_id or not message: self.send_error(400, "需要room_id和message参数") return queue = get_client_to_frontend_queue(room_id) queue.append(message) response = { 'success': True, 'message': '消息已发送到前端队列' } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"客户端发送消息失败: {e}") self.send_error(500, str(e)) def handle_client_receive_message(self, post_data): """客户端接收来自前端的消息""" try: data = json.loads(post_data.decode('utf-8')) room_id = data.get('room_id') if not room_id: self.send_error(400, "需要room_id参数") return queue = get_frontend_to_client_queue(room_id) if queue: # 返回队列中的第一个消息 message = queue.pop(0) response = { 'success': True, 'message': message } else: # 没有消息 response = { 'success': True, 'message': None } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"客户端接收消息失败: {e}") self.send_error(500, str(e)) def log_message(self, format, *args): """重写日志方法""" logger.info("%s - - [%s] %s" % (self.client_address[0], self.log_date_time_string(), format % args)) def run_http_server(): """运行HTTP服务器""" try: server = HTTPServer(('0.0.0.0', 80), HTTPHandler) logger.info("HTTP服务器启动在端口 80") server.serve_forever() except Exception as e: logger.error(f"HTTP服务器启动失败: {e}") if __name__ == '__main__': logger.info("启动服务器...") run_http_server()