Skip to content

Commit

Permalink
datasource: use async data source for wasm (envoyproxy#216)
Browse files Browse the repository at this point in the history
Signed-off-by: crazyxy <yxyan@google.com>
  • Loading branch information
yxue authored and jplevyak committed Nov 6, 2019
1 parent 8ff0af2 commit 7c040df
Show file tree
Hide file tree
Showing 26 changed files with 502 additions and 161 deletions.
2 changes: 1 addition & 1 deletion api/envoy/config/wasm/v2/wasm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message VmConfig {
string runtime = 2;

// The Wasm code that Envoy will execute.
api.v2.core.DataSource code = 3;
api.v2.core.AsyncDataSource code = 3;

// The Wasm configuration string used on initialization of a new VM (proxy_onStart).
string configuration = 4;
Expand Down
2 changes: 1 addition & 1 deletion api/envoy/config/wasm/v3alpha/wasm.proto
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ message VmConfig {
string runtime = 2;

// The Wasm code that Envoy will execute.
api.v3alpha.core.DataSource code = 3;
api.v3alpha.core.AsyncDataSource code = 3;

// The Wasm configuration string used on initialization of a new VM (proxy_onStart).
string configuration = 4;
Expand Down
3 changes: 2 additions & 1 deletion include/envoy/server/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ class Wasm {
virtual ~Wasm() {}
};

typedef std::shared_ptr<Wasm> WasmSharedPtr;
using WasmSharedPtr = std::shared_ptr<Wasm>;
using CreateWasmCallback = std::function<void(WasmSharedPtr)>;

} // namespace Server
} // namespace Envoy
6 changes: 3 additions & 3 deletions include/envoy/server/wasm_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,12 +68,12 @@ class WasmFactory {
* @param config const ProtoBuf::Message& supplies the config for the resource monitor
* implementation.
* @param context WasmFactoryContext& supplies the resource monitor's context.
* @return WasmSharedPtr a singleton Wasm service. May be be nullptr if per silo.
* @param cb CreateWasmCallback&& supplies the callback to be called after wasm is created.
* @throw EnvoyException if the implementation is unable to produce an instance with
* the provided parameters.
*/
virtual WasmSharedPtr createWasm(const envoy::config::wasm::v2::WasmService& config,
WasmFactoryContext& context) PURE;
virtual void createWasm(const envoy::config::wasm::v2::WasmService& config,
WasmFactoryContext& context, CreateWasmCallback&& cb) PURE;
};

} // namespace Configuration
Expand Down
1 change: 1 addition & 0 deletions source/extensions/access_loggers/wasm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ envoy_cc_library(
"//include/envoy/registry",
"//include/envoy/server:access_log_config_interface",
"//source/common/access_log:access_log_formatter_lib",
"//source/common/config:datasource_lib",
"//source/common/protobuf",
"//source/extensions/access_loggers:well_known_names",
"//source/extensions/common/wasm:wasm_lib",
Expand Down
30 changes: 20 additions & 10 deletions source/extensions/access_loggers/wasm/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,32 @@ WasmAccessLogFactory::createAccessLogInstance(const Protobuf::Message& proto_con
auto vm_id = config.config().vm_config().vm_id();
auto root_id = config.config().root_id();
auto configuration = std::make_shared<std::string>(config.config().configuration());
auto tls_slot = context.threadLocal().allocateSlot();
auto access_log = std::make_shared<WasmAccessLog>(root_id, nullptr, std::move(filter));

// Create a base WASM to verify that the code loads before setting/cloning the for the
// individual threads.
auto plugin = std::make_shared<Common::Wasm::Plugin>(
config.config().name(), config.config().root_id(), config.config().vm_config().vm_id(),
envoy::api::v2::core::TrafficDirection::UNSPECIFIED, context.localInfo(),
nullptr /* listener_metadata */);
auto base_wasm =
Common::Wasm::createWasm(config.config().vm_config(), plugin, context.scope().createScope(""),
context.clusterManager(), context.dispatcher(), context.api());
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot->set([base_wasm, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, *configuration, dispatcher));
});
return std::make_shared<WasmAccessLog>(root_id, std::move(tls_slot), std::move(filter));

auto callback = [access_log, &context,
configuration](std::shared_ptr<Common::Wasm::Wasm> base_wasm) {
auto tls_slot = context.threadLocal().allocateSlot();

// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot->set([base_wasm, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, *configuration, dispatcher));
});
access_log->setTlsSlot(std::move(tls_slot));
};

Common::Wasm::createWasm(config.config().vm_config(), plugin, context.scope().createScope(""),
context.clusterManager(), context.initManager(), context.dispatcher(),
context.api(), remote_data_provider_, std::move(callback));

