-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
Copy pathTaskProcessor.js
344 lines (292 loc) · 12.6 KB
/
TaskProcessor.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
import when from '../ThirdParty/when.js';
import buildModuleUrl from './buildModuleUrl.js';
import defaultValue from './defaultValue.js';
import defined from './defined.js';
import destroyObject from './destroyObject.js';
import DeveloperError from './DeveloperError.js';
import Event from './Event.js';
import FeatureDetection from './FeatureDetection.js';
import isCrossOriginUrl from './isCrossOriginUrl.js';
import Resource from './Resource.js';
import RuntimeError from './RuntimeError.js';
function canTransferArrayBuffer() {
if (!defined(TaskProcessor._canTransferArrayBuffer)) {
var worker = new Worker(getWorkerUrl('Workers/transferTypedArrayTest.js'));
worker.postMessage = defaultValue(worker.webkitPostMessage, worker.postMessage);
var value = 99;
var array = new Int8Array([value]);
try {
// postMessage might fail with a DataCloneError
// if transferring array buffers is not supported.
worker.postMessage({
array : array
}, [array.buffer]);
} catch (e) {
TaskProcessor._canTransferArrayBuffer = false;
return TaskProcessor._canTransferArrayBuffer;
}
var deferred = when.defer();
worker.onmessage = function(event) {
var array = event.data.array;
// some versions of Firefox silently fail to transfer typed arrays.
// https://bugzilla.mozilla.org/show_bug.cgi?id=841904
// Check to make sure the value round-trips successfully.
var result = defined(array) && array[0] === value;
deferred.resolve(result);
worker.terminate();
TaskProcessor._canTransferArrayBuffer = result;
};
TaskProcessor._canTransferArrayBuffer = deferred.promise;
}
return TaskProcessor._canTransferArrayBuffer;
}
var taskCompletedEvent = new Event();
function completeTask(processor, data) {
--processor._activeTasks;
var id = data.id;
if (!defined(id)) {
// This is not one of ours.
return;
}
var deferreds = processor._deferreds;
var deferred = deferreds[id];
if (defined(data.error)) {
var error = data.error;
if (error.name === 'RuntimeError') {
error = new RuntimeError(data.error.message);
error.stack = data.error.stack;
} else if (error.name === 'DeveloperError') {
error = new DeveloperError(data.error.message);
error.stack = data.error.stack;
}
taskCompletedEvent.raiseEvent(error);
deferred.reject(error);
} else {
taskCompletedEvent.raiseEvent();
deferred.resolve(data.result);
}
delete deferreds[id];
}
function getWorkerUrl(moduleID) {
var url = buildModuleUrl(moduleID);
if (isCrossOriginUrl(url)) {
//to load cross-origin, create a shim worker from a blob URL
var script = 'importScripts("' + url + '");';
var blob;
try {
blob = new Blob([script], {
type : 'application/javascript'
});
} catch (e) {
var BlobBuilder = window.BlobBuilder || window.WebKitBlobBuilder || window.MozBlobBuilder || window.MSBlobBuilder;
var blobBuilder = new BlobBuilder();
blobBuilder.append(script);
blob = blobBuilder.getBlob('application/javascript');
}
var URL = window.URL || window.webkitURL;
url = URL.createObjectURL(blob);
}
return url;
}
var bootstrapperUrlResult;
function getBootstrapperUrl() {
if (!defined(bootstrapperUrlResult)) {
bootstrapperUrlResult = getWorkerUrl('Workers/cesiumWorkerBootstrapper.js');
}
return bootstrapperUrlResult;
}
function createWorker(processor) {
var worker = new Worker(getBootstrapperUrl());
worker.postMessage = defaultValue(worker.webkitPostMessage, worker.postMessage);
var bootstrapMessage = {
loaderConfig : {},
workerModule : TaskProcessor._workerModulePrefix + processor._workerName
};
if (defined(TaskProcessor._loaderConfig)) {
bootstrapMessage.loaderConfig = TaskProcessor._loaderConfig;
} else {
bootstrapMessage.loaderConfig.paths = {
'Workers': buildModuleUrl('Workers/Build')
};
bootstrapMessage.loaderConfig.baseUrl = buildModuleUrl.getCesiumBaseUrl().url;
}
worker.postMessage(bootstrapMessage);
worker.onmessage = function(event) {
completeTask(processor, event.data);
};
return worker;
}
function getWebAssemblyLoaderConfig(processor, wasmOptions) {
var config = {
modulePath : undefined,
wasmBinaryFile : undefined,
wasmBinary : undefined
};
// Web assembly not supported, use fallback js module if provided
if (!FeatureDetection.supportsWebAssembly()) {
if (!defined(wasmOptions.fallbackModulePath)) {
throw new RuntimeError('This browser does not support Web Assembly, and no backup module was provided for ' + processor._workerName);
}
config.modulePath = buildModuleUrl(wasmOptions.fallbackModulePath);
return when.resolve(config);
}
config.modulePath = buildModuleUrl(wasmOptions.modulePath);
config.wasmBinaryFile = buildModuleUrl(wasmOptions.wasmBinaryFile);
return Resource.fetchArrayBuffer({
url: config.wasmBinaryFile
}).then(function (arrayBuffer) {
config.wasmBinary = arrayBuffer;
return config;
});
}
/**
* A wrapper around a web worker that allows scheduling tasks for a given worker,
* returning results asynchronously via a promise.
*
* The Worker is not constructed until a task is scheduled.
*
* @alias TaskProcessor
* @constructor
*
* @param {String} workerName The name of the worker. This is expected to be a script
* in the Workers folder.
* @param {Number} [maximumActiveTasks=5] The maximum number of active tasks. Once exceeded,
* scheduleTask will not queue any more tasks, allowing
* work to be rescheduled in future frames.
*/
function TaskProcessor(workerName, maximumActiveTasks) {
this._workerName = workerName;
this._maximumActiveTasks = defaultValue(maximumActiveTasks, 5);
this._activeTasks = 0;
this._deferreds = {};
this._nextID = 0;
}
var emptyTransferableObjectArray = [];
/**
* Schedule a task to be processed by the web worker asynchronously. If there are currently more
* tasks active than the maximum set by the constructor, will immediately return undefined.
* Otherwise, returns a promise that will resolve to the result posted back by the worker when
* finished.
*
* @param {Object} parameters Any input data that will be posted to the worker.
* @param {Object[]} [transferableObjects] An array of objects contained in parameters that should be
* transferred to the worker instead of copied.
* @returns {Promise.<Object>|undefined} Either a promise that will resolve to the result when available, or undefined
* if there are too many active tasks,
*
* @example
* var taskProcessor = new Cesium.TaskProcessor('myWorkerName');
* var promise = taskProcessor.scheduleTask({
* someParameter : true,
* another : 'hello'
* });
* if (!Cesium.defined(promise)) {
* // too many active tasks - try again later
* } else {
* Cesium.when(promise, function(result) {
* // use the result of the task
* });
* }
*/
TaskProcessor.prototype.scheduleTask = function(parameters, transferableObjects) {
if (!defined(this._worker)) {
this._worker = createWorker(this);
}
if (this._activeTasks >= this._maximumActiveTasks) {
return undefined;
}
++this._activeTasks;
var processor = this;
return when(canTransferArrayBuffer(), function(canTransferArrayBuffer) {
if (!defined(transferableObjects)) {
transferableObjects = emptyTransferableObjectArray;
} else if (!canTransferArrayBuffer) {
transferableObjects.length = 0;
}
var id = processor._nextID++;
var deferred = when.defer();
processor._deferreds[id] = deferred;
processor._worker.postMessage({
id : id,
parameters : parameters,
canTransferArrayBuffer : canTransferArrayBuffer
}, transferableObjects);
return deferred.promise;
});
};
/**
* Posts a message to a web worker with configuration to initialize loading
* and compiling a web assembly module asychronously, as well as an optional
* fallback JavaScript module to use if Web Assembly is not supported.
*
* @param {Object} [webAssemblyOptions] An object with the following properties:
* @param {String} [webAssemblyOptions.modulePath] The path of the web assembly JavaScript wrapper module.
* @param {String} [webAssemblyOptions.wasmBinaryFile] The path of the web assembly binary file.
* @param {String} [webAssemblyOptions.fallbackModulePath] The path of the fallback JavaScript module to use if web assembly is not supported.
* @returns {Promise.<Object>} A promise that resolves to the result when the web worker has loaded and compiled the web assembly module and is ready to process tasks.
*/
TaskProcessor.prototype.initWebAssemblyModule = function (webAssemblyOptions) {
if (!defined(this._worker)) {
this._worker = createWorker(this);
}
var deferred = when.defer();
var processor = this;
var worker = this._worker;
getWebAssemblyLoaderConfig(this, webAssemblyOptions).then(function(wasmConfig) {
return when(canTransferArrayBuffer(), function(canTransferArrayBuffer) {
var transferableObjects;
var binary = wasmConfig.wasmBinary;
if (defined(binary) && canTransferArrayBuffer) {
transferableObjects = [binary];
}
worker.onmessage = function(event) {
worker.onmessage = function(event) {
completeTask(processor, event.data);
};
deferred.resolve(event.data);
};
worker.postMessage({ webAssemblyConfig : wasmConfig }, transferableObjects);
});
});
return deferred;
};
/**
* Returns true if this object was destroyed; otherwise, false.
* <br /><br />
* If this object was destroyed, it should not be used; calling any function other than
* <code>isDestroyed</code> will result in a {@link DeveloperError} exception.
*
* @returns {Boolean} True if this object was destroyed; otherwise, false.
*
* @see TaskProcessor#destroy
*/
TaskProcessor.prototype.isDestroyed = function() {
return false;
};
/**
* Destroys this object. This will immediately terminate the Worker.
* <br /><br />
* Once an object is destroyed, it should not be used; calling any function other than
* <code>isDestroyed</code> will result in a {@link DeveloperError} exception.
*/
TaskProcessor.prototype.destroy = function() {
if (defined(this._worker)) {
this._worker.terminate();
}
return destroyObject(this);
};
/**
* An event that's raised when a task is completed successfully. Event handlers are passed
* the error object is a task fails.
*
* @type {Event}
*
* @private
*/
TaskProcessor.taskCompletedEvent = taskCompletedEvent;
// exposed for testing purposes
TaskProcessor._defaultWorkerModulePrefix = 'Workers/';
TaskProcessor._workerModulePrefix = TaskProcessor._defaultWorkerModulePrefix;
TaskProcessor._loaderConfig = undefined;
TaskProcessor._canTransferArrayBuffer = undefined;
export default TaskProcessor;