Skip to content

Commit

Permalink
sync goroutine / configuration / service mgmt behavior with other go …
Browse files Browse the repository at this point in the history
…bouncers (#136)

* update dependencies to crowdsec 1.5.2; allow build with devel version of go

* Refactor to use error group

Signed-off-by: Shivam Sandbhor <shivam.sandbhor@gmail.com>

* Make review changes

Signed-off-by: Shivam Sandbhor <shivam.sandbhor@gmail.com>

* notify systemd; move *testConfig after validation; update func tests

---------

Signed-off-by: Shivam Sandbhor <shivam.sandbhor@gmail.com>
Co-authored-by: Shivam Sandbhor <shivam.sandbhor@gmail.com>
  • Loading branch information
mmetc and sbs2001 authored Jun 12, 2023
1 parent bfdc55c commit 9a42c34
Show file tree
Hide file tree
Showing 8 changed files with 214 additions and 1,948 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/build-binary-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@ name: build-binary-package

on:
release:
types: prereleased
types:
- prereleased

permissions:
# Use write for: hub release edit
Expand Down
121 changes: 63 additions & 58 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,22 @@ import (
"net"
"net/http"
"os"
"os/signal"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/prometheus/client_golang/prometheus/promhttp"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"golang.org/x/sync/errgroup"

"github.com/crowdsecurity/crowdsec/pkg/apiclient"
"github.com/crowdsecurity/crowdsec/pkg/models"
csbouncer "github.com/crowdsecurity/go-cs-bouncer"
"github.com/crowdsecurity/go-cs-lib/pkg/csdaemon"
"github.com/crowdsecurity/go-cs-lib/pkg/version"

"github.com/crowdsecurity/cs-cloudflare-bouncer/pkg/cf"
Expand All @@ -30,9 +33,27 @@ import (

const (
DEFAULT_CONFIG_PATH = "/etc/crowdsec/bouncers/crowdsec-cloudflare-bouncer.yaml"
name = "crowdsec-cloudflare-bouncer"
name = "crowdsec-cloudflare-bouncer"
)

func HandleSignals(ctx context.Context) error {
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)

select {
case s := <-signalChan:
switch s {
case syscall.SIGTERM:
return fmt.Errorf("received SIGTERM")
case syscall.SIGINT:
return fmt.Errorf("received SIGINT")
}
case <-ctx.Done():
return ctx.Err()
}
return nil
}

func newAPILogger(logDir string, logAPIRequests *bool) (*log.Logger, error) {
APILogger := log.New()
if *logAPIRequests {
Expand Down Expand Up @@ -112,11 +133,6 @@ func Execute() error {
return fmt.Errorf("unable to parse config: %w", err)
}

if *testConfig {
log.Info("config is valid")
return nil
}

if *delete || *onlySetup {
log.SetOutput(os.Stdout)
}
Expand All @@ -127,7 +143,6 @@ func Execute() error {
}

var csLAPI *csbouncer.StreamBouncer
ctx := context.Background()

zoneLocks := make([]cf.ZoneLock, 0)
for _, account := range conf.CloudflareConfig.Accounts {
Expand All @@ -136,10 +151,7 @@ func Execute() error {
}
}

var workerTomb tomb.Tomb
var serverTomb tomb.Tomb
var dispatchTomb tomb.Tomb

g, ctx := errgroup.WithContext(context.Background())
// lapiStreams are used to forward the decisions to all the workers
lapiStreams := make([]chan *models.DecisionsStreamResponse, 0)
APICountByToken := make(map[string]*uint32)
Expand All @@ -165,12 +177,8 @@ func Execute() error {
TokenCallCount: APICountByToken[account.Token],
}
if *onlySetup {
workerTomb.Go(func() error {
var err error = nil
defer func() {
workerTomb.Kill(err)
}()

g.Go(func() error {
var err error
worker.CFStateByAction = nil
err = worker.Init()
if err != nil {
Expand All @@ -180,21 +188,17 @@ func Execute() error {
return err
})
} else if *delete {
workerTomb.Go(func() error {
var err error = nil
defer func() {
workerTomb.Kill(err)
}()
g.Go(func() error {
var err error
err = worker.Init()
if err != nil {
return err
}
err = worker.DeleteExistingIPList()
return err

})
} else {
workerTomb.Go(func() error {
g.Go(func() error {
err := worker.Run()
return err
})
Expand All @@ -221,27 +225,35 @@ func Execute() error {
if err := csLAPI.Init(); err != nil {
return err
}
dispatchTomb.Go(func() error {
go func() {
csLAPI.Run()
log.Fatal("LAPI can't be reached")
}()
if *testConfig {
log.Info("config is valid")
return nil
}
g.Go(func() error {
csLAPI.Run(ctx)
return fmt.Errorf("crowdsec LAPI stream has stopped")
})
g.Go(func() error {
for {
// broadcast decision to each worker
decisions := <-csLAPI.Stream
for _, lapiStream := range lapiStreams {
lapiStream <- decisions
select {
case decisions := <-csLAPI.Stream:
for _, lapiStream := range lapiStreams {
stream := lapiStream
go func() { stream <- decisions }()
}
case <-ctx.Done():
return ctx.Err()
}
}
})
}

if conf.PrometheusConfig.Enabled {
serverTomb.Go(func() error {
go func() {
http.Handle("/metrics", promhttp.Handler())
err := http.ListenAndServe(net.JoinHostPort(conf.PrometheusConfig.ListenAddress, conf.PrometheusConfig.ListenPort), nil)
return err
})
log.Error(http.ListenAndServe(net.JoinHostPort(conf.PrometheusConfig.ListenAddress, conf.PrometheusConfig.ListenPort), nil))
}()
}

apiCallCounterWindow := time.NewTicker(time.Second)
Expand All @@ -254,27 +266,20 @@ func Execute() error {
}
}()

for {
select {
case <-workerTomb.Dying():
dispatchTomb.Kill(nil)
err := workerTomb.Err()
if err != nil {
return err
}
if *onlySetup || *delete {
if *delete {
log.Info("deleted all cf config")
_ = csdaemon.NotifySystemd(log.StandardLogger())

} else {
log.Info("setup complete")
}
}
return nil
case <-dispatchTomb.Dying():
workerTomb.Kill(nil)
return fmt.Errorf("dispatch is dying")
}
}
g.Go(func() error {
return HandleSignals(ctx)
})

if err := g.Wait(); err != nil {
return fmt.Errorf("process terminated with error: %w", err)
}
if *delete {
log.Info("deleted all cf config")
}
if *onlySetup {
log.Info("setup complete")
}
return nil
}
63 changes: 36 additions & 27 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,52 +4,61 @@ go 1.20

require (
github.com/cloudflare/cloudflare-go v0.40.1-0.20220527055342-b3795adaff97
github.com/crowdsecurity/crowdsec v1.4.6
github.com/crowdsecurity/go-cs-bouncer v0.0.2
github.com/crowdsecurity/go-cs-lib v0.0.0-20230522124854-671e895fa788
github.com/prometheus/client_golang v1.13.0
github.com/crowdsecurity/crowdsec v1.5.2
github.com/crowdsecurity/go-cs-bouncer v0.0.7
github.com/crowdsecurity/go-cs-lib v0.0.2
github.com/prometheus/client_golang v1.15.1
github.com/sirupsen/logrus v1.9.2
github.com/stretchr/testify v1.8.3
golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1
golang.org/x/sync v0.2.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/antonmedv/expr v1.9.0 // indirect
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/antonmedv/expr v1.12.5 // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/crowdsecurity/grokky v0.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/crowdsecurity/grokky v0.2.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.15.0 // indirect
github.com/go-openapi/analysis v0.21.4 // indirect
github.com/go-openapi/errors v0.20.3 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
github.com/go-openapi/jsonreference v0.20.0 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/loads v0.21.2 // indirect
github.com/go-openapi/spec v0.20.7 // indirect
github.com/go-openapi/strfmt v0.21.3 // indirect
github.com/go-openapi/swag v0.22.1 // indirect
github.com/go-openapi/validate v0.22.0 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/go-openapi/spec v0.20.9 // indirect
github.com/go-openapi/strfmt v0.21.7 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/go-openapi/validate v0.22.1 // indirect
github.com/goccy/go-yaml v1.11.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/hashicorp/go-version v1.6.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/magefile/mage v1.15.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
go.mongodb.org/mongo-driver v1.10.1 // indirect
golang.org/x/net v0.0.0-20220812174116-3211cb980234 // indirect
golang.org/x/sys v0.7.0 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220411224347-583f2d630306 // indirect
google.golang.org/protobuf v1.28.1 // indirect
github.com/prometheus/client_model v0.4.0 // indirect
github.com/prometheus/common v0.44.0 // indirect
github.com/prometheus/procfs v0.10.0 // indirect
github.com/tetratelabs/wazero v1.1.0 // indirect
github.com/wasilibs/go-re2 v1.1.0 // indirect
go.mongodb.org/mongo-driver v1.11.6 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.8.0 // indirect
golang.org/x/text v0.9.0 // indirect
golang.org/x/time v0.0.0-20220224211638-0e9765cccd65 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
Loading

0 comments on commit 9a42c34

Please sign in to comment.