-
Notifications
You must be signed in to change notification settings - Fork 29.6k
/
node_messaging.h
373 lines (308 loc) Β· 14.3 KB
/
node_messaging.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
#ifndef SRC_NODE_MESSAGING_H_
#define SRC_NODE_MESSAGING_H_
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#include "env.h"
#include "node_mutex.h"
#include "v8.h"
#include <deque>
#include <string>
#include <unordered_map>
#include <set>
namespace node {
namespace worker {
class MessagePortData;
class MessagePort;
typedef MaybeStackBuffer<v8::Local<v8::Value>, 8> TransferList;
// Used to represent the in-flight structure of an object that is being
// transferred or cloned using postMessage().
class TransferData : public MemoryRetainer {
public:
// Deserialize this object on the receiving end after a .postMessage() call.
// - `context` may not be the same as `env->context()`. This method should
// not produce JS objects coming from Contexts other than `context`.
// - `self` is a unique_ptr for the object that this is being called on.
// - The return value is treated like a `Maybe`, i.e. if `nullptr` is
// returned, any further deserialization of the message is stopped and
// control is returned to the event loop or JS as soon as possible.
virtual BaseObjectPtr<BaseObject> Deserialize(
Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<TransferData> self) = 0;
// FinalizeTransferWrite() is the counterpart to
// BaseObject::FinalizeTransferRead(). It is called right after the transfer
// data was created, and defaults to doing nothing. After this function,
// this object should not hold any more Isolate-specific data.
virtual v8::Maybe<bool> FinalizeTransferWrite(
v8::Local<v8::Context> context, v8::ValueSerializer* serializer);
};
// Represents a single communication message.
class Message : public MemoryRetainer {
public:
// Create a Message with a specific underlying payload, in the format of the
// V8 ValueSerializer API. If `payload` is empty, this message indicates
// that the receiving message port should close itself.
explicit Message(MallocedBuffer<char>&& payload = MallocedBuffer<char>());
~Message() = default;
Message(Message&& other) = default;
Message& operator=(Message&& other) = default;
Message& operator=(const Message&) = delete;
Message(const Message&) = delete;
// Whether this is a message indicating that the port is to be closed.
// This is the last message to be received by a MessagePort.
bool IsCloseMessage() const;
// Deserialize the contained JS value. May only be called once, and only
// after Serialize() has been called (e.g. by another thread).
v8::MaybeLocal<v8::Value> Deserialize(
Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value>* port_list = nullptr);
// Serialize a JS value, and optionally transfer objects, into this message.
// The Message object retains ownership of all transferred objects until
// deserialization.
// The source_port parameter, if provided, will make Serialize() throw a
// "DataCloneError" DOMException if source_port is found in transfer_list.
v8::Maybe<bool> Serialize(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> input,
const TransferList& transfer_list,
v8::Local<v8::Object> source_port =
v8::Local<v8::Object>());
// Internal method of Message that is called when a new SharedArrayBuffer
// object is encountered in the incoming value's structure.
void AddSharedArrayBuffer(std::shared_ptr<v8::BackingStore> backing_store);
// Internal method of Message that is called once serialization finishes
// and that transfers ownership of `data` to this message.
void AddTransferable(std::unique_ptr<TransferData>&& data);
// Internal method of Message that is called when a new WebAssembly.Module
// object is encountered in the incoming value's structure.
uint32_t AddWASMModule(v8::CompiledWasmModule&& mod);
// The host objects that will be transferred, as recorded by Serialize()
// (e.g. MessagePorts).
// Used for warning user about posting the target MessagePort to itself,
// which will as a side effect destroy the communication channel.
const std::vector<std::unique_ptr<TransferData>>& transferables() const {
return transferables_;
}
bool has_transferables() const {
return !transferables_.empty() || !array_buffers_.empty();
}
void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(Message)
SET_SELF_SIZE(Message)
private:
MallocedBuffer<char> main_message_buf_;
// TODO(addaleax): Make this a std::variant to save storage size in the common
// case (which is that all of these vectors are empty) once that is available
// with C++17.
std::vector<std::shared_ptr<v8::BackingStore>> array_buffers_;
std::vector<std::shared_ptr<v8::BackingStore>> shared_array_buffers_;
std::vector<std::unique_ptr<TransferData>> transferables_;
std::vector<v8::CompiledWasmModule> wasm_modules_;
friend class MessagePort;
};
class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
public:
// Named SiblingGroup, Used for one-to-many BroadcastChannels.
static std::shared_ptr<SiblingGroup> Get(const std::string& name);
// Anonymous SiblingGroup, Used for one-to-one MessagePort pairs.
SiblingGroup() = default;
explicit SiblingGroup(const std::string& name);
~SiblingGroup();
// Dispatches the Message to the collection of associated
// ports. If there is more than one destination port and
// the Message contains transferables, Dispatch will fail.
// Returns Just(true) if successful and the message was
// dispatched to at least one destination. Returns Just(false)
// if there were no destinations. Returns Nothing<bool>()
// if there was an error. If error is not nullptr, it will
// be set to an error message or warning message as appropriate.
v8::Maybe<bool> Dispatch(
MessagePortData* source,
std::shared_ptr<Message> message,
std::string* error = nullptr);
void Entangle(MessagePortData* data);
void Entangle(std::initializer_list<MessagePortData*> data);
void Disentangle(MessagePortData* data);
const std::string& name() const { return name_; }
size_t size() const { return ports_.size(); }
private:
const std::string name_;
RwLock group_mutex_; // Protects ports_.
std::set<MessagePortData*> ports_;
static void CheckSiblingGroup(const std::string& name);
using Map =
std::unordered_map<std::string, std::weak_ptr<SiblingGroup>>;
static Mutex groups_mutex_;
static Map groups_;
};
// This contains all data for a `MessagePort` instance that is not tied to
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData : public TransferData {
public:
explicit MessagePortData(MessagePort* owner);
~MessagePortData() override;
MessagePortData(MessagePortData&& other) = delete;
MessagePortData& operator=(MessagePortData&& other) = delete;
MessagePortData(const MessagePortData& other) = delete;
MessagePortData& operator=(const MessagePortData& other) = delete;
// Add a message to the incoming queue and notify the receiver.
// This may be called from any thread.
void AddToIncomingQueue(std::shared_ptr<Message> message);
v8::Maybe<bool> Dispatch(
std::shared_ptr<Message> message,
std::string* error = nullptr);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePortData* a, MessagePortData* b);
// Removes any possible sibling. This is thread-safe (it acquires both
// `sibling_mutex_` and `mutex_`), and has to be because it is called once
// the corresponding JS handle handle wants to close
// which can happen on either side of a worker.
void Disentangle();
void MemoryInfo(MemoryTracker* tracker) const override;
BaseObjectPtr<BaseObject> Deserialize(
Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<TransferData> self) override;
SET_MEMORY_INFO_NAME(MessagePortData)
SET_SELF_SIZE(MessagePortData)
private:
// This mutex protects all fields below it, with the exception of
// sibling_.
mutable Mutex mutex_;
// TODO(addaleax): Make this a std::variant<std::shared_ptr, std::unique_ptr>
// once that is available with C++17, because std::shared_ptr comes with
// overhead that is only necessary for BroadcastChannel.
std::deque<std::shared_ptr<Message>> incoming_messages_;
MessagePort* owner_ = nullptr;
std::shared_ptr<SiblingGroup> group_;
friend class MessagePort;
friend class SiblingGroup;
};
// A message port that receives messages from other threads, including
// the uv_async_t handle that is used to notify the current event loop of
// new incoming messages.
class MessagePort : public HandleWrap {
private:
// Create a new MessagePort. The `context` argument specifies the Context
// instance that is used for creating the values emitted from this port.
// This is called by MessagePort::New(), which is the public API used for
// creating MessagePort instances.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Object> wrap);
public:
~MessagePort() override;
// Create a new message port instance, optionally over an existing
// `MessagePortData` object.
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<MessagePortData> data = {},
std::shared_ptr<SiblingGroup> sibling_group = {});
// Send a message, i.e. deliver it into the sibling's incoming queue.
// If this port is closed, or if there is no sibling, this message is
// serialized with transfers, then silently discarded.
v8::Maybe<bool> PostMessage(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Value> message,
const TransferList& transfer);
// Start processing messages on this port as a receiving end.
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
/* constructor */
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
/* prototype methods */
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void CheckType(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
static void ReceiveMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
/* static */
static void MoveToContext(const v8::FunctionCallbackInfo<v8::Value>& args);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
static void Entangle(MessagePort* a, MessagePort* b);
static void Entangle(MessagePort* a, MessagePortData* b);
// Detach this port's data for transferring. After this, the MessagePortData
// is no longer associated with this handle, although it can still receive
// messages.
std::unique_ptr<MessagePortData> Detach();
void Close(
v8::Local<v8::Value> close_callback = v8::Local<v8::Value>()) override;
// Returns true if either data_ has been freed, or if the handle is being
// closed. Equivalent to the [[Detached]] internal slot in the HTML Standard.
//
// If checking if a JavaScript MessagePort object is detached, this method
// alone is often not enough, since the backing C++ MessagePort object may
// have been deleted already. For all intents and purposes, an object with a
// NULL pointer to the C++ MessagePort object is also detached.
inline bool IsDetached() const;
TransferMode GetTransferMode() const override;
std::unique_ptr<TransferData> TransferForMessaging() override;
void MemoryInfo(MemoryTracker* tracker) const override;
SET_MEMORY_INFO_NAME(MessagePort)
SET_SELF_SIZE(MessagePort)
private:
enum class MessageProcessingMode {
kNormalOperation,
kForceReadMessages
};
void OnClose() override;
void OnMessage(MessageProcessingMode mode);
void TriggerAsync();
v8::MaybeLocal<v8::Value> ReceiveMessage(
v8::Local<v8::Context> context,
MessageProcessingMode mode,
v8::Local<v8::Value>* port_list = nullptr);
std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;
uv_async_t async_;
v8::Global<v8::Function> emit_message_fn_;
friend class MessagePortData;
};
// Provide a base class from which JS classes that should be transferable or
// cloneable by postMesssage() can inherit.
// See e.g. FileHandle in internal/fs/promises.js for an example.
class JSTransferable : public BaseObject {
public:
JSTransferable(Environment* env, v8::Local<v8::Object> obj);
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
TransferMode GetTransferMode() const override;
std::unique_ptr<TransferData> TransferForMessaging() override;
std::unique_ptr<TransferData> CloneForMessaging() const override;
v8::Maybe<std::vector<BaseObjectPtr<BaseObject>>>
NestedTransferables() const override;
v8::Maybe<bool> FinalizeTransferRead(
v8::Local<v8::Context> context,
v8::ValueDeserializer* deserializer) override;
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(JSTransferable)
SET_SELF_SIZE(JSTransferable)
private:
std::unique_ptr<TransferData> TransferOrClone(TransferMode mode) const;
class Data : public TransferData {
public:
Data(std::string&& deserialize_info, v8::Global<v8::Value>&& data);
BaseObjectPtr<BaseObject> Deserialize(
Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<TransferData> self) override;
v8::Maybe<bool> FinalizeTransferWrite(
v8::Local<v8::Context> context,
v8::ValueSerializer* serializer) override;
SET_NO_MEMORY_INFO()
SET_MEMORY_INFO_NAME(JSTransferableTransferData)
SET_SELF_SIZE(Data)
private:
std::string deserialize_info_;
v8::Global<v8::Value> data_;
};
};
v8::Local<v8::FunctionTemplate> GetMessagePortConstructorTemplate(
Environment* env);
} // namespace worker
} // namespace node
#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
#endif // SRC_NODE_MESSAGING_H_