Skip to content

Commit

Permalink
Use same input if for policy if they have same tap and config
Browse files Browse the repository at this point in the history
    - Only create Input Stream if the config and tap is different
    - generate hash from Configurable data
    - Add vector of policies that are using the input in InputStream
    - InputStream name : tap + "-" + hash of input config
  • Loading branch information
Leonardo Parente committed Jan 21, 2022
1 parent 8282af7 commit 142f426
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 47 deletions.
3 changes: 0 additions & 3 deletions src/AbstractModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ void AbstractRunnableModule::common_info_json(json &j) const
{
j["module"]["name"] = _name;
j["module"]["running"] = _running.load();
if (_policy) {
j["module"]["policy"]["name"] = _policy->name();
}
config_json(j["module"]["config"]);
}

Expand Down
12 changes: 0 additions & 12 deletions src/AbstractModule.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ class AbstractRunnableModule : public AbstractModule
protected:
std::atomic_bool _running = false;

Policy *_policy = nullptr;

void common_info_json(json &j) const;

public:
Expand All @@ -71,16 +69,6 @@ class AbstractRunnableModule : public AbstractModule
{
}

void set_policy(Policy *policy)
{
_policy = policy;
}

const Policy *policy() const
{
return _policy;
}

virtual ~AbstractRunnableModule(){};

virtual void start() = 0;
Expand Down
25 changes: 25 additions & 0 deletions src/Configurable.h
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,31 @@ class Configurable
}
}
}

void config_hash(std::string &hash)
{
std::shared_lock lock(_config_mutex);
std::string data;
for (const auto &[key, value] : _config) {
std::visit([&data, key = key](auto &&arg) {
data += key;
using T = std::decay_t<decltype(arg)>;
if constexpr (std::is_same_v<T, StringList>) {
for (const auto &s : arg) {
data += s;
}
} else if constexpr (std::is_same_v<T, std::string>) {
data += arg;
} else {
data += std::to_string(arg);
}
},
value);
}
std::sort(data.begin(), data.end());
auto h1 = std::hash<std::string>{}(data);
hash = std::to_string(h1);
}
};

class Config : public Configurable
Expand Down
21 changes: 20 additions & 1 deletion src/InputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ namespace visor {

class InputStream : public AbstractRunnableModule
{
mutable std::shared_mutex _input_mutex;
std::vector<const Policy *> _policies;

public:
InputStream(const std::string &name)
Expand All @@ -20,6 +22,24 @@ class InputStream : public AbstractRunnableModule

virtual ~InputStream(){};

void set_policy(const Policy *policy)
{
std::unique_lock lock(_input_mutex);
_policies.push_back(policy);
}

void remove_policy(const Policy *policy)
{
std::unique_lock lock(_input_mutex);
_policies.erase(std::remove(_policies.begin(), _policies.end(), policy), _policies.end());
}

size_t policies_count() const
{
std::unique_lock lock(_input_mutex);
return _policies.size();
}

virtual size_t consumer_count() const = 0;

void common_info_json(json &j) const
Expand All @@ -31,4 +51,3 @@ class InputStream : public AbstractRunnableModule
};

}

69 changes: 45 additions & 24 deletions src/Policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,30 +118,40 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
throw PolicyException(fmt::format("invalid input config for tap '{}': {}", tap_name, e.what()));
}
}
// TODO separate config and filter. for now, they merge
tap_filter.config_merge(tap_config);

// Create Policy
auto policy = std::make_unique<Policy>(policy_name, tap);
// if and only if policy succeeds, we will return this in result set
Policy *policy_ptr = policy.get();
std::string input_stream_module_name;
tap_filter.config_hash(input_stream_module_name);
input_stream_module_name.insert(0, tap->name() + "-");

