# Listing metadata: 627 lines, 22 KiB, Python
import asyncio
|
||
import json
|
||
import logging
|
||
import uuid
|
||
import time
|
||
import os
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, Any, Set
|
||
import threading
|
||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||
import websockets
|
||
from urllib.parse import parse_qs, urlparse
|
||
import mimetypes
|
||
import re
|
||
|
||
# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Shared in-memory state.
# rooms: room_id -> Room
rooms = {}
# connected_clients: client_id -> dict with 'websocket', 'room_id', 'client_type'
connected_clients = {}

ws_port = 81  # external port of the WebSocket service

# Static file directory, created on start-up when it is missing
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
if not os.path.exists(STATIC_DIR):
    # exist_ok=True: another process may create the directory between the
    # existence check above and this call; that must not abort start-up.
    os.makedirs(STATIC_DIR, exist_ok=True)
    logger.info(f"创建静态文件目录: {STATIC_DIR}")


class Room:
    """One room: its id, the ids of the clients inside it and its URLs."""

    def __init__(self, room_id: str, server_host: str):
        """Create an empty room.

        Args:
            room_id: Identifier of the room.
            server_host: Host used to build the URLs.  A trailing ``:port``
                is dropped for the WebSocket URL, which uses ``ws_port``;
                the frontend URL uses ``server_host`` exactly as given.
        """
        self.room_id = room_id
        self.created_at = datetime.now()
        self.last_activity = datetime.now()
        self.clients: Set[str] = set()

        # Strip the port from the host, then point at the WebSocket port.
        bare_host = re.sub(r':\d+$', '', server_host)
        self.ws_url = f"ws://{bare_host}:{ws_port}"
        self.frontend_url = f"http://{server_host}/?id={room_id}&ws={self.ws_url}"

    def add_client(self, client_id: str):
        """Register ``client_id`` as a member and refresh the activity time."""
        self.clients.add(client_id)
        self.last_activity = datetime.now()
        logger.info(f"客户端 {client_id} 加入房间 {self.room_id}, 当前客户端数: {len(self.clients)}")

    def remove_client(self, client_id: str):
        """Remove ``client_id`` if it is a member; otherwise do nothing."""
        if client_id not in self.clients:
            return
        self.clients.remove(client_id)
        self.last_activity = datetime.now()
        logger.info(f"客户端 {client_id} 离开房间 {self.room_id}, 剩余客户端数: {len(self.clients)}")

    def is_empty(self) -> bool:
        """Return True when the room has no clients."""
        return not self.clients

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serialisable summary of the room."""
        summary: Dict[str, Any] = {}
        summary['room_id'] = self.room_id
        summary['frontend_url'] = self.frontend_url
        summary['ws_url'] = self.ws_url
        summary['client_count'] = len(self.clients)
        summary['created_at'] = self.created_at.isoformat()
        summary['last_activity'] = self.last_activity.isoformat()
        return summary


def cleanup_empty_rooms():
    """Periodically drop rooms that have been empty for a while.

    Runs forever: every 5 minutes it removes from ``rooms`` each room that
    has no clients and whose last activity is more than 10 minutes old.
    """
    idle_limit = timedelta(minutes=10)
    while True:
        time.sleep(300)  # check every 5 minutes
        now = datetime.now()

        # Work on a snapshot so the dict can change while we look at it.
        stale_ids = [
            candidate_id
            for candidate_id, room in list(rooms.items())
            if room.is_empty() and now - room.last_activity > idle_limit
        ]

        for room_id in stale_ids:
            # Only log rooms that were still present when removed.
            if rooms.pop(room_id, None) is not None:
                logger.info(f"清理空房间: {room_id}")


# Start the cleanup thread (daemon, so it never blocks interpreter exit)
cleanup_thread = threading.Thread(target=cleanup_empty_rooms, daemon=True)
cleanup_thread.start()


class HTTPHandler(BaseHTTPRequestHandler):
    """HTTP front end: room REST API, root-page bootstrap and static files.

    GET routes: ``/api/room/<id>``, ``/api/rooms``, ``/``; every other path
    is looked up as a file under ``STATIC_DIR``.  POST route: ``/api/room``.
    """

    def do_GET(self):
        """Handle an HTTP GET request by dispatching on the URL path."""
        try:
            logger.info(f"收到GET请求: {self.path} from {self.client_address[0]}")
            logger.info(f"请求头: {dict(self.headers)}")

            parsed_path = urlparse(self.path)
            path = parsed_path.path
            query_params = parse_qs(parsed_path.query)

            # API routes.  Room creation is POST-only and lives in do_POST;
            # the former ``command == 'POST'`` test here could never match
            # because this method only runs for GET requests.
            if path.startswith('/api/room/'):
                room_id = path.split('/')[-1]
                self.handle_get_room(room_id)
            elif path == '/api/rooms':
                self.handle_list_rooms()
            # Root path
            elif path == '/':
                self.handle_root_path(query_params)
            # Static files
            else:
                self.handle_static_file(path)
        except Exception as e:
            logger.error(f"处理GET请求时发生错误: {e}")
            self._send_server_error(e)

    def do_POST(self):
        """Handle an HTTP POST request (only ``/api/room`` is supported)."""
        try:
            logger.info(f"收到POST请求: {self.path} from {self.client_address[0]}")

            path = urlparse(self.path).path

            if path == '/api/room':
                self.handle_create_room()
            else:
                self.send_error(404, "Not Found")
        except Exception as e:
            logger.error(f"处理POST请求时发生错误: {e}")
            self._send_server_error(e)

    # -- private helpers ---------------------------------------------------

    def _send_server_error(self, exc: Exception):
        """Send a 500 with a fixed ASCII reason phrase; detail goes in the body.

        ``send_error`` encodes its *message* into the status line as
        latin-1, so arbitrary exception text there can raise
        ``UnicodeEncodeError`` or smuggle line breaks into the response
        head.  The *explain* argument is only rendered (HTML-escaped,
        UTF-8) in the response body.
        """
        self.send_error(500, "Internal Server Error", str(exc))

    def _send_json(self, payload: Dict[str, Any], status: int = 200):
        """Serialise ``payload`` and send it as a JSON response with CORS open."""
        # Serialise before any header is written so that a serialisation
        # failure can still be reported as a clean error response.
        body = json.dumps(payload).encode()
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Access-Control-Allow-Origin', '*')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _create_room(self) -> "Room":
        """Create a room for the request's Host, register it and return it."""
        # Server host from the request, with any port removed
        host = self.headers.get('Host', 'localhost')
        host_without_port = re.sub(r':\d+$', '', host)

        # Only 8 hex characters of the UUID are kept, so ids can collide;
        # never overwrite a room that already exists.
        room_id = str(uuid.uuid4())[:8]
        while room_id in rooms:
            room_id = str(uuid.uuid4())[:8]

        room = Room(room_id, host_without_port)
        rooms[room_id] = room
        return room

    @staticmethod
    def _is_inside_static_dir(candidate: str) -> bool:
        """Return True when ``candidate`` resolves to a path under STATIC_DIR."""
        root = os.path.normcase(os.path.abspath(STATIC_DIR))
        target = os.path.normcase(os.path.abspath(candidate))
        try:
            return os.path.commonpath([root, target]) == root
        except ValueError:
            # Raised e.g. for paths on different drives: not inside.
            return False

    # -- route handlers ----------------------------------------------------

    def handle_root_path(self, query_params: Dict[str, Any]):
        """Serve the frontend for ``/?id=...`` or create a room and redirect.

        Args:
            query_params: Parsed query string as returned by ``parse_qs``.
        """
        room_id = query_params.get('id', [None])[0]

        if room_id:
            # A room id was supplied: just return the frontend page
            logger.info(f"请求根路径,有房间ID: {room_id}")
            self.serve_static_file('/index.html')
            return

        # No room id: create a new room and redirect to it
        try:
            room = self._create_room()
            logger.info(f"通过根路径创建新房间: {room.room_id}")

            # Redirect to the URL carrying the room id and WebSocket URL.
            # Using room.ws_url keeps the redirect identical to what the
            # room itself reports.
            redirect_url = f'/?id={room.room_id}&ws={room.ws_url}'
            self.send_response(302)
            self.send_header('Location', redirect_url)
            self.end_headers()
        except Exception as e:
            logger.error(f"创建房间失败: {e}")
            self._send_server_error(e)

    def handle_static_file(self, path: str):
        """Serve the file under ``STATIC_DIR`` named by the URL ``path``.

        Responds 403 for traversal attempts or non-regular files, 404 for
        missing files or directories without ``index.html``, 500 on read
        errors, and 200 with the file content otherwise.
        """
        logger.info(f"处理静态文件请求: {path}")

        # Security check: refuse path traversal attempts
        if '..' in path:
            logger.warning(f"检测到可疑路径: {path}")
            self.send_error(403, "Forbidden: Path traversal not allowed")
            return

        # Normalise the path and drop leading slashes
        if path == '/':
            path = '/index.html'
        file_path = path.lstrip('/') or 'index.html'

        # Build the full file path
        full_path = os.path.join(STATIC_DIR, file_path)
        logger.info(f"尝试访问文件: {full_path}")

        # os.path.join discards STATIC_DIR when the second part is absolute
        # or drive-qualified, so confirm the result is still inside it.
        if not self._is_inside_static_dir(full_path):
            logger.warning(f"检测到可疑路径: {path}")
            self.send_error(403, "Forbidden: Path traversal not allowed")
            return

        # For a directory, fall back to its index.html
        if os.path.isdir(full_path):
            index_path = os.path.join(full_path, 'index.html')
            if os.path.exists(index_path):
                full_path = index_path
                logger.info(f"重定向到目录索引文件: {index_path}")
            else:
                logger.warning(f"目录不存在索引文件: {full_path}")
                self.send_error(404, "Directory index not found")
                return

        # The target must exist and be a regular file
        if not os.path.exists(full_path):
            logger.warning(f"文件不存在: {full_path}")
            self.send_error(404, f"File not found: {path}")
            return

        if not os.path.isfile(full_path):
            logger.warning(f"路径不是文件: {full_path}")
            self.send_error(403, "Not a file")
            return

        try:
            # Read the whole file before sending any header
            with open(full_path, 'rb') as f:
                content = f.read()

            # Determine the MIME type
            mime_type, _ = mimetypes.guess_type(full_path)
            if mime_type is None:
                mime_type = 'application/octet-stream'

            logger.info(f"成功读取文件: {full_path}, 大小: {len(content)} bytes, MIME类型: {mime_type}")

            # Response headers
            self.send_response(200)
            self.send_header('Content-Type', mime_type)
            self.send_header('Content-Length', str(len(content)))
            # Cache control: one hour
            self.send_header('Cache-Control', 'public, max-age=3600')
            self.end_headers()

            # File content
            self.wfile.write(content)
            logger.info(f"文件发送完成: {full_path}")
        except Exception as e:
            logger.error(f"读取或发送文件失败: {e}")
            # Exception text may contain non-latin-1 characters (file
            # names), so it goes into the body, not the status line.
            self.send_error(500, "Error reading file", str(e))

    def serve_static_file(self, path: str):
        """Serve a static file (internal alias of ``handle_static_file``)."""
        self.handle_static_file(path)

    def handle_create_room(self):
        """Create a new room and return its id and URLs as JSON."""
        try:
            room = self._create_room()
            logger.info(f"创建新房间: {room.room_id}")

            self._send_json({
                'success': True,
                'room_id': room.room_id,
                'frontend_url': room.frontend_url,
                'ws_url': room.ws_url,
            })
        except Exception as e:
            logger.error(f"创建房间失败: {e}")
            self._send_server_error(e)

    def handle_get_room(self, room_id: str):
        """Return the room's details as JSON, or 404 when it does not exist."""
        try:
            room = rooms.get(room_id)
            if room is None:
                # The reason phrase must be latin-1 encodable; a non-ASCII
                # message here used to raise and turn this 404 into a 500.
                # The localized text is carried in the response body.
                self.send_error(404, "Room not found", '房间不存在')
                return

            self._send_json({'success': True, 'data': room.to_dict()})
        except Exception as e:
            logger.error(f"获取房间信息失败: {e}")
            self._send_server_error(e)

    def handle_list_rooms(self):
        """Return all rooms that currently have at least one client."""
        try:
            # Snapshot the values: other threads add and remove rooms.
            active_rooms = [room.to_dict() for room in list(rooms.values()) if not room.is_empty()]

            self._send_json({
                'success': True,
                'data': {
                    'total_rooms': len(active_rooms),
                    'rooms': active_rooms,
                },
            })
        except Exception as e:
            logger.error(f"获取房间列表失败: {e}")
            self._send_server_error(e)

    def log_message(self, format, *args):
        """Route the base class's access/error log lines to ``logger``."""
        logger.info("%s - - [%s] %s" % (self.client_address[0],
                                        self.log_date_time_string(),
                                        format % args))


