Skip to content

Commit

Permalink
Re-factor the MessageHandler-class event handler function
Browse files Browse the repository at this point in the history
 - Change the "message" event handler function to a private method.

 - Remove the "message" event listener with an `AbortSignal`.

 - Extend the `LoopbackPort`-class with `AbortSignal` support.
  • Loading branch information
Snuffleupagus committed Oct 16, 2024
1 parent 689ffda commit 788eabc
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 66 deletions.
26 changes: 22 additions & 4 deletions src/display/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -1982,7 +1982,7 @@ class PDFPageProxy {
}

class LoopbackPort {
#listeners = new Set();
#listeners = new Map();

#deferred = Promise.resolve();

Expand All @@ -1992,21 +1992,39 @@ class LoopbackPort {
};

this.#deferred.then(() => {
for (const listener of this.#listeners) {
for (const [listener] of this.#listeners) {
listener.call(this, event);
}
});
}

addEventListener(name, listener) {
this.#listeners.add(listener);
addEventListener(name, listener, options = null) {
let rmAbort = null;
if (options?.signal instanceof AbortSignal) {
const { signal } = options;
if (signal.aborted) {
warn("LoopbackPort - cannot use an `aborted` signal.");
return;
}
const onAbort = () => this.removeEventListener(name, listener);
rmAbort = () => signal.removeEventListener("abort", onAbort);

signal.addEventListener("abort", onAbort);
}
this.#listeners.set(listener, rmAbort);
}

removeEventListener(name, listener) {
const rmAbort = this.#listeners.get(listener);
rmAbort?.();

this.#listeners.delete(listener);
}

terminate() {
for (const [, rmAbort] of this.#listeners) {
rmAbort?.();
}
this.#listeners.clear();
}
}
Expand Down
130 changes: 68 additions & 62 deletions src/shared/message_handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ function wrapReason(reason) {
}

class MessageHandler {
#messageAC = new AbortController();

constructor(sourceName, targetName, comObj) {
this.sourceName = sourceName;
this.targetName = targetName;
Expand All @@ -80,71 +82,74 @@ class MessageHandler {
this.callbackCapabilities = Object.create(null);
this.actionHandler = Object.create(null);

this._onComObjOnMessage = event => {
const data = event.data;
if (data.targetName !== this.sourceName) {
return;
}
if (data.stream) {
this.#processStreamMessage(data);
return;
}
if (data.callback) {
const callbackId = data.callbackId;
const capability = this.callbackCapabilities[callbackId];
if (!capability) {
throw new Error(`Cannot resolve callback ${callbackId}`);
}
delete this.callbackCapabilities[callbackId];
comObj.addEventListener("message", this.#onMessage.bind(this), {
signal: this.#messageAC.signal,
});
}

if (data.callback === CallbackKind.DATA) {
capability.resolve(data.data);
} else if (data.callback === CallbackKind.ERROR) {
capability.reject(wrapReason(data.reason));
} else {
throw new Error("Unexpected callback case");
}
return;
}
const action = this.actionHandler[data.action];
if (!action) {
throw new Error(`Unknown action from worker: ${data.action}`);
#onMessage({ data }) {
if (data.targetName !== this.sourceName) {
return;
}
if (data.stream) {
this.#processStreamMessage(data);
return;
}
if (data.callback) {
const callbackId = data.callbackId;
const capability = this.callbackCapabilities[callbackId];
if (!capability) {
throw new Error(`Cannot resolve callback ${callbackId}`);
}
if (data.callbackId) {
const cbSourceName = this.sourceName;
const cbTargetName = data.sourceName;
delete this.callbackCapabilities[callbackId];

new Promise(function (resolve) {
resolve(action(data.data));
}).then(
function (result) {
comObj.postMessage({
sourceName: cbSourceName,
targetName: cbTargetName,
callback: CallbackKind.DATA,
callbackId: data.callbackId,
data: result,
});
},
function (reason) {
comObj.postMessage({
sourceName: cbSourceName,
targetName: cbTargetName,
callback: CallbackKind.ERROR,
callbackId: data.callbackId,
reason: wrapReason(reason),
});
}
);
return;
}
if (data.streamId) {
this.#createStreamSink(data);
return;
if (data.callback === CallbackKind.DATA) {
capability.resolve(data.data);
} else if (data.callback === CallbackKind.ERROR) {
capability.reject(wrapReason(data.reason));
} else {
throw new Error("Unexpected callback case");
}
action(data.data);
};
comObj.addEventListener("message", this._onComObjOnMessage);
return;
}
const action = this.actionHandler[data.action];
if (!action) {
throw new Error(`Unknown action from worker: ${data.action}`);
}
if (data.callbackId) {
const sourceName = this.sourceName,
targetName = data.sourceName,
comObj = this.comObj;

new Promise(function (resolve) {
resolve(action(data.data));
}).then(
function (result) {
comObj.postMessage({
sourceName,
targetName,
callback: CallbackKind.DATA,
callbackId: data.callbackId,
data: result,
});
},
function (reason) {
comObj.postMessage({
sourceName,
targetName,
callback: CallbackKind.ERROR,
callbackId: data.callbackId,
reason: wrapReason(reason),
});
}
);
return;
}
if (data.streamId) {
this.#createStreamSink(data);
return;
}
action(data.data);
}

on(actionName, handler) {
Expand Down Expand Up @@ -527,7 +532,8 @@ class MessageHandler {
}

destroy() {
this.comObj.removeEventListener("message", this._onComObjOnMessage);
this.#messageAC?.abort();
this.#messageAC = null;
}
}

Expand Down

0 comments on commit 788eabc

Please sign in to comment.