Skip to content

Commit

Permalink
Refactor template definitions
Browse files Browse the repository at this point in the history
Templates are now defined as elements of a table to be able to refer
to them by name for configuration.

The names of the templates passed to the IPFIX constructor can now
have an optional parameter attached to it which sets a
template-specific cache size. The size is appended to the name of the
template separared by ":'.
  • Loading branch information
alexandergall committed Mar 15, 2018
1 parent 46263a7 commit 57b71a0
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 75 deletions.
31 changes: 27 additions & 4 deletions src/apps/ipfix/ipfix.lua
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,25 @@ end

FlowSet = {}

function FlowSet:new (template, args)
function FlowSet:new (spec, args)
local t = {}
for s in spec:split(':') do
table.insert(t, s)
end
assert(#t == 1 or #t == 2, "Invalid template specifier: "..spec)
local template_name, cache_size = unpack(t)
assert(template.templates[template_name],
"Undefined template : "..template_name)
if cache_size then
assert(cache_size:match("^%d+$"),
string.format("Invalid cache size for template %s: %s",
template_name, cache_size))
args.cache_size = tonumber(cache_size)
end

local template =
template.make_template_info(template.templates[template_name])
template.name = template_name
assert(args.active_timeout > args.scan_time,
string.format("Template #%d: active timeout (%d) "
.."must be larger than scan time (%d)",
Expand Down Expand Up @@ -199,19 +217,23 @@ function FlowSet:new (template, args)
return setmetatable(o, { __index = self })
end

function FlowSet:id()
return string.format("%s(#%d)", self.template.name, self.template.id)
end

function FlowSet:record_flows(timestamp)
local entry = self.scratch_entry
timestamp = to_milliseconds(timestamp)
for i=1,link.nreadable(self.incoming) do
local pkt = link.receive(self.incoming)
counter.add(self.shm.packets_in)
self.template.extract(pkt, timestamp, entry)
self.template:extract(pkt, timestamp, entry)
packet.free(pkt)
local lookup_result = self.table:lookup_ptr(entry.key)
if lookup_result == nil then
self.table:add(entry.key, entry.value)
else
self.template.accumulate(lookup_result, entry)
self.template:accumulate(lookup_result, entry)
end
end
end
Expand Down Expand Up @@ -356,7 +378,7 @@ local ipfix_config_params = {
exporter_eth_dst = { default = '00:00:00:00:00:00' },
collector_ip = { required = true },
collector_port = { required = true },
templates = { default = { template.v4, template.v6 } }
templates = { default = { "v4", "v6" } }
}

function IPFIX:new(config)
Expand Down Expand Up @@ -435,6 +457,7 @@ function IPFIX:new(config)
o.flow_sets = {}
for _, template in ipairs(config.templates) do
table.insert(o.flow_sets, FlowSet:new(template, flow_set_args))
print("Added template "..o.flow_sets[#o.flow_sets]:id())
end

o.stats_timer = lib.throttle(5)
Expand Down
143 changes: 72 additions & 71 deletions src/apps/ipfix/template.lua
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ local swap_fn_env = { htons = htons, htonl = htonl, htonq = htonq }

-- Create a table describing the information needed to create
-- flow templates and data records.
local function make_template_info(spec)
function make_template_info(spec)
-- Representations of IPFIX IEs.
local ctypes =
{ unsigned8 = 'uint8_t', unsigned16 = 'uint16_t',
Expand Down Expand Up @@ -207,7 +207,9 @@ local function make_template_info(spec)
match = pf.compile_filter(spec.filter),
logger = lib.logger_new({ module = "IPFIX template #"..spec.id }),
counters = spec.counters,
counters_names = counters_names
counters_names = counters_names,
extract = spec.extract,
accumulate = spec.accumulate
}
end

Expand Down Expand Up @@ -324,89 +326,88 @@ local function accumulate_generic(dst, new)
dst.value.octetDeltaCount + new.value.octetDeltaCount
end

v4 = make_template_info {
id = 256,
filter = "ip",
keys = { "sourceIPv4Address",
"destinationIPv4Address",
"protocolIdentifier",
"sourceTransportPort",
"destinationTransportPort" },
values = { "flowStartMilliseconds",
"flowEndMilliseconds",
"packetDeltaCount",
"octetDeltaCount",
"tcpControlBitsReduced" }
}

function v4.extract(pkt, timestamp, entry)
local function v4_extract (self, pkt, timestamp, entry)
local md = metadata_get(pkt)
extract_5_tuple(pkt, timestamp, entry, md, extract_v4_addr)
if md.proto == IP_PROTO_TCP and md.frag_offset == 0 then
extract_tcp_flags_reduced(md.l4, entry)
end
end

function v4.accumulate(dst, new)
accumulate_generic(dst, new)
if dst.key.protocolIdentifier == IP_PROTO_TCP then
accumulate_tcp_flags_reduced(dst, new)
end
end

function v4.tostring(entry)
local ipv4 = require("lib.protocol.ipv4")
local key = entry.key
local protos =
{ [IP_PROTO_TCP]='TCP', [IP_PROTO_UDP]='UDP', [IP_PROTO_SCTP]='SCTP' }
return string.format(
"%s (%d) -> %s (%d) [%s]",
ipv4:ntop(key.sourceIPv4Address), key.sourceTransportPort,
ipv4:ntop(key.destinationIPv4Address), key.destinationTransportPort,
protos[key.protocolIdentifier] or tostring(key.protocolIdentifier))
end

v6 = make_template_info {
id = 257,
filter = "ip6",
keys = { "sourceIPv6Address",
"destinationIPv6Address",
"protocolIdentifier",
"sourceTransportPort",
"destinationTransportPort" },
values = { "flowStartMilliseconds",
"flowEndMilliseconds",
"packetDeltaCount",
"octetDeltaCount",
"tcpControlBitsReduced" }
}

function v6.extract(pkt, timestamp, entry)
local function v6_extract (self, pkt, timestamp, entry)
local md = metadata_get(pkt)
extract_5_tuple(pkt, timestamp, entry, md, extract_v6_addr)
if md.proto == IP_PROTO_TCP and md.frag_offset == 0 then
extract_tcp_flags_reduced(md.l4, entry)
end
end

function v6.accumulate(dst, new)
accumulate_generic(dst, new)
if dst.key.protocolIdentifier == IP_PROTO_TCP then
accumulate_tcp_flags_reduced(dst, new)
end
end

function v6.tostring(entry)
local ipv6 = require("lib.protocol.ipv6")
local key = entry.key
local protos =
{ [IP_PROTO_TCP]='TCP', [IP_PROTO_UDP]='UDP', [IP_PROTO_SCTP]='SCTP' }
return string.format(
"%s (%d) -> %s (%d) [%s]",
ipv6:ntop(key.sourceIPv6Address), key.sourceTransportPort,
ipv6:ntop(key.destinationIPv6Address), key.destinationTransportPort,
protos[key.protocolIdentifier] or tostring(key.protocolIdentifier))
end
templates = {
v4 = {
id = 256,
filter = "ip",
keys = { "sourceIPv4Address",
"destinationIPv4Address",
"protocolIdentifier",
"sourceTransportPort",
"destinationTransportPort" },
values = { "flowStartMilliseconds",
"flowEndMilliseconds",
"packetDeltaCount",
"octetDeltaCount",
"tcpControlBitsReduced" },
extract = v4_extract,
accumulate = function (self, dst, new)
accumulate_generic(dst, new)
if dst.key.protocolIdentifier == IP_PROTO_TCP then
accumulate_tcp_flags_reduced(dst, new)
end
end,
tostring = function (entry)
local ipv4 = require("lib.protocol.ipv4")
local key = entry.key
local protos =
{ [IP_PROTO_TCP]='TCP', [IP_PROTO_UDP]='UDP', [IP_PROTO_SCTP]='SCTP' }
return string.format(
"%s (%d) -> %s (%d) [%s]",
ipv4:ntop(key.sourceIPv4Address), key.sourceTransportPort,
ipv4:ntop(key.destinationIPv4Address), key.destinationTransportPort,
protos[key.protocolIdentifier] or tostring(key.protocolIdentifier))
end
},
v6 = {
id = 512,
filter = "ip6",
keys = { "sourceIPv6Address",
"destinationIPv6Address",
"protocolIdentifier",
"sourceTransportPort",
"destinationTransportPort" },
values = { "flowStartMilliseconds",
"flowEndMilliseconds",
"packetDeltaCount",
"octetDeltaCount",
"tcpControlBitsReduced" },
extract = v6_extract,
accumulate = function (self, dst, new)
accumulate_generic(dst, new)
if dst.key.protocolIdentifier == IP_PROTO_TCP then
accumulate_tcp_flags_reduced(dst, new)
end
end,
tostring = function (entry)
local ipv6 = require("lib.protocol.ipv6")
local key = entry.key
local protos =
{ [IP_PROTO_TCP]='TCP', [IP_PROTO_UDP]='UDP', [IP_PROTO_SCTP]='SCTP' }
return string.format(
"%s (%d) -> %s (%d) [%s]",
ipv6:ntop(key.sourceIPv6Address), key.sourceTransportPort,
ipv6:ntop(key.destinationIPv6Address), key.destinationTransportPort,
protos[key.protocolIdentifier] or tostring(key.protocolIdentifier))
end
},
}

function selftest()
print('selftest: apps.ipfix.template')
Expand Down

0 comments on commit 57b71a0

Please sign in to comment.