#225 coordinator changes for new comms

This commit is contained in:
Mikayla Fischler
2023-06-06 19:41:09 -04:00
parent cdff7af431
commit 0f5ae9a756
5 changed files with 112 additions and 98 deletions

View File

@@ -5,7 +5,7 @@ local util = require("scada-common.util")
local config = require("coordinator.config")
local api = require("coordinator.session.api")
local pocket = require("coordinator.session.pocket")
local apisessions = {}
@@ -18,7 +18,7 @@ local self = {
-- PRIVATE FUNCTIONS --
-- handle a session output queue
---@param session api_session_struct
---@param session pkt_session_struct
local function _api_handle_outq(session)
-- record handler start time
local handle_start = util.time()
@@ -31,7 +31,7 @@ local function _api_handle_outq(session)
if msg ~= nil then
if msg.qtype == mqueue.TYPE.PACKET then
-- handle a packet to be sent
self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable())
self.modem.transmit(config.PKT_CHANNEL, config.CRD_CHANNEL, msg.message.raw_sendable())
elseif msg.qtype == mqueue.TYPE.COMMAND then
-- handle instruction/notification
elseif msg.qtype == mqueue.TYPE.DATA then
@@ -41,15 +41,15 @@ local function _api_handle_outq(session)
-- max 100ms spent processing queue
if util.time() - handle_start > 100 then
log.warning("API out queue handler exceeded 100ms queue process limit")
log.warning(util.c("offending session: port ", session.r_port))
log.warning("[API] out queue handler exceeded 100ms queue process limit")
log.warning(util.c("[API] offending session: ", session))
break
end
end
end
-- cleanly close a session
---@param session api_session_struct
---@param session pkt_session_struct
local function _shutdown(session)
session.open = false
session.instance.close()
@@ -58,11 +58,11 @@ local function _shutdown(session)
while session.out_queue.ready() do
local msg = session.out_queue.pop()
if msg ~= nil and msg.qtype == mqueue.TYPE.PACKET then
self.modem.transmit(session.r_port, session.l_port, msg.message.raw_sendable())
self.modem.transmit(config.PKT_CHANNEL, config.CRD_CHANNEL, msg.message.raw_sendable())
end
end
log.debug(util.c("closed API session ", session.instance.get_id(), " on remote port ", session.r_port))
log.debug(util.c("[API] closed session ", session))
end
-- PUBLIC FUNCTIONS --
@@ -81,54 +81,60 @@ end
-- find a session by remote port
---@nodiscard
---@param port integer
---@return api_session_struct|nil
function apisessions.find_session(port)
---@param source_addr integer
---@return pkt_session_struct|nil
function apisessions.find_session(source_addr)
for i = 1, #self.sessions do
if self.sessions[i].r_port == port then return self.sessions[i] end
if self.sessions[i].s_addr == source_addr then return self.sessions[i] end
end
return nil
end
-- establish a new API session
---@nodiscard
---@param local_port integer
---@param remote_port integer
---@param source_addr integer
---@param version string
---@return integer session_id
function apisessions.establish_session(local_port, remote_port, version)
---@class api_session_struct
local api_s = {
function apisessions.establish_session(source_addr, version)
---@class pkt_session_struct
local pkt_s = {
open = true,
version = version,
l_port = local_port,
r_port = remote_port,
s_addr = source_addr,
in_queue = mqueue.new(),
out_queue = mqueue.new(),
instance = nil ---@type api_session
instance = nil ---@type pkt_session
}
api_s.instance = api.new_session(self.next_id, api_s.in_queue, api_s.out_queue, config.API_TIMEOUT)
table.insert(self.sessions, api_s)
local id = self.next_id
log.debug(util.c("established new API session to ", remote_port, " with ID ", self.next_id))
pkt_s.instance = pocket.new_session(id, source_addr, pkt_s.in_queue, pkt_s.out_queue, config.API_TIMEOUT)
table.insert(self.sessions, pkt_s)
self.next_id = self.next_id + 1
local mt = {
---@param s pkt_session_struct
__tostring = function (s) return util.c("PKT [", id, "] (@", s.s_addr, ")") end
}
setmetatable(pkt_s, mt)
log.debug(util.c("[API] established new session: ", pkt_s))
self.next_id = id + 1
-- success
return api_s.instance.get_id()
return pkt_s.instance.get_id()
end
-- attempt to identify which session's watchdog timer fired
---@param timer_event number
function apisessions.check_all_watchdogs(timer_event)
for i = 1, #self.sessions do
local session = self.sessions[i] ---@type api_session_struct
local session = self.sessions[i] ---@type pkt_session_struct
if session.open then
local triggered = session.instance.check_wd(timer_event)
if triggered then
log.debug(util.c("watchdog closing API session ", session.instance.get_id(),
" on remote port ", session.r_port, "..."))
log.debug(util.c("[API] watchdog closing session ", session, "..."))
_shutdown(session)
end
end
@@ -138,7 +144,7 @@ end
-- iterate all the API sessions
function apisessions.iterate_all()
for i = 1, #self.sessions do
local session = self.sessions[i] ---@type api_session_struct
local session = self.sessions[i] ---@type pkt_session_struct
if session.open and session.instance.iterate() then
_api_handle_outq(session)
@@ -152,10 +158,9 @@ end
function apisessions.free_all_closed()
local f = function (session) return session.open end
---@param session api_session_struct
---@param session pkt_session_struct
local on_delete = function (session)
log.debug(util.c("free'ing closed API session ", session.instance.get_id(),
" on remote port ", session.r_port))
log.debug(util.c("[API] free'ing closed session ", session))
end
util.filter_table(self.sessions, f, on_delete)
@@ -164,7 +169,7 @@ end
-- close all open connections
function apisessions.close_all()
for i = 1, #self.sessions do
local session = self.sessions[i] ---@type api_session_struct
local session = self.sessions[i] ---@type pkt_session_struct
if session.open then _shutdown(session) end
end

View File

@@ -3,7 +3,7 @@ local log = require("scada-common.log")
local mqueue = require("scada-common.mqueue")
local util = require("scada-common.util")
local api = {}
local pocket = {}
local PROTOCOL = comms.PROTOCOL
-- local CAPI_TYPE = comms.CAPI_TYPE
@@ -21,8 +21,8 @@ local API_S_CMDS = {
local API_S_DATA = {
}
api.API_S_CMDS = API_S_CMDS
api.API_S_DATA = API_S_DATA
pocket.API_S_CMDS = API_S_CMDS
pocket.API_S_DATA = API_S_DATA
local PERIODICS = {
KEEP_ALIVE = 2000
@@ -31,11 +31,12 @@ local PERIODICS = {
-- pocket API session
---@nodiscard
---@param id integer session ID
---@param s_addr integer device source address
---@param in_queue mqueue in message queue
---@param out_queue mqueue out message queue
---@param timeout number communications timeout
function api.new_session(id, in_queue, out_queue, timeout)
local log_header = "api_session(" .. id .. "): "
function pocket.new_session(id, s_addr, in_queue, out_queue, timeout)
local log_header = "pkt_session(" .. id .. "): "
local self = {
-- connection properties
@@ -61,10 +62,10 @@ function api.new_session(id, in_queue, out_queue, timeout)
}
}
---@class api_session
---@class pkt_session
local public = {}
-- mark this API session as closed, stop watchdog
-- mark this pocket session as closed, stop watchdog
local function _close()
self.conn_watchdog.cancel()
self.connected = false
@@ -92,7 +93,7 @@ function api.new_session(id, in_queue, out_queue, timeout)
local m_pkt = comms.mgmt_packet()
m_pkt.make(msg_type, msg)
s_pkt.make(self.seq_num, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable())
s_pkt.make(s_addr, self.seq_num, PROTOCOL.SCADA_MGMT, m_pkt.raw_sendable())
out_queue.push_packet(s_pkt)
self.seq_num = self.seq_num + 1
@@ -134,11 +135,11 @@ function api.new_session(id, in_queue, out_queue, timeout)
self.last_rtt = srv_now - srv_start
if self.last_rtt > 750 then
log.warning(log_header .. "API KEEP_ALIVE round trip time > 750ms (" .. self.last_rtt .. "ms)")
log.warning(log_header .. "PKT KEEP_ALIVE round trip time > 750ms (" .. self.last_rtt .. "ms)")
end
-- log.debug(log_header .. "API RTT = " .. self.last_rtt .. "ms")
-- log.debug(log_header .. "API TT = " .. (srv_now - api_send) .. "ms")
-- log.debug(log_header .. "PKT RTT = " .. self.last_rtt .. "ms")
-- log.debug(log_header .. "PKT TT = " .. (srv_now - api_send) .. "ms")
else
log.debug(log_header .. "SCADA keep alive packet length mismatch")
end
@@ -171,7 +172,7 @@ function api.new_session(id, in_queue, out_queue, timeout)
function public.close()
_close()
_send_mgmt(SCADA_MGMT_TYPE.CLOSE, {})
println("connection to API session " .. id .. " closed by server")
println("connection to pocket session " .. id .. " closed by server")
log.info(log_header .. "session closed by server")
end
@@ -210,7 +211,7 @@ function api.new_session(id, in_queue, out_queue, timeout)
-- exit if connection was closed
if not self.connected then
println("connection to API session " .. id .. " closed by remote host")
println("connection to pocket session " .. id .. " closed by remote host")
log.info(log_header .. "session closed by remote host")
return self.connected
end
@@ -246,4 +247,4 @@ function api.new_session(id, in_queue, out_queue, timeout)
return public
end
return api
return pocket