// Instantiate stream from tap
std::unique_ptr<InputStream> input_stream;
std::string input_stream_module_name;
try {
spdlog::get("visor")->info("policy [{}]: instantiating Tap: {}", policy_name, tap_name);
// TODO separate config and filter. for now, they merge
tap_filter.config_merge(tap_config);
input_stream = tap->instantiate(policy.get(), &tap_filter);
// ensure tap input type matches policy input tap
if (input_node["input_type"].as<std::string>() != tap->input_plugin()->plugin()) {
throw PolicyException(fmt::format("input_type for policy specified tap '{}' doesn't match tap's defined input type: {}/{}", tap_name, input_node["input_type"].as<std::string>(), tap->input_plugin()->plugin()));
InputStream *input_ptr;
if (_registry->input_manager()->module_exists(input_stream_module_name)) {
spdlog::get("visor")->info("policy [{}]: input stream already exists. reusing: {}", policy_name, input_stream_module_name);
std::unique_lock<std::shared_mutex> input_lock;
auto result_input = _registry->input_manager()->module_get_locked(input_stream_module_name);
input_ptr = result_input.module;
input_lock = std::move(result_input.lock);
} else {
// Instantiate stream from tap
try {
spdlog::get("visor")->info("policy [{}]: instantiating Tap: {}", policy_name, tap_name);
input_stream = tap->instantiate(&tap_filter, input_stream_module_name);
// ensure tap input type matches policy input tap
if (input_node["input_type"].as<std::string>() != tap->input_plugin()->plugin()) {
throw PolicyException(fmt::format("input_type for policy specified tap '{}' doesn't match tap's defined input type: {}/{}", tap_name, input_node["input_type"].as<std::string>(), tap->input_plugin()->plugin()));
}
input_ptr = input_stream.get();
} catch (std::runtime_error &e) {
throw PolicyException(fmt::format("unable to instantiate tap '{}': {}", tap_name, e.what()));
}
input_stream_module_name = input_stream->name();
} catch (std::runtime_error &e) {
throw PolicyException(fmt::format("unable to instantiate tap '{}': {}", tap_name, e.what()));
}
policy->set_input_stream(input_stream.get());

// Create Policy
auto policy = std::make_unique<Policy>(policy_name, tap);
// if and only if policy succeeds, we will return this in result set
Policy *policy_ptr = policy.get();
policy->set_input_stream(input_ptr);
// Handler Section
if (!it->second["handlers"] || !it->second["handlers"].IsMap()) {
throw PolicyException("missing or invalid handler configuration at key 'handlers'");
Expand Down Expand Up @@ -224,7 +234,7 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)

std::unique_ptr<StreamHandler> handler_module;
if (!handler_sequence || handler_modules.empty()) {
handler_module = handler_plugin->second->instantiate(policy_name + "-" + handler_module_name, input_stream.get(), &handler_config);
handler_module = handler_plugin->second->instantiate(policy_name + "-" + handler_module_name, input_ptr, &handler_config);
} else {
// for sequence, use only previous handler
handler_module = handler_plugin->second->instantiate(policy_name + "-" + handler_module_name, nullptr, &handler_config, handler_modules.back().get());
Expand All @@ -249,7 +259,9 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
throw PolicyException(fmt::format("policy [{}] creation failed (policy): {}", policy_name, e.what()));
}
try {
_registry->input_manager()->module_add(std::move(input_stream));
if (!_registry->input_manager()->module_exists(input_stream_module_name)) {
_registry->input_manager()->module_add(std::move(input_stream));
}
} catch (ModuleException &e) {
// note that if this call excepts, we are in an unknown state and the exception will propagate
module_remove(policy_name);
Expand All @@ -276,6 +288,7 @@ std::vector<Policy *> PolicyManager::load(const YAML::Node &policy_yaml)
}

// success
input_ptr->set_policy(policy_ptr);
result.push_back(policy_ptr);
}
return result;
Expand All @@ -299,7 +312,10 @@ void PolicyManager::remove_policy(const std::string &name)
for (const auto &name : module_names) {
_registry->handler_manager()->module_remove(name);
}
_registry->input_manager()->module_remove(input_name);

if (policy->input_stream()->policies_count()) {
_registry->input_manager()->module_remove(input_name);
}

_map.erase(name);
}
Expand Down Expand Up @@ -335,9 +351,14 @@ void Policy::stop()
}
spdlog::get("visor")->info("policy [{}]: stopping", _name);
if (_input_stream->running()) {
spdlog::get("visor")->info("policy [{}]: stopping input instance: {}", _name, _input_stream->name());
_input_stream->stop();
if (_input_stream->policies_count() <= 1) {
spdlog::get("visor")->info("policy [{}]: stopping input instance: {}", _name, _input_stream->name());
_input_stream->stop();
} else {
spdlog::get("visor")->info("policy [{}]: input instance {} not stopped because it is in use by another policy.", _name, _input_stream->name());
}
}
_input_stream->remove_policy(this);
for (auto &mod : _modules) {
if (mod->running()) {
spdlog::get("visor")->info("policy [{}]: stopping handler instance: {}", _name, mod->name());
Expand Down
6 changes: 3 additions & 3 deletions src/Taps.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ void TapManager::load(const YAML::Node &tap_yaml, bool strict)
}
}

std::unique_ptr<InputStream> Tap::instantiate(Policy *policy, const Configurable *filter_config)
std::unique_ptr<InputStream> Tap::instantiate(const Configurable *filter_config, std::string input_name)
{
Config c;
c.config_merge(dynamic_cast<const Configurable &>(*this));
c.config_merge(*filter_config);
auto module = _input_plugin->instantiate(_name + "-" + policy->name(), &c);
module->set_policy(policy);
auto module = _input_plugin->instantiate(input_name, &c);

return module;
}

