Skip to content

Commit

Permalink
benchmark: add a feature for the stream count
Browse files Browse the repository at this point in the history
  • Loading branch information
hueypark committed Dec 18, 2022
1 parent 54b7d03 commit 2a8eb29
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 26 deletions.
13 changes: 10 additions & 3 deletions benchmark/benchmain/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ var (
readRespSizeBytes = flags.IntSlice("respSizeBytes", nil, "Response size in bytes - may be a comma-separated list")
reqPayloadCurveFiles = flags.StringSlice("reqPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of request payload sizes")
respPayloadCurveFiles = flags.StringSlice("respPayloadCurveFiles", nil, "comma-separated list of CSV files describing the shape a random distribution of response payload sizes")
streamCount = flags.IntSlice("streamCounts", nil, "Stream counts in a single stream - may be a comma-separated list")
benchTime = flag.Duration("benchtime", time.Second, "Configures the amount of time to run each benchmark")
memProfile = flag.String("memProfile", "", "Enables memory profiling output to the filename provided.")
memProfileRate = flag.Int("memProfileRate", 512*1024, "Configures the memory profiling rate. \n"+
Expand Down Expand Up @@ -384,13 +385,14 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
return func(pos int) {
reqSizeBytes := bf.ReqSizeBytes
respSizeBytes := bf.RespSizeBytes
streamCount := bf.StreamCount
if bf.ReqPayloadCurve != nil {
reqSizeBytes = bf.ReqPayloadCurve.ChooseRandom()
}
if bf.RespPayloadCurve != nil {
respSizeBytes = bf.RespPayloadCurve.ChooseRandom()
}
streamCaller(streams[pos], reqSizeBytes, respSizeBytes)
streamCaller(streams[pos], reqSizeBytes, respSizeBytes, streamCount)
}, cleanup
}

Expand Down Expand Up @@ -455,8 +457,8 @@ func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int)
}
}

