From 83159c7ad123f0d4a23f307b4e0d3d7d6146227e Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Thu, 26 Oct 2023 11:20:28 -0700 Subject: [PATCH 1/5] perf: Fanout for async updates in global --- global.go | 42 ++++++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 18 deletions(-) diff --git a/global.go b/global.go index 4090537f..bd2c2b96 100644 --- a/global.go +++ b/global.go @@ -151,18 +151,21 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { } } + var wg syncutil.WaitGroup // Send the rate limit requests to their respective owning peers. for _, p := range peerRequests { - ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout) - _, err := p.client.GetPeerRateLimits(ctx, &p.req) - cancel() - - if err != nil { - gm.log.WithError(err). - Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress) - continue - } + wg.Go(func() { + ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout) + _, err := p.client.GetPeerRateLimits(ctx, &p.req) + cancel() + + if err != nil { + gm.log.WithError(err). + Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress) + } + }) } + wg.Wait() } // runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster. @@ -232,24 +235,27 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] }) } + var wg syncutil.WaitGroup for _, peer := range gm.instance.GetPeerList() { // Exclude ourselves from the update if peer.Info().IsOwner { continue } - ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout) - _, err := peer.UpdatePeerGlobals(ctx, &req) - cancel() + wg.Go(func() { + ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout) + _, err := peer.UpdatePeerGlobals(ctx, &req) + cancel() - if err != nil { - // Skip peers that are not in a ready state - if !IsNotReady(err) { - gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress) + if err != nil { + // Skip peers that are not in a ready state + if !IsNotReady(err) { + gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress) + } } - continue - } + }) } + wg.Wait() } func (gm *globalManager) Close() { From 63b72156b704938f733007ca98a4f8e0828908f7 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Thu, 26 Oct 2023 11:48:13 -0700 Subject: [PATCH 2/5] fix: data race --- global.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/global.go b/global.go index bd2c2b96..7a91387b 100644 --- a/global.go +++ b/global.go @@ -154,6 +154,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { var wg syncutil.WaitGroup // Send the rate limit requests to their respective owning peers. for _, p := range peerRequests { + p := p wg.Go(func() { ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout) _, err := p.client.GetPeerRateLimits(ctx, &p.req) @@ -242,6 +243,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] continue } + peer := peer wg.Go(func() { ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout) _, err := peer.UpdatePeerGlobals(ctx, &req) From 580d02fe268cb9767587be23628474faa0ec2bee Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Fri, 27 Oct 2023 09:25:33 -0700 Subject: [PATCH 3/5] use fanout --- global.go | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/global.go b/global.go index 7a91387b..83466280 100644 --- a/global.go +++ b/global.go @@ -151,11 +151,11 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { } } - var wg syncutil.WaitGroup + fan := syncutil.NewFanOut(100) // Send the rate limit requests to their respective owning peers. for _, p := range peerRequests { - p := p - wg.Go(func() { + fan.Run(func(in interface{}) error { + p := in.(*pair) ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout) _, err := p.client.GetPeerRateLimits(ctx, &p.req) cancel() @@ -164,9 +164,10 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { gm.log.WithError(err). Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress) } - }) + return nil + }, p) } - wg.Wait() + fan.Wait() } // runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster. @@ -236,15 +237,15 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] }) } - var wg syncutil.WaitGroup + fan := syncutil.NewFanOut(100) for _, peer := range gm.instance.GetPeerList() { // Exclude ourselves from the update if peer.Info().IsOwner { continue } - peer := peer - wg.Go(func() { + fan.Run(func(in interface{}) error { + peer := in.(*PeerClient) ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout) _, err := peer.UpdatePeerGlobals(ctx, &req) cancel() @@ -255,9 +256,10 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress) } } - }) + return nil + }, peer) } - wg.Wait() + fan.Wait() } func (gm *globalManager) Close() { From b728fe4157538941c1743ead0cfaddd9614efe49 Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Fri, 27 Oct 2023 14:20:33 -0700 Subject: [PATCH 4/5] make it configurable --- config.go | 5 +++++ global.go | 4 ++-- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/config.go b/config.go index 0b4ef64a..ceb3e7c4 100644 --- a/config.go +++ b/config.go @@ -64,6 +64,9 @@ type BehaviorConfig struct { GlobalBatchLimit int // ForceGlobal forces global mode on all rate limit checks. ForceGlobal bool + + // Number of concurrent requests that will be made to peers. Defaults to 100 + PeerRequestsConcurrency int } // Config for a gubernator instance @@ -126,6 +129,8 @@ func (c *Config) SetDefaults() error { setter.SetDefault(&c.Behaviors.GlobalBatchLimit, maxBatchSize) setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*100) + setter.SetDefault(&c.Behaviors.PeerRequestsConcurrency, 100) + setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas)) setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil)) diff --git a/global.go b/global.go index 83466280..39952be8 100644 --- a/global.go +++ b/global.go @@ -151,7 +151,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { } } - fan := syncutil.NewFanOut(100) + fan := syncutil.NewFanOut(gm.conf.PeerRequestsConcurrency) // Send the rate limit requests to their respective owning peers. for _, p := range peerRequests { fan.Run(func(in interface{}) error { @@ -237,7 +237,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] }) } - fan := syncutil.NewFanOut(100) + fan := syncutil.NewFanOut(gm.conf.PeerRequestsConcurrency) for _, peer := range gm.instance.GetPeerList() { // Exclude ourselves from the update if peer.Info().IsOwner { From fb4c862be10c77e427ca901c0910c76715f529ff Mon Sep 17 00:00:00 2001 From: Maria Ines Parnisari Date: Mon, 30 Oct 2023 09:26:56 -0700 Subject: [PATCH 5/5] rename config --- config.go | 4 ++-- global.go | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/config.go b/config.go index ceb3e7c4..03f1d357 100644 --- a/config.go +++ b/config.go @@ -66,7 +66,7 @@ type BehaviorConfig struct { ForceGlobal bool // Number of concurrent requests that will be made to peers. Defaults to 100 - PeerRequestsConcurrency int + GlobalPeerRequestsConcurrency int } // Config for a gubernator instance @@ -129,7 +129,7 @@ func (c *Config) SetDefaults() error { setter.SetDefault(&c.Behaviors.GlobalBatchLimit, maxBatchSize) setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*100) - setter.SetDefault(&c.Behaviors.PeerRequestsConcurrency, 100) + setter.SetDefault(&c.Behaviors.GlobalPeerRequestsConcurrency, 100) setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas)) setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil)) diff --git a/global.go b/global.go index 39952be8..78431960 100644 --- a/global.go +++ b/global.go @@ -151,7 +151,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { } } - fan := syncutil.NewFanOut(gm.conf.PeerRequestsConcurrency) + fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency) // Send the rate limit requests to their respective owning peers. for _, p := range peerRequests { fan.Run(func(in interface{}) error { @@ -237,7 +237,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] }) } - fan := syncutil.NewFanOut(gm.conf.PeerRequestsConcurrency) + fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency) for _, peer := range gm.instance.GetPeerList() { // Exclude ourselves from the update if peer.Info().IsOwner {