Skip to content

Monad/MonadIO, Handler, Coroutine/doNotation, Functional Programming features for Rust

License

Notifications You must be signed in to change notification settings

TeaEntityLab/fpRust

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

fpRust

tag Crates.io Travis CI Build Status docs

license stars forks

Monad, Functional Programming features for Rust

Why

I love functional programming, Rx-style coding.

However, it's hard to implement them in Rust, and there are few libraries that provide even parts of them.

Thus I implemented fpRust. I hope you like it :)

Features

  • MonadIO, Rx-like (fp_rust::monadio::MonadIO)

    • map/fmap/subscribe
    • async/sync
    • Support Future (to_future()) with *feature: for_futures
  • Publisher (fp_rust::publisher::Publisher)

    • Support Stream implementation(subscribe_as_stream()) with *feature: for_futures
  • Fp functions (fp_rust::fp)

    • compose!(), pipe!()
    • map!(), reduce!(), filter!(), foldl!(), foldr!()
    • contains!(), reverse!()
  • Async (fp_rust::sync & fp_rust::handler::HandlerThread)

    • simple BlockingQueue (inspired by Java BlockingQueue, implemented by built-in std::sync::mpsc::channel)
    • HandlerThread (inspired by Android Handler, implemented by built-in std::thread)
    • WillAsync (inspired by Java Future)
      • Support as a Future with *feature: for_futures
    • CountDownLatch (inspired by Java CountDownLatch, implemented by built-in std::sync::Mutex)
      • Support as a Future with *feature: for_futures
  • Cor (fp_rust::cor::Cor)

    • PythonicGenerator-like Coroutine
    • yield/yieldFrom
    • async/sync
  • Actor (fp_rust::actor::ActorAsync)

    • Pure simple Actor model(receive/send/spawn)
    • Context for keeping internal states
    • Able to communicate with Parent/Children Actors
  • DoNotation (fp_rust::cor::Cor)

    • Haskell DoNotation-like, macro

* Pattern matching

Usage

MonadIO (RxObserver-like)

Example:

extern crate fp_rust;

use std::{
    thread,
    time,
    sync::{
        Arc,
        Mutex,
        Condvar,
    }
};

use fp_rust::handler::{
    Handler,
    HandlerThread,
};
// `RawFunc` is required by the `handler_observe_on.post(RawFunc::new(..))`
// calls in the async part of this example.
use fp_rust::common::{
    RawFunc,
    SubscriptionFunc,
};
use fp_rust::monadio::{
    MonadIO,
    of,
};
use fp_rust::sync::CountDownLatch;

// fmap & map (sync)
// The chain below computes 1 * 4 (fmap) * 3 (map) * 3 (map) = 36, which is the
// value the subscriber asserts on.
let subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<u16>| {
    println!("monadio_sync {:?}", x); // monadio_sync 36
    // Read through the `Arc` directly; no clone or `Arc::make_mut` is needed
    // just to compare the value.
    assert_eq!(36, *x);
}));
let monadio_sync = MonadIO::just(1)
    .fmap(|x| MonadIO::new(move || x * 4))
    .map(|x| x * 3)
    .map(|x| x * 3);
monadio_sync.subscribe(subscription);

// fmap & map (async)
// Two handler threads are created and handed to the MonadIO below
// (NOTE(review): presumably one for observe_on and one for subscribe_on,
// judging by the variable names — confirm the argument order).
let mut _handler_observe_on = HandlerThread::new_with_mutex();
let mut _handler_subscribe_on = HandlerThread::new_with_mutex();
let monadio_async = MonadIO::new_with_handlers(
    || {
        println!("In string");
        String::from("ok")
    },
    Some(_handler_observe_on.clone()),
    Some(_handler_subscribe_on.clone()),
);

// The latch is counted down by the first subscriber and waited on at the end.
let latch = CountDownLatch::new(1);
let latch2 = latch.clone();