class WebSocketHTTPRequestHandler(HTTPHandler):
    """HTTP request handler that recognises WebSocket upgrade attempts."""

    def do_GET(self):
        """Handle GET; upgrade attempts are diverted, the rest is plain HTTP."""
        # The Upgrade token is matched case-insensitively (RFC 6455), so
        # "WebSocket" and "websocket" must be treated alike.
        upgrade = self.headers.get('Upgrade') or ''
        if upgrade.strip().lower() == 'websocket':
            self.handle_websocket_upgrade()
        else:
            super().do_GET()

    def handle_websocket_upgrade(self):
        """Log and reject a WebSocket handshake sent to this HTTP handler.

        This handler does not perform the handshake itself; in this file
        the WebSocket endpoint is started separately via
        ``websockets.serve``, so a handshake arriving here is answered
        with a 426 error.
        """
        logger.info(f"WebSocket连接请求: {self.path}")
        self.send_error(426, "WebSocket upgrade required")


async def handle_websocket(websocket):
    """Serve one WebSocket connection from greeting to cleanup.

    Takes only the connection object (no ``path`` argument).  Assigns the
    connection a fresh client id, greets it, dispatches every incoming
    JSON message, and on exit removes the client from ``connected_clients``
    and from its room, notifying the remaining members.
    """
    client_id = str(uuid.uuid4())
    room_id = None

    try:
        # Greet the client with its id
        await send_message(websocket, {
            'type': 'connected',
            'message': '连接成功',
            'client_id': client_id,
            'timestamp': datetime.now().isoformat()
        })

        logger.info(f"客户端连接: {client_id}")

        # Message loop
        async for message in websocket:
            try:
                data = json.loads(message)
                # A successful join reports (room_id, client_type)
                result = await handle_websocket_message(websocket, client_id, data)
                if result:
                    room_id, _client_type = result
            except json.JSONDecodeError:
                logger.error(f"消息格式错误: {message}")
            except Exception as e:
                # One bad message must not tear down the connection.
                logger.error(f"处理消息错误: {e}")

    except websockets.exceptions.ConnectionClosed:
        logger.info(f"客户端断开连接: {client_id}")
    finally:
        # Drop the routing entry; it knows the client's current room.
        info = connected_clients.pop(client_id, None)
        if info is not None:
            room_id = info.get('room_id')
            logger.info(f"清理客户端: {client_id}, 房间: {room_id}")

        # Otherwise fall back to the last room joined on this connection.
        room = rooms.get(room_id) if room_id else None

        # Only announce a departure if the client is still a member.  After
        # an explicit leave_room the remembered room_id is stale, and the
        # room used to receive a second 'user_left' on disconnect.
        if room is not None and client_id in room.clients:
            room.remove_client(client_id)

            # Notify the other clients in the room
            await broadcast_to_room(room_id, {
                'type': 'user_left',
                'client_id': client_id,
                'message': '用户离开房间',
                'room_size': len(room.clients),
                'timestamp': datetime.now().isoformat()
            }, exclude_client=client_id)


