From 665b33fa054b2d9149a6ff3f0f964ebe8bdca218 Mon Sep 17 00:00:00 2001 From: Mikayla Fischler Date: Tue, 3 May 2022 11:39:03 -0400 Subject: [PATCH] #42 parallel RTU reads --- rtu/modbus.lua | 92 ++++++++++++++++++++++++++++++++++++++++++++----- rtu/startup.lua | 8 ++--- 2 files changed, 88 insertions(+), 12 deletions(-) diff --git a/rtu/modbus.lua b/rtu/modbus.lua index dba239d..7ea6108 100644 --- a/rtu/modbus.lua +++ b/rtu/modbus.lua @@ -11,6 +11,7 @@ function new(rtu_dev, use_parallel_read) } local _1_read_coils = function (c_addr_start, count) + local tasks = {} local readings = {} local access_fault = false local _, coils, _, _ = self.rtu.io_count() @@ -19,12 +20,30 @@ function new(rtu_dev, use_parallel_read) if return_ok then for i = 1, count do local addr = c_addr_start + i - 1 - readings[i], access_fault = self.rtu.read_coil(addr) + + if self.use_parallel then + table.insert(tasks, function () + local reading, fault = self.rtu.read_coil(addr) + if fault then access_fault = true else readings[i] = reading end + end) + else + readings[i], access_fault = self.rtu.read_coil(addr) + + if access_fault then + return_ok = false + readings = MODBUS_EXCODE.SERVER_DEVICE_FAIL + break + end + end + end + + -- run parallel tasks if configured + if self.use_parallel then + parallel.waitForAll(table.unpack(tasks)) if access_fault then return_ok = false readings = MODBUS_EXCODE.SERVER_DEVICE_FAIL - break end end else @@ -35,6 +54,7 @@ function new(rtu_dev, use_parallel_read) end local _2_read_discrete_inputs = function (di_addr_start, count) + local tasks = {} local readings = {} local access_fault = false local discrete_inputs, _, _, _ = self.rtu.io_count() @@ -43,12 +63,30 @@ function new(rtu_dev, use_parallel_read) if return_ok then for i = 1, count do local addr = di_addr_start + i - 1 - readings[i], access_fault = self.rtu.read_di(addr) + + if self.use_parallel then + table.insert(tasks, function () + local reading, fault = self.rtu.read_di(addr) + if fault then access_fault = true else readings[i] = reading end + end) + else + readings[i], access_fault = self.rtu.read_di(addr) + + if access_fault then + return_ok = false + readings = MODBUS_EXCODE.SERVER_DEVICE_FAIL + break + end + end + end + + -- run parallel tasks if configured + if self.use_parallel then + parallel.waitForAll(table.unpack(tasks)) if access_fault then return_ok = false readings = MODBUS_EXCODE.SERVER_DEVICE_FAIL - break end end else @@ -59,6 +97,7 @@ function new(rtu_dev, use_parallel_read) end local _3_read_multiple_holding_registers = function (hr_addr_start, count) + local tasks = {} local readings = {} local access_fault = false local _, _, _, hold_regs = self.rtu.io_count() @@ -67,12 +106,30 @@ function new(rtu_dev, use_parallel_read) if return_ok then for i = 1, count do local addr = hr_addr_start + i - 1 - readings[i], access_fault = self.rtu.read_holding_reg(addr) + + if self.use_parallel then + table.insert(tasks, function () + local reading, fault = self.rtu.read_holding_reg(addr) + if fault then access_fault = true else readings[i] = reading end + end) + else + readings[i], access_fault = self.rtu.read_holding_reg(addr) + + if access_fault then + return_ok = false + readings = MODBUS_EXCODE.SERVER_DEVICE_FAIL + break + end + end + end + + -- run parallel tasks if configured + if self.use_parallel then + parallel.waitForAll(table.unpack(tasks)) if access_fault then return_ok = false readings = MODBUS_EXCODE.SERVER_DEVICE_FAIL - break end end else @@ -83,6 +140,7 @@ function new(rtu_dev, use_parallel_read) end local _4_read_input_registers = function (ir_addr_start, count) + local tasks = {} local readings = {} local access_fault = false local _, _, input_regs, _ = self.rtu.io_count() @@ -91,12 +149,30 @@ function new(rtu_dev, use_parallel_read) if return_ok then for i = 1, count do local addr = ir_addr_start + i - 1 - readings[i], access_fault = self.rtu.read_input_reg(addr) + + if self.use_parallel then + table.insert(tasks, function () + local reading, fault = self.rtu.read_input_reg(addr) + if fault then access_fault = true else readings[i] = reading end + end) + else + readings[i], access_fault = self.rtu.read_input_reg(addr) + + if access_fault then + return_ok = false + readings = MODBUS_EXCODE.SERVER_DEVICE_FAIL + break + end + end + end + + -- run parallel tasks if configured + if self.use_parallel then + parallel.waitForAll(table.unpack(tasks)) if access_fault then return_ok = false readings = MODBUS_EXCODE.SERVER_DEVICE_FAIL - break end end else diff --git a/rtu/startup.lua b/rtu/startup.lua index 5f467e4..1679115 100644 --- a/rtu/startup.lua +++ b/rtu/startup.lua @@ -8,10 +8,10 @@ os.loadAPI("scada-common/util.lua") os.loadAPI("scada-common/ppm.lua") os.loadAPI("scada-common/comms.lua") os.loadAPI("scada-common/mqueue.lua") -os.loadAPI("scada-common/modbus.lua") os.loadAPI("scada-common/rsio.lua") os.loadAPI("config.lua") +os.loadAPI("modbus.lua") os.loadAPI("rtu.lua") os.loadAPI("threads.lua") @@ -20,7 +20,7 @@ os.loadAPI("dev/boiler_rtu.lua") os.loadAPI("dev/imatrix_rtu.lua") os.loadAPI("dev/turbine_rtu.lua") -local RTU_VERSION = "alpha-v0.4.12" +local RTU_VERSION = "alpha-v0.4.13" local rtu_t = types.rtu_t @@ -147,7 +147,7 @@ for reactor_idx = 1, #rtu_redstone do reactor = rtu_redstone[reactor_idx].for_reactor, device = capabilities, -- use device field for redstone channels rtu = rs_rtu, - modbus_io = modbus.new(rs_rtu), + modbus_io = modbus.new(rs_rtu, false), modbus_busy = false, pkt_queue = nil, thread = nil @@ -199,7 +199,7 @@ for i = 1, #rtu_devices do reactor = rtu_devices[i].for_reactor, device = device, rtu = rtu_iface, - modbus_io = modbus.new(rtu_iface), + modbus_io = modbus.new(rtu_iface, true), modbus_busy = false, pkt_queue = mqueue.new(), thread = nil