Skip to content

Commit

Permalink
program.ipfix.probe_rss: use pools to assign collector address and port
Browse files Browse the repository at this point in the history
To allow for diverse destinations and ports per exporter, introduce
pools that contain  lists of address/port pairs and assign each
exporter to a pool. Exporters cycle through the pool if it doesn't
hold enough unique destinations.
  • Loading branch information
Alexander Gall committed May 4, 2020
1 parent 5f05dab commit 15ef82b
Showing 1 changed file with 34 additions and 3 deletions.
37 changes: 34 additions & 3 deletions src/program/ipfix/probe_rss/probe_rss.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ local interface_config = {
}
local ipfix_config = {
default = { default = {} },
collector_pools = { default = {} },
maps = { default = {} },
observation_domain_base = { default = 256 },
exporters = { required = true }
}
local collector_pool_config = {
ip = { required = true },
port = { required = true }
}
local maps_config = {
pfx_to_as = { default = nil },
vlan_to_ifindex = { default = nil },
Expand All @@ -37,8 +42,7 @@ local maps_config = {
local exporter_config = {
templates = { required = true },
rss_class = { default = "default" },
collector_ip = { default = nil },
collector_port = { default = nil },
collector_pool = { default = nil },
use_maps = { default = false },
maps_log_dir = { default = nil },
instances = { default = {} }
Expand All @@ -58,6 +62,8 @@ local jit_config = {
}
local ipfix_default_config = lib.deepcopy(probe.probe_config)
for _, key in ipairs({
"collector_ip",
"collector_port",
"observation_domain",
"exporter_mac",
"templates",
Expand Down Expand Up @@ -95,10 +101,18 @@ local function create_workers (probe_config, duration, busywait, jit, logger)
local maps = lib.parse(main.ipfix.maps, maps_config)
local ipfix = lib.parse(main.ipfix, ipfix_config)
local ipfix_default = lib.parse(ipfix.default, ipfix_default_config)
local collector_pools = {}
for name, list in pairs(ipfix.collector_pools) do
pool = {}
for _, member in ipairs(list) do
table.insert(pool, lib.parse(member, collector_pool_config))
end
collector_pools[name] = pool
end

local function merge_with_default (config)
local merged = lib.deepcopy(ipfix_default)
for _, key in ipairs({ "collector_ip", "collector_port", "templates" }) do
for _, key in ipairs({ "collector_pool", "templates" }) do
if config[key] then
merged[key] = config[key]
end
Expand All @@ -121,6 +135,7 @@ local function create_workers (probe_config, duration, busywait, jit, logger)
local mellanox = {}
local rss_workers = {}
local observation_domain = ipfix.observation_domain_base

for rssq = 0, main.hw_rss_scaling - 1 do
local inputs, outputs = {}, {}
for i, interface in ipairs(main.interfaces) do
Expand Down Expand Up @@ -149,6 +164,7 @@ local function create_workers (probe_config, duration, busywait, jit, logger)
for _, exporter in ipairs(main.ipfix.exporters) do
local exporter = lib.parse(exporter, exporter_config)
local config = merge_with_default(exporter)

config.output_type = "tap_routed"
config.instance = nil
config.add_packet_metadata = false
Expand All @@ -163,6 +179,18 @@ local function create_workers (probe_config, duration, busywait, jit, logger)
local instance = lib.parse(instance, instance_config)
local rss_link = rss_link_name(exporter.rss_class, instance.weight)
local od = observation_domain

-- Select the collector ip and port from the front of the
-- pool and rotate the pool's elements by one
local pool = config.collector_pool
assert(collector_pools[pool] and #collector_pools[pool] > 0,
"Undefined or empty collector pool: "..pool)
collector = table.remove(collector_pools[pool], 1)
table.insert(collector_pools[pool], collector)
iconfig.collector_ip = collector.ip
iconfig.collector_port = collector.port
iconfig.collector_pool = nil

observation_domain = observation_domain + 1
iconfig.observation_domain = od
iconfig.output = "ipfixexport"..od
Expand Down Expand Up @@ -192,6 +220,9 @@ local function create_workers (probe_config, duration, busywait, jit, logger)
tostring(instance.busywait), probe.value_to_string(jit)
)
local child_pid = worker.start(rss_link, worker_expr)
logger:log(string.format("Selected collector %s:%d from pool %s "
.."for process #%d ",
collector.ip, collector.port, pool, child_pid))
logger:log("Launched IPFIX worker process #"..child_pid)
shm.create("ipfix_workers/"..child_pid, "uint64_t")
end
Expand Down

0 comments on commit 15ef82b

Please sign in to comment.