benchmark: Add sleepBetweenRPCs and connections parameters (#6299)
s-matyukevich authored Jun 6, 2023
1 parent 81c513a commit 1b66663
Showing 3 changed files with 164 additions and 86 deletions.
217 changes: 134 additions & 83 deletions benchmark/benchmain/main.go
@@ -47,6 +47,7 @@ import (
    "fmt"
    "io"
    "log"
+   "math/rand"
    "net"
    "os"
    "reflect"
@@ -109,6 +110,8 @@ var (
    clientWriteBufferSize = flags.IntSlice("clientWriteBufferSize", []int{-1}, "Configures the client write buffer size in bytes. If negative, use the default - may be a comma-separated list")
    serverReadBufferSize  = flags.IntSlice("serverReadBufferSize", []int{-1}, "Configures the server read buffer size in bytes. If negative, use the default - may be a comma-separated list")
    serverWriteBufferSize = flags.IntSlice("serverWriteBufferSize", []int{-1}, "Configures the server write buffer size in bytes. If negative, use the default - may be a comma-separated list")
+   sleepBetweenRPCs      = flags.DurationSlice("sleepBetweenRPCs", []time.Duration{0}, "Configures the maximum amount of time the client should sleep between consecutive RPCs - may be a comma-separated list")
+   connections           = flag.Int("connections", 1, "The number of connections. Each connection will handle maxConcurrentCalls RPC streams")

    logger = grpclog.Component("benchmark")
)
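
Both new options are ordinary command-line flags, so they compose with the flags already defined in this file. A hypothetical invocation (the -workloads and -benchtime flag names are assumed from elsewhere in benchmain):

    go run benchmark/benchmain/main.go -workloads=unary -benchtime=10s -connections=4 -sleepBetweenRPCs=10ms

Since sleepBetweenRPCs is parsed as a duration slice, a value such as -sleepBetweenRPCs=0s,10ms should sweep both settings in a single run, while connections is a plain int and stays fixed across all scenarios.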
@@ -194,9 +197,9 @@ func runModesFromWorkloads(workload string) runModes {
type startFunc func(mode string, bf stats.Features)
type stopFunc func(count uint64)
type ucStopFunc func(req uint64, resp uint64)
-type rpcCallFunc func(pos int)
-type rpcSendFunc func(pos int)
-type rpcRecvFunc func(pos int)
+type rpcCallFunc func(cn, pos int)
+type rpcSendFunc func(cn, pos int)
+type rpcRecvFunc func(cn, pos int)
type rpcCleanupFunc func()

func unaryBenchmark(start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats) {
@@ -233,40 +236,46 @@ func unconstrainedStreamBenchmark(start startFunc, stop ucStopFunc, bf stats.Features) {

    bmEnd := time.Now().Add(bf.BenchTime + warmuptime)
    var wg sync.WaitGroup
-   wg.Add(2 * bf.MaxConcurrentCalls)
-   for i := 0; i < bf.MaxConcurrentCalls; i++ {
-       go func(pos int) {
-           defer wg.Done()
-           for {
-               t := time.Now()
-               if t.After(bmEnd) {
-                   return
-               }
-               sender(pos)
-               atomic.AddUint64(&req, 1)
-           }
-       }(i)
-       go func(pos int) {
-           defer wg.Done()
-           for {
-               t := time.Now()
-               if t.After(bmEnd) {
-                   return
-               }
-               recver(pos)
-               atomic.AddUint64(&resp, 1)
-           }
-       }(i)
-   }
+   wg.Add(2 * bf.Connections * bf.MaxConcurrentCalls)
+   maxSleep := int(bf.SleepBetweenRPCs)
+   for cn := 0; cn < bf.Connections; cn++ {
+       for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
+           go func(cn, pos int) {
+               defer wg.Done()
+               for {
+                   if maxSleep > 0 {
+                       time.Sleep(time.Duration(rand.Intn(maxSleep)))
+                   }
+                   t := time.Now()
+                   if t.After(bmEnd) {
+                       return
+                   }
+                   sender(cn, pos)
+                   atomic.AddUint64(&req, 1)
+               }
+           }(cn, pos)
+           go func(cn, pos int) {
+               defer wg.Done()
+               for {
+                   t := time.Now()
+                   if t.After(bmEnd) {
+                       return
+                   }
+                   recver(cn, pos)
+                   atomic.AddUint64(&resp, 1)
+               }
+           }(cn, pos)
+       }
+   }
    wg.Wait()
    stop(req, resp)
}
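
One subtlety in the send loop above: rand.Intn panics when its argument is not positive, so the maxSleep > 0 guard is what lets the default value of 0 disable sleeping entirely. A standalone sketch of the jitter helper this loop effectively inlines (hypothetical package and name, standard library only):

    package jitter

    import (
        "math/rand"
        "time"
    )

    // SleepUpTo sleeps for a uniformly random duration in [0, max).
    // It is a no-op for max <= 0, because rand.Intn(n) panics when n <= 0.
    func SleepUpTo(max time.Duration) {
        if max > 0 {
            time.Sleep(time.Duration(rand.Intn(int(max))))
        }
    }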

-// makeClient returns a gRPC client for the grpc.testing.BenchmarkService
+// makeClients returns a gRPC client (or multiple clients) for the grpc.testing.BenchmarkService
// service. The client is configured using the different options in the passed
// 'bf'. Also returns a cleanup function to close the client and release
// resources.
-func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
+func makeClients(bf stats.Features) ([]testgrpc.BenchmarkServiceClient, func()) {
    nw := &latency.Network{Kbps: bf.Kbps, Latency: bf.Latency, MTU: bf.MTU}
    opts := []grpc.DialOption{}
    sopts := []grpc.ServerOption{}
@@ -346,16 +355,24 @@ func makeClient(bf stats.Features) (testgrpc.BenchmarkServiceClient, func()) {
    }
    lis = nw.Listener(lis)
    stopper := bm.StartServer(bm.ServerInfo{Type: "protobuf", Listener: lis}, sopts...)
-   conn := bm.NewClientConn("" /* target not used */, opts...)
-   return testgrpc.NewBenchmarkServiceClient(conn), func() {
-       conn.Close()
+   conns := make([]*grpc.ClientConn, bf.Connections)
+   clients := make([]testgrpc.BenchmarkServiceClient, bf.Connections)
+   for cn := 0; cn < bf.Connections; cn++ {
+       conns[cn] = bm.NewClientConn("" /* target not used */, opts...)
+       clients[cn] = testgrpc.NewBenchmarkServiceClient(conns[cn])
+   }
+
+   return clients, func() {
+       for _, conn := range conns {
+           conn.Close()
+       }
        stopper()
    }
}
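
The same shape generalizes beyond this benchmark: open N connections to one target and hand back a single cleanup closure that tears them all down. A minimal sketch under that assumption (hypothetical dialN helper, not part of this change; uses the public grpc.Dial API):

    // dialN opens n client connections to target and returns them together
    // with one cleanup function that closes every connection.
    func dialN(target string, n int, opts ...grpc.DialOption) ([]*grpc.ClientConn, func(), error) {
        conns := make([]*grpc.ClientConn, 0, n)
        for i := 0; i < n; i++ {
            cc, err := grpc.Dial(target, opts...)
            if err != nil {
                for _, c := range conns {
                    c.Close() // unwind partially-built state before failing
                }
                return nil, nil, err
            }
            conns = append(conns, cc)
        }
        return conns, func() {
            for _, c := range conns {
                c.Close()
            }
        }, nil
    }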

func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
-   tc, cleanup := makeClient(bf)
-   return func(int) {
+   clients, cleanup := makeClients(bf)
+   return func(cn, pos int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
if bf.ReqPayloadCurve != nil {
@@ -364,23 +381,28 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
-       unaryCaller(tc, reqSizeBytes, respSizeBytes)
+       unaryCaller(clients[cn], reqSizeBytes, respSizeBytes)
}, cleanup
}

func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
-   tc, cleanup := makeClient(bf)
+   clients, cleanup := makeClients(bf)

-   streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
-   for i := 0; i < bf.MaxConcurrentCalls; i++ {
-       stream, err := tc.StreamingCall(context.Background())
-       if err != nil {
-           logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
-       }
-       streams[i] = stream
-   }
+   streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections)
+   for cn := 0; cn < bf.Connections; cn++ {
+       tc := clients[cn]
+       streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
+       for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
+           stream, err := tc.StreamingCall(context.Background())
+           if err != nil {
+               logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
+           }
+           streams[cn][pos] = stream
+       }
+   }

-   return func(pos int) {
+   return func(cn, pos int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
if bf.ReqPayloadCurve != nil {
@@ -389,51 +411,59 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
-       streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
+       streamCaller(streams[cn][pos], reqSizeBytes, respSizeBytes)
}, cleanup
}

func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
streams, req, cleanup := setupUnconstrainedStream(bf)

-   preparedMsg := make([]*grpc.PreparedMsg, len(streams))
-   for i, stream := range streams {
-       preparedMsg[i] = &grpc.PreparedMsg{}
-       err := preparedMsg[i].Encode(stream, req)
-       if err != nil {
-           logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[i], req, stream, err)
-       }
-   }
+   preparedMsg := make([][]*grpc.PreparedMsg, len(streams))
+   for cn, connStreams := range streams {
+       preparedMsg[cn] = make([]*grpc.PreparedMsg, len(connStreams))
+       for pos, stream := range connStreams {
+           preparedMsg[cn][pos] = &grpc.PreparedMsg{}
+           err := preparedMsg[cn][pos].Encode(stream, req)
+           if err != nil {
+               logger.Fatalf("%v.Encode(%v, %v) = %v", preparedMsg[cn][pos], req, stream, err)
+           }
+       }
+   }

-   return func(pos int) {
-       streams[pos].SendMsg(preparedMsg[pos])
-   }, func(pos int) {
-       streams[pos].Recv()
+   return func(cn, pos int) {
+       streams[cn][pos].SendMsg(preparedMsg[cn][pos])
+   }, func(cn, pos int) {
+       streams[cn][pos].Recv()
}, cleanup
}
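
grpc.PreparedMsg is what makes the preloaded variant cheap: the request is marshaled once per stream, and every subsequent SendMsg reuses the encoded bytes. A minimal sketch of the API, assuming stream and req are an already-established stream and request as in the function above:

    pm := &grpc.PreparedMsg{}
    if err := pm.Encode(stream, req); err != nil { // marshal once, against this stream's codec
        logger.Fatalf("Encode failed: %v", err)
    }
    for i := 0; i < 3; i++ {
        if err := stream.SendMsg(pm); err != nil { // each send reuses the encoded bytes
            logger.Fatalf("SendMsg failed: %v", err)
        }
    }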

func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, rpcCleanupFunc) {
streams, req, cleanup := setupUnconstrainedStream(bf)

-   return func(pos int) {
-       streams[pos].Send(req)
-   }, func(pos int) {
-       streams[pos].Recv()
+   return func(cn, pos int) {
+       streams[cn][pos].Send(req)
+   }, func(cn, pos int) {
+       streams[cn][pos].Recv()
}, cleanup
}

-func setupUnconstrainedStream(bf stats.Features) ([]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
-   tc, cleanup := makeClient(bf)
+func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_StreamingCallClient, *testpb.SimpleRequest, rpcCleanupFunc) {
+   clients, cleanup := makeClients(bf)

-   streams := make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
-   md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1")
+   streams := make([][]testgrpc.BenchmarkService_StreamingCallClient, bf.Connections)
+   md := metadata.Pairs(benchmark.UnconstrainedStreamingHeader, "1",
+       benchmark.UnconstrainedStreamingDelayHeader, bf.SleepBetweenRPCs.String())
    ctx := metadata.NewOutgoingContext(context.Background(), md)
-   for i := 0; i < bf.MaxConcurrentCalls; i++ {
-       stream, err := tc.StreamingCall(ctx)
-       if err != nil {
-           logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
-       }
-       streams[i] = stream
-   }
+   for cn := 0; cn < bf.Connections; cn++ {
+       tc := clients[cn]
+       streams[cn] = make([]testgrpc.BenchmarkService_StreamingCallClient, bf.MaxConcurrentCalls)
+       for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
+           stream, err := tc.StreamingCall(ctx)
+           if err != nil {
+               logger.Fatalf("%v.StreamingCall(_) = _, %v", tc, err)
+           }
+           streams[cn][pos] = stream
+       }
+   }

pl := bm.NewPayload(testpb.PayloadType_COMPRESSABLE, bf.ReqSizeBytes)
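
The configured delay reaches the server out of band: it rides as gRPC metadata on the stream's context, and the server side (changed in one of the two files not shown here) presumably reads the header to pace its own sends. The metadata calls used are the standard API; a generic sketch with placeholder header names:

    // Attach key/value headers to all RPCs issued with this context.
    md := metadata.Pairs("k1", "v1", "k2", "v2")
    ctx := metadata.NewOutgoingContext(context.Background(), md)
    // Streams created with ctx, e.g. tc.StreamingCall(ctx), carry the headers.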
@@ -461,32 +491,45 @@ func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
}

func runBenchmark(caller rpcCallFunc, start startFunc, stop stopFunc, bf stats.Features, s *stats.Stats, mode string) {
-   // Warm up connection.
-   for i := 0; i < warmupCallCount; i++ {
-       caller(0)
-   }
+   // If SleepBetweenRPCs > 0, skip the warmup: it would fire a burst of
+   // simultaneous requests on every connection, which is precisely what
+   // SleepBetweenRPCs is meant to avoid.
+   if bf.SleepBetweenRPCs == 0 {
+       // Warm up connections.
+       for i := 0; i < warmupCallCount; i++ {
+           for cn := 0; cn < bf.Connections; cn++ {
+               caller(cn, 0)
+           }
+       }
+   }

// Run benchmark.
start(mode, bf)
var wg sync.WaitGroup
-   wg.Add(bf.MaxConcurrentCalls)
+   wg.Add(bf.Connections * bf.MaxConcurrentCalls)
    bmEnd := time.Now().Add(bf.BenchTime)
+   maxSleep := int(bf.SleepBetweenRPCs)
    var count uint64
-   for i := 0; i < bf.MaxConcurrentCalls; i++ {
-       go func(pos int) {
-           defer wg.Done()
-           for {
-               t := time.Now()
-               if t.After(bmEnd) {
-                   return
-               }
-               start := time.Now()
-               caller(pos)
-               elapse := time.Since(start)
-               atomic.AddUint64(&count, 1)
-               s.AddDuration(elapse)
-           }
-       }(i)
-   }
+   for cn := 0; cn < bf.Connections; cn++ {
+       for pos := 0; pos < bf.MaxConcurrentCalls; pos++ {
+           go func(cn, pos int) {
+               defer wg.Done()
+               for {
+                   if maxSleep > 0 {
+                       time.Sleep(time.Duration(rand.Intn(maxSleep)))
+                   }
+                   t := time.Now()
+                   if t.After(bmEnd) {
+                       return
+                   }
+                   start := time.Now()
+                   caller(cn, pos)
+                   elapse := time.Since(start)
+                   atomic.AddUint64(&count, 1)
+                   s.AddDuration(elapse)
+               }
+           }(cn, pos)
+       }
+   }
wg.Wait()
stop(count)
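
With Connections × MaxConcurrentCalls goroutines bumping one counter, the atomic increment is load-bearing; per-call durations, by contrast, go through s.AddDuration, which the stats package is assumed to synchronize internally. A self-contained illustration of the counting pattern (hypothetical helper, standard library only):

    // run has n goroutines add k each to a shared counter; atomic.AddUint64
    // keeps the final total exact under contention.
    func run(n, k int) uint64 {
        var count uint64
        var wg sync.WaitGroup
        for g := 0; g < n; g++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for i := 0; i < k; i++ {
                    atomic.AddUint64(&count, 1)
                }
            }()
        }
        wg.Wait()
        return count // always n*k
    }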
@@ -504,6 +547,7 @@ type benchOpts struct {
    benchmarkResultFile string
    useBufconn          bool
    enableKeepalive     bool
+   connections         int
    features            *featureOpts
}

@@ -528,6 +572,7 @@ type featureOpts struct {
    clientWriteBufferSize []int
    serverReadBufferSize  []int
    serverWriteBufferSize []int
+   sleepBetweenRPCs      []time.Duration
}

// makeFeaturesNum returns a slice of ints of size 'maxFeatureIndex' where each
@@ -572,6 +617,8 @@ func makeFeaturesNum(b *benchOpts) []int {
            featuresNum[i] = len(b.features.serverReadBufferSize)
        case stats.ServerWriteBufferSize:
            featuresNum[i] = len(b.features.serverWriteBufferSize)
+       case stats.SleepBetweenRPCs:
+           featuresNum[i] = len(b.features.sleepBetweenRPCs)
        default:
            log.Fatalf("Unknown feature index %v in generateFeatures. maxFeatureIndex is %v", i, stats.MaxFeatureIndex)
        }
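
Every featureOpts field is a list and the harness benchmarks the full cross product, so each entry in featuresNum multiplies the number of scenarios. A hypothetical helper (not in the file) making the arithmetic explicit:

    // totalRuns returns the size of the Cartesian product that
    // generateFeatures iterates over.
    func totalRuns(featuresNum []int) int {
        total := 1
        for _, n := range featuresNum {
            total *= n
        }
        return total
    }

For example, two sleepBetweenRPCs values combined with three request sizes yields six benchmark runs.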
@@ -625,6 +672,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
            UseBufConn:      b.useBufconn,
            EnableKeepalive: b.enableKeepalive,
            BenchTime:       b.benchTime,
+           Connections:     b.connections,
            // These features can potentially change for each iteration.
            EnableTrace: b.features.enableTrace[curPos[stats.EnableTraceIndex]],
            Latency:     b.features.readLatencies[curPos[stats.ReadLatenciesIndex]],
@@ -638,6 +686,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
            ClientWriteBufferSize: b.features.clientWriteBufferSize[curPos[stats.ClientWriteBufferSize]],
            ServerReadBufferSize:  b.features.serverReadBufferSize[curPos[stats.ServerReadBufferSize]],
            ServerWriteBufferSize: b.features.serverWriteBufferSize[curPos[stats.ServerWriteBufferSize]],
+           SleepBetweenRPCs:      b.features.sleepBetweenRPCs[curPos[stats.SleepBetweenRPCs]],
        }
        if len(b.features.reqPayloadCurves) == 0 {
            f.ReqSizeBytes = b.features.reqSizeBytes[curPos[stats.ReqSizeBytesIndex]]
@@ -693,6 +742,7 @@ func processFlags() *benchOpts {
        benchmarkResultFile: *benchmarkResultFile,
        useBufconn:          *useBufconn,
        enableKeepalive:     *enableKeepalive,
+       connections:         *connections,
        features: &featureOpts{
            enableTrace:   setToggleMode(*traceMode),
            readLatencies: append([]time.Duration(nil), *readLatency...),
@@ -708,6 +758,7 @@ func processFlags() *benchOpts {
            clientWriteBufferSize: append([]int(nil), *clientWriteBufferSize...),
            serverReadBufferSize:  append([]int(nil), *serverReadBufferSize...),
            serverWriteBufferSize: append([]int(nil), *serverWriteBufferSize...),
+           sleepBetweenRPCs:      append([]time.Duration(nil), *sleepBetweenRPCs...),
        },
    }
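
The append([]time.Duration(nil), *sleepBetweenRPCs...) idiom, used for every slice-valued flag here, copies the flag-owned slice so featureOpts never aliases it. A minimal illustration of why the copy matters:

    src := []time.Duration{0, 10 * time.Millisecond}
    cpy := append([]time.Duration(nil), src...) // fresh backing array
    src[0] = time.Second                        // leaves cpy[0] == 0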

[Diffs of the remaining two changed files are not shown in this capture.]