Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use same input if for policy if they have same tap and config #186

Merged
6 commits merged into from
Jan 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
BUILD_TYPE: Release
CTEST_OUTPUT_ON_FAILURE: 1

jobs:
build:
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/static_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ on:
env:
# Customize the CMake build type here (Release, Debug, RelWithDebInfo, etc.)
BUILD_TYPE: Release
CTEST_OUTPUT_ON_FAILURE: 1

jobs:
build:
Expand Down
3 changes: 0 additions & 3 deletions src/AbstractModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ void AbstractRunnableModule::common_info_json(json &j) const
{
j["module"]["name"] = _name;
j["module"]["running"] = _running.load();
if (_policy) {
j["module"]["policy"]["name"] = _policy->name();
}
config_json(j["module"]["config"]);
}

Expand Down
12 changes: 0 additions & 12 deletions src/AbstractModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ class AbstractRunnableModule : public AbstractModule
protected:
std::atomic_bool _running = false;

Policy *_policy = nullptr;

void common_info_json(json &j) const;

public:
Expand All @@ -71,16 +69,6 @@ class AbstractRunnableModule : public AbstractModule
{
}

void set_policy(Policy *policy)
{
_policy = policy;
}

const Policy *policy() const
{
return _policy;
}

virtual ~AbstractRunnableModule(){};

virtual void start() = 0;
Expand Down
36 changes: 36 additions & 0 deletions src/Configurable.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <nlohmann/json.hpp>
#include <regex>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <unordered_map>
#include <variant>
Expand Down Expand Up @@ -161,6 +162,41 @@ class Configurable
}
}
}

const std::string config_hash()
{
std::shared_lock lock(_config_mutex);
std::vector<std::string> key_values;
for (const auto &[key, value] : _config) {
std::visit([&key_values, key = key](auto &&arg) {
std::string data;
data += key;
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, StringList>) {
auto temp_list = arg;
std::sort(temp_list.begin(), temp_list.end());
for (const auto &s : temp_list) {
data += s;
}
} else if constexpr (std::is_same_v<T, std::string>) {
data += arg;
} else {
data += std::to_string(arg);
}
key_values.push_back(data);
},
value);
}
std::sort(key_values.begin(), key_values.end());
std::string hash;
for (auto &data : key_values) {
hash += data;
}
auto h1 = std::hash<std::string>{}(hash);
std::stringstream string_stream;
string_stream << std::hex << h1;
return string_stream.str();
}
};

class Config : public Configurable
Expand Down
21 changes: 20 additions & 1 deletion src/InputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace visor {

class InputStream : public AbstractRunnableModule
{
mutable std::shared_mutex _input_mutex;
std::vector<const Policy *> _policies;

public:
InputStream(const std::string &name)
Expand All @@ -20,6 +22,24 @@ class InputStream : public AbstractRunnableModule

virtual ~InputStream(){};

void add_policy(const Policy *policy)
{
std::unique_lock lock(_input_mutex);
_policies.push_back(policy);
}

void remove_policy(const Policy *policy)
{
std::unique_lock lock(_input_mutex);
_policies.erase(std::remove(_policies.begin(), _policies.end(), policy), _policies.end());
}

size_t policies_count() const
{
std::unique_lock lock(_input_mutex);
return _policies.size();
}

virtual size_t consumer_count() const = 0;

void common_info_json(json &j) const
Expand All @@ -31,4 +51,3 @@ class InputStream : public AbstractRunnableModule
};

}

68 changes: 44 additions & 24 deletions src/Policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,30 +118,39 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
throw PolicyException(fmt::format("invalid input config for tap '{}': {}", tap_name, e.what()));
}
}
// TODO separate config and filter. for now, they merge
tap_filter.config_merge(tap_config);

// Create Policy
auto policy = std::make_unique<Policy>(policy_name, tap);
// if and only if policy succeeds, we will return this in result set
Policy *policy_ptr = policy.get();
std::string input_stream_module_name = tap_filter.config_hash();
input_stream_module_name.insert(0, tap->name() + "-");