async def handle_websocket_message(websocket, client_id: str, data: Dict[str, Any]):
    """Dispatch one decoded message according to its ``type`` field.

    Returns whatever ``handle_join_room`` returns for ``join_room``
    messages and ``None`` for every other message type.
    """
    message_type = data.get('type')

    if message_type == 'join_room':
        return await handle_join_room(websocket, client_id, data)

    if message_type == 'file_operation':
        await handle_file_operation(websocket, client_id, data)
        return None

    if message_type == 'file_operation_response':
        await handle_file_operation_response(websocket, client_id, data)
        return None

    if message_type == 'leave_room':
        await handle_leave_room(websocket, client_id, data)
        return None

    if message_type == 'ping':
        # Keep-alive: answer straight away
        pong = {
            'type': 'pong',
            'message': 'pong',
            'timestamp': datetime.now().isoformat()
        }
        await send_message(websocket, pong)
        return None

    logger.warning(f"未知消息类型: {message_type}")
    return None


async def handle_join_room(websocket, client_id: str, data: Dict[str, Any]):
    """Handle a ``join_room`` request.

    Reads ``room_id`` and optional ``client_type`` from ``data``.  On
    success the client is added to the room and to ``connected_clients``,
    receives ``joined_room`` and the other members receive ``user_joined``.

    Returns:
        ``(room_id, client_type)`` on success, ``None`` when the request
        was rejected (an ``error`` message is sent in that case).
    """
    room_id = data.get('room_id')
    client_type = data.get('client_type', 'unknown')

    if not room_id:
        await send_message(websocket, {
            'type': 'error',
            'message': '房间ID不能为空'
        })
        return

    if room_id not in rooms:
        await send_message(websocket, {
            'type': 'error',
            'message': '房间不存在'
        })
        return

    # connected_clients keeps a single room per client.  If this client is
    # already in a different room, leave it first; otherwise its id stays
    # in that room's member set forever and the room never becomes empty.
    previous = connected_clients.get(client_id)
    previous_room_id = previous.get('room_id') if previous else None
    if previous_room_id and previous_room_id != room_id:
        previous_room = rooms.get(previous_room_id)
        if previous_room is not None and client_id in previous_room.clients:
            previous_room.remove_client(client_id)
            await broadcast_to_room(previous_room_id, {
                'type': 'user_left',
                'client_id': client_id,
                'message': '用户离开房间',
                'room_size': len(previous_room.clients),
                'timestamp': datetime.now().isoformat()
            }, exclude_client=client_id)

    room = rooms[room_id]
    room.add_client(client_id)

    # Record how to reach this client and where it is
    connected_clients[client_id] = {
        'websocket': websocket,
        'room_id': room_id,
        'client_type': client_type
    }

    logger.info(f"客户端 {client_id} ({client_type}) 加入房间 {room_id}")

    # Confirm the join to the client
    await send_message(websocket, {
        'type': 'joined_room',
        'room_id': room_id,
        'message': f'成功加入房间 {room_id}',
        'client_count': len(room.clients),
        'timestamp': datetime.now().isoformat()
    })

    # Notify the other clients in the room
    await broadcast_to_room(room_id, {
        'type': 'user_joined',
        'client_id': client_id,
        'client_type': client_type,
        'message': '新用户加入房间',
        'room_size': len(room.clients),
        'timestamp': datetime.now().isoformat()
    }, exclude_client=client_id)

    # Returned so the connection handler can clean up on disconnect
    return room_id, client_type