thread::sleep(time::Duration::from_millis(1));

let subscription = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
    println!("monadio_async {:?}", x); // monadio_async ok

    latch2.countdown(); // Unlock here
}));
monadio_async.subscribe(subscription);
// A second, independent subscriber on the same MonadIO.
monadio_async.subscribe(Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
    println!("monadio_async sub2 {:?}", x); // monadio_async sub2 ok
})));
{
    // Lock both handlers inside a block so the guards are released before waiting.
    let mut handler_observe_on = _handler_observe_on.lock().unwrap();
    let mut handler_subscribe_on = _handler_subscribe_on.lock().unwrap();

    println!("hh2");
    // The handlers are only started here, after both subscriptions were registered.
    handler_observe_on.start();
    handler_subscribe_on.start();
    println!("hh2 running");

    // Post a few no-op jobs to the observe-on handler.
    handler_observe_on.post(RawFunc::new(move || {}));
    handler_observe_on.post(RawFunc::new(move || {}));
    handler_observe_on.post(RawFunc::new(move || {}));
    handler_observe_on.post(RawFunc::new(move || {}));
    handler_observe_on.post(RawFunc::new(move || {}));
}
thread::sleep(time::Duration::from_millis(1));

// Wait until the first subscriber has counted the latch down
latch.clone().wait();

Publisher (PubSub-like)

Example:

extern crate fp_rust;

use fp_rust::common::{SubscriptionFunc, RawFunc};
use fp_rust::handler::{Handler, HandlerThread};
use fp_rust::publisher::Publisher;
use std::sync::Arc;

use fp_rust::sync::CountDownLatch;

// Publisher created without a handler; one closure subscriber checks the published value.
let mut pub1 = Publisher::new();
pub1.subscribe_fn(|x: Arc<u16>| {
    println!("pub1 {:?}", x);
    // Read through the `Arc` directly; no clone or `Arc::make_mut` is needed
    // just to compare the value.
    assert_eq!(9, *x);
});
pub1.publish(9);

// Publisher backed by a handler thread.
let mut _h = HandlerThread::new_with_mutex();

let mut pub2 = Publisher::new_with_handlers(Some(_h.clone()));

// The latch is counted down by subscriber `s` and waited on at the end.
let latch = CountDownLatch::new(1);
let latch2 = latch.clone();

let s = Arc::new(SubscriptionFunc::new(move |x: Arc<String>| {
    println!("pub2-s1 I got {:?}", x);

    latch2.countdown();
}));
// Keep `s` around so the same subscription can be unsubscribed later.
pub2.subscribe(s.clone());
// NOTE(review): `map` appears to register a second callback on the publisher — confirm.
pub2.map(move |x: Arc<String>| {
    println!("pub2-s2 I got {:?}", x);
});

{
    // Lock the handler inside a block so the guard is released before publishing.
    let h = &mut _h.lock().unwrap();

    println!("hh2");
    h.start();
    println!("hh2 running");

    // Post a few no-op jobs to the handler.
    h.post(RawFunc::new(move || {}));
    h.post(RawFunc::new(move || {}));
    h.post(RawFunc::new(move || {}));
    h.post(RawFunc::new(move || {}));
    h.post(RawFunc::new(move || {}));
}

// Published while `s` is still subscribed.
pub2.publish(String::from("OKOK"));
pub2.publish(String::from("OKOK2"));

pub2.unsubscribe(s.clone());

// Published after `s` was unsubscribed.
pub2.publish(String::from("OKOK3"));

// Block until `s` has counted the latch down.
latch.clone().wait();

Cor (PythonicGenerator-like)

Example:

#[macro_use]
extern crate fp_rust;

use std::time;
use std::thread;

use fp_rust::cor::Cor;

println!("test_cor_new");

