From 0c3d245fb2ad76e97130bd134516581621ddd833 Mon Sep 17 00:00:00 2001 From: Karthikeyan Valliyurnatt Date: Thu, 7 Mar 2024 14:43:51 -0500 Subject: [PATCH 1/5] add an option to report delta flows --- packetbeat/config/config.go | 2 ++ packetbeat/flows/flows.go | 2 +- packetbeat/flows/worker.go | 38 ++++++++++++++++++++------------- packetbeat/flows/worker_test.go | 2 +- 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/packetbeat/config/config.go b/packetbeat/config/config.go index 7d579af635b..427df6cd117 100644 --- a/packetbeat/config/config.go +++ b/packetbeat/config/config.go @@ -144,6 +144,8 @@ type Flows struct { KeepNull bool `config:"keep_null"` // Index is used to overwrite the index where flows are published Index string `config:"index"` + // DeltaFlowReports when enabled will report flow network stats(bytes, packets) as delta values + EnableDeltaFlowReports bool `config:"enable_delta_flow_reports"` } type ProtocolCommon struct { diff --git a/packetbeat/flows/flows.go b/packetbeat/flows/flows.go index b7b52217529..9df019af2d0 100644 --- a/packetbeat/flows/flows.go +++ b/packetbeat/flows/flows.go @@ -71,7 +71,7 @@ func NewFlows(pub Reporter, watcher *procs.ProcessesWatcher, config *config.Flow counter := &counterReg{} - worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period) + worker, err := newFlowsWorker(pub, watcher, table, counter, timeout, period, config.EnableDeltaFlowReports) if err != nil { logp.Err("failed to configure flows processing intervals: %v", err) return nil, err diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index e3a2008a059..cf919b94876 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -127,7 +127,7 @@ func (w *worker) periodically(tick time.Duration, fn func() error) { // reporting will be done at flow lifetime end. // Flows are published via the pub Reporter after being enriched with process information // by watcher. -func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration) (*worker, error) { +func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMetaTable, counters *counterReg, timeout, period time.Duration, enableDeltaFlowReports bool) (*worker, error) { if timeout < time.Second { return nil, ErrInvalidTimeout } @@ -161,10 +161,11 @@ func newFlowsWorker(pub Reporter, watcher *procs.ProcessesWatcher, table *flowMe defaultBatchSize := 1024 processor := &flowsProcessor{ - table: table, - watcher: watcher, - counters: counters, - timeout: timeout, + table: table, + watcher: watcher, + counters: counters, + timeout: timeout, + enableDeltaFlowReporting: enableDeltaFlowReports, } processor.spool.init(pub, defaultBatchSize) @@ -221,11 +222,12 @@ func makeWorker(processor *flowsProcessor, tick time.Duration, timeout, period i } type flowsProcessor struct { - spool spool - watcher *procs.ProcessesWatcher - table *flowMetaTable - counters *counterReg - timeout time.Duration + spool spool + watcher *procs.ProcessesWatcher + table *flowMetaTable + counters *counterReg + timeout time.Duration + enableDeltaFlowReporting bool } func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastReport bool) { @@ -281,13 +283,13 @@ func (fw *flowsProcessor) execute(w *worker, checkTimeout, handleReports, lastRe } func (fw *flowsProcessor) report(w *worker, ts time.Time, flow *biFlow, isOver bool, intNames, uintNames, floatNames []string) { - event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames) + event := createEvent(fw.watcher, ts, flow, isOver, intNames, uintNames, floatNames, fw.enableDeltaFlowReporting) debugf("add event: %v", event) fw.spool.publish(event) } -func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string) beat.Event { +func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOver bool, intNames, uintNames, floatNames []string, enableDeltaFlowReporting bool) beat.Event { timestamp := ts event := mapstr.M{ @@ -418,7 +420,7 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve var totalBytes, totalPackets uint64 if f.stats[0] != nil { // Source stats. - stats := encodeStats(f.stats[0], intNames, uintNames, floatNames) + stats := encodeStats(f.stats[0], intNames, uintNames, floatNames, enableDeltaFlowReporting) for k, v := range stats { switch k { case "icmpV4TypeCode": @@ -449,7 +451,7 @@ func createEvent(watcher *procs.ProcessesWatcher, ts time.Time, f *biFlow, isOve } if f.stats[1] != nil { // Destination stats. - stats := encodeStats(f.stats[1], intNames, uintNames, floatNames) + stats := encodeStats(f.stats[1], intNames, uintNames, floatNames, enableDeltaFlowReporting) for k, v := range stats { switch k { case "icmpV4TypeCode", "icmpV6TypeCode": @@ -533,7 +535,7 @@ func formatHardwareAddr(addr net.HardwareAddr) string { return string(buf) } -func encodeStats(stats *flowStats, ints, uints, floats []string) map[string]interface{} { +func encodeStats(stats *flowStats, ints, uints, floats []string, enableDeltaFlowReporting bool) map[string]interface{} { report := make(map[string]interface{}) i := 0 @@ -551,6 +553,12 @@ func encodeStats(stats *flowStats, ints, uints, floats []string) map[string]inte for m := mask; m != 0; m >>= 1 { if (m & 1) == 1 { report[uints[i]] = stats.uints[i] + if enableDeltaFlowReporting && (uints[i] == "bytes" || uints[i] == "packets") { + // If Delta Flow Reporting is enabled, reset bytes and packets at each period. + // Only the bytes and packets recieved during the flow period will be reported. + // This should be thread safe as it is called under the flowmetadatatable lock. + stats.uints[i] = 0 + } } i++ } diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index ef0104adc92..d9e340ee56b 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -65,7 +65,7 @@ func TestCreateEvent(t *testing.T) { } bif.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{10, 1}} bif.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{460, 2}} - event := createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil) + event := createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil, false) // Validate the contents of the event. validate := lookslike.MustCompile(map[string]interface{}{ From bf209e7d5665ffca829dad780155684c857429ec Mon Sep 17 00:00:00 2001 From: Karthikeyan Valliyurnatt Date: Thu, 7 Mar 2024 16:56:34 -0500 Subject: [PATCH 2/5] adding unit test and doc update --- packetbeat/docs/packetbeat-options.asciidoc | 6 ++++++ packetbeat/flows/worker.go | 2 +- packetbeat/flows/worker_test.go | 24 +++++++++++++++++++++ 3 files changed, 31 insertions(+), 1 deletion(-) diff --git a/packetbeat/docs/packetbeat-options.asciidoc b/packetbeat/docs/packetbeat-options.asciidoc index c48b4a1b01d..f226f723a9f 100644 --- a/packetbeat/docs/packetbeat-options.asciidoc +++ b/packetbeat/docs/packetbeat-options.asciidoc @@ -461,6 +461,12 @@ in time. Periodical reporting can be disabled by setting the value to -1. If disabled, flows are still reported once being timed out. The default value is 10s. +[float] +==== `enable_delta_flow_reports` + +Configure network.bytes and network.packets to be a delta +value instead of a cummlative sum for each flow period. The default value is false. + [float] [[packetbeat-configuration-flows-fields]] ==== `fields` diff --git a/packetbeat/flows/worker.go b/packetbeat/flows/worker.go index cf919b94876..46f7c0ca418 100644 --- a/packetbeat/flows/worker.go +++ b/packetbeat/flows/worker.go @@ -555,7 +555,7 @@ func encodeStats(stats *flowStats, ints, uints, floats []string, enableDeltaFlow report[uints[i]] = stats.uints[i] if enableDeltaFlowReporting && (uints[i] == "bytes" || uints[i] == "packets") { // If Delta Flow Reporting is enabled, reset bytes and packets at each period. - // Only the bytes and packets recieved during the flow period will be reported. + // Only the bytes and packets received during the flow period will be reported. // This should be thread safe as it is called under the flowmetadatatable lock. stats.uints[i] = 0 } diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index d9e340ee56b..8781e2b13f1 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -25,6 +25,7 @@ import ( "time" "github.com/elastic/go-lookslike/isdef" + "gotest.tools/assert" "github.com/elastic/go-lookslike" @@ -126,4 +127,27 @@ func TestCreateEvent(t *testing.T) { t.Fatal(err) } } + + // when enableDeltaFlowReporting is true, the flow stats should be reset + expectbiFlow := &biFlow{ + id: id.rawFlowID, + killed: 1, + createTS: start, + ts: end, + dir: flowDirForward, + } + expectbiFlow.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{0, 0}} + expectbiFlow.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{0, 0}} + event = createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil, true) + result = validate(event.Fields) + if errs := result.Errors(); len(errs) > 0 { + for _, err := range errs { + t.Error(err) + } + t.FailNow() + } + assert.DeepEqual(t, expectbiFlow.stats[0].uintFlags, bif.stats[0].uintFlags) + assert.DeepEqual(t, expectbiFlow.stats[0].uints, bif.stats[0].uints) + assert.DeepEqual(t, expectbiFlow.stats[1].uintFlags, bif.stats[1].uintFlags) + assert.DeepEqual(t, expectbiFlow.stats[1].uints, bif.stats[1].uints) } From 21e1beea96af39d53fa5d251b428f43822577045 Mon Sep 17 00:00:00 2001 From: Karthikeyan Valliyurnatt Date: Thu, 7 Mar 2024 17:10:21 -0500 Subject: [PATCH 3/5] fix linting issues --- packetbeat/flows/worker_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index 8781e2b13f1..f2ee7ea1d16 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -24,11 +24,10 @@ import ( "testing" "time" + "github.com/elastic/go-lookslike" "github.com/elastic/go-lookslike/isdef" "gotest.tools/assert" - "github.com/elastic/go-lookslike" - "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/elastic-agent-libs/logp" @@ -117,7 +116,7 @@ func TestCreateEvent(t *testing.T) { // Write the event to disk if -data is used. if *dataFlag { - event.Fields.Put("@timestamp", common.Time(end)) //nolint:errcheck // Never fails. + event.Fields.Put("@timestamp", common.Time(end)) output, err := json.MarshalIndent(&event.Fields, "", " ") if err != nil { t.Fatal(err) From b99bf52b476aaa7d069c63575c49ae99ac6f88d7 Mon Sep 17 00:00:00 2001 From: Karthikeyan Valliyurnatt Date: Thu, 7 Mar 2024 17:19:30 -0500 Subject: [PATCH 4/5] fix imports --- packetbeat/flows/worker_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index f2ee7ea1d16..7e835de2b44 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -24,13 +24,13 @@ import ( "testing" "time" - "github.com/elastic/go-lookslike" - "github.com/elastic/go-lookslike/isdef" "gotest.tools/assert" "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/packetbeat/procs" "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/go-lookslike" + "github.com/elastic/go-lookslike/isdef" ) // Use `go test -data` to update sample event files. From d4c8bd7f447831aaf126480d5b9e990d4a486083 Mon Sep 17 00:00:00 2001 From: Karthikeyan Valliyurnatt Date: Wed, 20 Mar 2024 09:16:37 -0400 Subject: [PATCH 5/5] addressing review comments --- packetbeat/docs/packetbeat-options.asciidoc | 2 +- packetbeat/flows/worker_test.go | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/packetbeat/docs/packetbeat-options.asciidoc b/packetbeat/docs/packetbeat-options.asciidoc index f226f723a9f..aaa598b612c 100644 --- a/packetbeat/docs/packetbeat-options.asciidoc +++ b/packetbeat/docs/packetbeat-options.asciidoc @@ -465,7 +465,7 @@ disabled, flows are still reported once being timed out. The default value is ==== `enable_delta_flow_reports` Configure network.bytes and network.packets to be a delta -value instead of a cummlative sum for each flow period. The default value is false. +value instead of a cumlative sum for each flow period. The default value is false. [float] [[packetbeat-configuration-flows-fields]] diff --git a/packetbeat/flows/worker_test.go b/packetbeat/flows/worker_test.go index 7e835de2b44..d6e371cad87 100644 --- a/packetbeat/flows/worker_test.go +++ b/packetbeat/flows/worker_test.go @@ -21,6 +21,7 @@ import ( "encoding/json" "flag" "os" + "reflect" "testing" "time" @@ -137,6 +138,11 @@ func TestCreateEvent(t *testing.T) { } expectbiFlow.stats[0] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{0, 0}} expectbiFlow.stats[1] = &flowStats{uintFlags: []uint8{1, 1}, uints: []uint64{0, 0}} + + // Assert the biflow is not 0 before the test + assert.Assert(t, !reflect.DeepEqual(expectbiFlow.stats[0].uints, bif.stats[0].uints)) + assert.Assert(t, !reflect.DeepEqual(expectbiFlow.stats[1].uints, bif.stats[1].uints)) + event = createEvent(&procs.ProcessesWatcher{}, time.Now(), bif, true, nil, []string{"bytes", "packets"}, nil, true) result = validate(event.Fields) if errs := result.Errors(); len(errs) > 0 { @@ -145,6 +151,8 @@ func TestCreateEvent(t *testing.T) { } t.FailNow() } + + // Assert the biflow is 0 after the test assert.DeepEqual(t, expectbiFlow.stats[0].uintFlags, bif.stats[0].uintFlags) assert.DeepEqual(t, expectbiFlow.stats[0].uints, bif.stats[0].uints) assert.DeepEqual(t, expectbiFlow.stats[1].uintFlags, bif.stats[1].uintFlags)