Skip to content
This repository has been archived by the owner on Apr 19, 2024. It is now read-only.

perf: Fanout for async updates in global #198

Merged
merged 5 commits into from
Oct 30, 2023
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 27 additions & 19 deletions global.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,22 @@ func (gm *globalManager) sendHits(hits map[string]*RateLimitReq) {
}
}

var wg syncutil.WaitGroup
// Send the rate limit requests to their respective owning peers.
for _, p := range peerRequests {
ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
_, err := p.client.GetPeerRateLimits(ctx, &p.req)
cancel()

if err != nil {
gm.log.WithError(err).
Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress)
continue
}
p := p
wg.Go(func() {
miparnisari marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithTimeout(context.Background(), gm.conf.GlobalTimeout)
_, err := p.client.GetPeerRateLimits(ctx, &p.req)
cancel()

if err != nil {
gm.log.WithError(err).
Errorf("error sending global hits to '%s'", p.client.Info().GRPCAddress)
}
})
}
wg.Wait()
}

// runBroadcasts collects status changes for global rate limits and broadcasts the changes to each peer in the cluster.
Expand Down Expand Up @@ -232,24 +236,28 @@ func (gm *globalManager) broadcastPeers(ctx context.Context, updates map[string]
})
}

var wg syncutil.WaitGroup
for _, peer := range gm.instance.GetPeerList() {
// Exclude ourselves from the update
if peer.Info().IsOwner {
continue
}

ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout)
_, err := peer.UpdatePeerGlobals(ctx, &req)
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
peer := peer
wg.Go(func() {
ctx, cancel := context.WithTimeout(ctx, gm.conf.GlobalTimeout)
_, err := peer.UpdatePeerGlobals(ctx, &req)
cancel()

if err != nil {
// Skip peers that are not in a ready state
if !IsNotReady(err) {
gm.log.WithError(err).Errorf("while broadcasting global updates to '%s'", peer.Info().GRPCAddress)
}
}
continue
}
})
}
wg.Wait()
}

func (gm *globalManager) Close() {
Expand Down