async def handle_leave_room(websocket, client_id: str, data: Dict[str, Any]):
    """Handle a ``leave_room`` request.

    The room is taken from ``data['room_id']`` or, when absent, from the
    room recorded for the client.  The request only takes effect when the
    client really is a member of that room; the remaining members then
    receive ``user_left``.
    """
    recorded_room_id = connected_clients.get(client_id, {}).get('room_id')
    room_id = data.get('room_id') or recorded_room_id

    room = rooms.get(room_id) if room_id else None
    if room is None:
        return

    # The room id comes from the client.  Without this check a client
    # could announce 'user_left' to a room it never joined and lose its
    # routing entry while still being listed in its real room.
    if client_id not in room.clients:
        logger.warning(f"客户端 {client_id} 请求离开未加入的房间 {room_id}")
        return

    room.remove_client(client_id)

    # Drop the routing entry only when it points at the room being left
    if recorded_room_id == room_id:
        connected_clients.pop(client_id, None)

    # Notify the other clients in the room
    await broadcast_to_room(room_id, {
        'type': 'user_left',
        'client_id': client_id,
        'message': '用户离开房间',
        'room_size': len(room.clients),
        'timestamp': datetime.now().isoformat()
    }, exclude_client=client_id)

    logger.info(f"客户端 {client_id} 主动离开房间 {room_id}")