// cor1 yields a String to the outside and prints what it gets back.
// NOTE(review): the two trailing types look like (yielded type, received type) — confirm.
let _cor1 = cor_newmutex!(
    |this| {
        println!("cor1 started");

        let s = cor_yield!(this, Some(String::from("given_to_outside")));
        println!("cor1 {:?}", s);
    },
    String,
    i16
);
// Clone moved into cor2's closure below.
let cor1 = _cor1.clone();

// cor2 delegates to cor1 via `cor_yield_from!`, passing it `Some(3)`.
let _cor2 = cor_newmutex!(
    move |this| {
        println!("cor2 started");

        println!("cor2 yield_from before");

        let s = cor_yield_from!(this, cor1, Some(3));
        println!("cor2 {:?}", s);
    },
    i16,
    i16
);

{
    let cor1 = _cor1.clone();
    cor1.lock().unwrap().set_async(true); // NOTE Cor default async
                                          // NOTE cor1 should keep async to avoid deadlock waiting.(waiting for each other)
}
{
    let cor2 = _cor2.clone();
    cor2.lock().unwrap().set_async(false);
    // NOTE cor2 is the entry point, so it could be sync without any deadlock.
}
// Start cor1 first, then cor2 (the entry point).
cor_start!(_cor1);
cor_start!(_cor2);

thread::sleep(time::Duration::from_millis(1));

Do Notation (Haskell DoNotation-like)

Example:

#[macro_use]
extern crate fp_rust;

use std::time;
use std::thread;
// `Arc` and `Mutex` are used for the shared result string in this example.
use std::sync::{Arc, Mutex};

use fp_rust::cor::Cor;


// Shared string that the do-block fills with the joined results.
let v = Arc::new(Mutex::new(String::from("")));

let _v = v.clone();
do_m!(move |this| {
    println!("test_cor_do_m started");

    // Three inner coroutines, each yielding one digit as a String.
    let cor_inner1 = cor_newmutex_and_start!(
        |this| {
            let s = cor_yield!(this, Some(String::from("1")));
            println!("cor_inner1 {:?}", s);
        },
        String,
        i16
    );
    let cor_inner2 = cor_newmutex_and_start!(
        |this| {
            let s = cor_yield!(this, Some(String::from("2")));
            println!("cor_inner2 {:?}", s);
        },
        String,
        i16
    );
    let cor_inner3 = cor_newmutex_and_start!(
        |this| {
            let s = cor_yield!(this, Some(String::from("3")));
            println!("cor_inner3 {:?}", s);
        },
        String,
        i16
    );

    {
        // Collect the value yielded by each inner coroutine, in order, and
        // concatenate them into the shared string.
        (*_v.lock().unwrap()) = [
            cor_yield_from!(this, cor_inner1, Some(1)).unwrap(),
            cor_yield_from!(this, cor_inner2, Some(2)).unwrap(),
            cor_yield_from!(this, cor_inner3, Some(3)).unwrap(),
        ].join("");
    }
});

let _v = v.clone();

{
    // "1" + "2" + "3" joined without a separator.
    assert_eq!("123", *_v.lock().unwrap());
}

Fp Functions (Compose, Pipe, Map, Reduce, Filter)

Example:

#[macro_use]
extern crate fp_rust

use fp_rust::fp::{
  compose_two,
  map, reduce, filter,
};

let add = |x| x + 2;
let multiply = |x| x * 3;
let divide = |x| x / 2;

// The asserted 17 corresponds to right-to-left application:
// add(multiply(divide(10))) = (10 / 2) * 3 + 2.
let result = (compose!(add, multiply, divide))(10);
assert_eq!(17, result);
println!("Composed FnOnce Result is {}", result);

// The asserted 18 corresponds to left-to-right application:
// divide(multiply(add(10))) = ((10 + 2) * 3) / 2.
let result = (pipe!(add, multiply, divide))(10);
assert_eq!(18, result);
println!("Piped FnOnce Result is {}", result);

