ipfix: restrict flow-rate tracker to signature flows
Instead of sampling all flows for an aggregate, only sample those that
match the per-flow criteria (packets-per-flow and bytes-per-packet). This
should greatly reduce the cache size and avoid dropping flows that
exceed the packets-per-flow or bytes-per-packet criteria.
alexandergall committed Oct 28, 2020
1 parent 429fa21 commit 276a26d
Showing 1 changed file with 19 additions and 7 deletions.
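
For orientation, here is a minimal sketch of the per-flow check this commit introduces; the helper name is_signature_flow is made up for illustration, while the field and option names are the ones used in the diff below. Only flows whose packet count and bytes-per-packet stay within the configured limits are fed to the per-aggregate flow-rate tracker.

-- Minimal sketch, not the actual Snabb code: decide whether a flow
-- matches the "signature" (per-flow) criteria of the scan-protection
-- feature. Field and config names follow the diff below; the helper
-- name is_signature_flow is hypothetical.
local function is_signature_flow (flow_value, config)
   local ppf = flow_value.packetDeltaCount        -- packets in this flow
   local bpp = flow_value.octetDeltaCount / ppf   -- bytes per packet
   return ppf <= config.max_packets_per_flow
      and bpp <= config.max_bytes_per_packet
end

Flows that fail this check are ignored by the tracker: they no longer create or update entries in the scan-protection cache and can no longer be dropped by it.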
src/apps/ipfix/ipfix.lua
@@ -218,6 +218,7 @@ function FlowSet:new (spec, args)
          args.scan_protection.aggregate_v4,
          args.scan_protection.aggregate_v6
       )
+      -- Will be set by resize_callback
       sp.table_tb = token_bucket.new({ rate = 1 })
       sp.export_rate_tb = token_bucket.new(
          { rate = args.scan_protection.export_rate })
@@ -244,7 +245,7 @@ function FlowSet:new (spec, args)
             end
             require('jit').flush()
             sp.table_tb:set(
-               math.ceil(table.size / (2*args.scan_protection.interval))
+               math.ceil(table.size / args.scan_protection.interval)
             )
          end
      })
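
A worked example of the rate change in the hunk above, with hypothetical numbers: for a scan-protection table of 100000 entries and an interval of 60 seconds, table_tb is now refilled at ceil(100000/60) = 1667 tokens per second instead of the previous ceil(100000/120) = 834, which appears intended to let a full pass over the table complete within one interval rather than two.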
@@ -405,29 +406,40 @@ local function reset_drop_stats(entry, timestamp)
    entry.value.tstamp_drop_start = timestamp
 end
 
+-- To implement the scan-protection feature, we keep track of flows
+-- that satisfy the configured criteria for packets-per-flow (ppf) and
+-- bytes-per-packet (bpp) per prefix aggregate (defaulting to /24 and
+-- /64 for IPv4 and IPv6, respectively).
 function FlowSet:suppress_flow(flow_entry, timestamp)
    local config = self.scan_protection
    if not config.enable then
       return false
    end
+
+   -- Only consider flows that satisfy the ppf and bpp criteria
+   local ppf = flow_entry.value.packetDeltaCount
+   local bpp = flow_entry.value.octetDeltaCount/ppf
+   if (ppf > config.max_packets_per_flow or bpp > config.max_bytes_per_packet) then
+      return false
+   end
+
    local entry = self.sp.scratch_entry
    self.sp.aggr_key_fn(flow_entry.key, entry.key)
    local result = self.sp.table:lookup_ptr(entry.key)
    if result then
       local interval = tonumber(timestamp - result.value.tstamp)/1000
       if interval >= config.interval then
          local fps = result.value.flow_count/interval
-         local ppf = result.value.packets/result.value.flow_count
-         local bpp = result.value.octets/result.value.packets
          local drop_interval = (timestamp - result.value.tstamp_drop_start)/1000
-         if (fps >= config.threshold_rate and bpp <= config.max_bytes_per_packet and
-             ppf <= config.max_packets_per_flow) then
+         if (fps >= config.threshold_rate) then
+            local aggr_ppf = result.value.packets/result.value.flow_count
+            local aggr_bpp = result.value.octets/result.value.packets
             if result.value.suppress == 0 then
                self.template.logger:log(
                   string.format("Flow rate threshold exceeded from %s: "..
                                    "%d fps, %d bpp, %d ppf",
                                    self.sp.ntop_fn(entry.key),
-                                   tonumber(fps), tonumber(bpp), tonumber(ppf)))
+                                   tonumber(fps), tonumber(aggr_bpp), tonumber(aggr_ppf)))
                reset_drop_stats(result, timestamp)
                result.value.suppress = 1
             elseif drop_interval > config.report_interval then
@@ -436,7 +448,7 @@ function FlowSet:suppress_flow(flow_entry, timestamp)
                                    "%d fps, %d bpp, %d ppf, %d flows dropped, "..
                                    "%d exported in past %d seconds",
                                    self.sp.ntop_fn(entry.key),
-                                   tonumber(fps), tonumber(bpp), tonumber(ppf),
+                                   tonumber(fps), tonumber(aggr_bpp), tonumber(aggr_ppf),
                                    tonumber(result.value.drops),
                                    tonumber(result.value.exports),
                                    tonumber(drop_interval)))
