Skip to content

Commit

Permalink
async_hooks integration -- address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kjin committed Feb 8, 2018
1 parent 617c167 commit b9ba5a4
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
2 changes: 1 addition & 1 deletion packages/grpc-native-core/deps/grpc
Submodule grpc updated 996 files
35 changes: 22 additions & 13 deletions packages/grpc-native-core/src/async-hooks-integration.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,23 @@ var useAsyncHooks = semver.satisfies(process.version, '>=8');
*/

if (useAsyncHooks) {
var asyncHooks = require('async_hooks');
module.exports = function createAsyncResourceWrapper(name) {
var resource = new asyncHooks.AsyncResource(name);
const asyncHooks = require('async_hooks');
class GrpcAsyncResource extends asyncHooks.AsyncResource {
constructor(name, handle) {
super(name);
this.handle = handle;
}
}
module.exports = function createAsyncResourceWrapper(name, handle) {
let resource = new GrpcAsyncResource(name, handle);
return {
wrap: function(fn) {
wrap: (fn) => {
return function() {
if (resource) {
resource.emitBefore();
let result;
try {
var result = fn.apply(this, arguments);
result = fn.apply(this, arguments);
} finally {
resource.emitAfter();
}
Expand All @@ -50,18 +57,20 @@ if (useAsyncHooks) {
}
}
},
destroy: function() {
if (resource) {
resource.emitDestroy();
resource = null;
}
destroy: () => {
setImmediate(() => {
if (resource) {
resource.emitDestroy();
resource = null;
}
});
}
};
}
} else {
var noImpl = {
wrap: function(fn) { return fn; },
destroy: function() {}
const noImpl = {
wrap: fn => fn,
destroy: () => {}
};
module.exports = function createAsyncResourceWrapper() { return noImpl; }
}
15 changes: 10 additions & 5 deletions packages/grpc-native-core/src/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -576,27 +576,31 @@ ServerDuplexStream.prototype.waitForCancel = waitForCancel;
* @param {grpc.Metadata} metadata Metadata from the client
*/
function handleUnary(call, handler, metadata) {
var asyncResource = createAsyncResourceWrapper('GrpcServerRequest');
var emitter = new ServerUnaryCall(call, metadata);
var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', emitter);
emitter.on('error', function(error) {
handleError(call, error);
asyncResource.destroy();
});
emitter.waitForCancel();
var batch = {};
batch[grpc.opType.RECV_MESSAGE] = true;
call.startBatch(batch, asyncResource.wrap(function(err, result) {
if (err) {
handleError(call, err);
asyncResource.destroy();
return;
}
try {
emitter.request = handler.deserialize(result.read);
} catch (e) {
e.code = constants.status.INTERNAL;
handleError(call, e);
asyncResource.destroy();
return;
}
if (emitter.cancelled) {
asyncResource.destroy();
return;
}
handler.func(emitter, function sendUnaryData(err, value, trailer, flags) {
Expand Down Expand Up @@ -633,8 +637,8 @@ function handleUnary(call, handler, metadata) {
* @param {grpc.Metadata} metadata Metadata from the client
*/
function handleServerStreaming(call, handler, metadata) {
var asyncResource = createAsyncResourceWrapper('GrpcServerRequest');
var stream = new ServerWritableStream(call, metadata, handler.serialize);
var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream);
stream.on('error', asyncResource.destroy);
stream.on('finish', asyncResource.destroy);
stream.waitForCancel();
Expand Down Expand Up @@ -677,10 +681,11 @@ function handleServerStreaming(call, handler, metadata) {
* @param {grpc.Metadata} metadata Metadata from the client
*/
function handleClientStreaming(call, handler, metadata) {
var asyncResource = createAsyncResourceWrapper('GrpcServerRequest');
var stream = new ServerReadableStream(call, metadata, handler.deserialize);
var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream);
stream.on('error', function(error) {
handleError(call, error);
asyncResource.destroy();
});
stream.waitForCancel();
asyncResource.wrap(function() {
Expand Down Expand Up @@ -719,9 +724,9 @@ function handleClientStreaming(call, handler, metadata) {
* @param {Metadata} metadata Metadata from the client
*/
function handleBidiStreaming(call, handler, metadata) {
var asyncResource = createAsyncResourceWrapper('GrpcServerRequest');
var stream = new ServerDuplexStream(call, metadata, handler.serialize,
handler.deserialize);
var asyncResource = createAsyncResourceWrapper('grpc.ServerRequest', stream);
stream.on('error', asyncResource.destroy);
stream.on('finish', asyncResource.destroy);
stream.waitForCancel();
Expand Down Expand Up @@ -766,7 +771,7 @@ Server.prototype.start = function() {
}
var self = this;
this.started = true;
this.asyncResourceWrap = createAsyncResourceWrapper('GrpcServer');
this.asyncResourceWrap = createAsyncResourceWrapper('grpc.Server', this);
this._server.start();
/**
* Handles the SERVER_RPC_NEW event. If there is a handler associated with
Expand Down

0 comments on commit b9ba5a4

Please sign in to comment.