Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add endpoint circuit breaker #2120

Open
wants to merge 30 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
901c27f
feat: setup circuit breaker
jirevwe Aug 12, 2024
fc355f9
feat: add circuit breaker switching implementation
jirevwe Aug 16, 2024
6f691e8
feat: add circuit breaker configuration
jirevwe Aug 19, 2024
a4fc664
feat: add circuit breaker configuration
jirevwe Aug 19, 2024
528ef30
feat: remove db depndency and add function to fetch breaker state
jirevwe Aug 21, 2024
feb2b22
feat: rename sample_time to sample rate; set reasonable defaults when…
jirevwe Aug 21, 2024
e96034e
feat: add circuit breaker to instance config
jirevwe Aug 21, 2024
be3208e
feat: move half-open decision after other checks have been made, so w…
jirevwe Aug 22, 2024
1081b3c
feat: add index to migration
jirevwe Aug 22, 2024
9dba343
feat: personal review first pass; add func to validate custom config;…
jirevwe Aug 22, 2024
848559e
feat: personal review second pass; added CircuitBreakerManager.CanExe…
jirevwe Aug 23, 2024
2880b8c
feat: personal review third pass; use strings builder to build config…
jirevwe Aug 23, 2024
e268fa4
feat: use deadline context for redis operations to prevent deadlock; …
jirevwe Aug 28, 2024
d9a7dd3
feat: split package into separate files, add tests for each component
jirevwe Aug 30, 2024
7080d15
chore: update config tests
jirevwe Aug 30, 2024
9c7c646
chore: update tests
jirevwe Sep 2, 2024
1efcafe
chore: implement distributed lock using redlock
jirevwe Sep 2, 2024
afd3878
merge with main
jirevwe Sep 5, 2024
3c01da3
Merge branch 'main' into raymond/feat/add-circuit-breaker
jirevwe Sep 5, 2024
842080c
chore: fix lint error
jirevwe Sep 5, 2024
abe8ada
chore: fix lint errors
jirevwe Sep 5, 2024
97f427c
chore: fix test
jirevwe Sep 6, 2024
71bb297
patch: remove concurrent write path
jirevwe Sep 16, 2024
ea55312
feat: put the circuit breaker behind the license;
jirevwe Sep 17, 2024
85f4702
feat: Change FailureThreshold to an int; change ErrorTimeout to Break…
jirevwe Sep 17, 2024
9812d53
feat: put circuit breaker behind a feature flag
jirevwe Sep 17, 2024
370385b
Merge branch 'main' into raymond/feat/add-circuit-breaker
jirevwe Sep 17, 2024
5eae27e
chore: update tests
jirevwe Sep 18, 2024
ed42554
chore: fix go lint
jirevwe Sep 18, 2024
6ba935c
feat: refactor the sample query to discard entries that occurred befo…
jirevwe Sep 20, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions api/handlers/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,24 @@ import (
)

