This commit is contained in:
@@ -12,6 +12,10 @@ local rtu = require("supervisor.session.rtu")
|
||||
-- Supervisor Sessions Handler
|
||||
|
||||
local SV_Q_CMDS = svqtypes.SV_Q_CMDS
|
||||
local SV_Q_DATA = svqtypes.SV_Q_DATA
|
||||
|
||||
local PLC_S_CMDS = plc.PLC_S_CMDS
|
||||
local PLC_S_DATA = plc.PLC_S_DATA
|
||||
local CRD_S_CMDS = coordinator.CRD_S_CMDS
|
||||
|
||||
local svsessions = {}
|
||||
@@ -38,32 +42,75 @@ local self = {
|
||||
|
||||
-- PRIVATE FUNCTIONS --
|
||||
|
||||
-- handle a session output queue
|
||||
---@param session plc_session_struct|rtu_session_struct|coord_session_struct
|
||||
local function _sv_handle_outq(session)
|
||||
-- record handler start time
|
||||
local handle_start = util.time()
|
||||
|
||||
-- process output queue
|
||||
while session.out_queue.ready() do
|
||||
-- get a new message to process
|
||||
local msg = session.out_queue.pop()
|
||||
|
||||
if msg ~= nil then
|
||||
if msg.qtype == mqueue.TYPE.PACKET then
|
||||
-- handle a packet to be sent
|
||||
self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable())
|
||||
elseif msg.qtype == mqueue.TYPE.COMMAND then
|
||||
-- handle instruction/notification
|
||||
local cmd = msg.message
|
||||
if cmd == SV_Q_CMDS.BUILD_CHANGED then
|
||||
-- notify coordinator(s) that a build has changed
|
||||
for j = 1, #self.coord_sessions do
|
||||
local s = self.coord_sessions[j] ---@type coord_session_struct
|
||||
s.in_queue.push_command(CRD_S_CMDS.RESEND_BUILDS)
|
||||
end
|
||||
end
|
||||
elseif msg.qtype == mqueue.TYPE.DATA then
|
||||
-- instruction/notification with body
|
||||
local cmd = msg.message ---@type queue_data
|
||||
local plc_s = nil
|
||||
|
||||
if type(cmd.val) == "table" then
|
||||
plc_s = svsessions.get_reactor_session(cmd.val[1])
|
||||
elseif type(cmd.val) == "number" then
|
||||
plc_s = svsessions.get_reactor_session(cmd.val)
|
||||
end
|
||||
|
||||
if plc_s ~= nil then
|
||||
if cmd.key == SV_Q_DATA.START then
|
||||
plc_s.in_queue.push_command(PLC_S_CMDS.ENABLE)
|
||||
elseif cmd.key == SV_Q_DATA.SCRAM then
|
||||
plc_s.in_queue.push_command(PLC_S_CMDS.SCRAM)
|
||||
elseif cmd.key == SV_Q_DATA.RESET_RPS then
|
||||
plc_s.in_queue.push_command(PLC_S_CMDS.RPS_RESET)
|
||||
elseif cmd.key == SV_Q_DATA.SET_BURN and type(cmd.val) == "table" and #cmd.val == 2 then
|
||||
plc_s.in_queue.push_data(PLC_S_DATA.BURN_RATE, cmd.val[2])
|
||||
elseif cmd.key == SV_Q_DATA.SET_WASTE and type(cmd.val) == "table" and #cmd.val == 2 then
|
||||
---@todo set waste
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
-- max 100ms spent processing queue
|
||||
if util.time() - handle_start > 100 then
|
||||
log.warning("supervisor out queue handler exceeded 100ms queue process limit")
|
||||
log.warning(util.c("offending session: port ", session.r_port, " type '", session.s_type, "'"))
|
||||
break
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
-- iterate all the given sessions
|
||||
---@param sessions table
|
||||
local function _iterate(sessions)
|
||||
for i = 1, #sessions do
|
||||
local session = sessions[i] ---@type plc_session_struct|rtu_session_struct|coord_session_struct
|
||||
|
||||
if session.open and session.instance.iterate() then
|
||||
-- process output queues
|
||||
while session.out_queue.ready() do
|
||||
local msg = session.out_queue.pop()
|
||||
if msg ~= nil then
|
||||
if msg.qtype == mqueue.TYPE.PACKET then
|
||||
-- packet to be sent
|
||||
self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable())
|
||||
elseif msg.qtype == mqueue.TYPE.COMMAND then
|
||||
-- notification
|
||||
local cmd = msg.message
|
||||
if cmd == SV_Q_CMDS.BUILD_CHANGED then
|
||||
-- notify coordinator(s) that a build has changed
|
||||
for j = 1, #self.coord_sessions do
|
||||
local s = self.coord_sessions[j] ---@type coord_session_struct
|
||||
s.in_queue.push_command(CRD_S_CMDS.RESEND_BUILDS)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
_sv_handle_outq(session)
|
||||
else
|
||||
session.open = false
|
||||
end
|
||||
@@ -221,9 +268,10 @@ end
|
||||
---@param version string
|
||||
---@return integer|false session_id
|
||||
function svsessions.establish_plc_session(local_port, remote_port, for_reactor, version)
|
||||
if svsessions.get_reactor_session(for_reactor) == nil then
|
||||
if svsessions.get_reactor_session(for_reactor) == nil and for_reactor >= 1 and for_reactor <= self.num_reactors then
|
||||
---@class plc_session_struct
|
||||
local plc_s = {
|
||||
s_type = "plc",
|
||||
open = true,
|
||||
reactor = for_reactor,
|
||||
version = version,
|
||||
@@ -246,7 +294,7 @@ function svsessions.establish_plc_session(local_port, remote_port, for_reactor,
|
||||
-- success
|
||||
return plc_s.instance.get_id()
|
||||
else
|
||||
-- reactor already assigned to a PLC
|
||||
-- reactor already assigned to a PLC or ID out of range
|
||||
return false
|
||||
end
|
||||
end
|
||||
@@ -262,6 +310,7 @@ function svsessions.establish_rtu_session(local_port, remote_port, advertisement
|
||||
|
||||
---@class rtu_session_struct
|
||||
local rtu_s = {
|
||||
s_type = "rtu",
|
||||
open = true,
|
||||
version = version,
|
||||
l_port = local_port,
|
||||
@@ -290,6 +339,7 @@ end
|
||||
function svsessions.establish_coord_session(local_port, remote_port, version)
|
||||
---@class coord_session_struct
|
||||
local coord_s = {
|
||||
s_type = "crd",
|
||||
open = true,
|
||||
version = version,
|
||||
l_port = local_port,
|
||||
|
||||
Reference in New Issue
Block a user