diff --git a/reactor-plc/startup.lua b/reactor-plc/startup.lua index a4ac6a8..e448e55 100644 --- a/reactor-plc/startup.lua +++ b/reactor-plc/startup.lua @@ -6,12 +6,13 @@ os.loadAPI("scada-common/log.lua") os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("config.lua") os.loadAPI("plc.lua") os.loadAPI("threads.lua") -local R_PLC_VERSION = "alpha-v0.3.3" +local R_PLC_VERSION = "alpha-v0.4.0" local print = util.print local println = util.println @@ -28,30 +29,42 @@ ppm.mount_all() -- shared memory across threads local __shared_memory = { + -- networked setting networked = config.NETWORKED, + -- PLC system state flags plc_state = { init_ok = true, + shutdown = false, scram = true, degraded = false, no_reactor = false, no_modem = false }, - plc_devices = { + -- core PLC devices + plc_dev = { reactor = ppm.get_fission_reactor(), modem = ppm.get_wireless_modem() }, - system = { + -- system control objects + plc_sys = { iss = nil, plc_comms = nil, conn_watchdog = nil + }, + + -- message queues + q = { + mq_main = mqueue.new(), + mq_iss = mqueue.new(), + mq_comms = mqeuue.new() } } -local smem_dev = __shared_memory.plc_devices -local smem_sys = __shared_memory.system +local smem_dev = __shared_memory.plc_dev +local smem_sys = __shared_memory.plc_sys local plc_state = __shared_memory.plc_state @@ -112,12 +125,12 @@ end init() -- init threads -local main_thread = threads.thread__main(__shared_memory, init) -local iss_thread = threads.thread__iss(__shared_memory) --- local comms_thread = plc.thread__comms(__shared_memory) +local main_thread = threads.thread__main(__shared_memory, init) +local iss_thread = threads.thread__iss(__shared_memory) +local comms_thread = threads.thread__comms(__shared_memory) -- run threads -parallel.waitForAll(main_thread.exec, iss_thread.exec) +parallel.waitForAll(main_thread.exec, iss_thread.exec, comms_thread.exec) -- send an alarm: plc_comms.send_alarm(ALARMS.PLC_SHUTDOWN) ? println_ts("exited") diff --git a/reactor-plc/threads.lua b/reactor-plc/threads.lua index e9f0d3d..eb5468d 100644 --- a/reactor-plc/threads.lua +++ b/reactor-plc/threads.lua @@ -8,36 +8,37 @@ local println = util.println local print_ts = util.print_ts local println_ts = util.println_ts -local async_wait = util.async_wait +local MAIN_CLOCK = 1 -- (1Hz, 20 ticks) +local ISS_CLOCK = 0.5 -- (2Hz, 10 ticks) +local COMMS_CLOCK = 0.25 -- (4Hz, 5 ticks) -local MAIN_CLOCK = 0.5 -- (2Hz, 10 ticks) -local ISS_CLOCK = 0.25 -- (4Hz, 5 ticks) however this is AFTER all the ISS checks, so it is a pause between calls, not start-to-start - -local ISS_EVENT = { +local MQ__ISS_CMD = { SCRAM = 1, DEGRADED_SCRAM = 2, TRIP_TIMEOUT = 3 } +local MQ__COMM_CMD = { + SEND_STATUS = 1 +} + -- main thread -function thread__main(shared_memory, init) +function thread__main(smem, init) -- execute thread local exec = function () -- send status updates at 2Hz (every 10 server ticks) (every loop tick) -- send link requests at 0.5Hz (every 40 server ticks) (every 4 loop ticks) local LINK_TICKS = 4 - - local loop_clock = nil local ticks_to_update = 0 + local loop_clock = nil -- load in from shared memory - local networked = shared_memory.networked - local plc_state = shared_memory.plc_state - local plc_devices = shared_memory.plc_devices - - local iss = shared_memory.system.iss - local plc_comms = shared_memory.system.plc_comms - local conn_watchdog = shared_memory.system.conn_watchdog + local networked = smem.networked + local plc_state = smem.plc_state + local plc_dev = smem.plc_dev + local iss = smem.plc_sys.iss + local plc_comms = smem.plc_sys.plc_comms + local conn_watchdog = smem.plc_sys.conn_watchdog -- debug local last_update = util.time() @@ -56,10 +57,7 @@ function thread__main(shared_memory, init) -- send updated data if not plc_state.no_modem then if plc_comms.is_linked() then - async_wait(function () - plc_comms.send_status(iss_tripped, plc_state.degraded) - plc_comms.send_iss_status() - end) + smem.q.mq_comms.push_command(MQ__COMM_CMD.SEND_STATUS) else if ticks_to_update == 0 then plc_comms.send_link_req() @@ -80,15 +78,15 @@ function thread__main(shared_memory, init) -- feed the watchdog first so it doesn't uhh...eat our packets conn_watchdog.feed() - -- handle the packet (plc_state passed to allow clearing SCRAM flag) - async_wait(function () - local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) - plc_comms.handle_packet(packet, plc_state) - end) + -- handle the packet + local packet = plc_comms.parse_packet(param1, param2, param3, param4, param5) + if packet ~= nil then + smem.q.mq_comms.puch_packet(packet) + end elseif event == "timer" and networked and param1 == conn_watchdog.get_timer() then -- haven't heard from server recently? shutdown reactor plc_comms.unlink() - os.queueEvent("iss_command", ISS_EVENT.TRIP_TIMEOUT) + smem.q.mq_iss.push_command(MQ__ISS_CMD.TRIP_TIMEOUT) elseif event == "peripheral_detach" then -- peripheral disconnect local device = ppm.handle_unmount(param1) @@ -108,7 +106,7 @@ function thread__main(shared_memory, init) if plc_state.init_ok then -- try to scram reactor if it is still connected - os.queueEvent("iss_command", ISS_EVENT.DEGRADED_SCRAM) + smem.q.mq_iss.push_command(MQ__ISS_CMD.DEGRADED_SCRAM) end plc_state.degraded = true @@ -122,18 +120,18 @@ function thread__main(shared_memory, init) if type == "fissionReactor" then -- reconnected reactor - plc_devices.reactor = device + plc_dev.reactor = device - os.queueEvent("iss_command", ISS_EVENT.SCRAM) + smem.q.mq_iss.push_command(MQ__ISS_CMD.SCRAM) println_ts("reactor reconnected.") log._info("reactor reconnected.") plc_state.no_reactor = false if plc_state.init_ok then - iss.reconnect_reactor(plc_devices.reactor) + iss.reconnect_reactor(plc_dev.reactor) if networked then - plc_comms.reconnect_reactor(plc_devices.reactor) + plc_comms.reconnect_reactor(plc_dev.reactor) end end @@ -144,10 +142,10 @@ function thread__main(shared_memory, init) elseif networked and type == "modem" then if device.isWireless() then -- reconnected modem - plc_devices.modem = device + plc_dev.modem = device if plc_state.init_ok then - plc_comms.reconnect_modem(plc_devices.modem) + plc_comms.reconnect_modem(plc_dev.modem) end println_ts("wireless modem reconnected.") @@ -176,6 +174,7 @@ function thread__main(shared_memory, init) -- check for termination request if event == "terminate" or ppm.should_terminate() then -- iss handles reactor shutdown + plc_state.shutdown = true log._warning("terminate requested, main thread exiting") break end @@ -186,80 +185,66 @@ function thread__main(shared_memory, init) end -- ISS monitor thread -function thread__iss(shared_memory) +function thread__iss(smem) -- execute thread local exec = function () - local loop_clock = nil - -- load in from shared memory - local networked = shared_memory.networked - local plc_state = shared_memory.plc_state - local plc_devices = shared_memory.plc_devices + local networked = smem.networked + local plc_state = smem.plc_state + local plc_dev = smem.plc_dev + local iss = smem.plc_sys.iss + local plc_comms = smem.plc_sys.plc_comms - local iss = shared_memory.system.iss - local plc_comms = shared_memory.system.plc_comms + local iss_queue = smem.q.mq_iss - -- debug - -- local last_update = util.time() + local last_update = util.time() - -- event loop + -- thread loop while true do - local event, param1, param2, param3, param4, param5 = os.pullEventRaw() + local reactor = smem.plc_dev.reactor - local reactor = shared_memory.plc_devices.reactor - - if event == "timer" and param1 == loop_clock then - -- ISS checks - if plc_state.init_ok then - -- if we tried to SCRAM but failed, keep trying - -- in that case, SCRAM won't be called until it reconnects (this is the expected use of this check) - async_wait(function () - if not plc_state.no_reactor and plc_state.scram and reactor.getStatus() then - reactor.scram() - end - end) - - -- if we are in standalone mode, continuously reset ISS - -- ISS will trip again if there are faults, but if it isn't cleared, the user can't re-enable - if not networked then - plc_state.scram = false - iss.reset() - end - - -- check safety (SCRAM occurs if tripped) - async_wait(function () - if not plc_state.degraded then - local iss_tripped, iss_status_string, iss_first = iss.check() - plc_state.scram = plc_state.scram or iss_tripped - - if iss_first then - println_ts("[ISS] SCRAM! safety trip: " .. iss_status_string) - if networked then - plc_comms.send_iss_alarm(iss_status_string) - end - end - end - end) + -- ISS checks + if plc_state.init_ok then + -- if we tried to SCRAM but failed, keep trying + -- in that case, SCRAM won't be called until it reconnects (this is the expected use of this check) + if not plc_state.no_reactor and plc_state.scram and reactor.getStatus() then + reactor.scram() end - -- start next clock timer after all the long operations - -- otherwise we will never get around to other events - loop_clock = os.startTimer(ISS_CLOCK) + -- if we are in standalone mode, continuously reset ISS + -- ISS will trip again if there are faults, but if it isn't cleared, the user can't re-enable + if not networked then + plc_state.scram = false + iss.reset() + end - -- debug - -- print(util.time() - last_update) - -- println("ms") - -- last_update = util.time() - elseif event == "iss_command" then - -- handle ISS commands - if param1 == ISS_EVENT.SCRAM then - -- basic SCRAM - plc_state.scram = true - async_wait(reactor.scram) - elseif param1 == ISS_EVENT.DEGRADED_SCRAM then - -- SCRAM with print - plc_state.scram = true - async_wait(function () + -- check safety (SCRAM occurs if tripped) + if not plc_state.degraded then + local iss_tripped, iss_status_string, iss_first = iss.check() + plc_state.scram = plc_state.scram or iss_tripped + + if iss_first then + println_ts("[ISS] SCRAM! safety trip: " .. iss_status_string) + if networked then + plc_comms.send_iss_alarm(iss_status_string) + end + end + end + end + + -- check for messages in the message queue + while comms_queue.ready() do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + if msg.message == MQ__ISS_CMD.SCRAM then + -- basic SCRAM + plc_state.scram = true + reactor.scram() + elseif msg.message == MQ__ISS_CMD.DEGRADED_SCRAM then + -- SCRAM with print + plc_state.scram = true if reactor.scram() then println_ts("successful reactor SCRAM") log._error("successful reactor SCRAM") @@ -267,38 +252,106 @@ function thread__iss(shared_memory) println_ts("failed reactor SCRAM") log._error("failed reactor SCRAM") end - end) - elseif param1 == ISS_EVENT.TRIP_TIMEOUT then - -- watchdog tripped - plc_state.scram = true - iss.trip_timeout() - println_ts("server timeout") - log._warning("server timeout") + elseif msg.message == MQ__ISS_CMD.TRIP_TIMEOUT then + -- watchdog tripped + plc_state.scram = true + iss.trip_timeout() + println_ts("server timeout") + log._warning("server timeout") + end + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet end - elseif event == "clock_start" then - -- start loop clock - loop_clock = os.startTimer(ISS_CLOCK) - log._debug("iss thread started") + + -- quick yield + if iss_queue.ready() then util.nop() end end -- check for termination request - if event == "terminate" or ppm.should_terminate() then + if plc_state.shutdown then -- safe exit - log._warning("terminate requested, iss thread shutdown") + log._warning("iss thread shutdown initiated") if plc_state.init_ok then plc_state.scram = true - async_wait(reactor.scram) + reactor.scram() if reactor.__p_is_ok() then println_ts("reactor disabled") + log._info("iss thread reactor SCRAM OK") else -- send an alarm: plc_comms.send_alarm(ALARMS.PLC_LOST_CONTROL) ? println_ts("exiting, reactor failed to disable") + log._error("iss thread failed to SCRAM reactor on exit") end end - break + log._warning("iss thread exiting") + return + end + + -- debug + -- print(util.time() - last_update) + -- println("ms") + -- last_update = util.time() + + -- delay before next check + local sleep_for = ISS_CLOCK - (util.time() - last_update) + if sleep_for > 0.05 then + sleep(sleep_for) end end end return { exec = exec } end + +function thread__comms(smem) + -- execute thread + local exec = function () + -- load in from shared memory + local plc_state = smem.plc_state + local plc_comms = smem.plc_sys.plc_comms + + local comms_queue = smem.q.mq_comms + + -- thread loop + while true do + local last_update = util.time() + + -- check for messages in the message queue + while comms_queue.ready() do + local msg = comms_queue.pop() + + if msg.qtype == mqueue.TYPE.COMMAND then + -- received a command + if msg.message == MQ__COMM_CMD.SEND_STATUS then + -- send PLC/ISS status + plc_comms.send_status(plc_state.degraded) + plc_comms.send_iss_status() + end + elseif msg.qtype == mqueue.TYPE.DATA then + -- received data + elseif msg.qtype == mqueue.TYPE.PACKET then + -- received a packet + -- handle the packet (plc_state passed to allow clearing SCRAM flag) + plc_comms.handle_packet(msg.message, plc_state) + end + + -- quick yield + if comms_queue.ready() then util.nop() end + end + + -- check for termination request + if plc_state.shutdown then + log._warning("comms thread exiting") + return + end + + -- delay before next check + local sleep_for = COMMS_CLOCK - (util.time() - last_update) + if sleep_for > 0.05 then + sleep(sleep_for) + end + end + end +end diff --git a/supervisor/mqueue.lua b/scada-common/mqueue.lua similarity index 78% rename from supervisor/mqueue.lua rename to scada-common/mqueue.lua index f79e686..3881d02 100644 --- a/supervisor/mqueue.lua +++ b/scada-common/mqueue.lua @@ -4,7 +4,8 @@ TYPE = { COMMAND = 0, - PACKET = 1 + DATA = 1, + PACKET = 2 } function new() @@ -17,19 +18,27 @@ function new() local empty = function () return #queue == 0 end + + local ready = function () + return #queue > 0 + end local _push = function (qtype, message) table.insert(queue, { qtype = qtype, message = message }) end - local push_packet = function (message) - _push(TYPE.PACKET, message) - end - local push_command = function (message) _push(TYPE.COMMAND, message) end - + + local push_data = function (message) + _push(TYPE.DATA, message) + end + + local push_packet = function (message) + _push(TYPE.PACKET, message) + end + local pop = function () if #queue > 0 then return table.remove(queue) @@ -41,7 +50,9 @@ function new() return { length = length, empty = empty, + ready = ready, push_packet = push_packet, + push_data = push_data, push_command = push_command, pop = pop } diff --git a/scada-common/util.lua b/scada-common/util.lua index 97ce601..11b258e 100644 --- a/scada-common/util.lua +++ b/scada-common/util.lua @@ -39,9 +39,10 @@ end -- PARALLELIZATION -- --- block waiting for parallel call -function async_wait(f) - parallel.waitForAll(f) +-- no-op to provide a brief pause (and a yield) +-- EVENT_CONSUMER: this function consumes events +function nop() + sleep(0.05) end -- WATCHDOG -- diff --git a/supervisor/session/plc.lua b/supervisor/session/plc.lua index 695b34b..3a51666 100644 --- a/supervisor/session/plc.lua +++ b/supervisor/session/plc.lua @@ -330,7 +330,7 @@ function new_session(id, for_reactor, in_queue, out_queue) -- handle queue -- ------------------ - if not self.in_q.empty() then + if self.in_q.ready() then -- get a new message to process local message = self.in_q.pop() diff --git a/supervisor/session/svsessions.lua b/supervisor/session/svsessions.lua index e02619c..9c02ce0 100644 --- a/supervisor/session/svsessions.lua +++ b/supervisor/session/svsessions.lua @@ -130,7 +130,7 @@ local function _iterate(sessions) if ok then -- send packets in out queue -- @todo handle commands if that's being used too - while not session.out_queue.empty() do + while session.out_queue.ready() do local msg = session.out_queue.pop() if msg.qtype == mqueue.TYPE.PACKET then self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable()) diff --git a/supervisor/startup.lua b/supervisor/startup.lua index 27a9db6..138cfd8 100644 --- a/supervisor/startup.lua +++ b/supervisor/startup.lua @@ -7,9 +7,9 @@ os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") os.loadAPI("scada-common/modbus.lua") +os.loadAPI("scada-common/mqueue.lua") os.loadAPI("config.lua") -os.loadAPI("mqueue.lua") os.loadAPI("session/rtu.lua") os.loadAPI("session/plc.lua") @@ -18,7 +18,7 @@ os.loadAPI("session/svsessions.lua") os.loadAPI("supervisor.lua") -local SUPERVISOR_VERSION = "alpha-v0.1.5" +local SUPERVISOR_VERSION = "alpha-v0.1.6" local print = util.print local println = util.println