// Consistent with the right-to-left order above: map doubles to [2, 4, 6, 8],
// filter keeps values below 6 ([2, 4]), reduce multiplies them to 8.
let result = (compose!(reduce!(|a, b| a * b), filter!(|x| *x < 6), map!(|x| x * 2)))(vec![1, 2, 3, 4]);
assert_eq!(Some(8), result);
println!("test_map_reduce_filter Result is {:?}", result);

Actor

Actor common(send/receive/spawn/states)

Example:

use std::time::Duration;

use fp_rust::common::LinkedListAsync;

/// Message type sent to the actors in this example; it is also the value type
/// stored in the actor's `HashMap<String, Value>` context.
#[derive(Debug, Clone)]
enum Value {
    /// Integer payload.
    Int(i32),
    /// List of strings.
    VecStr(Vec<String>),
    /// Spawn request.
    Spawn,
    /// Shutdown request.
    Shutdown,
}

// Result collectors: the `_thread` clones are moved into the actor closures,
// the originals are read by the assertions at the end of the example.
let result_i32 = LinkedListAsync::<i32>::new();
let result_i32_thread = result_i32.clone();
let result_string = LinkedListAsync::<Vec<String>>::new();
let result_string_thread = result_string.clone();
// Root actor: reacts to Spawn / Shutdown / Int messages and keeps the ids of
// its children in `context` under the "children_ids" key.
let mut root = ActorAsync::new(
    move |this: &mut ActorAsync<_, _>, msg: Value, context: &mut HashMap<String, Value>| {
        match msg {
            Value::Spawn => {
                println!("Actor Spawn");
                let result_i32_thread = result_i32_thread.clone();
                // Child actor: pushes `v * 10` for every Int and stops on Shutdown.
                let spawned = this.spawn_with_handle(Box::new(
                    move |this: &mut ActorAsync<_, _>, msg: Value, _| {
                        match msg {
                            Value::Int(v) => {
                                println!("Actor Child Int");
                                result_i32_thread.push_back(v * 10);
                            }
                            Value::Shutdown => {
                                println!("Actor Child Shutdown");
                                this.stop();
                            }
                            _ => {}
                        };
                    },
                ));
                // Append the new child's id to the list stored in the context
                // (starting from an empty list if none is stored yet).
                let list = context.get("children_ids").cloned();
                let mut list = match list {
                    Some(Value::VecStr(list)) => list,
                    _ => Vec::new(),
                };
                list.push(spawned.get_id());
                context.insert("children_ids".into(), Value::VecStr(list));
            }
            Value::Shutdown => {
                println!("Actor Shutdown");
                // Report the recorded child ids before shutting down.
                if let Some(Value::VecStr(ids)) = context.get("children_ids") {
                    result_string_thread.push_back(ids.clone());
                }

                // Forward Shutdown to every child, then stop the root itself.
                this.for_each_child(move |id, handle| {
                    println!("Actor Shutdown id {:?}", id);
                    handle.send(Value::Shutdown);
                });
                this.stop();
            }
            Value::Int(v) => {
                println!("Actor Int");
                // Forward the integer to every child whose id is recorded in the context.
                if let Some(Value::VecStr(ids)) = context.get("children_ids") {
                    for id in ids {
                        println!("Actor Int id {:?}", id);
                        if let Some(mut handle) = this.get_handle_child(id) {
                            handle.send(Value::Int(v));
                        }
                    }
                }
            }
            _ => {}
        }
    },
);

// Obtain a handle for sending messages, then start the root actor.
let mut root_handle = root.get_handle();
root.start();

// One child
root_handle.send(Value::Spawn);
root_handle.send(Value::Int(10));
// Two children
root_handle.send(Value::Spawn);
root_handle.send(Value::Int(20));
// Three children
root_handle.send(Value::Spawn);
root_handle.send(Value::Int(30));

// Send Shutdown
root_handle.send(Value::Shutdown);

// Give the actors a moment to process the messages before checking results.
thread::sleep(Duration::from_millis(1));
// 3 children Actors
assert_eq!(3, result_string.pop_front().unwrap().len());

