diff --git a/doc/admin-guide/plugins/wasm.en.rst b/doc/admin-guide/plugins/wasm.en.rst index ec129cbbe0e..9b83b8b8eeb 100644 --- a/doc/admin-guide/plugins/wasm.en.rst +++ b/doc/admin-guide/plugins/wasm.en.rst @@ -97,6 +97,8 @@ generated wasm modules with the plugin. Runtime can be chosen by changing the ``runtime`` field inside the yaml configuration file for the plugin. ``ats.wasm.runtime.wamr`` is for WAMR while ``ats.wasm.runtime.wasmedge`` is for WasmEdge. +The plugin can also take more than one yaml file as arguments and can thus load more than one wasm modules. + TODO ==== @@ -104,7 +106,6 @@ TODO * Need to support functionality for retrieving and setting request/response body * Need to support functionality for making async request call * Need to support L4 lifecycle handler functions -* Support loading more than one Wasm module Limitations =========== diff --git a/plugins/experimental/wasm/wasm_main.cc b/plugins/experimental/wasm/wasm_main.cc index c9ac17e2616..5267914db5f 100644 --- a/plugins/experimental/wasm/wasm_main.cc +++ b/plugins/experimental/wasm/wasm_main.cc @@ -37,10 +37,9 @@ // struct for storing plugin configuration struct WasmInstanceConfig { - std::string config_filename; - std::string wasm_filename; - std::shared_ptr wasm = nullptr; - std::shared_ptr plugin = nullptr; + std::list config_filenames = {}; + + std::list, std::shared_ptr>> configs = {}; std::list, std::shared_ptr>> deleted_configs = {}; }; @@ -60,23 +59,31 @@ schedule_handler(TSCont contp, TSEvent /*event*/, void * /*data*/) c->onTick(0); // use 0 as token - if (!wasm_config->wasm) { - TSError("[wasm][%s] Configuration object is null", __FUNCTION__); + if (wasm_config->configs.empty()) { + TSError("[wasm][%s] Configuration objects are empty", __FUNCTION__); TSMutexUnlock(old_wasm->mutex()); return 0; } - if (c->wasm() == wasm_config->wasm.get()) { - auto *wasm = static_cast(c->wasm()); - uint32_t root_context_id = c->id(); - if (wasm->existsTimerPeriod(root_context_id)) { - TSDebug(WASM_DEBUG_TAG, "[%s] reschedule continuation", __FUNCTION__); - std::chrono::milliseconds period = wasm->getTimerPeriod(root_context_id); - TSContScheduleOnPool(contp, static_cast(period.count()), TS_THREAD_POOL_NET); - } else { - TSDebug(WASM_DEBUG_TAG, "[%s] can't find period for root context id: %d", __FUNCTION__, root_context_id); + bool found = false; + for (auto it = wasm_config->configs.begin(); it != wasm_config->configs.end(); it++) { + std::shared_ptr wbp = it->first; + if (wbp.get() == old_wasm) { + found = true; + auto *wasm = static_cast(c->wasm()); + uint32_t root_context_id = c->id(); + if (wasm->existsTimerPeriod(root_context_id)) { + TSDebug(WASM_DEBUG_TAG, "[%s] reschedule continuation", __FUNCTION__); + std::chrono::milliseconds period = wasm->getTimerPeriod(root_context_id); + TSContScheduleOnPool(contp, static_cast(period.count()), TS_THREAD_POOL_NET); + } else { + TSDebug(WASM_DEBUG_TAG, "[%s] can't find period for root context id: %d", __FUNCTION__, root_context_id); + } + break; } - } else { + } + + if (!found) { std::shared_ptr temp = nullptr; uint32_t root_context_id = c->id(); old_wasm->removeTimerPeriod(root_context_id); @@ -181,11 +188,20 @@ http_event_handler(TSCont contp, TSEvent event, void *data) case TS_EVENT_HTTP_READ_CACHE_HDR: break; - case TS_EVENT_HTTP_TXN_CLOSE: + case TS_EVENT_HTTP_TXN_CLOSE: { context->onDone(); context->onDelete(); - if (context->wasm() == wasm_config->wasm.get()) { + bool found = false; + for (auto it = wasm_config->configs.begin(); it != wasm_config->configs.end(); it++) { + std::shared_ptr wbp = it->first; + if (wbp.get() == context->wasm()) { + found = true; + break; + } + } + + if (found) { TSDebug(WASM_DEBUG_TAG, "[%s] config wasm has not changed", __FUNCTION__); } else { if (old_wasm->readyShutdown()) { @@ -220,7 +236,7 @@ http_event_handler(TSCont contp, TSEvent event, void *data) TSContDestroy(contp); result = 0; break; - + } default: break; } @@ -244,24 +260,28 @@ http_event_handler(TSCont contp, TSEvent event, void *data) static int global_hook_handler(TSCont /*contp*/, TSEvent /*event*/, void *data) { - auto *wasm = wasm_config->wasm.get(); - TSMutexLock(wasm->mutex()); - auto *rootContext = wasm->getRootContext(wasm_config->plugin, false); - auto *context = new ats_wasm::Context(wasm, rootContext->id(), wasm_config->plugin); - auto *txnp = static_cast(data); - context->initialize(txnp); - context->onCreate(); - TSMutexUnlock(wasm->mutex()); - - // create continuation for transaction - TSCont txn_contp = TSContCreate(http_event_handler, nullptr); - TSHttpTxnHookAdd(txnp, TS_HTTP_READ_REQUEST_HDR_HOOK, txn_contp); - TSHttpTxnHookAdd(txnp, TS_HTTP_READ_RESPONSE_HDR_HOOK, txn_contp); - TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, txn_contp); - // add send response hook for local reply if needed - TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, txn_contp); - - TSContDataSet(txn_contp, context); + auto *txnp = static_cast(data); + for (auto it = wasm_config->configs.begin(); it != wasm_config->configs.end(); it++) { + std::shared_ptr wbp = it->first; + std::shared_ptr plg = it->second; + auto *wasm = wbp.get(); + TSMutexLock(wasm->mutex()); + auto *rootContext = wasm->getRootContext(plg, false); + auto *context = new ats_wasm::Context(wasm, rootContext->id(), plg); + context->initialize(txnp); + context->onCreate(); + TSMutexUnlock(wasm->mutex()); + + // create continuation for transaction + TSCont txn_contp = TSContCreate(http_event_handler, nullptr); + TSHttpTxnHookAdd(txnp, TS_HTTP_READ_REQUEST_HDR_HOOK, txn_contp); + TSHttpTxnHookAdd(txnp, TS_HTTP_READ_RESPONSE_HDR_HOOK, txn_contp); + TSHttpTxnHookAdd(txnp, TS_HTTP_TXN_CLOSE_HOOK, txn_contp); + // add send response hook for local reply if needed + TSHttpTxnHookAdd(txnp, TS_HTTP_SEND_RESPONSE_HDR_HOOK, txn_contp); + + TSContDataSet(txn_contp, context); + } TSHttpTxnReenable(txnp, TS_EVENT_HTTP_CONTINUE); return 0; @@ -273,13 +293,26 @@ read_file(const std::string &fn, std::string *s) { auto fd = open(fn.c_str(), O_RDONLY); if (fd < 0) { + char *errmsg = strerror(errno); + TSError("[wasm][%s] wasm unable to open: %s", __FUNCTION__, errmsg); return -1; } auto n = ::lseek(fd, 0, SEEK_END); + if (n < 0) { + char *errmsg = strerror(errno); + TSError("[wasm][%s] wasm unable to lseek: %s", __FUNCTION__, errmsg); + return -1; + } ::lseek(fd, 0, SEEK_SET); s->resize(n); auto nn = ::read(fd, const_cast(&*s->begin()), n); + if (nn < 0) { + char *errmsg = strerror(errno); + TSError("[wasm][%s] wasm unable to read: %s", __FUNCTION__, errmsg); + return -1; + } if (nn != static_cast(n)) { + TSError("[wasm][%s] wasm unable to read: size different from buffer", __FUNCTION__); return -1; } return 0; @@ -289,239 +322,248 @@ read_file(const std::string &fn, std::string *s) static bool read_configuration() { - // PluginBase parameters - std::string name = ""; - std::string root_id = ""; - std::string configuration = ""; - bool fail_open = true; - - // WasmBase parameters - std::string runtime = ""; - std::string vm_id = ""; - std::string vm_configuration = ""; - std::string wasm_filename = ""; - bool allow_precompiled = true; - - proxy_wasm::AllowedCapabilitiesMap cap_maps; - std::unordered_map envs; - - try { - YAML::Node config = YAML::LoadFile(wasm_config->config_filename); - - for (YAML::const_iterator it = config.begin(); it != config.end(); ++it) { - const std::string &node_name = it->first.as(); - YAML::NodeType::value type = it->second.Type(); - - if (node_name != "config" || type != YAML::NodeType::Map) { - TSError("[wasm][%s] Invalid YAML Configuration format for wasm: %s, reason: Top level nodes must be named config and be of " - "type map", - __FUNCTION__, wasm_config->config_filename.c_str()); - return false; - } + std::list, std::shared_ptr>> new_configs = {}; + + for (auto const &cfn : wasm_config->config_filenames) { + // PluginBase parameters + std::string name = ""; + std::string root_id = ""; + std::string configuration = ""; + bool fail_open = true; + + // WasmBase parameters + std::string runtime = ""; + std::string vm_id = ""; + std::string vm_configuration = ""; + std::string wasm_filename = ""; + bool allow_precompiled = true; + + proxy_wasm::AllowedCapabilitiesMap cap_maps; + std::unordered_map envs; + + try { + YAML::Node config = YAML::LoadFile(cfn); + + for (YAML::const_iterator it = config.begin(); it != config.end(); ++it) { + const std::string &node_name = it->first.as(); + YAML::NodeType::value type = it->second.Type(); + + if (node_name != "config" || type != YAML::NodeType::Map) { + TSError( + "[wasm][%s] Invalid YAML Configuration format for wasm: %s, reason: Top level nodes must be named config and be of " + "type map", + __FUNCTION__, cfn.c_str()); + return false; + } - for (YAML::const_iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) { - const YAML::Node first = it2->first; - const YAML::Node second = it2->second; + for (YAML::const_iterator it2 = it->second.begin(); it2 != it->second.end(); ++it2) { + const YAML::Node first = it2->first; + const YAML::Node second = it2->second; - const std::string &key = first.as(); - if (second.IsScalar()) { - const std::string &value = second.as(); - if (key == "name") { - name = value; - } - if (key == "root_id" || key == "rootId") { - root_id = value; - } - if (key == "configuration") { - configuration = value; - } - if (key == "fail_open") { - if (value == "false") { - fail_open = false; + const std::string &key = first.as(); + if (second.IsScalar()) { + const std::string &value = second.as(); + if (key == "name") { + name = value; + } + if (key == "root_id" || key == "rootId") { + root_id = value; + } + if (key == "configuration") { + configuration = value; + } + if (key == "fail_open") { + if (value == "false") { + fail_open = false; + } } } - } - if (second.IsMap() && (key == "capability_restriction_config")) { - if (second["allowed_capabilities"]) { - const YAML::Node ac_node = second["allowed_capabilities"]; - if (ac_node.IsSequence()) { - for (const auto &i : ac_node) { - auto ac = i.as(); - proxy_wasm::SanitizationConfig sc; - cap_maps[ac] = sc; + if (second.IsMap() && (key == "capability_restriction_config")) { + if (second["allowed_capabilities"]) { + const YAML::Node ac_node = second["allowed_capabilities"]; + if (ac_node.IsSequence()) { + for (const auto &i : ac_node) { + auto ac = i.as(); + proxy_wasm::SanitizationConfig sc; + cap_maps[ac] = sc; + } } } } - } - if (second.IsMap() && (key == "vm_config" || key == "vmConfig")) { - for (YAML::const_iterator it3 = second.begin(); it3 != second.end(); ++it3) { - const YAML::Node vm_config_first = it3->first; - const YAML::Node vm_config_second = it3->second; + if (second.IsMap() && (key == "vm_config" || key == "vmConfig")) { + for (YAML::const_iterator it3 = second.begin(); it3 != second.end(); ++it3) { + const YAML::Node vm_config_first = it3->first; + const YAML::Node vm_config_second = it3->second; - const std::string &vm_config_key = vm_config_first.as(); - if (vm_config_second.IsScalar()) { - const std::string &vm_config_value = vm_config_second.as(); - if (vm_config_key == "runtime") { - runtime = vm_config_value; - } - if (vm_config_key == "vm_id" || vm_config_key == "vmId") { - vm_id = vm_config_value; - } - if (vm_config_key == "configuration") { - vm_configuration = vm_config_value; - } - if (vm_config_key == "allow_precompiled") { - if (vm_config_value == "false") { - allow_precompiled = false; + const std::string &vm_config_key = vm_config_first.as(); + if (vm_config_second.IsScalar()) { + const std::string &vm_config_value = vm_config_second.as(); + if (vm_config_key == "runtime") { + runtime = vm_config_value; + } + if (vm_config_key == "vm_id" || vm_config_key == "vmId") { + vm_id = vm_config_value; + } + if (vm_config_key == "configuration") { + vm_configuration = vm_config_value; + } + if (vm_config_key == "allow_precompiled") { + if (vm_config_value == "false") { + allow_precompiled = false; + } } } - } - if (vm_config_key == "environment_variables" && vm_config_second.IsMap()) { - if (vm_config_second["host_env_keys"]) { - const YAML::Node ek_node = vm_config_second["host_env_keys"]; - if (ek_node.IsSequence()) { - for (const auto &i : ek_node) { - auto ek = i.as(); - if (auto *value = std::getenv(ek.data())) { - envs[ek] = value; + if (vm_config_key == "environment_variables" && vm_config_second.IsMap()) { + if (vm_config_second["host_env_keys"]) { + const YAML::Node ek_node = vm_config_second["host_env_keys"]; + if (ek_node.IsSequence()) { + for (const auto &i : ek_node) { + auto ek = i.as(); + if (auto *value = std::getenv(ek.data())) { + envs[ek] = value; + } } } } - } - if (vm_config_second["key_values"]) { - const YAML::Node kv_node = vm_config_second["key_values"]; - if (kv_node.IsMap()) { - for (YAML::const_iterator it4 = kv_node.begin(); it4 != kv_node.end(); ++it4) { - envs[it4->first.as()] = it4->second.as(); + if (vm_config_second["key_values"]) { + const YAML::Node kv_node = vm_config_second["key_values"]; + if (kv_node.IsMap()) { + for (YAML::const_iterator it4 = kv_node.begin(); it4 != kv_node.end(); ++it4) { + envs[it4->first.as()] = it4->second.as(); + } } } } - } - if (vm_config_key == "code" && vm_config_second.IsMap()) { - if (vm_config_second["local"]) { - const YAML::Node local_node = vm_config_second["local"]; - if (local_node["filename"]) { - wasm_filename = local_node["filename"].as(); + if (vm_config_key == "code" && vm_config_second.IsMap()) { + if (vm_config_second["local"]) { + const YAML::Node local_node = vm_config_second["local"]; + if (local_node["filename"]) { + wasm_filename = local_node["filename"].as(); + } } } } } } - } - // only allowed one config block (first one) for now - break; + // only allowed one config block (first one) for now + break; + } + } catch (const YAML::Exception &e) { + TSError("[wasm][%s] YAML::Exception %s when parsing YAML config file %s for wasm", __FUNCTION__, e.what(), cfn.c_str()); + return false; } - } catch (const YAML::Exception &e) { - TSError("[wasm][%s] YAML::Exception %s when parsing YAML config file %s for wasm", __FUNCTION__, e.what(), - wasm_config->config_filename.c_str()); - return false; - } - std::shared_ptr wasm; - if (runtime == "ats.wasm.runtime.wasmedge") { + std::shared_ptr wasm; + if (runtime == "ats.wasm.runtime.wasmedge") { #ifdef WASMEDGE - wasm = std::make_shared(proxy_wasm::createWasmEdgeVm(), // VM - vm_id, // vm_id - vm_configuration, // vm_configuration - "", // vm_key, - envs, // envs - cap_maps // allowed capabilities - ); + wasm = std::make_shared(proxy_wasm::createWasmEdgeVm(), // VM + vm_id, // vm_id + vm_configuration, // vm_configuration + "", // vm_key, + envs, // envs + cap_maps // allowed capabilities + ); #else - TSError("[wasm][%s] wasm unable to use WasmEdge runtime", __FUNCTION__); - return false; + TSError("[wasm][%s] wasm unable to use WasmEdge runtime", __FUNCTION__); + return false; #endif - } else if (runtime == "ats.wasm.runtime.wamr") { + } else if (runtime == "ats.wasm.runtime.wamr") { #ifdef WAMR - wasm = std::make_shared(proxy_wasm::createWamrVm(), // VM - vm_id, // vm_id - vm_configuration, // vm_configuration - "", // vm_key, - envs, // envs - cap_maps // allowed capabilities - ); + wasm = std::make_shared(proxy_wasm::createWamrVm(), // VM + vm_id, // vm_id + vm_configuration, // vm_configuration + "", // vm_key, + envs, // envs + cap_maps // allowed capabilities + ); #else - TSError("[wasm][%s] wasm unable to use WAMR runtime", __FUNCTION__); - return false; + TSError("[wasm][%s] wasm unable to use WAMR runtime", __FUNCTION__); + return false; #endif - } else { - TSError("[wasm][%s] wasm unable to use %s runtime", __FUNCTION__, runtime.c_str()); - return false; - } - wasm->wasm_vm()->integration() = std::make_unique(); - - auto plugin = std::make_shared(name, // name - root_id, // root_id - vm_id, // vm_id - runtime, // engine - configuration, // plugin_configuration - fail_open, // failopen - "" // TODO: plugin key from where ? - ); - - wasm_config->wasm_filename = wasm_filename; - if (*wasm_config->wasm_filename.begin() != '/') { - wasm_config->wasm_filename = std::string(TSConfigDirGet()) + "/" + wasm_config->wasm_filename; - } - std::string code; - if (read_file(wasm_config->wasm_filename, &code) < 0) { - TSError("[wasm][%s] wasm unable to read file '%s'", __FUNCTION__, wasm_config->wasm_filename.c_str()); - return false; - } + } else { + TSError("[wasm][%s] wasm unable to use %s runtime", __FUNCTION__, runtime.c_str()); + return false; + } + wasm->wasm_vm()->integration() = std::make_unique(); + + auto plugin = std::make_shared(name, // name + root_id, // root_id + vm_id, // vm_id + runtime, // engine + configuration, // plugin_configuration + fail_open, // failopen + "" // TODO: plugin key from where ? + ); - if (code.empty()) { - TSError("[wasm][%s] code is empty", __FUNCTION__); - return false; - } + if (*wasm_filename.begin() != '/') { + wasm_filename = std::string(TSConfigDirGet()) + "/" + wasm_filename; + } + std::string code; + if (read_file(wasm_filename, &code) < 0) { + TSError("[wasm][%s] wasm unable to read file '%s'", __FUNCTION__, wasm_filename.c_str()); + return false; + } - if (!wasm) { - TSError("[wasm][%s] wasm wasm wasm unable to create vm", __FUNCTION__); - return false; - } - if (!wasm->load(code, allow_precompiled)) { - TSError("[wasm][%s] Failed to load Wasm code", __FUNCTION__); - return false; - } - if (!wasm->initialize()) { - TSError("[wasm][%s] Failed to initialize Wasm code", __FUNCTION__); - return false; - } + if (code.empty()) { + TSError("[wasm][%s] code is empty", __FUNCTION__); + return false; + } + + if (!wasm) { + TSError("[wasm][%s] wasm wasm wasm unable to create vm", __FUNCTION__); + return false; + } + if (!wasm->load(code, allow_precompiled)) { + TSError("[wasm][%s] Failed to load Wasm code", __FUNCTION__); + return false; + } + if (!wasm->initialize()) { + TSError("[wasm][%s] Failed to initialize Wasm code", __FUNCTION__); + return false; + } - TSCont contp = TSContCreate(schedule_handler, TSMutexCreate()); - auto *rootContext = wasm->start(plugin, contp); + TSCont contp = TSContCreate(schedule_handler, TSMutexCreate()); + auto *rootContext = wasm->start(plugin, contp); - if (!wasm->configure(rootContext, plugin)) { - TSError("[wasm][%s] Failed to configure Wasm", __FUNCTION__); - return false; + if (!wasm->configure(rootContext, plugin)) { + TSError("[wasm][%s] Failed to configure Wasm", __FUNCTION__); + return false; + } + + auto new_config = std::make_pair(wasm, plugin); + new_configs.push_front(new_config); } - auto old_wasm = wasm_config->wasm; - auto old_plugin = wasm_config->plugin; + auto old_configs = wasm_config->configs; - wasm_config->wasm = wasm; - wasm_config->plugin = plugin; + wasm_config->configs = new_configs; - if (old_wasm != nullptr) { - TSDebug(WASM_DEBUG_TAG, "[%s] previous WasmBase exists", __FUNCTION__); - TSMutexLock(old_wasm->mutex()); - if (old_wasm->readyShutdown()) { - TSDebug(WASM_DEBUG_TAG, "[%s] starting WasmBase Shutdown", __FUNCTION__); - old_wasm->startShutdown(); - if (!old_wasm->readyDelete()) { - TSDebug(WASM_DEBUG_TAG, "[%s] not ready to delete WasmBase/PluginBase", __FUNCTION__); + for (auto it = old_configs.begin(); it != old_configs.end(); it++) { + std::shared_ptr old_wasm = it->first; + std::shared_ptr old_plugin = it->second; + + if (old_wasm != nullptr) { + TSDebug(WASM_DEBUG_TAG, "[%s] previous WasmBase exists", __FUNCTION__); + TSMutexLock(old_wasm->mutex()); + if (old_wasm->readyShutdown()) { + TSDebug(WASM_DEBUG_TAG, "[%s] starting WasmBase Shutdown", __FUNCTION__); + old_wasm->startShutdown(); + if (!old_wasm->readyDelete()) { + TSDebug(WASM_DEBUG_TAG, "[%s] not ready to delete WasmBase/PluginBase", __FUNCTION__); + auto deleted_config = std::make_pair(old_wasm, old_plugin); + wasm_config->deleted_configs.push_front(deleted_config); + } + } else { + TSDebug(WASM_DEBUG_TAG, "[%s] not ready to shutdown WasmBase", __FUNCTION__); auto deleted_config = std::make_pair(old_wasm, old_plugin); wasm_config->deleted_configs.push_front(deleted_config); } - } else { - TSDebug(WASM_DEBUG_TAG, "[%s] not ready to shutdown WasmBase", __FUNCTION__); - auto deleted_config = std::make_pair(old_wasm, old_plugin); - wasm_config->deleted_configs.push_front(deleted_config); + TSMutexUnlock(old_wasm->mutex()); } - TSMutexUnlock(old_wasm->mutex()); } return true; @@ -556,11 +598,13 @@ TSPluginInit(int argc, const char *argv[]) wasm_config = std::make_unique(); - std::string filename = std::string(argv[1]); - if (*filename.begin() != '/') { - filename = std::string(TSConfigDirGet()) + "/" + filename; + for (int i = 1; i < argc; i++) { + std::string filename = std::string(argv[i]); + if (*filename.begin() != '/') { + filename = std::string(TSConfigDirGet()) + "/" + filename; + } + wasm_config->config_filenames.push_front(filename); } - wasm_config->config_filename = filename; if (!read_configuration()) { return;