Skip to content

Commit

Permalink
wip: hard glue go-docappender in netflow
Browse files Browse the repository at this point in the history
  • Loading branch information
Panos Koutsovasilis committed Jul 12, 2024
1 parent 3c88380 commit b37ec49
Show file tree
Hide file tree
Showing 10 changed files with 176 additions and 74 deletions.
4 changes: 4 additions & 0 deletions filebeat/beater/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ func withPipelineEventCounter(pipeline beat.PipelineConnector, counter *eventCou
return pipeline
}

func (c *countingClient) Process(event *beat.Event) (*beat.Event, error) {
return c.client.Process(event)
}

func (c *countingClient) Publish(event beat.Event) {
c.counter.Add(1)
c.client.Publish(event)
Expand Down
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ require (
github.com/elastic/elastic-agent-autodiscover v0.7.0
github.com/elastic/elastic-agent-libs v0.9.13
github.com/elastic/elastic-agent-system-metrics v0.10.3
github.com/elastic/go-docappender/v2 v2.2.0
github.com/elastic/go-elasticsearch/v8 v8.14.0
github.com/elastic/go-sfdc v0.0.0-20240621062639-bcc8456508ff
github.com/elastic/mito v1.13.0
Expand All @@ -210,6 +211,7 @@ require (
github.com/felixge/fgprof v0.9.4
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15
github.com/go-ldap/ldap/v3 v3.4.6
github.com/goccy/go-json v0.10.2
github.com/golang-jwt/jwt/v5 v5.2.1
github.com/google/cel-go v0.19.0
github.com/googleapis/gax-go/v2 v2.12.0
Expand Down Expand Up @@ -288,15 +290,14 @@ require (
github.com/felixge/httpsnoop v1.0.1 // indirect
github.com/go-asn1-ber/asn1-ber v1.5.5 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-stack/stack v1.8.0 // indirect
github.com/gobuffalo/here v0.6.7 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/godror/knownpb v0.1.0 // indirect
github.com/golang-jwt/jwt/v4 v4.5.0 // indirect
github.com/golang-sql/civil v0.0.0-20190719163853-cb61b32ac6fe // indirect
Expand Down Expand Up @@ -328,7 +329,7 @@ require (
github.com/karrick/godirwalk v1.17.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/klauspost/cpuid/v2 v2.2.5 // indirect
github.com/kortschak/utter v1.5.0 // indirect
github.com/kylelemons/godebug v1.1.0 // indirect
Expand Down Expand Up @@ -366,11 +367,12 @@ require (
github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.elastic.co/fastjson v1.1.0 // indirect
go.elastic.co/apm/module/apmzap/v2 v2.6.0 // indirect
go.elastic.co/fastjson v1.3.0 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.24.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
go.opentelemetry.io/otel/trace v1.24.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
37 changes: 20 additions & 17 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,8 @@ github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdb
github.com/elastic/glog v1.0.1-0.20210831205241-7d8b5c89dfc4/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
github.com/elastic/go-concert v0.3.0 h1:Y66JFn3ENndpHErOhTASu8/Fz1SSsLZicPufCmvQD60=
github.com/elastic/go-concert v0.3.0/go.mod h1:UWt1MB5HxxZ85hKynLaYl/AaLIKFx0WiBP2uJSRfduA=
github.com/elastic/go-docappender/v2 v2.2.0 h1:Pq6w+R0ZbWlqZHoqzoujR2ElKBfaPUf3M/cJvPVSrfA=
github.com/elastic/go-docappender/v2 v2.2.0/go.mod h1:efwvMZfrJ1dRr0SkfmZosXKmgmV/D+3E5G66rQL/B4A=
github.com/elastic/go-elasticsearch/v8 v8.14.0 h1:1ywU8WFReLLcxE1WJqii3hTtbPUE2hc38ZK/j4mMFow=
github.com/elastic/go-elasticsearch/v8 v8.14.0/go.mod h1:WRvnlGkSuZyp83M2U8El/LGXpCjYLrvlkSgkAH4O5I4=
github.com/elastic/go-libaudit/v2 v2.5.0 h1:5OK919QRnGtcjVBz3n/cs5F42im1mPlVTA9TyIn2K54=
Expand Down Expand Up @@ -693,8 +695,8 @@ github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTg
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.3.0/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ=
github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-martini/martini v0.0.0-20170121215854-22fa46961aab h1:xveKWz2iaueeTaUgdetzel+U7exyigDYBryyVfV/rZk=
Expand Down Expand Up @@ -1085,8 +1087,6 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
Expand Down Expand Up @@ -1211,8 +1211,8 @@ github.com/klauspost/compress v1.12.2/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/cpuid/v2 v2.2.5 h1:0E5MSMDEoAulmXNFquVs//DdoomxaoTY1kUhbc/qbZg=
Expand Down Expand Up @@ -1706,12 +1706,14 @@ go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0 h1:ukMcwyMaDXsS1dRK2qRYXT2As
go.elastic.co/apm/module/apmelasticsearch/v2 v2.6.0/go.mod h1:YpfiTTrqX5LB/CKBwX89oDCBAxuLJTFv40gcfxJyehM=
go.elastic.co/apm/module/apmhttp/v2 v2.6.0 h1:s8UeNFQmVBCNd4eoz7KDD9rEFhQC0HeUFXz3z9gpAmQ=
go.elastic.co/apm/module/apmhttp/v2 v2.6.0/go.mod h1:D0GLppLuI0Ddwvtl595GUxRgn6Z8L5KaDFVMv2H3GK0=
go.elastic.co/apm/module/apmzap/v2 v2.6.0 h1:R/iVORzGu3F9uM43iEVHD0nwiRo59O0bIXdayKsgayQ=
go.elastic.co/apm/module/apmzap/v2 v2.6.0/go.mod h1:B3i/8xRkqLgi6zNuV+Bp7Pt4cutaOObvrVSa7wUTAPw=
go.elastic.co/apm/v2 v2.6.0 h1:VieBMLQFtXua2YxpYxaSdYGnmmxhLT46gosI5yErJgY=
go.elastic.co/apm/v2 v2.6.0/go.mod h1:33rOXgtHwbgZcDgi6I/GtCSMZQqgxkHC0IQT3gudKvo=
go.elastic.co/ecszap v1.0.2 h1:iW5OGx8IiokiUzx/shD4AJCPFMC9uUtr7ycaiEIU++I=
go.elastic.co/ecszap v1.0.2/go.mod h1:dJkSlK3BTiwG/qXhCwe50Mz/jwu854vSip8sIeQhNZg=
go.elastic.co/fastjson v1.1.0 h1:3MrGBWWVIxe/xvsbpghtkFoPciPhOCmjsR/HfwEeQR4=
go.elastic.co/fastjson v1.1.0/go.mod h1:boNGISWMjQsUPy/t6yqt2/1Wx4YNPSe+mZjlyw9vKKI=
go.elastic.co/fastjson v1.3.0 h1:hJO3OsYIhiqiT4Fgu0ZxAECnKASbwgiS+LMW5oCopKs=
go.elastic.co/fastjson v1.3.0/go.mod h1:K9vDh7O0ODsVKV2B5e2XYLY277QZaCbB3tS1SnARvko=
go.elastic.co/go-licence-detector v0.6.1 h1:T2PFHYdow+9mAjj6K5ehn5anTxtsURfom2P4S6PgMzg=
go.elastic.co/go-licence-detector v0.6.1/go.mod h1:qQ1clBRS2f0Ee5ie+y2LLYnyhSNJNm0Ha6d7SoYVtM4=
go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
Expand Down Expand Up @@ -1740,14 +1742,16 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0=
go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo=
go.opentelemetry.io/otel v1.24.0 h1:0LAOdjNmQeSTzGBzduGe/rU4tZhMwL5rWgtp9Ku5Jfo=
go.opentelemetry.io/otel v1.24.0/go.mod h1:W7b9Ozg4nkF5tWI5zsXkaKKDjdVjpD4oAt9Qi/MArHo=
go.opentelemetry.io/otel/metric v1.24.0 h1:6EhoGWWK28x1fbpA4tYTOWBkPefTDQnb8WSGXlc88kI=
go.opentelemetry.io/otel/metric v1.24.0/go.mod h1:VYhLe1rFfxuTXLgj4CBiyz+9WYBA8pNGJgDcSFRKBco=
go.opentelemetry.io/otel/sdk v1.21.0 h1:FTt8qirL1EysG6sTQRZ5TokkU8d0ugCj8htOgThZXQ8=
go.opentelemetry.io/otel/sdk v1.21.0/go.mod h1:Nna6Yv7PWTdgJHVRD9hIYywQBRx7pbox6nwBnZIxl/E=
go.opentelemetry.io/otel/trace v1.24.0 h1:CsKnnL4dUAr/0llH9FKuc698G04IrpWV0MQA/Y1YELI=
go.opentelemetry.io/otel/trace v1.24.0/go.mod h1:HPc3Xr/cOApsBI154IU0OI0HJexz+aw5uPdbs3UCjNU=
go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo=
go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4=
go.opentelemetry.io/otel/metric v1.28.0 h1:f0HGvSl1KRAU1DLgLGFjrwVyismPlnuU6JD6bOeuA5Q=
go.opentelemetry.io/otel/metric v1.28.0/go.mod h1:Fb1eVBFZmLVTMb6PPohq3TO9IIhUisDsbJoL/+uQW4s=
go.opentelemetry.io/otel/sdk v1.27.0 h1:mlk+/Y1gLPLn84U4tI8d3GNJmGT/eXe3ZuOXN9kTWmI=
go.opentelemetry.io/otel/sdk v1.27.0/go.mod h1:Ha9vbLwJE6W86YstIywK2xFfPjbWlCuwPtMkKdz/Y4A=
go.opentelemetry.io/otel/sdk/metric v1.27.0 h1:5uGNOlpXi+Hbo/DRoI31BSb1v+OGcpv2NemcCrOL8gI=
go.opentelemetry.io/otel/sdk/metric v1.27.0/go.mod h1:we7jJVrYN2kh3mVBlswtPU22K0SA+769l93J6bsyvqw=
go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g=
go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down Expand Up @@ -2211,7 +2215,6 @@ golang.org/x/tools v0.0.0-20200312045724-11d5b4c81c7d/go.mod h1:o4KQGtdN14AW+yjs
golang.org/x/tools v0.0.0-20200331025713-a30bf2db82d4/go.mod h1:Sl4aGygMT6LrqrWclx+PTx3U+LnKx/seiNR+3G19Ar8=
golang.org/x/tools v0.0.0-20200422205258-72e4a01eba43/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200501065659-ab2804fb9c9d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200509030707-2212a7e161a5/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
Expand Down
2 changes: 2 additions & 0 deletions libbeat/beat/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ type PipelineConnector = Pipeline
type Client interface {
// Publish the event
Publish(Event)
// Publish the event
Process(*Event) (*Event, error)
// PublishAll events specified in the Event array
PublishAll([]Event)
Close() error
Expand Down
71 changes: 44 additions & 27 deletions libbeat/publisher/pipeline/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,27 @@ func (c *client) Publish(e beat.Event) {
c.publish(e)
}

func (c *client) Process(e *beat.Event) (*beat.Event, error) {
if c.processors != nil {
var err error

e, err = c.processors.Run(e)
//publish = event != nil
if err != nil {
// If we introduce a dead-letter queue, this is where we should
// route the event to it.
c.logger.Errorf("Failed to publish event: %v", err)
}

}

return e, nil
}

func (c *client) publish(e beat.Event) {
var (
event = &e
publish = true
event = &e
//publish = true
)

c.onNewEvent()
Expand All @@ -92,7 +109,7 @@ func (c *client) publish(e beat.Event) {
var err error

event, err = c.processors.Run(event)
publish = event != nil
//publish = event != nil
if err != nil {
// If we introduce a dead-letter queue, this is where we should
// route the event to it.
Expand All @@ -104,30 +121,30 @@ func (c *client) publish(e beat.Event) {
e = *event
}

c.eventListener.AddEvent(e, publish)
if !publish {
c.onFilteredOut(e)
return
}

e = *event
pubEvent := publisher.Event{
Content: e,
Flags: c.eventFlags,
}

var published bool
if c.canDrop {
_, published = c.producer.TryPublish(pubEvent)
} else {
_, published = c.producer.Publish(pubEvent)
}

if published {
c.onPublished()
} else {
c.onDroppedOnPublish(e)
}
//c.eventListener.AddEvent(e, publish)
//if !publish {
// c.onFilteredOut(e)
// return
//}

//e = *event
//pubEvent := publisher.Event{
// Content: e,
// Flags: c.eventFlags,
//}

//var published bool
//if c.canDrop {
// _, _ = c.producer.TryPublish(pubEvent)
//} else {
// _, _ = c.producer.Publish(pubEvent)
//}

//if published {
// c.onPublished()
//} else {
// c.onDroppedOnPublish(e)
//}
}

func (c *client) Close() error {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/publisher/pipeline/nilpipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ func (c *nilClient) Publish(event beat.Event) {
c.PublishAll([]beat.Event{event})
}

func (c *nilClient) Process(event *beat.Event) (*beat.Event, error) {
return event, nil
}

func (c *nilClient) PublishAll(events []beat.Event) {
L := len(events)
if L == 0 {
Expand Down
4 changes: 4 additions & 0 deletions libbeat/publisher/pipeline/sync_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ type dummyClient struct {
Received chan int
}

func (c *dummyClient) Process(event *beat.Event) (*beat.Event, error) {
return event, nil
}

func newDummyClient() *dummyClient {
return &dummyClient{Received: make(chan int)}
}
Expand Down
4 changes: 4 additions & 0 deletions x-pack/dockerlogbeat/pipelinemock/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ type MockBeatClient struct {
mtx sync.Mutex
}

func (c *MockBeatClient) Process(event *beat.Event) (*beat.Event, error) {
return event, nil
}

// GetEvents returns the published events
func (c *MockBeatClient) GetEvents() []beat.Event {
c.mtx.Lock()
Expand Down
Loading

0 comments on commit b37ec49

Please sign in to comment.