Skip to content

Commit

Permalink
Merge pull request #11155 from Snuffleupagus/MessageHandler-misc-cleanup
Browse files Browse the repository at this point in the history
Miscellaneous (small) clean-up of the `MessageHandler` code
  • Loading branch information
timvandermeij authored Sep 18, 2019
2 parents d7c7f15 + 4bd79ec commit 7af66c8
Showing 1 changed file with 48 additions and 38 deletions.
86 changes: 48 additions & 38 deletions src/shared/message_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,6 @@ function wrapReason(reason) {
}
}

function resolveOrReject(capability, data) {
if (data.success) {
capability.resolve();
} else {
capability.reject(wrapReason(data.reason));
}
}

function MessageHandler(sourceName, targetName, comObj) {
this.sourceName = sourceName;
this.targetName = targetName;
Expand All @@ -80,8 +72,8 @@ function MessageHandler(sourceName, targetName, comObj) {
if (data.callbackId in callbacksCapabilities) {
let callback = callbacksCapabilities[callbackId];
delete callbacksCapabilities[callbackId];
if ('error' in data) {
callback.reject(wrapReason(data.error));
if ('reason' in data) {
callback.reject(wrapReason(data.reason));
} else {
callback.resolve(data.data);
}
Expand Down Expand Up @@ -109,7 +101,7 @@ function MessageHandler(sourceName, targetName, comObj) {
targetName,
isReply: true,
callbackId: data.callbackId,
error: wrapReason(reason),
reason: wrapReason(reason),
});
});
} else if (data.streamId) {
Expand Down Expand Up @@ -193,6 +185,8 @@ MessageHandler.prototype = {
this.streamControllers[streamId] = {
controller,
startCall: startCapability,
pullCall: null,
cancelCall: null,
isClosed: false,
};
this.postMessage({
Expand Down Expand Up @@ -337,33 +331,43 @@ MessageHandler.prototype = {
_processStreamMessage(data) {
let sourceName = this.sourceName;
let targetName = data.sourceName;
let streamId = data.streamId;
const streamId = data.streamId;
const comObj = this.comObj;

let deleteStreamController = () => {
// Delete the `streamController` only when the start, pull, and cancel
// capabilities have settled, to prevent `TypeError`s.
Promise.all([
this.streamControllers[data.streamId].startCall,
this.streamControllers[data.streamId].pullCall,
this.streamControllers[data.streamId].cancelCall
this.streamControllers[streamId].startCall,
this.streamControllers[streamId].pullCall,
this.streamControllers[streamId].cancelCall
].map(function(capability) {
return capability && capability.promise.catch(function() { });
})).then(() => {
delete this.streamControllers[data.streamId];
delete this.streamControllers[streamId];
});
};

switch (data.stream) {
case StreamKind.START_COMPLETE:
resolveOrReject(this.streamControllers[data.streamId].startCall, data);
if (data.success) {
this.streamControllers[streamId].startCall.resolve();
} else {
this.streamControllers[streamId].startCall.reject(
wrapReason(data.reason));
}
break;
case StreamKind.PULL_COMPLETE:
resolveOrReject(this.streamControllers[data.streamId].pullCall, data);
if (data.success) {
this.streamControllers[streamId].pullCall.resolve();
} else {
this.streamControllers[streamId].pullCall.reject(
wrapReason(data.reason));
}
break;
case StreamKind.PULL:
// Ignore any pull after close is called.
if (!this.streamSinks[data.streamId]) {
if (!this.streamSinks[streamId]) {
comObj.postMessage({
sourceName,
targetName,
Expand All @@ -376,12 +380,12 @@ MessageHandler.prototype = {
// Pull increases the desiredSize property of sink,
// so when it changes from negative to positive,
// set ready property as resolved promise.
if (this.streamSinks[data.streamId].desiredSize <= 0 &&
if (this.streamSinks[streamId].desiredSize <= 0 &&
data.desiredSize > 0) {
this.streamSinks[data.streamId].sinkCapability.resolve();
this.streamSinks[streamId].sinkCapability.resolve();
}
// Reset desiredSize property of sink on every pull.
this.streamSinks[data.streamId].desiredSize = data.desiredSize;
this.streamSinks[streamId].desiredSize = data.desiredSize;
const { onPull, } = this.streamSinks[data.streamId];
new Promise(function(resolve) {
resolve(onPull && onPull());
Expand All @@ -404,35 +408,41 @@ MessageHandler.prototype = {
});
break;
case StreamKind.ENQUEUE:
assert(this.streamControllers[data.streamId],
assert(this.streamControllers[streamId],
'enqueue should have stream controller');
if (!this.streamControllers[data.streamId].isClosed) {
this.streamControllers[data.streamId].controller.enqueue(data.chunk);
if (this.streamControllers[streamId].isClosed) {
break;
}
this.streamControllers[streamId].controller.enqueue(data.chunk);
break;
case StreamKind.CLOSE:
assert(this.streamControllers[data.streamId],
assert(this.streamControllers[streamId],
'close should have stream controller');
if (this.streamControllers[data.streamId].isClosed) {
if (this.streamControllers[streamId].isClosed) {
break;
}
this.streamControllers[data.streamId].isClosed = true;
this.streamControllers[data.streamId].controller.close();
this.streamControllers[streamId].isClosed = true;
this.streamControllers[streamId].controller.close();
deleteStreamController();
break;
case StreamKind.ERROR:
assert(this.streamControllers[data.streamId],
assert(this.streamControllers[streamId],
'error should have stream controller');
this.streamControllers[data.streamId].controller.
error(wrapReason(data.reason));
this.streamControllers[streamId].controller.error(
wrapReason(data.reason));
deleteStreamController();
break;
case StreamKind.CANCEL_COMPLETE:
resolveOrReject(this.streamControllers[data.streamId].cancelCall, data);
if (data.success) {
this.streamControllers[streamId].cancelCall.resolve();
} else {
this.streamControllers[streamId].cancelCall.reject(
wrapReason(data.reason));
}
deleteStreamController();
break;
case StreamKind.CANCEL:
if (!this.streamSinks[data.streamId]) {
if (!this.streamSinks[streamId]) {
break;
}
const { onCancel, } = this.streamSinks[data.streamId];
Expand All @@ -455,10 +465,10 @@ MessageHandler.prototype = {
reason: wrapReason(reason),
});
});
this.streamSinks[data.streamId].sinkCapability.
reject(wrapReason(data.reason));
this.streamSinks[data.streamId].isCancelled = true;
delete this.streamSinks[data.streamId];
this.streamSinks[streamId].sinkCapability.reject(
wrapReason(data.reason));
this.streamSinks[streamId].isCancelled = true;
delete this.streamSinks[streamId];
break;
default:
throw new Error('Unexpected stream case');
Expand Down

0 comments on commit 7af66c8

Please sign in to comment.