Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

perf: Fanout for async updates in global #198

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ type BehaviorConfig struct {
GlobalBatchLimit int
// ForceGlobal forces global mode on all rate limit checks.
ForceGlobal bool

// Number of concurrent requests that will be made to peers. Defaults to 100
PeerRequestsConcurrency int
miparnisari marked this conversation as resolved.
Show resolved Hide resolved
}

// Config for a gubernator instance
Expand Down Expand Up @@ -126,6 +129,8 @@ func (c *Config) SetDefaults() error {
setter.SetDefault(&c.Behaviors.GlobalBatchLimit, maxBatchSize)
setter.SetDefault(&c.Behaviors.GlobalSyncWait, time.Millisecond*100)

setter.SetDefault(&c.Behaviors.PeerRequestsConcurrency, 100)

setter.SetDefault(&c.LocalPicker, NewReplicatedConsistentHash(nil, defaultReplicas))
setter.SetDefault(&c.RegionPicker, NewRegionPicker(nil))

Expand Down
4 changes: 2 additions & 2 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
}
}

fan := syncutil.NewFanOut(100)
fan := syncutil.NewFanOut(gm.conf.PeerRequestsConcurrency)
// Send the rate limit requests to their respective owning peers.
for _, p := range peerRequests {
fan.Run(func(in interface{}) error {
Expand Down Expand Up @@ -237,7 +237,7 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
})
}

fan := syncutil.NewFanOut(100)
fan := syncutil.NewFanOut(gm.conf.PeerRequestsConcurrency)
for _, peer := range gm.instance.GetPeerList() {
// Exclude ourselves from the update
if peer.Info().IsOwner {
Expand Down