Skip to content

Commit

Permalink
grpcrand: delete all of grpcrand and call the rand package directly (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
arvindbr8 authored May 31, 2024
1 parent 24e9024 commit 8bf2b3e
Show file tree
Hide file tree
Showing 30 changed files with 82 additions and 251 deletions.
4 changes: 2 additions & 2 deletions balancer/endpointsharding/endpointsharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
Expand Down Expand Up @@ -234,7 +234,7 @@ func (es *endpointSharding) updateState() {
p := &pickerWithChildStates{
pickers: pickers,
childStates: childStates,
next: uint32(grpcrand.Intn(len(pickers))),
next: uint32(rand.Intn(len(pickers))),
}
es.cc.UpdateState(balancer.State{
ConnectivityState: aggState,
Expand Down
6 changes: 3 additions & 3 deletions balancer/grpclb/grpclb_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
package grpclb

import (
"math/rand"
"sync"
"sync/atomic"

"google.golang.org/grpc/balancer"
lbpb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/status"
)

Expand Down Expand Up @@ -112,7 +112,7 @@ type rrPicker struct {
func newRRPicker(readySCs []balancer.SubConn) *rrPicker {
return &rrPicker{
subConns: readySCs,
subConnsNext: grpcrand.Intn(len(readySCs)),
subConnsNext: rand.Intn(len(readySCs)),
}
}

Expand Down Expand Up @@ -147,7 +147,7 @@ func newLBPicker(serverList []*lbpb.Server, readySCs []balancer.SubConn, stats *
return &lbPicker{
serverList: serverList,
subConns: readySCs,
subConnsNext: grpcrand.Intn(len(readySCs)),
subConnsNext: rand.Intn(len(readySCs)),
stats: stats,
}
}
Expand Down
12 changes: 6 additions & 6 deletions balancer/leastrequest/balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,13 +194,13 @@ func checkRoundRobinRPCs(ctx context.Context, client testgrpc.TestServiceClient,
// deterministic, allowing the test to make assertions on the distribution.
func (s) TestLeastRequestE2E(t *testing.T) {
defer func(u func() uint32) {
grpcranduint32 = u
}(grpcranduint32)
randuint32 = u
}(randuint32)
var index int
indexes := []uint32{
0, 0, 1, 1, 2, 2, // Triggers a round robin distribution.
}
grpcranduint32 = func() uint32 {
randuint32 = func() uint32 {
ret := indexes[index%len(indexes)]
index++
return ret
Expand Down Expand Up @@ -310,13 +310,13 @@ func (s) TestLeastRequestE2E(t *testing.T) {
// previous. Any created streams should then be started on the new backend.
func (s) TestLeastRequestPersistsCounts(t *testing.T) {
defer func(u func() uint32) {
grpcranduint32 = u
}(grpcranduint32)
randuint32 = u
}(randuint32)
var index int
indexes := []uint32{
0, 0, 1, 1,
}
grpcranduint32 = func() uint32 {
randuint32 = func() uint32 {
ret := indexes[index%len(indexes)]
index++
return ret
Expand Down
8 changes: 4 additions & 4 deletions balancer/leastrequest/leastrequest.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,17 @@ package leastrequest
import (
"encoding/json"
"fmt"
"math/rand"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/serviceconfig"
)

// grpcranduint32 is a global to stub out in tests.
var grpcranduint32 = grpcrand.Uint32
// randuint32 is a global to stub out in tests.
var randuint32 = rand.Uint32

// Name is the name of the least request balancer.
const Name = "least_request_experimental"
Expand Down Expand Up @@ -157,7 +157,7 @@ func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
var pickedSC *scWithRPCCount
var pickedSCNumRPCs int32
for i := 0; i < int(p.choiceCount); i++ {
index := grpcranduint32() % uint32(len(p.subConns))
index := randuint32() % uint32(len(p.subConns))
sc := p.subConns[index]
n := sc.numRPCs.Load()
if pickedSC == nil || n < pickedSCNumRPCs {
Expand Down
14 changes: 11 additions & 3 deletions balancer/pickfirst/pickfirst.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,21 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)

func init() {
balancer.Register(pickfirstBuilder{})
internal.ShuffleAddressListForTesting = func(n int, swap func(i, j int)) { rand.Shuffle(n, swap) }
}

var logger = grpclog.Component("pick-first-lb")
Expand Down Expand Up @@ -101,6 +103,12 @@ func (b *pickfirstBalancer) ResolverError(err error) {
})
}

type Shuffler interface {
ShuffleAddressListForTesting(n int, swap func(i, j int))
}

func ShuffleAddressListForTesting(n int, swap func(i, j int)) { rand.Shuffle(n, swap) }

func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
if len(state.ResolverState.Addresses) == 0 && len(state.ResolverState.Endpoints) == 0 {
// The resolver reported an empty address list. Treat it like an error by
Expand Down Expand Up @@ -132,7 +140,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// within each endpoint. - A61
if cfg.ShuffleAddressList {
endpoints = append([]resolver.Endpoint{}, endpoints...)
grpcrand.Shuffle(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
internal.ShuffleAddressListForTesting.(func(int, func(int, int)))(len(endpoints), func(i, j int) { endpoints[i], endpoints[j] = endpoints[j], endpoints[i] })
}

// "Flatten the list by concatenating the ordered list of addresses for each
Expand All @@ -153,7 +161,7 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
addrs = state.ResolverState.Addresses
if cfg.ShuffleAddressList {
addrs = append([]resolver.Address{}, addrs...)
grpcrand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
rand.Shuffle(len(addrs), func(i, j int) { addrs[i], addrs[j] = addrs[j], addrs[i] })
}
}

Expand Down
5 changes: 2 additions & 3 deletions balancer/rls/internal/adaptive/adaptive.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,15 @@
package adaptive

import (
"math/rand"
"sync"
"time"

"google.golang.org/grpc/internal/grpcrand"
)

// For overriding in unittests.
var (
timeNowFunc = func() time.Time { return time.Now() }
randFunc = func() float64 { return grpcrand.Float64() }
randFunc = func() float64 { return rand.Float64() }
)

const (
Expand Down
4 changes: 2 additions & 2 deletions balancer/roundrobin/roundrobin.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
package roundrobin

import (
"math/rand"
"sync/atomic"

"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/grpcrand"
)

// Name is the name of round_robin balancer.
Expand Down Expand Up @@ -60,7 +60,7 @@ func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
// Start at a random index, as the same RR balancer rebuilds a new
// picker when SubConn states change, and we don't want to apply excess
// load to the first server in the list.
next: uint32(grpcrand.Intn(len(scs))),
next: uint32(rand.Intn(len(scs))),
}
}

Expand Down
4 changes: 2 additions & 2 deletions balancer/weightedroundrobin/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"time"
Expand All @@ -33,7 +34,6 @@ import (
"google.golang.org/grpc/balancer/weightedroundrobin/internal"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcrand"
iserviceconfig "google.golang.org/grpc/internal/serviceconfig"
"google.golang.org/grpc/orca"
"google.golang.org/grpc/resolver"
Expand Down Expand Up @@ -318,7 +318,7 @@ func (b *wrrBalancer) regeneratePicker() {
}

p := &picker{
v: grpcrand.Uint32(), // start the scheduler at a random point
v: rand.Uint32(), // start the scheduler at a random point
cfg: b.cfg,
subConns: b.readySubConns(),
}
Expand Down
10 changes: 5 additions & 5 deletions benchmark/worker/benchmark_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"context"
"flag"
"math"
"math/rand"
"runtime"
"sync"
"time"
Expand All @@ -32,7 +33,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/syscall"
"google.golang.org/grpc/status"
"google.golang.org/grpc/testdata"
Expand Down Expand Up @@ -298,7 +298,7 @@ func (bc *benchmarkClient) unaryLoop(conns []*grpc.ClientConn, rpcCountPerConn i
}
}
} else { // Open loop.
timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second))
timeBetweenRPCs := time.Duration((rand.ExpFloat64() / *poissonLambda) * float64(time.Second))
time.AfterFunc(timeBetweenRPCs, func() {
bc.poissonUnary(client, idx, reqSize, respSize, *poissonLambda)
})
Expand Down Expand Up @@ -348,7 +348,7 @@ func (bc *benchmarkClient) streamingLoop(conns []*grpc.ClientConn, rpcCountPerCo
}
}(idx)
} else { // Open loop.
timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / *poissonLambda) * float64(time.Second))
timeBetweenRPCs := time.Duration((rand.ExpFloat64() / *poissonLambda) * float64(time.Second))
time.AfterFunc(timeBetweenRPCs, func() {
bc.poissonStreaming(stream, idx, reqSize, respSize, *poissonLambda, doRPC)
})
Expand All @@ -366,7 +366,7 @@ func (bc *benchmarkClient) poissonUnary(client testgrpc.BenchmarkServiceClient,
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
}()
timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
timeBetweenRPCs := time.Duration((rand.ExpFloat64() / lambda) * float64(time.Second))
time.AfterFunc(timeBetweenRPCs, func() {
bc.poissonUnary(client, idx, reqSize, respSize, lambda)
})
Expand All @@ -381,7 +381,7 @@ func (bc *benchmarkClient) poissonStreaming(stream testgrpc.BenchmarkService_Str
elapse := time.Since(start)
bc.lockingHistograms[idx].add(int64(elapse))
}()
timeBetweenRPCs := time.Duration((grpcrand.ExpFloat64() / lambda) * float64(time.Second))
timeBetweenRPCs := time.Duration((rand.ExpFloat64() / lambda) * float64(time.Second))
time.AfterFunc(timeBetweenRPCs, func() {
bc.poissonStreaming(stream, idx, reqSize, respSize, lambda, doRPC)
})
Expand Down
4 changes: 2 additions & 2 deletions examples/features/debugging/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ package main
import (
"context"
"log"
"math/rand"
"net"
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/channelz/service"
"google.golang.org/grpc/internal/grpcrand"

pb "google.golang.org/grpc/examples/helloworld/helloworld"
)
Expand All @@ -54,7 +54,7 @@ type slowServer struct {
// SayHello implements helloworld.GreeterServer
func (s *slowServer) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
// Delay 100ms ~ 200ms before replying
time.Sleep(time.Duration(100+grpcrand.Intn(100)) * time.Millisecond)
time.Sleep(time.Duration(100+rand.Intn(100)) * time.Millisecond)
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/backoff/backoff.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ package backoff
import (
"context"
"errors"
"math/rand"
"time"

grpcbackoff "google.golang.org/grpc/backoff"
"google.golang.org/grpc/internal/grpcrand"
)

// Strategy defines the methodology for backing off after a grpc connection
Expand Down Expand Up @@ -67,7 +67,7 @@ func (bc Exponential) Backoff(retries int) time.Duration {
}
// Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep.
backoff *= 1 + bc.Config.Jitter*(grpcrand.Float64()*2-1)
backoff *= 1 + bc.Config.Jitter*(rand.Float64()*2-1)
if backoff < 0 {
return 0
}
Expand Down
Loading

0 comments on commit 8bf2b3e

Please sign in to comment.