Skip to content

Commit

Permalink
server.go: use worker goroutines for fewer stack allocations
Browse files Browse the repository at this point in the history
Currently (go1.13.4), the default stack size for newly spawned
goroutines is 2048 bytes. This is insufficient when processing gRPC
requests as the we often require more than 4 KiB stacks. This causes the
Go runtime to call runtime.morestack at least twice per RPC, which
causes performance to suffer needlessly as stack reallocations require
all sorts of internal work such as changing pointers to point to new
addresses.

See golang/go#18138 for more details.

Since this stack growth is guaranteed to happen at least twice per RPC,
reusing goroutines gives us two wins:

  1. The stack is already grown to 8 KiB after the first RPC, so
     subsequent RPCs do not call runtime.morestack.
  2. We eliminate the need to spawn a new goroutine for each request
     (even though they're relatively inexpensive).

Performance improves across the board. The improvement is especially
visible in small, unary requests as the overhead of stack reallocation
is higher, percentage-wise. QPS is up anywhere between 3% and 5%
depending on the number of concurrent RPC requests in flight. Latency is
down ~3%. There is even a 1% decrease in memory footprint in some cases,
though that is an unintended, but happy coincidence.

unary-networkMode_none-bufConn_false-keepalive_false-benchTime_1m0s-trace_false-latency_0s-kbps_0-MTU_0-maxConcurrentCalls_8-reqSize_1B-respSize_1B-compressor_off-channelz_false-preloader_false
               Title       Before        After Percentage
            TotalOps      2613512      2701705     3.37%
             SendOps            0            0      NaN%
             RecvOps            0            0      NaN%
            Bytes/op      8657.00      8654.17    -0.03%
           Allocs/op       173.37       173.28     0.00%
             ReqT/op    348468.27    360227.33     3.37%
            RespT/op    348468.27    360227.33     3.37%
            50th-Lat    174.601µs    167.378µs    -4.14%
            90th-Lat    233.132µs    229.087µs    -1.74%
            99th-Lat     438.98µs    441.857µs     0.66%
             Avg-Lat    183.263µs     177.26µs    -3.28%
  • Loading branch information
Adhityaa Chandrasekar committed Nov 22, 2019
1 parent dc49de8 commit 7d808bc
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 4 deletions.
5 changes: 5 additions & 0 deletions internal/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,11 @@ type Stream struct {
contentSubtype string
}

// ID returns the stream ID.
func (s *Stream) ID() uint32 {
return s.id
}

// isHeaderSent is only valid on the server-side.
func (s *Stream) isHeaderSent() bool {
return atomic.LoadUint32(&s.headerSent) == 1
Expand Down
66 changes: 62 additions & 4 deletions server.go
Original file line number Diff line number Diff line change
Expand Up @@ -712,15 +712,69 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
return st
}

func floorCPUCount() uint32 {
n := uint32(runtime.NumCPU())
for i := uint32(1 << 31); i >= 2; i >>= 1 {
if n&i > 0 {
return i
}
}

return 1
}

// numWorkers defines the number of stream handling workers. After experiments
// with different CPU counts, using the floor of the number of CPUs available
// was found to be the number optimal for performance across the board (QPS,
// latency).
var numWorkers = floorCPUCount()

// workerMask is used to perform bitwise AND operations instead of expensive
// module operations on integers.
var workerMask = numWorkers - 1

// workerStackReset defines how often the stack must be reset. Every N
// requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever.
//
// 2^16 should allow each goroutine stack to live for at least a few seconds in
// a typical workload.
const workerStackReset = 1 << 16

func (s *Server) streamWorker(st transport.ServerTransport, wg *sync.WaitGroup, ch chan *transport.Stream) {
completed := 0
for stream := range ch {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
completed++
if completed == workerStackReset {
go s.streamWorker(st, wg, ch)
return
}
}
}

func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup

streamChannels := make([]chan *transport.Stream, numWorkers)

for i := range streamChannels {
streamChannels[i] = make(chan *transport.Stream)
go s.streamWorker(st, &wg, streamChannels[i])
}

st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
select {
case streamChannels[stream.ID()&workerMask] <- stream:
default:
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
Expand All @@ -729,6 +783,10 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
return trace.NewContext(ctx, tr)
})
wg.Wait()

for _, ch := range streamChannels {
close(ch)
}
}

var _ http.Handler = (*Server)(nil)
Expand Down

0 comments on commit 7d808bc

Please sign in to comment.