Thank you Andrey. I’m also comfortable reading the source code to sort it out.
This Lua code we’re using so far, for the cluster sync traffic on Port 41001, is based on inspection of the wire traffic rather than the source code. There are some places in this code where I just skip a byte without knowing why, or read in several int32 fields and make an interpretation of their pattern that matches the data without knowing why.
The main thing I want to be sure of is that we’re looking in the right spot for each of the events: Binary events from nodes on port 41004, json events from nodes on port 41003, and both types of events from the Primary, multiplexed onto port 41001 (along with other sync traffic, and events sourced by other nodes).
cosm_lib = require "cosm_lib"
local p_cosm_cep = Proto("cosm_cep", "Unreal Cluster Sync Events v1.0.0");
local cep_table = DissectorTable.new("cosm_cep", "Unreal Cluster Sync Events", ftypes.INT, base.DEC, p_cosm_cep)
local f_class = ProtoField.string("cosm_cep.class", "Event Class")
local f_type = ProtoField.string("cosm_cep.type", "Event Type")
local f_source = ProtoField.string("cosm_cep.source", "Source")
local f_destination = ProtoField.string("cosm_cep.destination", "Destination")
local f_jce_count = ProtoField.uint32("cosm_cep.jce_count", "JSON Event Count", base.DEC)
local f_bce_count = ProtoField.uint32("cosm_cep.bce_count", "Binary Event Count", base.DEC)
p_cosm_cep.fields = { f_class, f_type, f_source, f_destination, f_jce_count, f_bce_count }
local data_dis = Dissector.get("data")
function p_cosm_cep.dissector(buf, pkt, tree)
local subtree = tree:add(p_cosm_cep, buf)
local ip_src = pkt.net_src -- Source IP address from the network layer
local ip_dst = pkt.net_dst -- Destination IP address from the network layer
local ip_source_port = pkt.src_port
local payload_len = buf(0,4):le_uint()
local cursor = 4
-- e.g. GetEventsData
local type_len = buf(cursor + 0, 4):le_uint()
local type_value = buf(cursor + 4, type_len):stringz()
cursor = cursor + (4 + type_len)
-- e.g. response
local subtype_len = buf(cursor + 0, 4):le_uint()
local subtype_value = buf(cursor + 4, subtype_len):stringz()
cursor = cursor + (4 + subtype_len)
-- e.g. ClusterSync
local family_len = buf(cursor + 0, 4):le_uint()
local family_value = buf(cursor + 4, family_len):stringz()
cursor = cursor + (4 + family_len)
-- Add the source and destination GPs
subtree:add(f_class, family_value)
subtree:add(f_type, type_value)
subtree:add(f_source, cosm_lib.ip_to_gp(ip_src))
subtree:add(f_destination, cosm_lib.ip_to_gp(ip_dst))
if ip_source_port == 41001 then
-- TODO :Factor this out into its own dissector for ease of filtering
if (type_value == "GetEventsData") and (string.lower(subtype_value) == "response") and (family_value == "ClusterSync") then
-- ???
cursor = cursor + 1
-- ??? Three int32 values
-- (0,0,1) followed by:
-- (3, CS) followed by:
-- int32 event count (or 1)
-- json cluster events in a special format
-- int32 length (from first byte after length)
-- "category:type:name:system:discard:v5=(k=v,...);" nul-terminated; v5 is "Settings"
-- if buffer left then int32 1
-- then (3, CS)
-- int32 1
-- binary cluster events
-- (0,0,0) followed by:
-- int32 1 followed by
-- (3, CS) followed by:
-- int32 event count
-- binary cluster events, encrypted
-- OR in32 0 followed by other zeros meaning, no events
local field_tbd3 = buf(cursor, 4):le_uint()
cursor = cursor + 4
local field_tbd4 = buf(cursor, 4):le_uint()
cursor = cursor + 4
local field_tbd5 = buf(cursor, 4):le_uint()
cursor = cursor + 4
if (field_tbd3 == 0) and (field_tbd4 == 0) then
if (field_tbd5 == 1) then
-- Handle reformatted JSON cluster events
-- Get the CEPJ dissector inside this function to pick up registrations from other scripts
local cepj_dissector = DissectorTable.get("cosm_cep"):get_dissector(0)
-- CS
local flag_len = buf(cursor, 4):le_uint()
local flag_value = buf(cursor + 4, flag_len):stringz()
cursor = cursor + (4 + flag_len)
-- Count
local jce_count = buf(cursor + 0, 4):le_uint()
cursor = cursor + 4
subtree:add(f_jce_count, jce_count)
for i = 1, jce_count do
local jce_len = buf(cursor + 0, 4):le_uint()
if cepj_dissector ~= nil then
-- Dissector was found, invoke subdissector with a new Tvb,
-- created from the current buffer.
cepj_dissector:call(buf(cursor, jce_len + 4):tvb(), pkt, tree)
else
-- fallback dissector that just shows the raw data.
data_dis:call(buf(cursor + 4, jce_len):tvb(), pkt, tree)
end
cursor = cursor + (4 + jce_len)
end
-- If buffer remaining, it's binary cluster events.
-- Trick the code below into parsing them
if cursor < buf:len() then
field_tbd5 = 0
end
else
subtree:add(f_jce_count, 0)
end
if field_tbd5 == 0 then
-- Handle binary cluster events
local field_tbd6 = buf(cursor + 0, 4):le_uint()
cursor = cursor + 4
if field_tbd6 > 0 then
-- Get the CEB dissector inside this function to pick up registrations from other scripts
local ceb_dissector = DissectorTable.get("wtap_encap"):get_dissector(wtap.USER15)
-- CS
local flag_len = buf(cursor, 4):le_uint()
local flag_value = buf(cursor + 4, flag_len):stringz()
cursor = cursor + (4 + flag_len)
-- Count
local bce_count = buf(cursor + 0, 4):le_uint()
cursor = cursor + 4
subtree:add(f_bce_count, bce_count)
for i = 1, bce_count do
local bce_len = buf(cursor + 0, 4):le_uint()
if ceb_dissector ~= nil then
-- Dissector was found, invoke subdissector with a new Tvb,
-- created from the current buffer.
ceb_dissector:call(buf(cursor, bce_len + 4):tvb(), pkt, tree)
else
-- fallback dissector that just shows the raw data.
data_dis:call(buf(cursor + 4, bce_len):tvb(), pkt, tree)
end
cursor = cursor + (4 + bce_len)
end
else
subtree:add(f_bce_count, 0)
end
else
subtree:add(f_bce_count, 0)
end
end
else
-- Don't understand this subformat. Fallback dissector that just shows the raw data.
subtree:add(f_jce_count, 0)
subtree:add(f_bce_count, 0)
data_dis:call(buf(cursor):tvb(), pkt, tree)
end
else
-- Event type we don't care about. fallback dissector that just shows the raw data.
subtree:add(f_jce_count, 0)
subtree:add(f_bce_count, 0)
data_dis:call(buf(cursor):tvb(), pkt, tree)
end
end
local wtap_encap_table = DissectorTable.get("wtap_encap")
wtap_encap_table:add(wtap.USER14, p_cosm_cep)
wtap_encap_table:add(wtap.USER11, p_cosm_cep)
local tcp_encap_table = DissectorTable.get("tcp.port")
tcp_encap_table:add(41001 , p_cosm_cep)