
V2: server option, stream ID bitshift
Adhityaa Chandrasekar committed Dec 18, 2019
1 parent b7f16d4 commit 11cd160
Showing 1 changed file with 41 additions and 31 deletions.
72 changes: 41 additions & 31 deletions server.go
@@ -131,6 +131,7 @@ type serverOptions struct {
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
numStreamWorkers uint32
}

var defaultServerOptions = serverOptions{
@@ -388,6 +389,22 @@ func HeaderTableSize(s uint32) ServerOption {
})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// This API is EXPERIMENTAL.
func NumStreamWorkers(numStreamWorkers uint32) ServerOption {
// TODO: If/when this API gets stabilized (i.e. stream workers become the
// only way streams are processed), change the behavior of the zero value to
// a sane default. Preliminary experiments suggest that a value equal to the
// number of CPUs available is most performant; requires thorough testing.
return newFuncServerOption(func(o *serverOptions) {
o.numStreamWorkers = numStreamWorkers
})
}
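// Illustrative usage (a sketch, not part of this change): with the option
// above, a server can be given an explicit worker count. Per the TODO, a
// value near runtime.NumCPU() is a plausible choice; the exact number here is
// only an assumption, and 0 keeps the default goroutine-per-stream behavior.
//
//	srv := grpc.NewServer(
//		grpc.NumStreamWorkers(uint32(runtime.NumCPU())),
//	)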

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -712,17 +729,6 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
return st
}

func floorCPUCount() uint32 {
n := uint32(runtime.NumCPU())
for i := uint32(1 << 31); i >= 2; i >>= 1 {
if n&i > 0 {
return i
}
}

return 1
}

// workerStackReset defines how often the stack must be reset. Every N
// requests, by spawning a new goroutine in its place, a worker can reset its
// stack so that large stacks don't live in memory forever. 2^16 should allow
@@ -749,34 +755,36 @@ func (s *Server) streamWorker(st transport.ServerTransport, wg *sync.WaitGroup,
}
}
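// Sketch of the stack-reset pattern described above (the streamWorker body is
// collapsed in this hunk, so the shape below is assumed rather than the actual
// implementation): a worker serves up to workerStackReset streams, then
// replaces itself with a fresh goroutine so any grown stack can be reclaimed.
//
//	func worker(ch chan func()) {
//		for i := 0; i < workerStackReset; i++ {
//			f, ok := <-ch
//			if !ok {
//				return
//			}
//			f()
//		}
//		go worker(ch) // the replacement starts with a small stack
//	}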

// numWorkers defines the number of stream handling workers. In experiments
// with different CPU counts, rounding the number of available CPUs down to a
// power of two was found to be optimal for performance across the board (QPS,
// latency).
var numWorkers = floorCPUCount()

// workerMask is used to perform bitwise AND operations instead of expensive
// modulo operations on integers.
var workerMask = numWorkers - 1
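// Why the mask worked, and why it goes away: x&(n-1) == x%n only when n is a
// power of two, which floorCPUCount guaranteed. A user-supplied
// numStreamWorkers carries no such guarantee, so the replacement dispatch
// below uses a plain modulo instead. For example:
//
//	8 & (4 - 1) == 0 == 8 % 4   // n = 4, a power of two: mask == modulo
//	8 & (3 - 1) == 0 != 8 % 3   // n = 3: mask and modulo diverge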

func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup

streamChannels := make([]chan *transport.Stream, numWorkers)
for i := range streamChannels {
streamChannels[i] = make(chan *transport.Stream)
go s.streamWorker(st, &wg, streamChannels[i])
var streamChannels []chan *transport.Stream
if s.opts.numStreamWorkers > 0 {
streamChannels = make([]chan *transport.Stream, s.opts.numStreamWorkers)
for i := range streamChannels {
streamChannels[i] = make(chan *transport.Stream)
go s.streamWorker(st, &wg, streamChannels[i])
}
}

var streamChannelCounter uint32
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
select {
case streamChannels[stream.ID()&workerMask] <- stream:
default:
if s.opts.numStreamWorkers > 0 {
select {
case streamChannels[atomic.AddUint32(&streamChannelCounter, 1)%s.opts.numStreamWorkers] <- stream:
default:
// If all stream workers are busy, fallback to default code path.
go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
}()
}
} else {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
}, func(ctx context.Context, method string) context.Context {
@@ -788,8 +796,10 @@ func (s *Server) serveStreams(st transport.ServerTransport) {
})
wg.Wait()

for _, ch := range streamChannels {
close(ch)
if s.opts.numStreamWorkers > 0 {
for _, ch := range streamChannels {
close(ch)
}
}
}

