diff --git a/cluster/cluster.go b/cluster/cluster.go
index 17e1d1b3..493aa71c 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -23,7 +23,6 @@ import (
 
 	gubernator "github.com/mailgun/gubernator/v2"
 	"github.com/mailgun/holster/v4/clock"
-	"github.com/mailgun/holster/v4/ctxutil"
 	"github.com/mailgun/holster/v4/errors"
 	"github.com/sirupsen/logrus"
 )
@@ -110,7 +109,7 @@ func Restart(ctx context.Context) error {
 // StartWith a local cluster with specific addresses
 func StartWith(localPeers []gubernator.PeerInfo) error {
 	for _, peer := range localPeers {
-		ctx, cancel := ctxutil.WithTimeout(context.Background(), clock.Second*10)
+		ctx, cancel := context.WithTimeout(context.Background(), clock.Second*10)
 		d, err := gubernator.SpawnDaemon(ctx, gubernator.DaemonConfig{
 			Logger:            logrus.WithField("instance", peer.GRPCAddress),
 			GRPCListenAddress: peer.GRPCAddress,
diff --git a/cmd/gubernator-cli/main.go b/cmd/gubernator-cli/main.go
index a4e8065e..4e0a96b2 100644
--- a/cmd/gubernator-cli/main.go
+++ b/cmd/gubernator-cli/main.go
@@ -28,7 +28,6 @@ import (
 	"github.com/davecgh/go-spew/spew"
 	guber "github.com/mailgun/gubernator/v2"
 	"github.com/mailgun/holster/v4/clock"
-	"github.com/mailgun/holster/v4/ctxutil"
 	"github.com/mailgun/holster/v4/errors"
 	"github.com/mailgun/holster/v4/setter"
 	"github.com/mailgun/holster/v4/syncutil"
@@ -179,7 +178,7 @@ func sendRequest(ctx context.Context, client guber.V1Client, req *guber.GetRateL
 	ctx = tracing.StartScope(ctx)
 	defer tracing.EndScope(ctx, nil)
-	ctx, cancel := ctxutil.WithTimeout(ctx, timeout)
+	ctx, cancel := context.WithTimeout(ctx, timeout)
 
 	// Now hit our cluster with the rate limits
 	resp, err := client.GetRateLimits(ctx, req)
diff --git a/cmd/gubernator/main.go b/cmd/gubernator/main.go
index 5b64f4b3..bdf35e80 100644
--- a/cmd/gubernator/main.go
+++ b/cmd/gubernator/main.go
@@ -27,7 +27,6 @@ import (
 
 	gubernator "github.com/mailgun/gubernator/v2"
 	"github.com/mailgun/holster/v4/clock"
-	"github.com/mailgun/holster/v4/ctxutil"
 	"github.com/mailgun/holster/v4/tracing"
 	"github.com/sirupsen/logrus"
 	"go.opentelemetry.io/otel/sdk/resource"
@@ -78,7 +77,7 @@ func main() {
 	conf, err := gubernator.SetupDaemonConfig(logrus.StandardLogger(), configFile)
 	checkErr(err, "while getting config")
-	ctx, cancel := ctxutil.WithTimeout(ctx, clock.Second*10)
+	ctx, cancel := context.WithTimeout(ctx, clock.Second*10)
 
 	// Start the daemon
 	daemon, err := gubernator.SpawnDaemon(ctx, conf)
diff --git a/config.go b/config.go
index 811bd1e9..0b4ef64a 100644
--- a/config.go
+++ b/config.go
@@ -124,7 +124,7 @@ func (c *Config) SetDefaults() error {
 	setter.SetDefault(&c.Behaviors.GlobalTimeout, time.Millisecond*500)
 	setter.SetDefault(&c.Behaviors.GlobalBatchLimit, maxBatchSize)
-	setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*500)
+	setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*100)
 
 	setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas))
 	setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil))
diff --git a/etcd.go b/etcd.go
index a1ba35e3..336dfb11 100644
--- a/etcd.go
+++ b/etcd.go
@@ -21,7 +21,6 @@ import (
 	"encoding/json"
 
 	"github.com/mailgun/holster/v4/clock"
-	"github.com/mailgun/holster/v4/ctxutil"
 	"github.com/mailgun/holster/v4/errors"
 	"github.com/mailgun/holster/v4/setter"
 	"github.com/mailgun/holster/v4/syncutil"
@@ -139,7 +138,7 @@ func (e *EtcdPool) watchPeers() error {
 }
 
 func (e *EtcdPool) collectPeers(revision *int64) error {
-	ctx, cancel := ctxutil.WithTimeout(e.ctx, etcdTimeout)
+	ctx, cancel := context.WithTimeout(e.ctx, etcdTimeout)
 	defer cancel()
 
 	resp, err := e.conf.Client.Get(ctx, e.conf.KeyPrefix, etcd.WithPrefix())
@@ -232,7 +231,7 @@ func (e *EtcdPool) register(peer PeerInfo) error {
 	var lease *etcd.LeaseGrantResponse
 
 	register := func() error {
-		ctx, cancel := ctxutil.WithTimeout(e.ctx, etcdTimeout)
+		ctx, cancel := context.WithTimeout(e.ctx, etcdTimeout)
 		defer cancel()
 		var err error
@@ -296,7 +295,7 @@ func (e *EtcdPool) register(peer PeerInfo) error {
 			}
 			lastKeepAlive = clock.Now()
 		case <-done:
-			ctx, cancel := ctxutil.WithTimeout(context.Background(), etcdTimeout)
+			ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
 			if _, err := e.conf.Client.Delete(ctx, instanceKey); err != nil {
 				e.log.WithError(err).
 					Warn("during etcd delete")
diff --git a/functional_test.go b/functional_test.go
index 23ac62a2..6225fc3e 100644
--- a/functional_test.go
+++ b/functional_test.go
@@ -842,30 +842,30 @@ func TestGlobalRateLimits(t *testing.T) {
 	testutil.UntilPass(t, 20, clock.Millisecond*200, func(t testutil.TestingT) {
 		// Inspect our metrics, ensure they collected the counts we expected during this test
 		d := cluster.DaemonAt(0)
-		config := d.Config()
-		resp, err := http.Get(fmt.Sprintf("http://%s/metrics", config.HTTPListenAddress))
-		if !assert.NoError(t, err) {
-			return
-		}
-		defer resp.Body.Close()
-
-		m := getMetric(t, resp.Body, "gubernator_async_durations_count")
-		assert.NotEqual(t, 0, int(m.Value))
-
-		// V1Instance 2 should be the owner of our global rate limit
-		d = cluster.DaemonAt(2)
-		config = d.Config()
-		resp, err = http.Get(fmt.Sprintf("http://%s/metrics", config.HTTPListenAddress))
-		if !assert.NoError(t, err) {
-			return
+		metricsURL := fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress)
+		m := getMetricRequest(t, metricsURL, "gubernator_global_send_duration_count")
+		assert.Equal(t, 1, int(m.Value))
+
+		// Expect one peer (the owning peer) to indicate a broadcast.
+		var broadcastCount int
+		for i := 0; i < cluster.NumOfDaemons(); i++ {
+			d := cluster.DaemonAt(i)
+			metricsURL := fmt.Sprintf("http://%s/metrics", d.Config().HTTPListenAddress)
+			m := getMetricRequest(t, metricsURL, "gubernator_broadcast_duration_count")
+			broadcastCount += int(m.Value)
 		}
-		defer resp.Body.Close()
 
-		m = getMetric(t, resp.Body, "gubernator_broadcast_durations_count")
-		assert.NotEqual(t, 0, int(m.Value))
+		assert.Equal(t, 1, broadcastCount)
 	})
 }
 
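+// getMetricRequest fetches the Prometheus metrics endpoint at url and returns
+// the sample named name.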
+func getMetricRequest(t testutil.TestingT, url string, name string) *model.Sample {
+	resp, err := http.Get(url)
+	require.NoError(t, err)
+	defer resp.Body.Close()
+	return getMetric(t, resp.Body, name)
+}
+
 func TestChangeLimit(t *testing.T) {
 	client, errs := guber.DialV1Server(cluster.GetRandomPeer(cluster.DataCenterNone).GRPCAddress, nil)
 	require.Nil(t, errs)
diff --git a/global.go b/global.go
index 630a4a14..4090537f 100644
--- a/global.go
+++ b/global.go
@@ -19,7 +19,6 @@ package gubernator
 import (
 	"context"
 
-	"github.com/mailgun/holster/v4/ctxutil"
 	"github.com/mailgun/holster/v4/syncutil"
 	"github.com/prometheus/client_golang/prometheus"
 	"google.golang.org/protobuf/proto"
@@ -28,12 +27,16 @@ import (
 
 // globalManager manages async hit queue and updates peers in
 // the cluster periodically when a global rate limit we own updates.
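+// Global-behavior metrics are fields on the manager rather than package-level
+// vars, so each instance reports its own counts; V1Instance.Describe and
+// V1Instance.Collect delegate to them.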
 type globalManager struct {
-	asyncQueue     chan *RateLimitReq
-	broadcastQueue chan *RateLimitReq
-	wg             syncutil.WaitGroup
-	conf           BehaviorConfig
-	log            FieldLogger
-	instance       *V1Instance
+	asyncQueue               chan *RateLimitReq
+	broadcastQueue           chan *RateLimitReq
+	wg                       syncutil.WaitGroup
+	conf                     BehaviorConfig
+	log                      FieldLogger
+	instance                 *V1Instance
+	metricGlobalSendDuration prometheus.Summary
+	metricBroadcastDuration  prometheus.Summary
+	metricBroadcastCounter   *prometheus.CounterVec
+	metricGlobalQueueLength  prometheus.Gauge
 }
 
 func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager {
@@ -43,6 +46,24 @@ func newGlobalManager(conf BehaviorConfig, instance *V1Instance) *globalManager
 		broadcastQueue: make(chan *RateLimitReq, conf.GlobalBatchLimit),
 		instance:       instance,
 		conf:           conf,
+		metricGlobalSendDuration: prometheus.NewSummary(prometheus.SummaryOpts{
+			Name:       "gubernator_global_send_duration",
+			Help:       "The duration of GLOBAL async sends in seconds.",
+			Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
+		}),
+		metricBroadcastDuration: prometheus.NewSummary(prometheus.SummaryOpts{
+			Name:       "gubernator_broadcast_duration",
+			Help:       "The duration of GLOBAL broadcasts to peers in seconds.",
+			Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
+		}),
+		metricBroadcastCounter: prometheus.NewCounterVec(prometheus.CounterOpts{
+			Name: "gubernator_broadcast_counter",
+			Help: "The count of broadcasts.",
+		}, []string{"condition"}),
+		metricGlobalQueueLength: prometheus.NewGauge(prometheus.GaugeOpts{
+			Name: "gubernator_global_queue_length",
+			Help: "The count of requests queued up for global broadcast. This is only used for GetRateLimit requests using global behavior.",
+		}),
 	}
 	gm.runAsyncHits()
 	gm.runBroadcasts()
@@ -108,7 +129,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
 		client *PeerClient
 		req    GetPeerRateLimitsReq
 	}
-	defer prometheus.NewTimer(metricGlobalSendDuration).ObserveDuration()
+	defer prometheus.NewTimer(gm.metricGlobalSendDuration).ObserveDuration()
 	peerRequests := make(map[string]*pair)
 
 	// Assign each request to a peer
@@ -132,7 +153,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
 	// Send the rate limit requests to their respective owning peers.
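+	// Each send below is bounded by its own GlobalTimeout-scoped context.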
 	for _, p := range peerRequests {
-		ctx, cancel := ctxutil.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
+		ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
 		_, err := p.client.GetPeerRateLimits(ctx, &p.req)
 		cancel()
 
@@ -156,7 +177,7 @@ func (gm *globalManager) runBroadcasts() {
 
 			// Send the hits if we reached our batch limit
 			if len(updates) >= gm.conf.GlobalBatchLimit {
-				metricBroadcastCounter.WithLabelValues("queue_full").Inc()
+				gm.metricBroadcastCounter.WithLabelValues("queue_full").Inc()
 				gm.broadcastPeers(context.Background(), updates)
 				updates = make(map[string]*RateLimitReq)
 				return true
@@ -170,11 +191,11 @@ func (gm *globalManager) runBroadcasts() {
 
 		case <-interval.C:
 			if len(updates) != 0 {
-				metricBroadcastCounter.WithLabelValues("timer").Inc()
+				gm.metricBroadcastCounter.WithLabelValues("timer").Inc()
 				gm.broadcastPeers(context.Background(), updates)
 				updates = make(map[string]*RateLimitReq)
 			} else {
-				metricGlobalQueueLength.Set(0)
+				gm.metricGlobalQueueLength.Set(0)
 			}
 		case <-done:
 			return false
@@ -185,10 +206,10 @@ func (gm *globalManager) runBroadcasts() {
 
 // broadcastPeers broadcasts global rate limit statuses to all other peers
 func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]*RateLimitReq) {
-	defer prometheus.NewTimer(metricBroadcastDuration).ObserveDuration()
+	defer prometheus.NewTimer(gm.metricBroadcastDuration).ObserveDuration()
 	var req UpdatePeerGlobalsReq
-	metricGlobalQueueLength.Set(float64(len(updates)))
+	gm.metricGlobalQueueLength.Set(float64(len(updates)))
 
 	for _, r := range updates {
 		// Copy the original since we are removing the GLOBAL behavior
@@ -217,7 +238,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
 			continue
 		}
 
-		ctx, cancel := ctxutil.WithTimeout(ctx, gm.conf.GlobalTimeout)
+		ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout)
 		_, err := peer.UpdatePeerGlobals(ctx, &req)
 		cancel()
 
diff --git a/go.mod b/go.mod
index 53d3075c..9ecd352c 100644
--- a/go.mod
+++ b/go.mod
@@ -22,7 +22,7 @@ require (
 	go.opentelemetry.io/otel v1.16.0
 	go.opentelemetry.io/otel/sdk v1.16.0
 	go.opentelemetry.io/otel/trace v1.16.0
-	golang.org/x/net v0.10.0
+	golang.org/x/net v0.17.0
 	golang.org/x/time v0.3.0
 	google.golang.org/api v0.108.0
 	google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4
@@ -90,9 +90,9 @@ require (
 	go.uber.org/zap v1.21.0 // indirect
 	golang.org/x/mod v0.8.0 // indirect
 	golang.org/x/oauth2 v0.6.0 // indirect
-	golang.org/x/sys v0.8.0 // indirect
-	golang.org/x/term v0.8.0 // indirect
-	golang.org/x/text v0.9.0 // indirect
+	golang.org/x/sys v0.13.0 // indirect
+	golang.org/x/term v0.13.0 // indirect
+	golang.org/x/text v0.13.0 // indirect
 	golang.org/x/tools v0.6.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	gopkg.in/inf.v0 v0.9.1 // indirect
diff --git a/go.sum b/go.sum
index c5b49c1c..e7180cd4 100644
--- a/go.sum
+++ b/go.sum
@@ -571,8 +571,8 @@ golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20211209124913-491a49abca63/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.10.0 h1:X2//UzNDwYmtCLn7To6G58Wr6f5ahEAQgKNzv9Y951M=
-golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
+golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
+golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -664,13 +664,13 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.8.0 h1:EBmGv8NaZBZTWvrbjNoL6HVt+IVy3QDQpJs7VRIw3tU=
-golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
+golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
-golang.org/x/term v0.8.0 h1:n5xxQn2i3PC0yLAbjTpNT85q/Kgzcr2gIoX9OrJUols=
-golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
+golang.org/x/term v0.13.0 h1:bb+I9cTfFazGW51MZqBVmZy7+JEJMouUHTUSKVQLBek=
+golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
 golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -680,8 +680,8 @@ golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
 golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
-golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE=
-golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
+golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
+golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
 golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
 golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
diff --git a/gubernator.go b/gubernator.go
index 9bb46807..59c26eca 100644
--- a/gubernator.go
+++ b/gubernator.go
@@ -23,7 +23,6 @@ import (
 	"sync"
 
 	"github.com/mailgun/errors"
-	"github.com/mailgun/holster/v4/ctxutil"
 	"github.com/mailgun/holster/v4/syncutil"
 	"github.com/mailgun/holster/v4/tracing"
 	"github.com/prometheus/client_golang/prometheus"
@@ -55,7 +54,7 @@ type V1Instance struct {
 
 var (
 	metricGetRateLimitCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
 		Name: "gubernator_getratelimit_counter",
-		Help: "The count of getLocalRateLimit() calls. Label \"calltype\" may be \"local\" for calls handled by the same peer, \"forward\" for calls forwarded to another peer, or \"global\" for global rate limits.",
+		Help: "The count of getLocalRateLimit() calls. Label \"calltype\" may be \"local\" for calls handled by the same peer, or \"global\" for global rate limits.",
 	}, []string{"calltype"})
 	metricFuncTimeDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
 		Name: "gubernator_func_duration",
@@ -103,26 +102,6 @@ var (
 			0.99: 0.001,
 		},
 	}, []string{"peerAddr"})
-
-	// Global behavior.
-	metricGlobalSendDuration = prometheus.NewSummary(prometheus.SummaryOpts{
-		Name:       "gubernator_global_send_duration",
-		Help:       "The duration of GLOBAL async sends in seconds.",
-		Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
-	})
-	metricBroadcastDuration = prometheus.NewSummary(prometheus.SummaryOpts{
-		Name:       "gubernator_broadcast_duration",
-		Help:       "The duration of GLOBAL broadcasts to peers in seconds.",
-		Objectives: map[float64]float64{0.5: 0.05, 0.99: 0.001},
-	})
-	metricBroadcastCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
-		Name: "gubernator_broadcast_counter",
-		Help: "The count of broadcasts.",
-	}, []string{"condition"})
-	metricGlobalQueueLength = prometheus.NewGauge(prometheus.GaugeOpts{
-		Name: "gubernator_global_queue_length",
-		Help: "The count of requests queued up for global broadcast. This is only used for GetRateLimit requests using global behavior.",
-	})
 )
 
 // NewV1Instance instantiate a single instance of a gubernator peer and register this
@@ -198,15 +177,13 @@ func (s *V1Instance) Close() (err error) {
 // rate limit `Name` and `UniqueKey` is not owned by this instance, then we forward the request to the
 // peer that does.
 func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*GetRateLimitsResp, error) {
-
 	funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.GetRateLimits"))
 	defer funcTimer.ObserveDuration()
-
 	metricConcurrentChecks.Inc()
 	defer metricConcurrentChecks.Dec()
 
 	if len(r.Requests) > maxBatchSize {
-		metricCheckErrorCounter.WithLabelValues("Request too large").Add(1)
+		metricCheckErrorCounter.WithLabelValues("Request too large").Inc()
 		return nil, status.Errorf(codes.OutOfRange,
 			"Requests.RateLimits list too large; max size is '%d'", maxBatchSize)
 	}
@@ -214,7 +191,6 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
 	resp := GetRateLimitsResp{
 		Responses: make([]*RateLimitResp, len(r.Requests)),
 	}
-
 	var wg sync.WaitGroup
 	asyncCh := make(chan AsyncResp, len(r.Requests))
@@ -225,13 +201,13 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
 		var err error
 
 		if len(req.UniqueKey) == 0 {
-			metricCheckErrorCounter.WithLabelValues("Invalid request").Add(1)
+			metricCheckErrorCounter.WithLabelValues("Invalid request").Inc()
 			resp.Responses[i] = &RateLimitResp{Error: "field 'unique_key' cannot be empty"}
 			continue
 		}
 
 		if len(req.Name) == 0 {
-			metricCheckErrorCounter.WithLabelValues("Invalid request").Add(1)
+			metricCheckErrorCounter.WithLabelValues("Invalid request").Inc()
 			resp.Responses[i] = &RateLimitResp{Error: "field 'namespace' cannot be empty"}
 			continue
 		}
@@ -263,10 +239,7 @@ func (s *V1Instance) GetRateLimits(ctx context.Context, r *GetRateLimitsReq) (*G
 		// If our server instance is the owner of this rate limit
 		if peer.Info().IsOwner {
 			// Apply our rate limit algorithm to the request
-			metricGetRateLimitCounter.WithLabelValues("local").Add(1)
-			funcTimer1 := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getLocalRateLimit (local)"))
 			resp.Responses[i], err = s.getLocalRateLimit(ctx, req)
-			funcTimer1.ObserveDuration()
 			if err != nil {
 				err = errors.Wrapf(err, "Error while apply rate limit for '%s'", key)
 				span := trace.SpanFromContext(ctx)
@@ -356,7 +329,6 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
 	// If we are attempting again, the owner of this rate limit might have changed to us!
 	if attempts != 0 {
 		if req.Peer.Info().IsOwner {
-			metricGetRateLimitCounter.WithLabelValues("local").Add(1)
 			resp.Resp, err = s.getLocalRateLimit(ctx, req.Req)
 			if err != nil {
 				s.log.WithContext(ctx).
@@ -371,12 +343,11 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
 	}
 
 	// Make an RPC call to the peer that owns this rate limit
-	metricGetRateLimitCounter.WithLabelValues("forward").Add(1)
 	r, err := req.Peer.GetPeerRateLimit(ctx, req.Req)
 	if err != nil {
 		if IsNotReady(err) {
 			attempts++
-			metricBatchSendRetries.WithLabelValues(req.Req.Name).Add(1)
+			metricBatchSendRetries.WithLabelValues(req.Req.Name).Inc()
 			req.Peer, err = s.GetPeer(ctx, req.Key)
 			if err != nil {
 				errPart := fmt.Sprintf("Error finding peer that owns rate limit '%s'", req.Key)
@@ -406,7 +377,7 @@ func (s *V1Instance) asyncRequest(ctx context.Context, req *AsyncReq) {
 	req.WG.Done()
 
 	if isDeadlineExceeded(ctx.Err()) {
-		metricCheckErrorCounter.WithLabelValues("Timeout forwarding to peer").Add(1)
+		metricCheckErrorCounter.WithLabelValues("Timeout forwarding to peer").Inc()
 	}
 }
 
@@ -417,20 +388,18 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
 		attribute.String("ratelimit.key", req.UniqueKey),
 		attribute.String("ratelimit.name", req.Name),
 	))
-	defer func() { tracing.EndScope(ctx, err) }()
-
-	funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getGlobalRateLimit"))
-	defer funcTimer.ObserveDuration()
-	// Queue the hit for async update after we have prepared our response.
-	// NOTE: The defer here avoids a race condition where we queue the req to
-	// be forwarded to the owning peer in a separate goroutine but simultaneously
-	// access and possibly copy the req in this method.
-	defer s.global.QueueHit(req)
+	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getGlobalRateLimit")).ObserveDuration()
+	defer func() {
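+		// Only queue the hit for async forwarding to the owning peer when the
+		// rate limit check succeeded.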
+		if err == nil {
+			s.global.QueueHit(req)
+		}
+		tracing.EndScope(ctx, err)
+	}()
 
 	item, ok, err := s.workerPool.GetCacheItem(ctx, req.HashKey())
 	if err != nil {
 		countError(err, "Error in workerPool.GetCacheItem")
-		return nil, errors.Wrap(err, "Error in workerPool.GetCacheItem")
+		return nil, errors.Wrap(err, "during workerPool.GetCacheItem")
 	}
 	if ok {
 		// Global rate limits are always stored as RateLimitResp regardless of algorithm
@@ -446,12 +415,12 @@ func (s *V1Instance) getGlobalRateLimit(ctx context.Context, req *RateLimitReq)
 	cpy.Behavior = Behavior_NO_BATCHING
 
 	// Process the rate limit like we own it
-	metricGetRateLimitCounter.WithLabelValues("global").Add(1)
 	resp, err = s.getLocalRateLimit(ctx, cpy)
 	if err != nil {
-		return nil, errors.Wrap(err, "Error in getLocalRateLimit")
+		return nil, errors.Wrap(err, "during getLocalRateLimit")
 	}
 
+	metricGetRateLimitCounter.WithLabelValues("global").Inc()
 	return resp, nil
 }
 
@@ -478,7 +447,7 @@ func (s *V1Instance) UpdatePeerGlobals(ctx context.Context, r *UpdatePeerGlobals
 func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimitsReq) (resp *GetPeerRateLimitsResp, err error) {
 	if len(r.Requests) > maxBatchSize {
 		err := fmt.Errorf("'PeerRequest.rate_limits' list too large; max size is '%d'", maxBatchSize)
-		metricCheckErrorCounter.WithLabelValues("Request too large").Add(1)
+		metricCheckErrorCounter.WithLabelValues("Request too large").Inc()
 		return nil, status.Error(codes.OutOfRange, err.Error())
 	}
@@ -521,7 +490,7 @@ func (s *V1Instance) GetPeerRateLimits(ctx context.Context, r *GetPeerRateLimits
 				// Return the error for this request
 				err = errors.Wrap(err, "Error in getLocalRateLimit")
 				rl = &RateLimitResp{Error: err.Error()}
-				// metricCheckErrorCounter is updated within getLocalRateLimit().
+				// metricCheckErrorCounter is updated within getLocalRateLimit(), not in GetPeerRateLimits.
 			}
 			respChan <- respOut{rin.idx, rl}
 
@@ -584,24 +553,30 @@ func (s *V1Instance) HealthCheck(ctx context.Context, r *HealthCheckReq) (health
 	return health, nil
 }
 
-func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (*RateLimitResp, error) {
+func (s *V1Instance) getLocalRateLimit(ctx context.Context, r *RateLimitReq) (_ *RateLimitResp, err error) {
 	ctx = tracing.StartNamedScope(ctx, "V1Instance.getLocalRateLimit", trace.WithAttributes(
 		attribute.String("ratelimit.key", r.UniqueKey),
 		attribute.String("ratelimit.name", r.Name),
 		attribute.Int64("ratelimit.limit", r.Limit),
 		attribute.Int64("ratelimit.hits", r.Hits),
 	))
-
+	defer func() { tracing.EndScope(ctx, err) }()
 	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.getLocalRateLimit")).ObserveDuration()
 
+	resp, err := s.workerPool.GetRateLimit(ctx, r)
+	if err != nil {
+		return nil, errors.Wrap(err, "during workerPool.GetRateLimit")
+	}
+
+	metricGetRateLimitCounter.WithLabelValues("local").Inc()
+
+	// If the rate limit is GLOBAL, queue an update so the latest status is
+	// broadcast to the other peers.
 	if HasBehavior(r.Behavior, Behavior_GLOBAL) {
 		s.global.QueueUpdate(r)
 	}
 
-	resp, err := s.workerPool.GetRateLimit(ctx, r)
-
-	tracing.EndScope(ctx, err)
-	return resp, err
+	return resp, nil
 }
 
 // SetPeers is called by the implementor to indicate the pool of peers has changed
@@ -652,7 +627,7 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
 	s.log.WithField("peers", peerInfo).Debug("peers updated")
 
 	// Shutdown any old peers we no longer need
-	ctx, cancel := ctxutil.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout)
+	ctx, cancel := context.WithTimeout(context.Background(), s.conf.Behaviors.BatchTimeout)
 	defer cancel()
 
 	var shutdownPeers []*PeerClient
@@ -694,14 +669,10 @@ func (s *V1Instance) SetPeers(peerInfo []PeerInfo) {
 
 // GetPeer returns a peer client for the hash key provided
 func (s *V1Instance) GetPeer(ctx context.Context, key string) (p *PeerClient, err error) {
-	funcTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.GetPeer"))
-	defer funcTimer.ObserveDuration()
-	lockTimer := prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.GetPeer_RLock"))
+	defer prometheus.NewTimer(metricFuncTimeDuration.WithLabelValues("V1Instance.GetPeer")).ObserveDuration()
 	s.peerMutex.RLock()
 	defer s.peerMutex.RUnlock()
-	lockTimer.ObserveDuration()
-
 	p, err = s.conf.LocalPicker.Get(key)
 	if err != nil {
 		return nil, errors.Wrap(err, "Error in conf.LocalPicker.Get")
@@ -727,17 +698,17 @@ func (s *V1Instance) Describe(ch chan<- *prometheus.Desc) {
 	metricBatchQueueLength.Describe(ch)
 	metricBatchSendDuration.Describe(ch)
 	metricBatchSendRetries.Describe(ch)
-	metricBroadcastCounter.Describe(ch)
-	metricBroadcastDuration.Describe(ch)
 	metricCheckErrorCounter.Describe(ch)
 	metricCommandCounter.Describe(ch)
 	metricConcurrentChecks.Describe(ch)
 	metricFuncTimeDuration.Describe(ch)
 	metricGetRateLimitCounter.Describe(ch)
-	metricGlobalQueueLength.Describe(ch)
-	metricGlobalSendDuration.Describe(ch)
 	metricOverLimitCounter.Describe(ch)
 	metricWorkerQueue.Describe(ch)
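+	// Delegate to the per-instance metrics owned by the globalManager.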
+	s.global.metricBroadcastCounter.Describe(ch)
+	s.global.metricBroadcastDuration.Describe(ch)
+	s.global.metricGlobalQueueLength.Describe(ch)
+	s.global.metricGlobalSendDuration.Describe(ch)
 }
 
 // Collect fetches metrics from the server for use by prometheus
@@ -745,17 +716,17 @@ func (s *V1Instance) Collect(ch chan<- prometheus.Metric) {
 	metricBatchQueueLength.Collect(ch)
 	metricBatchSendDuration.Collect(ch)
 	metricBatchSendRetries.Collect(ch)
-	metricBroadcastCounter.Collect(ch)
-	metricBroadcastDuration.Collect(ch)
 	metricCheckErrorCounter.Collect(ch)
 	metricCommandCounter.Collect(ch)
 	metricConcurrentChecks.Collect(ch)
 	metricFuncTimeDuration.Collect(ch)
 	metricGetRateLimitCounter.Collect(ch)
-	metricGlobalQueueLength.Collect(ch)
-	metricGlobalSendDuration.Collect(ch)
 	metricOverLimitCounter.Collect(ch)
 	metricWorkerQueue.Collect(ch)
+	s.global.metricBroadcastCounter.Collect(ch)
+	s.global.metricBroadcastDuration.Collect(ch)
+	s.global.metricGlobalQueueLength.Collect(ch)
+	s.global.metricGlobalSendDuration.Collect(ch)
 }
 
 // HasBehavior returns true if the provided behavior is set
@@ -778,11 +749,11 @@ func SetBehavior(b *Behavior, flag Behavior, set bool) {
 func countError(err error, defaultType string) {
 	for {
 		if err == nil {
-			metricCheckErrorCounter.WithLabelValues(defaultType).Add(1)
+			metricCheckErrorCounter.WithLabelValues(defaultType).Inc()
 			return
 		}
 		if errors.Is(err, context.DeadlineExceeded) {
-			metricCheckErrorCounter.WithLabelValues("Timeout").Add(1)
+			metricCheckErrorCounter.WithLabelValues("Timeout").Inc()
 			return
 		}
diff --git a/peer_client.go b/peer_client.go
index ebb48a90..2cd72512 100644
--- a/peer_client.go
+++ b/peer_client.go
@@ -24,7 +24,6 @@ import (
 
 	"github.com/mailgun/holster/v4/clock"
 	"github.com/mailgun/holster/v4/collections"
-	"github.com/mailgun/holster/v4/ctxutil"
 	"github.com/mailgun/holster/v4/errors"
 	"github.com/mailgun/holster/v4/tracing"
 	"github.com/prometheus/client_golang/prometheus"
@@ -430,7 +429,7 @@ func (c *PeerClient) sendBatch(ctx context.Context, queue []*request) {
 	}
 
-	timeoutCtx, timeoutCancel := ctxutil.WithTimeout(ctx, c.conf.Behavior.BatchTimeout)
+	timeoutCtx, timeoutCancel := context.WithTimeout(ctx, c.conf.Behavior.BatchTimeout)
 	resp, err := c.client.GetPeerRateLimits(timeoutCtx, &req)
 	timeoutCancel()
diff --git a/tls_test.go b/tls_test.go
index 38b203fa..30a958ce 100644
--- a/tls_test.go
+++ b/tls_test.go
@@ -329,15 +329,15 @@ func TestHTTPSClientAuth(t *testing.T) {
 	assert.Equal(t, `{"status":"healthy","message":"","peer_count":1}`, strings.ReplaceAll(string(b), " ", ""))
 
 	// Verify we get an error when we try to access existing HTTPListenAddress without cert
-	resp2, err := clientWithoutCert.Do(reqCertRequired)
-	assert.Error(t, err)
-	defer resp2.Body.Close()
+	//nolint:bodyclose // Expect error, no body to close.
+	_, err = clientWithoutCert.Do(reqCertRequired)
+	require.Error(t, err)
 
 	// Check that with a valid client cert we can access /v1/HealthCheck at existing HTTPListenAddress
-	resp3, err := clientWithCert.Do(reqCertRequired)
+	resp2, err := clientWithCert.Do(reqCertRequired)
 	require.NoError(t, err)
-	defer resp3.Body.Close()
-	b, err = io.ReadAll(resp3.Body)
+	defer resp2.Body.Close()
+	b, err = io.ReadAll(resp2.Body)
 	require.NoError(t, err)
 	assert.Equal(t, `{"status":"healthy","message":"","peer_count":1}`, strings.ReplaceAll(string(b), " ", ""))
 }