// Instantiate stream from tap
std::unique_ptr<InputStream> input_stream;
std::string input_stream_module_name;
try {
spdlog::get("visor")->info("policy [{}]: instantiating Tap: {}", policy_name, tap_name);
// TODO separate config and filter. for now, they merge
tap_filter.config_merge(tap_config);
input_stream = tap->instantiate(policy.get(), &tap_filter);
// ensure tap input type matches policy input tap
if (input_node["input_type"].as<std::string>() != tap->input_plugin()->plugin()) {
throw PolicyException(fmt::format("input_type for policy specified tap '{}' doesn't match tap's defined input type: {}/{}", tap_name, input_node["input_type"].as<std::string>(), tap->input_plugin()->plugin()));
InputStream *input_ptr;
std::unique_lock<std::shared_mutex> input_lock;
if (_registry->input_manager()->module_exists(input_stream_module_name)) {
spdlog::get("visor")->info("policy [{}]: input stream already exists. reusing: {}", policy_name, input_stream_module_name);
auto result_input = _registry->input_manager()->module_get_locked(input_stream_module_name);
input_ptr = result_input.module;
input_lock = std::move(result_input.lock);
This conversation was marked as resolved.
Show resolved Hide resolved
} else {
// Instantiate stream from tap
try {
spdlog::get("visor")->info("policy [{}]: instantiating Tap: {}", policy_name, tap_name);
input_stream = tap->instantiate(&tap_filter, input_stream_module_name);
// ensure tap input type matches policy input tap
if (input_node["input_type"].as<std::string>() != tap->input_plugin()->plugin()) {
throw PolicyException(fmt::format("input_type for policy specified tap '{}' doesn't match tap's defined input type: {}/{}", tap_name, input_node["input_type"].as<std::string>(), tap->input_plugin()->plugin()));
}
input_ptr = input_stream.get();
} catch (std::runtime_error &e) {
throw PolicyException(fmt::format("unable to instantiate tap '{}': {}", tap_name, e.what()));
}
input_stream_module_name = input_stream->name();
} catch (std::runtime_error &e) {
throw PolicyException(fmt::format("unable to instantiate tap '{}': {}", tap_name, e.what()));
}
policy->set_input_stream(input_stream.get());

// Create Policy
auto policy = std::make_unique<Policy>(policy_name, tap);
// if and only if policy succeeds, we will return this in result set
Policy *policy_ptr = policy.get();
policy->set_input_stream(input_ptr);
// Handler Section
if (!it->second["handlers"] || !it->second["handlers"].IsMap()) {
throw PolicyException("missing or invalid handler configuration at key 'handlers'");
Expand Down Expand Up @@ -224,7 +233,7 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)

std::unique_ptr<StreamHandler> handler_module;
if (!handler_sequence || handler_modules.empty()) {
handler_module = handler_plugin->second->instantiate(policy_name + "-" + handler_module_name, input_stream.get(), &handler_config);
handler_module = handler_plugin->second->instantiate(policy_name + "-" + handler_module_name, input_ptr, &handler_config);
} else {
// for sequence, use only previous handler
handler_module = handler_plugin->second->instantiate(policy_name + "-" + handler_module_name, nullptr, &handler_config, handler_modules.back().get());
Expand All @@ -249,7 +258,9 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what()));
}
try {
_registry->input_manager()->module_add(std::move(input_stream));
if (input_stream) {
_registry->input_manager()->module_add(std::move(input_stream));
}
} catch (ModuleException &e) {
// note that if this call excepts, we are in an unknown state and the exception will propagate
module_remove(policy_name);
Expand All @@ -276,6 +287,7 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
}

// success
input_ptr->add_policy(policy_ptr);
result.push_back(policy_ptr);
}
return result;
Expand All @@ -299,7 +311,10 @@ void PolicyManager::remove_policy(const std::string &name)
for (const auto &name : module_names) {
_registry->handler_manager()->module_remove(name);
}
_registry->input_manager()->module_remove(input_name);

if (!policy->input_stream()->policies_count()) {
_registry->input_manager()->module_remove(input_name);
}

_map.erase(name);
}
Expand Down Expand Up @@ -335,9 +350,14 @@ void Policy::stop()
}
spdlog::get("visor")->info("policy [{}]: stopping", _name);
if (_input_stream->running()) {
spdlog::get("visor")->info("policy [{}]: stopping input instance: {}", _name, _input_stream->name());
_input_stream->stop();
if (_input_stream->policies_count() <= 1) {
spdlog::get("visor")->info("policy [{}]: stopping input instance: {}", _name, _input_stream->name());
_input_stream->stop();
} else {
spdlog::get("visor")->info("policy [{}]: input instance {} not stopped because it is in use by another policy.", _name, _input_stream->name());
}
}
_input_stream->remove_policy(this);
for (auto &mod : _modules) {
if (mod->running()) {
spdlog::get("visor")->info("policy [{}]: stopping handler instance: {}", _name, mod->name());
Expand Down
6 changes: 3 additions & 3 deletions src/Taps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ void TapManager::load(const YAML::Node &tap_yaml, bool strict)
}
}

std::unique_ptr<InputStream> Tap::instantiate(Policy *policy, const Configurable *filter_config)
std::unique_ptr<InputStream> Tap::instantiate(const Configurable *filter_config, std::string input_name)
{
Config c;
c.config_merge(dynamic_cast<const Configurable &>(*this));
c.config_merge(*filter_config);
auto module = _input_plugin->instantiate(_name + "-" + policy->name(), &c);
module->set_policy(policy);
auto module = _input_plugin->instantiate(input_name, &c);

return module;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Taps.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Tap : public AbstractModule
assert(input_plugin);
}

std::unique_ptr<InputStream> instantiate(Policy *policy, const Configurable *filter_config);
std::unique_ptr<InputStream> instantiate(const Configurable *filter_config, std::string input_name);

