forked from envoyproxy/envoy
-
Notifications
You must be signed in to change notification settings - Fork 61
/
Copy pathwasm.h
497 lines (422 loc) · 18.8 KB
/
wasm.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
#pragma once
#include <memory>
#include "envoy/access_log/access_log.h"
#include "envoy/buffer/buffer.h"
#include "envoy/common/exception.h"
#include "envoy/config/wasm/v2/wasm.pb.validate.h"
#include "envoy/http/filter.h"
#include "envoy/server/wasm.h"
#include "envoy/thread_local/thread_local.h"
#include "envoy/upstream/cluster_manager.h"
#include "common/common/assert.h"
#include "common/common/c_smart_ptr.h"
#include "common/common/logger.h"
#include "extensions/common/wasm/well_known_names.h"
#include "extensions/filters/http/well_known_names.h"
namespace Envoy {
namespace Extensions {
namespace Common {
namespace Wasm {
// Forward declarations for the types defined later in this header.
struct AsyncClientHandler;
class Context;
class Wasm;
class WasmVm;
// Lists of key/value pairs. In Pairs both halves are non-owning views; in PairsWithStringValues
// the value is an owning std::string.
using Pairs = std::vector<std::pair<absl::string_view, absl::string_view>>;
using PairsWithStringValues = std::vector<std::pair<absl::string_view, std::string>>;
// Signatures of the callable wrappers used to call into the VM. The digit in each alias name is
// the number of uint32_t arguments that follow the Context* (and, for the WasmContextCall*
// aliases, the context id); the suffix is the return type (Void or a uint32_t "Int").
using WasmCall0Void = std::function<void(Context*)>;
using WasmCall2Void = std::function<void(Context*, uint32_t, uint32_t)>;
using WasmContextCall0Void = std::function<void(Context*, uint32_t context_id)>;
using WasmContextCall7Void = std::function<void(Context*, uint32_t context_id, uint32_t, uint32_t,
uint32_t, uint32_t, uint32_t, uint32_t, uint32_t)>;
using WasmContextCall0Int = std::function<uint32_t(Context*, uint32_t context_id)>;
using WasmContextCall2Int =
std::function<uint32_t(Context*, uint32_t context_id, uint32_t, uint32_t)>;
// A context which will be the target of callbacks for a particular session
// e.g. a handler of a stream.
//
// A Context is both an HTTP stream filter (Http::StreamFilter) and an access logger
// (AccessLog::Instance). It stores the Wasm pointer it was constructed with and an id obtained
// from that Wasm instance. The virtual methods below fall into two groups: "downcalls" (on*),
// which per the section comments call into the WASM code, and "callbacks", which are the
// Envoy-side operations grouped under the "General Callbacks" and "HTTP Filter Callbacks"
// sections.
class Context : public Http::StreamFilter,
public AccessLog::Instance,
public Logger::Loggable<Logger::Id::wasm>,
public std::enable_shared_from_this<Context> {
public:
// Stores 'wasm' and allocates this context's id via wasm->allocContextId(). 'wasm' is
// dereferenced by the constructor and therefore must not be null.
explicit Context(Wasm* wasm);
// Accessors. wasmVm() and clusterManager() forward to the owning Wasm instance.
Wasm* wasm() const { return wasm_; }
WasmVm* wasmVm() const;
Upstream::ClusterManager* clusterManager() const;
uint32_t id() const { return id_; }
const StreamInfo::StreamInfo& streamInfo() const;
//
// VM level downcalls into the WASM code on Context(id == 0).
//
virtual void onStart();
virtual void onConfigure(absl::string_view configuration);
//
// Stream downcalls on Context(id > 0).
//
// General stream downcall on a new stream.
virtual void onCreate();
// HTTP Filter Stream Request Downcalls.
virtual Http::FilterHeadersStatus onRequestHeaders();
virtual Http::FilterDataStatus onRequestBody(int body_buffer_length, bool end_of_stream);
virtual Http::FilterTrailersStatus onRequestTrailers();
virtual Http::FilterMetadataStatus onRequestMetadata();
// HTTP Filter Stream Response Downcalls.
virtual Http::FilterHeadersStatus onResponseHeaders();
virtual Http::FilterDataStatus onResponseBody(int body_buffer_length, bool end_of_stream);
virtual Http::FilterTrailersStatus onResponseTrailers();
virtual Http::FilterMetadataStatus onResponseMetadata();
// Async Response Downcalls on any Context.
// 'token' is the value previously returned by httpCall() for the matching request.
virtual void onHttpCallResponse(uint32_t token, const Pairs& response_headers,
absl::string_view response_body, const Pairs& response_trailers);
// General stream downcall when the stream has ended.
virtual void onDone();
// General stream downcall for logging. Occurs after onDone().
virtual void onLog();
// General stream downcall when no further stream calls will occur.
virtual void onDelete();
//
// General Callbacks.
//
virtual void scriptLog(spdlog::level::level_enum level, absl::string_view message);
virtual void setTickPeriod(std::chrono::milliseconds tick_period);
//
// AccessLog::Instance
//
void log(const Http::HeaderMap* request_headers, const Http::HeaderMap* response_headers,
const Http::HeaderMap* response_trailers,
const StreamInfo::StreamInfo& stream_info) override;
//
// Http::StreamFilterBase
//
// Note: This calls onDone() in WASM.
void onDestroy() override;
//
// Http::StreamDecoderFilter
//
Http::FilterHeadersStatus decodeHeaders(Http::HeaderMap& headers, bool end_stream) override;
Http::FilterDataStatus decodeData(Buffer::Instance& data, bool end_stream) override;
Http::FilterTrailersStatus decodeTrailers(Http::HeaderMap& trailers) override;
// Note: this is not yet implemented in envoy.
Http::FilterMetadataStatus decodeMetadata(Http::MetadataMap&& metadata_map) /* override */;
void setDecoderFilterCallbacks(Envoy::Http::StreamDecoderFilterCallbacks& callbacks) override;
//
// Http::StreamEncoderFilter
//
Http::FilterHeadersStatus encode100ContinueHeaders(Http::HeaderMap&) override;
Http::FilterHeadersStatus encodeHeaders(Http::HeaderMap& headers, bool end_stream) override;
Http::FilterDataStatus encodeData(Buffer::Instance& data, bool end_stream) override;
Http::FilterTrailersStatus encodeTrailers(Http::HeaderMap& trailers) override;
Http::FilterMetadataStatus encodeMetadata(Http::MetadataMap& metadata_map) override;
void setEncoderFilterCallbacks(Envoy::Http::StreamEncoderFilterCallbacks& callbacks) override;
//
// HTTP Filter Callbacks
//
// StreamInfo
virtual std::string getRequestStreamInfoProtocol();
virtual std::string getResponseStreamInfoProtocol();
// Metadata: the values are serialized ProtobufWkt::Struct
virtual std::string getRequestMetadata(absl::string_view key);
virtual void setRequestMetadata(absl::string_view key, absl::string_view serialized_proto_struct);
virtual PairsWithStringValues getRequestMetadataPairs();
virtual std::string getResponseMetadata(absl::string_view key);
virtual void setResponseMetadata(absl::string_view key,
absl::string_view serialized_proto_struct);
virtual PairsWithStringValues getResponseMetadataPairs();
// Continue
// Resumes decoding of the request; a no-op when no decoder callbacks have been set.
virtual void continueRequest() {
if (decoder_callbacks_)
decoder_callbacks_->continueDecoding();
}
// Resumes encoding of the response; a no-op when no encoder callbacks have been set.
virtual void continueResponse() {
if (encoder_callbacks_)
encoder_callbacks_->continueEncoding();
}
// Shared Data
// NOTE(review): the uint32_t in the returned pair and the 'cas' argument look like a
// compare-and-swap version for the stored value -- confirm against the implementation.
virtual std::pair<std::string, uint32_t> getSharedData(absl::string_view key);
virtual bool setSharedData(absl::string_view key, absl::string_view value, uint32_t cas);
// Request Headers
virtual void addRequestHeader(absl::string_view key, absl::string_view value);
virtual absl::string_view getRequestHeader(absl::string_view key);
virtual Pairs getRequestHeaderPairs();
virtual void removeRequestHeader(absl::string_view key);
virtual void replaceRequestHeader(absl::string_view key, absl::string_view value);
// Request Trailers
virtual void addRequestTrailer(absl::string_view key, absl::string_view value);
virtual absl::string_view getRequestTrailer(absl::string_view key);
virtual Pairs getRequestTrailerPairs();
virtual void removeRequestTrailer(absl::string_view key);
virtual void replaceRequestTrailer(absl::string_view key, absl::string_view value);
// Response Headers
virtual void addResponseHeader(absl::string_view key, absl::string_view value);
virtual absl::string_view getResponseHeader(absl::string_view key);
virtual Pairs getResponseHeaderPairs();
virtual void removeResponseHeader(absl::string_view key);
virtual void replaceResponseHeader(absl::string_view key, absl::string_view value);
// Response Trailers
virtual void addResponseTrailer(absl::string_view key, absl::string_view value);
virtual absl::string_view getResponseTrailer(absl::string_view key);
virtual Pairs getResponseTrailerPairs();
virtual void removeResponseTrailer(absl::string_view key);
virtual void replaceResponseTrailer(absl::string_view key, absl::string_view value);
// Body Buffer
// Both return a non-owning view; the lifetime of the viewed bytes is not visible from this
// header, so callers should not hold the view beyond the current callback (TODO confirm).
virtual absl::string_view getRequestBodyBufferBytes(uint32_t start, uint32_t length);
virtual absl::string_view getResponseBodyBufferBytes(uint32_t start, uint32_t length);
// HTTP
// Returns a token which will be used with the corresponding onHttpCallResponse.
// NOTE(review): the last parameter name is spelled 'timeout_millisconds' (sic).
virtual uint32_t httpCall(absl::string_view cluster, const Pairs& request_headers,
absl::string_view request_body, const Pairs& request_trailers,
int timeout_millisconds);
virtual void httpRespond(const Pairs& response_headers, absl::string_view body,
const Pairs& response_trailers);
// Connection
virtual bool isSsl();
protected:
// AsyncClientHandler is a friend so that it can reach the protected completion hooks below.
friend struct AsyncClientHandler;
void onAsyncClientSuccess(uint32_t token, Envoy::Http::MessagePtr& response);
void onAsyncClientFailure(uint32_t token, Http::AsyncClient::FailureReason reason);
// The Wasm instance passed to the constructor and the id it allocated for this context.
Wasm* const wasm_;
const uint32_t id_;
bool destroyed_ = false;
// Async callback support.
uint32_t next_async_token_ = 1;
// MB: must be a node-type map as we take persistent references to the entries.
std::map<uint32_t, AsyncClientHandler> http_request_;
// Filter callbacks; null until the corresponding set*FilterCallbacks() call.
Envoy::Http::StreamDecoderFilterCallbacks* decoder_callbacks_{};
Envoy::Http::StreamEncoderFilterCallbacks* encoder_callbacks_{};
// HTTP Filter state.
Http::HeaderMap* request_headers_{};
Http::HeaderMap* response_headers_{};
Buffer::Instance* requestBodyBuffer_{};
Buffer::Instance* responseBodyBuffer_{};
bool request_end_of_stream_{};
bool response_end_of_stream_{};
Http::HeaderMap* request_trailers_{};
Http::HeaderMap* response_trailers_{};
Http::MetadataMap* request_metadata_{};
Http::MetadataMap* response_metadata_{};
// Access log state; presumably populated by log() -- TODO confirm against the implementation.
const StreamInfo::StreamInfo* access_log_stream_info_{};
const Http::HeaderMap* access_log_request_headers_{};
const Http::HeaderMap* access_log_response_headers_{};
const Http::HeaderMap* access_log_request_trailers_{}; // unused
const Http::HeaderMap* access_log_response_trailers_{};
};
// Per-request callback object for an asynchronous HTTP call issued by a Context.
//
// Instances are stored in Context::http_request_ keyed by token. The fields are given default
// member initializers so that a default-constructed handler never carries indeterminate
// pointers or an indeterminate token before its owner fills them in.
struct AsyncClientHandler : public Http::AsyncClient::Callbacks {
  // Http::AsyncClient::Callbacks
  void onSuccess(Envoy::Http::MessagePtr&& response) override;
  void onFailure(Http::AsyncClient::FailureReason reason) override;

