server.go: use worker goroutines for fewer stack allocations #3204
Changes from 3 commits
@@ -87,6 +87,12 @@ type service struct {
 	mdata interface{}
 }
 
+type serverWorkerData struct {
+	st     transport.ServerTransport
+	wg     *sync.WaitGroup
+	stream *transport.Stream
+}
+
 // Server is a gRPC server to serve RPC requests.
 type Server struct {
 	opts serverOptions

@@ -107,6 +113,8 @@ type Server struct {
 
 	channelzID int64 // channelz unique identification number
 	czData     *channelzData
+
+	serverWorkerChannel chan *serverWorkerData
 }
 
 type serverOptions struct {

@@ -131,6 +139,7 @@ type serverOptions struct {
 	connectionTimeout time.Duration
 	maxHeaderListSize *uint32
 	headerTableSize   *uint32
+	numServerWorkers  uint32
 }
 
 var defaultServerOptions = serverOptions{
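The plumbing above exists, per the PR title and the serverWorker comment further down, to avoid repeatedly paying for goroutine stack growth (runtime.morestack, golang/go#18138) on short-lived goroutines. As a rough standalone illustration of that cost (not part of the PR; the recursion depth and padding sizes are arbitrary), the first deep call on a fresh goroutine pays for stack growth while later calls on the same goroutine reuse the already-grown stack:

package main

import (
	"fmt"
	"time"
)

// deep recurses far enough to force the goroutine's stack to grow well past
// its initial size, triggering runtime.morestack/copystack on a fresh stack.
func deep(n int) byte {
	var pad [512]byte
	pad[n%len(pad)] = byte(n)
	if n == 0 {
		return pad[0]
	}
	return deep(n-1) ^ pad[n%len(pad)]
}

func main() {
	done := make(chan struct{})
	go func() {
		defer close(done)
		for i := 0; i < 3; i++ {
			start := time.Now()
			deep(1 << 12)
			// Call 0 typically shows the extra cost of growing the stack;
			// later calls reuse it (stacks only shrink during GC).
			fmt.Printf("call %d: %v\n", i, time.Since(start))
		}
	}()
	<-done
}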
@@ -388,6 +397,56 @@ func HeaderTableSize(s uint32) ServerOption {
 	})
 }
 
+// NumStreamWorkers returns a ServerOption that sets the number of worker
+// goroutines that should be used to process incoming streams. Setting this to
+// zero (default) will disable workers and spawn a new goroutine for each
+// stream.
+//
+// This API is EXPERIMENTAL.
+func NumStreamWorkers(numServerWorkers uint32) ServerOption {
+	// TODO: If/when this API gets stabilized (i.e. stream workers become the
+	// only way streams are processed), change the behavior of the zero value to
+	// a sane default. Preliminary experiments suggest that a value equal to the
+	// number of CPUs available is most performant; requires thorough testing.
+	return newFuncServerOption(func(o *serverOptions) {
+		o.numServerWorkers = numServerWorkers
+	})
+}
+
+// serverWorkerResetThreshold defines how often the stack must be reset. Every
+// N requests, by spawning a new goroutine in its place, a worker can reset its
+// stack so that large stacks don't live in memory forever. 2^16 should allow
+// each goroutine stack to live for at least a few seconds in a typical
+// workload (assuming a QPS of a few thousand requests/sec).
+const serverWorkerResetThreshold = 1 << 16
+
+// serverWorker blocks on a *transport.Stream channel forever and waits for
+// data to be fed by serveStreams. This allows different requests to be
+// processed by the same goroutine, removing the need for expensive stack
+// re-allocations (see the runtime.morestack problem [1]).
+//
+// [1] https://github.com/golang/go/issues/18138
+func (s *Server) serverWorker() {
+	for completed := 0; completed < serverWorkerResetThreshold; completed++ {
+		data, ok := <-s.serverWorkerChannel
+		if !ok {
+			return
+		}
+		s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
+		data.wg.Done()
+	}
+	go s.serverWorker()
+}
+
+// initServerWorkers creates worker goroutines and channels to process incoming
+// connections to reduce the time spent overall on runtime.morestack.
+func (s *Server) initServerWorkers() {
+	s.serverWorkerChannel = make(chan *serverWorkerData)
+	for i := uint32(0); i < s.opts.numServerWorkers; i++ {
+		go s.serverWorker()
+	}
+}
+
 // NewServer creates a gRPC server which has no service registered and has not
 // started to accept requests yet.
 func NewServer(opt ...ServerOption) *Server {
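For illustration, a minimal sketch of how a server could opt into the worker pool once this lands, assuming the grpc package exports NumStreamWorkers exactly as above; the listen address, the CPU-count choice and the omitted service registration are placeholders:

package main

import (
	"log"
	"net"
	"runtime"

	"google.golang.org/grpc"
)

func main() {
	lis, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// Zero (the default) keeps the goroutine-per-stream behavior; a non-zero
	// value routes streams through the worker pool. The TODO above suggests
	// the number of CPUs as a plausible starting point.
	srv := grpc.NewServer(grpc.NumStreamWorkers(uint32(runtime.NumCPU())))

	// ... register services on srv here ...

	if err := srv.Serve(lis); err != nil {
		log.Fatalf("serve: %v", err)
	}
}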
@@ -410,6 +469,11 @@ func NewServer(opt ...ServerOption) *Server {
 		s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
 	}
 
+	s.opts.numServerWorkers = 1
+	if s.opts.numServerWorkers > 0 {
+		s.initServerWorkers()
+	}
+
 	if channelz.IsOn() {
 		s.channelzID = channelz.RegisterServer(&channelzServer{s}, "")
 	}
@@ -715,12 +779,26 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
 func (s *Server) serveStreams(st transport.ServerTransport) {
 	defer st.Close()
 	var wg sync.WaitGroup
 
 	st.HandleStreams(func(stream *transport.Stream) {
 		wg.Add(1)
-		go func() {
-			defer wg.Done()
-			s.handleStream(st, stream, s.traceInfo(st, stream))
-		}()
+		if s.opts.numServerWorkers > 0 {
+			data := &serverWorkerData{st: st, wg: &wg, stream: stream}
+			select {
+			case s.serverWorkerChannel <- data:
+			default:
+				// If all stream workers are busy, fallback to the default code path.
+				go func() {
+					s.handleStream(st, stream, s.traceInfo(st, stream))
+					wg.Done()
+				}()
+			}
+		} else {
+			go func() {
+				defer wg.Done()
+				s.handleStream(st, stream, s.traceInfo(st, stream))
+			}()
+		}
 	}, func(ctx context.Context, method string) context.Context {
 		if !EnableTracing {
 			return ctx

Review comment (on the fallback path in the default case):

I think we should add a worker in this case, not a one-off goroutine. Extra workers created over numServerWorkers should linger for a short period of time waiting for new work once they're done, and then shut down if no new work arrives. This would in turn make it possible to remove the numServerWorkers knob, as new workers would be created as needed.

Review comment (on lines +803 to +804):

This should have been

	defer wg.Done()
	s.handleStream(st, stream, s.traceInfo(st, stream))

as it is on line 809 and as it was before the change.
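A sketch of the lingering-worker idea from the first comment above, as an assumption rather than code from this PR: adaptiveWorker and workerIdleTimeout are hypothetical names, a time import would be needed, and only serverWorkerData, handleStream and traceInfo come from the actual change. The fallback branch would then start go s.adaptiveWorker(data) instead of a one-off goroutine.

// Hypothetical sketch, not part of this PR: an extra worker that lingers
// briefly for more work and exits when idle, as suggested in the review.
const workerIdleTimeout = 100 * time.Millisecond

func (s *Server) adaptiveWorker(first *serverWorkerData) {
	// Handle the stream this worker was started for.
	s.handleStream(first.st, first.stream, s.traceInfo(first.st, first.stream))
	first.wg.Done()

	idle := time.NewTimer(workerIdleTimeout)
	defer idle.Stop()
	for {
		select {
		case data, ok := <-s.serverWorkerChannel:
			if !ok {
				return // server is stopping
			}
			s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
			data.wg.Done()
			if !idle.Stop() {
				<-idle.C // drain a timer that fired while we were busy
			}
			idle.Reset(workerIdleTimeout)
		case <-idle.C:
			// No new work within the linger window; shut down.
			return
		}
	}
}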
@@ -1415,6 +1493,9 @@ func (s *Server) Stop() {
 	for c := range st {
 		c.Close()
 	}
+	if s.opts.numServerWorkers > 0 {
+		close(s.serverWorkerChannel)
+	}
 
 	s.mu.Lock()
 	if s.events != nil {
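Shutdown works through the comma-ok receive in serverWorker: once Stop closes serverWorkerChannel, every blocked receive returns ok == false and the workers return. A tiny standalone illustration of that mechanism (not gRPC code; the names are made up):

package main

import (
	"fmt"
	"sync"
)

func main() {
	ch := make(chan int)
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				v, ok := <-ch // same comma-ok pattern as serverWorker
				if !ok {
					fmt.Printf("worker %d: channel closed, exiting\n", id)
					return
				}
				fmt.Printf("worker %d: got %d\n", id, v)
			}
		}(i)
	}
	for v := 0; v < 8; v++ {
		ch <- v
	}
	close(ch) // analogous to what Stop does with serverWorkerChannel
	wg.Wait()
}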
Review discussion (on the request-count based stack reset):

Brainstorming: should this be time-based instead of request-based? Or some combination of both? If a server goes idle, we may want to restart the threads.

Possibly yes, but I expect the overhead introduced by that to be non-negligible. It also depends heavily on how Go's runtime manages shrinking stacks during GC pauses, I think.

I think we could do it with a Timer checked in the same select as the workload without being too expensive. But this is fine for now - we can work on this more in the future.

A late comment just to point out that the runtime shrinks the stacks of goroutines during GC (or at the next safepoint), so this may not be needed at all (and is potentially counter-productive): https://github.com/golang/go/blob/9d812cfa5cbb1f573d61c452c864072270526753/src/runtime/mgcmark.go#L781-L783. Dropping all of this would make it much easier to implement adaptive workers (see the comment on the fallback path above).
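For reference, a sketch of the Timer-in-the-same-select idea floated above, again as an assumption rather than anything in the PR: timedServerWorker and workerResetInterval are made-up names, a time import would be needed, and the last comment questions whether such a reset is needed at all given GC-time stack shrinking.

// Hypothetical sketch: a worker that respawns itself on a timer instead of
// after a fixed number of requests, so a grown stack is dropped after a
// period of time rather than after serverWorkerResetThreshold streams.
const workerResetInterval = 30 * time.Second

func (s *Server) timedServerWorker() {
	reset := time.NewTimer(workerResetInterval)
	defer reset.Stop()
	for {
		select {
		case data, ok := <-s.serverWorkerChannel:
			if !ok {
				return
			}
			s.handleStream(data.st, data.stream, s.traceInfo(data.st, data.stream))
			data.wg.Done()
		case <-reset.C:
			// Replace this goroutine with a fresh one so its stack can be freed.
			go s.timedServerWorker()
			return
		}
	}
}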