Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Actor typings #3225

Merged
merged 5 commits into from
Oct 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/source/worker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import {WorkerGlobalScopeInterface} from '../util/web_worker';
import {CanonicalTileID, OverscaledTileID} from './tile_id';
import {TileParameters, WorkerSource, WorkerTileCallback, WorkerTileParameters} from './worker_source';
import {plugin as globalRTLTextPlugin} from './rtl_text_plugin';
import {ActorTarget} from '../util/actor';

const _self = {
addEventListener() {}
} as any as WorkerGlobalScopeInterface;
} as any as WorkerGlobalScopeInterface & ActorTarget;

class WorkerSourceMock implements WorkerSource {
availableImages: string[];
Expand Down
18 changes: 9 additions & 9 deletions src/source/worker.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Actor} from '../util/actor';
import {Actor, ActorTarget} from '../util/actor';
import {StyleLayerIndex} from '../style/style_layer_index';
import {VectorTileWorkerSource} from './vector_tile_worker_source';
import {RasterDEMTileWorkerSource} from './raster_dem_tile_worker_source';
Expand All @@ -24,7 +24,7 @@ import type {PluginState} from './rtl_text_plugin';
* The Worker class responsidble for background thread related execution
*/
export default class Worker {
self: WorkerGlobalScopeInterface;
self: WorkerGlobalScopeInterface & ActorTarget;
actor: Actor;
layerIndexes: {[_: string]: StyleLayerIndex};
availableImages: {[_: string]: Array<string>};
Expand All @@ -47,7 +47,7 @@ export default class Worker {
};
referrer: string;

constructor(self: WorkerGlobalScopeInterface) {
constructor(self: WorkerGlobalScopeInterface & ActorTarget) {
this.self = self;
this.actor = new Actor(self, this);

Expand Down Expand Up @@ -222,24 +222,24 @@ export default class Worker {
return layerIndexes;
}

getWorkerSource(mapId: string, type: string, source: string) {
getWorkerSource(mapId: string, sourceType: string, sourceName: string): WorkerSource {
if (!this.workerSources[mapId])
this.workerSources[mapId] = {};
if (!this.workerSources[mapId][type])
this.workerSources[mapId][type] = {};
if (!this.workerSources[mapId][sourceType])
this.workerSources[mapId][sourceType] = {};

if (!this.workerSources[mapId][type][source]) {
if (!this.workerSources[mapId][sourceType][sourceName]) {
// use a wrapped actor so that we can attach a target mapId param
// to any messages invoked by the WorkerSource
const actor = {
send: (type, data, callback) => {
this.actor.send(type, data, callback, mapId);
}
};
this.workerSources[mapId][type][source] = new (this.workerSourceTypes[type] as any)((actor as any), this.getLayerIndex(mapId), this.getAvailableImages(mapId));
this.workerSources[mapId][sourceType][sourceName] = new (this.workerSourceTypes[sourceType] as any)((actor as any), this.getLayerIndex(mapId), this.getAvailableImages(mapId));
}

return this.workerSources[mapId][type][source];
return this.workerSources[mapId][sourceType][sourceName];
}

getDEMWorkerSource(mapId: string, source: string) {
Expand Down
20 changes: 11 additions & 9 deletions src/util/actor.test.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Actor} from './actor';
import {Actor, ActorTarget} from './actor';
import {workerFactory} from './web_worker';
import {MessageBus} from '../../test/unit/lib/web_worker_mock';

Expand Down Expand Up @@ -33,26 +33,27 @@ describe('Actor', () => {
this.self = self;
this.actor = new Actor(self, this);
}
test(mapId, params, callback) {
getTile(mapId, params, callback) {
setTimeout(callback, 0, null, params);
}
getWorkerSource() { return null; }
});

const worker = workerFactory();

const m1 = new Actor(worker, {}, '1');
const m2 = new Actor(worker, {}, '2');
const m1 = new Actor(worker, {} as any, '1');
const m2 = new Actor(worker, {} as any, '2');

let callbackCount = 0;
m1.send('test', {value: 1729}, (err, response) => {
m1.send('getTile', {value: 1729}, (err, response) => {
expect(err).toBeFalsy();
expect(response).toEqual({value: 1729});
callbackCount++;
if (callbackCount === 2) {
done();
}
});
m2.send('test', {value: 4104}, (err, response) => {
m2.send('getTile', {value: 4104}, (err, response) => {
expect(err).toBeFalsy();
expect(response).toEqual({value: 4104});
callbackCount++;
Expand All @@ -72,18 +73,19 @@ describe('Actor', () => {
this.self = self;
this.actor = workerActor = new Actor(self, this);
}
getWorkerSource() { return null; }
});

const worker = workerFactory();

new Actor(worker, {
test () { done(); }
}, '1');
} as any, '1');
new Actor(worker, {
test () {
done('test failed');
}
}, '2');
} as any, '2');

workerActor.send('test', {}, () => {}, '1');
});
Expand All @@ -97,7 +99,7 @@ describe('Actor', () => {
expect([type, callback, useCapture]).toEqual(this._addEventListenerArgs);
done();
}
}, {}, null);
} as ActorTarget, {} as any, null);
actor.remove();
});

