Skip to content

Commit

Permalink
feat(falcosidekick): add falcoSidekick type & its utils (#99)
Browse files Browse the repository at this point in the history
* chore: rm customfields key value from falcosidekick example conf

* refactor(sendLogs): use falcoEvent type

* feat(falcosidekick): add falcoSidekick type & its utils

* fix(checkFalcoEvents): replace `time.Tick` with a ticker

This commit replaces the usage of `time.Tick` with a ticker
in the `checkFalcoEvents` func for more precise timing control.
It initializes the ticker & defers its cleanup to ensure proper
resource management.

Signed-off-by: Dwi Siswanto <me@dw1.io>

---------

Signed-off-by: Dwi Siswanto <me@dw1.io>
  • Loading branch information
dwisiswant0 authored Sep 26, 2023
1 parent 52ee58e commit 66a38ab
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 35 deletions.
6 changes: 3 additions & 3 deletions falcosidekick.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@
#listenport: 2801 # port to listen for daemon (default: 2801)
debug: false # if true all outputs will print in stdout the payload they send (default: false)
customfields: # custom fields are added to falco events and metrics, if the value starts with % the relative env var is used
Akey: "AValue"
Bkey: "BValue"
Ckey: "CValue"
# Akey: "AValue"
# Bkey: "BValue"
# Ckey: "CValue"
templatedfields: # templated fields are added to falco events and metrics, it uses Go template + output_fields values
# Dkey: '{{ or (index . "k8s.ns.labels.foo") "bar" }}'
# bracketreplacer: "_" # if not empty, the brackets in keys of Output Fields are replaced
Expand Down
122 changes: 122 additions & 0 deletions falcosidekick.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
// Copyright Dwi Siswanto and/or licensed to Dwi Siswanto under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
// See the LICENSE-ELASTIC file in the project root for more information.

package teler

import (
"bytes"
"time"

"encoding/json"
"net/http"

"github.com/daniel-hutao/spinlock"
"github.com/sourcegraph/conc/pool"
"go.uber.org/zap/zapcore"
"golang.org/x/exp/slices"
)

// falcoEvent represents a Falco event structure.
// It is used for marshaling Falco events to JSON format.
type falcoEvent struct {
Output string `json:"output"`
Priority string `json:"priority"`
Rule string `json:"rule"`
Time string `json:"time"`
OutputFields struct {
Caller string `json:"teler.caller"`
ID string `json:"teler.id"`
Threat string `json:"teler.threat"`
RequestBody string `json:"request.body"`
RequestHeaders string `json:"request.headers"`
RequestIPAddr string `json:"request.ip_addr"`
RequestMethod string `json:"request.method"`
RequestPath string `json:"request.path"`
} `json:"output_fields"`
}

// falcoSidekick represents a data structure for managing
// a collection of Falco events and a SpinLock for concurrent
// access control.
type falcoSidekick struct {
events []*falcoEvent
sl spinlock.SpinLock
}

// checkFalcoEvents periodically checks for pending Falco events and
// sends them to a FalcoSidekick instance.
//
// If the FalcoSidekick URL is configured. It runs as a background goroutine.
func (t *Teler) checkFalcoEvents() {
// If the FalcoSidekick URL is not configured, do nothing.
if t.opt.FalcoSidekickURL == "" {
return
}

// Initialize ticker
ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop()

// Check for pending Falco events every 5 seconds.
for range ticker.C {
// Get the count of pending Falco events.
c := len(t.falcoSidekick.events)
if c > 0 {
// Send pending Falco events to FalcoSidekick.
t.sendFalcoEvents()
}
}
}

// sendFalcoEvents sends pending Falco events to a FalcoSidekick instance.
// It uses a goroutine pool to parallelize the sending of events.
func (t *Teler) sendFalcoEvents() {
// Lock the FalcoSidekick event slice to prevent concurrent access.
t.falcoSidekick.sl.Lock()
defer t.falcoSidekick.sl.Unlock()

// Get the number of pending Falco events.
i := len(t.falcoSidekick.events)

// Initialize worker number
var w int

// Define worker number logic
w = i / 2
if w == 0 {
w = 1
}

// Create a goroutine pool with a maximum number of goroutines (workers)
// equal to half the number of pending events.
p := pool.New().WithMaxGoroutines(w)

// Iterate over the pending Falco events.
for _, event := range t.falcoSidekick.events {
e := event
p.Go(func() {
// Marshal the Falco event to JSON format.
payload, err := json.Marshal(e)
if err != nil {
// Handle JSON marshaling error by logging an error message.
t.error(zapcore.ErrorLevel, err.Error())
}

// Send a POST request to the FalcoSidekick instance with the JSON payload.
resp, err := http.Post(t.opt.FalcoSidekickURL, "application/json", bytes.NewBuffer(payload))
if err != nil {
// Handle HTTP POST request error by logging an error message.
t.error(zapcore.ErrorLevel, err.Error())
}
defer resp.Body.Close()
})
}

// Wait for all goroutines in the pool to complete.
p.Wait()

// Remove sent Falco events from the pending events slice.
t.falcoSidekick.events = slices.Delete(t.falcoSidekick.events, 0, i)
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@ require (
github.com/projectdiscovery/mapcidr v1.1.2
github.com/samber/lo v1.38.1
github.com/scorpionknifes/go-pcre v0.0.0-20210805092536-77486363b797
github.com/sourcegraph/conc v0.3.0
github.com/stretchr/testify v1.8.2
github.com/twharmon/gouid v0.5.2
github.com/valyala/fastjson v1.6.3
github.com/valyala/fasttemplate v1.2.2
go.uber.org/zap v1.26.0
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0
golang.org/x/net v0.14.0
golang.org/x/text v0.12.0
gopkg.in/yaml.v3 v3.0.1
Expand Down Expand Up @@ -64,7 +66,6 @@ require (
go.opencensus.io v0.24.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 // indirect
golang.org/x/oauth2 v0.8.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
Expand Down
4 changes: 3 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,8 @@ github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM=
github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA=
github.com/scorpionknifes/go-pcre v0.0.0-20210805092536-77486363b797 h1:gY4oDYOGix3T1pJoNbW+yG7cxuPbKvWDeHQksgPXuM4=
github.com/scorpionknifes/go-pcre v0.0.0-20210805092536-77486363b797/go.mod h1:ygmxh78DrhoitFrusINenwt2BfHkkPe68GjNYdNPkQQ=
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down Expand Up @@ -974,8 +976,8 @@ google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8=
google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/cheggaaa/pb.v1 v1.0.27/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
57 changes: 27 additions & 30 deletions teler.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package teler

import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -104,6 +103,9 @@ type Teler struct {

// env is environment for DSL.
env *dsl.Env

// falcoSidekick is Falco Sidekick instance that holds events.
falcoSidekick falcoSidekick
}

// New constructs a new Teler instance with the supplied options.
Expand Down Expand Up @@ -320,6 +322,9 @@ func New(opts ...Options) *Teler {
t.error(zapcore.PanicLevel, fmt.Sprintf(errResources, err))
}

// Run checks Falco events
go t.checkFalcoEvents()

return t
}

Expand Down Expand Up @@ -380,35 +385,27 @@ func (t *Teler) sendLogs(r *http.Request, k threat.Threat, id string, msg string
now := time.Now()

// Build FalcoSidekick event payload
data := map[string]interface{}{
"output": fmt.Sprintf(
"%s: %s at %s by %s (caller=%s threat=%s id=%s)",
now.Format("15:04:05.000000000"), msg, r.URL.Path, ipAddr, t.caller, cat, id),
"priority": "Warning",
"rule": msg,
"time": now.Format("2006-01-02T15:04:05.999999999Z"),
"output_fields": map[string]interface{}{
"teler.caller": t.caller,
"teler.id": id,
"teler.threat": cat,
"request.method": r.Method,
"request.path": path,
"request.ip_addr": ipAddr,
"request.headers": string(jsonHeaders),
"request.body": string(body),
},
}
payload, err := json.Marshal(data)
if err != nil {
t.error(zapcore.ErrorLevel, err.Error())
}

// Send the POST request to FalcoSidekick instance
resp, err := http.Post(t.opt.FalcoSidekickURL, "application/json", bytes.NewBuffer(payload))
if err != nil {
t.error(zapcore.ErrorLevel, err.Error())
}
defer resp.Body.Close()
event := new(falcoEvent)
event.Output = fmt.Sprintf(
"%s: %s at %s by %s (caller=%s threat=%s id=%s)",
now.Format("15:04:05.000000000"), msg, r.URL.Path, ipAddr, t.caller, cat, id,
)
event.Priority = "Warning"
event.Rule = msg
event.Time = now.Format("2006-01-02T15:04:05.999999999Z")
event.OutputFields.Caller = t.caller
event.OutputFields.ID = id
event.OutputFields.Threat = cat
event.OutputFields.RequestBody = string(body)
event.OutputFields.RequestHeaders = string(jsonHeaders)
event.OutputFields.RequestIPAddr = ipAddr
event.OutputFields.RequestMethod = r.Method
event.OutputFields.RequestPath = path

// Append event to falcoSidekick instance
t.falcoSidekick.sl.Lock()
t.falcoSidekick.events = append(t.falcoSidekick.events, event)
t.falcoSidekick.sl.Unlock()
}

// getResources to download datasets of threat ruleset from teler-resources
Expand Down

1 comment on commit 66a38ab

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Performance Alert ⚠️

Possible performance regression was detected for benchmark.
Benchmark result of this commit is worse than the previous benchmark result exceeding threshold 2.

Benchmark suite Current: 66a38ab Previous: 26a3e5d Ratio
BenchmarkInitializeDefault - ns/op 1715831450 ns/op 51486071 ns/op 33.33

This comment was automatically generated by workflow using github-action-benchmark.

Please sign in to comment.