async def handle_file_operation(websocket, client_id: str, data: Dict[str, Any]):
    """Forward a file-operation request to a file client in the same room.

    Reads ``room_id``, ``requestId`` and ``operation_type`` from ``data``.
    Replies with ``error`` when the room is unknown and with a failed
    ``file_operation_response`` when the room has no other client of type
    ``file_client``.
    """
    room_id = data.get('room_id')
    request_id = data.get('requestId')
    operation_type = data.get('operation_type')

    room_is_known = bool(room_id) and room_id in rooms
    if not room_is_known:
        await send_message(websocket, {
            'type': 'error',
            'message': '无效的房间ID',
            'requestId': request_id
        })
        return

    # Log the operation
    logger.info(f"收到文件操作请求: {operation_type} from {client_id} in room {room_id}")

    # Sockets of every other client in this room registered as file client
    file_clients = [
        client_info['websocket']
        for cid, client_info in connected_clients.items()
        if client_info['room_id'] == room_id
        and cid != client_id
        and client_info.get('client_type') == 'file_client'
    ]

    if not file_clients:
        # Nobody can serve the request: report the failure to the sender
        await send_message(websocket, {
            'type': 'file_operation_response',
            'requestId': request_id,
            'success': False,
            'error': '房间内没有文件客户端',
            'timestamp': datetime.now().isoformat()
        })
        return

    # Forward to the first file client, tagged with the sender's id.
    # NOTE(review): ``**data`` is expanded after the literal 'type' key, so
    # a 'type' present in ``data`` overrides 'file_operation_request' in
    # the forwarded message; confirm this is what file clients expect.
    data['sender_id'] = client_id
    await send_message(file_clients[0], {
        'type': 'file_operation_request',
        **data
    })