Expand Down
105 changes: 65 additions & 40 deletions src/util/actor.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,43 @@
import {isWorker, isSafari} from './util';
import {serialize, deserialize} from './web_worker_transfer';
import {isWorker} from './util';
import {serialize, deserialize, Serialized} from './web_worker_transfer';
import {ThrottledInvoker} from './throttled_invoker';

import type {Transferable} from '../types/transferable';
import type {Cancelable} from '../types/cancelable';
import type {WorkerSource} from '../source/worker_source';

export interface ActorTarget {
addEventListener: typeof window.addEventListener;
removeEventListener: typeof window.removeEventListener;
postMessage: typeof window.postMessage;
terminate?: () => void;
}

export interface WorkerSourceProvider {
getWorkerSource(mapId: string, sourceType: string, sourceName: string): WorkerSource;
}

export type MessageType = '<response>' | '<cancel>' |
'geojson.getClusterExpansionZoom' | 'geojson.getClusterChildren' | 'geojson.getClusterLeaves' | 'geojson.loadData' |
'removeSource' | 'loadWorkerSource' | 'loadDEMTile' | 'removeDEMTile' |
'removeTile' | 'reloadTile' | 'abortTile' | 'loadTile' | 'getTile' |
'getGlyphs' | 'getImages' | 'setImages' |
'syncRTLPluginState' | 'setReferrer' | 'setLayers' | 'updateLayers';
Comment on lines +20 to +25
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can also do keyof Worker here instead of enumerating these to get them automatically from the methods defined in worker.ts.

The geojson.* ones wouldn't come for free, but we could probably either do some fancy typescript work to create them automatically from a list of WorkerSource types, or just make explicit methods on worker geojsonGetClusterLeaves and getjsonLoadData since there's only 2 of them.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree, but this is not the way I would like to move forward, I don't think that a message type should be a method name, it's not refactoring-safe, what I would like to do going forward is "register" an even type with a specific method, much like pub-sub and introduce promises to this entire mess.
I think this would be the first step toward reducing the number of callbacks from the code.
But I want this initial step to have more clarity on how to change things and how to proceed.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The geojson part seems like a hack I'll be removing once this will be refactored to something that is easy to read and maintain.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might help to see some pseudocode of the end state you're looking for. Is it something like:

// in shared code: define  a message object/type
class LoadDemTileRequest { ... }

// in worker: register handler
actor.register(LoadDemTileRequest, demWorker.loadTile);

// in main: send using shared message object/type
actor.send(new LoadDemTileRequest(...))

?

