Skip to content

Commit

Permalink
Merge pull request #1 from Elvis339/resolution
Browse files Browse the repository at this point in the history
Resolution
  • Loading branch information
Elvis339 authored Oct 6, 2024
2 parents db2a871 + 6299354 commit ffe008b
Show file tree
Hide file tree
Showing 10 changed files with 439 additions and 338 deletions.
50 changes: 50 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
on: [push, pull_request]

name: CI

jobs:
check:
name: Check
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Install stable toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: stable

- name: Run cargo check
run: cargo check

test:
name: Test Suite
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Install stable toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: stable

- name: Run cargo test
run: cargo test

lints:
name: Lints
runs-on: ubuntu-latest
steps:
- name: Checkout sources
uses: actions/checkout@v3

- name: Install stable toolchain
uses: dtolnay/rust-toolchain@master
with:
toolchain: stable
components: rustfmt, clippy

- name: Run cargo fmt
run: cargo fmt --all -- --check
23 changes: 8 additions & 15 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 8 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
[package]
name = "zmq_rchan"
name = "threadsafe_zmq"
version = "1.0.0"
edition = "2021"
authors = ["Elvis Sabanovic <elvissabanovic3@gmail.com>"]
description = "Threadsafe zeromq"
readme = "README.md"
keywords = ["threadsafe", "zeromq", "sockets", "unix-domain-sockets", "ipc"]
categories = ["concurrency"]
repository = "https://github.com/Elvis339/threadsafe_zmq"

[dependencies]
zmq = "0.10.0"
crossbeam-channel = "0.5.13"
anyhow = "1.0.89"
crossbeam-channel = "0.5.13"
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ This implementation is based on Golang's [zmqchan](https://github.com/abligh/zmq

This is just a tweaked implementation in Rust

# Message Flow

```
+-------------------+
| Client |
Expand Down
15 changes: 15 additions & 0 deletions example/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "threadsafe_zmq_example"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "server"
path = "src/server.rs"

[[bin]]
name = "client"
path = "src/client.rs"

[dependencies]
threadsafe_zmq = { path = "../src", version = "1.0.0" }
6 changes: 6 additions & 0 deletions example/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Example
In one terminal window run:
`RUST_LOG=trace cargo run --bin server`

In another run
`RUST_LOG=info cargo run --bin client`
67 changes: 67 additions & 0 deletions example/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use env_logger;
use log::{error, info};
use rand::Rng;
use zmq::Context;

fn main() {
env_logger::init();

let clients = 4;
let mut handles = Vec::with_capacity(clients);

for i in 0..clients {
let client_id = i;
let handle = std::thread::spawn(move || {
let addr = "tcp://localhost:5555";
let ctx = Context::new();
let socket = ctx
.socket(zmq::DEALER)
.expect("Failed to create PAIR socket");

let rand_id = client_id as u8 + generate_random_number();
let id = format!("client-{}", rand_id);
socket
.set_identity(id.clone().as_bytes())
.expect("Failed to set identity");
socket.connect(addr).expect("Failed to connect to server");

info!("{} connected to: {}", id, addr);
loop {
let rand_num = generate_random_number();
let rand_num_bytes = rand_num.to_le_bytes().to_vec();

match socket.send_multipart(vec![rand_num_bytes], 0) {
Ok(_) => info!("{}, sent number: {}", id, rand_num),
Err(snd_err) => {
error!("{}, failed to send message: {:?}", id, snd_err);
continue;
}
}

match socket.recv_multipart(0) {
Ok(message) => {
info!("Client {}, received result: {:?}", client_id, message);
}
Err(rcv_err) => {
error!(
"Client {}, failed to receive message: {:?}",
client_id, rcv_err
);
}
}

std::thread::sleep(std::time::Duration::from_millis(100));
}
});
handles.push(handle);
}

loop {
std::thread::sleep(std::time::Duration::from_secs(1));
}
}

fn generate_random_number() -> u8 {
let mut rng = rand::thread_rng();
rng.gen_range(0..=30)
}
84 changes: 84 additions & 0 deletions example/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use env_logger;
use log::{debug, error, info};
use threadsafe_zmq::{ChannelPair, Sender, ZmqByteStream};
use zmq::Context;

fn main() {
env_logger::init();

let addr = "tcp://*:5555";
let ctx = Context::new();
let socket = ctx
.socket(zmq::ROUTER)
.expect("Failed to create ROUTER socket");
socket.bind(addr).expect("Failed to bind to address");

let channel_pair = ChannelPair::new(socket).expect("Failed to create channel pair");
info!("Server listening on {}", addr);

loop {
debug!("Waiting to receive messages...");

match channel_pair.rx_chan().recv() {
Ok(message) => {
if message.len() < 2 {
error!("Received malformed message: {:?}", message);
continue;
}

println!("Received message: {:?}", message);
let cp = channel_pair.clone();
std::thread::spawn(move || {
calculate_fib(message, cp.tx_chan());
});
}
Err(rcv_err) => {
error!("Failed to receive message: {:?}", rcv_err);
}
}
}
}

fn calculate_fib(messages: ZmqByteStream, sender: &Sender) {
// The first part is the identity, and the second part is the actual message
let identity = messages[0].clone();
let payload = messages[1].clone();

let id_str = String::from_utf8_lossy(&identity);

if payload.is_empty() {
error!("Received an empty payload, skipping Fibonacci calculation.");
return;
}

info!("Received message from: {:?}", id_str);

// Deserialize the message into u32
// let number = match payload.as_slice().try_into() {
// Ok(bytes) => u32::from_le_bytes(bytes),
// Err(_) => {
// error!("Failed to deserialize payload, skipping.");
// return;
// }
// };
let number = 13;

info!("Calculating Fibonacci for number: {}", number);
let result = fibonacci_recursive(number);
let result_bytes = result.to_le_bytes().to_vec();

// The response must include the identity frame, followed by the result
let response = vec![identity.clone(), result_bytes];
match sender.send(response) {
Ok(_) => info!("Successfully sent response: {:?} to: {:?}", result, id_str),
Err(err) => error!("Failed to send response: {:?}", err),
}
}

fn fibonacci_recursive(n: u32) -> u32 {
if n <= 1 {
n
} else {
fibonacci_recursive(n - 1) + fibonacci_recursive(n - 2)
}
}
29 changes: 29 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::fmt;
use zmq;

#[derive(Debug)]
pub enum ChannelPairError {
Zmq(zmq::Error),
ChannelError(String),
ConfigurationError(String),
Other(String),
}

impl From<zmq::Error> for ChannelPairError {
fn from(error: zmq::Error) -> Self {
ChannelPairError::Zmq(error)
}
}

impl fmt::Display for ChannelPairError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ChannelPairError::Zmq(e) => write!(f, "ZeroMQ Error: {}", e),
ChannelPairError::ChannelError(msg) => write!(f, "Channel Error: {}", msg),
ChannelPairError::ConfigurationError(msg) => write!(f, "Configuration Error: {}", msg),
ChannelPairError::Other(msg) => write!(f, "Other Error: {}", msg),
}
}
}

impl std::error::Error for ChannelPairError {}
Loading

0 comments on commit ffe008b

Please sign in to comment.