Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#353_#355-redis-ada #49

Merged
merged 7 commits into from
Sep 10, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
#359_#360_#361_#366_#376_#380
  • Loading branch information
vv-ab committed Sep 8, 2024
commit eb4a168a81aa305a8bce2983e0c063947b2a1445
4 changes: 4 additions & 0 deletions blickbox/ada/Cargo.toml
Original file line number Diff line number Diff line change
@@ -21,9 +21,13 @@ futures-util = "0.3.30"
tempfile = "3.12.0"
rand = "0.9.0-alpha.2"
log = "0.4.21"
redis = { version = "0.26.1", features = ["tokio-comp", "aio"] }
time = "0.3.36"
async-trait = "=0.1.82"

[dev-dependencies]
mockito = "1.5.0"
mockall = "0.13.0"

[features]
mock = []
28 changes: 0 additions & 28 deletions blickbox/ada/src/communication/http_request.rs
Original file line number Diff line number Diff line change
@@ -41,34 +41,6 @@ pub mod http_request {
battery_voltage: f32,
}

pub async fn send_last_online(url: &str) -> crate::Result<()> {

// Create a reqwest HTTP client
let client = Client::new();

// Send the sensor data as JSON in the body of a POST request
let response = match client
.post(url)
.header("blickbox", "true")
.send()
.await {
Ok(response) => response,
Err(error) => {
return Err(format!("{}", error))
}
};

match response.status().is_success() {
true => {
println!("Last Online Status sent successfully!");
}
false => {
Err(format!("Request failed: {:?}", response.status()))?;
}
}
Ok(())
}

pub async fn send_data(url: &str, sensor_data: &SensorData) -> crate::Result<()> {

let base_url = url;
52 changes: 8 additions & 44 deletions blickbox/ada/src/communication/logging.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
pub mod logging {
use std::collections::VecDeque;

use reqwest;
use reqwest::Client;
use serde::Serialize;

use crate::get_time;

#[derive(Serialize, Debug)]
#[derive(Serialize, Debug, Clone)]
pub struct LogEntry {
pub title: String,
pub message: String,
@@ -16,51 +11,20 @@ pub mod logging {
pub timestamp: String,
}

pub async fn send_logs(url: &str, ringbuffer: &mut VecDeque<LogEntry>) -> crate::Result<()> {

println!("received url: {}", url);
// Create a reqwest HTTP client
let client = Client::new();

for log in ringbuffer {

let json = serde_json::to_string(&log).unwrap();
println!("JSON that will be sent: {}", json);

// Send the logs as JSON in the body of a POST request
let response = match client.post(url)
.header("Content-Type", "application/json")
.body(json)
.send()
.await {
Ok(response) => response,
Err(error) => {
return Err(format!("{}", error))
}
};
println!("Response: {:?}", response);

match response.status().is_success() {
true => {
println!("Logs sent successfully!");
}
false => {
Err(format!("Request failed: {:?}", response.status()))?;
}
}
}
Ok(())
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum LogChannel {
Ada,
Sara,
}

pub fn log(title: String, message: String, log_type: String, ringbuffer: &mut VecDeque<LogEntry>) {
pub fn log(title: String, message: String, log_type: String) -> LogEntry {
let log_entry = LogEntry {
title,
message,
log_type,
timestamp: get_time(),
};
println!("Log: {:?}", log_entry);
ringbuffer.push_back(log_entry);
log_entry
}

}
}
1 change: 1 addition & 0 deletions blickbox/ada/src/communication/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
pub mod http_request;
pub mod logging;
pub mod redis;
94 changes: 94 additions & 0 deletions blickbox/ada/src/communication/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use std::collections::{VecDeque, HashMap};
use futures_util::StreamExt;
use redis::AsyncCommands;
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::communication::logging::logging::{LogChannel, LogEntry};

#[derive(Clone)]
pub struct RedisHandler {
client: redis::Client,
publish_conn: Arc<Mutex<redis::aio::MultiplexedConnection>>,
pubsub_conn: Arc<Mutex<redis::aio::PubSub>>,
logs: Arc<Mutex<HashMap<LogChannel, VecDeque<LogEntry>>>>,
}

impl RedisHandler {
pub async fn new(redis_url: &str) -> Result<Self, redis::RedisError> {
let client = redis::Client::open(redis_url)?;
let publish_conn = client.get_multiplexed_async_connection().await?;
let pubsub_conn = client.get_async_pubsub().await?;

Ok(Self {
client,
publish_conn: Arc::new(Mutex::new(publish_conn)),
pubsub_conn: Arc::new(Mutex::new(pubsub_conn)),
logs: Arc::new(Mutex::new(HashMap::new())),
})
}

pub async fn log_to_channel(&self, channel: LogChannel, log_entry: LogEntry) {
let mut logs = self.logs.lock().await;
logs.entry(channel)
.or_insert_with(VecDeque::new)
.push_back(log_entry);
}

pub async fn publish_all(&self) -> Result<(), redis::RedisError> {
let mut conn = self.publish_conn.lock().await;
let mut buffers = self.logs.lock().await;

for (channel, buffer) in buffers.iter_mut() {
let channel_str = match channel {
LogChannel::Ada => "ada-logs",
LogChannel::Sara => "sara-logs",
};

while let Some(log) = buffer.pop_front() {
let json = serde_json::to_string(&log).unwrap();
println!("Publishing to {}: {}", channel_str, json);
conn.publish(channel_str, json).await?;
}
}

Ok(())
}

pub async fn subscribe(&self, channel: &str) -> Result<(), redis::RedisError> {
let mut conn = self.pubsub_conn.lock().await;
conn.subscribe(channel).await?;
println!("subscribing to {}", channel);
Ok(())
}

pub async fn listen(&self) -> Result<(), redis::RedisError> {
let mut conn = self.pubsub_conn.lock().await;
let mut pubsub_stream = conn.on_message();

while let Some(msg) = pubsub_stream.next().await {
let payload: String = msg.get_payload()?;
let channel: String = String::from(msg.get_channel_name());

println!("Received message on channel {}: {}", channel, payload);

if payload == "Restart" {
println!("Received restart command on channel {}", channel);
// Restart
}
}

Ok(())
}
}

pub async fn initialize_redis(redis_url: &str) -> Result<RedisHandler, redis::RedisError> {
let handler = RedisHandler::new(redis_url).await?;

// ADA subscription
handler.subscribe("ada").await?;

// SARA subscription
handler.subscribe("sara").await?;

Ok(handler)
}
Loading