const InputModulePlugin *input_plugin() const
{
Expand Down
71 changes: 67 additions & 4 deletions src/tests/test_policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,31 @@ version: "1.0"
type: net
)";

auto policies_config_same_input = R"(
version: "1.0"

visor:
policies:
# policy name and description
same_input:
kind: collection
input:
# this must reference a tap name, or application of the policy will fail
tap: anycast
input_type: mock
config:
sample: value
filter:
bpf: "tcp or udp"
handlers:
window_config:
num_periods: 5
deep_sample_rate: 100
modules:
net:
type: net
)";

auto policies_config_bad1 = R"(
visor:
policies:
Expand Down Expand Up @@ -335,7 +360,7 @@ TEST_CASE("Policies", "[policies]")
REQUIRE(registry.policy_manager()->module_exists("default_view"));
auto [policy, lock] = registry.policy_manager()->module_get_locked("default_view");
CHECK(policy->name() == "default_view");
CHECK(policy->input_stream()->name() == "anycast-default_view");
CHECK(policy->input_stream()->name().find("anycast-") != std::string::npos);
CHECK(policy->input_stream()->config_get<std::string>("bpf") == "tcp or udp"); // TODO this will move to filter member variable
CHECK(policy->input_stream()->config_get<std::string>("sample") == "value");
CHECK(policy->modules()[0]->name() == "default_view-default_net");
Expand All @@ -354,7 +379,6 @@ TEST_CASE("Policies", "[policies]")
CoreRegistry registry;
registry.start(nullptr);
YAML::Node config_file = YAML::Load(policies_config_hseq);

CHECK(config_file["visor"]["policies"]);
CHECK(config_file["visor"]["policies"].IsMap());
REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true));
Expand All @@ -363,7 +387,7 @@ TEST_CASE("Policies", "[policies]")
REQUIRE(registry.policy_manager()->module_exists("default_view"));
auto [policy, lock] = registry.policy_manager()->module_get_locked("default_view");
CHECK(policy->name() == "default_view");
CHECK(policy->input_stream()->name() == "anycast-default_view");
CHECK(policy->input_stream()->name().find("anycast-") != std::string::npos);
CHECK(policy->modules()[0]->name() == "default_view-default_dns");
CHECK(policy->modules()[1]->name() == "default_view-default_net");
CHECK(policy->input_stream()->running());
Expand Down Expand Up @@ -519,9 +543,17 @@ TEST_CASE("Policies", "[policies]")
registry.handler_manager()->module_add(std::move(mod));
REQUIRE_THROWS_WITH(registry.policy_manager()->load(config_file["visor"]["policies"]), "policy [default_view-default_net] creation failed (handler: default_view): module name 'default_view-default_net' already exists");

auto input_node = config_file["visor"]["policies"]["default_view"]["input"];
Config input_filter;
input_filter.config_set_yaml(input_node["filter"]);
Config input_config;
input_config.config_set_yaml(input_node["config"]);
input_filter.config_merge(input_config);
auto hash = input_filter.config_hash();

// ensure the modules were rolled back
REQUIRE(!registry.policy_manager()->module_exists("default_view"));
REQUIRE(!registry.input_manager()->module_exists("anycast-default_view"));
REQUIRE(!registry.input_manager()->module_exists("anycast-" + hash));
}
SECTION("Good Config, test stop()")
{
Expand Down Expand Up @@ -585,4 +617,35 @@ TEST_CASE("Policies", "[policies]")
new_lock.unlock();
REQUIRE_NOTHROW(registry.policy_manager()->remove_policy("default_view"));
}

SECTION("Good Config, policies with same tap and input")
{
CoreRegistry registry;
registry.start(nullptr);
YAML::Node config_file = YAML::Load(policies_config);

CHECK(config_file["visor"]["policies"]);
CHECK(config_file["visor"]["policies"].IsMap());

REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true));
REQUIRE_NOTHROW(registry.policy_manager()->load(config_file["visor"]["policies"]));

REQUIRE(registry.policy_manager()->module_exists("default_view"));
auto [policy, lock] = registry.policy_manager()->module_get_locked("default_view");
CHECK(policy->name() == "default_view");
CHECK(policy->input_stream()->name().find("anycast-") != std::string::npos);
lock.unlock();

YAML::Node config_file2 = YAML::Load(policies_config_same_input);
CHECK(config_file2["visor"]["policies"]);
CHECK(config_file2["visor"]["policies"].IsMap());

REQUIRE_NOTHROW(registry.policy_manager()->load(config_file2["visor"]["policies"]));

REQUIRE(registry.policy_manager()->module_exists("same_input"));
auto [policy2, lock2] = registry.policy_manager()->module_get_locked("same_input");
CHECK(policy2->name() == "same_input");
CHECK(policy2->input_stream()->name() == policy->input_stream()->name());
lock2.unlock();
}
}