Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async_hooks integration for gRPC Server #169

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions packages/grpc-native-core/src/async-hooks-integration.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/**
* @license
* Copyright 2018 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

var semver = require('semver');
var useAsyncHooks = semver.satisfies(process.version, '>=8');

/**
* Assuming that async_hooks is available, this file exposes a function
* createAsyncResourceWrapper which creates an async resource, returning a
* proxy object with the functions necessary to mark the entry/exit of a
* continuation associated with that resource, as well as to destroy the
* resource.
*
* In the absence of async_hooks, a no-op implementation is returned that
* should have minimal performance implications.
*/

if (useAsyncHooks) {
const asyncHooks = require('async_hooks');
class GrpcAsyncResource extends asyncHooks.AsyncResource {
constructor(name, handle) {
super(name);
this.handle = handle;
}
}
module.exports = function createAsyncResourceWrapper(name, handle) {
let resource = new GrpcAsyncResource(name, handle);
return {
wrap: (fn) => {
return function() {
if (resource) {
resource.emitBefore();
let result;
try {
result = fn.apply(this, arguments);
} finally {
resource.emitAfter();
}
return result;
} else {
return fn.apply(this, arguments);
}
}
},
destroy: () => {
setImmediate(() => {
if (resource) {
resource.emitDestroy();
resource = null;
}
});
}
};
}
} else {
const noImpl = {
wrap: fn => fn,
destroy: () => {}
};
module.exports = function createAsyncResourceWrapper() { return noImpl; }
}
70 changes: 52 additions & 18 deletions packages/grpc-native-core/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ var util = require('util');

var EventEmitter = require('events').EventEmitter;

/**
* Provides an API for integrating the async_hooks module with gRPC.
* Akin to Node HTTP servers, each incoming request is assigned a unique
* AsyncResource object with which continuation-local storage can be associated.
* All of these request-scoped AsyncResource objects share a common trigger:
* a long-lived AsyncResource assigned to the server.
*/
var createAsyncResourceWrapper = require('./async-hooks-integration');

/**
* Handle an error on a call by sending it as a status
* @private
Expand Down Expand Up @@ -568,25 +577,30 @@ ServerDuplexStream.prototype.waitForCancel = waitForCancel;
*/
function handleUnary(call, handler, metadata) {
var emitter = new ServerUnaryCall(call, metadata);
var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', emitter);
emitter.on('error', function(error) {
handleError(call, error);
asyncResource.destroy();
});
emitter.waitForCancel();
var batch = {};
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
call.startBatch(batch, asyncResource.wrap(function(err, result) {
if (err) {
handleError(call, err);
asyncResource.destroy();
return;
}
try {
emitter.request = handler.deserialize(result.read);
} catch (e) {
e.code = constants.status.INTERNAL;
handleError(call, e);
asyncResource.destroy();
return;
}
if (emitter.cancelled) {
asyncResource.destroy();
return;
}
handler.func(emitter, function sendUnaryData(err, value, trailer, flags) {
Expand All @@ -598,8 +612,9 @@ function handleUnary(call, handler, metadata) {
} else {
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
asyncResource.destroy();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elaborate on why it is okay that the destroy doesn't get called on all paths?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not okay 🙃 I've added additional calls to destroy if the user function is never called.

});
});
}));
}

/**
Expand All @@ -623,13 +638,15 @@ function handleUnary(call, handler, metadata) {
*/
function handleServerStreaming(call, handler, metadata) {
var stream = new ServerWritableStream(call, metadata, handler.serialize);
var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream);
stream.on('error', asyncResource.destroy);
stream.on('finish', asyncResource.destroy);
stream.waitForCancel();
var batch = {};
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, function(err, result) {
call.startBatch(batch, asyncResource.wrap(function(err, result) {
if (err) {
stream.emit('error', err);
return;
}
try {
stream.request = handler.deserialize(result.read);
Expand All @@ -639,7 +656,7 @@ function handleServerStreaming(call, handler, metadata) {
return;
}
handler.func(stream);
});
}));
}

/**
Expand All @@ -665,21 +682,26 @@ function handleServerStreaming(call, handler, metadata) {
*/
function handleClientStreaming(call, handler, metadata) {
var stream = new ServerReadableStream(call, metadata, handler.deserialize);
var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream);
stream.on('error', function(error) {
handleError(call, error);
asyncResource.destroy();
});
stream.waitForCancel();
handler.func(stream, function(err, value, trailer, flags) {
stream.terminate();
if (err) {
if (trailer) {
err.metadata = trailer;
asyncResource.wrap(function() {
handler.func(stream, function(err, value, trailer, flags) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This use isn't making sense to me. handler.func seems to be getting executed in the same continuation as handleClientStreaming – it is called synchronously.

It doesn't seem logical to create an AsyncResource to wrap a synchronously executed function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is to mirror what the other handler types do. I don't think there's harm in having nesting here (from the perspective of an agent listening for events with only one type of async resource, there is no concept of nesting) -- and I can't think of a good alternative without changing the behavior of the code to move the invocation handler.func to the next turn of the event loop. At the minimum we will definitely want a per-request grpc.ServerRequest-type async resource. The grpc.Server async resource doesn't seem strictly required at the moment but it follows the precedent set by http.

stream.terminate();
if (err) {
if (trailer) {
err.metadata = trailer;
}
handleError(call, err);
} else {
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
handleError(call, err);
} else {
sendUnaryResponse(call, value, handler.serialize, trailer, flags);
}
});
asyncResource.destroy();
});
})();
}

/**
Expand All @@ -704,8 +726,13 @@ function handleClientStreaming(call, handler, metadata) {
function handleBidiStreaming(call, handler, metadata) {
var stream = new ServerDuplexStream(call, metadata, handler.serialize,
handler.deserialize);
var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream);
stream.on('error', asyncResource.destroy);
stream.on('finish', asyncResource.destroy);
stream.waitForCancel();
handler.func(stream);
asyncResource.wrap(function() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

likewise.

handler.func(stream);
})();
}

var streamHandlers = {
Expand Down Expand Up @@ -744,6 +771,7 @@ Server.prototype.start = function() {
}
var self = this;
this.started = true;
this.asyncResourceWrap = createAsyncResourceWrapper('grpc.Server', this);
this._server.start();
/**
* Handles the SERVER_RPC_NEW event. If there is a handler associated with
Expand All @@ -752,7 +780,7 @@ Server.prototype.start = function() {
* @param {grpc.internal~Event} event The event to handle with tag
* SERVER_RPC_NEW
*/
function handleNewCall(err, event) {
var handleNewCall = function (err, event) {
if (err) {
return;
}
Expand Down Expand Up @@ -782,6 +810,7 @@ Server.prototype.start = function() {
}
streamHandlers[handler.type](call, handler, metadata);
}
handleNewCall = this.asyncResourceWrap.wrap(handleNewCall);
this._server.requestCall(handleNewCall);
};

Expand Down Expand Up @@ -828,7 +857,11 @@ Server.prototype.register = function(name, handler, serialize, deserialize,
* @param {function()} callback The shutdown complete callback
*/
Server.prototype.tryShutdown = function(callback) {
this._server.tryShutdown(callback);
var self = this;
this._server.tryShutdown(function() {
self.asyncResourceWrap.destroy();
callback();
});
};

/**
Expand All @@ -839,6 +872,7 @@ Server.prototype.tryShutdown = function(callback) {
*/
Server.prototype.forceShutdown = function() {
this._server.forceShutdown();
this.asyncResourceWrap.destroy();
};

var unimplementedStatusResponse = {
Expand Down