func (h *Handler) GetConfiguration(w http.ResponseWriter, r *http.Request) {
config, err := postgres.NewConfigRepo(h.A.DB).LoadConfiguration(r.Context())
configuration, err := postgres.NewConfigRepo(h.A.DB).LoadConfiguration(r.Context())
if err != nil && !errors.Is(err, datastore.ErrConfigNotFound) {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

configResponse := []*models.ConfigurationResponse{}
if config != nil {
if config.StoragePolicy.Type == datastore.S3 {
var configResponse []*models.ConfigurationResponse
if configuration != nil {
if configuration.StoragePolicy.Type == datastore.S3 {
policy := &datastore.S3Storage{}
policy.Bucket = config.StoragePolicy.S3.Bucket
policy.Endpoint = config.StoragePolicy.S3.Endpoint
policy.Region = config.StoragePolicy.S3.Region
config.StoragePolicy.S3 = policy
policy.Bucket = configuration.StoragePolicy.S3.Bucket
policy.Endpoint = configuration.StoragePolicy.S3.Endpoint
policy.Region = configuration.StoragePolicy.S3.Region
configuration.StoragePolicy.S3 = policy
}

c := &models.ConfigurationResponse{
Configuration: config,
Configuration: configuration,
ApiVersion: convoy.GetVersion(),
}

Expand All @@ -61,14 +61,14 @@ func (h *Handler) CreateConfiguration(w http.ResponseWriter, r *http.Request) {
NewConfig: &newConfig,
}

config, err := cc.Run(r.Context())
configuration, err := cc.Run(r.Context())
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

c := &models.ConfigurationResponse{
Configuration: config,
Configuration: configuration,
ApiVersion: convoy.GetVersion(),
}

Expand All @@ -92,14 +92,14 @@ func (h *Handler) UpdateConfiguration(w http.ResponseWriter, r *http.Request) {
Config: &newConfig,
}

config, err := uc.Run(r.Context())
configuration, err := uc.Run(r.Context())
if err != nil {
_ = render.Render(w, r, util.NewServiceErrResponse(err))
return
}

c := &models.ConfigurationResponse{
Configuration: config,
Configuration: configuration,
ApiVersion: convoy.GetVersion(),
}

Expand Down
16 changes: 9 additions & 7 deletions api/testdb/seed.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,21 +557,23 @@ func SeedUser(db database.Database, email, password string) (*datastore.User, er
}

func SeedConfiguration(db database.Database) (*datastore.Configuration, error) {
config := &datastore.Configuration{
UID: ulid.Make().String(),
IsAnalyticsEnabled: true,
IsSignupEnabled: true,
StoragePolicy: &datastore.DefaultStoragePolicy,
c := &datastore.Configuration{
UID: ulid.Make().String(),
IsAnalyticsEnabled: true,
IsSignupEnabled: true,
StoragePolicy: &datastore.DefaultStoragePolicy,
RetentionPolicy: &datastore.DefaultRetentionPolicy,
CircuitBreakerConfig: &datastore.DefaultCircuitBreakerConfiguration,
}

// Seed Data
configRepo := postgres.NewConfigRepo(db)
err := configRepo.CreateConfiguration(context.TODO(), config)
err := configRepo.CreateConfiguration(context.TODO(), c)
if err != nil {
return nil, err
}

return config, nil
return c, nil
}

func SeedDevice(db database.Database, g *datastore.Project, endpointID string) error {
Expand Down
14 changes: 5 additions & 9 deletions cmd/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package agent

import (
"context"
"fmt"
"os"
"os/signal"
"time"
Expand Down Expand Up @@ -114,15 +113,15 @@ func AddAgentCommand(a *cli.App) *cobra.Command {
cmd.Flags().Uint32Var(&workerPort, "worker-port", 0, "Worker port")
cmd.Flags().Uint32Var(&ingestPort, "ingest-port", 0, "Ingest port")

cmd.Flags().StringVar(&logLevel, "log-level", "", "scheduler log level")
cmd.Flags().StringVar(&logLevel, "log-level", "", "Log level")
cmd.Flags().IntVar(&consumerPoolSize, "consumers", -1, "Size of the consumers pool.")
cmd.Flags().IntVar(&interval, "interval", 10, "the time interval, measured in seconds to update the in-memory store from the database")
cmd.Flags().StringVar(&executionMode, "mode", "", "Execution Mode (one of events, retry and default)")

return cmd
}

func startServerComponent(ctx context.Context, a *cli.App) error {
func startServerComponent(_ context.Context, a *cli.App) error {
cfg, err := config.Get()
if err != nil {
a.Logger.WithError(err).Fatal("Failed to load configuration")
Expand All @@ -139,10 +138,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error {
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
}

flag, err := fflag.NewFFlag(&cfg)
if err != nil {
a.Logger.WithError(err).Fatal("failed to create fflag controller")
}
flag := fflag.NewFFlag(&cfg)

lo := a.Logger.(*log.Logger)
lo.SetPrefix("api server")
Expand Down Expand Up @@ -172,7 +168,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error {

srv.SetHandler(evHandler.BuildDataPlaneRoutes())

fmt.Printf("Started convoy server in %s\n", time.Since(start))
log.Infof("Started convoy server in %s\n", time.Since(start))

httpConfig := cfg.Server.HTTP
if httpConfig.SSL {
Expand All @@ -181,7 +177,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error {
return nil
}

fmt.Printf("Starting Convoy Agent on port %v\n", cfg.Server.HTTP.AgentPort)
log.Println("Starting Convoy Agent on port %v\n", cfg.Server.HTTP.AgentPort)

go func() {
srv.Listen()
Expand Down
6 changes: 2 additions & 4 deletions cmd/ff/feature_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ func AddFeatureFlagsCommand() *cobra.Command {
if err != nil {
log.WithError(err).Fatal("Error fetching the config.")
}
f, err := fflag2.NewFFlag(&cfg)
if err != nil {
return err
}

f := fflag2.NewFFlag(&cfg)
return f.ListFeatures()
},
PersistentPostRun: func(cmd *cobra.Command, args []string) {},
Expand Down
52 changes: 38 additions & 14 deletions cmd/hooks/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/lib/pq"
"io"
"os"
"time"
Expand Down Expand Up @@ -318,21 +319,38 @@ func ensureInstanceConfig(ctx context.Context, a *cli.App, cfg config.Configurat
IsRetentionPolicyEnabled: cfg.RetentionPolicy.IsRetentionPolicyEnabled,
}

notificationThresholds := pq.Int64Array{}
for i := range cfg.CircuitBreaker.NotificationThresholds {
notificationThresholds = append(notificationThresholds, int64(cfg.CircuitBreaker.NotificationThresholds[i]))
}
circuitBreakerConfig := &datastore.CircuitBreakerConfig{
SampleRate: cfg.CircuitBreaker.SampleRate,
ErrorTimeout: cfg.CircuitBreaker.ErrorTimeout,
FailureCount: cfg.CircuitBreaker.FailureCount,
FailureThreshold: cfg.CircuitBreaker.FailureThreshold,
SuccessThreshold: cfg.CircuitBreaker.SuccessThreshold,
ObservabilityWindow: cfg.CircuitBreaker.ObservabilityWindow,
MinimumRequestCount: cfg.CircuitBreaker.MinimumRequestCount,
NotificationThresholds: notificationThresholds,
ConsecutiveFailureThreshold: cfg.CircuitBreaker.ConsecutiveFailureThreshold,
}

configuration, err := configRepo.LoadConfiguration(ctx)
if err != nil {
if errors.Is(err, datastore.ErrConfigNotFound) {
a.Logger.Info("Creating Instance Config")
cfg := &datastore.Configuration{
UID: ulid.Make().String(),
StoragePolicy: storagePolicy,
IsAnalyticsEnabled: cfg.Analytics.IsEnabled,
IsSignupEnabled: cfg.Auth.IsSignupEnabled,
RetentionPolicy: retentionPolicy,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
c := &datastore.Configuration{
UID: ulid.Make().String(),
StoragePolicy: storagePolicy,
IsAnalyticsEnabled: cfg.Analytics.IsEnabled,
IsSignupEnabled: cfg.Auth.IsSignupEnabled,
RetentionPolicy: retentionPolicy,
CircuitBreakerConfig: circuitBreakerConfig,
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}

return cfg, configRepo.CreateConfiguration(ctx, cfg)
return c, configRepo.CreateConfiguration(ctx, c)
}

return configuration, err
Expand All @@ -341,6 +359,7 @@ func ensureInstanceConfig(ctx context.Context, a *cli.App, cfg config.Configurat
configuration.StoragePolicy = storagePolicy
configuration.IsSignupEnabled = cfg.Auth.IsSignupEnabled
configuration.IsAnalyticsEnabled = cfg.Analytics.IsEnabled
configuration.CircuitBreakerConfig = circuitBreakerConfig
configuration.RetentionPolicy = retentionPolicy
configuration.UpdatedAt = time.Now()

Expand Down Expand Up @@ -542,32 +561,34 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {

}

flag, err := fflag2.NewFFlag(c)
if err != nil {
return nil, err
}
flag := fflag2.NewFFlag(c)
c.Metrics = config.MetricsConfiguration{
IsEnabled: false,
}

if flag.CanAccessFeature(fflag2.Prometheus) {
metricsBackend, err := cmd.Flags().GetString("metrics-backend")
if err != nil {
return nil, err
}

if !config.IsStringEmpty(metricsBackend) {
c.Metrics = config.MetricsConfiguration{
IsEnabled: false,
Backend: config.MetricsBackend(metricsBackend),
}

switch c.Metrics.Backend {
case config.PrometheusMetricsProvider:
sampleTime, err := cmd.Flags().GetUint64("metrics-prometheus-sample-time")
if err != nil {
return nil, err
}

if sampleTime < 1 {
return nil, errors.New("metrics-prometheus-sample-time must be non-zero")
}

c.Metrics = config.MetricsConfiguration{
IsEnabled: true,
Backend: config.MetricsBackend(metricsBackend),
Expand All @@ -577,14 +598,17 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
}
}
} else {
log.Warn("No metrics-backend specified")
log.Warn("metrics backend not specified")
}
} else {
log.Info(fflag2.ErrPrometheusMetricsNotEnabled)
}

maxRetrySeconds, err := cmd.Flags().GetUint64("max-retry-seconds")
if err != nil {
return nil, err
}

c.MaxRetrySeconds = maxRetrySeconds

return c, nil
Expand Down
4 changes: 2 additions & 2 deletions cmd/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func AddIngestCommand(a *cli.App) *cobra.Command {
}

cmd.Flags().Uint32Var(&ingestPort, "ingest-port", 0, "Ingest port")
cmd.Flags().StringVar(&logLevel, "log-level", "", "ingest log level")
cmd.Flags().StringVar(&logLevel, "log-level", "", "Log level")
cmd.Flags().IntVar(&interval, "interval", 10, "the time interval, measured in seconds, at which the database should be polled for new pub sub sources")

return cmd
Expand Down Expand Up @@ -119,7 +119,7 @@ func StartIngest(ctx context.Context, a *cli.App, cfg config.Configuration, inte

go ingest.Run()

fmt.Println("Starting Convoy Ingester")
log.Println("Starting Convoy Ingester")

return nil
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ func main() {
c.Flags().StringVar(&metricsBackend, "metrics-backend", "prometheus", "Metrics backend e.g. prometheus. ('prometheus' feature flag required")
c.Flags().Uint64Var(&prometheusMetricsSampleTime, "metrics-prometheus-sample-time", 5, "Prometheus metrics sample time")

c.Flags().StringVar(&retentionPolicy, "retention-policy", "", "SMTP Port")
c.Flags().BoolVar(&retentionPolicyEnabled, "retention-policy-enabled", false, "SMTP Port")
c.Flags().StringVar(&retentionPolicy, "retention-policy", "", "Retention Policy Duration")
c.Flags().BoolVar(&retentionPolicyEnabled, "retention-policy-enabled", false, "Retention Policy Enabled")

c.Flags().Uint64Var(&maxRetrySeconds, "max-retry-seconds", 7200, "Max retry seconds exponential backoff")

Expand Down
5 changes: 1 addition & 4 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,10 +106,7 @@ func startConvoyServer(a *cli.App) error {
a.Logger.WithError(err).Fatal("failed to initialize realm chain")
}

flag, err := fflag.NewFFlag(&cfg)
if err != nil {
a.Logger.WithError(err).Fatal("failed to create fflag controller")
}
flag := fflag.NewFFlag(&cfg)

if cfg.Server.HTTP.Port <= 0 {
return errors.New("please provide the HTTP port in the convoy.json file")
Expand Down
4 changes: 2 additions & 2 deletions cmd/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,12 @@ func AddStreamCommand(a *cli.App) *cobra.Command {
}

cmd.Flags().Uint32Var(&socketPort, "socket-port", 5008, "Socket port")
cmd.Flags().StringVar(&logLevel, "log-level", "error", "stream log level")
cmd.Flags().StringVar(&logLevel, "log-level", "", "Log level")

return cmd
}

func buildCliFlagConfiguration(cmd *cobra.Command) (*config.Configuration, error) {
func buildCliFlagConfiguration(_ *cobra.Command) (*config.Configuration, error) {
c := &config.Configuration{}

return c, nil
Expand Down
Loading
Loading