import asyncio import json import logging import uuid import time import os from datetime import datetime, timedelta from typing import Dict, Any, Set import threading from http.server import HTTPServer, BaseHTTPRequestHandler import websockets from urllib.parse import parse_qs, urlparse import mimetypes # 设置日志 logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # 存储房间信息 rooms = {} connected_clients = {} ws_port = 81 # ws服务外部端口 # 静态文件目录 STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static') if not os.path.exists(STATIC_DIR): os.makedirs(STATIC_DIR) logger.info(f"创建静态文件目录: {STATIC_DIR}") class Room: def __init__(self, room_id: str, server_host: str): self.room_id = room_id self.created_at = datetime.now() self.last_activity = datetime.now() self.clients: Set[str] = set() # 使用80端口 self.frontend_url = f"http://{server_host}/?id={room_id}&ws=ws://{server_host}" self.ws_url = f"ws://{server_host}:{ws_port}" def add_client(self, client_id: str): self.clients.add(client_id) self.last_activity = datetime.now() logger.info(f"客户端 {client_id} 加入房间 {self.room_id}, 当前客户端数: {len(self.clients)}") def remove_client(self, client_id: str): if client_id in self.clients: self.clients.remove(client_id) self.last_activity = datetime.now() logger.info(f"客户端 {client_id} 离开房间 {self.room_id}, 剩余客户端数: {len(self.clients)}") def is_empty(self) -> bool: return len(self.clients) == 0 def to_dict(self) -> Dict[str, Any]: return { 'room_id': self.room_id, 'frontend_url': self.frontend_url, 'ws_url': self.ws_url, 'client_count': len(self.clients), 'created_at': self.created_at.isoformat(), 'last_activity': self.last_activity.isoformat() } def cleanup_empty_rooms(): """定期清理空房间""" while True: time.sleep(300) # 每5分钟检查一次 current_time = datetime.now() empty_rooms = [] for room_id, room in list(rooms.items()): if room.is_empty() and current_time - room.last_activity > timedelta(minutes=10): empty_rooms.append(room_id) for room_id in empty_rooms: if room_id in rooms: del rooms[room_id] logger.info(f"清理空房间: {room_id}") # 启动清理线程 cleanup_thread = threading.Thread(target=cleanup_empty_rooms, daemon=True) cleanup_thread.start() class HTTPHandler(BaseHTTPRequestHandler): def do_GET(self): """处理HTTP GET请求""" try: logger.info(f"收到GET请求: {self.path} from {self.client_address[0]}") logger.info(f"请求头: {dict(self.headers)}") parsed_path = urlparse(self.path) path = parsed_path.path query_params = parse_qs(parsed_path.query) # API路由 if path == '/api/room' and self.command == 'POST': self.handle_create_room() elif path.startswith('/api/room/') and self.command == 'GET': room_id = path.split('/')[-1] self.handle_get_room(room_id) elif path == '/api/rooms' and self.command == 'GET': self.handle_list_rooms() # 根路径处理 elif path == '/': self.handle_root_path(query_params) # 静态文件服务 else: self.handle_static_file(path) except Exception as e: logger.error(f"处理GET请求时发生错误: {e}") self.send_error(500, f"Internal Server Error: {str(e)}") def do_POST(self): """处理HTTP POST请求""" try: logger.info(f"收到POST请求: {self.path} from {self.client_address[0]}") parsed_path = urlparse(self.path) path = parsed_path.path if path == '/api/room': self.handle_create_room() else: self.send_error(404, "Not Found") except Exception as e: logger.error(f"处理POST请求时发生错误: {e}") self.send_error(500, f"Internal Server Error: {str(e)}") def handle_root_path(self, query_params: Dict[str, Any]): """处理根路径请求""" room_id = query_params.get('id', [None])[0] ws_url = query_params.get('ws', [None])[0] if room_id: # 有房间ID参数,直接返回前端页面 logger.info(f"请求根路径,有房间ID: {room_id}") self.serve_static_file('/index.html') else: # 没有房间ID,创建新房间并重定向 try: # 生成唯一房间ID room_id = str(uuid.uuid4())[:8] # 获取服务器主机地址 host = self.headers.get('Host', 'localhost') # 创建房间 room = Room(room_id, host) rooms[room_id] = room logger.info(f"通过根路径创建新房间: {room_id}") # 重定向到带房间ID和WebSocket URL的URL redirect_url = f'/?id={room_id}&ws=ws://{host}:{ws_port}' self.send_response(302) self.send_header('Location', redirect_url) self.end_headers() except Exception as e: logger.error(f"创建房间失败: {e}") self.send_error(500, str(e)) def handle_static_file(self, path: str): """处理静态文件请求""" logger.info(f"处理静态文件请求: {path}") # 安全检查:防止路径遍历攻击 if '..' in path: logger.warning(f"检测到可疑路径: {path}") self.send_error(403, "Forbidden: Path traversal not allowed") return # 规范化路径 if path == '/': path = '/index.html' # 移除开头的斜杠 file_path = path.lstrip('/') if not file_path: file_path = 'index.html' # 构建完整文件路径 full_path = os.path.join(STATIC_DIR, file_path) logger.info(f"尝试访问文件: {full_path}") # 如果是目录,尝试查找index.html if os.path.isdir(full_path): index_path = os.path.join(full_path, 'index.html') if os.path.exists(index_path): full_path = index_path logger.info(f"重定向到目录索引文件: {index_path}") else: logger.warning(f"目录不存在索引文件: {full_path}") self.send_error(404, "Directory index not found") return # 检查文件是否存在且是普通文件 if not os.path.exists(full_path): logger.warning(f"文件不存在: {full_path}") self.send_error(404, f"File not found: {path}") return if not os.path.isfile(full_path): logger.warning(f"路径不是文件: {full_path}") self.send_error(403, "Not a file") return try: # 读取文件内容 with open(full_path, 'rb') as f: content = f.read() # 获取MIME类型 mime_type, encoding = mimetypes.guess_type(full_path) if mime_type is None: mime_type = 'application/octet-stream' logger.info(f"成功读取文件: {full_path}, 大小: {len(content)} bytes, MIME类型: {mime_type}") # 发送响应头 self.send_response(200) self.send_header('Content-Type', mime_type) self.send_header('Content-Length', str(len(content))) # 添加缓存控制头(可选) self.send_header('Cache-Control', 'public, max-age=3600') # 缓存1小时 self.end_headers() # 发送文件内容 self.wfile.write(content) logger.info(f"文件发送完成: {full_path}") except Exception as e: logger.error(f"读取或发送文件失败: {e}") self.send_error(500, f"Error reading file: {str(e)}") def serve_static_file(self, path: str): """服务静态文件(内部方法)""" self.handle_static_file(path) def handle_create_room(self): """创建新房间""" try: # 生成唯一房间ID room_id = str(uuid.uuid4())[:8] # 获取服务器主机地址 host = self.headers.get('Host', 'localhost') # 创建房间 room = Room(room_id, host) rooms[room_id] = room logger.info(f"创建新房间: {room_id}") response = { 'success': True, 'room_id': room_id, 'frontend_url': room.frontend_url, 'ws_url': room.ws_url } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"创建房间失败: {e}") self.send_error(500, str(e)) def handle_get_room(self, room_id: str): """获取房间信息""" try: if room_id not in rooms: self.send_error(404, '房间不存在') return room = rooms[room_id] response = {'success': True, 'data': room.to_dict()} self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"获取房间信息失败: {e}") self.send_error(500, str(e)) def handle_list_rooms(self): """列出所有活跃房间""" try: active_rooms = [room.to_dict() for room in rooms.values() if not room.is_empty()] response = { 'success': True, 'data': { 'total_rooms': len(active_rooms), 'rooms': active_rooms } } self.send_response(200) self.send_header('Content-Type', 'application/json') self.send_header('Access-Control-Allow-Origin', '*') self.end_headers() self.wfile.write(json.dumps(response).encode()) except Exception as e: logger.error(f"获取房间列表失败: {e}") self.send_error(500, str(e)) def log_message(self, format, *args): """重写日志方法""" logger.info("%s - - [%s] %s" % (self.client_address[0], self.log_date_time_string(), format % args)) class WebSocketHTTPRequestHandler(HTTPHandler): """支持WebSocket升级的HTTP请求处理器""" def do_GET(self): """处理GET请求,支持WebSocket升级""" if self.headers.get('Upgrade') == 'websocket': self.handle_websocket_upgrade() else: super().do_GET() def handle_websocket_upgrade(self): """处理WebSocket升级请求""" # 这里我们只是记录日志,实际的WebSocket处理在websockets库中完成 logger.info(f"WebSocket连接请求: {self.path}") self.send_error(426, "WebSocket upgrade required") # 这个错误不会被触发,因为websockets库会拦截请求 async def handle_websocket(websocket): """处理WebSocket连接 - 修复版本,移除了path参数""" client_id = str(uuid.uuid4()) room_id = None client_type = 'unknown' try: # 发送连接成功消息 await send_message(websocket, { 'type': 'connected', 'message': '连接成功', 'client_id': client_id, 'timestamp': datetime.now().isoformat() }) logger.info(f"客户端连接: {client_id}") # 处理消息 async for message in websocket: try: data = json.loads(message) # 处理消息并获取房间和客户端类型信息 result = await handle_websocket_message(websocket, client_id, data) if result: room_id, client_type = result except json.JSONDecodeError: logger.error(f"消息格式错误: {message}") except Exception as e: logger.error(f"处理消息错误: {e}") except websockets.exceptions.ConnectionClosed: logger.info(f"客户端断开连接: {client_id}") finally: # 清理连接 if client_id in connected_clients: info = connected_clients.pop(client_id) room_id = info.get('room_id') logger.info(f"清理客户端: {client_id}, 房间: {room_id}") if room_id and room_id in rooms: room = rooms[room_id] room.remove_client(client_id) # 通知房间内其他客户端 await broadcast_to_room(room_id, { 'type': 'user_left', 'client_id': client_id, 'message': '用户离开房间', 'room_size': len(room.clients), 'timestamp': datetime.now().isoformat() }, exclude_client=client_id) # 额外检查:即使不在 connected_clients 中,如果还在房间里也要移除 elif room_id and room_id in rooms: room = rooms[room_id] room.remove_client(client_id) # 通知房间内其他客户端 await broadcast_to_room(room_id, { 'type': 'user_left', 'client_id': client_id, 'message': '用户离开房间', 'room_size': len(room.clients), 'timestamp': datetime.now().isoformat() }, exclude_client=client_id) async def handle_websocket_message(websocket, client_id: str, data: Dict[str, Any]): """处理WebSocket消息""" message_type = data.get('type') if message_type == 'join_room': return await handle_join_room(websocket, client_id, data) elif message_type == 'file_operation': await handle_file_operation(websocket, client_id, data) elif message_type == 'file_operation_response': await handle_file_operation_response(websocket, client_id, data) elif message_type == 'leave_room': await handle_leave_room(websocket, client_id, data) elif message_type == 'ping': await send_message(websocket, { 'type': 'pong', 'message': 'pong', 'timestamp': datetime.now().isoformat() }) else: logger.warning(f"未知消息类型: {message_type}") async def handle_join_room(websocket, client_id: str, data: Dict[str, Any]): """处理加入房间请求""" room_id = data.get('room_id') client_type = data.get('client_type', 'unknown') if not room_id: await send_message(websocket, { 'type': 'error', 'message': '房间ID不能为空' }) return if room_id not in rooms: await send_message(websocket, { 'type': 'error', 'message': '房间不存在' }) return room = rooms[room_id] room.add_client(client_id) # 存储客户端信息 connected_clients[client_id] = { 'websocket': websocket, 'room_id': room_id, 'client_type': client_type } logger.info(f"客户端 {client_id} ({client_type}) 加入房间 {room_id}") # 发送加入成功消息 await send_message(websocket, { 'type': 'joined_room', 'room_id': room_id, 'message': f'成功加入房间 {room_id}', 'client_count': len(room.clients), 'timestamp': datetime.now().isoformat() }) # 通知房间内其他客户端 await broadcast_to_room(room_id, { 'type': 'user_joined', 'client_id': client_id, 'client_type': client_type, 'message': '新用户加入房间', 'room_size': len(room.clients), 'timestamp': datetime.now().isoformat() }, exclude_client=client_id) # 返回房间ID和客户端类型,用于finally块清理 return room_id, client_type async def handle_leave_room(websocket, client_id: str, data: Dict[str, Any]): """处理离开房间请求""" room_id = data.get('room_id') if not room_id: room_id = connected_clients.get(client_id, {}).get('room_id') if room_id and room_id in rooms: room = rooms[room_id] room.remove_client(client_id) # 从连接客户端中移除 if client_id in connected_clients: del connected_clients[client_id] # 通知房间内其他客户端 await broadcast_to_room(room_id, { 'type': 'user_left', 'client_id': client_id, 'message': '用户离开房间', 'room_size': len(room.clients), 'timestamp': datetime.now().isoformat() }, exclude_client=client_id) logger.info(f"客户端 {client_id} 主动离开房间 {room_id}") async def handle_file_operation(websocket, client_id: str, data: Dict[str, Any]): """处理文件操作请求""" room_id = data.get('room_id') request_id = data.get('requestId') operation_type = data.get('operation_type') if not room_id or room_id not in rooms: await send_message(websocket, { 'type': 'error', 'message': '无效的房间ID', 'requestId': request_id }) return # 记录操作 logger.info(f"收到文件操作请求: {operation_type} from {client_id} in room {room_id}") # 查找文件客户端 file_clients = [] for cid, client_info in connected_clients.items(): if client_info['room_id'] == room_id and cid != client_id: if client_info.get('client_type') == 'file_client': file_clients.append(client_info['websocket']) if file_clients: # 转发给文件客户端 data['sender_id'] = client_id await send_message(file_clients[0], { 'type': 'file_operation_request', **data }) else: # 没有文件客户端,返回错误 await send_message(websocket, { 'type': 'file_operation_response', 'requestId': request_id, 'success': False, 'error': '房间内没有文件客户端', 'timestamp': datetime.now().isoformat() }) async def handle_file_operation_response(websocket, client_id: str, data: Dict[str, Any]): """处理文件操作响应""" request_id = data.get('requestId') target_client_id = data.get('target_client_id') logger.info(f"收到文件操作响应: {request_id} -> {target_client_id}") if target_client_id and target_client_id in connected_clients: # 发送给指定客户端 target_websocket = connected_clients[target_client_id]['websocket'] await send_message(target_websocket, { 'type': 'file_operation_response', **data }) else: logger.error("文件操作响应没有指定目标客户端或目标客户端不存在") async def send_message(websocket, message: Dict[str, Any]): """发送消息到WebSocket""" try: await websocket.send(json.dumps(message)) except websockets.exceptions.ConnectionClosed: logger.warning("连接已关闭,无法发送消息") async def broadcast_to_room(room_id: str, message: Dict[str, Any], exclude_client: str = None): """向房间内所有客户端广播消息""" if room_id not in rooms: return room = rooms[room_id] for client_id in room.clients: if client_id != exclude_client and client_id in connected_clients: try: await send_message(connected_clients[client_id]['websocket'], message) except Exception as e: logger.error(f"广播消息失败: {e}") def run_http_server(): """运行HTTP服务器""" try: server = HTTPServer(('0.0.0.0', 80), WebSocketHTTPRequestHandler) logger.info("HTTP服务器启动在端口 80") server.serve_forever() except Exception as e: logger.error(f"HTTP服务器启动失败: {e}") async def run_websocket_server(): """运行WebSocket服务器""" try: # 使用新的函数签名,不传递path参数 server = await websockets.serve(handle_websocket, '0.0.0.0', 81) logger.info("WebSocket服务器启动在端口 81") await server.wait_closed() except Exception as e: logger.error(f"WebSocket服务器启动失败: {e}") async def main(): """主函数""" # 启动HTTP服务器(在单独线程中) http_thread = threading.Thread(target=run_http_server, daemon=True) http_thread.start() # 启动WebSocket服务器 await run_websocket_server() if __name__ == '__main__': logger.info("启动服务器...") # 检查端口权限 try: asyncio.run(main()) except PermissionError: logger.error("需要管理员权限才能绑定到80端口") logger.info("尝试使用8080端口...") # 可以在这里添加回退到其他端口的逻辑