From b7f16d487ed751b801520d3e5335c936326e6116 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Thu, 21 Nov 2019 12:08:42 -0800 Subject: [PATCH 1/6] server.go: use worker goroutines for fewer stack allocations MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Currently (go1.13.4), the default stack size for newly spawned goroutines is 2048 bytes. This is insufficient when processing gRPC requests as the we often require more than 4 KiB stacks. This causes the Go runtime to call runtime.morestack at least twice per RPC, which causes performance to suffer needlessly as stack reallocations require all sorts of internal work such as changing pointers to point to new addresses. Since this stack growth is guaranteed to happen at least twice per RPC, reusing goroutines gives us two wins: 1. The stack is already grown to 8 KiB after the first RPC, so subsequent RPCs do not call runtime.morestack. 2. We eliminate the need to spawn a new goroutine for each request (even though they're relatively inexpensive). Performance improves across the board. The improvement is especially visible in small, unary requests as the overhead of stack reallocation is higher, percentage-wise. QPS is up anywhere between 3% and 5% depending on the number of concurrent RPC requests in flight. Latency is down ~3%. There is even a 1% decrease in memory footprint in some cases, though that is an unintended, but happy coincidence. unary-networkMode_none-bufConn_false-keepalive_false-benchTime_1m0s-trace_false-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_8-reqSize_1B-respSize_1B-compressor_off-channelz_false-preloader_false Title Before After Percentage TotalOps 2613512 2701705 3.37% SendOps 0 0 NaN% RecvOps 0 0 NaN% Bytes/op 8657.00 8654.17 -0.03% Allocs/op 173.37 173.28 0.00% ReqT/op 348468.27 360227.33 3.37% RespT/op 348468.27 360227.33 3.37% 50th-Lat 174.601µs 167.378µs -4.14% 90th-Lat 233.132µs 229.087µs -1.74% 99th-Lat 438.98µs 441.857µs 0.66% Avg-Lat 183.263µs 177.26µs -3.28% --- internal/transport/transport.go | 5 +++ server.go | 70 +++++++++++++++++++++++++++++++-- 2 files changed, 71 insertions(+), 4 deletions(-) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index a30da9eb324f..f10e45df4c61 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -286,6 +286,11 @@ type Stream struct { contentSubtype string } +// ID returns the stream ID. +func (s *Stream) ID() uint32 { + return s.id +} + // isHeaderSent is only valid on the server-side. func (s *Stream) isHeaderSent() bool { return atomic.LoadUint32(&s.headerSent) == 1 diff --git a/server.go b/server.go index 0d75cb109a09..216265f21331 100644 --- a/server.go +++ b/server.go @@ -712,15 +712,73 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr return st } +func floorCPUCount() uint32 { + n := uint32(runtime.NumCPU()) + for i := uint32(1 << 31); i >= 2; i >>= 1 { + if n&i > 0 { + return i + } + } + + return 1 +} + +// workerStackReset defines how often the stack must be reset. Every N +// requests, by spawning a new goroutine in its place, a worker can reset its +// stack so that large stacks don't live in memory forever. 2^16 should allow +// each goroutine stack to live for at least a few seconds in a typical +// workload (assuming a QPS of a few thousand requests/sec). +const workerStackReset = 1 << 16 + +// streamWorkers blocks on a *transport.Stream channel forever and waits for +// data to be fed by serveStreams. This allows different requests to be +// processed by the same goroutine, removing the need for expensive stack +// re-allocations (see the runtime.morestack problem [1]). +// +// [1] https://github.com/golang/go/issues/18138 +func (s *Server) streamWorker(st transport.ServerTransport, wg *sync.WaitGroup, ch chan *transport.Stream) { + completed := 0 + for stream := range ch { + s.handleStream(st, stream, s.traceInfo(st, stream)) + wg.Done() + completed++ + if completed == workerStackReset { + go s.streamWorker(st, wg, ch) + return + } + } +} + +// numWorkers defines the number of stream handling workers. After experiments +// with different CPU counts, using the floor of the number of CPUs available +// was found to be the number optimal for performance across the board (QPS, +// latency). +var numWorkers = floorCPUCount() + +// workerMask is used to perform bitwise AND operations instead of expensive +// module operations on integers. +var workerMask = numWorkers - 1 + func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup + + streamChannels := make([]chan *transport.Stream, numWorkers) + for i := range streamChannels { + streamChannels[i] = make(chan *transport.Stream) + go s.streamWorker(st, &wg, streamChannels[i]) + } + st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) - go func() { - defer wg.Done() - s.handleStream(st, stream, s.traceInfo(st, stream)) - }() + select { + case streamChannels[stream.ID()&workerMask] <- stream: + default: + go func() { + s.handleStream(st, stream, s.traceInfo(st, stream)) + wg.Done() + }() + } }, func(ctx context.Context, method string) context.Context { if !EnableTracing { return ctx @@ -729,6 +787,10 @@ func (s *Server) serveStreams(st transport.ServerTransport) { return trace.NewContext(ctx, tr) }) wg.Wait() + + for _, ch := range streamChannels { + close(ch) + } } var _ http.Handler = (*Server)(nil) From 11cd1604f5ec906f29c4ceb43b06a055f6cb385f Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Wed, 18 Dec 2019 15:00:46 -0800 Subject: [PATCH 2/6] V2: server option, stream ID bitshift --- server.go | 72 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 41 insertions(+), 31 deletions(-) diff --git a/server.go b/server.go index 216265f21331..024e6badbddd 100644 --- a/server.go +++ b/server.go @@ -131,6 +131,7 @@ type serverOptions struct { connectionTimeout time.Duration maxHeaderListSize *uint32 headerTableSize *uint32 + numStreamWorkers uint32 } var defaultServerOptions = serverOptions{ @@ -388,6 +389,22 @@ func HeaderTableSize(s uint32) ServerOption { }) } +// NumStreamWorkers returns a ServerOption that sets the number of worker +// goroutines that should be used to process incoming streams. Setting this to +// zero (default) will disable workers and spawn a new goroutine for each +// stream. +// +// This API is EXPERIMENTAL. +func NumStreamWorkers(numStreamWorkers uint32) ServerOption { + // TODO: If/when this API gets stabilized (i.e. stream workers become the + // only way streams are processed), change the behavior of the zero value to + // a sane default. Preliminary experiments suggest that a value equal to the + // number of CPUs available is most performant; requires thorough testing. + return newFuncServerOption(func(o *serverOptions) { + o.numStreamWorkers = numStreamWorkers + }) +} + // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { @@ -712,17 +729,6 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr return st } -func floorCPUCount() uint32 { - n := uint32(runtime.NumCPU()) - for i := uint32(1 << 31); i >= 2; i >>= 1 { - if n&i > 0 { - return i - } - } - - return 1 -} - // workerStackReset defines how often the stack must be reset. Every N // requests, by spawning a new goroutine in its place, a worker can reset its // stack so that large stacks don't live in memory forever. 2^16 should allow @@ -749,34 +755,36 @@ func (s *Server) streamWorker(st transport.ServerTransport, wg *sync.WaitGroup, } } -// numWorkers defines the number of stream handling workers. After experiments -// with different CPU counts, using the floor of the number of CPUs available -// was found to be the number optimal for performance across the board (QPS, -// latency). -var numWorkers = floorCPUCount() - -// workerMask is used to perform bitwise AND operations instead of expensive -// module operations on integers. -var workerMask = numWorkers - 1 - func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup - streamChannels := make([]chan *transport.Stream, numWorkers) - for i := range streamChannels { - streamChannels[i] = make(chan *transport.Stream) - go s.streamWorker(st, &wg, streamChannels[i]) + var streamChannels []chan *transport.Stream + if s.opts.numStreamWorkers > 0 { + streamChannels = make([]chan *transport.Stream, s.opts.numStreamWorkers) + for i := range streamChannels { + streamChannels[i] = make(chan *transport.Stream) + go s.streamWorker(st, &wg, streamChannels[i]) + } } + var streamChannelCounter uint32 st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) - select { - case streamChannels[stream.ID()&workerMask] <- stream: - default: + if s.opts.numStreamWorkers > 0 { + select { + case streamChannels[atomic.AddUint32(&streamChannelCounter, 1)%s.opts.numStreamWorkers] <- stream: + default: + // If all stream workers are busy, fallback to default code path. + go func() { + s.handleStream(st, stream, s.traceInfo(st, stream)) + wg.Done() + }() + } + } else { go func() { + defer wg.Done() s.handleStream(st, stream, s.traceInfo(st, stream)) - wg.Done() }() } }, func(ctx context.Context, method string) context.Context { @@ -788,8 +796,10 @@ func (s *Server) serveStreams(st transport.ServerTransport) { }) wg.Wait() - for _, ch := range streamChannels { - close(ch) + if s.opts.numStreamWorkers > 0 { + for _, ch := range streamChannels { + close(ch) + } } } From f13952c7320c8cf110dd8207b88958187ca9ec76 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Thu, 19 Dec 2019 16:05:39 -0800 Subject: [PATCH 3/6] V3: server workers instead of stream workers --- internal/transport/transport.go | 5 -- server.go | 105 +++++++++++++++++--------------- 2 files changed, 57 insertions(+), 53 deletions(-) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index f10e45df4c61..a30da9eb324f 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -286,11 +286,6 @@ type Stream struct { contentSubtype string } -// ID returns the stream ID. -func (s *Stream) ID() uint32 { - return s.id -} - // isHeaderSent is only valid on the server-side. func (s *Stream) isHeaderSent() bool { return atomic.LoadUint32(&s.headerSent) == 1 diff --git a/server.go b/server.go index 024e6badbddd..eb6c5da32449 100644 --- a/server.go +++ b/server.go @@ -87,6 +87,12 @@ type service struct { mdata interface{} } +type serverWorkerData struct { + st transport.ServerTransport + wg *sync.WaitGroup + stream *transport.Stream +} + // Server is a gRPC server to serve RPC requests. type Server struct { opts serverOptions @@ -107,6 +113,8 @@ type Server struct { channelzID int64 // channelz unique identification number czData *channelzData + + serverWorkerChannel chan *serverWorkerData } type serverOptions struct { @@ -131,7 +139,7 @@ type serverOptions struct { connectionTimeout time.Duration maxHeaderListSize *uint32 headerTableSize *uint32 - numStreamWorkers uint32 + numServerWorkers uint32 } var defaultServerOptions = serverOptions{ @@ -395,16 +403,50 @@ func HeaderTableSize(s uint32) ServerOption { // stream. // // This API is EXPERIMENTAL. -func NumStreamWorkers(numStreamWorkers uint32) ServerOption { +func NumStreamWorkers(numServerWorkers uint32) ServerOption { // TODO: If/when this API gets stabilized (i.e. stream workers become the // only way streams are processed), change the behavior of the zero value to // a sane default. Preliminary experiments suggest that a value equal to the // number of CPUs available is most performant; requires thorough testing. return newFuncServerOption(func(o *serverOptions) { - o.numStreamWorkers = numStreamWorkers + o.numServerWorkers = numServerWorkers }) } +// serverWorkerResetThreshold defines how often the stack must be reset. Every +// N requests, by spawning a new goroutine in its place, a worker can reset its +// stack so that large stacks don't live in memory forever. 2^16 should allow +// each goroutine stack to live for at least a few seconds in a typical +// workload (assuming a QPS of a few thousand requests/sec). +const serverWorkerResetThreshold = 1 << 16 + +// serverWorkers blocks on a *transport.Stream channel forever and waits for +// data to be fed by serveStreams. This allows different requests to be +// processed by the same goroutine, removing the need for expensive stack +// re-allocations (see the runtime.morestack problem [1]). +// +// [1] https://github.com/golang/go/issues/18138 +func (s *Server) serverWorker() { + for completed := 0; completed < serverWorkerResetThreshold; completed++ { + data, ok := <-s.serverWorkerChannel + if !ok { + return + } + s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) + data.wg.Done() + } + go s.serverWorker() +} + +// initServerWorkers creates worker goroutines and channels to process incoming +// connections to reduce the time spent overall on runtime.morestack. +func (s *Server) initServerWorkers() { + s.serverWorkerChannel = make(chan *serverWorkerData) + for i := uint32(0); i < s.opts.numServerWorkers; i++ { + go s.serverWorker() + } +} + // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { @@ -427,6 +469,11 @@ func NewServer(opt ...ServerOption) *Server { s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) } + s.opts.numServerWorkers = 1 + if s.opts.numServerWorkers > 0 { + s.initServerWorkers() + } + if channelz.IsOn() { s.channelzID = channelz.RegisterServer(&channelzServer{s}, "") } @@ -729,53 +776,18 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr return st } -// workerStackReset defines how often the stack must be reset. Every N -// requests, by spawning a new goroutine in its place, a worker can reset its -// stack so that large stacks don't live in memory forever. 2^16 should allow -// each goroutine stack to live for at least a few seconds in a typical -// workload (assuming a QPS of a few thousand requests/sec). -const workerStackReset = 1 << 16 - -// streamWorkers blocks on a *transport.Stream channel forever and waits for -// data to be fed by serveStreams. This allows different requests to be -// processed by the same goroutine, removing the need for expensive stack -// re-allocations (see the runtime.morestack problem [1]). -// -// [1] https://github.com/golang/go/issues/18138 -func (s *Server) streamWorker(st transport.ServerTransport, wg *sync.WaitGroup, ch chan *transport.Stream) { - completed := 0 - for stream := range ch { - s.handleStream(st, stream, s.traceInfo(st, stream)) - wg.Done() - completed++ - if completed == workerStackReset { - go s.streamWorker(st, wg, ch) - return - } - } -} - func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup - var streamChannels []chan *transport.Stream - if s.opts.numStreamWorkers > 0 { - streamChannels = make([]chan *transport.Stream, s.opts.numStreamWorkers) - for i := range streamChannels { - streamChannels[i] = make(chan *transport.Stream) - go s.streamWorker(st, &wg, streamChannels[i]) - } - } - - var streamChannelCounter uint32 st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) - if s.opts.numStreamWorkers > 0 { + if s.opts.numServerWorkers > 0 { + data := &serverWorkerData{st: st, wg: &wg, stream: stream} select { - case streamChannels[atomic.AddUint32(&streamChannelCounter, 1)%s.opts.numStreamWorkers] <- stream: + case s.serverWorkerChannel <- data: default: - // If all stream workers are busy, fallback to default code path. + // If all stream workers are busy, fallback to the default code path. go func() { s.handleStream(st, stream, s.traceInfo(st, stream)) wg.Done() @@ -795,12 +807,6 @@ func (s *Server) serveStreams(st transport.ServerTransport) { return trace.NewContext(ctx, tr) }) wg.Wait() - - if s.opts.numStreamWorkers > 0 { - for _, ch := range streamChannels { - close(ch) - } - } } var _ http.Handler = (*Server)(nil) @@ -1487,6 +1493,9 @@ func (s *Server) Stop() { for c := range st { c.Close() } + if s.opts.numServerWorkers > 0 { + close(s.serverWorkerChannel) + } s.mu.Lock() if s.events != nil { From af2732578f3f3249889edd9d13a056b063b40457 Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Thu, 19 Dec 2019 17:04:34 -0800 Subject: [PATCH 4/6] V4: go fmt --- server.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/server.go b/server.go index eb6c5da32449..3dd64ae5c7fa 100644 --- a/server.go +++ b/server.go @@ -88,8 +88,8 @@ type service struct { } type serverWorkerData struct { - st transport.ServerTransport - wg *sync.WaitGroup + st transport.ServerTransport + wg *sync.WaitGroup stream *transport.Stream } @@ -469,7 +469,6 @@ func NewServer(opt ...ServerOption) *Server { s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) } - s.opts.numServerWorkers = 1 if s.opts.numServerWorkers > 0 { s.initServerWorkers() } From d104fc26da6320e54964c190ebb53beb88d9f0ee Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Thu, 19 Dec 2019 17:13:17 -0800 Subject: [PATCH 5/6] V5: use a drifting, random number of iterations --- server.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index 3dd64ae5c7fa..38cbada7e410 100644 --- a/server.go +++ b/server.go @@ -42,6 +42,7 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/binarylog" "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/internal/grpcrand" "google.golang.org/grpc/internal/grpcsync" "google.golang.org/grpc/internal/transport" "google.golang.org/grpc/keepalive" @@ -427,7 +428,10 @@ const serverWorkerResetThreshold = 1 << 16 // // [1] https://github.com/golang/go/issues/18138 func (s *Server) serverWorker() { - for completed := 0; completed < serverWorkerResetThreshold; completed++ { + // To make sure all server workers don't reset at the same time, choose a + // random number of iterations before resetting. + threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) + for completed := 0; completed < threshold; completed++ { data, ok := <-s.serverWorkerChannel if !ok { return From 9939840ca5435b21821c59483b8db2223903cb4a Mon Sep 17 00:00:00 2001 From: Adhityaa Chandrasekar Date: Fri, 20 Dec 2019 13:05:42 -0800 Subject: [PATCH 6/6] multiple channels --- server.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/server.go b/server.go index 38cbada7e410..14aa7e10e8f3 100644 --- a/server.go +++ b/server.go @@ -115,7 +115,7 @@ type Server struct { channelzID int64 // channelz unique identification number czData *channelzData - serverWorkerChannel chan *serverWorkerData + serverWorkerChannels []chan *serverWorkerData } type serverOptions struct { @@ -427,27 +427,34 @@ const serverWorkerResetThreshold = 1 << 16 // re-allocations (see the runtime.morestack problem [1]). // // [1] https://github.com/golang/go/issues/18138 -func (s *Server) serverWorker() { +func (s *Server) serverWorker(ch chan *serverWorkerData) { // To make sure all server workers don't reset at the same time, choose a // random number of iterations before resetting. threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold) for completed := 0; completed < threshold; completed++ { - data, ok := <-s.serverWorkerChannel + data, ok := <-ch if !ok { return } s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream)) data.wg.Done() } - go s.serverWorker() + go s.serverWorker(ch) } // initServerWorkers creates worker goroutines and channels to process incoming // connections to reduce the time spent overall on runtime.morestack. func (s *Server) initServerWorkers() { - s.serverWorkerChannel = make(chan *serverWorkerData) + s.serverWorkerChannels = make([]chan *serverWorkerData, s.opts.numServerWorkers) for i := uint32(0); i < s.opts.numServerWorkers; i++ { - go s.serverWorker() + s.serverWorkerChannels[i] = make(chan *serverWorkerData) + go s.serverWorker(s.serverWorkerChannels[i]) + } +} + +func (s *Server) stopServerWorkers() { + for i := uint32(0); i < s.opts.numServerWorkers; i++ { + close(s.serverWorkerChannels[i]) } } @@ -783,12 +790,13 @@ func (s *Server) serveStreams(st transport.ServerTransport) { defer st.Close() var wg sync.WaitGroup + var roundRobinCounter uint32 st.HandleStreams(func(stream *transport.Stream) { wg.Add(1) if s.opts.numServerWorkers > 0 { data := &serverWorkerData{st: st, wg: &wg, stream: stream} select { - case s.serverWorkerChannel <- data: + case s.serverWorkerChannels[atomic.AddUint32(&roundRobinCounter, 1)%s.opts.numServerWorkers] <- data: default: // If all stream workers are busy, fallback to the default code path. go func() { @@ -1497,7 +1505,7 @@ func (s *Server) Stop() { c.Close() } if s.opts.numServerWorkers > 0 { - close(s.serverWorkerChannel) + s.stopServerWorkers() } s.mu.Lock()