diff --git a/supervisor/session/rtu.lua b/supervisor/session/rtu.lua index fab3a96..59cf0b1 100644 --- a/supervisor/session/rtu.lua +++ b/supervisor/session/rtu.lua @@ -1,6 +1,7 @@ local comms = require("scada-common.comms") local log = require("scada-common.log") local mqueue = require("scada-common.mqueue") +local rsio = require("scada-common.rsio") local util = require("scada-common.util") -- supervisor rtu sessions (svrs) @@ -38,7 +39,7 @@ local PERIODICS = { ---@class rs_session_command ---@field reactor integer ---@field channel RS_IO ----@field active boolean +---@field value integer|boolean -- create a new RTU session ---@param id integer @@ -59,7 +60,7 @@ rtu.new_session = function (id, in_queue, out_queue, advertisement) connected = true, rtu_conn_watchdog = util.new_watchdog(3), last_rtt = 0, - rs_io = {}, + rs_io_q = {}, units = {} } @@ -69,8 +70,11 @@ rtu.new_session = function (id, in_queue, out_queue, advertisement) -- parse the recorded advertisement and create unit sub-sessions local _handle_advertisement = function () self.units = {} + self.rs_io_q = {} + for i = 1, #self.advert do local unit = nil ---@type unit_session|nil + local rs_in_q = nil ---@type mqueue|nil ---@type rtu_advertisement local unit_advert = { @@ -84,7 +88,7 @@ rtu.new_session = function (id, in_queue, out_queue, advertisement) -- create unit by type if u_type == RTU_UNIT_TYPES.REDSTONE then - unit = svrs_redstone.new(self.id, unit_advert, self.out_q) + unit, rs_in_q = svrs_redstone.new(self.id, unit_advert, self.out_q) elseif u_type == RTU_UNIT_TYPES.BOILER then unit = svrs_boiler.new(self.id, unit_advert, self.out_q) elseif u_type == RTU_UNIT_TYPES.BOILER_VALVE then @@ -103,9 +107,23 @@ rtu.new_session = function (id, in_queue, out_queue, advertisement) if unit ~= nil then table.insert(self.units, unit) + + if self.rs_io_q[unit_advert.reactor] == nil then + self.rs_io_q[unit_advert.reactor] = rs_in_q + else + self.units = {} + self.rs_io_q = {} + log.error(log_header .. "bad advertisement: duplicate redstone RTU for reactor " .. unit_advert.reactor) + break + end else self.units = {} - log.error(log_header .. "bad advertisement: error occured while creating a unit") + self.rs_io_q = {} + + local type_string = comms.advert_type_to_rtu_t(u_type) + if type_string == nil then type_string = "unknown" end + + log.error(log_header .. "bad advertisement: error occured while creating a unit (type is " .. type_string .. ")") break end end @@ -226,6 +244,21 @@ rtu.new_session = function (id, in_queue, out_queue, advertisement) if cmd.key == RTU_S_DATA.RS_COMMAND then local rs_cmd = cmd.val ---@type rs_session_command + + if rsio.is_valid_channel(rs_cmd.channel) then + cmd.key = svrs_redstone.RS_RTU_S_DATA.RS_COMMAND + if rs_cmd.reactor == nil then + -- for all reactors (facility) + for i = 1, #self.rs_io_q do + local q = self.rs_io.q[i] ---@type mqueue + q.push_data(msg) + end + elseif self.rs_io_q[rs_cmd.reactor] ~= nil then + -- for just one reactor + local q = self.rs_io.q[rs_cmd.reactor] ---@type mqueue + q.push_data(msg) + end + end end end end @@ -249,8 +282,10 @@ rtu.new_session = function (id, in_queue, out_queue, advertisement) -- update units -- ------------------ + local time_now = util.time() + for i = 1, #self.units do - self.units[i].update() + self.units[i].update(time_now) end ---------------------- diff --git a/supervisor/session/rtu/redstone.lua b/supervisor/session/rtu/redstone.lua index 1477773..4380567 100644 --- a/supervisor/session/rtu/redstone.lua +++ b/supervisor/session/rtu/redstone.lua @@ -1,7 +1,9 @@ local comms = require("scada-common.comms") local log = require("scada-common.log") +local mqueue= require("scada-common.mqueue") local rsio = require("scada-common.rsio") local types = require("scada-common.types") +local util = require("scada-common.util") local txnctrl = require("supervisor.session.rtu.txnctrl") @@ -18,9 +20,21 @@ local IO_MODE = rsio.IO_MODE local rtu_t = types.rtu_t +local RS_RTU_S_CMDS = { +} + +local RS_RTU_S_DATA = { + RS_COMMAND = 1 +} + +redstone.RS_RTU_S_CMDS = RS_RTU_S_CMDS +redstone.RS_RTU_S_DATA = RS_RTU_S_DATA + local TXN_TYPES = { DI_READ = 0, - INPUT_REG_READ = 1 + COIL_WRITE = 1, + INPUT_REG_READ = 2, + HOLD_REG_WRITE = 3 } local PERIODICS = { @@ -43,6 +57,7 @@ redstone.new = function (session_id, advert, out_queue) local self = { uid = advert.index, reactor = advert.reactor, + in_q = mqueue.new(), out_q = out_queue, transaction_controller = txnctrl.new(), has_di = false, @@ -65,10 +80,12 @@ redstone.new = function (session_id, advert, out_queue) -- INITIALIZE -- + -- create all channels as disconnected for _ = 1, #RS_IO do table.insert(self.db, IO_LVL.DISCONNECT) end + -- setup I/O for i = 1, #advert.rsio do local channel = advert.rsio[i] local mode = rsio.get_io_mode(channel) @@ -95,11 +112,11 @@ redstone.new = function (session_id, advert, out_queue) -- PRIVATE FUNCTIONS -- - local _send_request = function (txn_type, f_code, register_range) + local _send_request = function (txn_type, f_code, parameters) local m_pkt = comms.modbus_packet() local txn_id = self.transaction_controller.create(txn_type) - m_pkt.make(txn_id, self.uid, f_code, register_range) + m_pkt.make(txn_id, self.uid, f_code, parameters) self.out_q.push_packet(m_pkt) end @@ -114,6 +131,16 @@ redstone.new = function (session_id, advert, out_queue) _send_request(TXN_TYPES.INPUT_REG_READ, MODBUS_FCODE.READ_INPUT_REGS, { 1, #self.io_list.analog_in }) end + -- write coil output + local _write_coil = function (coil, value) + _send_request(TXN_TYPES.COIL_WRITE, MODBUS_FCODE.WRITE_MUL_COILS, { coil, value }) + end + + -- write holding register output + local _write_holding_register = function (reg, value) + _send_request(TXN_TYPES.HOLD_REG_WRITE, MODBUS_FCODE.WRITE_MUL_HOLD_REGS, { reg, value }) + end + -- PUBLIC FUNCTIONS -- -- handle a packet @@ -168,6 +195,61 @@ redstone.new = function (session_id, advert, out_queue) -- update this runner ---@param time_now integer milliseconds public.update = function (time_now) + -- check command queue + while self.in_q.ready() do + -- get a new message to process + local msg = self.in_q.pop() + + if msg ~= nil then + if msg.qtype == mqueue.TYPE.DATA then + -- instruction with body + local cmd = msg.message ---@type queue_data + if cmd.key == RS_RTU_S_DATA.RS_COMMAND then + local rs_cmd = cmd.val ---@type rs_session_command + + if self.db[rs_cmd.channel] ~= IO_LVL.DISCONNECT then + -- we have this as a connected channel + local mode = rsio.get_io_mode(rs_cmd.channel) + if mode == IO_MODE.DIGITAL_OUT then + -- record the value for retries + self.db[rs_cmd.channel] = rs_cmd.value + + -- find the coil address then write to it + for i = 0, #self.digital_out do + if self.digital_out[i] == rs_cmd.channel then + _write_coil(i, rs_cmd.value) + break + end + end + elseif mode == IO_MODE.ANALOG_OUT then + -- record the value for retries + self.db[rs_cmd.channel] = rs_cmd.value + + -- find the holding register address then write to it + for i = 0, #self.analog_out do + if self.analog_out[i] == rs_cmd.channel then + _write_holding_register(i, rs_cmd.value) + break + end + end + elseif mode ~= nil then + log.debug(log_tag .. "attemted write to non D/O or A/O mode " .. mode) + end + end + end + end + end + + -- max 100ms spent processing queue + if util.time() - time_now > 100 then + log.warning(log_tag .. "exceeded 100ms queue process limit") + break + end + end + + time_now = util.time() + + -- poll digital inputs if self.has_di then if self.periodics.next_di_req <= time_now then _request_discrete_inputs() @@ -175,6 +257,7 @@ redstone.new = function (session_id, advert, out_queue) end end + -- poll analog inputs if self.has_ai then if self.periodics.next_ir_req <= time_now then _request_input_registers() @@ -183,7 +266,7 @@ redstone.new = function (session_id, advert, out_queue) end end - return public + return public, self.in_q end return redstone