diff --git a/api/api.go b/api/api.go index 48fcf25a39..9915d47980 100644 --- a/api/api.go +++ b/api/api.go @@ -333,6 +333,7 @@ func (a *ApplicationHandler) BuildControlPlaneRoutes() *chi.Mux { e.With(handler.RequireEnabledProject()).Delete("/", handler.DeleteEndpoint) e.With(handler.RequireEnabledProject()).Put("/expire_secret", handler.ExpireSecret) e.With(handler.RequireEnabledProject()).Put("/pause", handler.PauseEndpoint) + e.With(handler.RequireEnabledProject()).Post("/activate", handler.ActivateEndpoint) }) }) diff --git a/api/handlers/configuration.go b/api/handlers/configuration.go index ad0d51ee25..c7e6dddcb8 100644 --- a/api/handlers/configuration.go +++ b/api/handlers/configuration.go @@ -17,24 +17,24 @@ import ( ) func (h *Handler) GetConfiguration(w http.ResponseWriter, r *http.Request) { - config, err := postgres.NewConfigRepo(h.A.DB).LoadConfiguration(r.Context()) + configuration, err := postgres.NewConfigRepo(h.A.DB).LoadConfiguration(r.Context()) if err != nil && !errors.Is(err, datastore.ErrConfigNotFound) { _ = render.Render(w, r, util.NewServiceErrResponse(err)) return } - configResponse := []*models.ConfigurationResponse{} - if config != nil { - if config.StoragePolicy.Type == datastore.S3 { + var configResponse []*models.ConfigurationResponse + if configuration != nil { + if configuration.StoragePolicy.Type == datastore.S3 { policy := &datastore.S3Storage{} - policy.Bucket = config.StoragePolicy.S3.Bucket - policy.Endpoint = config.StoragePolicy.S3.Endpoint - policy.Region = config.StoragePolicy.S3.Region - config.StoragePolicy.S3 = policy + policy.Bucket = configuration.StoragePolicy.S3.Bucket + policy.Endpoint = configuration.StoragePolicy.S3.Endpoint + policy.Region = configuration.StoragePolicy.S3.Region + configuration.StoragePolicy.S3 = policy } c := &models.ConfigurationResponse{ - Configuration: config, + Configuration: configuration, ApiVersion: convoy.GetVersion(), } @@ -61,14 +61,14 @@ func (h *Handler) CreateConfiguration(w http.ResponseWriter, r *http.Request) { NewConfig: &newConfig, } - config, err := cc.Run(r.Context()) + configuration, err := cc.Run(r.Context()) if err != nil { _ = render.Render(w, r, util.NewServiceErrResponse(err)) return } c := &models.ConfigurationResponse{ - Configuration: config, + Configuration: configuration, ApiVersion: convoy.GetVersion(), } @@ -92,14 +92,14 @@ func (h *Handler) UpdateConfiguration(w http.ResponseWriter, r *http.Request) { Config: &newConfig, } - config, err := uc.Run(r.Context()) + configuration, err := uc.Run(r.Context()) if err != nil { _ = render.Render(w, r, util.NewServiceErrResponse(err)) return } c := &models.ConfigurationResponse{ - Configuration: config, + Configuration: configuration, ApiVersion: convoy.GetVersion(), } diff --git a/api/handlers/endpoint.go b/api/handlers/endpoint.go index 28103ff580..5c3c7f058b 100644 --- a/api/handlers/endpoint.go +++ b/api/handlers/endpoint.go @@ -3,7 +3,11 @@ package handlers import ( "context" "encoding/json" + "fmt" + "github.com/frain-dev/convoy/pkg/circuit_breaker" + "github.com/frain-dev/convoy/pkg/msgpack" "net/http" + "time" "github.com/frain-dev/convoy/api/models" "github.com/frain-dev/convoy/database/postgres" @@ -207,9 +211,37 @@ func (h *Handler) GetEndpoints(w http.ResponseWriter, r *http.Request) { return } + // fetch keys from redis and mutate endpoints slice + keys := make([]string, len(endpoints)) + for i := 0; i < len(endpoints); i++ { + keys[i] = fmt.Sprintf("breaker:%s", endpoints[i].UID) + } + + cbs, err := h.A.Redis.MGet(r.Context(), 
keys...).Result()
+	if err != nil {
+		_ = render.Render(w, r, util.NewServiceErrResponse(err))
+		return
+	}
+
+	for i := 0; i < len(cbs); i++ {
+		if cbs[i] != nil {
+			str, ok := cbs[i].(string)
+			if ok {
+				var c circuit_breaker.CircuitBreaker
+				asBytes := []byte(str)
+				innerErr := msgpack.DecodeMsgPack(asBytes, &c)
+				if innerErr != nil {
+					continue
+				}
+				endpoints[i].FailureRate = c.FailureRate
+			}
+		}
+	}
+
 	resp := models.NewListResponse(endpoints, func(endpoint datastore.Endpoint) models.EndpointResponse {
 		return models.EndpointResponse{Endpoint: &endpoint}
 	})
+
 	serverResponse := util.NewServerResponse(
 		"Endpoints fetched successfully",
 		models.PagedResponse{Content: &resp, Pagination: &paginationData}, http.StatusOK)
@@ -410,7 +442,7 @@ func (h *Handler) ExpireSecret(w http.ResponseWriter, r *http.Request) {
 // PauseEndpoint
 //
 // @Summary Pause endpoint
-// @Description This endpoint toggles an endpoint status between the active and paused states
+// @Description Toggles an endpoint's status between active and paused states
 // @Id PauseEndpoint
 // @Tags Endpoints
 // @Accept json
@@ -458,6 +490,76 @@ func (h *Handler) PauseEndpoint(w http.ResponseWriter, r *http.Request) {
 	util.WriteResponse(w, r, resBytes, http.StatusAccepted)
 }

+// ActivateEndpoint
+//
+// @Summary Activate endpoint
+// @Description Activates an inactive endpoint
+// @Id ActivateEndpoint
+// @Tags Endpoints
+// @Accept json
+// @Produce json
+// @Param projectID path string true "Project ID"
+// @Param endpointID path string true "Endpoint ID"
+// @Success 202 {object} util.ServerResponse{data=models.EndpointResponse}
+// @Failure 400,401,404 {object} util.ServerResponse{data=Stub}
+// @Security ApiKeyAuth
+// @Router /v1/projects/{projectID}/endpoints/{endpointID}/activate [post]
+func (h *Handler) ActivateEndpoint(w http.ResponseWriter, r *http.Request) {
+	project, err := h.retrieveProject(r)
+	if err != nil {
+		_ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest))
+		return
+	}
+
+	aes := services.ActivateEndpointService{
+		EndpointRepo: postgres.NewEndpointRepo(h.A.DB, h.A.Cache),
+		ProjectID:    project.UID,
+		EndpointId:   chi.URLParam(r, "endpointID"),
+	}
+
+	endpoint, err := aes.Run(r.Context())
+	if err != nil {
+		_ = render.Render(w, r, util.NewServiceErrResponse(err))
+		return
+	}
+
+	cbs, err := h.A.Redis.Get(r.Context(), fmt.Sprintf("breaker:%s", endpoint.UID)).Result()
+	if err != nil {
+		h.A.Logger.WithError(err).Error("failed to find circuit breaker")
+	}
+
+	if len(cbs) > 0 {
+		c, innerErr := circuit_breaker.NewCircuitBreakerFromStore([]byte(cbs), h.A.Logger.(*log.Logger))
+		if innerErr != nil {
+			h.A.Logger.WithError(innerErr).Error("failed to decode circuit breaker")
+		} else {
+			c.Reset(time.Now())
+			b, msgPackErr := msgpack.EncodeMsgPack(c)
+			if msgPackErr != nil {
+				h.A.Logger.WithError(msgPackErr).Error("failed to encode circuit breaker")
+			} else {
+				h.A.Redis.Set(r.Context(), fmt.Sprintf("breaker:%s", endpoint.UID), b, time.Minute*5)
+			}
+		}
+	}
+
+	resp := &models.EndpointResponse{Endpoint: endpoint}
+	serverResponse := util.NewServerResponse("endpoint activated successfully", resp, http.StatusAccepted)
+
+	rb, err := json.Marshal(serverResponse)
+	if err != nil {
+		_ = render.Render(w, r, util.NewServiceErrResponse(err))
+		return
+	}
+
+	resBytes, err := h.RM.VersionResponse(r, rb, "UpdateEndpoint")
+	if err != nil {
+		_ = render.Render(w, r, util.NewServiceErrResponse(err))
+		return
+	}
+
+	util.WriteResponse(w, r, resBytes, http.StatusAccepted)
+}
+
 func (h *Handler) 
retrieveEndpoint(ctx context.Context, endpointID, projectID string) (*datastore.Endpoint, error) { endpointRepo := postgres.NewEndpointRepo(h.A.DB, h.A.Cache) return endpointRepo.FindEndpointByID(ctx, endpointID, projectID) diff --git a/api/models/models.go b/api/models/models.go index fa225e0af1..b9604530af 100644 --- a/api/models/models.go +++ b/api/models/models.go @@ -154,7 +154,7 @@ type PortalLinkResponse struct { DeletedAt null.Time `json:"deleted_at,omitempty"` } -// NewListResponse is generic function for looping over +// NewListResponse is a generic function for looping over // a slice of type M and returning a slice of type T func NewListResponse[T, M any](items []M, fn func(item M) T) []T { results := make([]T, 0) diff --git a/api/server_suite_test.go b/api/server_suite_test.go index 669fbba395..ddb2ee9ac6 100644 --- a/api/server_suite_test.go +++ b/api/server_suite_test.go @@ -128,10 +128,13 @@ func buildServer() *ApplicationHandler { noopCache := ncache.NewNoopCache() r, _ := rlimiter.NewRedisLimiter(cfg.Redis.BuildDsn()) + rd, _ := rdb.NewClient(cfg.Redis.BuildDsn()) + ah, _ := NewApplicationHandler( &types.APIOptions{ DB: db, Queue: newQueue, + Redis: rd.Client(), Logger: logger, Cache: noopCache, Rate: r, diff --git a/api/testdb/seed.go b/api/testdb/seed.go index 728744f6b3..4a7d25d4cd 100644 --- a/api/testdb/seed.go +++ b/api/testdb/seed.go @@ -557,21 +557,23 @@ func SeedUser(db database.Database, email, password string) (*datastore.User, er } func SeedConfiguration(db database.Database) (*datastore.Configuration, error) { - config := &datastore.Configuration{ - UID: ulid.Make().String(), - IsAnalyticsEnabled: true, - IsSignupEnabled: true, - StoragePolicy: &datastore.DefaultStoragePolicy, + c := &datastore.Configuration{ + UID: ulid.Make().String(), + IsAnalyticsEnabled: true, + IsSignupEnabled: true, + StoragePolicy: &datastore.DefaultStoragePolicy, + RetentionPolicy: &datastore.DefaultRetentionPolicy, + CircuitBreakerConfig: &datastore.DefaultCircuitBreakerConfiguration, } // Seed Data configRepo := postgres.NewConfigRepo(db) - err := configRepo.CreateConfiguration(context.TODO(), config) + err := configRepo.CreateConfiguration(context.TODO(), c) if err != nil { return nil, err } - return config, nil + return c, nil } func SeedDevice(db database.Database, g *datastore.Project, endpointID string) error { diff --git a/api/types/types.go b/api/types/types.go index 6b706f2f14..defd48861d 100644 --- a/api/types/types.go +++ b/api/types/types.go @@ -9,6 +9,7 @@ import ( "github.com/frain-dev/convoy/internal/pkg/limiter" "github.com/frain-dev/convoy/pkg/log" "github.com/frain-dev/convoy/queue" + "github.com/redis/go-redis/v9" ) type ContextKey string @@ -16,6 +17,7 @@ type ContextKey string type APIOptions struct { FFlag *fflag.FFlag DB database.Database + Redis redis.UniversalClient Queue queue.Queuer Logger log.StdLogger Cache cache.Cache diff --git a/cmd/agent/agent.go b/cmd/agent/agent.go index 0628cb7f78..08274b9841 100644 --- a/cmd/agent/agent.go +++ b/cmd/agent/agent.go @@ -2,7 +2,6 @@ package agent import ( "context" - "fmt" "os" "os/signal" "time" @@ -26,8 +25,6 @@ import ( func AddAgentCommand(a *cli.App) *cobra.Command { var agentPort uint32 - var ingestPort uint32 - var workerPort uint32 var logLevel string var consumerPoolSize int var interval int @@ -110,11 +107,9 @@ func AddAgentCommand(a *cli.App) *cobra.Command { cmd.Flags().StringVar(&smtpUrl, "smtp-url", "", "SMTP provider URL") cmd.Flags().Uint32Var(&smtpPort, "smtp-port", 0, "SMTP Port") - 
cmd.Flags().Uint32Var(&agentPort, "agent-port", 0, "Agent port") - cmd.Flags().Uint32Var(&workerPort, "worker-port", 0, "Worker port") - cmd.Flags().Uint32Var(&ingestPort, "ingest-port", 0, "Ingest port") + cmd.Flags().Uint32Var(&agentPort, "port", 0, "Agent port") - cmd.Flags().StringVar(&logLevel, "log-level", "", "scheduler log level") + cmd.Flags().StringVar(&logLevel, "log-level", "", "Log level") cmd.Flags().IntVar(&consumerPoolSize, "consumers", -1, "Size of the consumers pool.") cmd.Flags().IntVar(&interval, "interval", 10, "the time interval, measured in seconds to update the in-memory store from the database") cmd.Flags().StringVar(&executionMode, "mode", "", "Execution Mode (one of events, retry and default)") @@ -122,30 +117,27 @@ func AddAgentCommand(a *cli.App) *cobra.Command { return cmd } -func startServerComponent(ctx context.Context, a *cli.App) error { +func startServerComponent(_ context.Context, a *cli.App) error { + lo := a.Logger.(*log.Logger) + lo.SetPrefix("agent") + cfg, err := config.Get() if err != nil { - a.Logger.WithError(err).Fatal("Failed to load configuration") + lo.WithError(err).Fatal("Failed to load configuration") } start := time.Now() - a.Logger.Info("Starting Convoy data plane ...") + lo.Info("Starting Convoy data plane") apiKeyRepo := postgres.NewAPIKeyRepo(a.DB, a.Cache) userRepo := postgres.NewUserRepo(a.DB, a.Cache) portalLinkRepo := postgres.NewPortalLinkRepo(a.DB, a.Cache) err = realm_chain.Init(&cfg.Auth, apiKeyRepo, userRepo, portalLinkRepo, a.Cache) if err != nil { - a.Logger.WithError(err).Fatal("failed to initialize realm chain") + lo.WithError(err).Fatal("failed to initialize realm chain") } - flag, err := fflag.NewFFlag(&cfg) - if err != nil { - a.Logger.WithError(err).Fatal("failed to create fflag controller") - } - - lo := a.Logger.(*log.Logger) - lo.SetPrefix("api server") + flag := fflag.NewFFlag(&cfg) lvl, err := log.ParseLevel(cfg.Logger.Level) if err != nil { @@ -164,6 +156,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error { Logger: lo, Cache: a.Cache, Rate: a.Rate, + Redis: a.Redis, Licenser: a.Licenser, }) if err != nil { @@ -172,7 +165,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error { srv.SetHandler(evHandler.BuildDataPlaneRoutes()) - fmt.Printf("Started convoy server in %s\n", time.Since(start)) + lo.Infof("Started convoy server in %s", time.Since(start)) httpConfig := cfg.Server.HTTP if httpConfig.SSL { @@ -181,7 +174,7 @@ func startServerComponent(ctx context.Context, a *cli.App) error { return nil } - fmt.Printf("Starting Convoy Agent on port %v\n", cfg.Server.HTTP.AgentPort) + lo.Infof("Starting Convoy Agent on port %v", cfg.Server.HTTP.AgentPort) go func() { srv.Listen() @@ -194,7 +187,7 @@ func buildAgentCliConfiguration(cmd *cobra.Command) (*config.Configuration, erro c := &config.Configuration{} // PORT - port, err := cmd.Flags().GetUint32("agent-port") + port, err := cmd.Flags().GetUint32("port") if err != nil { return nil, err } @@ -203,25 +196,6 @@ func buildAgentCliConfiguration(cmd *cobra.Command) (*config.Configuration, erro c.Server.HTTP.AgentPort = port } - ingestPort, err := cmd.Flags().GetUint32("ingest-port") - if err != nil { - return nil, err - } - - if ingestPort != 0 { - c.Server.HTTP.IngestPort = ingestPort - } - - // CONVOY_WORKER_PORT - workerPort, err := cmd.Flags().GetUint32("worker-port") - if err != nil { - return nil, err - } - - if workerPort != 0 { - c.Server.HTTP.WorkerPort = workerPort - } - logLevel, err := cmd.Flags().GetString("log-level") if err 
!= nil { return nil, err diff --git a/cmd/ff/feature_flags.go b/cmd/ff/feature_flags.go index c72e7eea04..88e578bf50 100644 --- a/cmd/ff/feature_flags.go +++ b/cmd/ff/feature_flags.go @@ -20,10 +20,8 @@ func AddFeatureFlagsCommand() *cobra.Command { if err != nil { log.WithError(err).Fatal("Error fetching the config.") } - f, err := fflag2.NewFFlag(&cfg) - if err != nil { - return err - } + + f := fflag2.NewFFlag(&cfg) return f.ListFeatures() }, PersistentPostRun: func(cmd *cobra.Command, args []string) {}, diff --git a/cmd/hooks/hooks.go b/cmd/hooks/hooks.go index 838097a20b..a838ae4761 100644 --- a/cmd/hooks/hooks.go +++ b/cmd/hooks/hooks.go @@ -122,6 +122,11 @@ func PreRun(app *cli.App, db *postgres.Postgres) func(cmd *cobra.Command, args [ lo := log.NewLogger(os.Stdout) + rd, err := rdb.NewClient(cfg.Redis.BuildDsn()) + if err != nil { + return err + } + ca, err = cache.NewCache(cfg.Redis) if err != nil { return err @@ -155,6 +160,7 @@ func PreRun(app *cli.App, db *postgres.Postgres) func(cmd *cobra.Command, args [ } } + app.Redis = rd.Client() app.DB = postgresDB app.Queue = q app.Logger = lo @@ -327,21 +333,32 @@ func ensureInstanceConfig(ctx context.Context, a *cli.App, cfg config.Configurat IsRetentionPolicyEnabled: cfg.RetentionPolicy.IsRetentionPolicyEnabled, } + circuitBreakerConfig := &datastore.CircuitBreakerConfig{ + SampleRate: cfg.CircuitBreaker.SampleRate, + ErrorTimeout: cfg.CircuitBreaker.ErrorTimeout, + FailureThreshold: cfg.CircuitBreaker.FailureThreshold, + SuccessThreshold: cfg.CircuitBreaker.SuccessThreshold, + ObservabilityWindow: cfg.CircuitBreaker.ObservabilityWindow, + MinimumRequestCount: cfg.CircuitBreaker.MinimumRequestCount, + ConsecutiveFailureThreshold: cfg.CircuitBreaker.ConsecutiveFailureThreshold, + } + configuration, err := configRepo.LoadConfiguration(ctx) if err != nil { if errors.Is(err, datastore.ErrConfigNotFound) { a.Logger.Info("Creating Instance Config") - cfg := &datastore.Configuration{ - UID: ulid.Make().String(), - StoragePolicy: storagePolicy, - IsAnalyticsEnabled: cfg.Analytics.IsEnabled, - IsSignupEnabled: cfg.Auth.IsSignupEnabled, - RetentionPolicy: retentionPolicy, - CreatedAt: time.Now(), - UpdatedAt: time.Now(), + c := &datastore.Configuration{ + UID: ulid.Make().String(), + StoragePolicy: storagePolicy, + IsAnalyticsEnabled: cfg.Analytics.IsEnabled, + IsSignupEnabled: cfg.Auth.IsSignupEnabled, + RetentionPolicy: retentionPolicy, + CircuitBreakerConfig: circuitBreakerConfig, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), } - return cfg, configRepo.CreateConfiguration(ctx, cfg) + return c, configRepo.CreateConfiguration(ctx, c) } return configuration, err @@ -350,6 +367,7 @@ func ensureInstanceConfig(ctx context.Context, a *cli.App, cfg config.Configurat configuration.StoragePolicy = storagePolicy configuration.IsSignupEnabled = cfg.Auth.IsSignupEnabled configuration.IsAnalyticsEnabled = cfg.Analytics.IsEnabled + configuration.CircuitBreakerConfig = circuitBreakerConfig configuration.RetentionPolicy = retentionPolicy configuration.UpdatedAt = time.Now() @@ -567,32 +585,34 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) { } - flag, err := fflag2.NewFFlag(c) - if err != nil { - return nil, err - } + flag := fflag2.NewFFlag(c) c.Metrics = config.MetricsConfiguration{ IsEnabled: false, } + if flag.CanAccessFeature(fflag2.Prometheus) { metricsBackend, err := cmd.Flags().GetString("metrics-backend") if err != nil { return nil, err } + if !config.IsStringEmpty(metricsBackend) { c.Metrics = 
config.MetricsConfiguration{ IsEnabled: false, Backend: config.MetricsBackend(metricsBackend), } + switch c.Metrics.Backend { case config.PrometheusMetricsProvider: sampleTime, err := cmd.Flags().GetUint64("metrics-prometheus-sample-time") if err != nil { return nil, err } + if sampleTime < 1 { return nil, errors.New("metrics-prometheus-sample-time must be non-zero") } + c.Metrics = config.MetricsConfiguration{ IsEnabled: true, Backend: config.MetricsBackend(metricsBackend), @@ -602,14 +622,17 @@ func buildCliConfiguration(cmd *cobra.Command) (*config.Configuration, error) { } } } else { - log.Warn("No metrics-backend specified") + log.Warn("metrics backend not specified") } + } else { + log.Info(fflag2.ErrPrometheusMetricsNotEnabled) } maxRetrySeconds, err := cmd.Flags().GetUint64("max-retry-seconds") if err != nil { return nil, err } + c.MaxRetrySeconds = maxRetrySeconds return c, nil diff --git a/cmd/ingest/ingest.go b/cmd/ingest/ingest.go index 334f7bdbd1..27b89e6619 100644 --- a/cmd/ingest/ingest.go +++ b/cmd/ingest/ingest.go @@ -65,7 +65,7 @@ func AddIngestCommand(a *cli.App) *cobra.Command { } cmd.Flags().Uint32Var(&ingestPort, "ingest-port", 0, "Ingest port") - cmd.Flags().StringVar(&logLevel, "log-level", "", "ingest log level") + cmd.Flags().StringVar(&logLevel, "log-level", "", "Log level") cmd.Flags().IntVar(&interval, "interval", 10, "the time interval, measured in seconds, at which the database should be polled for new pub sub sources") return cmd @@ -119,7 +119,7 @@ func StartIngest(ctx context.Context, a *cli.App, cfg config.Configuration, inte go ingest.Run() - fmt.Println("Starting Convoy Ingester") + log.Println("Starting Convoy Ingester") return nil } diff --git a/cmd/main.go b/cmd/main.go index f76ac4963d..22d530c523 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -120,8 +120,8 @@ func main() { c.Flags().StringVar(&metricsBackend, "metrics-backend", "prometheus", "Metrics backend e.g. prometheus. 
('prometheus' feature flag required") c.Flags().Uint64Var(&prometheusMetricsSampleTime, "metrics-prometheus-sample-time", 5, "Prometheus metrics sample time") - c.Flags().StringVar(&retentionPolicy, "retention-policy", "", "SMTP Port") - c.Flags().BoolVar(&retentionPolicyEnabled, "retention-policy-enabled", false, "SMTP Port") + c.Flags().StringVar(&retentionPolicy, "retention-policy", "", "Retention Policy Duration") + c.Flags().BoolVar(&retentionPolicyEnabled, "retention-policy-enabled", false, "Retention Policy Enabled") c.Flags().Uint64Var(&maxRetrySeconds, "max-retry-seconds", 7200, "Max retry seconds exponential backoff") diff --git a/cmd/server/server.go b/cmd/server/server.go index 904951bfac..edfe0c7a51 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -107,10 +107,7 @@ func startConvoyServer(a *cli.App) error { a.Logger.WithError(err).Fatal("failed to initialize realm chain") } - flag, err := fflag.NewFFlag(&cfg) - if err != nil { - a.Logger.WithError(err).Fatal("failed to create fflag controller") - } + flag := fflag.NewFFlag(&cfg) if cfg.Server.HTTP.Port <= 0 { return errors.New("please provide the HTTP port in the convoy.json file") @@ -133,6 +130,7 @@ func startConvoyServer(a *cli.App) error { DB: a.DB, Queue: a.Queue, Logger: lo, + Redis: a.Redis, Cache: a.Cache, Rate: a.Rate, Licenser: a.Licenser, @@ -157,7 +155,7 @@ func startConvoyServer(a *cli.App) error { s.RegisterTask("0 0 * * *", convoy.ScheduleQueue, convoy.RetentionPolicies) s.RegisterTask("0 * * * *", convoy.ScheduleQueue, convoy.TokenizeSearch) - metrics.RegisterQueueMetrics(a.Queue, a.DB) + metrics.RegisterQueueMetrics(a.Queue, a.DB, nil) // Start scheduler s.Start() diff --git a/cmd/stream/stream.go b/cmd/stream/stream.go index 72d37baf48..9a5275aebf 100644 --- a/cmd/stream/stream.go +++ b/cmd/stream/stream.go @@ -136,12 +136,12 @@ func AddStreamCommand(a *cli.App) *cobra.Command { } cmd.Flags().Uint32Var(&socketPort, "socket-port", 5008, "Socket port") - cmd.Flags().StringVar(&logLevel, "log-level", "error", "stream log level") + cmd.Flags().StringVar(&logLevel, "log-level", "", "Log level") return cmd } -func buildCliFlagConfiguration(cmd *cobra.Command) (*config.Configuration, error) { +func buildCliFlagConfiguration(_ *cobra.Command) (*config.Configuration, error) { c := &config.Configuration{} return c, nil diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 546e1f2d09..547c28b24b 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -3,13 +3,15 @@ package worker import ( "context" "fmt" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/internal/pkg/fflag" "net/http" + "strings" "github.com/frain-dev/convoy" "github.com/frain-dev/convoy/config" "github.com/frain-dev/convoy/database/postgres" "github.com/frain-dev/convoy/internal/pkg/cli" - fflag2 "github.com/frain-dev/convoy/internal/pkg/fflag" "github.com/frain-dev/convoy/internal/pkg/limiter" "github.com/frain-dev/convoy/internal/pkg/loader" "github.com/frain-dev/convoy/internal/pkg/memorystore" @@ -19,6 +21,8 @@ import ( "github.com/frain-dev/convoy/internal/pkg/smtp" "github.com/frain-dev/convoy/internal/telemetry" "github.com/frain-dev/convoy/net" + cb "github.com/frain-dev/convoy/pkg/circuit_breaker" + "github.com/frain-dev/convoy/pkg/clock" "github.com/frain-dev/convoy/pkg/log" "github.com/frain-dev/convoy/queue" redisQueue "github.com/frain-dev/convoy/queue/redis" @@ -131,7 +135,7 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte sc, err := 
smtp.NewClient(&cfg.SMTP)
 	if err != nil {
-		a.Logger.WithError(err).Error("Failed to create smtp client")
+		lo.WithError(err).Error("Failed to create smtp client")
 		return err
 	}
@@ -225,11 +229,11 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
 	configuration, err := configRepo.LoadConfiguration(context.Background())
 	if err != nil {
-		a.Logger.WithError(err).Fatal("Failed to instance configuration")
+		lo.WithError(err).Fatal("Failed to load instance configuration")
 		return err
 	}

-	subscriptionsLoader := loader.NewSubscriptionLoader(subRepo, projectRepo, a.Logger, 0)
+	subscriptionsLoader := loader.NewSubscriptionLoader(subRepo, projectRepo, lo, 0)
 	subscriptionsTable := memorystore.NewTable(memorystore.OptionSyncer(subscriptionsLoader))

 	err = memorystore.DefaultStore.Register("subscriptions", subscriptionsTable)
@@ -245,17 +249,59 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte

 	go memorystore.DefaultStore.Sync(ctx, interval)

-	newTelemetry := telemetry.NewTelemetry(a.Logger.(*log.Logger), configuration,
+	newTelemetry := telemetry.NewTelemetry(lo, configuration,
 		telemetry.OptionTracker(counter),
 		telemetry.OptionBackend(pb),
 		telemetry.OptionBackend(mb))

 	dispatcher, err := net.NewDispatcher(cfg.Server.HTTP.HttpProxy, a.Licenser, false)
 	if err != nil {
-		a.Logger.WithError(err).Fatal("Failed to create new net dispatcher")
+		lo.WithError(err).Fatal("Failed to create new net dispatcher")
 		return err
 	}

+	featureFlag := fflag.NewFFlag(&cfg)
+	var circuitBreakerManager *cb.CircuitBreakerManager
+
+	if featureFlag.CanAccessFeature(fflag.CircuitBreaker) {
+		circuitBreakerManager, err = cb.NewCircuitBreakerManager(
+			cb.ConfigOption(configuration.ToCircuitBreakerConfig()),
+			cb.StoreOption(cb.NewRedisStore(rd.Client(), clock.NewRealClock())),
+			cb.ClockOption(clock.NewRealClock()),
+			cb.LoggerOption(lo),
+			cb.NotificationFunctionOption(func(n cb.NotificationType, c cb.CircuitBreakerConfig, b cb.CircuitBreaker) error {
+				endpointId := strings.Split(b.Key, ":")[1]
+				project, funcErr := projectRepo.FetchProjectByID(ctx, b.TenantId)
+				if funcErr != nil {
+					return funcErr
+				}
+
+				endpoint, funcErr := endpointRepo.FindEndpointByID(ctx, endpointId, b.TenantId)
+				if funcErr != nil {
+					return funcErr
+				}
+
+				switch n {
+				case cb.TypeDisableResource:
+					breakerErr := endpointRepo.UpdateEndpointStatus(ctx, project.UID, endpoint.UID, datastore.InactiveEndpointStatus)
+					if breakerErr != nil {
+						return breakerErr
+					}
+				default:
+					return fmt.Errorf("unsupported circuit breaker notification type: %s", n)
+				}
+				return nil
+			}),
+		)
+		if err != nil {
+			lo.WithError(err).Fatal("Failed to create circuit breaker manager")
+		}
+
+		go circuitBreakerManager.Start(ctx, attemptRepo.GetFailureAndSuccessCounts)
+	} else {
+		lo.Warn(fflag.ErrCircuitBreakerNotEnabled)
+	}
+
 	channels := make(map[string]task.EventChannel)
 	defaultCh, broadcastCh, dynamicCh := task.NewDefaultEventChannel(), task.NewBroadcastEventChannel(subscriptionsTable), task.NewDynamicEventChannel()
 	channels["default"] = defaultCh
@@ -271,6 +317,8 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte
 		rateLimiter,
 		dispatcher,
 		attemptRepo,
+		circuitBreakerManager,
+		featureFlag,
 	), newTelemetry)

 	consumer.RegisterHandlers(convoy.CreateEventProcessor, task.ProcessEventCreation(
@@ -286,11 +334,14 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte

 	consumer.RegisterHandlers(convoy.RetryEventProcessor, task.ProcessRetryEventDelivery(
 		endpointRepo,
eventDeliveryRepo, + a.Licenser, projectRepo, a.Queue, rateLimiter, dispatcher, attemptRepo, + circuitBreakerManager, + featureFlag, ), newTelemetry) consumer.RegisterHandlers(convoy.CreateBroadcastEventProcessor, task.ProcessBroadcastEventCreation( @@ -340,11 +391,7 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte consumer.RegisterHandlers(convoy.DailyAnalytics, task.PushDailyTelemetry(lo, a.DB, a.Cache, rd), nil) consumer.RegisterHandlers(convoy.EmailProcessor, task.ProcessEmails(sc), nil) - fflag, err := fflag2.NewFFlag(&cfg) - if err != nil { - return nil - } - if fflag.CanAccessFeature(fflag2.FullTextSearch) && a.Licenser.AdvancedWebhookFiltering() { + if featureFlag.CanAccessFeature(fflag.FullTextSearch) && a.Licenser.AdvancedWebhookFiltering() { consumer.RegisterHandlers(convoy.TokenizeSearch, task.GeneralTokenizerHandler(projectRepo, eventRepo, jobRepo, rd), nil) consumer.RegisterHandlers(convoy.TokenizeSearchForProject, task.TokenizerHandler(eventRepo, jobRepo), nil) } @@ -353,11 +400,11 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte consumer.RegisterHandlers(convoy.MetaEventProcessor, task.ProcessMetaEvent(projectRepo, metaEventRepo, dispatcher), nil) consumer.RegisterHandlers(convoy.DeleteArchivedTasksProcessor, task.DeleteArchivedTasks(a.Queue, rd), nil) - metrics.RegisterQueueMetrics(a.Queue, a.DB) + metrics.RegisterQueueMetrics(a.Queue, a.DB, circuitBreakerManager) // start worker consumer.Start() - fmt.Println("Starting Convoy Consumer Pool") + lo.Println("Starting Convoy Consumer Pool") return ctx.Err() } diff --git a/config/config.go b/config/config.go index c1f46dc02b..613fa4f561 100644 --- a/config/config.go +++ b/config/config.go @@ -74,6 +74,15 @@ var DefaultConfiguration = Configuration{ Policy: "720h", IsRetentionPolicyEnabled: false, }, + CircuitBreaker: CircuitBreakerConfiguration{ + SampleRate: 30, + ErrorTimeout: 30, + FailureThreshold: 70, + SuccessThreshold: 5, + ObservabilityWindow: 5, + MinimumRequestCount: 10, + ConsecutiveFailureThreshold: 10, + }, Auth: AuthConfiguration{ IsSignupEnabled: true, Native: NativeRealmOptions{ @@ -262,6 +271,16 @@ type RetentionPolicyConfiguration struct { IsRetentionPolicyEnabled bool `json:"enabled" envconfig:"CONVOY_RETENTION_POLICY_ENABLED"` } +type CircuitBreakerConfiguration struct { + SampleRate uint64 `json:"sample_rate" envconfig:"CONVOY_CIRCUIT_BREAKER_SAMPLE_RATE"` + ErrorTimeout uint64 `json:"error_timeout" envconfig:"CONVOY_CIRCUIT_BREAKER_ERROR_TIMEOUT"` + FailureThreshold uint64 `json:"failure_threshold" envconfig:"CONVOY_CIRCUIT_BREAKER_FAILURE_THRESHOLD"` + SuccessThreshold uint64 `json:"success_threshold" envconfig:"CONVOY_CIRCUIT_BREAKER_SUCCESS_THRESHOLD"` + MinimumRequestCount uint64 `json:"minimum_request_count" envconfig:"CONVOY_MINIMUM_REQUEST_COUNT"` + ObservabilityWindow uint64 `json:"observability_window" envconfig:"CONVOY_CIRCUIT_BREAKER_OBSERVABILITY_WINDOW"` + ConsecutiveFailureThreshold uint64 `json:"consecutive_failure_threshold" envconfig:"CONVOY_CIRCUIT_BREAKER_CONSECUTIVE_FAILURE_THRESHOLD"` +} + type AnalyticsConfiguration struct { IsEnabled bool `json:"enabled" envconfig:"CONVOY_ANALYTICS_ENABLED"` } @@ -358,6 +377,7 @@ type Configuration struct { CustomDomainSuffix string `json:"custom_domain_suffix" envconfig:"CONVOY_CUSTOM_DOMAIN_SUFFIX"` EnableFeatureFlag []string `json:"enable_feature_flag" envconfig:"CONVOY_ENABLE_FEATURE_FLAG"` RetentionPolicy RetentionPolicyConfiguration `json:"retention_policy"` + CircuitBreaker 
CircuitBreakerConfiguration `json:"circuit_breaker"` Analytics AnalyticsConfiguration `json:"analytics"` StoragePolicy StoragePolicyConfiguration `json:"storage_policy"` ConsumerPoolSize int `json:"consumer_pool_size" envconfig:"CONVOY_CONSUMER_POOL_SIZE"` diff --git a/config/config_test.go b/config/config_test.go index 3227f8b347..7d594cf955 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -125,6 +125,15 @@ func TestLoadConfig(t *testing.T) { Policy: "720h", IsRetentionPolicyEnabled: true, }, + CircuitBreaker: CircuitBreakerConfiguration{ + SampleRate: 30, + ErrorTimeout: 30, + FailureThreshold: 70, + SuccessThreshold: 5, + ObservabilityWindow: 5, + MinimumRequestCount: 10, + ConsecutiveFailureThreshold: 10, + }, Server: ServerConfiguration{ HTTP: HTTPServerConfiguration{ Port: 80, @@ -197,6 +206,15 @@ func TestLoadConfig(t *testing.T) { Port: 5432, SetConnMaxLifetime: 3600, }, + CircuitBreaker: CircuitBreakerConfiguration{ + SampleRate: 30, + ErrorTimeout: 30, + FailureThreshold: 70, + SuccessThreshold: 5, + ObservabilityWindow: 5, + MinimumRequestCount: 10, + ConsecutiveFailureThreshold: 10, + }, Redis: RedisConfiguration{ Scheme: "redis", Host: "localhost", @@ -264,6 +282,15 @@ func TestLoadConfig(t *testing.T) { Host: "localhost:5005", RetentionPolicy: RetentionPolicyConfiguration{Policy: "720h"}, ConsumerPoolSize: 100, + CircuitBreaker: CircuitBreakerConfiguration{ + SampleRate: 30, + ErrorTimeout: 30, + FailureThreshold: 70, + SuccessThreshold: 5, + ObservabilityWindow: 5, + MinimumRequestCount: 10, + ConsecutiveFailureThreshold: 10, + }, Database: DatabaseConfiguration{ Type: PostgresDatabaseProvider, Scheme: "postgres", diff --git a/database/postgres/configuration.go b/database/postgres/configuration.go index 30fcbf0e01..d369b8c1f8 100644 --- a/database/postgres/configuration.go +++ b/database/postgres/configuration.go @@ -19,9 +19,13 @@ const ( storage_policy_type, on_prem_path, s3_prefix, s3_bucket, s3_access_key, s3_secret_key, s3_region, s3_session_token, s3_endpoint, - retention_policy_policy, retention_policy_enabled + retention_policy_policy, retention_policy_enabled, + cb_sample_rate,cb_error_timeout, + cb_failure_threshold, cb_success_threshold, + cb_observability_window, + cb_consecutive_failure_threshold, cb_minimum_request_count ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14); + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21); ` fetchConfiguration = ` @@ -40,6 +44,13 @@ const ( s3_session_token AS "storage_policy.s3.session_token", s3_endpoint AS "storage_policy.s3.endpoint", s3_prefix AS "storage_policy.s3.prefix", + cb_sample_rate AS "circuit_breaker.sample_rate", + cb_error_timeout AS "circuit_breaker.error_timeout", + cb_failure_threshold AS "circuit_breaker.failure_threshold", + cb_success_threshold AS "circuit_breaker.success_threshold", + cb_observability_window AS "circuit_breaker.observability_window", + cb_minimum_request_count as "circuit_breaker.minimum_request_count", + cb_consecutive_failure_threshold AS "circuit_breaker.consecutive_failure_threshold", created_at, updated_at, deleted_at @@ -64,6 +75,13 @@ const ( s3_prefix = $12, retention_policy_policy = $13, retention_policy_enabled = $14, + cb_sample_rate = $15, + cb_error_timeout = $16, + cb_failure_threshold = $17, + cb_success_threshold = $18, + cb_observability_window = $19, + cb_consecutive_failure_threshold = $20, + cb_minimum_request_count = $21, updated_at = NOW() WHERE id = $1 AND deleted_at IS 
NULL; ` @@ -95,6 +113,7 @@ func (c *configRepo) CreateConfiguration(ctx context.Context, config *datastore. } rc := config.GetRetentionPolicyConfig() + cb := config.GetCircuitBreakerConfig() r, err := c.db.ExecContext(ctx, createConfiguration, config.UID, @@ -111,6 +130,13 @@ func (c *configRepo) CreateConfiguration(ctx context.Context, config *datastore. config.StoragePolicy.S3.Endpoint, rc.Policy, rc.IsRetentionPolicyEnabled, + cb.SampleRate, + cb.ErrorTimeout, + cb.FailureThreshold, + cb.SuccessThreshold, + cb.ObservabilityWindow, + cb.ConsecutiveFailureThreshold, + cb.MinimumRequestCount, ) if err != nil { return err @@ -159,6 +185,7 @@ func (c *configRepo) UpdateConfiguration(ctx context.Context, cfg *datastore.Con } rc := cfg.GetRetentionPolicyConfig() + cb := cfg.GetCircuitBreakerConfig() result, err := c.db.ExecContext(ctx, updateConfiguration, cfg.UID, @@ -175,6 +202,13 @@ func (c *configRepo) UpdateConfiguration(ctx context.Context, cfg *datastore.Con cfg.StoragePolicy.S3.Prefix, rc.Policy, rc.IsRetentionPolicyEnabled, + cb.SampleRate, + cb.ErrorTimeout, + cb.FailureThreshold, + cb.SuccessThreshold, + cb.ObservabilityWindow, + cb.ConsecutiveFailureThreshold, + cb.MinimumRequestCount, ) if err != nil { return err diff --git a/database/postgres/configuration_test.go b/database/postgres/configuration_test.go index cfe2ecf51c..1fc9411a1d 100644 --- a/database/postgres/configuration_test.go +++ b/database/postgres/configuration_test.go @@ -93,10 +93,6 @@ func generateConfig() *datastore.Configuration { UID: ulid.Make().String(), IsAnalyticsEnabled: true, IsSignupEnabled: false, - RetentionPolicy: &datastore.RetentionPolicyConfiguration{ - Policy: "720h", - IsRetentionPolicyEnabled: true, - }, StoragePolicy: &datastore.StoragePolicyConfiguration{ Type: datastore.OnPrem, S3: &datastore.S3Storage{ @@ -112,5 +108,17 @@ func generateConfig() *datastore.Configuration { Path: null.NewString("path", true), }, }, + RetentionPolicy: &datastore.RetentionPolicyConfiguration{ + Policy: "720h", + IsRetentionPolicyEnabled: true, + }, + CircuitBreakerConfig: &datastore.CircuitBreakerConfig{ + SampleRate: 30, + ErrorTimeout: 30, + FailureThreshold: 10, + SuccessThreshold: 5, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 10, + }, } } diff --git a/database/postgres/delivery_attempts.go b/database/postgres/delivery_attempts.go index 41d39d5723..f525c0f5e0 100644 --- a/database/postgres/delivery_attempts.go +++ b/database/postgres/delivery_attempts.go @@ -4,8 +4,10 @@ import ( "context" "database/sql" "errors" + "fmt" "github.com/frain-dev/convoy/database" "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/pkg/circuit_breaker" "github.com/jmoiron/sqlx" "io" "time" @@ -130,6 +132,66 @@ func (d *deliveryAttemptRepo) DeleteProjectDeliveriesAttempts(ctx context.Contex return nil } +func (d *deliveryAttemptRepo) GetFailureAndSuccessCounts(ctx context.Context, lookBackDuration uint64, resetTimes map[string]time.Time) (map[string]circuit_breaker.PollResult, error) { + resultsMap := map[string]circuit_breaker.PollResult{} + + query := ` + SELECT + endpoint_id AS key, + project_id AS tenant_id, + COUNT(CASE WHEN status = false THEN 1 END) AS failures, + COUNT(CASE WHEN status = true THEN 1 END) AS successes + FROM convoy.delivery_attempts + WHERE created_at >= NOW() - MAKE_INTERVAL(mins := $1) + group by endpoint_id, project_id; + ` + + rows, err := d.db.QueryxContext(ctx, query, lookBackDuration) + if err != nil { + return nil, err + } + defer rows.Close() + + for rows.Next() { + 
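+		// each row is the failure/success aggregate for one (endpoint_id, project_id) pair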
+		var rowValue circuit_breaker.PollResult
+		if rowScanErr := rows.StructScan(&rowValue); rowScanErr != nil {
+			return nil, rowScanErr
+		}
+		resultsMap[rowValue.Key] = rowValue
+	}
+
+	// NOTE: this issues one query per endpoint with a pending reset time (an N+1 pattern);
+	// tolerable while few breakers reset per window, but a candidate for batching later.
+	query2 := `
+	SELECT
+		endpoint_id AS key,
+		project_id AS tenant_id,
+		COUNT(CASE WHEN status = false THEN 1 END) AS failures,
+		COUNT(CASE WHEN status = true THEN 1 END) AS successes
+	FROM convoy.delivery_attempts
+	WHERE endpoint_id = '%s' AND created_at >= TIMESTAMP '%s' AT TIME ZONE 'UTC'
+	group by endpoint_id, project_id;
+	`
+
+	customFormat := "2006-01-02 15:04:05"
+	for k, t := range resetTimes {
+		// remove the old key so it doesn't pollute the results
+		delete(resultsMap, k)
+		qq := fmt.Sprintf(query2, k, t.Format(customFormat))
+
+		var rowValue circuit_breaker.PollResult
+		err = d.db.QueryRowxContext(ctx, qq).StructScan(&rowValue)
+		if err != nil {
+			if errors.Is(err, sql.ErrNoRows) {
+				continue
+			}
+			return nil, err
+		}
+
+		resultsMap[k] = rowValue
+	}
+
+	return resultsMap, nil
+}
+
 func (d *deliveryAttemptRepo) ExportRecords(ctx context.Context, projectID string, createdAt time.Time, w io.Writer) (int64, error) {
 	return exportRecords(ctx, d.db, "convoy.delivery_attempts", projectID, createdAt, w)
 }
diff --git a/datastore/models.go b/datastore/models.go
index f5877559c8..468a210bb8 100644
--- a/datastore/models.go
+++ b/datastore/models.go
@@ -6,6 +6,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	cb "github.com/frain-dev/convoy/pkg/circuit_breaker"
 	"math"
 	"net/http"
 	"strings"
@@ -317,6 +318,15 @@ var (
 		IsRetentionPolicyEnabled: false,
 		Policy:                   "720h",
 	}
+
+	DefaultCircuitBreakerConfiguration = CircuitBreakerConfig{
+		SampleRate:                  30,
+		ErrorTimeout:                30,
+		FailureThreshold:            70,
+		SuccessThreshold:            5,
+		ObservabilityWindow:         5,
+		ConsecutiveFailureThreshold: 10,
+	}
 )

 func GetDefaultSignatureConfig() *SignatureConfiguration {
@@ -406,8 +416,9 @@ type Endpoint struct {
 	Events         int64                   `json:"events,omitempty" db:"event_count"`
 	Authentication *EndpointAuthentication `json:"authentication" db:"authentication"`

-	RateLimit         int    `json:"rate_limit" db:"rate_limit"`
-	RateLimitDuration uint64 `json:"rate_limit_duration" db:"rate_limit_duration"`
+	RateLimit         int     `json:"rate_limit" db:"rate_limit"`
+	RateLimitDuration uint64  `json:"rate_limit_duration" db:"rate_limit_duration"`
+	FailureRate       float64 `json:"failure_rate" db:"-"`

 	CreatedAt time.Time `json:"created_at,omitempty" db:"created_at,omitempty" swaggertype:"string"`
 	UpdatedAt time.Time `json:"updated_at,omitempty" db:"updated_at,omitempty" swaggertype:"string"`
@@ -1324,17 +1335,38 @@ type Organisation struct {
 }

 type Configuration struct {
-	UID                string                        `json:"uid" db:"id"`
-	IsAnalyticsEnabled bool                          `json:"is_analytics_enabled" db:"is_analytics_enabled"`
-	IsSignupEnabled    bool                          `json:"is_signup_enabled" db:"is_signup_enabled"`
-	StoragePolicy      *StoragePolicyConfiguration   `json:"storage_policy" db:"storage_policy"`
-	RetentionPolicy    *RetentionPolicyConfiguration `json:"retention_policy" db:"retention_policy"`
+	UID                string `json:"uid" db:"id"`
+	IsAnalyticsEnabled bool   `json:"is_analytics_enabled" db:"is_analytics_enabled"`
+	IsSignupEnabled    bool   `json:"is_signup_enabled" db:"is_signup_enabled"`
+
+	StoragePolicy        *StoragePolicyConfiguration   `json:"storage_policy" db:"storage_policy"`
+	RetentionPolicy      *RetentionPolicyConfiguration `json:"retention_policy" db:"retention_policy"`
+	CircuitBreakerConfig *CircuitBreakerConfig         `json:"circuit_breaker" db:"circuit_breaker"`

 	CreatedAt time.Time `json:"created_at,omitempty" db:"created_at,omitempty" swaggertype:"string"`
swaggertype:"string"` UpdatedAt time.Time `json:"updated_at,omitempty" db:"updated_at,omitempty" swaggertype:"string"` DeletedAt null.Time `json:"deleted_at,omitempty" db:"deleted_at" swaggertype:"string"` } +func (c *Configuration) GetCircuitBreakerConfig() CircuitBreakerConfig { + if c.CircuitBreakerConfig != nil { + return *c.CircuitBreakerConfig + } + return CircuitBreakerConfig{} +} + +func (c *Configuration) ToCircuitBreakerConfig() *cb.CircuitBreakerConfig { + return &cb.CircuitBreakerConfig{ + SampleRate: c.CircuitBreakerConfig.SampleRate, + BreakerTimeout: c.CircuitBreakerConfig.ErrorTimeout, + FailureThreshold: c.CircuitBreakerConfig.FailureThreshold, + SuccessThreshold: c.CircuitBreakerConfig.SuccessThreshold, + ObservabilityWindow: c.CircuitBreakerConfig.ObservabilityWindow, + MinimumRequestCount: c.CircuitBreakerConfig.MinimumRequestCount, + ConsecutiveFailureThreshold: c.CircuitBreakerConfig.ConsecutiveFailureThreshold, + } +} + func (c *Configuration) GetRetentionPolicyConfig() RetentionPolicyConfiguration { if c.RetentionPolicy != nil { return *c.RetentionPolicy @@ -1362,6 +1394,16 @@ type OnPremStorage struct { Path null.String `json:"path" db:"path"` } +type CircuitBreakerConfig struct { + SampleRate uint64 `json:"sample_rate" db:"sample_rate"` + ErrorTimeout uint64 `json:"error_timeout" db:"error_timeout"` + FailureThreshold uint64 `json:"failure_threshold" db:"failure_threshold"` + SuccessThreshold uint64 `json:"success_threshold" db:"success_threshold"` + ObservabilityWindow uint64 `json:"observability_window" db:"observability_window"` + MinimumRequestCount uint64 `json:"minimum_request_count" db:"minimum_request_count"` + ConsecutiveFailureThreshold uint64 `json:"consecutive_failure_threshold" db:"consecutive_failure_threshold"` +} + type OrganisationMember struct { UID string `json:"uid" db:"id"` OrganisationID string `json:"organisation_id" db:"organisation_id"` diff --git a/datastore/repository.go b/datastore/repository.go index 022b006239..cf0315ca65 100644 --- a/datastore/repository.go +++ b/datastore/repository.go @@ -2,6 +2,7 @@ package datastore import ( "context" + "github.com/frain-dev/convoy/pkg/circuit_breaker" "io" "time" @@ -207,4 +208,5 @@ type DeliveryAttemptsRepository interface { FindDeliveryAttemptById(context.Context, string, string) (*DeliveryAttempt, error) FindDeliveryAttempts(context.Context, string) ([]DeliveryAttempt, error) DeleteProjectDeliveriesAttempts(ctx context.Context, projectID string, filter *DeliveryAttemptsFilter, hardDelete bool) error + GetFailureAndSuccessCounts(ctx context.Context, lookBackDuration uint64, resetTimes map[string]time.Time) (resultsMap map[string]circuit_breaker.PollResult, err error) } diff --git a/internal/email/templates/endpoint.update.html b/internal/email/templates/endpoint.update.html index c935a2e0bc..b61816eac5 100644 --- a/internal/email/templates/endpoint.update.html +++ b/internal/email/templates/endpoint.update.html @@ -1,141 +1,74 @@ - - - - - Convoy - - - + + + + + + + - - - - - - - - - -
-
-
- Company Logo - -
-

Hi there,

- {{if eq .endpoint_status "active"}} -

- Please note your endpoint has been enabled. See details: -

-
    -
  • URL: {{.target_url}}
  • -
-

- Important: Congrats! Your endpoint as been restored successfully. Please head over to your dashboard to retry - all failed and discarded events. -

- {{else}} -

- Please note your endpoint has been disabled. See details: -

-
    -
  • URL: {{.target_url}}
  • - {{if eq .response_body ""}} -
  • Most recent failure message: {{ .failure_msg }}
  • - {{else}} -
  • Endpoint Response Body: {{ .response_body }}
  • -
  • Status code: {{ .status_code }}
  • - {{end}} -
-

- Important: You're receiving this email because your endpoint has consecutively failed to receive events, and - needs to be checked. To re-activate your endpoint please head over to your dashboard and retry any failed event. -

- {{end}} - -

- For any enquiry or complaint, you can reply to this email. -

-
- - -
- - +
+ + + + + \ No newline at end of file diff --git a/internal/email/templates/reset.password.html b/internal/email/templates/reset.password.html index 1a08440e83..2edb63ad8e 100644 --- a/internal/email/templates/reset.password.html +++ b/internal/email/templates/reset.password.html @@ -6,87 +6,16 @@ - + -
+
-
- - - - - - - - - - - - - - - - - - - - - - - - - - - - +
+ convoy-logo

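// Usage sketch (annotation, not part of the diff): how the new pieces are meant to
// fit together, based on the worker wiring and the CanExecute/ErrOpenState semantics
// in this PR. `manager` and `endpointID` are illustrative names.
//
//	manager, err := cb.NewCircuitBreakerManager(
//		cb.ConfigOption(configuration.ToCircuitBreakerConfig()),
//		cb.StoreOption(cb.NewRedisStore(redisClient, clock.NewRealClock())),
//		cb.ClockOption(clock.NewRealClock()),
//		cb.LoggerOption(logger),
//	)
//	if err != nil {
//		// handle setup error
//	}
//
//	// poll delivery attempts in the background to drive state transitions
//	go manager.Start(ctx, attemptRepo.GetFailureAndSuccessCounts)
//
//	// before dispatching a webhook:
//	if err := manager.CanExecute(ctx, endpointID); err != nil {
//		// ErrOpenState or ErrTooManyRequests: skip this delivery for now
//	}

[pkg/circuit_breaker/circuit_breaker_manager.go diff resumes mid-function below; the preceding hunks were lost in extraction.]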
= float64(cb.config.SuccessThreshold) { + breaker.Reset(cb.clock.Now().Add(time.Duration(cb.config.BreakerTimeout) * time.Second)) + } else if (breaker.State == StateClosed || breaker.State == StateHalfOpen) && breaker.Requests >= cb.config.MinimumRequestCount { + if breaker.FailureRate >= float64(cb.config.FailureThreshold) { + breaker.trip(cb.clock.Now().Add(time.Duration(cb.config.BreakerTimeout) * time.Second)) + } + } + + if breaker.State == StateOpen && cb.clock.Now().After(breaker.WillResetAt) { + breaker.toHalfOpen() + } + + // send notifications for each circuit breaker + if cb.notificationFn != nil && breaker.State != StateOpen { + if breaker.ConsecutiveFailures >= cb.GetConfig().ConsecutiveFailureThreshold { + innerErr := cb.notificationFn(TypeDisableResource, *cb.config, breaker) + if innerErr != nil { + cb.logger.WithError(innerErr).Errorf("[circuit breaker] failed to execute disable resource notification function") + } + cb.logger.Debug("[circuit breaker] executed disable resource notification function") + } + } + + circuitBreakers[key] = breaker + } + + if err = cb.updateCircuitBreakers(ctx, circuitBreakers); err != nil { + cb.logger.WithError(err).Error("[circuit breaker] failed to update state") + return err + } + + return nil +} + +func (cb *CircuitBreakerManager) updateCircuitBreakers(ctx context.Context, breakers map[string]CircuitBreaker) (err error) { + ctx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + return cb.store.SetMany(ctx, breakers, time.Duration(cb.config.ObservabilityWindow)*time.Minute) +} + +func (cb *CircuitBreakerManager) loadCircuitBreakers(ctx context.Context) ([]CircuitBreaker, error) { + redisCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + keys, err := cb.store.Keys(redisCtx, prefix) + if err != nil { + return nil, err + } + + if len(keys) == 0 { + return []CircuitBreaker{}, nil + } + + redisCtx2, cancel2 := context.WithTimeout(ctx, 5*time.Second) + defer cancel2() + + res, err := cb.store.GetMany(redisCtx2, keys...) + if err != nil { + return nil, err + } + + circuitBreakers := make([]CircuitBreaker, len(res)) + for i := range res { + switch res[i].(type) { + case string: + str, ok := res[i].(string) + if !ok { + cb.logger.Errorf("[circuit breaker] breaker with key (%s) is corrupted", keys[i]) + continue + } + + c, innerErr := NewCircuitBreakerFromStore([]byte(str), cb.logger) + if innerErr != nil { + cb.logger.WithError(innerErr).Errorf("[circuit breaker] an error occurred loading circuit breaker (%s) state from the store", keys[i]) + continue + } + circuitBreakers[i] = *c + case CircuitBreaker: // only used in tests that use the mockStore + circuitBreakers[i] = res[i].(CircuitBreaker) + } + } + + return circuitBreakers, nil +} + +func (cb *CircuitBreakerManager) getCircuitBreakerError(b CircuitBreaker) error { + switch b.State { + case StateOpen: + return ErrOpenState + case StateHalfOpen: + if b.FailureRate > float64(cb.config.FailureThreshold) && b.WillResetAt.After(cb.clock.Now()) { + return ErrTooManyRequests + } + return nil + default: + return nil + } +} + +// CanExecute checks if the circuit breaker for a key will return an error for the current state. +// It will not return an error if it is in the closed state or half-open state when the failure +// threshold has not been reached, it will also fail-open if the circuit breaker is not found. 
+func (cb *CircuitBreakerManager) CanExecute(ctx context.Context, key string) error {
+	b, err := cb.GetCircuitBreaker(ctx, key)
+	if err != nil {
+		return err
+	}
+
+	if b != nil {
+		switch b.State {
+		case StateOpen, StateHalfOpen:
+			return cb.getCircuitBreakerError(*b)
+		default:
+			return nil
+		}
+	}
+
+	return nil
+}
+
+// GetCircuitBreaker fetches the circuit breaker state;
+// it fails open if the circuit breaker for that key is not found
+func (cb *CircuitBreakerManager) GetCircuitBreaker(ctx context.Context, key string) (c *CircuitBreaker, err error) {
+	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
+
+	bKey := fmt.Sprintf("%s%s", prefix, key)
+	res, err := cb.store.GetOne(ctx, bKey)
+	if err != nil {
+		if errors.Is(err, ErrCircuitBreakerNotFound) {
+			// a circuit breaker was not found for this key;
+			// it probably hasn't been created;
+			// we should fail open
+			return nil, nil
+		}
+		return nil, err
+	}
+
+	err = msgpack.DecodeMsgPack([]byte(res), &c)
+	if err != nil {
+		return nil, err
+	}
+
+	return c, nil
+}
+
+// GetCircuitBreakerWithError fetches the circuit breaker state;
+// it returns ErrCircuitBreakerNotFound when a circuit breaker for the key is not found
+func (cb *CircuitBreakerManager) GetCircuitBreakerWithError(ctx context.Context, key string) (c *CircuitBreaker, err error) {
+	ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
+	defer cancel()
+
+	bKey := fmt.Sprintf("%s%s", prefix, key)
+	res, err := cb.store.GetOne(ctx, bKey)
+	if err != nil {
+		return nil, err
+	}
+
+	err = msgpack.DecodeMsgPack([]byte(res), &c)
+	if err != nil {
+		return nil, err
+	}
+
+	return c, nil
+}
+
+func (cb *CircuitBreakerManager) sampleAndUpdate(ctx context.Context, pollFunc PollFunc) error {
+	start := time.Now()
+	stopTime := time.Now().Add(time.Duration(cb.config.SampleRate-2) * time.Second)
+	isLeader := true
+
+	mu, err := cb.store.Lock(ctx, mutexKey, cb.config.SampleRate)
+	if err != nil {
+		isLeader = false
+		cb.logger.WithError(err).Debugf("[circuit breaker] failed to acquire lock")
+		return err
+	}
+
+	defer func() {
+		sampleLatency := time.Since(start)
+
+		// sleep for the remainder of the window; the sample may complete early,
+		// but we don't want to release the lock until the next sample window.
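+		// (holding the lock for the full window doubles as leader election:
+		// instances that failed to acquire it have already returned above)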
+ sleepTime := time.Until(stopTime) + if sleepTime.Seconds() > 1.0 { + time.Sleep(sleepTime) + } + + innerErr := cb.store.Unlock(ctx, mu) + if innerErr != nil { + cb.logger.WithError(innerErr).Debugf("[circuit breaker] failed to unlock mutex") + } + + if isLeader { + // should only be logged by the instance that runs the sample + cb.logger.Infof("[circuit breaker] sample completed in %v", sampleLatency) + + // cachedMetrics will be nil if metrics is not enabled + if cachedMetrics != nil { + // Update the sample latency metric + cachedMetrics.SampleLatency = sampleLatency + } + } + }() + + bs, err := cb.loadCircuitBreakers(ctx) + if err != nil { + cb.logger.WithError(err).Error("[circuit breaker] failed to load circuitBreakers") + return err + } + + resetMap := make(map[string]time.Time, len(bs)) + for i := range bs { + if bs[i].State != StateOpen && bs[i].WillResetAt.After(time.Time{}) { + k := strings.Split(bs[i].Key, ":")[1] + resetMap[k] = bs[i].WillResetAt + } + } + + // Get the failure and success counts from the last X minutes + pollResults, err := pollFunc(ctx, cb.config.ObservabilityWindow, resetMap) + if err != nil { + return fmt.Errorf("poll function failed: %w", err) + } + + if len(pollResults) == 0 { + return nil // Nothing to update + } + + if err = cb.sampleStore(ctx, pollResults); err != nil { + return fmt.Errorf("[circuit breaker] failed to sample events and update state: %w", err) + } + + return nil +} + +func (cb *CircuitBreakerManager) GetConfig() CircuitBreakerConfig { + return *cb.config +} + +func (cb *CircuitBreakerManager) Start(ctx context.Context, pollFunc PollFunc) { + ticker := time.NewTicker(time.Duration(cb.config.SampleRate) * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + if err := cb.sampleAndUpdate(ctx, pollFunc); err != nil { + cb.logger.WithError(err).Debug("[circuit breaker] failed to sample and update circuit breakers") + } + } + } +} diff --git a/pkg/circuit_breaker/circuit_breaker_manager_test.go b/pkg/circuit_breaker/circuit_breaker_manager_test.go new file mode 100644 index 0000000000..fbcaa71956 --- /dev/null +++ b/pkg/circuit_breaker/circuit_breaker_manager_test.go @@ -0,0 +1,908 @@ +package circuit_breaker + +import ( + "context" + "errors" + "github.com/frain-dev/convoy/pkg/log" + "os" + "testing" + "time" + + "github.com/frain-dev/convoy/pkg/clock" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" +) + +func getRedis(t *testing.T) (client redis.UniversalClient, err error) { + t.Helper() + + opts, err := redis.ParseURL("redis://localhost:6379") + if err != nil { + return nil, err + } + + return redis.NewClient(opts), nil +} + +func pollResult(t *testing.T, key string, failureCount, successCount uint64) map[string]PollResult { + t.Helper() + + return map[string]PollResult{ + key: { + Key: key, + Failures: failureCount, + Successes: successCount, + }, + } +} + +func TestCircuitBreakerManager(t *testing.T) { + ctx := context.Background() + + testClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + + re, err := getRedis(t) + require.NoError(t, err) + + store := NewRedisStore(re, testClock) + + keys, err := re.Keys(ctx, "breaker*").Result() + require.NoError(t, err) + + for i := range keys { + err = re.Del(ctx, keys[i]).Err() + require.NoError(t, err) + } + + c := &CircuitBreakerConfig{ + SampleRate: 2, + BreakerTimeout: 30, + FailureThreshold: 70, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + 
ConsecutiveFailureThreshold: 10, + } + + b, err := NewCircuitBreakerManager( + ClockOption(testClock), + StoreOption(store), + ConfigOption(c), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + endpointId := "endpoint-1" + pollResults := []map[string]PollResult{ + pollResult(t, endpointId, 3, 9), + pollResult(t, endpointId, 5, 10), + pollResult(t, endpointId, 10, 1), + pollResult(t, endpointId, 20, 0), + pollResult(t, endpointId, 2, 3), + pollResult(t, endpointId, 1, 4), + } + + for i := 0; i < len(pollResults); i++ { + innerErr := b.sampleStore(ctx, pollResults[i]) + require.NoError(t, innerErr) + + testClock.AdvanceTime(time.Minute) + } + + breaker, innerErr := b.GetCircuitBreakerWithError(ctx, endpointId) + require.NoError(t, innerErr) + + require.Equal(t, breaker.State, StateClosed) +} + +func TestCircuitBreakerManager_AddNewBreakerMidway(t *testing.T) { + ctx := context.Background() + + testClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + + re, err := getRedis(t) + require.NoError(t, err) + + store := NewRedisStore(re, testClock) + + keys, err := re.Keys(ctx, "breaker*").Result() + require.NoError(t, err) + + for i := range keys { + err = re.Del(ctx, keys[i]).Err() + require.NoError(t, err) + } + + c := &CircuitBreakerConfig{ + SampleRate: 2, + BreakerTimeout: 30, + FailureThreshold: 70, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 10, + } + b, err := NewCircuitBreakerManager(ClockOption(testClock), StoreOption(store), ConfigOption(c), LoggerOption(log.NewLogger(os.Stdout))) + require.NoError(t, err) + + endpoint1 := "endpoint-1" + endpoint2 := "endpoint-2" + pollResults := []map[string]PollResult{ + pollResult(t, endpoint1, 1, 0), + pollResult(t, endpoint1, 2, 0), + pollResult(t, endpoint1, 2, 1), pollResult(t, endpoint2, 1, 0), + pollResult(t, endpoint1, 2, 2), pollResult(t, endpoint2, 1, 1), + pollResult(t, endpoint1, 2, 3), pollResult(t, endpoint2, 0, 2), + pollResult(t, endpoint1, 1, 4), pollResult(t, endpoint2, 1, 1), + } + + for i := 0; i < len(pollResults); i++ { + err = b.sampleStore(ctx, pollResults[i]) + require.NoError(t, err) + + testClock.AdvanceTime(time.Minute) + } + + breakers, innerErr := b.loadCircuitBreakers(ctx) + require.NoError(t, innerErr) + + require.Len(t, breakers, 2) +} + +func TestCircuitBreakerManager_Transitions(t *testing.T) { + ctx := context.Background() + + testClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + + re, err := getRedis(t) + require.NoError(t, err) + + store := NewRedisStore(re, testClock) + + keys, err := store.Keys(ctx, "breaker*") + require.NoError(t, err) + + for i := range keys { + err = re.Del(ctx, keys[i]).Err() + require.NoError(t, err) + } + + c := &CircuitBreakerConfig{ + SampleRate: 2, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 10, + } + b, err := NewCircuitBreakerManager(ClockOption(testClock), StoreOption(store), ConfigOption(c), LoggerOption(log.NewLogger(os.Stdout))) + require.NoError(t, err) + + endpointId := "endpoint-1" + pollResults := []map[string]PollResult{ + pollResult(t, endpointId, 1, 2), // Closed + pollResult(t, endpointId, 13, 1), // Still Open + pollResult(t, endpointId, 10, 1), // Half-Open (after ErrorTimeout) + pollResult(t, endpointId, 0, 2), // Closed (SuccessThreshold reached) + pollResult(t, endpointId, 14, 0), // Open (FailureThreshold reached) + 
} + + expectedStates := []State{ + StateClosed, + StateOpen, + StateHalfOpen, + StateClosed, + StateOpen, + } + + for i, result := range pollResults { + err = b.sampleStore(ctx, result) + require.NoError(t, err) + + breaker, innerErr := b.GetCircuitBreakerWithError(ctx, endpointId) + require.NoError(t, innerErr) + + require.Equal(t, expectedStates[i], breaker.State, "Iteration %d: expected state %v, got %v", i, expectedStates[i], breaker.State) + + if i == 1 { + // Advance time to trigger the transition to half-open + testClock.AdvanceTime(time.Duration(c.BreakerTimeout+1) * time.Second) + } else { + testClock.AdvanceTime(time.Second * 5) // Advance time by 5 seconds for other iterations + } + } +} + +func TestCircuitBreakerManager_ConsecutiveFailures(t *testing.T) { + ctx := context.Background() + + testClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + + re, err := getRedis(t) + require.NoError(t, err) + + store := NewRedisStore(re, testClock) + + keys, err := re.Keys(ctx, "breaker*").Result() + require.NoError(t, err) + + for i := range keys { + err = re.Del(ctx, keys[i]).Err() + require.NoError(t, err) + } + + c := &CircuitBreakerConfig{ + SampleRate: 2, + BreakerTimeout: 30, + FailureThreshold: 70, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + b, err := NewCircuitBreakerManager(ClockOption(testClock), StoreOption(store), ConfigOption(c), LoggerOption(log.NewLogger(os.Stdout))) + require.NoError(t, err) + + endpointId := "endpoint-1" + pollResults := []map[string]PollResult{ + pollResult(t, endpointId, 13, 1), // Open + pollResult(t, endpointId, 13, 1), // Half-Open + pollResult(t, endpointId, 15, 0), // Open + pollResult(t, endpointId, 17, 1), // Half-Open + pollResult(t, endpointId, 13, 0), // Open + } + + for _, result := range pollResults { + err = b.sampleStore(ctx, result) + require.NoError(t, err) + + testClock.AdvanceTime(time.Duration(c.BreakerTimeout+1) * time.Second) + } + + breaker, err := b.GetCircuitBreakerWithError(ctx, endpointId) + require.NoError(t, err) + require.Equal(t, StateOpen, breaker.State) + require.Equal(t, uint64(3), breaker.ConsecutiveFailures) +} + +func TestCircuitBreakerManager_MultipleEndpoints(t *testing.T) { + ctx := context.Background() + + testClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + + re, err := getRedis(t) + require.NoError(t, err) + + store := NewRedisStore(re, testClock) + + keys, err := re.Keys(ctx, "breaker*").Result() + require.NoError(t, err) + + for i := range keys { + err = re.Del(ctx, keys[i]).Err() + require.NoError(t, err) + } + + c := &CircuitBreakerConfig{ + SampleRate: 2, + BreakerTimeout: 30, + FailureThreshold: 60, + SuccessThreshold: 10, + ObservabilityWindow: 5, + MinimumRequestCount: 10, + ConsecutiveFailureThreshold: 10, + } + b, err := NewCircuitBreakerManager(ClockOption(testClock), StoreOption(store), ConfigOption(c), LoggerOption(log.NewLogger(os.Stdout))) + require.NoError(t, err) + + endpoint1 := "endpoint-1" + endpoint2 := "endpoint-2" + endpoint3 := "endpoint-3" + + pollResults := []map[string]PollResult{ + pollResult(t, endpoint1, 10, 0), pollResult(t, endpoint2, 3, 1), pollResult(t, endpoint3, 0, 4), + pollResult(t, endpoint1, 13, 0), pollResult(t, endpoint2, 3, 1), pollResult(t, endpoint3, 0, 4), + pollResult(t, endpoint1, 15, 0), pollResult(t, endpoint2, 1, 3), pollResult(t, endpoint3, 1, 5), + } + + for _, results := range pollResults { + err = b.sampleStore(ctx, results) + 
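// The clock is advanced past BreakerTimeout after each sample (below), so
// an Open breaker becomes eligible to go Half-Open before the next poll;
// endpoint-1's all-failure samples keep re-tripping it regardless.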
require.NoError(t, err) + + testClock.AdvanceTime(time.Duration(c.BreakerTimeout+1) * time.Second) + } + + breaker1, err := b.GetCircuitBreakerWithError(ctx, endpoint1) + require.NoError(t, err) + require.Equal(t, StateOpen, breaker1.State) + + breaker2, err := b.GetCircuitBreakerWithError(ctx, endpoint2) + require.NoError(t, err) + require.Equal(t, StateClosed, breaker2.State) + + breaker3, err := b.GetCircuitBreakerWithError(ctx, endpoint3) + require.NoError(t, err) + require.Equal(t, StateClosed, breaker3.State) +} + +func TestCircuitBreakerManager_Config(t *testing.T) { + mockStore := NewTestStore() + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + t.Run("Success", func(t *testing.T) { + manager, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + + require.NoError(t, err) + require.NotNil(t, manager) + require.Equal(t, mockStore, manager.store) + require.Equal(t, mockClock, manager.clock) + require.Equal(t, config, manager.config) + }) + + t.Run("Missing Store", func(t *testing.T) { + _, err := NewCircuitBreakerManager( + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + + require.Error(t, err) + require.Equal(t, ErrStoreMustNotBeNil, err) + }) + + t.Run("Missing Clock", func(t *testing.T) { + _, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + + require.Error(t, err) + require.Equal(t, ErrClockMustNotBeNil, err) + }) + + t.Run("Missing Config", func(t *testing.T) { + _, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + LoggerOption(log.NewLogger(os.Stdout)), + ) + + require.Error(t, err) + require.Equal(t, ErrConfigMustNotBeNil, err) + }) +} + +func TestCircuitBreakerManager_GetCircuitBreakerError(t *testing.T) { + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + c := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + manager := &CircuitBreakerManager{config: config, clock: c} + + t.Run("Open State", func(t *testing.T) { + breaker := CircuitBreaker{State: StateOpen} + err := manager.getCircuitBreakerError(breaker) + require.Equal(t, ErrOpenState, err) + }) + + t.Run("Half-Open State with Too Many Failures", func(t *testing.T) { + breaker := CircuitBreaker{State: StateHalfOpen, FailureRate: 60, WillResetAt: time.Date(2020, 1, 1, 0, 1, 0, 0, time.UTC)} + err := manager.getCircuitBreakerError(breaker) + require.Equal(t, ErrTooManyRequests, err) + }) + + t.Run("Half-Open State with Acceptable Failures", func(t *testing.T) { + breaker := CircuitBreaker{State: StateHalfOpen, FailureRate: 40} + err := manager.getCircuitBreakerError(breaker) + require.NoError(t, err) + }) + + t.Run("Closed State", func(t *testing.T) { + breaker := CircuitBreaker{State: StateClosed} + err := manager.getCircuitBreakerError(breaker) + require.NoError(t, err) + }) +} + +func TestCircuitBreakerManager_SampleStore(t *testing.T) { + mockStore := NewTestStore() + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 
0, 0, 0, 0, time.UTC)) + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + manager, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + ctx := context.Background() + pollResults := map[string]PollResult{ + "test1": {Key: "test1", Failures: 3, Successes: 7}, + "test2": {Key: "test2", Failures: 6, Successes: 4}, + } + + err = manager.sampleStore(ctx, pollResults) + require.NoError(t, err) + + // Check if circuit breakers were created and updated correctly + cb1, err := manager.GetCircuitBreakerWithError(ctx, "test1") + require.NoError(t, err) + require.Equal(t, StateClosed, cb1.State) + require.Equal(t, uint64(10), cb1.Requests) + require.Equal(t, uint64(3), cb1.TotalFailures) + require.Equal(t, uint64(7), cb1.TotalSuccesses) + + cb2, err := manager.GetCircuitBreakerWithError(ctx, "test2") + require.NoError(t, err) + require.Equal(t, StateOpen, cb2.State) + require.Equal(t, uint64(10), cb2.Requests) + require.Equal(t, uint64(6), cb2.TotalFailures) + require.Equal(t, uint64(4), cb2.TotalSuccesses) +} + +func TestCircuitBreakerManager_UpdateCircuitBreakers(t *testing.T) { + mockStore := NewTestStore() + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + manager, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + ctx := context.Background() + breakers := map[string]CircuitBreaker{ + "breaker:test1": { + Key: "breaker:test1", + State: StateClosed, + Requests: 10, + TotalFailures: 3, + TotalSuccesses: 7, + }, + "breaker:test2": { + Key: "breaker:test2", + State: StateOpen, + Requests: 10, + TotalFailures: 6, + TotalSuccesses: 4, + }, + } + + err = manager.updateCircuitBreakers(ctx, breakers) + require.NoError(t, err) + + // Check if circuit breakers were updated in the store + cb1, err := manager.GetCircuitBreakerWithError(ctx, "test1") + require.NoError(t, err) + require.Equal(t, StateClosed, cb1.State) + require.Equal(t, uint64(10), cb1.Requests) + require.Equal(t, uint64(3), cb1.TotalFailures) + require.Equal(t, uint64(7), cb1.TotalSuccesses) + + cb2, err := manager.GetCircuitBreakerWithError(ctx, "test2") + require.NoError(t, err) + require.Equal(t, StateOpen, cb2.State) + require.Equal(t, uint64(10), cb2.Requests) + require.Equal(t, uint64(6), cb2.TotalFailures) + require.Equal(t, uint64(4), cb2.TotalSuccesses) +} + +func TestCircuitBreakerManager_LoadCircuitBreakers_TestStore(t *testing.T) { + mockStore := NewTestStore() + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + manager, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + ctx := 
context.Background() + breakers := map[string]CircuitBreaker{ + "breaker:test1": { + Key: "test1", + State: StateClosed, + Requests: 10, + TotalFailures: 3, + TotalSuccesses: 7, + }, + "breaker:test2": { + Key: "test2", + State: StateOpen, + Requests: 10, + TotalFailures: 6, + TotalSuccesses: 4, + }, + } + + err = manager.updateCircuitBreakers(ctx, breakers) + require.NoError(t, err) + + loadedBreakers, err := manager.loadCircuitBreakers(ctx) + require.NoError(t, err) + require.Len(t, loadedBreakers, 2) + + // Check if loaded circuit breakers match the original ones + for _, cb := range loadedBreakers { + originalCB, exists := breakers["breaker:"+cb.Key] + require.True(t, exists) + require.Equal(t, originalCB.State, cb.State) + require.Equal(t, originalCB.Requests, cb.Requests) + require.Equal(t, originalCB.TotalFailures, cb.TotalFailures) + require.Equal(t, originalCB.TotalSuccesses, cb.TotalSuccesses) + } +} + +func TestCircuitBreakerManager_LoadCircuitBreakers_RedisStore(t *testing.T) { + ctx := context.Background() + + re, err := getRedis(t) + require.NoError(t, err) + + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + store := NewRedisStore(re, mockClock) + + keys, err := re.Keys(ctx, "breaker*").Result() + require.NoError(t, err) + + for i := range keys { + err = re.Del(ctx, keys[i]).Err() + require.NoError(t, err) + } + + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + manager, err := NewCircuitBreakerManager( + StoreOption(store), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + breakers := map[string]CircuitBreaker{ + "breaker:test1": { + Key: "test1", + State: StateClosed, + Requests: 10, + TotalFailures: 3, + TotalSuccesses: 7, + }, + "breaker:test2": { + Key: "test2", + State: StateOpen, + Requests: 10, + TotalFailures: 6, + TotalSuccesses: 4, + }, + } + + err = manager.updateCircuitBreakers(ctx, breakers) + require.NoError(t, err) + + loadedBreakers, err := manager.loadCircuitBreakers(ctx) + require.NoError(t, err) + require.Len(t, loadedBreakers, 2) + + // Check if loaded circuit breakers match the original ones + for _, cb := range loadedBreakers { + originalCB, exists := breakers["breaker:"+cb.Key] + require.True(t, exists) + require.Equal(t, originalCB.State, cb.State) + require.Equal(t, originalCB.Requests, cb.Requests) + require.Equal(t, originalCB.TotalFailures, cb.TotalFailures) + require.Equal(t, originalCB.TotalSuccesses, cb.TotalSuccesses) + } +} + +func TestCircuitBreakerManager_CanExecute(t *testing.T) { + mockStore := NewTestStore() + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + manager, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + ctx := context.Background() + + t.Run("Circuit Breaker Not Found", func(t *testing.T) { + err := manager.CanExecute(ctx, "non_existent") + require.NoError(t, err) + }) + + t.Run("Closed State", func(t *testing.T) { + cb := CircuitBreaker{ + Key: "test_closed", + State: 
StateClosed, + } + err := manager.store.SetOne(ctx, "breaker:test_closed", cb, time.Minute) + require.NoError(t, err) + + err = manager.CanExecute(ctx, "test_closed") + require.NoError(t, err) + }) + + t.Run("Open State", func(t *testing.T) { + cb := CircuitBreaker{ + Key: "test_open", + State: StateOpen, + } + err := manager.store.SetOne(ctx, "breaker:test_open", cb, time.Minute) + require.NoError(t, err) + + err = manager.CanExecute(ctx, "test_open") + require.Equal(t, ErrOpenState, err) + }) + + t.Run("Half-Open State with Too Many Failures", func(t *testing.T) { + cb := CircuitBreaker{ + Key: "test_half_open", + State: StateHalfOpen, + FailureRate: 60, + WillResetAt: time.Date(2020, 1, 1, 0, 1, 0, 0, time.UTC), + } + err := manager.store.SetOne(ctx, "breaker:test_half_open", cb, time.Minute) + require.NoError(t, err) + + err = manager.CanExecute(ctx, "test_half_open") + require.Equal(t, ErrTooManyRequests, err) + }) + + t.Run("Half-Open State with Acceptable Failures", func(t *testing.T) { + cb := CircuitBreaker{ + Key: "test_half_open_ok", + State: StateHalfOpen, + TotalFailures: 4, + } + err := manager.store.SetOne(ctx, "breaker:test_half_open_ok", cb, time.Minute) + require.NoError(t, err) + + err = manager.CanExecute(ctx, "test_half_open_ok") + require.NoError(t, err) + }) +} + +func TestCircuitBreakerManager_GetCircuitBreaker(t *testing.T) { + mockStore := NewTestStore() + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + manager, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + ctx := context.Background() + + t.Run("Circuit Breaker Not Found", func(t *testing.T) { + _, err := manager.GetCircuitBreakerWithError(ctx, "non_existent") + require.Equal(t, ErrCircuitBreakerNotFound, err) + }) + + t.Run("Circuit Breaker Found", func(t *testing.T) { + cb := CircuitBreaker{ + Key: "test_cb", + State: StateClosed, + Requests: 10, + TotalFailures: 3, + TotalSuccesses: 7, + } + err := manager.store.SetOne(ctx, "breaker:test_cb", cb, time.Minute) + require.NoError(t, err) + + retrievedCB, err := manager.GetCircuitBreakerWithError(ctx, "test_cb") + require.NoError(t, err) + require.Equal(t, cb.Key, retrievedCB.Key) + require.Equal(t, cb.State, retrievedCB.State) + require.Equal(t, cb.Requests, retrievedCB.Requests) + require.Equal(t, cb.TotalFailures, retrievedCB.TotalFailures) + require.Equal(t, cb.TotalSuccesses, retrievedCB.TotalSuccesses) + }) +} + +func TestCircuitBreakerManager_SampleAndUpdate(t *testing.T) { + mockStore := NewTestStore() + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + manager, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + ctx := context.Background() + + t.Run("Sample and Update Success", func(t *testing.T) { + pollFunc := func(ctx context.Context, lookBackDuration uint64, _ map[string]time.Time) 
(map[string]PollResult, error) { + return map[string]PollResult{ + "test1": {Key: "test1", Failures: 3, Successes: 7}, + "test2": {Key: "test2", Failures: 6, Successes: 4}, + }, nil + } + + err := manager.sampleAndUpdate(ctx, pollFunc) + require.NoError(t, err) + + // Check if circuit breakers were created and updated correctly + cb1, err := manager.GetCircuitBreakerWithError(ctx, "test1") + require.NoError(t, err) + require.Equal(t, StateClosed, cb1.State) + require.Equal(t, uint64(10), cb1.Requests) + require.Equal(t, uint64(3), cb1.TotalFailures) + require.Equal(t, uint64(7), cb1.TotalSuccesses) + + cb2, err := manager.GetCircuitBreakerWithError(ctx, "test2") + require.NoError(t, err) + require.Equal(t, StateOpen, cb2.State) + require.Equal(t, uint64(10), cb2.Requests) + require.Equal(t, uint64(6), cb2.TotalFailures) + require.Equal(t, uint64(4), cb2.TotalSuccesses) + }) + + t.Run("Sample and Update with Empty Results", + func(t *testing.T) { + pollFunc := func(ctx context.Context, lookBackDuration uint64, _ map[string]time.Time) (map[string]PollResult, error) { + return map[string]PollResult{}, nil + } + + err := manager.sampleAndUpdate(ctx, pollFunc) + require.NoError(t, err) + }) + + t.Run("Sample and Update with Poll Function Error", func(t *testing.T) { + pollFunc := func(ctx context.Context, lookBackDuration uint64, _ map[string]time.Time) (map[string]PollResult, error) { + return nil, errors.New("poll function error") + } + + err := manager.sampleAndUpdate(ctx, pollFunc) + require.Error(t, err) + require.Contains(t, err.Error(), "poll function failed") + }) +} + +func TestCircuitBreakerManager_Start(t *testing.T) { + mockStore := NewTestStore() + mockClock := clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)) + config := &CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 10, + MinimumRequestCount: 10, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + } + + manager, err := NewCircuitBreakerManager( + StoreOption(mockStore), + ClockOption(mockClock), + ConfigOption(config), + LoggerOption(log.NewLogger(os.Stdout)), + ) + require.NoError(t, err) + + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + pollCount := 0 + pollFunc := func(ctx context.Context, lookBackDuration uint64, _ map[string]time.Time) (map[string]PollResult, error) { + pollCount++ + return map[string]PollResult{ + "test": {Key: "test", Failures: uint64(pollCount), Successes: 10 - uint64(pollCount)}, + }, nil + } + + go manager.Start(ctx, pollFunc) + + // Wait for a few poll cycles + time.Sleep(2500 * time.Millisecond) + + // Check if the circuit breaker was updated + cb, err := manager.GetCircuitBreakerWithError(ctx, "test") + require.NoError(t, err) + require.NotNil(t, cb) + require.Equal(t, uint64(10), cb.Requests) + require.True(t, cb.TotalFailures > 0) + require.True(t, cb.TotalSuccesses > 0) + + // Ensure the poll function was called multiple times + require.True(t, pollCount > 1) +} diff --git a/pkg/circuit_breaker/circuit_breaker_test.go b/pkg/circuit_breaker/circuit_breaker_test.go new file mode 100644 index 0000000000..5f68917bdc --- /dev/null +++ b/pkg/circuit_breaker/circuit_breaker_test.go @@ -0,0 +1,217 @@ +package circuit_breaker + +import ( + "github.com/frain-dev/convoy/pkg/log" + "github.com/frain-dev/convoy/pkg/msgpack" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "os" + "testing" + "time" +) + +func TestCircuitBreaker_String(t 
*testing.T) { + cb := &CircuitBreaker{ + Key: "test", + State: StateClosed, + Requests: 100, + FailureRate: 0.1, + WillResetAt: time.Now(), + TotalFailures: 10, + TotalSuccesses: 90, + ConsecutiveFailures: 2, + } + + t.Run("Success", func(t *testing.T) { + result := cb.String() + + require.NotEmpty(t, result) + + // Decode the result back to a CircuitBreaker + var decodedCB CircuitBreaker + err := msgpack.DecodeMsgPack([]byte(result), &decodedCB) + require.NoError(t, err) + + // Compare the decoded CircuitBreaker with the original + require.Equal(t, cb.Key, decodedCB.Key) + require.Equal(t, cb.State, decodedCB.State) + require.Equal(t, cb.Requests, decodedCB.Requests) + require.Equal(t, cb.FailureRate, decodedCB.FailureRate) + require.Equal(t, cb.WillResetAt.Unix(), decodedCB.WillResetAt.Unix()) + require.Equal(t, cb.TotalFailures, decodedCB.TotalFailures) + require.Equal(t, cb.TotalSuccesses, decodedCB.TotalSuccesses) + require.Equal(t, cb.ConsecutiveFailures, decodedCB.ConsecutiveFailures) + }) +} + +func TestCircuitBreaker_tripCircuitBreaker(t *testing.T) { + cb := &CircuitBreaker{ + State: StateClosed, + ConsecutiveFailures: 0, + } + + resetTime := time.Now().Add(30 * time.Second) + cb.trip(resetTime) + + require.Equal(t, StateOpen, cb.State) + require.Equal(t, resetTime, cb.WillResetAt) + require.Equal(t, uint64(1), cb.ConsecutiveFailures) +} + +func TestCircuitBreaker_toHalfOpen(t *testing.T) { + cb := &CircuitBreaker{ + State: StateOpen, + } + + cb.toHalfOpen() + + require.Equal(t, StateHalfOpen, cb.State) +} + +func TestCircuitBreaker_resetCircuitBreaker(t *testing.T) { + cb := &CircuitBreaker{ + State: StateOpen, + ConsecutiveFailures: 5, + } + + cb.Reset(time.Now()) + + require.Equal(t, StateClosed, cb.State) + require.Equal(t, uint64(0), cb.ConsecutiveFailures) +} + +func TestNewCircuitBreakerFromStore(t *testing.T) { + createValidMsgpack := func() []byte { + cb := &CircuitBreaker{ + Key: "test-key", + TenantId: "tenant-1", + State: StateClosed, + Requests: 10, + FailureRate: 0.2, + SuccessRate: 0.8, + WillResetAt: time.Now().Add(time.Hour), + TotalFailures: 2, + TotalSuccesses: 8, + ConsecutiveFailures: 1, + NotificationsSent: 1, + } + data, err := msgpack.EncodeMsgPack(cb) + if err != nil { + t.Fatalf("Failed to create test data: %v", err) + } + return data + } + + logger := log.NewLogger(os.Stdout) + + tests := []struct { + name string + input []byte + logger *log.Logger + wantErr bool + errContains string + validate func(*testing.T, *CircuitBreaker) + }{ + { + name: "empty input", + input: []byte{}, + logger: logger, + wantErr: true, + errContains: "EOF", + }, + { + name: "invalid CircuitBreaker", + input: []byte{0x1, 0x2, 0x3}, + logger: logger, + wantErr: true, + errContains: "decoding map length", + }, + { + name: "valid CircuitBreaker with logger", + input: createValidMsgpack(), + logger: logger, + wantErr: false, + validate: func(t *testing.T, cb *CircuitBreaker) { + assert.Equal(t, "test-key", cb.Key) + assert.Equal(t, "tenant-1", cb.TenantId) + assert.Equal(t, StateClosed, cb.State) + assert.Equal(t, uint64(10), cb.Requests) + assert.Equal(t, 0.2, cb.FailureRate) + assert.Equal(t, 0.8, cb.SuccessRate) + assert.Equal(t, uint64(2), cb.TotalFailures) + assert.Equal(t, uint64(8), cb.TotalSuccesses) + assert.Equal(t, uint64(1), cb.ConsecutiveFailures) + assert.Equal(t, uint64(1), cb.NotificationsSent) + assert.NotNil(t, cb.logger) + }, + }, + { + name: "valid CircuitBreaker without logger", + input: createValidMsgpack(), + logger: nil, + wantErr: false, + validate: func(t 
*testing.T, cb *CircuitBreaker) { + assert.Equal(t, "test-key", cb.Key) + assert.Nil(t, cb.logger) + }, + }, + { + name: "CircuitBreaker with different state", + input: func() []byte { + cb := &CircuitBreaker{ + Key: "test-key", + State: StateOpen, + } + data, _ := msgpack.EncodeMsgPack(cb) + return data + }(), + logger: logger, + wantErr: false, + validate: func(t *testing.T, cb *CircuitBreaker) { + assert.Equal(t, StateOpen, cb.State) + }, + }, + { + name: "large numbers test", + input: func() []byte { + cb := &CircuitBreaker{ + Key: "test-key", + Requests: 18446744073709551615, // max uint64 + TotalSuccesses: 18446744073709551615, // max uint64 + ConsecutiveFailures: 18446744073709551615, // max uint64 + } + data, _ := msgpack.EncodeMsgPack(cb) + return data + }(), + logger: logger, + wantErr: false, + validate: func(t *testing.T, cb *CircuitBreaker) { + assert.Equal(t, uint64(18446744073709551615), cb.Requests) + assert.Equal(t, uint64(18446744073709551615), cb.TotalSuccesses) + assert.Equal(t, uint64(18446744073709551615), cb.ConsecutiveFailures) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewCircuitBreakerFromStore(tt.input, tt.logger) + + if tt.wantErr { + assert.Error(t, err) + if len(tt.errContains) > 0 { + assert.Contains(t, err.Error(), tt.errContains) + } + assert.Nil(t, got) + return + } + + assert.NoError(t, err) + assert.NotNil(t, got) + + if tt.validate != nil { + tt.validate(t, got) + } + }) + } +} diff --git a/pkg/circuit_breaker/config.go b/pkg/circuit_breaker/config.go new file mode 100644 index 0000000000..ecb67a9a2b --- /dev/null +++ b/pkg/circuit_breaker/config.go @@ -0,0 +1,88 @@ +package circuit_breaker + +import ( + "fmt" + "strings" +) + +// CircuitBreakerConfig is the configuration that all the circuit breakers will use +type CircuitBreakerConfig struct { + // SampleRate is the time interval (in seconds) at which the data source + // is polled to determine the number of successful and failed requests + SampleRate uint64 `json:"sample_rate"` + + // BreakerTimeout is the time (in seconds) after which a circuit breaker goes + // into the half-open state from the open state + BreakerTimeout uint64 `json:"breaker_timeout"` + + // FailureThreshold is the % of failed requests in the observability window + // after which a circuit breaker will go into the open state + FailureThreshold uint64 `json:"failure_threshold"` + + // MinimumRequestCount is the minimum number of requests that must occur in the + // observability window before a circuit breaker can trip + MinimumRequestCount uint64 `json:"request_count"` + + // SuccessThreshold is the % of successful requests in the observability window + // after which a circuit breaker in the half-open state will go into the closed state + SuccessThreshold uint64 `json:"success_threshold"` + + // ObservabilityWindow is how far back in time (in minutes) the data source is + // polled when determining the number of successful and failed requests + ObservabilityWindow uint64 `json:"observability_window"` + + // ConsecutiveFailureThreshold determines when we ultimately disable the endpoint. + // E.g., after 10 consecutive transitions from half-open → open we should disable it.
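	// Each open → half-open → open round trip increments the breaker's
	// ConsecutiveFailures counter (via trip()), and a successful reset back
	// to the closed state clears it to zero.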
+ ConsecutiveFailureThreshold uint64 `json:"consecutive_failure_threshold"` +} + +func (c *CircuitBreakerConfig) Validate() error { + var errs strings.Builder + + if c.SampleRate == 0 { + errs.WriteString("SampleRate must be greater than 0") + errs.WriteString("; ") + } + + if c.BreakerTimeout == 0 { + errs.WriteString("BreakerTimeout must be greater than 0") + errs.WriteString("; ") + } + + if c.FailureThreshold == 0 || c.FailureThreshold > 100 { + errs.WriteString("FailureThreshold must be between 1 and 100") + errs.WriteString("; ") + } + + if c.MinimumRequestCount < 10 { + errs.WriteString("MinimumRequestCount must be at least 10") + errs.WriteString("; ") + } + + if c.SuccessThreshold == 0 || c.SuccessThreshold > 100 { + errs.WriteString("SuccessThreshold must be between 1 and 100") + errs.WriteString("; ") + } + + if c.ObservabilityWindow == 0 { + errs.WriteString("ObservabilityWindow must be greater than 0") + errs.WriteString("; ") + } + + // ObservabilityWindow is in minutes and SampleRate is in seconds + if (c.ObservabilityWindow * 60) <= c.SampleRate { + errs.WriteString("ObservabilityWindow must be greater than the SampleRate") + errs.WriteString("; ") + } + + if c.ConsecutiveFailureThreshold == 0 { + errs.WriteString("ConsecutiveFailureThreshold must be greater than 0") + errs.WriteString("; ") + } + + if errs.Len() > 0 { + return fmt.Errorf("config validation failed with errors: %s", errs.String()) + } + + return nil +} diff --git a/pkg/circuit_breaker/config_test.go b/pkg/circuit_breaker/config_test.go new file mode 100644 index 0000000000..f42b0c6bb9 --- /dev/null +++ b/pkg/circuit_breaker/config_test.go @@ -0,0 +1,131 @@ +package circuit_breaker + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestCircuitBreakerConfig_Validate(t *testing.T) { + tests := []struct { + name string + config CircuitBreakerConfig + wantErr bool + err string + }{ + { + name: "Valid Config", + config: CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 2, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 3, + MinimumRequestCount: 10, + }, + wantErr: false, + }, + { + name: "Invalid SampleRate", + config: CircuitBreakerConfig{ + SampleRate: 0, + }, + wantErr: true, + err: "SampleRate must be greater than 0", + }, + { + name: "Invalid BreakerTimeout", + config: CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 0, + }, + wantErr: true, + err: "BreakerTimeout must be greater than 0", + }, + { + name: "Invalid FailureThreshold", + config: CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 150, + }, + wantErr: true, + err: "FailureThreshold must be between 1 and 100", + }, + { + name: "Invalid SuccessThreshold", + config: CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 5, + SuccessThreshold: 150, + }, + wantErr: true, + err: "SuccessThreshold must be between 1 and 100", + }, + { + name: "Invalid ObservabilityWindow", + config: CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 2, + ObservabilityWindow: 0, + }, + wantErr: true, + err: "ObservabilityWindow must be greater than 0", + }, + { + name: "ObservabilityWindow should be greater than sample rate", + config: CircuitBreakerConfig{ + SampleRate: 200, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 2, + ObservabilityWindow: 1, + }, + wantErr: true, + err: "ObservabilityWindow must be greater than the SampleRate", + }, + {
name: "Invalid ConsecutiveFailureThreshold", + config: CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 50, + SuccessThreshold: 2, + ObservabilityWindow: 5, + ConsecutiveFailureThreshold: 0, + }, + wantErr: true, + err: "ConsecutiveFailureThreshold must be greater than 0", + }, + { + name: "Invalid MinimumRequestCount", + config: CircuitBreakerConfig{ + SampleRate: 1, + BreakerTimeout: 30, + FailureThreshold: 30, + SuccessThreshold: 2, + ObservabilityWindow: 5, + MinimumRequestCount: 5, + ConsecutiveFailureThreshold: 1, + }, + wantErr: true, + err: "MinimumRequestCount must be greater than 10", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.config.Validate() + if tt.wantErr { + require.Error(t, err) + require.NotEmpty(t, tt.err) + require.Contains(t, err.Error(), tt.err) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/pkg/circuit_breaker/store.go b/pkg/circuit_breaker/store.go new file mode 100644 index 0000000000..a7b7a00db7 --- /dev/null +++ b/pkg/circuit_breaker/store.go @@ -0,0 +1,198 @@ +package circuit_breaker + +import ( + "context" + "errors" + "fmt" + "github.com/frain-dev/convoy/pkg/clock" + "github.com/go-redsync/redsync/v4" + "github.com/go-redsync/redsync/v4/redis/goredis/v9" + "github.com/redis/go-redis/v9" + "strings" + "sync" + "time" +) + +type CircuitBreakerStore interface { + Lock(ctx context.Context, lockKey string, expiry uint64) (*redsync.Mutex, error) + Unlock(ctx context.Context, mutex *redsync.Mutex) error + Keys(context.Context, string) ([]string, error) + GetOne(context.Context, string) (string, error) + GetMany(context.Context, ...string) ([]interface{}, error) + SetOne(context.Context, string, interface{}, time.Duration) error + SetMany(context.Context, map[string]CircuitBreaker, time.Duration) error +} + +type RedisStore struct { + redis redis.UniversalClient + clock clock.Clock +} + +func NewRedisStore(redis redis.UniversalClient, clock clock.Clock) *RedisStore { + return &RedisStore{ + redis: redis, + clock: clock, + } +} + +func (s *RedisStore) Lock(ctx context.Context, mutexKey string, expiry uint64) (*redsync.Mutex, error) { + pool := goredis.NewPool(s.redis) + rs := redsync.New(pool) + + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + mutex := rs.NewMutex(mutexKey, redsync.WithExpiry(time.Duration(expiry)*time.Second), redsync.WithTries(1)) + err := mutex.LockContext(ctx) + if err != nil { + return nil, fmt.Errorf("failed to obtain lock: %v", err) + } + + return mutex, nil +} + +func (s *RedisStore) Unlock(ctx context.Context, mutex *redsync.Mutex) error { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + ok, err := mutex.UnlockContext(ctx) + if !ok { + return fmt.Errorf("failed to release lock: %v", err) + } + + if err != nil { + return fmt.Errorf("failed to release lock: %v", err) + } + + return nil +} + +// Keys returns all the keys used by the circuit breaker store +func (s *RedisStore) Keys(ctx context.Context, pattern string) ([]string, error) { + return s.redis.Keys(ctx, fmt.Sprintf("%s*", pattern)).Result() +} + +func (s *RedisStore) GetOne(ctx context.Context, key string) (string, error) { + key, err := s.redis.Get(ctx, key).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return "", ErrCircuitBreakerNotFound + } + return "", err + } + return key, nil +} + +func (s *RedisStore) GetMany(ctx context.Context, keys ...string) ([]any, error) { + res, err := s.redis.MGet(ctx, 
keys...).Result() + if err != nil { + if errors.Is(err, redis.Nil) { + return []any{}, nil + } + return nil, err + } + + return res, nil +} + +func (s *RedisStore) SetOne(ctx context.Context, key string, value interface{}, expiration time.Duration) error { + return s.redis.Set(ctx, key, value, expiration).Err() +} + +func (s *RedisStore) SetMany(ctx context.Context, breakers map[string]CircuitBreaker, ttl time.Duration) error { + pipe := s.redis.TxPipeline() + for key, breaker := range breakers { + val := breaker.String() + if innerErr := pipe.Set(ctx, key, val, ttl).Err(); innerErr != nil { + return innerErr + } + } + + _, err := pipe.Exec(ctx) + if err != nil { + return err + } + + return nil +} + +type TestStore struct { + store map[string]CircuitBreaker + mu *sync.RWMutex + clock clock.Clock +} + +func NewTestStore() *TestStore { + return &TestStore{ + store: make(map[string]CircuitBreaker), + mu: &sync.RWMutex{}, + clock: clock.NewSimulatedClock(time.Date(2020, 1, 1, 0, 0, 0, 0, time.UTC)), + } +} + +func (t *TestStore) Lock(_ context.Context, _ string, _ uint64) (*redsync.Mutex, error) { + return nil, nil +} + +func (t *TestStore) Unlock(_ context.Context, _ *redsync.Mutex) error { + return nil +} + +func (t *TestStore) Keys(_ context.Context, s string) (keys []string, err error) { + t.mu.RLock() + defer t.mu.RUnlock() + + for key := range t.store { + if strings.HasPrefix(key, s) { + keys = append(keys, key) + } + } + + return keys, nil +} + +func (t *TestStore) GetOne(_ context.Context, s string) (string, error) { + t.mu.RLock() + defer t.mu.RUnlock() + res, ok := t.store[s] + if !ok { + return "", ErrCircuitBreakerNotFound + } + + vv := res.String() + if vv == "" { + return "", errors.New("an error occurred decoding the circuit breaker") + } + + return vv, nil +} + +func (t *TestStore) GetMany(_ context.Context, keys ...string) (vals []interface{}, err error) { + t.mu.RLock() + defer t.mu.RUnlock() + for _, key := range keys { + if _, ok := t.store[key]; ok { + vals = append(vals, t.store[key]) + } else { + vals = append(vals, nil) + } + } + + return vals, nil +} + +func (t *TestStore) SetOne(_ context.Context, key string, i interface{}, _ time.Duration) error { + t.mu.Lock() + defer t.mu.Unlock() + t.store[key] = i.(CircuitBreaker) + return nil +} + +func (t *TestStore) SetMany(ctx context.Context, m map[string]CircuitBreaker, duration time.Duration) error { + for k, v := range m { + if err := t.SetOne(ctx, k, v, duration); err != nil { + return err + } + } + return nil +} diff --git a/pkg/circuit_breaker/store_test.go b/pkg/circuit_breaker/store_test.go new file mode 100644 index 0000000000..cee0a9d49f --- /dev/null +++ b/pkg/circuit_breaker/store_test.go @@ -0,0 +1,270 @@ +package circuit_breaker + +import ( + "context" + "testing" + "time" + + "github.com/frain-dev/convoy/pkg/clock" + "github.com/stretchr/testify/require" +) + +func TestRedisStore_Keys(t *testing.T) { + ctx := context.Background() + redisClient, err := getRedis(t) + require.NoError(t, err) + + mockClock := clock.NewSimulatedClock(time.Now()) + store := NewRedisStore(redisClient, mockClock) + + // Clean up any existing keys + existingKeys, err := redisClient.Keys(ctx, "test_keys*").Result() + require.NoError(t, err) + if len(existingKeys) > 0 { + err = redisClient.Del(ctx, existingKeys...).Err() + require.NoError(t, err) + } + + // Set up test data + testKeys := []string{"test_keys:1", "test_keys:2", "test_keys:3"} + for _, key := range testKeys { + err = redisClient.Set(ctx, key, "value", time.Minute).Err() + 
require.NoError(t, err) + } + + // Test Keys method + keys, err := store.Keys(ctx, "test_keys") + require.NoError(t, err) + require.ElementsMatch(t, testKeys, keys) + + // Clean up + err = redisClient.Del(ctx, testKeys...).Err() + require.NoError(t, err) +} + +func TestRedisStore_GetOne(t *testing.T) { + ctx := context.Background() + redisClient, err := getRedis(t) + require.NoError(t, err) + + mockClock := clock.NewSimulatedClock(time.Now()) + store := NewRedisStore(redisClient, mockClock) + + t.Run("Existing Key", func(t *testing.T) { + key := "test_get_one:existing" + value := "test_value" + err = redisClient.Set(ctx, key, value, time.Minute).Err() + require.NoError(t, err) + + result, err := store.GetOne(ctx, key) + require.NoError(t, err) + require.Equal(t, value, result) + + err = redisClient.Del(ctx, key).Err() + require.NoError(t, err) + }) + + t.Run("Non-existing Key", func(t *testing.T) { + key := "test_get_one:non_existing" + _, err := store.GetOne(ctx, key) + require.Equal(t, ErrCircuitBreakerNotFound, err) + }) +} + +func TestRedisStore_GetMany(t *testing.T) { + ctx := context.Background() + redisClient, err := getRedis(t) + require.NoError(t, err) + + mockClock := clock.NewSimulatedClock(time.Now()) + store := NewRedisStore(redisClient, mockClock) + + // Set up test data + testData := map[string]string{ + "test_get_many:1": "value1", + "test_get_many:2": "value2", + "test_get_many:3": "value3", + } + for key, value := range testData { + err = redisClient.Set(ctx, key, value, time.Minute).Err() + require.NoError(t, err) + } + + keys := []string{"test_get_many:1", "test_get_many:2", "test_get_many:3", "test_get_many:non_existing"} + results, err := store.GetMany(ctx, keys...) + require.NoError(t, err) + require.Len(t, results, 4) + + for i, key := range keys { + if i < 3 { + require.Equal(t, testData[key], results[i]) + } else { + require.Nil(t, results[i]) + } + } + + // Clean up + err = redisClient.Del(ctx, "test_get_many:1", "test_get_many:2", "test_get_many:3").Err() + require.NoError(t, err) +} + +func TestRedisStore_SetOne(t *testing.T) { + ctx := context.Background() + redisClient, err := getRedis(t) + require.NoError(t, err) + + mockClock := clock.NewSimulatedClock(time.Now()) + store := NewRedisStore(redisClient, mockClock) + + key := "test_set_one" + value := "test_value" + expiration := time.Minute + + err = store.SetOne(ctx, key, value, expiration) + require.NoError(t, err) + + // Verify the value was set + result, err := redisClient.Get(ctx, key).Result() + require.NoError(t, err) + require.Equal(t, value, result) + + // Verify the expiration was set + ttl, err := redisClient.TTL(ctx, key).Result() + require.NoError(t, err) + require.True(t, ttl > 0 && ttl <= expiration) + + // Clean up + err = redisClient.Del(ctx, key).Err() + require.NoError(t, err) +} + +func TestRedisStore_SetMany(t *testing.T) { + ctx := context.Background() + redisClient, err := getRedis(t) + require.NoError(t, err) + + mockClock := clock.NewSimulatedClock(time.Now()) + store := NewRedisStore(redisClient, mockClock) + + breakers := map[string]CircuitBreaker{ + "test_set_many:1": { + Key: "test_set_many:1", + State: StateClosed, + }, + "test_set_many:2": { + Key: "test_set_many:2", + State: StateOpen, + }, + } + expiration := time.Minute + + err = store.SetMany(ctx, breakers, expiration) + require.NoError(t, err) + + // Verify the values were set + for key, breaker := range breakers { + result, err := redisClient.Get(ctx, key).Result() + require.NoError(t, err) + + expectedValue := 
breaker.String() + require.Equal(t, expectedValue, result) + + // Verify the expiration was set + ttl, err := redisClient.TTL(ctx, key).Result() + require.NoError(t, err) + require.True(t, ttl > 0 && ttl <= expiration) + } + + // Clean up + keys := make([]string, 0, len(breakers)) + for key := range breakers { + keys = append(keys, key) + } + err = redisClient.Del(ctx, keys...).Err() + require.NoError(t, err) +} + +func TestTestStore_Keys(t *testing.T) { + store := NewTestStore() + ctx := context.Background() + + // Add some test data + store.store["test:1"] = CircuitBreaker{Key: "test:1"} + store.store["test:2"] = CircuitBreaker{Key: "test:2"} + store.store["other:1"] = CircuitBreaker{Key: "other:1"} + + keys, err := store.Keys(ctx, "test") + require.NoError(t, err) + require.ElementsMatch(t, []string{"test:1", "test:2"}, keys) +} + +func TestTestStore_GetOne(t *testing.T) { + store := NewTestStore() + ctx := context.Background() + + t.Run("Existing Key", func(t *testing.T) { + cb := CircuitBreaker{Key: "test", State: StateClosed} + store.store["test"] = cb + + result, err := store.GetOne(ctx, "test") + require.NoError(t, err) + + expectedValue := cb.String() + require.Equal(t, expectedValue, result) + }) + + t.Run("Non-existing Key", func(t *testing.T) { + _, err := store.GetOne(ctx, "non_existing") + require.Equal(t, ErrCircuitBreakerNotFound, err) + }) +} + +func TestTestStore_GetMany(t *testing.T) { + store := NewTestStore() + ctx := context.Background() + + cb1 := CircuitBreaker{Key: "test1", State: StateClosed} + cb2 := CircuitBreaker{Key: "test2", State: StateOpen} + store.store["test1"] = cb1 + store.store["test2"] = cb2 + + results, err := store.GetMany(ctx, "test1", "test2", "non_existing") + require.NoError(t, err) + require.Len(t, results, 3) + + require.Equal(t, cb1, results[0]) + require.Equal(t, cb2, results[1]) + require.Nil(t, results[2]) +} + +func TestTestStore_SetOne(t *testing.T) { + store := NewTestStore() + ctx := context.Background() + + cb := CircuitBreaker{Key: "test", State: StateClosed} + err := store.SetOne(ctx, "test", cb, time.Minute) + require.NoError(t, err) + + storedCB, ok := store.store["test"] + require.True(t, ok) + require.Equal(t, cb, storedCB) +} + +func TestTestStore_SetMany(t *testing.T) { + store := NewTestStore() + ctx := context.Background() + + breakers := map[string]CircuitBreaker{ + "test1": {Key: "test1", State: StateClosed}, + "test2": {Key: "test2", State: StateOpen}, + } + + err := store.SetMany(ctx, breakers, time.Minute) + require.NoError(t, err) + + for key, cb := range breakers { + storedCB, ok := store.store[key] + require.True(t, ok) + require.Equal(t, cb, storedCB) + } +} diff --git a/pkg/clock/clock.go b/pkg/clock/clock.go new file mode 100644 index 0000000000..ea006dee97 --- /dev/null +++ b/pkg/clock/clock.go @@ -0,0 +1,54 @@ +package clock + +import ( + "sync" + "time" +) + +// A Clock is an object that can tell you the current time. +// +// This interface allows decoupling code that uses time from the code that creates +// a point in time. You can use this to your advantage by injecting a Clock into your +// types rather than having implementations call time.Now() directly. +// +// Use NewRealClock() in production. +// Use NewSimulatedClock() in tests.
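//
// A minimal sketch (the type and method names here are hypothetical) of
// injecting a Clock instead of calling time.Now() directly:
//
//	type Expiry struct {
//		clock Clock
//		at    time.Time
//	}
//
//	func (e Expiry) Expired() bool {
//		return e.clock.Now().After(e.at)
//	}
//
// Production code passes NewRealClock(); tests pass a SimulatedClock and
// call AdvanceTime to step through time deterministically.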
+type Clock interface { + Now() time.Time +} + +func NewRealClock() Clock { return &realTimeClock{} } + +type realTimeClock struct{} + +func (*realTimeClock) Now() time.Time { return time.Now() } + +// A SimulatedClock is a concrete Clock implementation that doesn't "tick" on its own. +// Time is advanced by explicit calls to the AdvanceTime() or SetTime() methods. +// This object is concurrency safe. +type SimulatedClock struct { + mu *sync.Mutex + t time.Time +} + +func NewSimulatedClock(t time.Time) *SimulatedClock { + return &SimulatedClock{mu: &sync.Mutex{}, t: t} +} + +func (c *SimulatedClock) Now() time.Time { + c.mu.Lock() + defer c.mu.Unlock() + return c.t +} + +func (c *SimulatedClock) SetTime(t time.Time) { + c.mu.Lock() + defer c.mu.Unlock() + c.t = t +} + +func (c *SimulatedClock) AdvanceTime(d time.Duration) { + c.mu.Lock() + defer c.mu.Unlock() + c.t = c.t.Add(d) +} diff --git a/pkg/clock/clock_test.go b/pkg/clock/clock_test.go new file mode 100644 index 0000000000..3b6f381e4b --- /dev/null +++ b/pkg/clock/clock_test.go @@ -0,0 +1,44 @@ +package clock + +import ( + "testing" + "time" +) + +func TestSimulatedClock(t *testing.T) { + now := time.Now() + + tests := []struct { + desc string + initTime time.Time + advanceBy time.Duration + wantTime time.Time + }{ + { + desc: "advance time forward", + initTime: now, + advanceBy: 30 * time.Second, + wantTime: now.Add(30 * time.Second), + }, + { + desc: "advance time backward", + initTime: now, + advanceBy: -10 * time.Second, + wantTime: now.Add(-10 * time.Second), + }, + } + + for _, tc := range tests { + c := NewSimulatedClock(tc.initTime) + + if c.Now() != tc.initTime { + t.Errorf("%s: Before Advance; SimulatedClock.Now() = %v, want %v", tc.desc, c.Now(), tc.initTime) + } + + c.AdvanceTime(tc.advanceBy) + + if c.Now() != tc.wantTime { + t.Errorf("%s: After Advance; SimulatedClock.Now() = %v, want %v", tc.desc, c.Now(), tc.wantTime) + } + } +} diff --git a/pkg/log/log.go b/pkg/log/log.go index dc64b8f34a..fb156bb618 100644 --- a/pkg/log/log.go +++ b/pkg/log/log.go @@ -95,7 +95,7 @@ func (l Level) ToLogrusLevel() (logrus.Level, error) { } // NewLogger creates and returns a new instance of Logger. -// Log level is set to DebugLevel by default. +// The log level is set to ErrorLevel by default.
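// Callers that need more verbose output are expected to raise the level
// explicitly after construction; for example (illustrative):
//
//	logger := log.NewLogger(os.Stdout)
//	logger.SetLevel(log.DebugLevel)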
func NewLogger(out io.Writer) *Logger { log := &logrus.Logger{ Out: out, @@ -218,7 +218,7 @@ func (l *Logger) SetLevel(v Level) { l.logger.SetLevel(lvl) } -// WithField sets logger fields +// SetPrefix adds a "source" field to the logger func (l *Logger) SetPrefix(value interface{}) { l.entry = l.entry.WithField("source", value) } diff --git a/services/activate_endpoint.go b/services/activate_endpoint.go new file mode 100644 index 0000000000..8bf22a3738 --- /dev/null +++ b/services/activate_endpoint.go @@ -0,0 +1,33 @@ +package services + +import ( + "context" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/pkg/log" +) + +type ActivateEndpointService struct { + EndpointRepo datastore.EndpointRepository + ProjectID string + EndpointId string +} + +func (s *ActivateEndpointService) Run(ctx context.Context) (*datastore.Endpoint, error) { + endpoint, err := s.EndpointRepo.FindEndpointByID(ctx, s.EndpointId, s.ProjectID) + if err != nil { + log.FromContext(ctx).WithError(err).Error("failed to find endpoint") + return nil, &ServiceError{ErrMsg: "failed to find endpoint", Err: err} + } + + if endpoint.Status != datastore.InactiveEndpointStatus { + return nil, &ServiceError{ErrMsg: "the endpoint must be inactive"} + } + + err = s.EndpointRepo.UpdateEndpointStatus(ctx, s.ProjectID, endpoint.UID, datastore.ActiveEndpointStatus) + if err != nil { + log.FromContext(ctx).WithError(err).Error("failed to activate endpoint") + return nil, &ServiceError{ErrMsg: "failed to activate endpoint", Err: err} + } + + // reflect the new status on the endpoint returned to callers + endpoint.Status = datastore.ActiveEndpointStatus + + return endpoint, nil +} diff --git a/sql/1724236900.sql b/sql/1724236900.sql new file mode 100644 index 0000000000..aad794430a --- /dev/null +++ b/sql/1724236900.sql @@ -0,0 +1,23 @@ +-- +migrate Up +alter table convoy.configurations add column if not exists cb_sample_rate int not null default 30; -- seconds +alter table convoy.configurations add column if not exists cb_error_timeout int not null default 30; -- seconds +alter table convoy.configurations add column if not exists cb_failure_threshold int not null default 70; -- percentage +alter table convoy.configurations add column if not exists cb_success_threshold int not null default 1; -- percentage +alter table convoy.configurations add column if not exists cb_observability_window int not null default 30; -- minutes +alter table convoy.configurations add column if not exists cb_minimum_request_count int not null default 10; +alter table convoy.configurations add column if not exists cb_consecutive_failure_threshold int not null default 10; +create index if not exists idx_delivery_attempts_created_at on convoy.delivery_attempts (created_at); +create index if not exists idx_delivery_attempts_event_delivery_id_created_at on convoy.delivery_attempts (event_delivery_id, created_at); +create index if not exists idx_delivery_attempts_event_delivery_id on convoy.delivery_attempts (event_delivery_id); + +-- +migrate Down +alter table convoy.configurations drop column if exists cb_sample_rate; +alter table convoy.configurations drop column if exists cb_error_timeout; +alter table convoy.configurations drop column if exists cb_failure_threshold; +alter table convoy.configurations drop column if exists cb_success_threshold; +alter table convoy.configurations drop column if exists cb_observability_window; +alter table convoy.configurations drop column if exists cb_minimum_request_count; +alter table convoy.configurations drop column if exists cb_consecutive_failure_threshold; +drop index if exists convoy.idx_delivery_attempts_created_at; +drop index if exists convoy.idx_delivery_attempts_event_delivery_id_created_at; +drop index if
exists convoy.idx_delivery_attempts_event_delivery_id; + diff --git a/web/ui/dashboard/src/app/models/endpoint.model.ts b/web/ui/dashboard/src/app/models/endpoint.model.ts index 111dc7efba..3161fb5a52 100644 --- a/web/ui/dashboard/src/app/models/endpoint.model.ts +++ b/web/ui/dashboard/src/app/models/endpoint.model.ts @@ -14,6 +14,7 @@ export interface ENDPOINT { uid: string; title: string; advanced_signatures: boolean; + failure_rate: number; authentication: { api_key: { header_value: string; header_name: string }; }; diff --git a/web/ui/dashboard/src/app/private/pages/project/endpoints/endpoints.component.html b/web/ui/dashboard/src/app/private/pages/project/endpoints/endpoints.component.html index 575b07087e..af7d3ddbcb 100644 --- a/web/ui/dashboard/src/app/private/pages/project/endpoints/endpoints.component.html +++ b/web/ui/dashboard/src/app/private/pages/project/endpoints/endpoints.component.html @@ -86,7 +86,11 @@

Endpoints
+ {{ endpoint.failure_rate | number }}%