Skip to content

Commit

Permalink
[FlowAggregator] Add templateRefreshTimeout configuration (#6699)
Browse files Browse the repository at this point in the history
Can be used to configure the template retransmission interval when using
the UDP protocol to export records to an IPFIX collector.

We also ensure that the default value is 600s (the value specified in
RFC 6728) instead of the previous 1800s. We do the same thing for the Agent
Exporter, but this time by letting the go-ipfix library decide the value
(we set it to 0 in ExporterInput to request the default). This is
important because the templateRefreshTimeout config in the Agent's
exporter and the templateTTL in the Aggregator's collector must "match"
(templateTTL = 3 x templateRefreshTimeout), and so it is better to let
go-ipfix pick the right values for both.

Signed-off-by: Antonin Bas <antonin.bas@broadcom.com>
  • Loading branch information
antoninbas authored Oct 11, 2024
1 parent a419c8c commit 51c5107
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 24 deletions.
1 change: 1 addition & 0 deletions build/charts/flow-aggregator/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Kubernetes: `>= 1.19.0-0`
| flowCollector.enable | bool | `false` | Determine whether to enable exporting flow records to external flow collector. |
| flowCollector.observationDomainID | string | `""` | Provide the 32-bit Observation Domain ID which will uniquely identify this instance of the flow aggregator to an external flow collector. If omitted, an Observation Domain ID will be generated from the persistent cluster UUID generated by Antrea. |
| flowCollector.recordFormat | string | `"IPFIX"` | Provide format for records sent to the configured flow collector. Supported formats are IPFIX and JSON. |
| flowCollector.templateRefreshTimeout | string | `"600s"` | Template retransmission interval when using the udp protocol to export records. The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h". |
| flowLogger.compress | bool | `true` | Compress enables gzip compression on rotated files. |
| flowLogger.enable | bool | `false` | Determine whether to enable exporting flow records to a local log file. |
| flowLogger.filters | list | `[]` | Filters can be used to select which flow records to log to file. The provided filters are OR-ed to determine whether a specific flow should be logged. By default, all flows are logged. With the following filters, only flows which are denied because of a network policy will be logged: [{ingressNetworkPolicyRuleActions: ["Drop", "Reject"]}, {egressNetworkPolicyRuleActions: ["Drop", "Reject"]}] |
Expand Down
5 changes: 5 additions & 0 deletions build/charts/flow-aggregator/conf/flow-aggregator.conf
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ flowCollector:
# Supported formats are IPFIX and JSON.
recordFormat: {{ .Values.flowCollector.recordFormat | quote }}

# Template retransmission interval when using the udp protocol to export records.
# The value must be provided as a duration string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
templateRefreshTimeout: {{ .Values.flowCollector.templateRefreshTimeout | quote }}

# clickHouse contains ClickHouse related configuration options.
clickHouse:
# Enable is the switch to enable exporting flow records to ClickHouse.
Expand Down
3 changes: 3 additions & 0 deletions build/charts/flow-aggregator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ flowCollector:
# -- Provide format for records sent to the configured flow collector.
# Supported formats are IPFIX and JSON.
recordFormat: "IPFIX"
# -- Template retransmission interval when using the udp protocol to export records.
# The value must be provided as a duration string. Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
templateRefreshTimeout: "600s"
# clickHouse contains ClickHouse related configuration options.
clickHouse:
# -- Determine whether to enable exporting flow records to ClickHouse.
Expand Down
5 changes: 5 additions & 0 deletions build/yamls/flow-aggregator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ data:
# Supported formats are IPFIX and JSON.
recordFormat: "IPFIX"
# Template retransmission interval when using the udp protocol to export records.
# The value must be provided as a duration string.
# Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
templateRefreshTimeout: "600s"
# clickHouse contains ClickHouse related configuration options.
clickHouse:
# Enable is the switch to enable exporting flow records to ClickHouse.
Expand Down
12 changes: 4 additions & 8 deletions pkg/agent/flowexporter/exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,11 @@ func (exp *FlowExporter) initFlowExporter(ctx context.Context) error {
}
// TLS transport does not need any tempRefTimeout, so sending 0.
exp.exporterInput.TempRefTimeout = 0
} else if exp.exporterInput.CollectorProtocol == "tcp" {
// TCP transport does not need any tempRefTimeout, so sending 0.
// tempRefTimeout is the template refresh timeout, which specifies how often
// the exporting process should send the template again.
exp.exporterInput.TempRefTimeout = 0
} else {
// For UDP transport, hardcoding tempRefTimeout value as 1800s.
exp.exporterInput.TempRefTimeout = 1800
}
// TempRefTimeout specifies how often the exporting process should send the template
// again. It is only relevant when using the UDP protocol. We use 0 to tell the go-ipfix
// library to use the default value, which should be 600s as per the IPFIX standards.
exp.exporterInput.TempRefTimeout = 0
expProcess, err := exporter.InitExportingProcess(exp.exporterInput)
if err != nil {
return fmt.Errorf("error when starting exporter: %v", err)
Expand Down
12 changes: 6 additions & 6 deletions pkg/agent/flowexporter/exporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,12 +374,11 @@ func TestFlowExporter_initFlowExporter(t *testing.T) {
defer conn2.Close()

for _, tc := range []struct {
protocol string
address string
expectedTempRefTimeout uint32
protocol string
address string
}{
{conn1.LocalAddr().Network(), conn1.LocalAddr().String(), uint32(1800)},
{conn2.Addr().Network(), conn2.Addr().String(), uint32(0)},
{conn1.LocalAddr().Network(), conn1.LocalAddr().String()},
{conn2.Addr().Network(), conn2.Addr().String()},
} {
exp := &FlowExporter{
collectorAddr: tc.address,
Expand All @@ -391,7 +390,8 @@ func TestFlowExporter_initFlowExporter(t *testing.T) {
err = exp.initFlowExporter(context.Background())
require.NoError(t, err)
assert.Equal(t, tc.address, exp.exporterInput.CollectorAddress)
assert.Equal(t, tc.expectedTempRefTimeout, exp.exporterInput.TempRefTimeout)
// exporter should use the default value as per the go-ipfix library.
assert.Equal(t, uint32(0), exp.exporterInput.TempRefTimeout)
checkTotalReconnectionsMetric(t)
metrics.ReconnectionsToFlowCollector.Dec()
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/flowaggregator/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,10 @@ type FlowCollectorConfig struct {
// Provide format for records sent to the configured flow collector. Supported formats are IPFIX and JSON.
// Defaults to "IPFIX"
RecordFormat string `yaml:"recordFormat,omitempty"`
// Template retransmission interval when using the udp protocol to export records.
// The value must be provided as a duration string. Defaults to 600s.
// Valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
TemplateRefreshTimeout string `yaml:"templateRefreshTimeout,omitempty"`
}

type ClickHouseConfig struct {
Expand Down
4 changes: 4 additions & 0 deletions pkg/config/flowaggregator/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
DefaultInactiveFlowRecordTimeout = "90s"
DefaultAggregatorTransportProtocol = "TLS"
DefaultRecordFormat = "IPFIX"
DefaultTemplateRefreshTimeout = "600s"

DefaultClickHouseDatabase = "default"
DefaultClickHouseCommitInterval = "8s"
Expand Down Expand Up @@ -62,6 +63,9 @@ func SetConfigDefaults(flowAggregatorConf *FlowAggregatorConfig) {
if flowAggregatorConf.FlowCollector.RecordFormat == "" {
flowAggregatorConf.FlowCollector.RecordFormat = DefaultRecordFormat
}
if flowAggregatorConf.FlowCollector.TemplateRefreshTimeout == "" {
flowAggregatorConf.FlowCollector.TemplateRefreshTimeout = DefaultTemplateRefreshTimeout
}
if flowAggregatorConf.ClickHouse.Database == "" {
flowAggregatorConf.ClickHouse.Database = DefaultClickHouseDatabase
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/flowaggregator/exporter/ipfix.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"hash/fnv"
"reflect"
"time"

"github.com/google/uuid"
ipfixentities "github.com/vmware/go-ipfix/pkg/entities"
Expand Down Expand Up @@ -45,6 +46,7 @@ type IPFIXExporter struct {
exportingProcess ipfix.IPFIXExportingProcess
sendJSONRecord bool
observationDomainID uint32
templateRefreshTimeout time.Duration
templateIDv4 uint16
templateIDv6 uint16
registry ipfix.IPFIXRegistry
Expand Down Expand Up @@ -87,6 +89,7 @@ func NewIPFIXExporter(
externalFlowCollectorProto: opt.ExternalFlowCollectorProto,
sendJSONRecord: sendJSONRecord,
observationDomainID: observationDomainID,
templateRefreshTimeout: opt.TemplateRefreshTimeout,
registry: registry,
set: ipfixentities.NewSet(false),
clusterUUID: clusterUUID,
Expand Down Expand Up @@ -138,7 +141,8 @@ func (e *IPFIXExporter) UpdateOptions(opt *options.Options) {
} else {
e.observationDomainID = genObservationDomainID(e.clusterUUID)
}
klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID)
e.templateRefreshTimeout = opt.TemplateRefreshTimeout
klog.InfoS("New IPFIXExporter configuration", "collectorAddress", e.externalFlowCollectorAddr, "collectorProtocol", e.externalFlowCollectorProto, "sendJSON", e.sendJSONRecord, "domainID", e.observationDomainID, "templateRefreshTimeout", e.templateRefreshTimeout)

if e.exportingProcess != nil {
e.exportingProcess.CloseConnToCollector()
Expand Down Expand Up @@ -180,22 +184,21 @@ func (e *IPFIXExporter) initExportingProcess() error {
// externalFlowCollectorAddr and externalFlowCollectorProto instead of net.Addr input.
var expInput exporter.ExporterInput
if e.externalFlowCollectorProto == "tcp" {
// TCP transport does not need any tempRefTimeout, so sending 0.
expInput = exporter.ExporterInput{
CollectorAddress: e.externalFlowCollectorAddr,
CollectorProtocol: e.externalFlowCollectorProto,
ObservationDomainID: e.observationDomainID,
TempRefTimeout: 0,
TLSClientConfig: nil,
SendJSONRecord: e.sendJSONRecord,
// TCP transport does not need any tempRefTimeout, so sending 0.
TempRefTimeout: 0,
TLSClientConfig: nil,
SendJSONRecord: e.sendJSONRecord,
}
} else {
// For UDP transport, hardcoding tempRefTimeout value as 1800s. So we will send out template every 30 minutes.
expInput = exporter.ExporterInput{
CollectorAddress: e.externalFlowCollectorAddr,
CollectorProtocol: e.externalFlowCollectorProto,
ObservationDomainID: e.observationDomainID,
TempRefTimeout: 1800,
TempRefTimeout: uint32(e.templateRefreshTimeout.Seconds()),
TLSClientConfig: nil,
SendJSONRecord: e.sendJSONRecord,
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/flowaggregator/exporter/ipfix_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"net"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -144,18 +145,22 @@ func TestIPFIXExporter_UpdateOptions(t *testing.T) {

const newAddr = "newAddr"
const newProto = "newProto"
const newTemplateRefreshTimeout = 1200 * time.Second
config.FlowCollector.Address = fmt.Sprintf("%s:%s", newAddr, newProto)
config.FlowCollector.RecordFormat = "JSON"
config.FlowCollector.TemplateRefreshTimeout = newTemplateRefreshTimeout.String()

ipfixExporter.UpdateOptions(&options.Options{
Config: config,
ExternalFlowCollectorAddr: newAddr,
ExternalFlowCollectorProto: newProto,
TemplateRefreshTimeout: newTemplateRefreshTimeout,
})

assert.Equal(t, newAddr, ipfixExporter.externalFlowCollectorAddr)
assert.Equal(t, newProto, ipfixExporter.externalFlowCollectorProto)
assert.True(t, ipfixExporter.sendJSONRecord)
assert.Equal(t, newTemplateRefreshTimeout, ipfixExporter.templateRefreshTimeout)

require.NoError(t, ipfixExporter.AddRecord(mockRecord, false))
assert.Equal(t, 2, setCount, "Invalid number of flow sets sent by exporter")
Expand Down
6 changes: 3 additions & 3 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ func (fa *flowAggregator) InitCollectingProcess() error {
Address: collectorAddress,
Protocol: tcpTransport,
MaxBufferSize: 65535,
TemplateTTL: 0,
TemplateTTL: 0, // use default value from go-ipfix library
IsEncrypted: true,
CACert: caCert,
ServerKey: serverKey,
Expand All @@ -245,15 +245,15 @@ func (fa *flowAggregator) InitCollectingProcess() error {
Address: collectorAddress,
Protocol: tcpTransport,
MaxBufferSize: 65535,
TemplateTTL: 0,
TemplateTTL: 0, // use default value from go-ipfix library
IsEncrypted: false,
}
} else {
cpInput = collector.CollectorInput{
Address: collectorAddress,
Protocol: udpTransport,
MaxBufferSize: 1024,
TemplateTTL: 0,
TemplateTTL: 0, // use default value from go-ipfix library
IsEncrypted: false,
}
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/flowaggregator/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type Options struct {
ExternalFlowCollectorAddr string
// IPFIX flow collector transport protocol
ExternalFlowCollectorProto string
// Template retransmission interval when using the UDP protocol to export records.
TemplateRefreshTimeout time.Duration
// clickHouseCommitInterval flow records batch commit interval to clickhouse in the flow aggregator
ClickHouseCommitInterval time.Duration
// Flow records batch upload interval from flow aggregator to S3 bucket
Expand Down Expand Up @@ -87,6 +89,14 @@ func LoadConfig(configBytes []byte) (*Options, error) {
if opt.Config.FlowCollector.RecordFormat != "IPFIX" && opt.Config.FlowCollector.RecordFormat != "JSON" {
return nil, fmt.Errorf("record format %s is not supported", opt.Config.FlowCollector.RecordFormat)
}

opt.TemplateRefreshTimeout, err = time.ParseDuration(opt.Config.FlowCollector.TemplateRefreshTimeout)
if err != nil {
return nil, fmt.Errorf("templateRefreshTimeout is not a valid duration: %w", err)
}
if opt.TemplateRefreshTimeout < 0 {
return nil, fmt.Errorf("templateRefreshTimeout cannot be a negative duration")
}
}
// Validate clickhouse specific parameters
if opt.Config.ClickHouse.Enable {
Expand Down

0 comments on commit 51c5107

Please sign in to comment.