  // The context that issued the request (presumably the target of the completion
  // notification -- see the definitions of onSuccess()/onFailure()).
  Context* context = nullptr;
  // Token identifying the request within its context.
  uint32_t token = 0;
  // The in-flight async client request, or nullptr when none has been attached.
  Http::AsyncClient::Request* request = nullptr;
};
// Wasm execution instance. Manages the Envoy side of the Wasm interface.
//
// A Wasm owns its WasmVm, hands out context ids, creates Contexts, and holds the callable
// wrappers (onStart_, onRequestHeaders_, ...) used to call into the VM. It is also an
// AccessLog::Instance and a ThreadLocal::ThreadLocalObject.
class Wasm : public Envoy::Server::Wasm,
             public AccessLog::Instance,
             public ThreadLocal::ThreadLocalObject,
             public Logger::Loggable<Logger::Id::wasm>,
             public std::enable_shared_from_this<Wasm> {
public:
  // 'vm' names the VM implementation to use and 'id' identifies this instance (see id()).
  Wasm(absl::string_view vm, absl::string_view id);
  Wasm(const Wasm& other);
  ~Wasm() = default;

  // Non-owning setters; the referenced objects must outlive this Wasm.
  void setDispatcher(Event::Dispatcher& dispatcher) { dispatcher_ = &dispatcher; }
  void setClusterManager(Upstream::ClusterManager& clusterManager) {
    clusterManager_ = &clusterManager;
  }

  bool initialize(const std::string& code, absl::string_view name, bool allow_precompiled);
  void configure(absl::string_view configuration);
  void start();

  // Read-only accessors.
  const std::string& context_id_filter_state_data_name() const {
    return context_id_filter_state_data_name_;
  }
  absl::string_view id() const { return id_; }
  WasmVm* wasmVm() const { return wasm_vm_.get(); }
  Context* generalContext() const { return general_context_.get(); }
  Upstream::ClusterManager* clusterManager() const { return clusterManager_; }

  // Creates a new Context bound to this Wasm; the Context allocates its id from this instance.
  std::shared_ptr<Context> createContext() { return std::make_shared<Context>(this); }

  void setTickPeriod(std::chrono::milliseconds tick_period);
  void tickHandler();

  uint32_t allocContextId();

  //
  // AccessLog::Instance
  //
  // Marked 'override' (same signature as Context::log) so that any drift from the base-class
  // signature becomes a compile error instead of silently declaring a new function.
  void log(const Http::HeaderMap* request_headers, const Http::HeaderMap* response_headers,
           const Http::HeaderMap* response_trailers,
           const StreamInfo::StreamInfo& stream_info) override;

  // For testing.
  void setGeneralContext(std::shared_ptr<Context> context) {
    general_context_ = std::move(context);
  }

private:
  friend class Context;

  void getFunctions();

  Event::Dispatcher* dispatcher_ = nullptr;
  Upstream::ClusterManager* clusterManager_ = nullptr;
  std::string id_;
  std::string context_id_filter_state_data_name_;
  uint32_t next_context_id_ = 0;
  std::unique_ptr<WasmVm> wasm_vm_;
  std::shared_ptr<Context> general_context_; // Context unrelated to any specific stream.
  std::function<void(Common::Wasm::Context*)> tick_;
  // Value-initialized to zero so the period is never read while indeterminate.
  std::chrono::milliseconds tick_period_{};
  Event::TimerPtr timer_;

  // Calls into the VM.
  WasmCall0Void onStart_;
  WasmCall2Void onConfigure_;
  WasmCall0Void onTick_;

  // Calls into the VM with a context.
  WasmContextCall0Void onCreate_;
  WasmContextCall0Int onRequestHeaders_;
  WasmContextCall2Int onRequestBody_;
  WasmContextCall0Int onRequestTrailers_;
  WasmContextCall0Int onRequestMetadata_;
  WasmContextCall0Int onResponseHeaders_;
  WasmContextCall2Int onResponseBody_;
  WasmContextCall0Int onResponseTrailers_;
  WasmContextCall0Int onResponseMetadata_;
  WasmContextCall7Void onHttpCallResponse_;
  WasmContextCall0Void onDone_;
  WasmContextCall0Void onLog_;
  WasmContextCall0Void onDelete_;
};
// Returns the VM of the Wasm instance this context was constructed with.
inline WasmVm* Context::wasmVm() const {
  const Wasm* const owner = wasm_;
  return owner->wasmVm();
}
// Returns the cluster manager registered on the Wasm instance this context was constructed
// with; may be nullptr if none has been set on that instance.
inline Upstream::ClusterManager* Context::clusterManager() const {
  const Wasm* const owner = wasm_;
  return owner->clusterManager();
}
// Returns the route-level filter metadata registered under the Wasm HTTP filter name.
//
// Falls back to the empty default ProtobufWkt::Struct when 'callbacks' is null, when there is no
// route or route entry, or when the route carries no metadata for the Wasm filter. The returned
// reference points either at that default instance or into the route entry's metadata.
inline const ProtobufWkt::Struct& getMetadata(Http::StreamFilterCallbacks* callbacks) {
  // Filter callbacks pointers in this file start out null, so guard before dereferencing.
  if (callbacks == nullptr) {
    return ProtobufWkt::Struct::default_instance();
  }
  // Look up the route and its entry once instead of re-evaluating them for every access.
  const auto& route = callbacks->route();
  if (route == nullptr) {
    return ProtobufWkt::Struct::default_instance();
  }
  const auto& route_entry = route->routeEntry();
  if (route_entry == nullptr) {
    return ProtobufWkt::Struct::default_instance();
  }
  const auto& filter_metadata = route_entry->metadata().filter_metadata();
  const auto filter_it = filter_metadata.find(HttpFilters::HttpFilterNames::get().Wasm);
  if (filter_it == filter_metadata.end()) {
    return ProtobufWkt::Struct::default_instance();
  }
  return filter_it->second;
}
// Wasm VM instance. Provides the low level WASM interface.
//
// Concrete VMs implement the pure virtual primitives; the non-virtual convenience helpers at the
// bottom are built only on allocMemory()/setMemory().
class WasmVm : public Logger::Loggable<Logger::Id::wasm> {
public:
  virtual ~WasmVm() = default;

