server.go: use worker goroutines for fewer stack allocations #3204

Merged · 6 commits · Apr 23, 2020
Changes from 5 commits
92 changes: 88 additions & 4 deletions server.go
@@ -42,6 +42,7 @@ import (
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal/binarylog"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/keepalive"
@@ -87,6 +88,12 @@ type service struct {
mdata interface{}
}

type serverWorkerData struct {
st transport.ServerTransport
wg *sync.WaitGroup
stream *transport.Stream
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
opts serverOptions
@@ -107,6 +114,8 @@ type Server struct {

channelzID int64 // channelz unique identification number
czData *channelzData

serverWorkerChannel chan *serverWorkerData
}

type serverOptions struct {
@@ -131,6 +140,7 @@ type serverOptions struct {
connectionTimeout time.Duration
maxHeaderListSize *uint32
headerTableSize *uint32
numServerWorkers uint32
}

var defaultServerOptions = serverOptions{
@@ -388,6 +398,59 @@ func HeaderTableSize(s uint32) ServerOption {
})
}

// NumStreamWorkers returns a ServerOption that sets the number of worker
// goroutines that should be used to process incoming streams. Setting this to
// zero (default) will disable workers and spawn a new goroutine for each
// stream.
//
// This API is EXPERIMENTAL.
func NumStreamWorkers(numServerWorkers uint32) ServerOption {
// TODO: If/when this API gets stabilized (i.e. stream workers become the
// only way streams are processed), change the behavior of the zero value to
// a sane default. Preliminary experiments suggest that a value equal to the
// number of CPUs available is most performant; requires thorough testing.
return newFuncServerOption(func(o *serverOptions) {
o.numServerWorkers = numServerWorkers
})
}

// serverWorkerResetThreshold defines how often the stack must be reset. Every
// N requests, a worker resets its stack by spawning a new goroutine in its
// place, so that large stacks don't live in memory forever. 2^16 should allow
// each goroutine stack to live for at least a few seconds in a typical
// workload (assuming a QPS of a few thousand requests/sec).
const serverWorkerResetThreshold = 1 << 16
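
As a concrete check on that comment (the request rate is illustrative): a worker handling 5,000 requests/sec resets roughly every 2^16 / 5,000 ≈ 13 seconds.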
Comment on lines +417 to +422

[Member] Brainstorming: should this be time-based instead of request-based? Or some combination of both?

If a server goes idle, we may want to restart the threads.

[Contributor, Author] Possibly yes, but I expect the overhead introduced by that to be non-negligible. It also depends heavily on how Go's runtime manages shrinking stacks during GC pauses, I think.

[Member] I think we could do it with a Timer checked in the same select as the workload without being too expensive. But this is fine for now - we can work on this more in the future.
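
A minimal sketch of that Timer idea (hypothetical, not in this PR; the one-minute idle period is an assumption):

	func (s *Server) serverWorker() {
		threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
		idle := time.NewTimer(time.Minute) // illustrative idle-reset period
		defer idle.Stop()
		for completed := 0; completed < threshold; completed++ {
			select {
			case data, ok := <-s.serverWorkerChannel:
				if !ok {
					return
				}
				if !idle.Stop() {
					<-idle.C // drain a fire that raced with the receive
				}
				idle.Reset(time.Minute)
				s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
				data.wg.Done()
			case <-idle.C:
				// Idle for a full period: respawn to shed any grown stack.
				go s.serverWorker()
				return
			}
		}
		go s.serverWorker()
	}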

[@CAFxX, May 19, 2020] A late comment just to point out that the runtime shrinks the stacks of goroutines during GC (or at the next safepoint), so this may not be needed at all (and is potentially counter-productive): https://github.com/golang/go/blob/9d812cfa5cbb1f573d61c452c864072270526753/src/runtime/mgcmark.go#L781-L783

Dropping all this part would have the benefit of making it much easier to implement adaptive workers (discussed below).


// serverWorker blocks on a *serverWorkerData channel forever and waits for
// data to be fed by serveStreams. This allows different requests to be
// processed by the same goroutine, removing the need for expensive stack
// re-allocations (see the runtime.morestack problem [1]).
//
// [1] https://github.com/golang/go/issues/18138
func (s *Server) serverWorker() {
// To make sure all server workers don't reset at the same time, choose a
// random number of iterations before resetting.
threshold := serverWorkerResetThreshold + grpcrand.Intn(serverWorkerResetThreshold)
for completed := 0; completed < threshold; completed++ {
data, ok := <-s.serverWorkerChannel
if !ok {
return
}
s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
data.wg.Done()
}
go s.serverWorker()
}
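
Note that the final go s.serverWorker() replaces the exiting worker with a brand-new goroutine whose stack starts small again, and the randomized threshold keeps the workers from all resetting at the same moment.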

// initServerWorkers creates worker goroutines and a channel to process incoming
// connections, reducing the overall time spent on runtime.morestack.
func (s *Server) initServerWorkers() {
s.serverWorkerChannel = make(chan *serverWorkerData)
for i := uint32(0); i < s.opts.numServerWorkers; i++ {
go s.serverWorker()
}
}
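
The channel is created unbuffered, so the send in serveStreams below succeeds only when some worker is already blocked on the receive; otherwise it falls through to the non-worker path.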

// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -410,6 +473,10 @@ func NewServer(opt ...ServerOption) *Server {
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
}

if s.opts.numServerWorkers > 0 {
s.initServerWorkers()
}

if channelz.IsOn() {
s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
}
@@ -715,12 +782,26 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
func (s *Server) serveStreams(st transport.ServerTransport) {
defer st.Close()
var wg sync.WaitGroup

st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
if s.opts.numServerWorkers > 0 {
data := &serverWorkerData{st: st, wg: &wg, stream: stream}
select {
case s.serverWorkerChannel <- data:
default:
// If all stream workers are busy, fall back to the default code path.

[@CAFxX, May 19, 2020] I think we should add a worker in this case, not a one-off goroutine.

Extra workers created over numServerWorkers should linger for a short period of time waiting for new work once they're done, and then shut down if no new work arrives. This would in turn make it possible to remove the numServerWorkers knob, as new workers would be created as needed.
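
A rough sketch of that adaptive-worker idea (hypothetical, not in this PR; the one-second linger window is an assumption). The diff's actual default branch continues below.

	default:
		// All workers busy: spawn an extra worker that handles this stream
		// and then lingers briefly for more work before exiting.
		go func() {
			s.handleStream(st, stream, s.traceInfo(st, stream))
			wg.Done()
			linger := time.NewTimer(time.Second) // illustrative linger period
			defer linger.Stop()
			for {
				select {
				case d, ok := <-s.serverWorkerChannel:
					if !ok {
						return
					}
					if !linger.Stop() {
						<-linger.C
					}
					s.handleStream(d.st, d.stream, s.traceInfo(d.st, d.stream))
					d.wg.Done()
					linger.Reset(time.Second)
				case <-linger.C:
					return // no new work arrived; shut down
				}
			}
		}()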

go func() {
s.handleStream(st, stream, s.traceInfo(st, stream))
wg.Done()
Comment on lines +803 to +804

[Reviewer] This should have been

	defer wg.Done()
	s.handleStream(st, stream, s.traceInfo(st, stream))

as it is on line 809 and as it was before the change.
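
For context: the deferred form guarantees wg.Done runs even if handleStream panics or gains an early return, keeping the WaitGroup that serveStreams waits on balanced.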

}()
}
} else {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}
}, func(ctx context.Context, method string) context.Context {
if !EnableTracing {
return ctx
Expand Down Expand Up @@ -1415,6 +1496,9 @@ func (s *Server) Stop() {
for c := range st {
c.Close()
}
if s.opts.numServerWorkers > 0 {
close(s.serverWorkerChannel)
}
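
Closing serverWorkerChannel makes the receive in serverWorker report ok == false, so all worker goroutines exit.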

s.mu.Lock()
if s.events != nil {