From 66a38ab7319ab7d7eea7cf3e6bdc28502f237100 Mon Sep 17 00:00:00 2001 From: Dwi Siswanto Date: Tue, 26 Sep 2023 19:16:48 +0700 Subject: [PATCH] feat(falcosidekick): add falcoSidekick type & its utils (#99) * chore: rm customfields key value from falcosidekick example conf * refactor(sendLogs): use falcoEvent type * feat(falcosidekick): add falcoSidekick type & its utils * fix(checkFalcoEvents): replace `time.Tick` with a ticker This commit replaces the usage of `time.Tick` with a ticker in the `checkFalcoEvents` func for more precise timing control. It initializes the ticker & defers its cleanup to ensure proper resource management. Signed-off-by: Dwi Siswanto --------- Signed-off-by: Dwi Siswanto --- falcosidekick.example.yaml | 6 +- falcosidekick.go | 122 +++++++++++++++++++++++++++++++++++++ go.mod | 3 +- go.sum | 4 +- teler.go | 57 ++++++++--------- 5 files changed, 157 insertions(+), 35 deletions(-) create mode 100644 falcosidekick.go diff --git a/falcosidekick.example.yaml b/falcosidekick.example.yaml index 78af1d8..3eb474b 100644 --- a/falcosidekick.example.yaml +++ b/falcosidekick.example.yaml @@ -2,9 +2,9 @@ #listenport: 2801 # port to listen for daemon (default: 2801) debug: false # if true all outputs will print in stdout the payload they send (default: false) customfields: # custom fields are added to falco events and metrics, if the value starts with % the relative env var is used - Akey: "AValue" - Bkey: "BValue" - Ckey: "CValue" + # Akey: "AValue" + # Bkey: "BValue" + # Ckey: "CValue" templatedfields: # templated fields are added to falco events and metrics, it uses Go template + output_fields values # Dkey: '{{ or (index . "k8s.ns.labels.foo") "bar" }}' # bracketreplacer: "_" # if not empty, the brackets in keys of Output Fields are replaced diff --git a/falcosidekick.go b/falcosidekick.go new file mode 100644 index 0000000..44cb4e2 --- /dev/null +++ b/falcosidekick.go @@ -0,0 +1,122 @@ +// Copyright Dwi Siswanto and/or licensed to Dwi Siswanto under one +// or more contributor license agreements. Licensed under the Elastic License 2.0; +// you may not use this file except in compliance with the Elastic License 2.0. +// See the LICENSE-ELASTIC file in the project root for more information. + +package teler + +import ( + "bytes" + "time" + + "encoding/json" + "net/http" + + "github.com/daniel-hutao/spinlock" + "github.com/sourcegraph/conc/pool" + "go.uber.org/zap/zapcore" + "golang.org/x/exp/slices" +) + +// falcoEvent represents a Falco event structure. +// It is used for marshaling Falco events to JSON format. +type falcoEvent struct { + Output string `json:"output"` + Priority string `json:"priority"` + Rule string `json:"rule"` + Time string `json:"time"` + OutputFields struct { + Caller string `json:"teler.caller"` + ID string `json:"teler.id"` + Threat string `json:"teler.threat"` + RequestBody string `json:"request.body"` + RequestHeaders string `json:"request.headers"` + RequestIPAddr string `json:"request.ip_addr"` + RequestMethod string `json:"request.method"` + RequestPath string `json:"request.path"` + } `json:"output_fields"` +} + +// falcoSidekick represents a data structure for managing +// a collection of Falco events and a SpinLock for concurrent +// access control. +type falcoSidekick struct { + events []*falcoEvent + sl spinlock.SpinLock +} + +// checkFalcoEvents periodically checks for pending Falco events and +// sends them to a FalcoSidekick instance. +// +// If the FalcoSidekick URL is configured. It runs as a background goroutine. +func (t *Teler) checkFalcoEvents() { + // If the FalcoSidekick URL is not configured, do nothing. + if t.opt.FalcoSidekickURL == "" { + return + } + + // Initialize ticker + ticker := time.NewTicker(time.Second * 5) + defer ticker.Stop() + + // Check for pending Falco events every 5 seconds. + for range ticker.C { + // Get the count of pending Falco events. + c := len(t.falcoSidekick.events) + if c > 0 { + // Send pending Falco events to FalcoSidekick. + t.sendFalcoEvents() + } + } +} + +// sendFalcoEvents sends pending Falco events to a FalcoSidekick instance. +// It uses a goroutine pool to parallelize the sending of events. +func (t *Teler) sendFalcoEvents() { + // Lock the FalcoSidekick event slice to prevent concurrent access. + t.falcoSidekick.sl.Lock() + defer t.falcoSidekick.sl.Unlock() + + // Get the number of pending Falco events. + i := len(t.falcoSidekick.events) + + // Initialize worker number + var w int + + // Define worker number logic + w = i / 2 + if w == 0 { + w = 1 + } + + // Create a goroutine pool with a maximum number of goroutines (workers) + // equal to half the number of pending events. + p := pool.New().WithMaxGoroutines(w) + + // Iterate over the pending Falco events. + for _, event := range t.falcoSidekick.events { + e := event + p.Go(func() { + // Marshal the Falco event to JSON format. + payload, err := json.Marshal(e) + if err != nil { + // Handle JSON marshaling error by logging an error message. + t.error(zapcore.ErrorLevel, err.Error()) + } + + // Send a POST request to the FalcoSidekick instance with the JSON payload. + resp, err := http.Post(t.opt.FalcoSidekickURL, "application/json", bytes.NewBuffer(payload)) + if err != nil { + // Handle HTTP POST request error by logging an error message. + t.error(zapcore.ErrorLevel, err.Error()) + } + defer resp.Body.Close() + }) + } + + // Wait for all goroutines in the pool to complete. + p.Wait() + + // Remove sent Falco events from the pending events slice. + t.falcoSidekick.events = slices.Delete(t.falcoSidekick.events, 0, i) +} diff --git a/go.mod b/go.mod index be2b4ed..90cdc7b 100644 --- a/go.mod +++ b/go.mod @@ -14,11 +14,13 @@ require ( github.com/projectdiscovery/mapcidr v1.1.2 github.com/samber/lo v1.38.1 github.com/scorpionknifes/go-pcre v0.0.0-20210805092536-77486363b797 + github.com/sourcegraph/conc v0.3.0 github.com/stretchr/testify v1.8.2 github.com/twharmon/gouid v0.5.2 github.com/valyala/fastjson v1.6.3 github.com/valyala/fasttemplate v1.2.2 go.uber.org/zap v1.26.0 + golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 golang.org/x/net v0.14.0 golang.org/x/text v0.12.0 gopkg.in/yaml.v3 v3.0.1 @@ -64,7 +66,6 @@ require ( go.opencensus.io v0.24.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.12.0 // indirect - golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 // indirect golang.org/x/oauth2 v0.8.0 // indirect golang.org/x/sys v0.11.0 // indirect golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect diff --git a/go.sum b/go.sum index 32c218d..f37c9bc 100644 --- a/go.sum +++ b/go.sum @@ -424,6 +424,8 @@ github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/scorpionknifes/go-pcre v0.0.0-20210805092536-77486363b797 h1:gY4oDYOGix3T1pJoNbW+yG7cxuPbKvWDeHQksgPXuM4= github.com/scorpionknifes/go-pcre v0.0.0-20210805092536-77486363b797/go.mod h1:ygmxh78DrhoitFrusINenwt2BfHkkPe68GjNYdNPkQQ= +github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo= +github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -974,8 +976,8 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= gopkg.in/cheggaaa/pb.v1 v1.0.27/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/teler.go b/teler.go index ad8e865..c6ea515 100644 --- a/teler.go +++ b/teler.go @@ -20,7 +20,6 @@ package teler import ( "bufio" - "bytes" "errors" "fmt" "io" @@ -104,6 +103,9 @@ type Teler struct { // env is environment for DSL. env *dsl.Env + + // falcoSidekick is Falco Sidekick instance that holds events. + falcoSidekick falcoSidekick } // New constructs a new Teler instance with the supplied options. @@ -320,6 +322,9 @@ func New(opts ...Options) *Teler { t.error(zapcore.PanicLevel, fmt.Sprintf(errResources, err)) } + // Run checks Falco events + go t.checkFalcoEvents() + return t } @@ -380,35 +385,27 @@ func (t *Teler) sendLogs(r *http.Request, k threat.Threat, id string, msg string now := time.Now() // Build FalcoSidekick event payload - data := map[string]interface{}{ - "output": fmt.Sprintf( - "%s: %s at %s by %s (caller=%s threat=%s id=%s)", - now.Format("15:04:05.000000000"), msg, r.URL.Path, ipAddr, t.caller, cat, id), - "priority": "Warning", - "rule": msg, - "time": now.Format("2006-01-02T15:04:05.999999999Z"), - "output_fields": map[string]interface{}{ - "teler.caller": t.caller, - "teler.id": id, - "teler.threat": cat, - "request.method": r.Method, - "request.path": path, - "request.ip_addr": ipAddr, - "request.headers": string(jsonHeaders), - "request.body": string(body), - }, - } - payload, err := json.Marshal(data) - if err != nil { - t.error(zapcore.ErrorLevel, err.Error()) - } - - // Send the POST request to FalcoSidekick instance - resp, err := http.Post(t.opt.FalcoSidekickURL, "application/json", bytes.NewBuffer(payload)) - if err != nil { - t.error(zapcore.ErrorLevel, err.Error()) - } - defer resp.Body.Close() + event := new(falcoEvent) + event.Output = fmt.Sprintf( + "%s: %s at %s by %s (caller=%s threat=%s id=%s)", + now.Format("15:04:05.000000000"), msg, r.URL.Path, ipAddr, t.caller, cat, id, + ) + event.Priority = "Warning" + event.Rule = msg + event.Time = now.Format("2006-01-02T15:04:05.999999999Z") + event.OutputFields.Caller = t.caller + event.OutputFields.ID = id + event.OutputFields.Threat = cat + event.OutputFields.RequestBody = string(body) + event.OutputFields.RequestHeaders = string(jsonHeaders) + event.OutputFields.RequestIPAddr = ipAddr + event.OutputFields.RequestMethod = r.Method + event.OutputFields.RequestPath = path + + // Append event to falcoSidekick instance + t.falcoSidekick.sl.Lock() + t.falcoSidekick.events = append(t.falcoSidekick.events, event) + t.falcoSidekick.sl.Unlock() } // getResources to download datasets of threat ruleset from teler-resources