  virtual absl::string_view vm() PURE;

  // Whether or not the VM implementation supports cloning.
  virtual bool clonable() PURE;

  // Make a thread-specific copy. This may not be supported by the underlying VM system in which
  // case it will return nullptr and the caller will need to create a new VM from scratch.
  virtual std::unique_ptr<WasmVm> clone() PURE;

  // Load the WASM code from a file. Return true on success.
  virtual bool initialize(const std::string& code, absl::string_view id,
                          bool allow_precompiled) PURE;

  // Call the 'start' function or main() if there is no start function.
  virtual void start(Context*) PURE;

  // Allocate a block of memory in the VM and return the pointer to use as a call argument.
  // The helpers below treat a nullptr return as allocation failure.
  virtual void* allocMemory(uint32_t size, uint32_t* pointer) PURE;

  // Convert a block of memory in the VM to a string_view.
  virtual absl::string_view getMemory(uint32_t pointer, uint32_t size) PURE;

  // Set a block of memory in the VM, returns true on success, false if the pointer/size is invalid.
  virtual bool setMemory(uint32_t pointer, uint32_t size, void* data) PURE;

  // Convenience functions.

  // Allocate a null-terminated copy of 's' in the VM and return the VM pointer to use as a call
  // argument. Returns 0 if the VM could not allocate the memory.
  uint32_t copyString(absl::string_view s) {
    uint32_t pointer = 0;
    auto* m = static_cast<uint8_t*>(allocMemory((s.size() + 1), &pointer));
    if (!m) {
      // Allocation failed: do not write through a null host pointer.
      return 0;
    }
    if (s.size() > 0) {
      memcpy(m, s.data(), s.size());
    }
    m[s.size()] = 0;
    return pointer;
  }

