将http轮询改为 长轮询请求挂起

This commit is contained in:
nnwang
2025-12-06 01:02:28 +08:00
parent e5708e10fb
commit 83822ae165
5 changed files with 506 additions and 390 deletions

View File

@@ -25,7 +25,7 @@ function table_to_json(t, indent)
indent = indent or 0
local spaces = string.rep(" ", indent)
local result = {}
if type(t) ~= "table" then
if type(t) == "string" then
-- 正确转义所有特殊字符
@@ -48,7 +48,7 @@ function table_to_json(t, indent)
return '"' .. tostring(t) .. '"'
end
end
-- 检查是否是数组
local is_array = true
local max_index = 0
@@ -62,12 +62,12 @@ function table_to_json(t, indent)
max_index = k
end
end
-- 空表当作对象处理
if count == 0 then
is_array = false
end
if is_array then
-- 处理数组
table.insert(result, "[")
@@ -102,7 +102,7 @@ function table_to_json(t, indent)
table.insert(result, "}")
end
end
return table.concat(result, indent > 0 and "\n" .. spaces or "")
end
@@ -113,23 +113,30 @@ end
local function httpPost(path, data)
local jsonData = table_to_json(data)
local url = httpServer .. path
local response = http.post(url, jsonData, {
["Content-Type"] = "application/json"
-- 使用长轮询设置超时时间为300秒
local response,err = http.post({
url = url,
body = jsonData,
method = "POST",
headers = {
["Content-Type"] = "application/json"
},
timeout = 300 -- 300秒超时
})
if not response then
return nil, "无法连接到服务器"
return nil, "0 "..err
end
local responseBody = response.readAll()
response.close()
local ok, result = pcall(textutils.unserialiseJSON, responseBody)
if ok then
return result
else
return nil, "无效的JSON响应"
return nil, "1无效的JSON响应: " .. responseBody
end
end
@@ -148,7 +155,7 @@ local function getFiles(currentPath, result, prefix)
local computerPrefix = "computer_" .. computerID
local fullPrefix = currentPath == "" and prefix:sub(1, -2) or prefix .. currentPath
local absPath = "/" .. (currentPath == "" and "" or currentPath)
if fs.isDir(absPath) then
result[fullPrefix] = { isFolder = true }
for _, entry in ipairs(fs.list(absPath)) do
@@ -292,10 +299,13 @@ end
local function sendResponse(response)
if response and roomId then
httpPost("/api/client/send", {
local result, err = httpPost("/api/client/send", {
room_id = roomId,
message = response
})
if not result then
log("3 发送响应失败: " .. tostring(err))
end
end
end
@@ -305,52 +315,56 @@ local function pollMessages()
sleep(pollInterval)
break
end
local result, err = httpPost("/api/client/receive", {
room_id = roomId
})
if result and result.success and result.message then
if result and result.success then
local msg = result.message
local msgType = msg.type
if msgType == "file_operation" or msgType == "file_operation_request" then
local op = msg.operation_type or msg.type
local data = msg.data or {}
local reqId = msg.requestId or msg.request_id
local sender = msg.sender_id
local response
if op == "fetch_files" then
response = handleFetchFiles(reqId, sender)
elseif op == "create_or_save_file" then
response = handleSaveFile(data, reqId, sender)
elseif op == "new_file" then
response = handleCreateFile(data, reqId, sender)
elseif op == "new_folder" then
response = handleCreateFolder(data, reqId, sender)
elseif op == "rename" then
response = handleRename(data, reqId, sender)
elseif op == "delete_file" then
response = handleDelete(data, reqId, sender)
else
response = {
type = "file_operation_response",
requestId = reqId,
success = false,
error = "Unknown operation: " .. tostring(op),
target_client_id = sender
}
log(msg)
if msg then
local msgType = msg.type
if msgType == "file_operation" or msgType == "file_operation_request" then
local op = msg.operation_type or msg.type
local data = msg.data or {}
local reqId = msg.requestId or msg.request_id
local sender = msg.sender_id
local response
if op == "fetch_files" then
response = handleFetchFiles(reqId, sender)
elseif op == "create_or_save_file" then
response = handleSaveFile(data, reqId, sender)
elseif op == "new_file" then
response = handleCreateFile(data, reqId, sender)
elseif op == "new_folder" then
response = handleCreateFolder(data, reqId, sender)
elseif op == "rename" then
response = handleRename(data, reqId, sender)
elseif op == "delete_file" then
response = handleDelete(data, reqId, sender)
else
response = {
type = "file_operation_response",
requestId = reqId,
success = false,
error = "Unknown operation: " .. tostring(op),
target_client_id = sender
}
end
sendResponse(response)
end
sendResponse(response)
end
elseif err then
log("轮询错误: " .. tostring(err))
log("2 轮询错误: " .. tostring(err))
-- 如果是连接错误,稍后再试
sleep(5)
end
sleep(pollInterval)
end
end
@@ -369,7 +383,7 @@ local function main()
os.queueEvent("mouse_click",1,1,1)
-- 启动消息轮询
mainFrame:addThread():start(pollMessages)
log("客户端已启动。房间ID: " .. roomId)
log("计算机ID: " .. computerID)
log("按 Q 退出")

View File

@@ -1,13 +0,0 @@
import tomllib
# 您的 TOML 字符串
toml_str = '''{ a = "你好", "a-c-v" = "你好", b = "世界", }'''
# 反序列化
data = tomllib.loads(toml_str)
print(data)
# 输出: {'a': '你好', 'a-c-v': '你好', 'b': '世界'}
# 访问数据
print(data['a']) # 输出: 你好
print(data['a-c-v']) # 输出: 你好

View File

@@ -2,7 +2,7 @@ import type { Files } from 'monaco-tree-editor'
let roomId: string | null = null
let serverUrl: string | null = null
let pollIntervalMs = 1000
let pollIntervalMs = 100
let isPolling = false
let pollingTimeout: number | null = null
@@ -32,29 +32,6 @@ function getParamsFromUrl(): { roomId: string | null } {
return { roomId }
}
async function httpPost(path: string, data: any): Promise<any> {
const url = `${serverUrl}${path}`
try {
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify(data),
})
if (!response.ok) {
throw new Error(`HTTP错误: ${response.status} ${response.statusText}`)
}
return await response.json()
} catch (error) {
console.error('HTTP请求失败:', error)
throw error
}
}
export async function initWebSocketConnection(): Promise<void> {
const params = getParamsFromUrl()
roomId = params.roomId
@@ -67,6 +44,17 @@ export async function initWebSocketConnection(): Promise<void> {
console.log('HTTP连接已初始化服务器:', serverUrl)
console.log('房间ID:', roomId)
// 处理挂起的初始请求
for (const request of pendingInitialRequests) {
try {
const result = await sendFileOperationInternal(request.operation, request.data)
request.resolve(result)
} catch (error) {
request.reject(error as Error)
}
}
pendingInitialRequests.length = 0
startPolling()
return Promise.resolve()
}
@@ -86,18 +74,33 @@ function stopPolling() {
}
async function pollForResponses() {
if (!isPolling || !roomId) return
if (!isPolling || !roomId || !serverUrl) return
try {
const response = await httpPost('/api/frontend/receive', {
room_id: roomId,
const response = await fetch(`${serverUrl}/api/frontend/receive`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
room_id: roomId,
}),
})
if (response.success && response.message) {
handleMessage(response.message)
if (response.ok) {
const data = await response.json()
if (data.success && data.message) {
handleMessage(data.message)
}
} else {
console.error('轮询请求失败:', response.status)
// 短暂的延迟后重试
await new Promise((resolve) => setTimeout(resolve, 2000))
}
} catch (error) {
console.error('轮询消息失败:', error)
// 网络错误,稍后重试
await new Promise((resolve) => setTimeout(resolve, 5000))
}
if (isPolling) {
@@ -130,8 +133,13 @@ function handleFileOperationResponse(data: any): void {
}
}
function sendFileOperationInternal(operationType: string, data?: any, timeoutMs: number = 30000): Promise<any> {
async function sendFileOperationInternal(operationType: string, data?: any, timeoutMs: number = 30000): Promise<any> {
return new Promise((resolve, reject) => {
if (!serverUrl || !roomId) {
reject(new Error('未初始化连接'))
return
}
const requestId = generateRequestId()
const timeout = window.setTimeout(() => {
@@ -143,21 +151,28 @@ function sendFileOperationInternal(operationType: string, data?: any, timeoutMs:
pendingRequests.set(requestId, { resolve, reject, timeout })
httpPost('/api/frontend/send', {
room_id: roomId,
message: {
type: 'file_operation',
requestId: requestId,
operation_type: operationType,
data: data,
room_id: roomId,
fetch(`${serverUrl}/api/frontend/send`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
},
body: JSON.stringify({
room_id: roomId,
message: {
type: 'file_operation',
requestId: requestId,
operation_type: operationType,
data: data,
room_id: roomId,
},
}),
})
.then((response) => {
if (!response.success) {
.then(async (response) => {
if (!response.ok) {
const errorText = await response.text()
pendingRequests.delete(requestId)
clearTimeout(timeout)
reject(new Error(response.message || '发送请求失败'))
reject(new Error(errorText || `发送请求失败: ${response.status}`))
}
})
.catch((error) => {

View File

@@ -3,9 +3,11 @@ import logging
import uuid
import time
import os
import asyncio
import aiohttp
from aiohttp import web
from datetime import datetime
from typing import Dict, Any, List
from http.server import HTTPServer, BaseHTTPRequestHandler
from urllib.parse import parse_qs, urlparse
import mimetypes
import re
@@ -20,6 +22,8 @@ rooms = {}
frontend_to_client_queues = {}
# 客户端到前端的消息队列
client_to_frontend_queues = {}
# 挂起的请求管理
pending_requests = {}
# 静态文件目录
STATIC_DIR = os.path.join(os.path.dirname(__file__), 'static')
@@ -31,10 +35,10 @@ class Room:
def __init__(self, room_id: str, server_host: str):
self.room_id = room_id
self.created_at = datetime.now()
# 从host中移除端口号
host_without_port = re.sub(r':\d+$', '', server_host)
self.frontend_url = f"http://{server_host}/?id={room_id}"
def to_dict(self) -> Dict[str, Any]:
@@ -54,326 +58,421 @@ def get_client_to_frontend_queue(room_id: str) -> List[Dict[str, Any]]:
client_to_frontend_queues[room_id] = []
return client_to_frontend_queues[room_id]
class HTTPHandler(BaseHTTPRequestHandler):
def do_GET(self):
"""处理HTTP GET请求"""
class HTTPHandler:
async def handle_create_room(self, request):
"""创建新房间"""
try:
parsed_path = urlparse(self.path)
path = parsed_path.path
query_params = parse_qs(parsed_path.query)
# API路由
if path == '/api/room' and self.command == 'POST':
self.handle_create_room()
elif path.startswith('/api/room/') and self.command == 'GET':
room_id = path.split('/')[-1]
self.handle_get_room(room_id)
elif path == '/api/rooms' and self.command == 'GET':
self.handle_list_rooms()
# 根路径处理
elif path == '/':
self.handle_root_path(query_params)
# 静态文件服务
else:
self.handle_static_file(path)
# 生成唯一房间ID
room_id = str(uuid.uuid4())[:8]
# 获取服务器主机地址并移除端口号
host = request.headers.get('Host', 'localhost')
host_without_port = re.sub(r':\d+$', '', host)
# 创建房间
room = Room(room_id, host_without_port)
rooms[room_id] = room
logger.info(f"创建新房间: {room_id}")
response = {
'success': True,
'room_id': room_id,
'frontend_url': room.frontend_url
}
return web.json_response(response)
except Exception as e:
logger.error(f"处理GET请求时发生错误: {e}")
self.send_error(500, f"Internal Server Error: {str(e)}")
def do_POST(self):
"""处理HTTP POST请求"""
logger.error(f"创建房间失败: {e}")
return web.json_response({'error': str(e)}, status=500)
async def handle_frontend_send_message(self, request):
"""前端发送消息到客户端"""
try:
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length) if content_length > 0 else b'{}'
data = await request.json()
room_id = data.get('room_id')
message_data = data.get('message')
if not room_id or not message_data:
return web.json_response({'error': '需要room_id和message参数'}, status=400)
queue = get_frontend_to_client_queue(room_id)
queue.append(message_data)
parsed_path = urlparse(self.path)
path = parsed_path.path
if path == '/api/room':
self.handle_create_room(post_data)
elif path == '/api/frontend/send':
self.handle_frontend_send_message(post_data)
elif path == '/api/frontend/receive':
self.handle_frontend_receive_message(post_data)
elif path == '/api/client/send':
self.handle_client_send_message(post_data)
elif path == '/api/client/receive':
self.handle_client_receive_message(post_data)
else:
self.send_error(404, "Not Found")
# 检查是否有挂起的客户端请求并立即响应
if room_id in pending_requests and 'client' in pending_requests[room_id]:
client_req_id = pending_requests[room_id]['client']
if client_req_id in pending_requests:
pending_requests[client_req_id]['event'].set()
logger.info(f"立即响应挂起的客户端请求: {client_req_id}")
response = {
'success': True,
'message': '消息已发送到客户端队列'
}
return web.json_response(response)
except json.JSONDecodeError:
logger.error("JSON解析失败")
return web.json_response({'error': '无效的JSON数据'}, status=400)
except Exception as e:
logger.error(f"处理POST请求时发生错误: {e}")
self.send_error(500, f"Internal Server Error: {str(e)}")
def handle_root_path(self, query_params: Dict[str, Any]):
"""处理根路径请求"""
room_id = query_params.get('id', [None])[0]
if room_id:
# 有房间ID参数直接返回前端页面
logger.info(f"请求根路径有房间ID: {room_id}")
self.serve_static_file('/index.html')
else:
# 没有房间ID创建新房间并重定向
logger.error(f"前端发送消息失败: {e}")
return web.json_response({'error': str(e)}, status=500)
async def handle_frontend_receive_message(self, request):
"""前端接收来自客户端的消息(长轮询)"""
try:
data = await request.json()
room_id = data.get('room_id')
if not room_id:
return web.json_response({'error': '需要room_id参数'}, status=400)
queue = get_client_to_frontend_queue(room_id)
# 立即检查是否有消息
if queue:
message = queue.pop(0)
response = {
'success': True,
'message': message
}
return web.json_response(response)
# 没有消息,设置长轮询
req_id = str(uuid.uuid4())
event = asyncio.Event()
# 存储挂起的请求
pending_requests[req_id] = {
'room_id': room_id,
'event': event,
'type': 'frontend',
'timestamp': time.time()
}
if room_id not in pending_requests:
pending_requests[room_id] = {}
pending_requests[room_id]['frontend'] = req_id
try:
# 生成唯一房间ID
room_id = str(uuid.uuid4())[:8]
# 等待295秒或直到有消息
await asyncio.wait_for(event.wait(), timeout=295)
# 获取服务器主机地址并移除端口号
host = self.headers.get('Host', 'localhost')
host_without_port = re.sub(r':\d+$', '', host)
# 检查队列中是否有消息
queue = get_client_to_frontend_queue(room_id)
if queue:
message = queue.pop(0)
response = {
'success': True,
'message': message
}
else:
# 超时返回空消息
response = {
'success': True,
'message': None
}
except asyncio.TimeoutError:
# 超时返回空消息
response = {
'success': True,
'message': None
}
# 清理挂起的请求
if req_id in pending_requests:
del pending_requests[req_id]
if room_id in pending_requests and 'frontend' in pending_requests[room_id]:
del pending_requests[room_id]['frontend']
# 创建房间
room = Room(room_id, host_without_port)
rooms[room_id] = room
return web.json_response(response)
except json.JSONDecodeError:
logger.error("JSON解析失败")
return web.json_response({'error': '无效的JSON数据'}, status=400)
except Exception as e:
logger.error(f"前端接收消息失败: {e}")
# 清理挂起的请求
if 'req_id' in locals() and req_id in pending_requests:
del pending_requests[req_id]
if room_id in pending_requests and 'frontend' in pending_requests[room_id]:
del pending_requests[room_id]['frontend']
return web.json_response({'error': str(e)}, status=500)
async def handle_client_send_message(self, request):
"""客户端发送消息到前端"""
try:
data = await request.json()
room_id = data.get('room_id')
message_data = data.get('message')
if not room_id or not message_data:
return web.json_response({'error': '需要room_id和message参数'}, status=400)
queue = get_client_to_frontend_queue(room_id)
queue.append(message_data)
# 检查是否有挂起的前端请求并立即响应
if room_id in pending_requests and 'frontend' in pending_requests[room_id]:
frontend_req_id = pending_requests[room_id]['frontend']
if frontend_req_id in pending_requests:
pending_requests[frontend_req_id]['event'].set()
logger.info(f"立即响应挂起的前端请求: {frontend_req_id}")
response = {
'success': True,
'message': '消息已发送到前端队列'
}
return web.json_response(response)
except json.JSONDecodeError:
logger.error("JSON解析失败")
return web.json_response({'error': '无效的JSON数据'}, status=400)
except Exception as e:
logger.error(f"客户端发送消息失败: {e}")
return web.json_response({'error': str(e)}, status=500)
async def handle_client_receive_message(self, request):
"""客户端接收来自前端的消息(长轮询)"""
try:
data = await request.json()
room_id = data.get('room_id')
if not room_id:
return web.json_response({'error': '需要room_id参数'}, status=400)
queue = get_frontend_to_client_queue(room_id)
# 立即检查是否有消息
if queue:
message = queue.pop(0)
response = {
'success': True,
'message': message
}
return web.json_response(response)
# 没有消息,设置长轮询
req_id = str(uuid.uuid4())
event = asyncio.Event()
# 存储挂起的请求
pending_requests[req_id] = {
'room_id': room_id,
'event': event,
'type': 'client',
'timestamp': time.time()
}
if room_id not in pending_requests:
pending_requests[room_id] = {}
pending_requests[room_id]['client'] = req_id
try:
# 等待295秒或直到有消息
await asyncio.wait_for(event.wait(), timeout=295)
logger.info(f"通过根路径创建新房间: {room_id}")
# 检查队列中是否有消息
queue = get_frontend_to_client_queue(room_id)
if queue:
message = queue.pop(0)
response = {
'success': True,
'message': message
}
else:
# 超时返回空消息
response = {
'success': True,
'message': None
}
except asyncio.TimeoutError:
# 超时返回空消息
response = {
'success': True,
'message': None
}
# 清理挂起的请求
if req_id in pending_requests:
del pending_requests[req_id]
if room_id in pending_requests and 'client' in pending_requests[room_id]:
del pending_requests[room_id]['client']
# 重定向到带房间ID的URL
redirect_url = f'/?id={room_id}'
self.send_response(302)
self.send_header('Location', redirect_url)
self.end_headers()
except Exception as e:
logger.error(f"创建房间失败: {e}")
self.send_error(500, str(e))
def handle_static_file(self, path: str):
return web.json_response(response)
except json.JSONDecodeError:
logger.error("JSON解析失败")
return web.json_response({'error': '无效的JSON数据'}, status=400)
except Exception as e:
logger.error(f"客户端接收消息失败: {e}")
# 清理挂起的请求
if 'req_id' in locals() and req_id in pending_requests:
del pending_requests[req_id]
if room_id in pending_requests and 'client' in pending_requests[room_id]:
del pending_requests[room_id]['client']
return web.json_response({'error': str(e)}, status=500)
async def handle_static_file(self, request):
"""处理静态文件请求"""
path = request.match_info.get('path', '')
# 安全检查:防止路径遍历攻击
if '..' in path:
self.send_error(403, "Forbidden: Path traversal not allowed")
return
return web.Response(text="Forbidden: Path traversal not allowed", status=403)
# 规范化路径
if path == '/':
path = '/index.html'
# 移除开头的斜杠
file_path = path.lstrip('/')
if not file_path:
file_path = 'index.html'
if path == '' or path == '/':
path = 'index.html'
# 构建完整文件路径
full_path = os.path.join(STATIC_DIR, file_path)
full_path = os.path.join(STATIC_DIR, path)
# 如果是目录尝试查找index.html
if os.path.isdir(full_path):
index_path = os.path.join(full_path, 'index.html')
if os.path.exists(index_path):
full_path = index_path
else:
self.send_error(404, "Directory index not found")
return
return web.Response(text="Directory index not found", status=404)
# 检查文件是否存在且是普通文件
if not os.path.exists(full_path) or not os.path.isfile(full_path):
self.send_error(404, f"File not found: {path}")
return
return web.Response(text=f"File not found: {path}", status=404)
try:
# 读取文件内容
with open(full_path, 'rb') as f:
content = f.read()
# 获取MIME类型
mime_type, encoding = mimetypes.guess_type(full_path)
if mime_type is None:
mime_type = 'application/octet-stream'
# 发送响应
self.send_response(200)
self.send_header('Content-Type', mime_type)
self.send_header('Content-Length', str(len(content)))
self.send_header('Cache-Control', 'public, max-age=3600')
self.end_headers()
# 发送文件内容
self.wfile.write(content)
# 发送响应
return web.Response(
body=content,
content_type=mime_type,
headers={
'Cache-Control': 'public, max-age=3600'
}
)
except Exception as e:
logger.error(f"读取或发送文件失败: {e}")
self.send_error(500, f"Error reading file: {str(e)}")
return web.Response(text=f"Error reading file: {str(e)}", status=500)
async def handle_root_path(self, request):
"""处理根路径请求"""
query_params = dict(request.query)
room_id = query_params.get('id')
if room_id:
# 有房间ID参数直接返回前端页面
logger.info(f"请求根路径有房间ID: {room_id}")
return await self.handle_static_file(request)
else:
# 没有房间ID创建新房间并重定向
try:
# 生成唯一房间ID
room_id = str(uuid.uuid4())[:8]
# 获取服务器主机地址并移除端口号
host = request.headers.get('Host', 'localhost')
host_without_port = re.sub(r':\d+$', '', host)
# 创建房间
room = Room(room_id, host_without_port)
rooms[room_id] = room
logger.info(f"通过根路径创建新房间: {room_id}")
# 重定向到带房间ID的URL
redirect_url = f'/?id={room_id}'
return web.HTTPFound(redirect_url)
except Exception as e:
logger.error(f"创建房间失败: {e}")
return web.Response(text=str(e), status=500)
async def create_app():
"""创建Web应用"""
handler = HTTPHandler()
def serve_static_file(self, path: str):
"""服务静态文件(内部方法)"""
self.handle_static_file(path)
app = web.Application()
def handle_create_room(self, post_data=None):
"""创建新房间"""
# API路由
app.router.add_post('/api/room', handler.handle_create_room)
app.router.add_post('/api/frontend/send', handler.handle_frontend_send_message)
app.router.add_post('/api/frontend/receive', handler.handle_frontend_receive_message)
app.router.add_post('/api/client/send', handler.handle_client_send_message)
app.router.add_post('/api/client/receive', handler.handle_client_receive_message)
# 静态文件和根路径路由
app.router.add_get('/', handler.handle_root_path)
app.router.add_get('/{path:.*}', handler.handle_static_file)
return app
async def cleanup_pending_requests():
"""定期清理过期的挂起请求"""
while True:
await asyncio.sleep(60) # 每分钟清理一次
current_time = time.time()
expired_reqs = []
for req_id, req_info in pending_requests.items():
if current_time - req_info['timestamp'] > 300: # 超过5分钟
expired_reqs.append(req_id)
for req_id in expired_reqs:
if req_id in pending_requests:
# 尝试触发事件以释放挂起的请求
try:
pending_requests[req_id]['event'].set()
except:
pass
del pending_requests[req_id]
if expired_reqs:
logger.info(f"清理了 {len(expired_reqs)} 个过期的挂起请求")
async def start_background_tasks(app):
"""启动后台任务"""
app['cleanup_task'] = asyncio.create_task(cleanup_pending_requests())
async def cleanup_background_tasks(app):
"""清理后台任务"""
if 'cleanup_task' in app:
app['cleanup_task'].cancel()
try:
# 生成唯一房间ID
room_id = str(uuid.uuid4())[:8]
# 获取服务器主机地址并移除端口号
host = self.headers.get('Host', 'localhost')
host_without_port = re.sub(r':\d+$', '', host)
# 创建房间
room = Room(room_id, host_without_port)
rooms[room_id] = room
logger.info(f"创建新房间: {room_id}")
response = {
'success': True,
'room_id': room_id,
'frontend_url': room.frontend_url
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"创建房间失败: {e}")
self.send_error(500, str(e))
def handle_frontend_send_message(self, post_data):
"""前端发送消息到客户端"""
try:
data = json.loads(post_data.decode('utf-8'))
room_id = data.get('room_id')
message = data.get('message')
if not room_id or not message:
self.send_error(400, "需要room_id和message参数")
return
queue = get_frontend_to_client_queue(room_id)
queue.append(message)
response = {
'success': True,
'message': '消息已发送到客户端队列'
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"前端发送消息失败: {e}")
self.send_error(500, str(e))
def handle_frontend_receive_message(self, post_data):
"""前端接收来自客户端的消息"""
try:
data = json.loads(post_data.decode('utf-8'))
room_id = data.get('room_id')
if not room_id:
self.send_error(400, "需要room_id参数")
return
queue = get_client_to_frontend_queue(room_id)
if queue:
# 返回队列中的第一个消息
message = queue.pop(0)
response = {
'success': True,
'message': message
}
else:
# 没有消息
response = {
'success': True,
'message': None
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"前端接收消息失败: {e}")
self.send_error(500, str(e))
def handle_client_send_message(self, post_data):
"""客户端发送消息到前端"""
try:
data = json.loads(post_data.decode('utf-8'))
room_id = data.get('room_id')
message = data.get('message')
if not room_id or not message:
self.send_error(400, "需要room_id和message参数")
return
queue = get_client_to_frontend_queue(room_id)
queue.append(message)
response = {
'success': True,
'message': '消息已发送到前端队列'
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"客户端发送消息失败: {e}")
self.send_error(500, str(e))
def handle_client_receive_message(self, post_data):
"""客户端接收来自前端的消息"""
try:
data = json.loads(post_data.decode('utf-8'))
room_id = data.get('room_id')
if not room_id:
self.send_error(400, "需要room_id参数")
return
queue = get_frontend_to_client_queue(room_id)
if queue:
# 返回队列中的第一个消息
message = queue.pop(0)
response = {
'success': True,
'message': message
}
else:
# 没有消息
response = {
'success': True,
'message': None
}
self.send_response(200)
self.send_header('Content-Type', 'application/json')
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(json.dumps(response).encode())
except Exception as e:
logger.error(f"客户端接收消息失败: {e}")
self.send_error(500, str(e))
def log_message(self, format, *args):
"""重写日志方法"""
logger.info("%s - - [%s] %s" % (self.client_address[0],
self.log_date_time_string(),
format % args))
await app['cleanup_task']
except asyncio.CancelledError:
pass
def run_http_server():
"""运行HTTP服务器"""
try:
server = HTTPServer(('0.0.0.0', 80), HTTPHandler)
# 创建应用
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
app = loop.run_until_complete(create_app())
# 注册启动和清理钩子
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
# 启动服务器
logger.info("HTTP服务器启动在端口 80")
server.serve_forever()
web.run_app(app, host='0.0.0.0', port=80)
except Exception as e:
logger.error(f"HTTP服务器启动失败: {e}")

View File

@@ -0,0 +1 @@
aiohttp