func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) {
if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize); err != nil {
func streamCaller(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize, streamCount int) {
if err := bm.DoStreamingRoundTrip(stream, reqSize, respSize, streamCount); err != nil {
logger.Fatalf("DoStreamingRoundTrip failed: %v", err)
}
}
Expand Down Expand Up @@ -522,6 +524,7 @@ type featureOpts struct {
respSizeBytes []int
reqPayloadCurves []*stats.PayloadCurve
respPayloadCurves []*stats.PayloadCurve
streamCounts []int
compModes []string
enableChannelz []bool
enablePreloader []bool
Expand Down Expand Up @@ -559,6 +562,8 @@ func makeFeaturesNum(b *benchOpts) []int {
featuresNum[i] = len(b.features.reqPayloadCurves)
case stats.RespPayloadCurveIndex:
featuresNum[i] = len(b.features.respPayloadCurves)
case stats.StreamCountsIndex:
featuresNum[i] = len(b.features.streamCounts)
case stats.CompModesIndex:
featuresNum[i] = len(b.features.compModes)
case stats.EnableChannelzIndex:
Expand Down Expand Up @@ -650,6 +655,7 @@ func (b *benchOpts) generateFeatures(featuresNum []int) []stats.Features {
} else {
f.RespPayloadCurve = b.features.respPayloadCurves[curPos[stats.RespPayloadCurveIndex]]
}
f.StreamCount = b.features.streamCounts[curPos[stats.StreamCountsIndex]]
result = append(result, f)
addOne(curPos, featuresNum)
}
Expand Down Expand Up @@ -702,6 +708,7 @@ func processFlags() *benchOpts {
maxConcurrentCalls: append([]int(nil), *maxConcurrentCalls...),
reqSizeBytes: append([]int(nil), *readReqSizeBytes...),
respSizeBytes: append([]int(nil), *readRespSizeBytes...),
streamCounts: append([]int(nil), *streamCount...),
compModes: setCompressorMode(*compressorMode),
enableChannelz: setToggleMode(*channelzOn),
enablePreloader: setToggleMode(*preloaderMode),
Expand Down
44 changes: 25 additions & 19 deletions benchmark/benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,39 +232,45 @@ func DoUnaryCall(tc testgrpc.BenchmarkServiceClient, reqSize, respSize int) erro
}

// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
func DoStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
func DoStreamingRoundTrip(
stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize, streamCount int) error {
pl := NewPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
req := &testpb.SimpleRequest{
ResponseType: pl.Type,
ResponseSize: int32(respSize),
Payload: pl,
}
if err := stream.Send(req); err != nil {
return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
}
if _, err := stream.Recv(); err != nil {
// EOF is a valid error here.
if err == io.EOF {
return nil
for i := 0; i < streamCount; i++ {
if err := stream.Send(req); err != nil {
return fmt.Errorf("/BenchmarkService/StreamingCall.Send(_) = %v, want <nil>", err)
}
if _, err := stream.Recv(); err != nil {
// EOF is a valid error here.
if err == io.EOF {
return nil
}
return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
}
return fmt.Errorf("/BenchmarkService/StreamingCall.Recv(_) = %v, want <nil>", err)
}
return nil
}

// DoByteBufStreamingRoundTrip performs a round trip for a single streaming rpc, using a custom codec for byte buffer.
func DoByteBufStreamingRoundTrip(stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize int) error {
func DoByteBufStreamingRoundTrip(
stream testgrpc.BenchmarkService_StreamingCallClient, reqSize, respSize, streamCount int) error {
out := make([]byte, reqSize)
if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
}
var in []byte
if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
// EOF is a valid error here.
if err == io.EOF {
return nil
for i := 0; i < streamCount; i++ {
if err := stream.(grpc.ClientStream).SendMsg(&out); err != nil {
return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).SendMsg(_) = %v, want <nil>", err)
}
var in []byte
if err := stream.(grpc.ClientStream).RecvMsg(&in); err != nil {
// EOF is a valid error here.
if err == io.EOF {
return nil
}
return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
}
return fmt.Errorf("/BenchmarkService/StreamingCall.(ClientStream).RecvMsg(_) = %v, want <nil>", err)
}
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions benchmark/stats/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
RespSizeBytesIndex
ReqPayloadCurveIndex
RespPayloadCurveIndex
StreamCountsIndex
CompModesIndex
EnableChannelzIndex
EnablePreloaderIndex
Expand Down Expand Up @@ -107,6 +108,8 @@ type Features struct {
// RespPayloadCurve is a histogram representing the shape a random
// distribution request payloads should take.
RespPayloadCurve *PayloadCurve
// StreamCount is the number request and response in a single stream.
StreamCount int
// ModeCompressor represents the compressor mode used.
ModeCompressor string
// EnableChannelz indicates if channelz was turned on.
Expand Down
9 changes: 5 additions & 4 deletions benchmark/worker/benchmark_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ func performRPCs(config *testpb.ClientConfig, conns []*grpc.ClientConn, bc *benc
bc.doCloseLoopUnary(conns, rpcCountPerConn, payloadReqSize, payloadRespSize)
// TODO open loop.
case testpb.RpcType_STREAMING:
bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, payloadType)
bc.doCloseLoopStreaming(conns, rpcCountPerConn, payloadReqSize, payloadRespSize, 1, payloadType)
// TODO open loop.
default:
return status.Errorf(codes.InvalidArgument, "unknown rpc type: %v", config.RpcType)
Expand Down Expand Up @@ -289,8 +289,9 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
}
}

func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, payloadType string) {
var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int) error
func (bc *benchmarkClient) doCloseLoopStreaming(
conns []*grpc.ClientConn, rpcCountPerConn int, reqSize int, respSize int, streamCount int, payloadType string) {
var doRPC func(testgrpc.BenchmarkService_StreamingCallClient, int, int, int) error
if payloadType == "bytebuf" {
doRPC = benchmark.DoByteBufStreamingRoundTrip
} else {
Expand All @@ -315,7 +316,7 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
// before starting benchmark.
for {
start := time.Now()
if err := doRPC(stream, reqSize, respSize); err != nil {
if err := doRPC(stream, reqSize, respSize, streamCount); err != nil {
return
}
elapse := time.Since(start)
Expand Down

0 comments on commit 2a8eb29

Please sign in to comment.