// Pop six results (1..7 iterates six times) and sort them so the comparison
// does not depend on the order in which the children pushed them.
let mut v = Vec::<Option<i32>>::new();
for _ in 1..7 {
    let i = result_i32.pop_front();
    println!("Actor {:?}", i);
    v.push(i);
}
v.sort();
// Expected: 10 reached one child, 20 reached two, 30 reached three; each child
// pushes the value multiplied by 10.
assert_eq!(
    [
        Some(100),
        Some(200),
        Some(200),
        Some(300),
        Some(300),
        Some(300)
    ],
    v.as_slice()
)

Actor Ask (inspired by Akka/Erlang)

Example:

use std::time::Duration;

use fp_rust::common::LinkedListAsync;

/// Request messages for the "ask" example: each variant carries an integer
/// together with the container the actor writes its answer into.
#[derive(Debug, Clone)]
enum Value {
    /// Integer plus a `LinkedListAsync` that receives the answer.
    AskIntByLinkedListAsync((i32, LinkedListAsync<i32>)),
    /// Integer plus a `BlockingQueue` that receives the answer.
    AskIntByBlockingQueue((i32, BlockingQueue<i32>)),
}

// Actor that answers each request by writing `value * 10` into the container
// carried by the message.
let mut root = ActorAsync::new(
    move |_: &mut ActorAsync<_, _>, msg: Value, _: &mut HashMap<String, Value>| match msg {
        Value::AskIntByLinkedListAsync(v) => {
            println!("Actor AskIntByLinkedListAsync");
            // v.0 is the request value, v.1 the list that receives the answer.
            v.1.push_back(v.0 * 10);
        }
        Value::AskIntByBlockingQueue(mut v) => {
            println!("Actor AskIntByBlockingQueue");

            // NOTE If negative, hanging for testing timeout
            if v.0 < 0 {
                return;
            }

            // NOTE General Cases
            v.1.offer(v.0 * 10);
        } // _ => {}
    },
);

// Obtain a handle for sending messages, then start the actor.
let mut root_handle = root.get_handle();
root.start();

// LinkedListAsync<i32>
// Each request carries a clone of the same result list; the answers are
// expected in request order (10, 20, 30).
let result_i32 = LinkedListAsync::<i32>::new();
root_handle.send(Value::AskIntByLinkedListAsync((1, result_i32.clone())));
root_handle.send(Value::AskIntByLinkedListAsync((2, result_i32.clone())));
root_handle.send(Value::AskIntByLinkedListAsync((3, result_i32.clone())));
// Give the actor a moment to process the requests.
thread::sleep(Duration::from_millis(1));
let i = result_i32.pop_front();
assert_eq!(Some(10), i);
let i = result_i32.pop_front();
assert_eq!(Some(20), i);
let i = result_i32.pop_front();
assert_eq!(Some(30), i);

// BlockingQueue<i32>
// Same pattern with a queue whose timeout is set to 1 ms.
let mut result_i32 = BlockingQueue::<i32>::new();
result_i32.timeout = Some(Duration::from_millis(1));
root_handle.send(Value::AskIntByBlockingQueue((4, result_i32.clone())));
root_handle.send(Value::AskIntByBlockingQueue((5, result_i32.clone())));
root_handle.send(Value::AskIntByBlockingQueue((6, result_i32.clone())));
thread::sleep(Duration::from_millis(1));
let i = result_i32.take();
assert_eq!(Some(40), i);
let i = result_i32.take();
assert_eq!(Some(50), i);
let i = result_i32.take();
assert_eq!(Some(60), i);

// Timeout case:
// The actor returns without offering a value for negative input, so `take()`
// is expected to yield `None`.
root_handle.send(Value::AskIntByBlockingQueue((-1, result_i32.clone())));
let i = result_i32.take();
assert_eq!(None, i);