Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MessageHandler] Re-factor and convert the code to a proper class #11290

Merged
merged 5 commits into from
Oct 31, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .eslintrc
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,7 @@
"object-shorthand": ["error", "always", {
"avoidQuotes": true,
}],
"prefer-const": "off",
"rest-spread-spacing": ["error", "never"],
"sort-imports": ["error", {
"ignoreCase": true,
Expand Down
195 changes: 112 additions & 83 deletions src/shared/message_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,19 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/* eslint no-var: error, prefer-const: error */

import {
AbortException, assert, createPromiseCapability, MissingPDFException,
ReadableStream, UnexpectedResponseException, UnknownErrorException
} from './util';

const CallbackKind = {
UNKNOWN: 0,
DATA: 1,
ERROR: 2,
};

const StreamKind = {
UNKNOWN: 0,
CANCEL: 1,
Expand Down Expand Up @@ -54,96 +61,110 @@ function wrapReason(reason) {
}
}

function MessageHandler(sourceName, targetName, comObj) {
this.sourceName = sourceName;
this.targetName = targetName;
this.comObj = comObj;
this.callbackId = 1;
this.streamId = 1;
this.postMessageTransfers = true;
this.streamSinks = Object.create(null);
this.streamControllers = Object.create(null);
let callbacksCapabilities = this.callbacksCapabilities = Object.create(null);
let ah = this.actionHandler = Object.create(null);
class MessageHandler {
constructor(sourceName, targetName, comObj) {
this.sourceName = sourceName;
this.targetName = targetName;
this.comObj = comObj;
this.callbackId = 1;
this.streamId = 1;
this.postMessageTransfers = true;
this.streamSinks = Object.create(null);
this.streamControllers = Object.create(null);
this.callbackCapabilities = Object.create(null);
this.actionHandler = Object.create(null);

this._onComObjOnMessage = (event) => {
let data = event.data;
if (data.targetName !== this.sourceName) {
return;
}
if (data.stream) {
this._processStreamMessage(data);
} else if (data.isReply) {
let callbackId = data.callbackId;
if (data.callbackId in callbacksCapabilities) {
let callback = callbacksCapabilities[callbackId];
delete callbacksCapabilities[callbackId];
if ('reason' in data) {
callback.reject(wrapReason(data.reason));
this._onComObjOnMessage = (event) => {
const data = event.data;
if (data.targetName !== this.sourceName) {
return;
}
if (data.stream) {
this._processStreamMessage(data);
return;
}
if (data.callback) {
const callbackId = data.callbackId;
const capability = this.callbackCapabilities[callbackId];
if (!capability) {
throw new Error(`Cannot resolve callback ${callbackId}`);
}
delete this.callbackCapabilities[callbackId];

if (data.callback === CallbackKind.DATA) {
capability.resolve(data.data);
} else if (data.callback === CallbackKind.ERROR) {
capability.reject(wrapReason(data.reason));
} else {
callback.resolve(data.data);
throw new Error('Unexpected callback case');
}
} else {
throw new Error(`Cannot resolve callback ${callbackId}`);
return;
}
const action = this.actionHandler[data.action];
if (!action) {
throw new Error(`Unknown action from worker: ${data.action}`);
}
} else if (data.action in ah) {
let action = ah[data.action];
if (data.callbackId) {
let sourceName = this.sourceName;
let targetName = data.sourceName;
const sourceName = this.sourceName;
const targetName = data.sourceName;
new Promise(function(resolve) {
resolve(action(data.data));
}).then(function(result) {
comObj.postMessage({
sourceName,
targetName,
isReply: true,
callback: CallbackKind.DATA,
callbackId: data.callbackId,
data: result,
});
}, function(reason) {
comObj.postMessage({
sourceName,
targetName,
isReply: true,
callback: CallbackKind.ERROR,
callbackId: data.callbackId,
reason: wrapReason(reason),
});
});
} else if (data.streamId) {
return;
}
if (data.streamId) {
this._createStreamSink(data);
} else {
action(data.data);
return;
}
} else {
throw new Error(`Unknown action from worker: ${data.action}`);
}
};
comObj.addEventListener('message', this._onComObjOnMessage);
}
action(data.data);
};
comObj.addEventListener('message', this._onComObjOnMessage);
}

MessageHandler.prototype = {
on(actionName, handler) {
var ah = this.actionHandler;
if (typeof PDFJSDev === 'undefined' ||
PDFJSDev.test('!PRODUCTION || TESTING')) {
assert(typeof handler === 'function',
'MessageHandler.on: Expected "handler" to be a function.');
}
const ah = this.actionHandler;
if (ah[actionName]) {
throw new Error(`There is already an actionName called "${actionName}"`);
}
ah[actionName] = handler;
},
}

/**
* Sends a message to the comObj to invoke the action with the supplied data.
* @param {string} actionName - Action to call.
* @param {JSON} data - JSON data to send.
* @param {Array} [transfers] - List of transfers/ArrayBuffers.
*/
send(actionName, data, transfers) {
this.postMessage({
this._postMessage({
sourceName: this.sourceName,
targetName: this.targetName,
action: actionName,
data,
}, transfers);
},
}

/**
* Sends a message to the comObj to invoke the action with the supplied data.
* Expects that the other side will callback with the response.
Expand All @@ -153,11 +174,11 @@ MessageHandler.prototype = {
* @returns {Promise} Promise to be resolved with response data.
*/
sendWithPromise(actionName, data, transfers) {
var callbackId = this.callbackId++;
var capability = createPromiseCapability();
this.callbacksCapabilities[callbackId] = capability;
const callbackId = this.callbackId++;
const capability = createPromiseCapability();
this.callbackCapabilities[callbackId] = capability;
try {
this.postMessage({
this._postMessage({
sourceName: this.sourceName,
targetName: this.targetName,
action: actionName,
Expand All @@ -168,7 +189,8 @@ MessageHandler.prototype = {
capability.reject(ex);
}
return capability.promise;
},
}

/**
* Sends a message to the comObj to invoke the action with the supplied data.
* Expect that the other side will callback to signal 'start_complete'.
Expand All @@ -180,22 +202,22 @@ MessageHandler.prototype = {
* @returns {ReadableStream} ReadableStream to read data in chunks.
*/
sendWithStream(actionName, data, queueingStrategy, transfers) {
let streamId = this.streamId++;
let sourceName = this.sourceName;
let targetName = this.targetName;
const streamId = this.streamId++;
const sourceName = this.sourceName;
const targetName = this.targetName;
const comObj = this.comObj;

return new ReadableStream({
start: (controller) => {
let startCapability = createPromiseCapability();
const startCapability = createPromiseCapability();
this.streamControllers[streamId] = {
controller,
startCall: startCapability,
pullCall: null,
cancelCall: null,
isClosed: false,
};
this.postMessage({
this._postMessage({
sourceName,
targetName,
action: actionName,
Expand All @@ -208,7 +230,7 @@ MessageHandler.prototype = {
},

pull: (controller) => {
let pullCapability = createPromiseCapability();
const pullCapability = createPromiseCapability();
this.streamControllers[streamId].pullCall = pullCapability;
comObj.postMessage({
sourceName,
Expand All @@ -224,7 +246,7 @@ MessageHandler.prototype = {

cancel: (reason) => {
assert(reason instanceof Error, 'cancel must have a valid reason');
let cancelCapability = createPromiseCapability();
const cancelCapability = createPromiseCapability();
this.streamControllers[streamId].cancelCall = cancelCapability;
this.streamControllers[streamId].isClosed = true;
comObj.postMessage({
Expand All @@ -238,24 +260,25 @@ MessageHandler.prototype = {
return cancelCapability.promise;
},
}, queueingStrategy);
},
}

/**
* @private
*/
_createStreamSink(data) {
let self = this;
let action = this.actionHandler[data.action];
let streamId = data.streamId;
let desiredSize = data.desiredSize;
let sourceName = this.sourceName;
let targetName = data.sourceName;
let capability = createPromiseCapability();
const self = this;
const action = this.actionHandler[data.action];
const streamId = data.streamId;
const sourceName = this.sourceName;
const targetName = data.sourceName;
const comObj = this.comObj;

let streamSink = {
const streamSink = {
enqueue(chunk, size = 1, transfers) {
if (this.isCancelled) {
return;
}
let lastDesiredSize = this.desiredSize;
const lastDesiredSize = this.desiredSize;
this.desiredSize -= size;
// Enqueue decreases the desiredSize property of sink,
// so when it changes from positive to negative,
Expand All @@ -264,7 +287,7 @@ MessageHandler.prototype = {
this.sinkCapability = createPromiseCapability();
this.ready = this.sinkCapability.promise;
}
self.postMessage({
self._postMessage({
sourceName,
targetName,
stream: StreamKind.ENQUEUE,
Expand Down Expand Up @@ -302,11 +325,11 @@ MessageHandler.prototype = {
});
},

sinkCapability: capability,
sinkCapability: createPromiseCapability(),
onPull: null,
onCancel: null,
isCancelled: false,
desiredSize,
desiredSize: data.desiredSize,
ready: null,
};

Expand All @@ -332,12 +355,15 @@ MessageHandler.prototype = {
reason: wrapReason(reason),
});
});
},
}

/**
* @private
*/
_processStreamMessage(data) {
let sourceName = this.sourceName;
let targetName = data.sourceName;
const streamId = data.streamId;
const sourceName = this.sourceName;
const targetName = data.sourceName;
const comObj = this.comObj;

switch (data.stream) {
Expand Down Expand Up @@ -465,8 +491,11 @@ MessageHandler.prototype = {
default:
throw new Error('Unexpected stream case');
}
},
}

/**
* @private
*/
async _deleteStreamController(streamId) {
// Delete the `streamController` only when the start, pull, and cancel
// capabilities have settled, to prevent `TypeError`s.
Expand All @@ -478,26 +507,26 @@ MessageHandler.prototype = {
return capability && capability.promise.catch(function() { });
}));
delete this.streamControllers[streamId];
},
}

/**
* Sends raw message to the comObj.
* @private
* @param {Object} message - Raw message.
* @param transfers List of transfers/ArrayBuffers, or undefined.
* @private
*/
postMessage(message, transfers) {
_postMessage(message, transfers) {
if (transfers && this.postMessageTransfers) {
this.comObj.postMessage(message, transfers);
} else {
this.comObj.postMessage(message);
}
},
}

destroy() {
this.comObj.removeEventListener('message', this._onComObjOnMessage);
},
};
}
}

export {
MessageHandler,
Expand Down