diff --git a/supervisor/config.lua b/supervisor/config.lua index 47d530d..9fa3290 100644 --- a/supervisor/config.lua +++ b/supervisor/config.lua @@ -10,6 +10,7 @@ config.TRUSTED_RANGE = 0 config.PLC_TIMEOUT = 5 config.RTU_TIMEOUT = 5 config.CRD_TIMEOUT = 5 +config.PKT_TIMEOUT = 5 -- expected number of reactors config.NUM_REACTORS = 4 diff --git a/supervisor/session/plc.lua b/supervisor/session/plc.lua index fd0d0f8..b3e2be2 100644 --- a/supervisor/session/plc.lua +++ b/supervisor/session/plc.lua @@ -64,7 +64,7 @@ function plc.new_session(id, reactor_id, in_queue, out_queue, timeout) connected = true, received_struct = false, received_status_cache = false, - plc_conn_watchdog = util.new_watchdog(timeout), + conn_watchdog = util.new_watchdog(timeout), last_rtt = 0, -- periodic messages periodics = { @@ -233,7 +233,7 @@ function plc.new_session(id, reactor_id, in_queue, out_queue, timeout) -- mark this PLC session as closed, stop watchdog local function _close() - self.plc_conn_watchdog.cancel() + self.conn_watchdog.cancel() self.connected = false end @@ -301,7 +301,7 @@ function plc.new_session(id, reactor_id, in_queue, out_queue, timeout) end -- feed watchdog - self.plc_conn_watchdog.feed() + self.conn_watchdog.feed() -- handle packet by type if pkt.type == RPLC_TYPE.STATUS then @@ -576,7 +576,7 @@ function plc.new_session(id, reactor_id, in_queue, out_queue, timeout) -- check if a timer matches this session's watchdog ---@nodiscard function public.check_wd(timer) - return self.plc_conn_watchdog.is_timer(timer) and self.connected + return self.conn_watchdog.is_timer(timer) and self.connected end -- close the connection diff --git a/supervisor/session/pocket.lua b/supervisor/session/pocket.lua new file mode 100644 index 0000000..54017f5 --- /dev/null +++ b/supervisor/session/pocket.lua @@ -0,0 +1,222 @@ +local comms = require("scada-common.comms") +local log = require("scada-common.log") +local mqueue = require("scada-common.mqueue") +local util = require("scada-common.util") + +local pocket = {} + +local PROTOCOL = comms.PROTOCOL +local SCADA_MGMT_TYPE = comms.SCADA_MGMT_TYPE + +local println = util.println + +-- retry time constants in ms +-- local INITIAL_WAIT = 1500 +-- local RETRY_PERIOD = 1000 + +local POCKET_S_CMDS = { +} + +local POCKET_S_DATA = { +} + +pocket.POCKET_S_CMDS = POCKET_S_CMDS +pocket.POCKET_S_DATA = POCKET_S_DATA + +local PERIODICS = { + KEEP_ALIVE = 2000 +} + +-- pocket diagnostics session +---@nodiscard +---@param id integer session ID +---@param in_queue mqueue in message queue +---@param out_queue mqueue out message queue +---@param timeout number communications timeout +function pocket.new_session(id, in_queue, out_queue, timeout) + local log_header = "diag_session(" .. id .. "): " + + local self = { + -- connection properties + seq_num = 0, + r_seq_num = nil, + connected = true, + conn_watchdog = util.new_watchdog(timeout), + last_rtt = 0, + -- periodic messages + periodics = { + keep_alive = 0 + }, + -- when to next retry one of these requests + retry_times = { + }, + -- command acknowledgements + acks = { + }, + -- session database + ---@class diag_db + sDB = { + } + } + + ---@class diag_session + local public = {} + + -- mark this diagnostics session as closed, stop watchdog + local function _close() + self.conn_watchdog.cancel() + self.connected = false + end + + -- send a SCADA management packet + ---@param msg_type SCADA_MGMT_TYPE + ---@param msg table + local function _send_mgmt(msg_type, msg) + local s_pkt = comms.scada_packet() + local m_pkt = comms.mgmt_packet() + + m_pkt.make(msg_type, msg) + s_pkt.make(self.seq_num, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable()) + + out_queue.push_packet(s_pkt) + self.seq_num = self.seq_num + 1 + end + + -- handle a packet + ---@param pkt mgmt_frame + local function _handle_packet(pkt) + -- check sequence number + if self.r_seq_num == nil then + self.r_seq_num = pkt.scada_frame.seq_num() + elseif self.r_seq_num >= pkt.scada_frame.seq_num() then + log.warning(log_header .. "sequence out-of-order: last = " .. self.r_seq_num .. ", new = " .. pkt.scada_frame.seq_num()) + return + else + self.r_seq_num = pkt.scada_frame.seq_num() + end + + -- process packet + if pkt.scada_frame.protocol() == PROTOCOL.SCADA_MGMT then + ---@cast pkt mgmt_frame + if pkt.type == SCADA_MGMT_TYPE.KEEP_ALIVE then + -- keep alive reply + if pkt.length == 2 then + local srv_start = pkt.data[1] + -- local diag_send = pkt.data[2] + local srv_now = util.time() + self.last_rtt = srv_now - srv_start + + if self.last_rtt > 750 then + log.warning(log_header .. "DIAG KEEP_ALIVE round trip time > 750ms (" .. self.last_rtt .. "ms)") + end + + -- log.debug(log_header .. "DIAG RTT = " .. self.last_rtt .. "ms") + -- log.debug(log_header .. "DIAG TT = " .. (srv_now - diag_send) .. "ms") + else + log.debug(log_header .. "SCADA keep alive packet length mismatch") + end + elseif pkt.type == SCADA_MGMT_TYPE.CLOSE then + -- close the session + _close() + else + log.debug(log_header .. "handler received unsupported SCADA_MGMT packet type " .. pkt.type) + end + end + end + + -- PUBLIC FUNCTIONS -- + + -- get the session ID + ---@nodiscard + function public.get_id() return id end + + -- get the session database + ---@nodiscard + function public.get_db() return self.sDB end + + -- check if a timer matches this session's watchdog + ---@nodiscard + function public.check_wd(timer) + return self.conn_watchdog.is_timer(timer) and self.connected + end + + -- close the connection + function public.close() + _close() + _send_mgmt(SCADA_MGMT_TYPE.CLOSE, {}) + println("connection to pocket diag session " .. id .. " closed by server") + log.info(log_header .. "session closed by server") + end + + -- iterate the session + ---@nodiscard + ---@return boolean connected + function public.iterate() + if self.connected then + ------------------ + -- handle queue -- + ------------------ + + local handle_start = util.time() + + while in_queue.ready() and self.connected do + -- get a new message to process + local message = in_queue.pop() + + if message ~= nil then + if message.qtype == mqueue.TYPE.PACKET then + -- handle a packet + _handle_packet(message.message) + elseif message.qtype == mqueue.TYPE.COMMAND then + -- handle instruction + elseif message.qtype == mqueue.TYPE.DATA then + -- instruction with body + end + end + + -- max 100ms spent processing queue + if util.time() - handle_start > 100 then + log.warning(log_header .. "exceeded 100ms queue process limit") + break + end + end + + -- exit if connection was closed + if not self.connected then + println("connection to pocket diag session " .. id .. " closed by remote host") + log.info(log_header .. "session closed by remote host") + return self.connected + end + + ---------------------- + -- update periodics -- + ---------------------- + + local elapsed = util.time() - self.periodics.last_update + + local periodics = self.periodics + + -- keep alive + + periodics.keep_alive = periodics.keep_alive + elapsed + if periodics.keep_alive >= PERIODICS.KEEP_ALIVE then + _send_mgmt(SCADA_MGMT_TYPE.KEEP_ALIVE, { util.time() }) + periodics.keep_alive = 0 + end + + self.periodics.last_update = util.time() + + --------------------- + -- attempt retries -- + --------------------- + + -- local rtimes = self.retry_times + end + + return self.connected + end + + return public +end + +return pocket diff --git a/supervisor/session/rtu.lua b/supervisor/session/rtu.lua index 9357c71..7da723c 100644 --- a/supervisor/session/rtu.lua +++ b/supervisor/session/rtu.lua @@ -47,7 +47,7 @@ function rtu.new_session(id, in_queue, out_queue, timeout, advertisement, facili seq_num = 0, r_seq_num = nil, connected = true, - rtu_conn_watchdog = util.new_watchdog(timeout), + conn_watchdog = util.new_watchdog(timeout), last_rtt = 0, -- periodic messages periodics = { @@ -174,7 +174,7 @@ function rtu.new_session(id, in_queue, out_queue, timeout, advertisement, facili -- mark this RTU session as closed, stop watchdog local function _close() - self.rtu_conn_watchdog.cancel() + self.conn_watchdog.cancel() self.connected = false -- mark all RTU unit sessions as closed so the reactor unit knows @@ -222,7 +222,7 @@ function rtu.new_session(id, in_queue, out_queue, timeout, advertisement, facili end -- feed watchdog - self.rtu_conn_watchdog.feed() + self.conn_watchdog.feed() -- process packet if pkt.scada_frame.protocol() == PROTOCOL.MODBUS_TCP then @@ -286,7 +286,7 @@ function rtu.new_session(id, in_queue, out_queue, timeout, advertisement, facili ---@nodiscard ---@param timer number function public.check_wd(timer) - return self.rtu_conn_watchdog.is_timer(timer) and self.connected + return self.conn_watchdog.is_timer(timer) and self.connected end -- close the connection diff --git a/supervisor/session/rtu/redstone.lua b/supervisor/session/rtu/redstone.lua index 65831d6..bc7b81d 100644 --- a/supervisor/session/rtu/redstone.lua +++ b/supervisor/session/rtu/redstone.lua @@ -121,6 +121,7 @@ function redstone.new(session_id, unit_id, advert, out_queue) ---@nodiscard read = function () return rsio.digital_is_active(port, self.phy_io.digital_in[port].phy) end, ---@param active boolean +---@diagnostic disable-next-line: unused-local write = function (active) end } @@ -155,6 +156,7 @@ function redstone.new(session_id, unit_id, advert, out_queue) ---@return integer read = function () return self.phy_io.analog_in[port].phy end, ---@param value integer +---@diagnostic disable-next-line: unused-local write = function (value) end } diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index 9ed462d..1bf2d59 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -9,6 +9,7 @@ local svqtypes = require("supervisor.session.svqtypes") local coordinator = require("supervisor.session.coordinator") local plc = require("supervisor.session.plc") +local pocket = require("supervisor.session.pocket") local rtu = require("supervisor.session.rtu") -- Supervisor Sessions Handler @@ -22,29 +23,28 @@ local CRD_S_DATA = coordinator.CRD_S_DATA local svsessions = {} local SESSION_TYPE = { - RTU_SESSION = 0, - PLC_SESSION = 1, - COORD_SESSION = 2 + RTU_SESSION = 0, -- RTU gateway + PLC_SESSION = 1, -- reactor PLC + COORD_SESSION = 2, -- coordinator + DIAG_SESSION = 3 -- pocket diagnostics } svsessions.SESSION_TYPE = SESSION_TYPE local self = { - modem = nil, + modem = nil, ---@type table|nil num_reactors = 0, - facility = nil, ---@type facility - rtu_sessions = {}, - plc_sessions = {}, - coord_sessions = {}, - next_rtu_id = 0, - next_plc_id = 0, - next_coord_id = 0 + facility = nil, ---@type facility|nil + sessions = { rtu = {}, plc = {}, coord = {}, diag = {} }, + next_ids = { rtu = 0, plc = 0, coord = 0, diag = 0 } } +---@alias sv_session_structs plc_session_struct|rtu_session_struct|coord_session_struct|diag_session_struct + -- PRIVATE FUNCTIONS -- -- handle a session output queue ----@param session plc_session_struct|rtu_session_struct|coord_session_struct +---@param session sv_session_structs local function _sv_handle_outq(session) -- record handler start time local handle_start = util.time() @@ -112,7 +112,7 @@ end ---@param sessions table local function _iterate(sessions) for i = 1, #sessions do - local session = sessions[i] ---@type plc_session_struct|rtu_session_struct|coord_session_struct + local session = sessions[i] ---@type sv_session_structs if session.open and session.instance.iterate() then _sv_handle_outq(session) @@ -123,7 +123,7 @@ local function _iterate(sessions) end -- cleanly close a session ----@param session plc_session_struct|rtu_session_struct|coord_session_struct +---@param session sv_session_structs local function _shutdown(session) session.open = false session.instance.close() @@ -143,7 +143,7 @@ end ---@param sessions table local function _close(sessions) for i = 1, #sessions do - local session = sessions[i] ---@type plc_session_struct|rtu_session_struct|coord_session_struct + local session = sessions[i] ---@type sv_session_structs if session.open then _shutdown(session) end end end @@ -153,7 +153,7 @@ end ---@param timer_event number local function _check_watchdogs(sessions, timer_event) for i = 1, #sessions do - local session = sessions[i] ---@type plc_session_struct|rtu_session_struct|coord_session_struct + local session = sessions[i] ---@type sv_session_structs if session.open then local triggered = session.instance.check_wd(timer_event) if triggered then @@ -170,7 +170,7 @@ end local function _free_closed(sessions) local f = function (session) return session.open end - ---@param session plc_session_struct|rtu_session_struct|coord_session_struct + ---@param session sv_session_structs local on_delete = function (session) log.debug(util.c("free'ing closed ", session.s_type, " session ", session.instance.get_id(), " on remote port ", session.r_port)) @@ -183,7 +183,7 @@ end ---@nodiscard ---@param list table ---@param port integer ----@return plc_session_struct|rtu_session_struct|coord_session_struct|nil +---@return sv_session_structs|nil local function _find_session(list, port) for i = 1, #list do if list[i].r_port == port then return list[i] end @@ -215,7 +215,7 @@ end ---@return rtu_session_struct|nil function svsessions.find_rtu_session(remote_port) -- check RTU sessions - local session = _find_session(self.rtu_sessions, remote_port) + local session = _find_session(self.sessions.rtu, remote_port) ---@cast session rtu_session_struct return session end @@ -226,7 +226,7 @@ end ---@return plc_session_struct|nil function svsessions.find_plc_session(remote_port) -- check PLC sessions - local session = _find_session(self.plc_sessions, remote_port) + local session = _find_session(self.sessions.plc, remote_port) ---@cast session plc_session_struct return session end @@ -237,10 +237,10 @@ end ---@return plc_session_struct|rtu_session_struct|nil function svsessions.find_device_session(remote_port) -- check RTU sessions - local session = _find_session(self.rtu_sessions, remote_port) + local session = _find_session(self.sessions.rtu, remote_port) -- check PLC sessions - if session == nil then session = _find_session(self.plc_sessions, remote_port) end + if session == nil then session = _find_session(self.sessions.plc, remote_port) end ---@cast session plc_session_struct|rtu_session_struct|nil return session @@ -253,7 +253,7 @@ end ---@return coord_session_struct|nil function svsessions.find_coord_session(remote_port) -- check coordinator sessions - local session = _find_session(self.coord_sessions, remote_port) + local session = _find_session(self.sessions.coord, remote_port) ---@cast session coord_session_struct return session end @@ -262,7 +262,7 @@ end ---@nodiscard ---@return coord_session_struct|nil function svsessions.get_coord_session() - return self.coord_sessions[1] + return self.sessions.coord[1] end -- get a session by reactor ID @@ -272,9 +272,9 @@ end function svsessions.get_reactor_session(reactor) local session = nil - for i = 1, #self.plc_sessions do - if self.plc_sessions[i].reactor == reactor then - session = self.plc_sessions[i] + for i = 1, #self.sessions.plc do + if self.sessions.plc[i].reactor == reactor then + session = self.sessions.plc[i] end end @@ -303,15 +303,15 @@ function svsessions.establish_plc_session(local_port, remote_port, for_reactor, instance = nil ---@type plc_session } - plc_s.instance = plc.new_session(self.next_plc_id, for_reactor, plc_s.in_queue, plc_s.out_queue, config.PLC_TIMEOUT) - table.insert(self.plc_sessions, plc_s) + plc_s.instance = plc.new_session(self.next_ids.plc, for_reactor, plc_s.in_queue, plc_s.out_queue, config.PLC_TIMEOUT) + table.insert(self.sessions.plc, plc_s) local units = self.facility.get_units() units[for_reactor].link_plc_session(plc_s) - log.debug(util.c("established new PLC session to ", remote_port, " with ID ", self.next_plc_id, " for reactor ", for_reactor)) + log.debug(util.c("established new PLC session to ", remote_port, " with ID ", self.next_ids.plc, " for reactor ", for_reactor)) - self.next_plc_id = self.next_plc_id + 1 + self.next_ids.plc = self.next_ids.plc + 1 -- success return plc_s.instance.get_id() @@ -341,12 +341,12 @@ function svsessions.establish_rtu_session(local_port, remote_port, advertisement instance = nil ---@type rtu_session } - rtu_s.instance = rtu.new_session(self.next_rtu_id, rtu_s.in_queue, rtu_s.out_queue, config.RTU_TIMEOUT, advertisement, self.facility) - table.insert(self.rtu_sessions, rtu_s) + rtu_s.instance = rtu.new_session(self.next_ids.rtu, rtu_s.in_queue, rtu_s.out_queue, config.RTU_TIMEOUT, advertisement, self.facility) + table.insert(self.sessions.rtu, rtu_s) - log.debug("established new RTU session to " .. remote_port .. " with ID " .. self.next_rtu_id) + log.debug("established new RTU session to " .. remote_port .. " with ID " .. self.next_ids.rtu) - self.next_rtu_id = self.next_rtu_id + 1 + self.next_ids.rtu = self.next_ids.rtu + 1 -- success return rtu_s.instance.get_id() @@ -372,12 +372,12 @@ function svsessions.establish_coord_session(local_port, remote_port, version) instance = nil ---@type coord_session } - coord_s.instance = coordinator.new_session(self.next_coord_id, coord_s.in_queue, coord_s.out_queue, config.CRD_TIMEOUT, self.facility) - table.insert(self.coord_sessions, coord_s) + coord_s.instance = coordinator.new_session(self.next_ids.coord, coord_s.in_queue, coord_s.out_queue, config.CRD_TIMEOUT, self.facility) + table.insert(self.sessions.coord, coord_s) - log.debug("established new coordinator session to " .. remote_port .. " with ID " .. self.next_coord_id) + log.debug("established new coordinator session to " .. remote_port .. " with ID " .. self.next_ids.coord) - self.next_coord_id = self.next_coord_id + 1 + self.next_ids.coord = self.next_ids.coord + 1 -- success return coord_s.instance.get_id() @@ -387,32 +387,49 @@ function svsessions.establish_coord_session(local_port, remote_port, version) end end +-- establish a new pocket diagnostics session +---@nodiscard +---@param local_port integer +---@param remote_port integer +---@param version string +---@return integer|false session_id +function svsessions.establish_diag_session(local_port, remote_port, version) + ---@class diag_session_struct + local diag_s = { + s_type = "pkt", + open = true, + version = version, + l_port = local_port, + r_port = remote_port, + in_queue = mqueue.new(), + out_queue = mqueue.new(), + instance = nil ---@type diag_session + } + + diag_s.instance = pocket.new_session(self.next_ids.diag, diag_s.in_queue, diag_s.out_queue, config.PKT_TIMEOUT) + table.insert(self.sessions.diag, diag_s) + + log.debug("established new pocket diagnostics session to " .. remote_port .. " with ID " .. self.next_ids.diag) + + self.next_ids.diag = self.next_ids.diag + 1 + + -- success + return diag_s.instance.get_id() +end + -- attempt to identify which session's watchdog timer fired ---@param timer_event number function svsessions.check_all_watchdogs(timer_event) - -- check RTU session watchdogs - _check_watchdogs(self.rtu_sessions, timer_event) - - -- check PLC session watchdogs - _check_watchdogs(self.plc_sessions, timer_event) - - -- check coordinator session watchdogs - _check_watchdogs(self.coord_sessions, timer_event) + for _, list in pairs(self.sessions) do _check_watchdogs(list, timer_event) end end --- iterate all sessions +-- iterate all sessions, and update facility/unit data & process control logic function svsessions.iterate_all() - -- iterate RTU sessions - _iterate(self.rtu_sessions) - - -- iterate PLC sessions - _iterate(self.plc_sessions) - - -- iterate coordinator sessions - _iterate(self.coord_sessions) + -- iterate sessions + for _, list in pairs(self.sessions) do _iterate(list) end -- report RTU sessions to facility - self.facility.report_rtus(self.rtu_sessions) + self.facility.report_rtus(self.sessions.rtu) -- iterate facility self.facility.update() @@ -423,22 +440,15 @@ end -- delete all closed sessions function svsessions.free_all_closed() - -- free closed RTU sessions - _free_closed(self.rtu_sessions) - - -- free closed PLC sessions - _free_closed(self.plc_sessions) - - -- free closed coordinator sessions - _free_closed(self.coord_sessions) + for _, list in pairs(self.sessions) do _free_closed(list) end end -- close all open connections function svsessions.close_all() -- close sessions - _close(self.rtu_sessions) - _close(self.plc_sessions) - _close(self.coord_sessions) + for _, list in pairs(self.sessions) do + _close(list) + end -- free sessions svsessions.free_all_closed() diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 68db989..fd80180 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -14,7 +14,7 @@ local supervisor = require("supervisor.supervisor") local svsessions = require("supervisor.session.svsessions") -local SUPERVISOR_VERSION = "v0.14.5" +local SUPERVISOR_VERSION = "v0.15.0" local println = util.println local println_ts = util.println_ts @@ -34,6 +34,8 @@ cfv.assert_type_num(config.RTU_TIMEOUT) cfv.assert_min(config.RTU_TIMEOUT, 2) cfv.assert_type_num(config.CRD_TIMEOUT) cfv.assert_min(config.CRD_TIMEOUT, 2) +cfv.assert_type_num(config.PKT_TIMEOUT) +cfv.assert_min(config.PKT_TIMEOUT, 2) cfv.assert_type_int(config.NUM_REACTORS) cfv.assert_type_table(config.REACTOR_COOLING) cfv.assert_type_str(config.LOG_PATH)