Skip to content

Commit

Permalink
eval: add performance evaluation code
Browse files Browse the repository at this point in the history
  • Loading branch information
pkoutsovasilis authored and Panos Koutsovasilis committed Jul 12, 2024
1 parent df1ef35 commit 3c88380
Show file tree
Hide file tree
Showing 5 changed files with 351 additions and 33 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ require (
github.com/elastic/mito v1.13.0
github.com/elastic/tk-btf v0.1.0
github.com/elastic/toutoumomoma v0.0.0-20221026030040-594ef30cb640
github.com/felixge/fgprof v0.9.4
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15
github.com/go-ldap/ldap/v3 v3.4.6
github.com/golang-jwt/jwt/v5 v5.2.1
Expand Down Expand Up @@ -304,6 +305,7 @@ require (
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/licenseclassifier v0.0.0-20221004142553-c1ed8fcf4bab // indirect
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 // indirect
github.com/google/s2a-go v0.1.4 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.2.4 // indirect
Expand Down
21 changes: 19 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -418,9 +418,15 @@ github.com/cespare/xxhash/v2 v2.1.0/go.mod h1:dgIUBU3pDso/gPgZ1osOZ0iQf77oPR28Tj
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/chromedp/cdproto v0.0.0-20230802225258-3cf4e6d46a89/go.mod h1:GKljq0VrfU4D5yc+2qA6OVr8pmO/MBbPEWqWQ/oqGEs=
github.com/chromedp/chromedp v0.9.2/go.mod h1:LkSXJKONWTCHAfQasKFUZI+mxqS4tZqhmtGzzhLsnLs=
github.com/chromedp/sysutil v1.0.0/go.mod h1:kgWmDdq8fTzXYcKIBqIYvRRTnYb9aNS9moAV0xufSww=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/logex v1.2.1/go.mod h1:JLbx6lG2kDbNRFnfkgvh4eRJRPX1QCoOIWomwysCBrQ=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/readline v1.5.1/go.mod h1:Eh+b79XXUwfKfcPLepksvw2tcLE/Ct21YObkaSkeBlk=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/chzyer/test v1.0.0/go.mod h1:2JlltgoNkt4TW/z9V/IzDdFaMTM2JPIi26O1pF38GC8=
github.com/cilium/ebpf v0.13.2 h1:uhLimLX+jF9BTPPvoCUYh/mBeoONkjgaJ9w9fn0mRj4=
github.com/cilium/ebpf v0.13.2/go.mod h1:DHp1WyrLeiBh19Cf/tfiSMhqheEiK8fXFZ4No0P1Hso=
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=
Expand Down Expand Up @@ -632,6 +638,8 @@ github.com/fearful-symmetry/gomsr v0.0.1 h1:m208RzdTApWVbv8a9kf78rdPLQe+BY9AxRb/
github.com/fearful-symmetry/gomsr v0.0.1/go.mod h1:Qb/0Y7zwobP7v8Sji+M5mlL4N7Voyz5WaKXXRFPnLio=
github.com/fearful-symmetry/gorapl v0.0.4 h1:TMn4fhhtIAd+C3NrAl638oaYlX1vgcKNVVdad53oyiE=
github.com/fearful-symmetry/gorapl v0.0.4/go.mod h1:XoeZ+5v0tJX9WMvzqdPaaKAdX7y17mDN3pxDGemINR0=
github.com/felixge/fgprof v0.9.4 h1:ocDNwMFlnA0NU0zSB3I52xkO4sFXk80VK9lXjLClu88=
github.com/felixge/fgprof v0.9.4/go.mod h1:yKl+ERSa++RYOs32d8K6WEXCB4uXdLls4ZaZPpayhMM=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
Expand Down Expand Up @@ -835,6 +843,9 @@ github.com/gobuffalo/packd v0.1.0/go.mod h1:M2Juc+hhDXf/PnmBANFCqx4DM3wRbgDvnVWe
github.com/gobuffalo/packr/v2 v2.0.9/go.mod h1:emmyGweYTm6Kdper+iywB6YK5YzuKchGtJQZ0Odn4pQ=
github.com/gobuffalo/packr/v2 v2.2.0/go.mod h1:CaAwI0GPIAv+5wKLtv8Afwl+Cm78K/I/VCm/3ptBN+0=
github.com/gobuffalo/syncx v0.0.0-20190224160051-33c29581e754/go.mod h1:HhnNqWY95UYwwW3uSASeV7vtgYkT2t16hJgV3AEPUpw=
github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM=
github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw=
github.com/gobwas/ws v1.2.1/go.mod h1:hRKAFb8wOxFROYNsT1bqfWnhX+b5MFeJM9r2ZSwg/KY=
github.com/gocarina/gocsv v0.0.0-20170324095351-ffef3ffc77be h1:zXHeEEJ231bTf/IXqvCfeaqjLpXsq42ybLoT4ROSR6Y=
github.com/gocarina/gocsv v0.0.0-20170324095351-ffef3ffc77be/go.mod h1:/oj50ZdPq/cUjA02lMZhijk5kR31SEydKyqah1OgBuo=
github.com/goccy/go-json v0.9.11/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
Expand Down Expand Up @@ -977,8 +988,8 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20230426061923-93006964c1fc h1:AGDHt781oIcL4EFk7cPnvBUYTwU8BEU6GDTO3ZMn1sE=
github.com/google/pprof v0.0.0-20230426061923-93006964c1fc/go.mod h1:79YE0hCXdHag9sBkw2o+N/YnZtTkXi0UT9Nnixa5eYk=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7 h1:y3N7Bm7Y9/CtpiVkw/ZWj6lSlDF3F74SfKwfTCer72Q=
github.com/google/pprof v0.0.0-20240227163752-401108e1b7e7/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/s2a-go v0.1.4 h1:1kZ/sQM3srePvKs3tXAvQzo66XfcReoqFpIpIccE7Oc=
github.com/google/s2a-go v0.1.4/go.mod h1:Ej+mSEMGRnqRzjc7VtF+jdBwYG5fuJfiZ8ELkjEwM0A=
Expand Down Expand Up @@ -1074,6 +1085,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
Expand All @@ -1097,6 +1110,7 @@ github.com/huandu/xstrings v1.0.0/go.mod h1:4qWG/gcEcfX4z/mBDHJ++3ReCw9ibxbsNJbc
github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmKTg=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20230524184225-eabc099b10ab/go.mod h1:gx7rwoVhcfuVKG5uya9Hs3Sxj7EIvldVofAWIUtGouw=
github.com/icholy/digest v0.1.22 h1:dRIwCjtAcXch57ei+F0HSb5hmprL873+q7PoVojdMzM=
github.com/icholy/digest v0.1.22/go.mod h1:uLAeDdWKIWNFMH0wqbwchbTQOmJWhzSnL7zmqSPqEEc=
github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
Expand Down Expand Up @@ -1225,6 +1239,7 @@ github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v0.0.0-20160406211939-eadb3ce320cb/go.mod h1:B69LEHPfb2qLo0BaaOLcbitczOKLWTsrBG9LczfCD4k=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/ledongthuc/pdf v0.0.0-20220302134840-0c2507a12d80/go.mod h1:imJHygn/1yfhB7XSJJKlFZKl/J+dCPAknuiaGOshXAs=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lib/pq v1.10.3 h1:v9QZf2Sn6AmjXtQeFpdoq/eaNtYP6IN+7lcrygsIAtg=
Expand Down Expand Up @@ -1396,6 +1411,7 @@ github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxS
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
github.com/orisano/pixelmatch v0.0.0-20220722002657-fb0b55479cde/go.mod h1:nZgzbfBr3hhjoZnS66nKrHmduYNpc34ny7RK4z5/HM0=
github.com/osquery/osquery-go v0.0.0-20231108163517-e3cde127e724 h1:z8XmnNQeCDZB3BwVoRxcqwo7MlDdsB6AJxqTap72S7w=
github.com/osquery/osquery-go v0.0.0-20231108163517-e3cde127e724/go.mod h1:mLJRc1Go8uP32LRALGvWj2lVJ+hDYyIfxDzVa+C5Yo8=
github.com/otiai10/copy v1.12.0 h1:cLMgSQnXBs1eehF0Wy/FAGsgDTDmAqFR7rQylBb1nDY=
Expand Down Expand Up @@ -2078,6 +2094,7 @@ golang.org/x/sys v0.0.0-20211102192858-4dd72447c267/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220204135822-1c1b9b1eba6a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220310020820-b874c991c1a5/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func New(c *cfg.C) (beat.Processor, error) {
logger: logp.NewLogger("add_cloud_metadata"),
}

go p.init()
p.init()
return p, nil
}

Expand Down
124 changes: 94 additions & 30 deletions x-pack/filebeat/input/netflow/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/felixge/fgprof"
"io"
"net"
"net/http"
Expand All @@ -28,34 +29,48 @@ import (
"github.com/elastic/elastic-agent-client/v7/pkg/client/mock"
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/monitoring"

"github.com/google/gopacket"
"github.com/google/gopacket/pcap"
"github.com/stretchr/testify/require"
)

import (
_ "net/http/pprof"

_ "github.com/felixge/fgprof"
)

const (
waitFor = 10 * time.Second
tick = 200 * time.Millisecond

outputBulkMaxSize = 3 * 1600
outputWorkers = 32 // change this to 1, 4, 8, 16, 32 accordingly
outputQueueMemEvents = 2 * outputBulkMaxSize * outputWorkers
inputRateLimit = 5500
inputWorkers = 4 // change this to 100, 200, 300 accordingly
)

func TestNetFlowIntegration(t *testing.T) {

http.DefaultServeMux.Handle("/debug/fgprof", fgprof.Handler())
//go func() {
// t.Log(http.ListenAndServe(":7070", nil))
//}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// make sure there is an ES instance running
integration.EnsureESIsRunning(t)
esConnectionDetails := integration.GetESURL(t, "http")
outputHost := fmt.Sprintf("%s://%s:%s", esConnectionDetails.Scheme, esConnectionDetails.Hostname(), esConnectionDetails.Port())
// integration.EnsureESIsRunning(t)
// esConnectionDetails := integration.GetESURL(t, "http")
outputHost := "change_me" //fmt.Sprintf("%s://%s:%s", esConnectionDetails.Scheme, esConnectionDetails.Hostname(), esConnectionDetails.Port())
outputHosts := []interface{}{outputHost}

// we are going to need admin access to query ES about the logs-netflow.log-default data_stream
outputUsername := os.Getenv("ES_SUPERUSER_USER")
outputUsername := "change_me" //os.Getenv("ES_SUPERUSER_USER")
require.NotEmpty(t, outputUsername)
outputPassword := os.Getenv("ES_SUPERUSER_PASS")
outputPassword := "change_me" //os.Getenv("ES_SUPERUSER_PASS")
require.NotEmpty(t, outputPassword)
outputProtocol := esConnectionDetails.Scheme
outputProtocol := "https" //esConnectionDetails.Scheme

deleted, err := DeleteDataStream(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default")
require.NoError(t, err)
Expand Down Expand Up @@ -83,12 +98,12 @@ func TestNetFlowIntegration(t *testing.T) {
"ssl.verification_mode": "none",
// ref: https://www.elastic.co/guide/en/fleet/8.14/es-output-settings.html
"preset": "custom",
"bulk_max_size": 1600,
"worker": 4,
"queue.mem.events": 12800,
"queue.mem.flush.min_events": 1600,
"bulk_max_size": outputBulkMaxSize,
"worker": outputWorkers,
"queue.mem.events": outputQueueMemEvents, // outputQueueMemEvents,
"queue.mem.flush.min_events": outputBulkMaxSize,
"queue.mem.flush.timeout": 5,
"compression_level": 1,
"compression_level": 3,
"connection_idle_timeout": 15,
}),
},
Expand Down Expand Up @@ -126,10 +141,10 @@ func TestNetFlowIntegration(t *testing.T) {
"id": "netflow_integration_test",
"host": "localhost:6006",
"expiration_timeout": "30m",
"queue_size": 2 * 4 * 1600,
"queue_size": 2 * inputRateLimit,
"detect_sequence_reset": true,
"max_message_size": "10KiB",
"workers": 100,
"workers": inputWorkers,
}),
},
},
Expand Down Expand Up @@ -189,16 +204,26 @@ func TestNetFlowIntegration(t *testing.T) {
case <-healthyChan:
case err := <-beatRunErr:
t.Fatalf("beat run err: %v", err)
case <-time.After(waitFor):
case <-time.After(20 * time.Minute):
t.Fatalf("timed out waiting for beat to become healthy")
}