  // Copy the data in 's' into the VM along with the pointer-size pair. Returns true on success.
  // For an empty 's' nothing is allocated and a pointer of 0 with a size of 0 is written.
  bool copyToPointerSize(absl::string_view s, uint32_t ptr_ptr, uint32_t size_ptr) {
    uint32_t pointer = 0;
    uint32_t size = s.size();
    if (size > 0) {
      void* p = allocMemory(size, &pointer);
      if (!p) {
        return false;
      }
      memcpy(p, s.data(), size);
    }
    return setMemory(ptr_ptr, sizeof(uint32_t), &pointer) &&
           setMemory(size_ptr, sizeof(uint32_t), &size);
  }

  // Copy bytes [start, start + length) of 'buffer' into newly allocated VM memory and write the
  // resulting pointer-size pair to 'ptr_ptr'/'size_ptr'. Returns false if the range lies outside
  // the buffer, if the allocation fails, or if either setMemory() call fails. For a zero 'length'
  // nothing is allocated and a pointer of 0 with a size of 0 is written, matching the
  // string_view overload.
  bool copyToPointerSize(const Buffer::Instance& buffer, uint32_t start, uint32_t length,
                         uint32_t ptr_ptr, uint32_t size_ptr) {
    // Compare in 64 bits: start + length may exceed UINT32_MAX and the buffer may be larger than
    // 4GiB, so neither value may be truncated before the range check.
    const uint64_t buffer_size = buffer.length();
    const uint64_t range_end = static_cast<uint64_t>(start) + length;
    if (buffer_size < range_end) {
      return false;
    }
    uint32_t pointer = 0;
    if (length > 0) {
      char* dest = static_cast<char*>(allocMemory(length, &pointer));
      if (!dest) {
        return false;
      }
      const auto nslices = buffer.getRawSlices(nullptr, 0);
      auto slices =
          std::make_unique<Buffer::RawSlice[]>(nslices + 10 /* pad for evbuffer overrun */);
      const auto actual_slices = buffer.getRawSlices(&slices[0], nslices);
      uint32_t skip = start;       // Bytes still to skip before the requested range begins.
      uint32_t remaining = length; // Bytes still to copy.
      for (uint64_t i = 0; i < actual_slices && remaining > 0; i++) {
        const uint64_t slice_len = slices[i].len_;
        if (slice_len <= skip) {
          // The whole slice lies before the requested range.
          skip -= static_cast<uint32_t>(slice_len);
          continue;
        }
        // Only the bytes after the offset are available in this slice.
        const uint64_t available = slice_len - skip;
        const uint32_t chunk =
            available < remaining ? static_cast<uint32_t>(available) : remaining;
        memcpy(dest, static_cast<const char*>(slices[i].mem_) + skip, chunk);
        dest += chunk;
        remaining -= chunk;
        skip = 0;
      }
      if (remaining != 0) {
        // The slices did not cover the requested range; do not report partially filled memory.
        return false;
      }
    }
    return setMemory(ptr_ptr, sizeof(uint32_t), &pointer) &&
           setMemory(size_ptr, sizeof(uint32_t), &length);
  }
};
// Create a new WASM VM of the given type (e.g. "envoy.wasm.vm.wavm").
std::unique_ptr<WasmVm> createWasmVm(absl::string_view vm);
// Create a new Wasm VM not attached to any thread. Note: 'id' may be empty if this VM will not be
// shared by APIs (e.g. HTTP Filter + AccessLog).
std::unique_ptr<Wasm> createWasm(absl::string_view id,
const envoy::config::wasm::v2::VmConfig& vm_config, Api::Api& api);
// Create a ThreadLocal VM from an existing VM (e.g. from createWasm() above).
// The result is returned as a shared_ptr; 'dispatcher' and 'configuration' are supplied for the
// new thread-local instance.
std::shared_ptr<Wasm> createThreadLocalWasm(Wasm& base_wasm,
const envoy::config::wasm::v2::VmConfig& vm_config,
Event::Dispatcher& dispatcher,
absl::string_view configuration, Api::Api& api);
// Get an existing ThreadLocal VM matching 'id'.
// NOTE(review): the behavior when no VM matches 'id' is not visible from this header -- confirm
// against the implementation before relying on a non-null result.
std::shared_ptr<Wasm> getThreadLocalWasm(absl::string_view id, absl::string_view configuration);
// Exception type for Wasm-level errors. Inherits every EnvoyException constructor unchanged.
class WasmException : public EnvoyException {
public:
using EnvoyException::EnvoyException;
};
// Exception type for VM-level errors; thrown by registerCallback()/getFunction() in this header
// when the VM type is not supported. Inherits every EnvoyException constructor unchanged.
class WasmVmException : public EnvoyException {
public:
using EnvoyException::EnvoyException;
};
inline Context::Context(Wasm* wasm) : wasm_(wasm), id_(wasm->allocContextId()) {}
// Forward declarations for VM implementations.
// These templates are only declared here; registerCallback() and getFunction() in this header
// dispatch to them when the VM reports the Wavm VM name.
template <typename R, typename... Args>
void registerCallbackWavm(WasmVm* vm, absl::string_view functionName, R (*)(Args...));
template <typename R, typename... Args>
void getFunctionWavm(WasmVm* vm, absl::string_view functionName,
std::function<R(Context*, Args...)>*);
// Registers the host function 'f' under 'functionName' with the given VM.
//
// Only the Wavm VM is handled; any other VM type results in a WasmVmException.
// NOTE(review): the exception message is misspelled ("unsupoorted"); it is kept byte-for-byte
// here in case callers or tests match on it.
template <typename R, typename... Args>
void registerCallback(WasmVm* vm, absl::string_view functionName, R (*f)(Args...)) {
  const bool is_wavm = (vm->vm() == WasmVmNames::get().Wavm);
  if (!is_wavm) {
    throw WasmVmException("unsupoorted wasm vm");
  }
  registerCallbackWavm(vm, functionName, f);
}
// Looks up the VM function named 'functionName' and stores a callable for it in '*function'.
//
// Only the Wavm VM is handled; any other VM type results in a WasmVmException.
// NOTE(review): the exception message is misspelled ("unsupoorted"); it is kept byte-for-byte
// here in case callers or tests match on it.
template <typename F>
void getFunction(WasmVm* vm, absl::string_view functionName, F* function) {
  const bool is_wavm = (vm->vm() == WasmVmNames::get().Wavm);
  if (!is_wavm) {
    throw WasmVmException("unsupoorted wasm vm");
  }
  getFunctionWavm(vm, functionName, function);
}
} // namespace Wasm
} // namespace Common
} // namespace Extensions
} // namespace Envoy