Files
computer-craft-web-file/PyServer/main.py
2025-12-05 19:02:19 +08:00

383 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import json
import logging
import uuid
import time
import os
from datetime import datetime
from typing import Dict, Any, List
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
import mimetypes
import re
# 设置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 存储房间信息
rooms = {}
# 前端到客户端的消息队列
frontend_to_client_queues = {}
# 客户端到前端的消息队列
client_to_frontend_queues = {}
# 静态文件目录
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
if not os.path.exists(STATIC_DIR):
os.makedirs(STATIC_DIR)
logger.info(f"创建静态文件目录: {STATIC_DIR}")
class Room:
def __init__(self, room_id: str, server_host: str):
self.room_id = room_id
self.created_at = datetime.now()
# 从host中移除端口号
host_without_port = re.sub(r':\d+$', '', server_host)
self.frontend_url = f"http://{server_host}/?id={room_id}"
def to_dict(self) -> Dict[str, Any]:
return {
'room_id': self.room_id,
'frontend_url': self.frontend_url,
'created_at': self.created_at.isoformat()
}
def get_frontend_to_client_queue(room_id: str) -> List[Dict[str, Any]]:
if room_id not in frontend_to_client_queues:
frontend_to_client_queues[room_id] = []
return frontend_to_client_queues[room_id]
def get_client_to_frontend_queue(room_id: str) -> List[Dict[str, Any]]:
if room_id not in client_to_frontend_queues:
client_to_frontend_queues[room_id] = []
return client_to_frontend_queues[room_id]
class HTTPHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""处理HTTP GET请求"""
try:
parsed_path = urlparse(self.path)
path = parsed_path.path
query_params = parse_qs(parsed_path.query)
# API路由
if path == '/api/room' and self.command == 'POST':
self.handle_create_room()
elif path.startswith('/api/room/') and self.command == 'GET':
room_id = path.split('/')[-1]
self.handle_get_room(room_id)
elif path == '/api/rooms' and self.command == 'GET':
self.handle_list_rooms()
# 根路径处理
elif path == '/':
self.handle_root_path(query_params)
# 静态文件服务
else:
self.handle_static_file(path)
except Exception as e:
logger.error(f"处理GET请求时发生错误: {e}")
self.send_error(500, f"Internal Server Error: {str(e)}")
def do_POST(self):
"""处理HTTP POST请求"""
try:
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length) if content_length > 0 else b'{}'
parsed_path = urlparse(self.path)
path = parsed_path.path
if path == '/api/room':
self.handle_create_room(post_data)
elif path == '/api/frontend/send':
self.handle_frontend_send_message(post_data)
elif path == '/api/frontend/receive':
self.handle_frontend_receive_message(post_data)
elif path == '/api/client/send':
self.handle_client_send_message(post_data)
elif path == '/api/client/receive':
self.handle_client_receive_message(post_data)
else:
self.send_error(404, "Not Found")
except Exception as e:
logger.error(f"处理POST请求时发生错误: {e}")
self.send_error(500, f"Internal Server Error: {str(e)}")
def handle_root_path(self, query_params: Dict[str, Any]):
"""处理根路径请求"""
room_id = query_params.get('id', [None])[0]
if room_id:
# 有房间ID参数直接返回前端页面
logger.info(f"请求根路径有房间ID: {room_id}")
self.serve_static_file('/index.html')
else:
# 没有房间ID创建新房间并重定向
try:
# 生成唯一房间ID
room_id = str(uuid.uuid4())[:8]
# 获取服务器主机地址并移除端口号
host = self.headers.get('Host', 'localhost')
host_without_port = re.sub(r':\d+$', '', host)
# 创建房间
room = Room(room_id, host_without_port)
rooms[room_id] = room
logger.info(f"通过根路径创建新房间: {room_id}")
# 重定向到带房间ID的URL
redirect_url = f'/?id={room_id}'
self.send_response(302)
self.send_header('Location', redirect_url)
self.end_headers()
except Exception as e:
logger.error(f"创建房间失败: {e}")
self.send_error(500, str(e))
def handle_static_file(self, path: str):
"""处理静态文件请求"""
# 安全检查:防止路径遍历攻击
if '..' in path:
self.send_error(403, "Forbidden: Path traversal not allowed")
return
# 规范化路径
if path == '/':
path = '/index.html'
# 移除开头的斜杠
file_path = path.lstrip('/')
if not file_path:
file_path = 'index.html'
# 构建完整文件路径
full_path = os.path.join(STATIC_DIR, file_path)
# 如果是目录尝试查找index.html
if os.path.isdir(full_path):
index_path = os.path.join(full_path, 'index.html')
if os.path.exists(index_path):
full_path = index_path
else:
self.send_error(404, "Directory index not found")
return
# 检查文件是否存在且是普通文件
if not os.path.exists(full_path) or not os.path.isfile(full_path):
self.send_error(404, f"File not found: {path}")
return
try:
# 读取文件内容
with open(full_path, 'rb') as f:
content = f.read()
# 获取MIME类型
mime_type, encoding = mimetypes.guess_type(full_path)
if mime_type is None:
mime_type = 'application/octet-stream'
# 发送响应头
self.send_response(200)
self.send_header('Content-Type', mime_type)
self.send_header('Content-Length', str(len(content)))
self.send_header('Cache-Control', 'public, max-age=3600')
self.end_headers()
# 发送文件内容
self.wfile.write(content)
except Exception as e:
logger.error(f"读取或发送文件失败: {e}")
self.send_error(500, f"Error reading file: {str(e)}")
def serve_static_file(self, path: str):
"""服务静态文件(内部方法)"""
self.handle_static_file(path)
def handle_create_room(self, post_data=None):
"""创建新房间"""
try:
# 生成唯一房间ID
room_id = str(uuid.uuid4())[:8]
# 获取服务器主机地址并移除端口号
host = self.headers.get('Host', 'localhost')
host_without_port = re.sub(r':\d+$', '', host)
# 创建房间
room = Room(room_id, host_without_port)
rooms[room_id] = room
logger.info(f"创建新房间: {room_id}")
response = {
'success': True,
'room_id': room_id,
'frontend_url': room.frontend_url
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"创建房间失败: {e}")
self.send_error(500, str(e))
def handle_frontend_send_message(self, post_data):
"""前端发送消息到客户端"""
try:
data = json.loads(post_data.decode('utf-8'))
room_id = data.get('room_id')
message = data.get('message')
if not room_id or not message:
self.send_error(400, "需要room_id和message参数")
return
queue = get_frontend_to_client_queue(room_id)
queue.append(message)
response = {
'success': True,
'message': '消息已发送到客户端队列'
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"前端发送消息失败: {e}")
self.send_error(500, str(e))
def handle_frontend_receive_message(self, post_data):
"""前端接收来自客户端的消息"""
try:
data = json.loads(post_data.decode('utf-8'))
room_id = data.get('room_id')
if not room_id:
self.send_error(400, "需要room_id参数")
return
queue = get_client_to_frontend_queue(room_id)
if queue:
# 返回队列中的第一个消息
message = queue.pop(0)
response = {
'success': True,
'message': message
}
else:
# 没有消息
response = {
'success': True,
'message': None
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"前端接收消息失败: {e}")
self.send_error(500, str(e))
def handle_client_send_message(self, post_data):
"""客户端发送消息到前端"""
try:
data = json.loads(post_data.decode('utf-8'))
room_id = data.get('room_id')
message = data.get('message')
if not room_id or not message:
self.send_error(400, "需要room_id和message参数")
return
queue = get_client_to_frontend_queue(room_id)
queue.append(message)
response = {
'success': True,
'message': '消息已发送到前端队列'
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"客户端发送消息失败: {e}")
self.send_error(500, str(e))
def handle_client_receive_message(self, post_data):
"""客户端接收来自前端的消息"""
try:
data = json.loads(post_data.decode('utf-8'))
room_id = data.get('room_id')
if not room_id:
self.send_error(400, "需要room_id参数")
return
queue = get_frontend_to_client_queue(room_id)
if queue:
# 返回队列中的第一个消息
message = queue.pop(0)
response = {
'success': True,
'message': message
}
else:
# 没有消息
response = {
'success': True,
'message': None
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"客户端接收消息失败: {e}")
self.send_error(500, str(e))
def log_message(self, format, *args):
"""重写日志方法"""
logger.info("%s - - [%s] %s" % (self.client_address[0],
self.log_date_time_string(),
format % args))
def run_http_server():
"""运行HTTP服务器"""
try:
server = HTTPServer(('0.0.0.0', 80), HTTPHandler)
logger.info("HTTP服务器启动在端口 80")
server.serve_forever()
except Exception as e:
logger.error(f"HTTP服务器启动失败: {e}")
if __name__ == '__main__':
logger.info("启动服务器...")
run_http_server()