return access_log;
}

ProtobufTypes::MessagePtr WasmAccessLogFactory::createEmptyConfigProto() {
Expand Down
3 changes: 3 additions & 0 deletions source/extensions/access_loggers/wasm/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "envoy/server/access_log_config.h"

#include "common/config/datasource.h"

namespace Envoy {
namespace Extensions {
namespace AccessLoggers {
Expand All @@ -22,6 +24,7 @@ class WasmAccessLogFactory : public Server::Configuration::AccessLogInstanceFact

private:
std::unordered_map<std::string, std::string> convertJsonFormatToMap(ProtobufWkt::Struct config);
Config::DataSource::RemoteAsyncDataProviderPtr remote_data_provider_;
};

} // namespace Wasm
Expand Down
12 changes: 10 additions & 2 deletions source/extensions/access_loggers/wasm/wasm_access_log_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,16 @@ class WasmAccessLog : public AccessLog::Instance {
return;
}
}
tls_slot_->getTyped<Common::Wasm::Wasm>().log(root_id_, request_headers, response_headers,
response_trailers, stream_info);

if (tls_slot_ != nullptr) {
tls_slot_->getTyped<Common::Wasm::Wasm>().log(root_id_, request_headers, response_headers,
response_trailers, stream_info);
}
}

void setTlsSlot(ThreadLocal::SlotPtr tls_slot) {
ASSERT(tls_slot_ == nullptr);
tls_slot_ = std::move(tls_slot);
}