I think it's possible to get the existing "method of worker" approach to work with promises, but you've got a good point that we've got a chance to rethink this now so don't need to feel bound by the old implementation.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, something similar to what you wrote, I still don't have a good picture of how this will look as I need to understand most of the places this is used and how to best define it.
For example I leaned that Style class is registering itself to provide getGlyphs method to the workers. It took me too long to figure out why and where the registration occures and how this works.
It shouldn't be that complicated...
So I need to map the events and see how I can slowly progress and to it bit by bit...


export type MessageData = {
id: string;
type: MessageType;
data?: Serialized;
targetMapId?: string | null;
mustQueue?: boolean;
error?: Serialized | null;
hasCallback?: boolean;
sourceMapId: string | null;
}

export type Message = {
data: MessageData;
}

/**
* An implementation of the [Actor design pattern](http://en.wikipedia.org/wiki/Actor_model)
Expand All @@ -12,36 +46,30 @@ import type {Cancelable} from '../types/cancelable';
* owned by the styles
*/
export class Actor {
target: any;
parent: any;
target: ActorTarget;
parent: WorkerSourceProvider;
mapId: string | null;
callbacks: {
number: any;
};
callbacks: { [x: number]: Function};
name: string;
tasks: {
number: any;
};
taskQueue: Array<number>;
cancelCallbacks: {
number: Cancelable;
};
tasks: { [x: number]: MessageData };
taskQueue: Array<string>;
cancelCallbacks: { [x: number]: () => void };
invoker: ThrottledInvoker;
globalScope: any;
globalScope: ActorTarget;

/**
* @param target - The target
* @param parent - The parent
* @param mapId - A unique identifier for the Map instance using this Actor.
*/
constructor(target: any, parent: any, mapId?: string) {
constructor(target: ActorTarget, parent: WorkerSourceProvider, mapId?: string) {
this.target = target;
this.parent = parent;
this.mapId = mapId;
this.callbacks = {} as { number: any };
this.tasks = {} as { number: any };
this.callbacks = {};
this.tasks = {};
this.taskQueue = [];
this.cancelCallbacks = {} as { number: Cancelable };
this.cancelCallbacks = {};
this.invoker = new ThrottledInvoker(this.process);
this.target.addEventListener('message', this.receive, false);
this.globalScope = isWorker() ? target : window;
Expand All @@ -55,7 +83,7 @@ export class Actor {
* @param targetMapId - A particular mapId to which to send this message.
*/
send(
type: string,
type: MessageType,
data: unknown,
Copy link
Contributor

@msbarry msbarry Oct 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it in scope for this PR to make data argument typed so you know you are passing the right data into it? Here's what I landed on in maplibre-contour:

https://github.com/onthegomap/maplibre-contour/blob/6abe4628329227337ae536c9b3eb6dc9658560c2/src/actor.ts#L107-L117

Something like:

type KeysOfType<T, V> = {
    [K in keyof T]: T[K] extends V ? K : never;
}[keyof T];

export type MessageType = KeysOfType<WorkerType, (a: any, b: any, c: WorkerTileCallback) => void>;

    send(
        type: MessageType,
        data: Parameters<WorkerType[MessageType]>[1],
        callback?: Function | null,
        targetMapId?: string | null,
        mustQueue: boolean = false
    ): Cancelable {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is OK for this PR, I'm thinking about creating a sendAsync method that will get a message, each message will have a type and a payload that is specific to that message, this way you know what you are passing and what you are expecting in return.
This will return a promise with a single data object that will have all the relevant data.
It will be a slow process to fix all the places, but once done, I'll simply remove this send method.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense - the typing gets pretty gnarly to make that work, makes sense to set up the typing we want on the new clean-slate implementation and clean things up as we migrate to it.

callback?: Function | null,
targetMapId?: string | null,
Expand All @@ -69,40 +97,36 @@ export class Actor {
if (callback) {
this.callbacks[id] = callback;
}
const buffers: Array<Transferable> = isSafari(this.globalScope) ? undefined : [];
this.target.postMessage({
const buffers: Array<Transferable> = [];
const message: MessageData = {
id,
type,
hasCallback: !!callback,
targetMapId,
mustQueue,
sourceMapId: this.mapId,
data: serialize(data, buffers)
}, buffers);
};

this.target.postMessage(message, {transfer: buffers});
return {
cancel: () => {
if (callback) {
// Set the callback to null so that it never fires after the request is aborted.
delete this.callbacks[id];
}
this.target.postMessage({
const cancelMessage: MessageData = {
id,
type: '<cancel>',
targetMapId,
sourceMapId: this.mapId
});
};
this.target.postMessage(cancelMessage);
}
};
}

receive = (message: {
data: {
id: number;
type: string;
data: unknown;
targetMapId?: string | null;
mustQueue: boolean;
};}) => {
receive = (message: Message) => {
const data = message.data;
const id = data.id;

Expand Down Expand Up @@ -164,7 +188,7 @@ export class Actor {
this.processTask(id, task);
};

processTask(id: number, task: any) {
processTask(id: string, task: MessageData) {
if (task.type === '<response>') {
// The done() function in the counterpart has been called, and we are now
// firing the callback in the originating actor, if there is one.
Expand All @@ -180,30 +204,31 @@ export class Actor {
}
} else {
let completed = false;
const buffers: Array<Transferable> = isSafari(this.globalScope) ? undefined : [];
const buffers: Array<Transferable> = [];
const done = task.hasCallback ? (err: Error, data?: any) => {
completed = true;
delete this.cancelCallbacks[id];
this.target.postMessage({
const responseMessage: MessageData = {
id,
type: '<response>',
sourceMapId: this.mapId,
error: err ? serialize(err) : null,
data: serialize(data, buffers)
}, buffers);
};
this.target.postMessage(responseMessage, {transfer: buffers});
} : (_) => {
completed = true;
};

let callback = null;
const params = (deserialize(task.data) as any);
let callback: Cancelable = null;
const params = deserialize(task.data);
if (this.parent[task.type]) {
// task.type == 'loadTile', 'removeTile', etc.
callback = this.parent[task.type](task.sourceMapId, params, done);
} else if (this.parent.getWorkerSource) {
// task.type == sourcetype.method
const keys = task.type.split('.');
const scope = (this.parent as any).getWorkerSource(task.sourceMapId, keys[0], params.source);
const scope = this.parent.getWorkerSource(task.sourceMapId, keys[0], (params as any).source);
callback = scope[keys[1]](params, done);
} else {
// No function was found.
Expand Down
4 changes: 2 additions & 2 deletions src/util/dispatcher.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {asyncAll} from './util';
import {Actor} from './actor';
import {Actor, MessageType} from './actor';

import type {WorkerPool} from './worker_pool';
import type {WorkerSource} from '../source/worker_source'; /* eslint-disable-line */ // this is used for the docs' import
Expand Down Expand Up @@ -36,7 +36,7 @@ export class Dispatcher {
/**
* Broadcast a message to all Workers.
*/
broadcast(type: string, data: unknown, cb?: (...args: any[]) => any) {
broadcast(type: MessageType, data: unknown, cb?: (...args: any[]) => any) {
cb = cb || function () {};
asyncAll(this.actors, (actor, done) => {
actor.send(type, data, done);
Expand Down
15 changes: 0 additions & 15 deletions src/util/web_worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,6 @@ import {config} from './config';

import type {WorkerSource} from '../source/worker_source';

export type MessageListener = (
a: {
data: any;
target: any;
}
) => unknown;

// The main thread interface. Provided by Worker in a browser environment,
export interface WorkerInterface {
addEventListener(type: 'message', listener: MessageListener): void;
removeEventListener(type: 'message', listener: MessageListener): void;
postMessage(message: any): void;
terminate(): void;
}

export interface WorkerGlobalScopeInterface {
importScripts(...urls: Array<string>): void;
registerWorkerSource: (
Expand Down
Loading