@@ -48,6 +48,18 @@ class Room:
' created_at ' : self . created_at . isoformat ( )
}
def safe_decode_json ( raw_body : bytes ) - > Dict [ Any , Any ] :
"""
尝试用 UTF-8 解码,失败则尝试 GB18030( 兼容 GBK/GB2312)
"""
for encoding in [ ' utf-8 ' , ' gb18030 ' , ' latin1 ' ] :
try :
text = raw_body . decode ( encoding )
return json . loads ( text )
except ( UnicodeDecodeError , json . JSONDecodeError ) :
continue
raise ValueError ( " 无法解码请求体为有效 JSON " )
def get_frontend_to_client_queue ( room_id : str ) - > List [ Dict [ str , Any ] ] :
if room_id not in frontend_to_client_queues :
frontend_to_client_queues [ room_id ] = [ ]
@@ -88,9 +100,9 @@ class HTTPHandler:
return web . json_response ( { ' error ' : str ( e ) } , status = 500 )
async def handle_frontend_send_message ( self , request ) :
""" 前端发送消息到客户端 """
try :
data = await request . json ( )
raw_body = await request . read ( )
data = safe_decode_json ( raw_body )
room_id = data . get ( ' room_id ' )
message_data = data . get ( ' message ' )
@@ -99,24 +111,20 @@ class HTTPHandler:
queue = get_frontend_to_client_queue ( room_id )
queue . append ( message_data )
# 检查是否有挂起的客户端请求并立即响应
if room_id in pending_requests and ' client ' in pending_requests [ room_id ] :
client_req_id = pending_requests [ room_id ] [ ' client ' ]
if client_req_id in pending_requests :
pending_requests [ client_req_id ] [ ' event ' ] . set ( )
logger . info ( f " 立即响应挂起的客户端请求: { client_req_id } " )
response = {
return web . json_ response( {
' success ' : True ,
' message ' : ' 消息已发送到客户端队列 '
}
} )
return web . json_response ( respons e )
except json . JSONDecodeError :
logger . error ( " JSON解析失败 " )
return web . json_response ( { ' error ' : ' 无效的JSON数据 ' } , status = 400 )
except ValueError as e:
logger . error ( f " 前端发送消息失败: 无效的JSON或编码错误 - { e } " )
return web . json_response ( { ' error ' : ' 无效的JSON数据或不支持的文本编码 ' } , status = 400 )
except Exception as e :
logger . error ( f " 前端发送消息失败: { e } " )
return web . json_response ( { ' error ' : str ( e ) } , status = 500 )
@@ -204,9 +212,9 @@ class HTTPHandler:
return web . json_response ( { ' error ' : str ( e ) } , status = 500 )
async def handle_client_send_message ( self , request ) :
""" 客户端发送消息到前端 """
try :
data = await request . json ( )
raw_body = await request . read ( ) # 获取原始字节
data = safe_decode_json ( raw_body )
room_id = data . get ( ' room_id ' )
message_data = data . get ( ' message ' )
@@ -216,23 +224,20 @@ class HTTPHandler:
queue = get_client_to_frontend_queue ( room_id )
queue . append ( message_data )
# 检查是否有挂起的前端请求并立即响应
if room_id in pending_requests and ' frontend ' in pending_requests [ room_id ] :
frontend_req_id = pending_requests [ room_id ] [ ' frontend ' ]
if frontend_req_id in pending_requests :
pending_requests [ frontend_req_id ] [ ' event ' ] . set ( )
logger . info ( f " 立即响应挂起的前端请求: { frontend_req_id } " )
response = {
return web . json_ response( {
' success ' : True ,
' message ' : ' 消息已发送到前端队列 '
}
} )
return web . json_response ( respons e )
except json . JSONDecodeError :
logger . error ( " JSON解析失败 " )
return web . json_response ( { ' error ' : ' 无效的JSON数据 ' } , status = 400 )
except ValueError as e:
logger . error ( f " 客户端发送消息失败: 无效的JSON或编码错误 - { e } " )
return web . json_response ( { ' error ' : ' 无效的JSON数据或不支持的文本编码 ' } , status = 400 )
except Exception as e :
logger . error ( f " 客户端发送消息失败: { e } " )
return web . json_response ( { ' error ' : str ( e ) } , status = 500 )
@@ -240,7 +245,8 @@ class HTTPHandler:
async def handle_client_receive_message ( self , request ) :
""" 客户端接收来自前端的消息(长轮询) """
try :
data = await request . json ( )
raw_body = await request . read ( )
data = safe_decode_json ( raw_body ) # 使用你之前添加的 safe_decode_json
room_id = data . get ( ' room_id ' )
if not room_id :
@@ -257,7 +263,7 @@ class HTTPHandler:
}
return web . json_response ( response )
# 没有消息,设置长轮询
# 没有消息,设置长轮询(最多等待 58 秒)
req_id = str ( uuid . uuid4 ( ) )
event = asyncio . Event ( )
@@ -274,10 +280,10 @@ class HTTPHandler:
pending_requests [ room_id ] [ ' client ' ] = req_id
try :
# 等待295秒或直到有消息
await asyncio . wait_for ( event . wait ( ) , timeout = 29 5)
# ⏱️ 只等待 58 秒(略小于客户端或代理的 60 秒超时)
await asyncio . wait_for ( event . wait ( ) , timeout = 58 )
# 检查队列中是否有消息
# 被唤醒后, 检查队列
queue = get_frontend_to_client_queue ( room_id )
if queue :
message = queue . pop ( 0 )
@@ -286,37 +292,36 @@ class HTTPHandler:
' message ' : message
}
else :
# 超时返回空消息
response = {
' success ' : True ,
' message ' : None
}
except asyncio . TimeoutError :
# 超时返回空消息
# 58秒 超时, 返回空消息
response = {
' success ' : True ,
' message ' : None
}
# 清理挂起的请求
if req_id in pending_requests :
del pending_requests [ req_id ]
if room_id in pending_requests and ' client ' in pending_requests [ room_id ] :
del pending_requests [ room_id ] [ ' client ' ]
pending_requests . pop ( req_id , None )
if room_id in pending_requests :
pending_requests [ room_id ] . pop ( ' client ' , None )
# 如果 room_id 下已无其他引用,也可以清理整个 room 条目(可选)
return web . json_response ( response )
except json . JSONDecodeError :
logger . error ( " JSON解析失败 " )
except ValueError as e :
logger . error ( f " 客户端接收消息失败: 无效JSON或编码 - { e } " )
return web . json_response ( { ' error ' : ' 无效的JSON数据 ' } , status = 400 )
except Exception as e :
logger . error ( f " 客户端接收消息失败: { e } " )
# 清理挂起的请求
if ' req_id ' in locals ( ) and req_id in pending_requests :
del pending_requests [ req_id ]
if room_id in pending_requests and ' client ' in pending_requests [ room_id ] :
del pending_requests [ room_id ] [ ' client ' ]
if ' req_id ' in locals ( ) :
pending_requests . pop ( req_id , None )
if ' room_id' in locals ( ) and room_id in pending_requests :
pending_requests [ room_id ] . pop ( ' client ' , None )
return web . json_response ( { ' error ' : str ( e ) } , status = 500 )
async def handle_static_file ( self , request ) :