// workaround for cloud-metadata init
time.Sleep(5 * time.Second)
startTime := time.Now()

registry := monitoring.GetNamespace("dataset").GetRegistry().GetRegistry("netflow_integration_test")

discardedEventsTotalVar, ok := registry.Get("discarded_events_total").(*monitoring.Uint)
require.True(t, ok)

receivedEventTotalVar, ok := registry.Get("received_events_total").(*monitoring.Uint)
//receivedEventTotalVar, ok := registry.Get("received_events_total").(*monitoring.Uint)
//require.True(t, ok)

flowsTotalVar, ok := registry.Get("flows_total").(*monitoring.Uint)
require.True(t, ok)

decodeTotalVar, ok := registry.Get("decode_errors_total").(*monitoring.Uint)
require.True(t, ok)

udpAddr, err := net.ResolveUDPAddr("udp", "127.0.0.1:6006")
Expand All @@ -216,19 +241,56 @@ func TestNetFlowIntegration(t *testing.T) {
err = json.Unmarshal(data, &expectedFlows)
require.NoError(t, err)

f, err := pcap.OpenOffline("testdata/pcap/ipfix_cisco.reversed.pcap")
f, err := pcap.OpenOffline("testdata/performance/perf.pcap")
require.NoError(t, err)
defer f.Close()

//start pprof server
//go func() {
// // Create the file
// out, err := os.Create("/Users/pkoutsovasilis/repos/fgprof")
// require.NoError(t, err)
// t.Log("starting analyzing")
// resp, err := http.Get("http://localhost:7070/debug/fgprof?seconds=10")
// require.NoError(t, err)
// defer resp.Body.Close()
//
// _, err = io.Copy(out, resp.Body)
// require.NoError(t, err)
//
// t.Log("done analyzing")
// out.Close()
//}()

var totalBytes, totalPackets int
rateLimit := 10000
limiter := rate.NewLimiter(rate.Limit(rateLimit), rateLimit)

wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
timer := time.NewTicker(1 * time.Second)
defer timer.Stop()
for {
select {
case <-timer.C:
count, err := DataStreamEventsCount(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default")
require.NoError(t, err)
t.Log(time.Now().Unix(), "total_flows: ", flowsTotalVar.Get(), " packets_sent: ", totalPackets,
"flows_in_data_stream: ", count, " discarded_events_total: ", discardedEventsTotalVar.Get(),
" decode_errors: ", decodeTotalVar.Get())
case <-ctx.Done():
return
}
}
}()

limiter := rate.NewLimiter(rate.Limit(inputRateLimit), inputRateLimit)

packetSource := gopacket.NewPacketSource(f, f.LinkType())
for pkt := range packetSource.Packets() {

if totalPackets%rateLimit == 0 {
err = limiter.WaitN(ctx, rateLimit)
if totalPackets%inputRateLimit == 0 {
err = limiter.WaitN(ctx, inputRateLimit)
require.NoError(t, err)
}

Expand All @@ -241,21 +303,23 @@ func TestNetFlowIntegration(t *testing.T) {
totalPackets++
}

require.Zero(t, discardedEventsTotalVar.Get())
select {
case <-time.After(time.Until(startTime.Add(30 * time.Second))):
}

require.Eventually(t, func() bool {
return receivedEventTotalVar.Get() == uint64(totalPackets)
}, waitFor, tick)
require.Zero(t, discardedEventsTotalVar.Get())

require.Eventually(t, func() bool {
return HasDataStream(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default") == nil
err = HasDataStream(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default")
t.Log(err)
return err == nil
}, waitFor, tick)

require.Eventually(t, func() bool {
eventsCount, err := DataStreamEventsCount(ctx, outputUsername, outputPassword, outputHost, "logs-netflow.log-default")
require.NoError(t, err)
return eventsCount == uint64(len(expectedFlows.Flows))
}, waitFor, tick)
return eventsCount == flowsTotalVar.Get()
}, 20*time.Minute, 1*time.Second)
}

type unitPayload map[string]interface{}
Expand Down Expand Up @@ -345,7 +409,7 @@ func DataStreamEventsCount(ctx context.Context, username string, password string
}

if resultBytes == nil {
return 0, errors.New("http not found error")
return 0, nil
}

var results CountResults
Expand Down
Loading

0 comments on commit 3c88380

Please sign in to comment.