Skip to content

Commit

Permalink
Rust integration: Adding variables & clients.
Browse files Browse the repository at this point in the history
Co-authored-by: Dima Dorezyuk <ddo@qwello.eu>
Signed-off-by: Holger Rapp <hra@qwello.eu>
Signed-off-by: Holger Rapp <HolgerRapp@gmx.net>
  • Loading branch information
SirVer and Dima Dorezyuk committed Oct 10, 2023
1 parent 7fcf0ac commit 9cf60a7
Show file tree
Hide file tree
Showing 5 changed files with 193 additions and 56 deletions.
16 changes: 15 additions & 1 deletion everestrs/everestrs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,20 @@ This is Rust support using cxx.rs to wrap the framework C++ library.
- You should now be able to configure the `RsSmokeTest` module in your config
YAML.

## Differences to other EVerest language wrappers

- The `enable_external_mqtt` is ignored for Rust modules. If you want to interact
with MQTT externally, just pull an external mqtt module (for example the
really excellent [rumqttc](https://docs.rs/rumqttc/latest/rumqttc/)) crate
into your module and use it directly. This is a concious decision to future
proof, should everst at some point move to something different than MQTT as
transport layer and for cleaner abstraction.

## Status

This code is currently only supporting providing an interface to be implemented, i.e. no variables publish or receiving and no calling of other interfaces. Those features are straightforward, quick and easy to implement, but for now this is probably enough to iron out the integration questions.
Full support for requiring and providing interfaces is implemented, missing
currently is:

- Support for EVerest logging
- Support for Configuration options of Modules
- Support for implementations with `max_connections != 1` or `min_connections != 1`
185 changes: 139 additions & 46 deletions everestrs/everestrs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use argh::FromArgs;
use serde::de::DeserializeOwned;
use std::collections::HashMap;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::{RwLock, Weak};
use thiserror::Error;

#[derive(Error, Debug)]
Expand All @@ -19,14 +19,10 @@ pub type Result<T> = ::std::result::Result<T, Error>;

#[cxx::bridge]
mod ffi {
struct CommandMeta {
implementation_id: String,
name: String,
}

extern "Rust" {
type Runtime;
fn handle_command(self: &Runtime, meta: &CommandMeta, json: JsonBlob) -> JsonBlob;
fn handle_command(self: &Runtime, implementation_id: &str, name: &str, json: JsonBlob) -> JsonBlob;
fn handle_variable(self: &Runtime, implementation_id: &str, name: &str, json: JsonBlob);
fn on_ready(&self);
}

Expand All @@ -42,18 +38,35 @@ mod ffi {

/// Connects to the message broker and launches the main everest thread to push work
/// forward. Returns the module manifest.
fn initialize(self: Pin<&mut Module>) -> JsonBlob;
fn initialize(self: &Module) -> JsonBlob;

/// Returns the interface definition.
fn get_interface(self: &Module, interface_name: &str) -> JsonBlob;

/// Registers the callback of the `GenericModule` to be called and calls
/// Registers the callback of the `Subscriber` to be called and calls
/// `Everest::Module::signal_ready`.
fn signal_ready(self: &Module, rt: &Runtime);

/// Informs the runtime that we implement the command described in `meta` and registers the
/// `handle_command` method from the `GenericModule` as the handler.
fn provide_command(self: &Module, rt: &Runtime, meta: &CommandMeta);
/// Informs the runtime that we implement the command described by `implementation_id` and
/// `name`, and registers the `handle_command` method from the `Subscriber` as the handler.
fn provide_command(self: &Module, rt: &Runtime, implementation_id: String, name: String);

/// Call the command described by 'implementation_id' and `name` with the given 'args'.
/// Returns the return value.
fn call_command(
self: &Module,
implementation_id: &str,
name: &str,
args: JsonBlob,
) -> JsonBlob;

/// Informs the runtime that we want to receive the variable described by
/// `implementation_id` and `name` and registers the `handle_variable` method from the
/// `Subscriber` as the handler.
fn subscribe_variable(self: &Module, rt: &Runtime, implementation_id: String, name: String);

/// Publishes the given `blob` under the `implementation_id` and `name`.
fn publish_variable(self: &Module, implementation_id: &str, name: &str, blob: JsonBlob);
}
}

Expand All @@ -72,8 +85,14 @@ impl ffi::JsonBlob {
}
}

/// The cpp_module is for Rust an opaque type - so Rust can't tell if it is safe
/// to be accessed from multiple threads. We know that the c++ runtime is meant
/// to be used concurrently.
unsafe impl Sync for ffi::Module {}
unsafe impl Send for ffi::Module {}

/// Arguments for an EVerest node.
#[derive(FromArgs, Debug)]
/// An everest Node.
struct Args {
/// prefix of installation.
#[argh(option)]
Expand All @@ -94,7 +113,7 @@ struct Args {
/// details of the current module, i.e. it deals with JSON blobs and strings as command names. Code
/// generation is used to build the concrete, strongly typed abstractions that are then used by
/// final implementors.
pub trait GenericModule: Sync {
pub trait Subscriber: Sync + Send {
/// Handler for the command `name` on `implementation_id` with the given `parameters`. The return value
/// will be returned as the result of the call.
fn handle_command(
Expand All @@ -104,73 +123,147 @@ pub trait GenericModule: Sync {
parameters: HashMap<String, serde_json::Value>,
) -> Result<serde_json::Value>;

/// Handler for the variable `name` on `implementation_id` with the given `value`.
fn handle_variable(
&self,
implementation_id: &str,
name: &str,
value: serde_json::Value,
) -> Result<()>;

fn on_ready(&self) {}
}

/// The [Runtime] is the central piece of the bridge between c++ and Rust. We
/// have to ensure that the `cpp_module` never outlives the [Runtime] object.
/// This means that the [Runtime] **must** take ownership of `cpp_module`.
///
/// The `Subscriber` is not owned by the [Runtime] - in fact in derived user
/// code the `Subscriber` might take ownership of the [Runtime] - the weak
/// ownership hence is necessary to break possible ownership cycles.
pub struct Runtime {
// There are two subtleties here:
// 1. We are handing out pointers to `module_impl` to `cpp_module` for callbacks. The pointers
// must must stay valid for as long as `cpp_module` is alive. Hence `module_impl` must never
// move in memory. Rust can model this through the Pin concept which upholds this guarantee.
// We use a Box to put the object on the heap.
// 2. For the same reason, `module_impl` should outlive `cpp_module`, hence should be dropped
// after it. Rust drops fields in declaration order, hence `cpp_module` should come before
// `module_impl` in this struct.
cpp_module: cxx::UniquePtr<ffi::Module>,
module_impl: Pin<Box<dyn GenericModule>>,
sub_impl: RwLock<Option<Weak<dyn Subscriber>>>,
}

impl Runtime {
fn on_ready(&self) {
self.module_impl.on_ready();
self.sub_impl
.read()
.unwrap()
.as_ref()
.unwrap()
.upgrade()
.unwrap()
.on_ready();
}

fn handle_command(&self, meta: &ffi::CommandMeta, json: ffi::JsonBlob) -> ffi::JsonBlob {
fn handle_command(&self, impl_id: &str, name: &str, json: ffi::JsonBlob) -> ffi::JsonBlob {
let blob = self
.module_impl
.handle_command(&meta.implementation_id, &meta.name, json.deserialize())
.sub_impl
.read()
.unwrap()
.as_ref()
.unwrap()
.upgrade()
.unwrap()
.handle_command(impl_id, name, json.deserialize())
.unwrap();
ffi::JsonBlob::from_vec(serde_json::to_vec(&blob).unwrap())
}

fn handle_variable(&self, impl_id: &str, name: &str, json: ffi::JsonBlob) {
self.sub_impl
.read()
.unwrap()
.as_ref()
.unwrap()
.upgrade()
.unwrap()
.handle_variable(impl_id, name, json.deserialize())
.unwrap();
}

pub fn publish_variable<T: serde::Serialize>(
&self,
impl_id: &str,
var_name: &str,
message: &T,
) {
let blob = ffi::JsonBlob::from_vec(
serde_json::to_vec(&message).expect("Serialization of data cannot fail."),
);
(self.cpp_module)
.as_ref()
.unwrap()
.publish_variable(impl_id, var_name, blob);
}

pub fn call_command<T: serde::Serialize, R: serde::de::DeserializeOwned>(
&self,
impl_id: &str,
name: &str,
args: &T,
) -> R {
let blob = ffi::JsonBlob::from_vec(
serde_json::to_vec(args).expect("Serialization of data cannot fail."),
);
let return_value = (self.cpp_module)
.as_ref()
.unwrap()
.call_command(impl_id, name, blob);
serde_json::from_slice(&return_value.data).unwrap()
}

// TODO(hrapp): This function could use some error handling.
pub fn from_commandline<T: GenericModule + 'static>(module_impl: T) -> Self {
pub fn new() -> Self {
let args: Args = argh::from_env();
let mut cpp_module = ffi::create_module(
let cpp_module = ffi::create_module(
&args.module,
&args.prefix.to_string_lossy(),
&args.conf.to_string_lossy(),
);
let manifest_json = cpp_module.as_mut().unwrap().initialize();
let manifest: schema::Manifest = manifest_json.deserialize();
let module = Self {

Self {
cpp_module,
module_impl: Box::pin(module_impl),
};
sub_impl: RwLock::new(None),
}
}

pub fn set_subscriber(&self, sub_impl: Weak<dyn Subscriber>) {
*self.sub_impl.write().unwrap() = Some(sub_impl);
let manifest_json = self.cpp_module.as_ref().unwrap().initialize();
let manifest: schema::Manifest = manifest_json.deserialize();

// Implement all commands for all of our implementations, dispatch everything to the
// GenericModule.
// Subscriber.
for (implementation_id, implementation) in manifest.provides {
let interface_s = module.cpp_module.get_interface(&implementation.interface);
let interface_s = self.cpp_module.get_interface(&implementation.interface);
let interface: schema::Interface = interface_s.deserialize();
for (name, _) in interface.cmds {
let meta = ffi::CommandMeta {
implementation_id: implementation_id.clone(),
name,
};
self.cpp_module
.as_ref()
.unwrap()
.provide_command(self, implementation_id.clone(), name);
}
}

module
.cpp_module
// Subscribe to all variables that might be of interest.
// TODO(hrapp): This looks very similar to the block above.
for (implementation_id, provides) in manifest.requires {
let interface_s = self.cpp_module.get_interface(&provides.interface);
let interface: schema::Interface = interface_s.deserialize();
for (name, _) in interface.vars {
self.cpp_module
.as_ref()
.unwrap()
.provide_command(&module, &meta);
.subscribe_variable(self, implementation_id.clone(), name);
}
}

// Since users can choose to overwrite `on_ready`, we can call signal_ready right away.
// TODO(sirver): There were some doubts if this strategy is too inflexible, discuss design
// TODO(hrapp): There were some doubts if this strategy is too inflexible, discuss design
// again.
module.cpp_module.as_ref().unwrap().signal_ready(&module);
module
(self.cpp_module).as_ref().unwrap().signal_ready(&self);
}
}
8 changes: 8 additions & 0 deletions everestrs/everestrs/src/schema/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::BTreeMap;
pub struct Manifest {
pub description: String,
pub provides: BTreeMap<String, ProvidesEntry>,
pub requires: BTreeMap<String, RequiresEntry>,
pub metadata: Metadata,
}

Expand All @@ -19,6 +20,13 @@ pub struct ProvidesEntry {
pub description: String,
}

#[derive(Debug, Deserialize)]
pub struct RequiresEntry {
pub interface: String,
pub max_connections: i32,
pub min_connections: i32,
}

#[derive(Debug, Deserialize)]
pub struct Metadata {
pub license: String,
Expand Down
30 changes: 26 additions & 4 deletions everestrs/everestrs_sys/everestrs_sys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ JsonBlob Module::get_interface(rust::Str interface_name) const {
return json2blob(interface_def);
}

JsonBlob Module::initialize() {
JsonBlob Module::initialize() const {
handle_->connect();
handle_->spawn_main_loop_thread();

Expand All @@ -59,13 +59,35 @@ void Module::signal_ready(const Runtime& rt) const {
handle_->signal_ready();
}

void Module::provide_command(const Runtime& rt, const CommandMeta& meta) const {
handle_->provide_cmd(std::string(meta.implementation_id), std::string(meta.name), [&rt, meta](json args) {
JsonBlob blob = rt.handle_command(meta, json2blob(args));
void Module::provide_command(const Runtime& rt, rust::String implementation_id, rust::String name) const {
handle_->provide_cmd(std::string(implementation_id), std::string(name), [&rt, implementation_id, name](json args) {
JsonBlob blob = rt.handle_command(implementation_id, name, json2blob(args));
return json::parse(blob.data.begin(), blob.data.end());
});
}

void Module::subscribe_variable(const Runtime& rt, rust::String implementation_id, rust::String name) const {
// TODO(hrapp): I am not sure how to model the multiple slots that could theoretically be here.
const Requirement req(std::string(implementation_id), 0);
handle_->subscribe_var(req, std::string(name), [&rt, implementation_id, name](json args) {
rt.handle_variable(implementation_id, name, json2blob(args));
});
}

JsonBlob Module::call_command(rust::Str implementation_id, rust::Str name, JsonBlob blob) const {
// TODO(hrapp): I am not sure how to model the multiple slots that could theoretically be here.
const Requirement req(std::string(implementation_id), 0);
json return_value = handle_->call_cmd(req, std::string(name), json::parse(blob.data.begin(), blob.data.end()));

return json2blob(return_value);
}

std::unique_ptr<Module> create_module(rust::Str module_id, rust::Str prefix, rust::Str conf) {
return std::make_unique<Module>(std::string(module_id), std::string(prefix), std::string(conf));
}

void Module::publish_variable(rust::Str implementation_id, rust::Str name,
JsonBlob blob) const {
handle_->publish_var(std::string(implementation_id), std::string(name),
json::parse(blob.data.begin(), blob.data.end()));
}
10 changes: 5 additions & 5 deletions everestrs/everestrs_sys/everestrs_sys.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,21 @@

#include "rust/cxx.h"

struct CommandMeta;
struct JsonBlob;
struct Runtime;

class Module {
public:
Module(const std::string& module_id, const std::string& prefix, const std::string& conf);

JsonBlob initialize();
JsonBlob initialize() const;
JsonBlob get_interface(rust::Str interface_name) const;

void signal_ready(const Runtime& rt) const;
void provide_command(const Runtime& rt, const CommandMeta& meta) const;

// TODO(hrapp): Add call_command, publish_variable and subscribe_variable.
void provide_command(const Runtime& rt, rust::String implementation_id, rust::String name) const;
JsonBlob call_command(rust::Str implementation_id, rust::Str name, JsonBlob args) const;
void subscribe_variable(const Runtime& rt, rust::String implementation_id, rust::String name) const;
void publish_variable(rust::Str implementation_id, rust::Str name, JsonBlob blob) const;

private:
const std::string module_id_;
Expand Down

0 comments on commit 9cf60a7

Please sign in to comment.