#9 MODBUS test code; fixed rtu, modbus, redstone_rtu, and rsio bugs

This commit is contained in:
Mikayla Fischler
2022-05-27 18:10:06 -04:00
parent 51111f707f
commit 6df0a1d149
8 changed files with 385 additions and 80 deletions

236
test/modbustest.lua Normal file
View File

@@ -0,0 +1,236 @@
require("/initenv").init_env()
local types = require("scada-common.types")
local util = require("scada-common.util")
local testutils = require("test.testutils")
local modbus = require("rtu.modbus")
local redstone_rtu = require("rtu.dev.redstone_rtu")
local rsio = require("scada-common.rsio")
local print = util.print
local println = util.println
local MODBUS_FCODE = types.MODBUS_FCODE
local MODBUS_EXCODE = types.MODBUS_EXCODE
println("starting redstone RTU and MODBUS tester")
println("")
-- RTU init --
print(">>> init redstone RTU: ")
local rs_rtu = redstone_rtu.new()
local di, c, ir, hr = rs_rtu.io_count()
assert(di == 0 and c == 0 and ir == 0 and hr == 0, "IOCOUNT_0")
rs_rtu.link_di("back", colors.black)
rs_rtu.link_di("back", colors.blue)
rs_rtu.link_do(rsio.IO.F_ALARM, "back", colors.red)
rs_rtu.link_do(rsio.IO.WASTE_AM, "back", colors.purple)
rs_rtu.link_ai("right")
rs_rtu.link_ao("left")
di, c, ir, hr = rs_rtu.io_count()
assert(di == 2, "IOCOUNT_DI")
assert(c == 2, "IOCOUNT_C")
assert(ir == 1, "IOCOUNT_IR")
assert(hr == 1, "IOCOUNT_HR")
println("OK")
-- MODBUS testing --
local rs_modbus = modbus.new(rs_rtu, false)
local mbt = testutils.modbus_tester(rs_modbus, MODBUS_FCODE.ERROR_FLAG)
-------------------------
--- CHECKING REQUESTS ---
-------------------------
println(">>> checking MODBUS requests:")
print("read c {0}: ")
mbt.pkt_set(MODBUS_FCODE.READ_COILS, {0})
mbt.test_error__check_request(MODBUS_EXCODE.NEG_ACKNOWLEDGE)
println("PASS")
print("99 {1,2}: ")
mbt.pkt_set(99, {1, 2})
mbt.test_error__check_request(MODBUS_EXCODE.ILLEGAL_FUNCTION)
println("PASS")
print("read c {1,2}: ")
mbt.pkt_set(MODBUS_FCODE.READ_COILS, {1, 2})
mbt.test_success__check_request(MODBUS_EXCODE.ACKNOWLEDGE)
println("PASS")
testutils.pause()
--------------------
--- BAD REQUESTS ---
--------------------
println(">>> trying bad requests:")
print("read di {1,10}: ")
mbt.pkt_set(MODBUS_FCODE.READ_DISCRETE_INPUTS, {1, 10})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("read di {5,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_DISCRETE_INPUTS, {5, 1})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("read di {1,0}: ")
mbt.pkt_set(MODBUS_FCODE.READ_DISCRETE_INPUTS, {1, 0})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("read c {5,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_COILS, {5, 1})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("read c {1,0}: ")
mbt.pkt_set(MODBUS_FCODE.READ_COILS, {1, 0})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("read ir {5,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_INPUT_REGS, {5, 1})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("read ir {1,0}: ")
mbt.pkt_set(MODBUS_FCODE.READ_INPUT_REGS, {1, 0})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("read hr {5,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_MUL_HOLD_REGS, {5, 1})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("write c {5,1}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_SINGLE_COIL, {5, 1})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("write mul c {5,1}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_SINGLE_COIL, {5, 1})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("write mul c {5,{1}}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_SINGLE_COIL, {5, {1}})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("write hr {5,1}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_SINGLE_HOLD_REG, {5, 1})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
print("write mul hr {5,{1}}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_SINGLE_HOLD_REG, {5, {1}})
mbt.test_error__handle_packet(MODBUS_EXCODE.ILLEGAL_DATA_ADDR)
println("PASS")
testutils.pause()
----------------------
--- READING INPUTS ---
----------------------
println(">>> reading inputs:")
print("read di {1,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_DISCRETE_INPUTS, {1, 1})
mbt.test_success__handle_packet()
print("read di {2,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_DISCRETE_INPUTS, {2, 1})
mbt.test_success__handle_packet()
print("read di {1,2}: ")
mbt.pkt_set(MODBUS_FCODE.READ_DISCRETE_INPUTS, {1, 2})
mbt.test_success__handle_packet()
print("read ir {1,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_INPUT_REGS, {1, 1})
mbt.test_success__handle_packet()
testutils.pause()
-----------------------
--- WRITING OUTPUTS ---
-----------------------
println(">>> writing outputs:")
print("write mul c {1,{LOW,LOW}}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_MUL_COILS, {1, {rsio.IO_LVL.LOW, rsio.IO_LVL.LOW}})
mbt.test_success__handle_packet()
testutils.pause()
print("write c {1,HIGH}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_SINGLE_COIL, {1, rsio.IO_LVL.HIGH})
mbt.test_success__handle_packet()
testutils.pause()
print("write c {2,HIGH}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_SINGLE_COIL, {2, rsio.IO_LVL.HIGH})
mbt.test_success__handle_packet()
testutils.pause()
print("write hr {1,7}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_SINGLE_HOLD_REG, {1, 7})
mbt.test_success__handle_packet()
testutils.pause()
print("write mul hr {1,{4}}: ")
mbt.pkt_set(MODBUS_FCODE.WRITE_MUL_HOLD_REGS, {1, {4}})
mbt.test_success__handle_packet()
println("PASS")
testutils.pause()
-----------------------
--- READING OUTPUTS ---
-----------------------
println(">>> reading outputs:")
print("read c {1,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_COILS, {1, 1})
mbt.test_success__handle_packet()
print("read c {2,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_COILS, {2, 1})
mbt.test_success__handle_packet()
print("read c {1,2}: ")
mbt.pkt_set(MODBUS_FCODE.READ_COILS, {1, 2})
mbt.test_success__handle_packet()
print("read hr {1,1}: ")
mbt.pkt_set(MODBUS_FCODE.READ_MUL_HOLD_REGS, {1, 1})
mbt.test_success__handle_packet()
println("PASS")
println("TEST COMPLETE")

View File

@@ -47,8 +47,7 @@ end
assert(max_value == cid, "RS_IO last IDx out-of-sync with count: " .. max_value .. " (count " .. cid .. ")")
---@diagnostic disable-next-line: undefined-field
os.sleep(1)
testutils.pause()
println(">>> checking invalid channels:")
@@ -57,8 +56,7 @@ testutils.test_func_nil("rsio.to_string", rsio.to_string, "")
testutils.test_func("rsio.get_io_mode", rsio.get_io_mode, { -1, 100, false }, IO_MODE.ANALOG_IN)
testutils.test_func_nil("rsio.get_io_mode", rsio.get_io_mode, IO_MODE.ANALOG_IN)
---@diagnostic disable-next-line: undefined-field
os.sleep(1)
testutils.pause()
println(">>> checking validity checks:")
@@ -76,8 +74,7 @@ testutils.test_func("rsio.is_color", rsio.is_color, ic_t_list, true)
testutils.test_func("rsio.is_color", rsio.is_color, { 0, 999999, colors.combine(colors.red, colors.blue, colors.black) }, false)
testutils.test_func_nil("rsio.is_color", rsio.is_color, false)
---@diagnostic disable-next-line: undefined-field
os.sleep(1)
testutils.pause()
println(">>> checking channel-independent I/O wrappers:")
@@ -98,8 +95,7 @@ assert(rsio.analog_write(4, 0, 15) == 4, "RS_WRITE_4_15")
assert(rsio.analog_write(12, 0, 15) == 12, "RS_WRITE_12_15")
println("PASS")
---@diagnostic disable-next-line: undefined-field
os.sleep(1)
testutils.pause()
println(">>> checking channel I/O:")
@@ -124,25 +120,25 @@ println("PASS")
print("rsio.digital_write(...): ")
-- check output channels
assert(rsio.digital_write(IO.F_ALARM, false) == IO_LVL.LOW, "IO_F_ALARM_FALSE")
assert(rsio.digital_write(IO.F_ALARM, true) == IO_LVL.HIGH, "IO_F_ALARM_TRUE")
assert(rsio.digital_write(IO.WASTE_PO, false) == IO_LVL.HIGH, "IO_WASTE_PO_FALSE")
assert(rsio.digital_write(IO.WASTE_PO, true) == IO_LVL.LOW, "IO_WASTE_PO_TRUE")
assert(rsio.digital_write(IO.WASTE_PU, false) == IO_LVL.HIGH, "IO_WASTE_PU_FALSE")
assert(rsio.digital_write(IO.WASTE_PU, true) == IO_LVL.LOW, "IO_WASTE_PU_TRUE")
assert(rsio.digital_write(IO.WASTE_AM, false) == IO_LVL.HIGH, "IO_WASTE_AM_FALSE")
assert(rsio.digital_write(IO.WASTE_AM, true) == IO_LVL.LOW, "IO_WASTE_AM_TRUE")
assert(rsio.digital_write(IO.F_ALARM, IO_LVL.LOW) == false, "IO_F_ALARM_FALSE")
assert(rsio.digital_write(IO.F_ALARM, IO_LVL.HIGH) == true, "IO_F_ALARM_TRUE")
assert(rsio.digital_write(IO.WASTE_PO, IO_LVL.HIGH) == false, "IO_WASTE_PO_FALSE")
assert(rsio.digital_write(IO.WASTE_PO, IO_LVL.LOW) == true, "IO_WASTE_PO_TRUE")
assert(rsio.digital_write(IO.WASTE_PU, IO_LVL.HIGH) == false, "IO_WASTE_PU_FALSE")
assert(rsio.digital_write(IO.WASTE_PU, IO_LVL.LOW) == true, "IO_WASTE_PU_TRUE")
assert(rsio.digital_write(IO.WASTE_AM, IO_LVL.HIGH) == false, "IO_WASTE_AM_FALSE")
assert(rsio.digital_write(IO.WASTE_AM, IO_LVL.LOW) == true, "IO_WASTE_AM_TRUE")
-- check all reactor output channels (all are active high)
for i = IO.R_ALARM, (IO.R_PLC_TIMEOUT - IO.R_ALARM + 1) do
assert(rsio.to_string(i) ~= "", "REACTOR_IO_BAD_CHANNEL")
assert(rsio.digital_write(i, false) == IO_LVL.LOW, "IO_" .. rsio.to_string(i) .. "_FALSE")
assert(rsio.digital_write(i, true) == IO_LVL.HIGH, "IO_" .. rsio.to_string(i) .. "_TRUE")
assert(rsio.digital_write(i, IO_LVL.LOW) == false, "IO_" .. rsio.to_string(i) .. "_FALSE")
assert(rsio.digital_write(i, IO_LVL.HIGH) == true, "IO_" .. rsio.to_string(i) .. "_TRUE")
end
-- non-outputs should always return false
assert(rsio.digital_write(IO.F_SCRAM, true) == IO_LVL.LOW, "IO_IN_WRITE_LOW")
assert(rsio.digital_write(IO.F_SCRAM, false) == IO_LVL.LOW, "IO_IN_WRITE_HIGH")
assert(rsio.digital_write(IO.F_SCRAM, IO_LVL.LOW) == false, "IO_IN_WRITE_LOW")
assert(rsio.digital_write(IO.F_SCRAM, IO_LVL.LOW) == false, "IO_IN_WRITE_HIGH")
println("PASS")

View File

@@ -41,4 +41,82 @@ function testutils.test_func_nil(name, f, result)
println("PASS")
end
-- get something as a string
---@param result any
---@return string
function testutils.stringify(result)
return textutils.serialize(result, { allow_repetitions = true, compact = true })
end
-- pause for 1 second, or the provided seconds
---@param seconds? number
function testutils.pause(seconds)
seconds = seconds or 1.0
---@diagnostic disable-next-line: undefined-field
os.sleep(seconds)
end
-- create a new MODBUS tester
---@param modbus modbus modbus object
---@param error_flag MODBUS_FCODE MODBUS_FCODE.ERROR_FLAG
function testutils.modbus_tester(modbus, error_flag)
-- test packet
---@type modbus_frame
local packet = {
txn_id = 0,
length = 0,
unit_id = 0,
func_code = 0,
data = {},
scada_frame = nil
}
---@class modbus_tester
local public = {}
-- set the packet function and data for the next test
---@param func MODBUS_FCODE function code
---@param data table
function public.pkt_set(func, data)
packet.length = #data
packet.data = data
packet.func_code = func
end
-- check the current packet, expecting an error
---@param excode MODBUS_EXCODE exception code to expect
function public.test_error__check_request(excode)
local rcode, reply = modbus.check_request(packet)
assert(rcode == false, "CHECK_NOT_FAIL")
assert(reply.get().func_code == bit.bor(packet.func_code, error_flag), "WRONG_FCODE")
assert(reply.get().data[1] == excode, "EXCODE_MISMATCH")
end
-- test the current packet, expecting an error
---@param excode MODBUS_EXCODE exception code to expect
function public.test_error__handle_packet(excode)
local rcode, reply = modbus.handle_packet(packet)
assert(rcode == false, "CHECK_NOT_FAIL")
assert(reply.get().func_code == bit.bor(packet.func_code, error_flag), "WRONG_FCODE")
assert(reply.get().data[1] == excode, "EXCODE_MISMATCH")
end
-- check the current packet, expecting success
---@param excode MODBUS_EXCODE exception code to expect
function public.test_success__check_request(excode)
local rcode, reply = modbus.check_request(packet)
assert(rcode, "CHECK_NOT_OK")
assert(reply.get().data[1] == excode, "EXCODE_MISMATCH")
end
-- test the current packet, expecting success
function public.test_success__handle_packet()
local rcode, reply = modbus.handle_packet(packet)
assert(rcode, "CHECK_NOT_OK")
println(testutils.stringify(reply.get().data))
end
return public
end
return testutils