Skip to content

Commit

Permalink
Add counters
Browse files Browse the repository at this point in the history
  • Loading branch information
alexandergall committed Mar 15, 2018
1 parent e2587ae commit f40c1b1
Showing 1 changed file with 53 additions and 7 deletions.
60 changes: 53 additions & 7 deletions src/apps/ipfix/ipfix.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ local template = require("apps.ipfix.template")
local lib = require("core.lib")
local link = require("core.link")
local packet = require("core.packet")
local shm = require("core.shm")
local counter = require("core.counter")
local datagram = require("lib.protocol.datagram")
local ether = require("lib.protocol.ethernet")
local ipv4 = require("lib.protocol.ipv4")
Expand Down Expand Up @@ -159,6 +161,14 @@ function FlowSet:new (template, args)
o.match = template.match
o.incoming_link_name, o.incoming = new_internal_link('IPFIX incoming')

o.shm = shm.create_frame("templates/"..template.id,
{ packets_in = { counter },
flow_export_packets = { counter },
exported_flows = { counter },
table_size = { counter, o.table.size },
table_byte_size = { counter, o.table.byte_size },
table_occupancy = { counter, o.table.occupancy },
table_max_displacement = { counter, o.table.max_displacement } })
return setmetatable(o, { __index = self })
end

Expand All @@ -167,6 +177,7 @@ function FlowSet:record_flows(timestamp)
timestamp = to_milliseconds(timestamp)
for i=1,link.nreadable(self.incoming) do
local pkt = link.receive(self.incoming)
counter.add(self.shm.packets_in)
self.template.extract(pkt, timestamp, entry)
packet.free(pkt)
local lookup_result = self.table:lookup_ptr(entry.key)
Expand Down Expand Up @@ -200,6 +211,7 @@ function FlowSet:add_data_record(record, out)
ffi.copy(ptr, record, record_len)
self.template.swap_fn(ffi.cast(self.template.record_ptr_t, ptr))
pkt.length = pkt.length + record_len
counter.add(self.shm.exported_flows)

self.record_count = self.record_count + 1
if self.record_count == self.max_record_count then
Expand Down Expand Up @@ -228,6 +240,7 @@ function FlowSet:flush_data_records(out)
pkt = self.parent:add_ipfix_header(pkt, record_count)
pkt = self.parent:add_transport_headers(pkt)
link.transmit(out, pkt)
counter.add(self.shm.flow_export_packets)
end

-- Print debugging messages for a flow.
Expand Down Expand Up @@ -279,6 +292,13 @@ function FlowSet:expire_records(out, now)
if self.flush_timer() then self:flush_data_records(out) end
end

function FlowSet:sync_stats()
counter.set(self.shm.table_size, self.table.size)
counter.set(self.shm.table_byte_size, self.table.byte_size)
counter.set(self.shm.table_occupancy, self.table.occupancy)
counter.set(self.shm.table_max_displacement, self.table.max_displacement)
end

IPFIX = {}
local ipfix_config_params = {
idle_timeout = { default = 300 },
Expand All @@ -304,12 +324,24 @@ local ipfix_config_params = {

function IPFIX:new(config)
config = lib.parse(config, ipfix_config_params)
local o = { sequence_number = 1,
boot_time = engine.now(),
local o = { boot_time = engine.now(),
template_refresh_interval = config.template_refresh_interval,
next_template_refresh = -1,
version = config.ipfix_version,
observation_domain = config.observation_domain }
o.shm = {
-- Total number of packets received
received_packets = { counter },
-- Packets not matched by any flow set
ignored_packets = { counter },
-- Number of template packets sent
template_packets = { counter },
-- Non-wrapping sequence number (see add_ipfix_header() for a
-- brief description of the semantics for IPFIX and Netflowv9)
sequence_number = { counter, 1 },
version = { counter, o.version },
observation_domain = { counter, o.observation_domain },
}

if o.version == 9 then
o.header_t = netflow_v9_packet_header_t
Expand Down Expand Up @@ -367,6 +399,7 @@ function IPFIX:new(config)
table.insert(o.flow_sets, FlowSet:new(template, flow_set_args))
end

o.stats_timer = lib.throttle(5)
return setmetatable(o, { __index = self })
end

Expand All @@ -385,6 +418,7 @@ function IPFIX:send_template_records(out)
end
pkt = self:add_ipfix_header(pkt, record_count)
pkt = self:add_transport_headers(pkt)
counter.add(self.shm.template_packets)
link.transmit(out, pkt)
end

Expand All @@ -393,18 +427,18 @@ function IPFIX:add_ipfix_header(pkt, count)
local header = ffi.cast(self.header_ptr_t, pkt.data)

header.version = htons(self.version)
header.sequence_number = htonl(self.sequence_number)
header.sequence_number = htonl(tonumber(counter.read(self.shm.sequence_number)))
if self.version == 9 then
-- record_count counts the number of all records in this packet
-- (template and data)
header.record_count = htons(count)
-- sequence_number counts the number of exported packets
self.sequence_number = self.sequence_number + 1
conter.add(self.shm.sequence_number)
header.uptime = htonl(to_milliseconds(engine.now() - self.boot_time))
elseif self.version == 10 then
-- sequence_number counts the cumulative number of data records
-- (i.e. excluding template and option records)
self.sequence_number = self.sequence_number + count
counter.add(self.shm.sequence_number, count)
header.byte_length = htons(pkt.length)
end
header.timestamp = htonl(math.floor(C.get_unix_time()))
Expand Down Expand Up @@ -437,7 +471,9 @@ function IPFIX:push()
end

local flow_sets = self.flow_sets
for i=1,link.nreadable(input) do
local nreadable = link.nreadable(input)
counter.add(self.shm.received_packets, nreadable)
for i=1, nreadable do
local pkt = link.receive(input)
local handled = false
for _,set in ipairs(flow_sets) do
Expand All @@ -448,11 +484,21 @@ function IPFIX:push()
end
end
-- Drop packet if it didn't match any flow set.
if not handled then packet.free(pkt) end
if not handled then
counter.add(self.shm.ignored_packets)
packet.free(pkt)
end
end

for _,set in ipairs(flow_sets) do set:record_flows(timestamp) end
for _,set in ipairs(flow_sets) do set:expire_records(output, timestamp) end

if self.stats_timer() then
for _,set in ipairs(flow_sets) do
set:sync_stats()
end
end

end

function selftest()
Expand Down

0 comments on commit f40c1b1

Please sign in to comment.