diff --git a/config.go b/config.go index 0b4ef64a..03f1d357 100644 --- a/config.go +++ b/config.go @@ -64,6 +64,9 @@ type BehaviorConfig struct { GlobalBatchLimit int // ForceGlobal forces global mode on all rate limit checks. ForceGlobal bool + + // Number of concurrent requests that will be made to peers. Defaults to 100 + GlobalPeerRequestsConcurrency int } // Config for a gubernator instance @@ -126,6 +129,8 @@ func (c *Config) SetDefaults() error { setter.SetDefault(&c.Behaviors.GlobalBatchLimit, maxBatchSize) setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*100) + setter.SetDefault(&c.Behaviors.GlobalPeerRequestsConcurrency, 100) + setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas)) setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil)) diff --git a/global.go b/global.go index 4090537f..78431960 100644 --- a/global.go +++ b/global.go @@ -151,18 +151,23 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) { } } + fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency) // Send the rate limit requests to their respective owning peers. for _, p := range peerRequests { - ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout) - _, err := p.client.GetPeerRateLimits(ctx, &p.req) - cancel() - - if err != nil { - gm.log.WithError(err). - Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress) - continue - } + fan.Run(func(in interface{}) error { + p := in.(*pair) + ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout) + _, err := p.client.GetPeerRateLimits(ctx, &p.req) + cancel() + + if err != nil { + gm.log.WithError(err). + Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress) + } + return nil + }, p) } + fan.Wait() } // runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster. @@ -232,24 +237,29 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string] }) } + fan := syncutil.NewFanOut(gm.conf.GlobalPeerRequestsConcurrency) for _, peer := range gm.instance.GetPeerList() { // Exclude ourselves from the update if peer.Info().IsOwner { continue } - ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout) - _, err := peer.UpdatePeerGlobals(ctx, &req) - cancel() - - if err != nil { - // Skip peers that are not in a ready state - if !IsNotReady(err) { - gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress) + fan.Run(func(in interface{}) error { + peer := in.(*PeerClient) + ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout) + _, err := peer.UpdatePeerGlobals(ctx, &req) + cancel() + + if err != nil { + // Skip peers that are not in a ready state + if !IsNotReady(err) { + gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress) + } } - continue - } + return nil + }, peer) } + fan.Wait() } func (gm *globalManager) Close() {