async def handle_file_operation_response(websocket, client_id: str, data: Dict[str, Any]):
    """Relay a file-operation response to the client named in it.

    The recipient is ``data['target_client_id']``; when it is missing or
    not currently connected the response is dropped and an error logged.
    """
    request_id = data.get('requestId')
    target_client_id = data.get('target_client_id')

    logger.info(f"收到文件操作响应: {request_id} -> {target_client_id}")

    if not target_client_id or target_client_id not in connected_clients:
        logger.error("文件操作响应没有指定目标客户端或目标客户端不存在")
        return

    # Deliver to the requested client
    recipient = connected_clients[target_client_id]['websocket']
    relayed = {
        'type': 'file_operation_response',
        **data
    }
    await send_message(recipient, relayed)


async def send_message(websocket, message: Dict[str, Any]):
    """Send ``message`` over ``websocket`` as a JSON text frame.

    A connection that is already closed is logged as a warning instead of
    raising.
    """
    try:
        payload = json.dumps(message)
        await websocket.send(payload)
    except websockets.exceptions.ConnectionClosed:
        logger.warning("连接已关闭,无法发送消息")


async def broadcast_to_room(room_id: str, message: Dict[str, Any], exclude_client: str = None):
    """Send ``message`` to every connected client of a room.

    Args:
        room_id: Room to broadcast to; unknown rooms are ignored.
        message: JSON-serialisable payload.
        exclude_client: Optional client id that must not receive it.
    """
    room = rooms.get(room_id)
    if room is None:
        return

    # Iterate over a snapshot: each await below lets other coroutines run,
    # and they may add or remove members.  Iterating the live set would
    # then raise "Set changed size during iteration".
    for client_id in list(room.clients):
        if client_id == exclude_client:
            continue
        client_info = connected_clients.get(client_id)
        if client_info is None:
            continue
        try:
            await send_message(client_info['websocket'], message)
        except Exception as e:
            # One failing recipient must not stop the broadcast.
            logger.error(f"广播消息失败: {e}")


def run_http_server(host: str = '0.0.0.0', port: int = 80):
    """Run the HTTP server until it stops; blocks the calling thread.

    Args:
        host: Interface to bind (default: all interfaces).
        port: TCP port to bind (default: 80).

    Start-up and runtime failures are logged, not raised.
    """
    try:
        # The context manager closes the listening socket on exit.
        with HTTPServer((host, port), WebSocketHTTPRequestHandler) as server:
            logger.info(f"HTTP服务器启动在端口 {port}")
            server.serve_forever()
    except Exception as e:
        logger.error(f"HTTP服务器启动失败: {e}")


async def run_websocket_server(host: str = '0.0.0.0', port: int = 81):
    """Run the WebSocket server until it is closed.

    Args:
        host: Interface to bind (default: all interfaces).
        port: TCP port to listen on (default: 81).  This is the local bind
            port; the port advertised to clients is the separate
            module-level ``ws_port``.

    Start-up and runtime failures are logged, not raised.
    """
    try:
        # The handler takes only the connection (no path argument)
        server = await websockets.serve(handle_websocket, host, port)
        logger.info(f"WebSocket服务器启动在端口 {port}")
        await server.wait_closed()
    except Exception as e:
        logger.error(f"WebSocket服务器启动失败: {e}")


async def main():
    """Start the HTTP server in a daemon thread, then run the WebSocket server."""
    # The HTTP server blocks, so it gets its own thread
    threading.Thread(target=run_http_server, daemon=True).start()

    # The WebSocket server runs on this event loop
    await run_websocket_server()


if __name__ == '__main__':
    logger.info("启动服务器...")

    try:
        asyncio.run(main())
    except PermissionError:
        # NOTE(review): run_http_server and run_websocket_server in this
        # file catch and log every Exception themselves, so a failed bind
        # does not appear to reach this branch, and no fallback port is
        # implemented yet despite the second message.
        logger.error("需要管理员权限才能绑定到80端口")
        logger.info("尝试使用8080端口...")
        # Fallback to another port could be added here
    except KeyboardInterrupt:
        # Ctrl+C is the normal way to stop the server; exit without a traceback.
        logger.info("服务器已停止")