Expand Down
2 changes: 1 addition & 1 deletion src/Taps.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class Tap : public AbstractModule
assert(input_plugin);
}

std::unique_ptr<InputStream> instantiate(Policy *policy, const Configurable *filter_config);
std::unique_ptr<InputStream> instantiate(const Configurable *filter_config, std::string input_name);

const InputModulePlugin *input_plugin() const
{
Expand Down
62 changes: 59 additions & 3 deletions src/tests/test_policies.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,31 @@ version: "1.0"
type: net
)";

auto policies_config_same_input = R"(
version: "1.0"
visor:
policies:
# policy name and description
same_input:
kind: collection
input:
# this must reference a tap name, or application of the policy will fail
tap: anycast
input_type: mock
config:
sample: value
filter:
bpf: "tcp or udp"
handlers:
window_config:
num_periods: 5
deep_sample_rate: 100
modules:
net:
type: net
)";

auto policies_config_bad1 = R"(
visor:
policies:
Expand Down Expand Up @@ -335,7 +360,7 @@ TEST_CASE("Policies", "[policies]")
REQUIRE(registry.policy_manager()->module_exists("default_view"));
auto [policy, lock] = registry.policy_manager()->module_get_locked("default_view");
CHECK(policy->name() == "default_view");
CHECK(policy->input_stream()->name() == "anycast-default_view");
CHECK(policy->input_stream()->name() == "anycast-14766593178614402303");
CHECK(policy->input_stream()->config_get<std::string>("bpf") == "tcp or udp"); // TODO this will move to filter member variable
CHECK(policy->input_stream()->config_get<std::string>("sample") == "value");
CHECK(policy->modules()[0]->name() == "default_view-default_net");
Expand Down Expand Up @@ -363,7 +388,7 @@ TEST_CASE("Policies", "[policies]")
REQUIRE(registry.policy_manager()->module_exists("default_view"));
auto [policy, lock] = registry.policy_manager()->module_get_locked("default_view");
CHECK(policy->name() == "default_view");
CHECK(policy->input_stream()->name() == "anycast-default_view");
CHECK(policy->input_stream()->name() == "anycast-16071510057998546344");
CHECK(policy->modules()[0]->name() == "default_view-default_dns");
CHECK(policy->modules()[1]->name() == "default_view-default_net");
CHECK(policy->input_stream()->running());
Expand Down Expand Up @@ -521,7 +546,7 @@ TEST_CASE("Policies", "[policies]")

// ensure the modules were rolled back
REQUIRE(!registry.policy_manager()->module_exists("default_view"));
REQUIRE(!registry.input_manager()->module_exists("anycast-default_view"));
REQUIRE(!registry.input_manager()->module_exists("anycast-14766593178614402303"));
}
SECTION("Good Config, test stop()")
{
Expand Down Expand Up @@ -585,4 +610,35 @@ TEST_CASE("Policies", "[policies]")
new_lock.unlock();
REQUIRE_NOTHROW(registry.policy_manager()->remove_policy("default_view"));
}

SECTION("Good Config, policies with same tap and input")
{
CoreRegistry registry;
registry.start(nullptr);
YAML::Node config_file = YAML::Load(policies_config);

CHECK(config_file["visor"]["policies"]);
CHECK(config_file["visor"]["policies"].IsMap());

REQUIRE_NOTHROW(registry.tap_manager()->load(config_file["visor"]["taps"], true));
REQUIRE_NOTHROW(registry.policy_manager()->load(config_file["visor"]["policies"]));

REQUIRE(registry.policy_manager()->module_exists("default_view"));
auto [policy, lock] = registry.policy_manager()->module_get_locked("default_view");
CHECK(policy->name() == "default_view");
CHECK(policy->input_stream()->name() == "anycast-14766593178614402303");
lock.unlock();

YAML::Node config_file2 = YAML::Load(policies_config_same_input);
CHECK(config_file2["visor"]["policies"]);
CHECK(config_file2["visor"]["policies"].IsMap());

REQUIRE_NOTHROW(registry.policy_manager()->load(config_file2["visor"]["policies"]));

REQUIRE(registry.policy_manager()->module_exists("same_input"));
auto [policy2, lock2] = registry.policy_manager()->module_get_locked("same_input");
CHECK(policy2->name() == "same_input");
CHECK(policy2->input_stream()->name() == "anycast-14766593178614402303");
lock2.unlock();
}
}

0 comments on commit 142f426

Please sign in to comment.