diff --git a/supervisor/session/plc.lua b/supervisor/session/plc.lua index bd5679c..7b17feb 100644 --- a/supervisor/session/plc.lua +++ b/supervisor/session/plc.lua @@ -3,6 +3,7 @@ -- #REQUIRES log.lua -- #REQUIRES util.lua +local PROTOCOLS = comms.PROTOCOLS local RPLC_TYPES = comms.RPLC_TYPES PLC_S_COMMANDS = { @@ -11,6 +12,10 @@ PLC_S_COMMANDS = { ISS_CLEAR = 2 } +local PERIODICS = { + KEEP_ALIVE = 1.0 +} + -- PLC supervisor session function new_session(id, for_reactor, in_queue, out_queue) local log_header = "plc_session(" .. id .. "): " @@ -23,16 +28,22 @@ function new_session(id, for_reactor, in_queue, out_queue) commanded_state = false, -- connection properties seq_num = 0, + r_seq_num = nil, connected = true, received_struct = false, - plc_conn_watchdog = util.new_watchdog(3) + plc_conn_watchdog = util.new_watchdog(3), + last_rtt = 0, -- when to next retry one of these requests + periodics = { + last_update = 0 + keep_alive = 0 + }, retry_times = { struct_req = 0, scram_req = 0, enable_req = 0 }, - -- session PLC status database + -- session database sDB = { control_state = false, overridden = false, @@ -140,12 +151,167 @@ function new_session(id, for_reactor, in_queue, out_queue) end end + local _handle_packet = function (message) + local checks_ok = true + + -- handle an incoming packet from the PLC + rplc_pkt = message.get() + + -- check sequence number + if self.r_seq_num == nil then + self.r_seq_num = rplc_pkt.scada_frame.seq_num() + elseif self.r_seq_num >= rplc_pkt.scada_frame.seq_num() then + log._warning(log_header .. "sequence out-of-order: last = " .. self.r_seq_num .. ", new = " .. rplc_pkt.scada_frame.seq_num()) + checks_ok = false + else + self.r_seq_num = rplc_pkt.scada_frame.seq_num() + end + + -- check reactor ID + if rplc_pkt.id ~= for_reactor then + log._warning(log_header .. "RPLC packet with ID not matching reactor ID: reactor " .. self.for_reactor .. " != " .. rplc_pkt.id) + checks_ok = false + end + + -- process packet + if checks_ok then + -- feed watchdog + self.plc_conn_watchdog.feed() + + -- handle packet by type + if rplc_pkt.type == RPLC_TYPES.KEEP_ALIVE then + -- keep alive reply + if rplc_pkt.length == 2 then + local srv_start = rplc_pkt.data[1] + local plc_send = rplc_pkt.data[2] + local srv_now = os.epoch() + self.last_rtt = srv_now - srv_start + + if self.last_rtt < 0 then + log._warning(log_header .. "PLC KEEP_ALIVE round trip time less than 0 (" .. trip_time .. ")") + elseif trip_time > 1 then + log._warning(log_header .. "PLC KEEP_ALIVE round trip time > 1s (" .. trip_time .. ")") + end + + log._debug(log_header .. "RPLC RTT = ".. trip_time) + else + log._debug(log_header .. "RPLC keep alive packet length mismatch") + end + elseif rplc_pkt.type == RPLC_TYPES.STATUS then + -- status packet received, update data + if rplc_pkt.length >= 5 then + -- @todo [1] is timestamp, determine how this will be used (if at all) + self.sDB.control_state = rplc_pkt.data[2] + self.sDB.overridden = rplc_pkt.data[3] + self.sDB.degraded = rplc_pkt.data[4] + self.sDB.mek_status.heating_rate = rplc_pkt.data[5] + + -- attempt to read mek_data table + if rplc_pkt.data[6] ~= nil then + local status = pcall(_copy_status, rplc_pkt.data[6]) + if status then + -- copied in status data OK + else + -- error copying status data + log._error(log_header .. "failed to parse status packet data") + end + end + else + log._debug(log_header .. "RPLC status packet length mismatch") + end + elseif rplc_pkt.type == RPLC_TYPES.MEK_STRUCT then + -- received reactor structure, record it + if rplc_pkt.length == 8 then + local status = pcall(_copy_struct, rplc_pkt.data) + if status then + -- copied in structure data OK + else + -- error copying structure data + log._error(log_header .. "failed to parse struct packet data") + end + else + log._debug(log_header .. "RPLC struct packet length mismatch") + end + elseif rplc_pkt.type == RPLC_TYPES.MEK_SCRAM then + -- SCRAM acknowledgement + local ack = _get_ack(rplc_pkt) + if ack then + self.sDB.control_state = false + elseif ack == false then + log._warning(log_header .. "SCRAM failed!") + end + elseif rplc_pkt.type == RPLC_TYPES.MEK_ENABLE then + -- enable acknowledgement + local ack = _get_ack(rplc_pkt) + if ack then + self.sDB.control_state = true + elseif ack == false then + log._warning(log_header .. "enable failed!") + end + elseif rplc_pkt.type == RPLC_TYPES.MEK_BURN_RATE then + -- burn rate acknowledgement + if _get_ack(rplc_pkt) == false then + log._warning(log_header .. "burn rate update failed!") + end + elseif rplc_pkt.type == RPLC_TYPES.ISS_STATUS then + -- ISS status packet received, copy data + if rplc_pkt.length == 7 then + local status = pcall(_copy_iss_status, rplc_pkt.data) + if status then + -- copied in ISS status data OK + else + -- error copying ISS status data + log._error(log_header .. "failed to parse ISS status packet data") + end + else + log._debug(log_header .. "RPLC ISS status packet length mismatch") + end + elseif rplc_pkt.type == RPLC_TYPES.ISS_ALARM then + -- ISS alarm + self.sDB.overridden = true + if rplc_pkt.length == 7 then + local status = pcall(_copy_iss_status, rplc_pkt.data) + if status then + -- copied in ISS status data OK + else + -- error copying ISS status data + log._error(log_header .. "failed to parse ISS status packet data") + end + else + log._debug(log_header .. "RPLC ISS alarm packet length mismatch") + end + elseif rplc_pkt.type == RPLC_TYPES.ISS_CLEAR then + -- ISS clear acknowledgement + if _get_ack(rplc_pkt) == false then + log._warning(log_header .. "ISS clear failed") + end + else + log._debug(log_header .. "handler received unsupported RPLC packet type " .. rplc_pkt.type) + end + end + end + + local _send = function (msg_type, msg) + local s_pkt = comms.scada_packet() + local r_pkt = comms.rplc_packet() + + r_pkt.make(self.id, msg_type, msg) + s_pkt.make(self.seq_num, PROTOCOLS.RPLC, r_pkt.raw_sendable()) + + self.out_q.push_packet(s_pkt) + self.seq_num = self.seq_num + 1 + end + + -- PUBLIC FUNCTIONS -- + local get_id = function () return self.id end + local get_db = function () return self.sDB end + local close = function () self.connected = false end local check_wd = function (timer) - return timer == plc_conn_watchdog + return timer == self.plc_conn_watchdog.get_timer() end local get_struct = function () @@ -158,103 +324,37 @@ function new_session(id, for_reactor, in_queue, out_queue) end local iterate = function () - if self.connected and ~self.in_q.empty() then - -- get a new message to process - local message = self.in_q.pop() + if self.connected then + ------------------ + -- handle queue -- + ------------------ - if message.qtype == mqueue.TYPE.PACKET then - -- handle an incoming packet from the PLC - rplc_pkt = message.message.get() + if ~self.in_q.empty() then + -- get a new message to process + local message = self.in_q.pop() + + if message.qtype == mqueue.TYPE.PACKET then + _handle_packet(message.message) + elseif message.qtype == mqueue.TYPE.COMMAND then + -- handle instruction - if rplc_pkt.id == for_reactor then - if rplc_pkt.type == RPLC_TYPES.KEEP_ALIVE then - -- keep alive reply - elseif rplc_pkt.type == RPLC_TYPES.STATUS then - -- status packet received, update data - if rplc_pkt.length >= 5 then - -- @todo [1] is timestamp, determine how this will be used (if at all) - self.sDB.control_state = rplc_pkt.data[2] - self.sDB.overridden = rplc_pkt.data[3] - self.sDB.degraded = rplc_pkt.data[4] - self.sDB.mek_status.heating_rate = rplc_pkt.data[5] - - -- attempt to read mek_data table - if rplc_pkt.data[6] ~= nil then - local status = pcall(_copy_status, rplc_pkt.data[6]) - if status then - -- copied in status data OK - else - -- error copying status data - log._error(log_header .. "failed to parse status packet data") - end - end - else - log._warning(log_header .. "RPLC status packet length mismatch") - end - elseif rplc_pkt.type == RPLC_TYPES.MEK_STRUCT then - -- received reactor structure, record it - if rplc_pkt.length == 8 then - local status = pcall(_copy_struct, rplc_pkt.data) - if status then - -- copied in structure data OK - else - -- error copying structure data - log._error(log_header .. "failed to parse struct packet data") - end - else - log._warning(log_header .. "RPLC struct packet length mismatch") - end - elseif rplc_pkt.type == RPLC_TYPES.MEK_SCRAM then - -- SCRAM acknowledgement - local ack = _get_ack(rplc_pkt) - if ack then - self.sDB.control_state = false - elseif ack == false then - log._warning(log_header .. "SCRAM failed!") - end - elseif rplc_pkt.type == RPLC_TYPES.MEK_ENABLE then - -- enable acknowledgement - local ack = _get_ack(rplc_pkt) - if ack then - self.sDB.control_state = true - elseif ack == false then - log._warning(log_header .. "enable failed!") - end - elseif rplc_pkt.type == RPLC_TYPES.MEK_BURN_RATE then - -- burn rate acknowledgement - if _get_ack(rplc_pkt) == false then - log._warning(log_header .. "burn rate update failed!") - end - elseif rplc_pkt.type == RPLC_TYPES.ISS_STATUS then - -- ISS status packet received, copy data - if rplc_pkt.length == 7 then - local status = pcall(_copy_iss_status, rplc_pkt.data) - if status then - -- copied in ISS status data OK - else - -- error copying ISS status data - log._error(log_header .. "failed to parse ISS status packet data") - end - else - log._warning(log_header .. "RPLC ISS status packet length mismatch") - end - elseif rplc_pkt.type == RPLC_TYPES.ISS_ALARM then - -- ISS alarm - self.sDB.overridden = true - -- @todo - elseif rplc_pkt.type == RPLC_TYPES.ISS_CLEAR then - -- ISS clear acknowledgement - -- @todo - else - log._warning(log_header .. "handler received unsupported RPLC packet type " .. rplc_pkt.type) - end - else - log._warning(log_header .. "RPLC packet with ID not matching reactor ID: reactor " .. self.for_reactor .. " != " .. rplc_pkt.id) end - elseif message.qtype == mqueue.TYPE.COMMAND then - -- handle instruction - end + + ---------------------- + -- update periodics -- + ---------------------- + + local elapsed = os.clock() - self.periodics.last_update + + self.periodics.keep_alive += elapsed + + if self.periodics.keep_alive >= PERIODICS.KEEP_ALIVE then + _send(RPLC_TYPES.KEEP_ALIVE, { os.epoch() }) + self.periodics.keep_alive = 0 + end + + self.periodics.last_update = os.clock() end return self.connected @@ -262,9 +362,10 @@ function new_session(id, for_reactor, in_queue, out_queue) return { get_id = get_id, + get_db = get_db, + close = close, check_wd = check_wd, get_struct = get_struct, - close = close, iterate = iterate } end diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index fcc6950..4e0c75e 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -10,6 +10,7 @@ SESSION_TYPE = { } local self = { + modem = nil num_reactors = 0, rtu_sessions = {}, plc_sessions = {}, @@ -19,6 +20,10 @@ local self = { next_coord_id = 0 } +function link_modem(modem) + self.modem = modem +end + function alloc_reactor_plcs(num_reactors) self.num_reactors = num_reactors for i = 1, num_reactors do @@ -64,12 +69,13 @@ function get_reactor_session(reactor) return session end -function establish_plc_session(remote_port, for_reactor) +function establish_plc_session(local_port, remote_port, for_reactor) if get_reactor_session(for_reactor) == nil then local plc_s = { open = true, reactor = for_reactor, - r_host = remote_port, + l_port = local_port, + r_port = remote_port, in_queue = mqueue.new(), out_queue = mqueue.new(), instance = nil @@ -87,12 +93,46 @@ function establish_plc_session(remote_port, for_reactor) end end +local function _check_watchdogs(sessions, timer_event) + for i = 1, #sessions do + local session = sessions[i] + if session.open then + local triggered = session.instance.check_wd(timer_event) + if triggered then + log._debug("watchdog closing session " .. session.instance.get_id() .. " on remote port " .. session.r_port) + session.open = false + session.instance.close() + end + end + end +end + +function check_all_watchdogs(timer_event) + -- check RTU session watchdogs + _check_watchdogs(self.rtu_sessions, timer_event) + + -- check PLC session watchdogs + _check_watchdogs(self.plc_sessions, timer_event) + + -- check coordinator session watchdogs + _check_watchdogs(self.coord_sessions, timer_event) +end + local function _iterate(sessions) for i = 1, #sessions do local session = sessions[i] if session.open then local ok = session.instance.iterate() - if not ok then + if ok then + -- send packets in out queue + -- @todo handle commands if that's being used too + while not session.out_queue.empty() do + local msg = session.out_queue.pop() + if msg.qtype == mqueue.TYPE.PACKET then + self.modem.transmit(self.r_port, self.l_port, msg.message.raw_sendable()) + end + end + else session.open = false session.instance.close() end @@ -123,6 +163,7 @@ local function _free_closed(sessions) end move_to = move_to + 1 else + log._debug("free'ing closing session " .. session.instance.get_id() .. " on remote port " .. session.r_port) sessions[i] = nil end end diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 6a46f5d..1ad31bb 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -17,7 +17,7 @@ os.loadAPI("session/plc.lua") os.loadAPI("session/coordinator.lua") os.loadAPI("session/svsessions.lua") -local SUPERVISOR_VERSION = "alpha-v0.1.1" +local SUPERVISOR_VERSION = "alpha-v0.1.2" local print = util.print local println = util.println @@ -78,8 +78,18 @@ while true do end end elseif event == "timer" and param1 == loop_clock then - -- basic event tick, send keep-alives + -- main loop tick + + -- iterate sessions + svsessions.iterate_all() + + -- free any closed sessions + svsessions.free_all_closed() + loop_clock = os.startTimer(0.25) + elseif event == "timer" then + -- another timer event, check watchdogs + svsessions.check_all_watchdogs(param1) elseif event == "modem_message" then -- got a packet local packet = superv_comms.parse_packet(p1, p2, p3, p4, p5) diff --git a/supervisor/supervisor.lua b/supervisor/supervisor.lua index 0ce314a..a3d2d2a 100644 --- a/supervisor/supervisor.lua +++ b/supervisor/supervisor.lua @@ -38,6 +38,9 @@ function superv_comms(mode, num_reactors, modem, dev_listen, coord_listen) -- open at construct time _open_channels() + -- link modem to svsessions + svsessions.link_modem(self.modem) + -- send PLC link request responses local _send_plc_linking = function (dest, msg) local s_pkt = comms.scada_packet() @@ -55,6 +58,7 @@ function superv_comms(mode, num_reactors, modem, dev_listen, coord_listen) -- reconnect a newly connected modem local reconnect_modem = function (modem) self.modem = modem + svsessions.link_modem(self.modem) _open_channels() end