private:
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/common/wasm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ envoy_cc_library(
"//include/envoy/server:wasm_interface",
"//include/envoy/upstream:cluster_manager_interface",
"//source/common/common:stack_array",
"//source/common/config:datasource_lib",
"//source/common/stats:symbol_table_lib",
"//source/extensions/filters/http:well_known_names",
"@envoy_api//envoy/config/filter/http/wasm/v2:pkg_cc_proto",
Expand Down Expand Up @@ -83,7 +84,6 @@ envoy_cc_library(
"//external:abseil_node_hash_map",
"//source/common/buffer:buffer_lib",
"//source/common/common:enum_to_int",
"//source/common/config:datasource_lib",
"//source/common/http:message_lib",
"//source/common/http:utility_lib",
"//source/common/tracing:http_tracer_lib",
Expand Down
91 changes: 58 additions & 33 deletions source/extensions/common/wasm/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include "common/common/empty_string.h"
#include "common/common/enum_to_int.h"
#include "common/common/logger.h"
#include "common/config/datasource.h"
#include "common/http/header_map_impl.h"
#include "common/http/message_impl.h"
#include "common/http/utility.h"
Expand Down Expand Up @@ -2623,46 +2622,72 @@ void GrpcStreamClientHandler::onRemoteClose(Grpc::Status::GrpcStatus status,
context->onGrpcClose(token, status, message);
}

static std::shared_ptr<Wasm> createWasmInternal(const envoy::config::wasm::v2::VmConfig& vm_config,
PluginSharedPtr plugin, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher, Api::Api& api,
std::unique_ptr<Context> root_context_for_testing) {
static void createWasmInternal(const envoy::config::wasm::v2::VmConfig& vm_config,
PluginSharedPtr plugin, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager,
Init::Manager& init_manager, Event::Dispatcher& dispatcher,
Api::Api& api, std::unique_ptr<Context> root_context_for_testing,
Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
CreateWasmCallback&& cb) {
auto wasm =
std::make_shared<Wasm>(vm_config.runtime(), vm_config.vm_id(), vm_config.configuration(),
plugin, scope, cluster_manager, dispatcher);
const auto& code = Config::DataSource::read(vm_config.code(), true, api);
const auto& path = Config::DataSource::getPath(vm_config.code())
.value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
if (code.empty()) {
throw WasmException(fmt::format("Failed to load WASM code from {}", path));
}
if (!wasm->initialize(code, vm_config.allow_precompiled())) {
throw WasmException(fmt::format("Failed to initialize WASM code from {}", path));
}
if (!root_context_for_testing) {
wasm->start();

std::string source, code;
if (vm_config.code().has_remote()) {
source = vm_config.code().remote().http_uri().uri();
} else if (vm_config.code().has_local()) {
code = Config::DataSource::read(vm_config.code().local(), true, api);
source = Config::DataSource::getPath(vm_config.code().local())
.value_or(code.empty() ? EMPTY_STRING : INLINE_STRING);
}

auto callback = [wasm, cb, source, allow_precompiled = vm_config.allow_precompiled(),
context_ptr = root_context_for_testing ? root_context_for_testing.release()
: nullptr](const std::string& code) {
std::unique_ptr<Context> context(context_ptr);
if (code.empty()) {
throw WasmException(fmt::format("Failed to load WASM code from {}", source));
}
if (!wasm->initialize(code, allow_precompiled)) {
throw WasmException(fmt::format("Failed to initialize WASM code from {}", source));
}
if (!context) {
wasm->start();
} else {
wasm->startForTesting(std::move(context));
}
cb(wasm);
};

if (vm_config.code().has_remote()) {
remote_data_provider = std::make_unique<Config::DataSource::RemoteAsyncDataProvider>(
cluster_manager, init_manager, vm_config.code().remote(), true, std::move(callback));
} else if (vm_config.code().has_local()) {
callback(code);
} else {
wasm->startForTesting(std::move(root_context_for_testing));
callback(EMPTY_STRING);
}
return wasm;
}

std::shared_ptr<Wasm> createWasm(const envoy::config::wasm::v2::VmConfig& vm_config,
PluginSharedPtr plugin, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher, Api::Api& api) {
return createWasmInternal(vm_config, plugin, scope, cluster_manager, dispatcher, api,
nullptr /* root_context_for_testing */);
} // namespace Wasm
void createWasm(const envoy::config::wasm::v2::VmConfig& vm_config, PluginSharedPtr plugin,
Stats::ScopeSharedPtr scope, Upstream::ClusterManager& cluster_manager,
Init::Manager& init_manager, Event::Dispatcher& dispatcher, Api::Api& api,
Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
CreateWasmCallback&& cb) {
createWasmInternal(vm_config, plugin, scope, cluster_manager, init_manager, dispatcher, api,
nullptr /* root_context_for_testing */, remote_data_provider, std::move(cb));
}

std::shared_ptr<Wasm> createWasmForTesting(const envoy::config::wasm::v2::VmConfig& vm_config,
PluginSharedPtr plugin, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher, Api::Api& api,
std::unique_ptr<Context> root_context_for_testing) {
return createWasmInternal(vm_config, plugin, scope, cluster_manager, dispatcher, api,
std::move(root_context_for_testing));
void createWasmForTesting(const envoy::config::wasm::v2::VmConfig& vm_config,
PluginSharedPtr plugin, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
Event::Dispatcher& dispatcher, Api::Api& api,
std::unique_ptr<Context> root_context_for_testing,
Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
CreateWasmCallback&& cb) {
createWasmInternal(vm_config, plugin, scope, cluster_manager, init_manager, dispatcher, api,
std::move(root_context_for_testing), remote_data_provider, std::move(cb));
}

std::shared_ptr<Wasm> createThreadLocalWasm(Wasm& base_wasm, absl::string_view configuration,
Expand Down
24 changes: 15 additions & 9 deletions source/extensions/common/wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "common/common/assert.h"
#include "common/common/logger.h"
#include "common/common/stack_array.h"
#include "common/config/datasource.h"
#include "common/stats/symbol_table_impl.h"

#include "extensions/common/wasm/wasm_vm.h"
Expand Down Expand Up @@ -692,22 +693,27 @@ class Wasm : public Envoy::Server::Wasm,
inline WasmVm* Context::wasmVm() const { return wasm_->wasmVm(); }
inline Upstream::ClusterManager& Context::clusterManager() const { return wasm_->clusterManager(); }

using CreateWasmCallback = std::function<void(std::shared_ptr<Wasm>)>;

// Create a high level Wasm VM with Envoy API support. Note: 'id' may be empty if this VM will not
// be shared by APIs (e.g. HTTP Filter + AccessLog).
std::shared_ptr<Wasm> createWasm(const envoy::config::wasm::v2::VmConfig& vm_config,
PluginSharedPtr plugin_config, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher, Api::Api& api);
void createWasm(const envoy::config::wasm::v2::VmConfig& vm_config, PluginSharedPtr plugin_config,
Stats::ScopeSharedPtr scope, Upstream::ClusterManager& cluster_manager,
Init::Manager& init_manager, Event::Dispatcher& dispatcher, Api::Api& api,
Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
CreateWasmCallback&& cb);

// Create a ThreadLocal VM from an existing VM (e.g. from createWasm() above).
std::shared_ptr<Wasm> createThreadLocalWasm(Wasm& base_wasm, absl::string_view configuration,
Event::Dispatcher& dispatcher);

std::shared_ptr<Wasm> createWasmForTesting(const envoy::config::wasm::v2::VmConfig& vm_config,
PluginSharedPtr plugin, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager,
Event::Dispatcher& dispatcher, Api::Api& api,
std::unique_ptr<Context> root_context_for_testing);
void createWasmForTesting(const envoy::config::wasm::v2::VmConfig& vm_config,
PluginSharedPtr plugin, Stats::ScopeSharedPtr scope,
Upstream::ClusterManager& cluster_manager, Init::Manager& init_manager,
Event::Dispatcher& dispatcher, Api::Api& api,
std::unique_ptr<Context> root_context_for_testing,
Config::DataSource::RemoteAsyncDataProviderPtr& remote_data_provider,
CreateWasmCallback&& cb);

// Get an existing ThreadLocal VM matching 'vm_id' or nullptr if there isn't one.
std::shared_ptr<Wasm> getThreadLocalWasmPtr(absl::string_view vm_id);
Expand Down
24 changes: 13 additions & 11 deletions source/extensions/filters/http/wasm/wasm_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,22 @@ namespace Wasm {
FilterConfig::FilterConfig(const envoy::config::filter::http::wasm::v2::Wasm& config,
Server::Configuration::FactoryContext& context)
: tls_slot_(context.threadLocal().allocateSlot()) {
// Create a base Wasm to verify that the code loads before setting/cloning the for the
// individual threads.
plugin_ = std::make_shared<Common::Wasm::Plugin>(
config.config().name(), config.config().root_id(), config.config().vm_config().vm_id(),
context.direction(), context.localInfo(), &context.listenerMetadata());
auto base_wasm = Common::Wasm::createWasm(
config.config().vm_config(), plugin_, context.scope().createScope(""),
context.clusterManager(), context.dispatcher(), context.api());
auto configuration = std::make_shared<std::string>(config.config().configuration());
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot_->set([base_wasm, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, *configuration, dispatcher));
});

auto callback = [&config, this](std::shared_ptr<Common::Wasm::Wasm> base_wasm) {
auto configuration = std::make_shared<std::string>(config.config().configuration());
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot_->set([base_wasm, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, *configuration, dispatcher));
});
};

Common::Wasm::createWasm(config.config().vm_config(), plugin_, context.scope().createScope(""),
context.clusterManager(), context.initManager(), context.dispatcher(),
context.api(), remote_data_provider_, std::move(callback));
}

} // namespace Wasm
Expand Down
1 change: 1 addition & 0 deletions source/extensions/filters/http/wasm/wasm_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ class FilterConfig : Logger::Loggable<Logger::Id::wasm> {
uint32_t root_context_id_{0};
Envoy::Extensions::Common::Wasm::PluginSharedPtr plugin_;
ThreadLocal::SlotPtr tls_slot_;
Config::DataSource::RemoteAsyncDataProviderPtr remote_data_provider_;
};

typedef std::shared_ptr<FilterConfig> FilterConfigSharedPtr;
Expand Down
24 changes: 13 additions & 11 deletions source/extensions/filters/network/wasm/wasm_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,22 @@ namespace Wasm {
FilterConfig::FilterConfig(const envoy::config::filter::network::wasm::v2::Wasm& config,
Server::Configuration::FactoryContext& context)
: tls_slot_(context.threadLocal().allocateSlot()) {
// Create a base WASM to verify that the code loads before setting/cloning the for the
// individual threads.
plugin_ = std::make_shared<Common::Wasm::Plugin>(
config.config().name(), config.config().root_id(), config.config().vm_config().vm_id(),
context.direction(), context.localInfo(), &context.listenerMetadata());
auto base_wasm = Common::Wasm::createWasm(
config.config().vm_config(), plugin_, context.scope().createScope(""),
context.clusterManager(), context.dispatcher(), context.api());
auto configuration = std::make_shared<std::string>(config.config().configuration());
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot_->set([base_wasm, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, *configuration, dispatcher));
});

auto callback = [&config, this](std::shared_ptr<Common::Wasm::Wasm> base_wasm) {
auto configuration = std::make_shared<std::string>(config.config().configuration());
// NB: the Slot set() call doesn't complete inline, so all arguments must outlive this call.
tls_slot_->set([base_wasm, configuration](Event::Dispatcher& dispatcher) {
return std::static_pointer_cast<ThreadLocal::ThreadLocalObject>(
Common::Wasm::getOrCreateThreadLocalWasm(*base_wasm, *configuration, dispatcher));
});
};

Common::Wasm::createWasm(config.config().vm_config(), plugin_, context.scope().createScope(""),
context.clusterManager(), context.initManager(), context.dispatcher(),
context.api(), remote_data_provider_, std::move(callback));
}

} // namespace Wasm
Expand Down
Loading

0 comments on commit 7c040df

Please sign in to comment.