From e52ff1462bc1b802d0257ce89e4196e23aff7e25 Mon Sep 17 00:00:00 2001 From: kingdoctor123 Date: Sun, 17 Mar 2024 00:33:37 +0900 Subject: [PATCH] Publish hw4, update hw1 --- homework/Cargo.toml | 1 + homework/doc/boc.md | 50 +++ homework/doc/hello_server.md | 5 +- homework/scripts/grade-boc.sh | 75 +++++ homework/scripts/grade-hello_server.sh | 32 +- homework/src/boc.rs | 380 +++++++++++++++++++++ homework/src/lib.rs | 2 + homework/tests/boc.rs | 438 +++++++++++++++++++++++++ 8 files changed, 975 insertions(+), 8 deletions(-) create mode 100644 homework/doc/boc.md create mode 100755 homework/scripts/grade-boc.sh create mode 100644 homework/src/boc.rs create mode 100644 homework/tests/boc.rs diff --git a/homework/Cargo.toml b/homework/Cargo.toml index 4338827e66..4fa0e80998 100644 --- a/homework/Cargo.toml +++ b/homework/Cargo.toml @@ -17,6 +17,7 @@ check-loom = ["loom"] cfg-if = "1.0.0" crossbeam-channel = "0.5.10" crossbeam-epoch = "0.9.17" +rayon = "1.9.0" ctrlc = { version = "3.4.2", optional = true } cs431 = { git = "https://github.com/kaist-cp/cs431" } # cs431 = { path = "../cs431" } diff --git a/homework/doc/boc.md b/homework/doc/boc.md new file mode 100644 index 0000000000..78cfde7ca3 --- /dev/null +++ b/homework/doc/boc.md @@ -0,0 +1,50 @@ +# Behaviour-Oriented Concurrency (BoC) +**Implement a runtime for Behaviour-Oriented Concurrency** + +> *The Behaviour-Oriented Concurrency paradigm: a concurrency paradigm +> that achieves flexible coordination over multiple resources, and ordered execution, and scalability.* (from §1 of the BoC [paper](https://doi.org/10.1145/3622852)) + +First, read [the original BoC paper](https://doi.org/10.1145/3622852) and understand its algorithm. +In particular, you should understand the key concepts (e.g., cown, behaviour, when, and thunk), and fully understand `Fig.3` and `§4.3` which contain the details of the implementation. + +Fill in the `todo!()`s in `src/boc.rs`. +The total lines of code to be written is about 70. +Your implementation should satisfy the following criterias: +* when clauses should be scheduled in the correct order of the *dependency graph* (§4.1). +* Your implementation of the BoC runtime should ensure *deadlock freedom*. + We will test the deadlock freedom by several stress tests with timeouts. +* Whenever you want to spawn a new thread, **don't use** [`std::thread::spawn`](https://doc.rust-lang.org/std/thread/fn.spawn.html). + Instead, use [`rayon::spawn`](https://docs.rs/rayon/latest/rayon/fn.spawn.html). + +We provide several ways of using the when clause in Rust, illustrated below. + +1. Using the `when!` macro. Below is a representative example describing its use: + + ```rust + when!(c1, c2; g1, g2; { + ... // thunk + }); + ``` + This results in a when clause that schedules a new behavior for two `CownPtr`s `c1` and `c2`. + `g1` and `g2` are mutable references to the shared resources protected by given `CownPtr`s + and can be used in the thunk. +2. Using the `run_when` function directly. Use this if you want to create a new behavior with an arbitrary number of `CownPtr`s. + For example, + + ```rust + run_when(vec![c1.clone(), c2.clone(), c3.clone()], move |mut acc| { + ... // thunk + }); + ``` + The first argument is a `Vec` of cowns with the same type. + `acc` is a vector of mutable references to the shared resources protected by the cowns, + and it is guaranteed that `acc` has the same length as the specified given `Vec` of `CownPtr`s. + +More examples can be found in `src/boc.rs` and `test/boc.rs`. + +## Grading (100 points) +Run `./scripts/grade-boc.sh`. +Basic tests account for 60 points and stress tests account for 40 points. + +## Submission +Submit `boc.rs` to gg. diff --git a/homework/doc/hello_server.md b/homework/doc/hello_server.md index 5b235bdba9..2d9b721dcf 100644 --- a/homework/doc/hello_server.md +++ b/homework/doc/hello_server.md @@ -17,9 +17,12 @@ The grader runs `./scripts/grade-hello_server.sh` in the `homework` directory. This script runs the tests with various options. -There will be no partial scores for each module. +There will be no partial scores for `tcp` and `thread_pool` modules. That is, you will get the score for a module only if your implementation passes **all** tests for that module. +On the other hand, we will give partial scores for `cache` module. +In particular, even if your implementation of `cache` blocks concurrent accesses to different keys, you can still get some points for basic functionalities. + ## Submission ```bash cd cs431/homework diff --git a/homework/scripts/grade-boc.sh b/homework/scripts/grade-boc.sh new file mode 100755 index 0000000000..913fb8177c --- /dev/null +++ b/homework/scripts/grade-boc.sh @@ -0,0 +1,75 @@ +#!/usr/bin/env bash +# set -e +set -uo pipefail +IFS=$'\n\t' + +# Imports library. +BASEDIR=$(dirname "$0") +source $BASEDIR/grade-utils.sh + +run_linters || exit 1 + +lines=$(grep_skip_comment "thread::spawn" "$BASEDIR/../src/boc.rs") +if [ -n "$lines" ]; then + echo "thread::spawn(...) is not allowed." + echo "$lines" + exit 1 +fi + +RUNNERS=( + "cargo" + "cargo --release" + "cargo_asan" + "cargo_asan --release" +) +TIMEOUT=180s +SCORE=0 + +echo "1. Basic tests" + +TESTS=( + "--doc boc" + "--test boc -- --exact basic_test::message_passing_test" + "--test boc -- --exact basic_test::message_passing_determines_order" + "--test boc -- --exact basic_test::merge_sort_basic_test" + "--test boc -- --exact basic_test::fibonacci_basic_test" + "--test boc -- --exact basic_test::banking_basic_test" +) + +basic_test_failed=false + +for RUNNER in "${RUNNERS[@]}"; do + echo "Running with $RUNNER, timeout $TIMEOUT..." + if [ $(run_tests) -ne 0 ]; then + basic_test_failed=true + break + fi +done + +if [ "$basic_test_failed" = false ]; then + SCORE=$((SCORE + 60)) +fi + +echo "2. Stress tests" + +TESTS=( + "--test boc -- --exact stress_test::merge_sort_stress_test" + "--test boc -- --exact stress_test::fibonacci_stress_test" + "--test boc -- --exact stress_test::banking_stress_test" +) + +stress_test_failed=false + +for RUNNER in "${RUNNERS[@]}"; do + echo "Running with $RUNNER, timeout $TIMEOUT..." + if [ $(run_tests) -ne 0 ]; then + stress_test_failed=true + break + fi +done + +if [ "$stress_test_failed" = false ]; then + SCORE=$((SCORE + 40)) +fi + +echo "Score: $SCORE / 100" diff --git a/homework/scripts/grade-hello_server.sh b/homework/scripts/grade-hello_server.sh index f13d01a981..e8e164c8da 100755 --- a/homework/scripts/grade-hello_server.sh +++ b/homework/scripts/grade-hello_server.sh @@ -19,7 +19,8 @@ RUNNERS=( ) -t1_failed=false +t1_basic_failed=false +t1_nonblocking_failed=false t2_failed=false t3_failed=false @@ -27,11 +28,25 @@ t3_failed=false for RUNNER in "${RUNNERS[@]}"; do echo "Running with $RUNNER..." - if [ "$t1_failed" = false ]; then - echo " Testing cache.rs..." - TESTS=("--test cache") + if [ "$t1_basic_failed" = false ]; then + echo " Testing basic functionalities of cache.rs..." + TESTS=( + "--test cache -- --exact cache_no_duplicate_sequential" + "--test cache -- --exact cache_no_duplicate_concurrent" + ) if [ $(run_tests) -ne 0 ]; then - t1_failed=true + t1_basic_failed=true + fi + fi + + if [ "$t1_nonblocking_failed" = false ]; then + echo " Testing nonblockingness of cache.rs..." + TESTS=( + "--test cache -- --exact cache_no_block_disjoint" + "--test cache -- --exact cache_no_reader_block" + ) + if [ $(run_tests) -ne 0 ]; then + t1_nonblocking_failed=true fi fi @@ -55,8 +70,11 @@ done SCORE=0 # Scores for cache.rs -if [ "$t1_failed" = false ]; then - SCORE=$((SCORE + 40)) +if [ "$t1_basic_failed" = false ]; then + SCORE=$((SCORE + 15)) +fi +if [ "$t1_nonblocking_failed" = false ]; then + SCORE=$((SCORE + 25)) fi # Scores for tcp.rs diff --git a/homework/src/boc.rs b/homework/src/boc.rs new file mode 100644 index 0000000000..080d244355 --- /dev/null +++ b/homework/src/boc.rs @@ -0,0 +1,380 @@ +//! Concurrent Owner (Cown) type. + +use core::cell::UnsafeCell; +use core::fmt; +use core::hint::spin_loop; +use core::ptr::{self, addr_eq, null_mut}; +use core::sync::atomic::{AtomicBool, AtomicPtr, AtomicUsize, Ordering::SeqCst}; +use crossbeam_channel::bounded; +use rayon; +use std::sync::Arc; + +/// A trait representing a `Cown`. +/// +/// Instead of directly using a `Cown`, which fixes _a single_ `T` we use a trait object to allow +/// for multiple requests with different `T`. to be used for the same cown. +/// +/// # Safety +/// +/// `tail` should actually return the last request for the corresponding cown. +trait CownBase: Send { + /// Return a pointer to the tail of this cown's request queue. + fn last(&self) -> &AtomicPtr; +} + +/// A request for a cown. +pub struct Request { + /// Pointer to the next scheduled behavior. + next: AtomicPtr, + /// Is this request scheduled? + scheduled: AtomicBool, + /// The cown that this request wants to access. + /// + /// This is an `Arc` as the all exposed `CownPtr`s may have been dropped while the behavior is + /// still scheduled. + target: Arc, +} + +// SAFETY: In the basic version of BoC, user cannot get shared reference through the [`CownBase`], +// so `Sync` bound on it is not necessary. +unsafe impl Send for Request {} + +impl Request { + /// Creates a new Request. + fn new(target: Arc) -> Request { + Request { + next: AtomicPtr::new(null_mut()), + scheduled: AtomicBool::new(false), + target, + } + } + + /// Start the first phase of the 2PL enqueue operation. + /// + /// Enqueues `self` onto the `target` cown. Returns once all previous behaviors on this cown has + /// finished enqueueing on all of its required cowns. This ensures the 2PL protocol. + /// + /// # SAFETY + /// + /// `behavior` must be a valid raw pointer to the behavior for `self`, and this should be the + /// only enqueueing of this request and behavior. + unsafe fn start_enqueue(&self, behavior: *const Behavior) { + todo!() + } + + /// Finish the second phase of the 2PL enqueue operation. + /// + /// Sets the scheduled flag so that subsequent behaviors can continue the 2PL enqueue. + /// + /// # Safety + /// + /// All enqueues for smaller requests on this cown must have been completed. + unsafe fn finish_enqueue(&self) { + todo!() + } + + /// Release the cown to the next behavior. + /// + /// Called when `self` has been completed, and thus can allow the next waiting behavior to run. + /// If there is no next behavior, then the cown's tail pointer is set to null. + /// + /// # Safety + /// + /// `self` must have been actually completed. + unsafe fn release(&self) { + todo!() + } +} + +impl Ord for Request { + fn cmp(&self, other: &Self) -> std::cmp::Ordering { + Arc::as_ptr(&self.target).cmp(&Arc::as_ptr(&other.target)) + } +} +impl PartialOrd for Request { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} +impl PartialEq for Request { + fn eq(&self, other: &Self) -> bool { + addr_eq(Arc::as_ptr(&self.target), Arc::as_ptr(&other.target)) + } +} +impl Eq for Request {} + +impl fmt::Debug for Request { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Request") + .field("next", &self.next) + .field("scheduled", &self.scheduled) + .finish() + } +} + +/// The value should only be accessed inside a `when!` block. +#[derive(Debug)] +struct Cown { + /// MCS lock tail. + /// + /// When a new node is enqueued, the enqueuer of the previous tail node will wait until the + /// current enqueuer sets that node's `.next`. + last: AtomicPtr, + /// The value of this cown. + value: UnsafeCell, +} + +impl CownBase for Cown { + fn last(&self) -> &AtomicPtr { + &self.last + } +} + +/// Public interface to Cown. +#[derive(Debug)] +pub struct CownPtr { + inner: Arc>, +} + +// SAFETY: In the basic version of BoC, user cannot get `&T`, so `Sync` is not necessary. +unsafe impl Send for CownPtr {} + +impl Clone for CownPtr { + fn clone(&self) -> Self { + CownPtr { + inner: self.inner.clone(), + } + } +} + +impl CownPtr { + /// Creates a new Cown. + pub fn new(value: T) -> CownPtr { + CownPtr { + inner: Arc::new(Cown { + last: AtomicPtr::new(null_mut()), + value: UnsafeCell::new(value), + }), + } + } +} + +type BehaviorThunk = Box; + +/// Behavior that captures the content of a when body. +struct Behavior { + /// The body of the Behavior. + thunk: BehaviorThunk, + /// Number of not-yet enqueued requests. + count: AtomicUsize, + /// The requests for this behavior. + requests: Box<[Request]>, +} + +impl Behavior { + /// Schedules the Behavior. + /// + /// Performs two phase locking (2PL) over the enqueuing of the requests. + /// This ensures that the overall effect of the enqueue is atomic. + fn schedule(self) { + todo!() + } + + /// Resolves a single outstanding request for `this`. + /// + /// Called when a request for `this` is at the head of the queue for a particular cown. If it is + /// the last request, then the thunk is scheduled. + /// + /// # Safety + /// + /// `this` must be a valid behavior. + unsafe fn resolve_one(this: *const Self) { + todo!() + } +} + +impl fmt::Debug for Behavior { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Behavior") + .field("thunk", &"BehaviorThunk") + .field("count", &self.count) + .field("requests", &self.requests) + .finish() + } +} + +// TODO: terminator? +impl Behavior { + fn new(cowns: C, f: F) -> Behavior + where + C: CownPtrs + Send + 'static, + F: for<'l> Fn(C::CownRefs<'l>) + Send + 'static, + { + todo!() + } +} + +/// Trait for a collection of `CownPtr`s. +/// +/// Users pass `CownPtrs` to `when!` clause to specify a collection of shared resources, and such +/// resources can be accessed via `CownRefs` inside the thunk. +/// +/// # Safety +/// +/// `requests` should actually return the requests for the corresponding cowns. +pub unsafe trait CownPtrs { + /// Types for references corresponding to `CownPtrs`. + type CownRefs<'l> + where + Self: 'l; + + /// Returns a collection of `Request`. + fn requests(&self) -> Box<[Request]>; + + /// Returns mutable references of type `CownRefs`. + /// + /// # Safety + /// + /// Must be called only if it is safe to access the shared resources. + unsafe fn get_mut<'l>(self) -> Self::CownRefs<'l>; +} + +unsafe impl CownPtrs for () { + type CownRefs<'l> = () + where + Self: 'l; + + fn requests(&self) -> Box<[Request]> { + Box::new([]) + } + + unsafe fn get_mut<'l>(self) -> Self::CownRefs<'l> {} +} + +unsafe impl CownPtrs for (CownPtr, Ts) { + type CownRefs<'l> = (&'l mut T, Ts::CownRefs<'l>) + where + Self: 'l; + + fn requests(&self) -> Box<[Request]> { + let mut rs = self.1.requests().into_vec(); + let cown_base: Arc = self.0.inner.clone(); + rs.push(Request::new(cown_base)); + rs.into_boxed_slice() + } + + unsafe fn get_mut<'l>(self) -> Self::CownRefs<'l> { + unsafe { (&mut *self.0.inner.value.get(), self.1.get_mut()) } + } +} + +unsafe impl CownPtrs for Vec> { + type CownRefs<'l> = Vec<&'l mut T> + where + Self: 'l; + + fn requests(&self) -> Box<[Request]> { + self.iter().map(|x| Request::new(x.inner.clone())).collect() + } + + unsafe fn get_mut<'l>(self) -> Self::CownRefs<'l> { + self.iter() + .map(|x| unsafe { &mut *x.inner.value.get() }) + .collect() + } +} + +/// Creates a `Behavior` and schedules it. Used by "When" block. +pub fn run_when(cowns: C, f: F) +where + C: CownPtrs + Send + 'static, + F: for<'l> Fn(C::CownRefs<'l>) + Send + 'static, +{ + Behavior::new(cowns, f).schedule(); +} + +/// from https://docs.rs/tuple_list/latest/tuple_list/ +#[macro_export] +macro_rules! tuple_list { + () => ( () ); + + // handling simple identifiers, for limited types and patterns support + ($i:ident) => ( ($i, ()) ); + ($i:ident,) => ( ($i, ()) ); + ($i:ident, $($e:ident),*) => ( ($i, $crate::tuple_list!($($e),*)) ); + ($i:ident, $($e:ident),*,) => ( ($i, $crate::tuple_list!($($e),*)) ); + + // handling complex expressions + ($i:expr) => ( ($i, ()) ); + ($i:expr,) => ( ($i, ()) ); + ($i:expr, $($e:expr),*) => ( ($i, $crate::tuple_list!($($e),*)) ); + ($i:expr, $($e:expr),*,) => ( ($i, $crate::tuple_list!($($e),*)) ); +} + +/// "When" block. +#[macro_export] +macro_rules! when { + ( $( $cs:ident ),* ; $( $gs:ident ),* ; $thunk:expr ) => {{ + run_when(tuple_list!($($cs.clone()),*), move |tuple_list!($($gs),*)| $thunk); + }}; +} + +#[test] +fn boc() { + let c1 = CownPtr::new(0); + let c2 = CownPtr::new(0); + let c3 = CownPtr::new(false); + let c2_ = c2.clone(); + let c3_ = c3.clone(); + + let (finish_sender, finish_receiver) = bounded(0); + + when!(c1, c2; g1, g2; { + // c3, c2 are moved into this thunk. there's no such thing as auto-cloning move closure + *g1 += 1; + *g2 += 1; + when!(c3, c2; g3, g2; { + *g2 += 1; + *g3 = true; + }); + }); + + when!(c1, c2_, c3_; g1, g2, g3; { + assert_eq!(*g1, 1); + assert_eq!(*g2, if *g3 { 2 } else { 1 }); + finish_sender.send(()).unwrap(); + }); + + // wait for termination + finish_receiver.recv().unwrap(); +} + +#[test] +fn boc_vec() { + let c1 = CownPtr::new(0); + let c2 = CownPtr::new(0); + let c3 = CownPtr::new(false); + let c2_ = c2.clone(); + let c3_ = c3.clone(); + + let (finish_sender, finish_receiver) = bounded(0); + + run_when(vec![c1.clone(), c2.clone()], move |mut x| { + // c3, c2 are moved into this thunk. there's no such thing as auto-cloning move closure + *x[0] += 1; + *x[1] += 1; + when!(c3, c2; g3, g2; { + *g2 += 1; + *g3 = true; + }); + }); + + when!(c1, c2_, c3_; g1, g2, g3; { + assert_eq!(*g1, 1); + assert_eq!(*g2, if *g3 { 2 } else { 1 }); + finish_sender.send(()).unwrap(); + }); + + // wait for termination + finish_receiver.recv().unwrap(); +} diff --git a/homework/src/lib.rs b/homework/src/lib.rs index b8606278e3..bea4d27c50 100644 --- a/homework/src/lib.rs +++ b/homework/src/lib.rs @@ -8,6 +8,7 @@ mod adt; mod arc; +pub mod boc; mod elim_stack; mod hash_table; pub mod hazard_pointer; @@ -19,6 +20,7 @@ pub mod test; pub use adt::{ConcurrentMap, ConcurrentSet}; pub use arc::Arc; +pub use boc::CownPtr; pub use elim_stack::ElimStack; pub use hash_table::{GrowableArray, SplitOrderedList}; pub use linked_list::LinkedList; diff --git a/homework/tests/boc.rs b/homework/tests/boc.rs new file mode 100644 index 0000000000..8bb1bf349e --- /dev/null +++ b/homework/tests/boc.rs @@ -0,0 +1,438 @@ +// Two modules `boc_fibonacci` and `boc_banking` are from https://github.com/ic-slurp/verona-benchmarks/tree/main/savina/boc + +/// Implementation of computing fibonacci sequence +mod boc_fibonacci { + use crossbeam_channel::bounded; + use cs431_homework::{boc::run_when, tuple_list, when, CownPtr}; + + fn fibonacci_inner( + n: usize, + sender: Option>, + ) -> CownPtr { + if n == 0 { + CownPtr::new(0) + } else if n <= 2 { + CownPtr::new(1) + } else { + let prev = fibonacci_inner(n - 1, None); + let pprev = fibonacci_inner(n - 2, None); + when!(prev, pprev; g1, g2; { + *g1 += *g2; + if let Some(sender) = &sender { + sender.send(*g1).unwrap(); + } + }); + return prev; + } + } + + pub fn fibonacci(n: usize) -> usize { + if n == 0 { + return 0; + } else if n <= 2 { + return 1; + } + + let (finish_sender, finish_receiver) = bounded(0); + let _ = fibonacci_inner(n, Some(finish_sender)); + + finish_receiver.recv().unwrap() + } +} + +mod boc_banking { + use std::{thread::sleep, time::Duration}; + + use crossbeam_channel::bounded; + use cs431_homework::{boc::run_when, test::RandGen, tuple_list, when, CownPtr}; + use rand::thread_rng; + + const TRANSFER_LIMIT: usize = 2048; + + pub fn run_transactions(account_cnt: usize, transaction_cnt: usize, use_sleep: bool) { + assert_ne!(account_cnt, 0); + assert_ne!(transaction_cnt, 0); + + let mut rng = thread_rng(); + let accounts: Vec> = (0..account_cnt) + .map(|_| CownPtr::new(usize::rand_gen(&mut rng))) + .collect(); + let teller: CownPtr<(Vec>, usize)> = + CownPtr::new((accounts, transaction_cnt)); + + let (finish_sender, finish_receiver) = bounded(0); + + when!(teller; teller_inner; { + let mut rng = thread_rng(); + for _ in 0..transaction_cnt { + // randomly pick src and dest accounts + let src = usize::rand_gen(&mut rng) % account_cnt; + let mut dst = usize::rand_gen(&mut rng) % account_cnt; + if src == dst { dst = (dst + 1) % account_cnt; } + + let amount = usize::rand_gen(&mut rng) % TRANSFER_LIMIT; + let random_sleep = usize::rand_gen(&mut rng) % 2 == 0; + + let cown1 = teller_inner.0[src].clone(); + let cown2 = teller_inner.0[dst].clone(); + let teller = teller.clone(); + let finish_sender = finish_sender.clone(); + + when!(cown1, cown2; g1, g2; { + // transfer + if amount <= *g1 { *g1 -= amount; *g2 += amount; } + if random_sleep && use_sleep { sleep(Duration::from_secs(1)); } + + let finish_sender = finish_sender.clone(); + + // Main thread waits until all transactions finish + when!(teller; teller_inner; { + teller_inner.1 -= 1; + if teller_inner.1 == 0 { + finish_sender.send(()).unwrap(); + } + }); + }); + } + }); + + finish_receiver.recv().unwrap(); + } +} + +/// Implementation of a merge sort that uses BoC +mod boc_merge_sort { + use crossbeam_channel::bounded; + use cs431_homework::{ + boc::{run_when, CownPtr}, + tuple_list, when, + }; + + fn merge_sort_inner( + idx: usize, + step_size: usize, + n: usize, + boc_arr: &Vec>, + boc_finish: &Vec>, + sender: &crossbeam_channel::Sender>, + ) { + if idx == 0 { + return; + } + + // Recursively sort a subarray within range [from, to) + let from = idx * step_size - n; + let to = (idx + 1) * step_size - n; + + let mut bocs: Vec> = boc_arr[from..to].iter().map(|x| x.clone()).collect(); + bocs.push(boc_finish[idx].clone()); + bocs.push(boc_finish[idx * 2].clone()); + bocs.push(boc_finish[idx * 2 + 1].clone()); + + let boc_arr_clone = boc_arr.clone(); + let boc_finish_clone = boc_finish.clone(); + let sender_clone = sender.clone(); + + run_when(bocs, move |mut content| { + // Check if both left and right subarrays are already sorted + let ready = (*content[step_size + 1] == 1) && (*content[step_size + 2] == 1); + if !ready || *content[step_size] == 1 { + return; // We skip if both subarrays are not ready or we already sorted for this range + } + + // Now, merge the two subarrays + let mut lo = 0; + let mut hi = step_size / 2; + let mut res = Vec::new(); + while res.len() < step_size { + if lo >= step_size / 2 || (hi < step_size && *content[lo] > *content[hi]) { + res.push(*content[hi]); + hi += 1; + } else { + res.push(*content[lo]); + lo += 1; + } + } + for i in 0..step_size { + *content[i] = res[i]; + } + + // Signal we have sorted the subarray within range [from, to) + *content[step_size] = 1; + + // Send a signal to main thread if this completes the sorting process + if idx == 1 { + sender_clone.send(res).unwrap(); + return; + } + + // Recursively sort the larger subarray (bottom up) + merge_sort_inner( + idx / 2, + step_size * 2, + n, + &boc_arr_clone, + &boc_finish_clone, + &sender_clone, + ); + }); + } + + /// The main function of merge sort that returns the sorted array of `arr` + /// Assumption: `arr` should have size of 2^`logsize` + pub fn merge_sort(arr: Vec, logsize: usize) -> Vec { + let n: usize = 1 << logsize; + assert_eq!(arr.len(), n); + if logsize == 0 { + return arr; + } + + let boc_arr: Vec> = arr.iter().map(|x| CownPtr::new(*x)).collect(); + let boc_finish: Vec> = (0..(2 * n)).map(|_| CownPtr::new(0)).collect(); + + let (finish_sender, finish_receiver) = bounded(0); + + for i in 0..n { + let arr_elem = boc_arr[i].clone(); + let finish_elem = boc_finish[i + n].clone(); + let boc_arr_clone = boc_arr.clone(); + let boc_finish_clone = boc_finish.clone(); + let sender = finish_sender.clone(); + when!(arr_elem, finish_elem; _garr, gfinish; { + *gfinish = 1; // signals finish of sorting of subarray within range [i, i+1) + merge_sort_inner((n + i) / 2, 2, n, &boc_arr_clone, &boc_finish_clone, &sender); + }); + } + + // Wait until sorting finishes and get the result + finish_receiver.recv().unwrap() + } +} + +mod basic_test { + use crate::{boc_banking, boc_fibonacci, boc_merge_sort}; + use crossbeam_channel::bounded; + use cs431_homework::{ + boc::{run_when, CownPtr}, + tuple_list, when, + }; + use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }; + + #[test] + fn message_passing_test() { + for _ in 0..20 { + let c1 = CownPtr::new(false); + let c1_ = c1.clone(); + let msg = Arc::new(AtomicUsize::new(0)); + let msg_ = msg.clone(); + let msg__ = msg.clone(); + + let (send1, recv1) = bounded(1); + let (send2, recv2) = bounded(1); + + rayon::spawn(move || { + when!(c1; g1; { + if !*g1 { + msg.fetch_add(1, Ordering::Relaxed); + *g1 = true; + } else { + assert_eq!(1, msg.load(Ordering::Relaxed)); + } + send1.send(()).unwrap(); + }); + }); + rayon::spawn(move || { + when!(c1_; g1; { + if !*g1 { + msg_.fetch_add(1, Ordering::Relaxed); + *g1 = true; + } else { + assert_eq!(1, msg_.load(Ordering::Relaxed)); + } + send2.send(()).unwrap(); + }); + }); + + recv1.recv().unwrap(); + recv2.recv().unwrap(); + + assert_eq!(1, msg__.load(Ordering::Relaxed)); + } + } + + #[test] + fn message_passing_determines_order() { + for _ in 0..20 { + let c1 = CownPtr::new(false); + let c2 = CownPtr::new(false); + let c1_ = c1.clone(); + let c2_ = c2.clone(); + let msg = Arc::new(AtomicUsize::new(0)); + let msg_ = msg.clone(); + let msg__ = msg.clone(); + + let (send1, recv) = bounded(0); + let send2 = send1.clone(); + + rayon::spawn(move || { + when!(c1; g1; *g1 = true); + if msg + .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + when!(c2; g2; { + assert!(*g2); + send1.send(()).unwrap(); + }); + } + }); + rayon::spawn(move || { + when!(c2_; g2; *g2 = true); + if msg_ + .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst) + .is_err() + { + when!(c1_; g1; { + assert!(*g1); + send2.send(()).unwrap(); + }); + } + }); + + recv.recv().unwrap(); + + assert_eq!(1, msg__.load(Ordering::Relaxed)); + } + } + + #[test] + fn fibonacci_basic_test() { + let (send_finish, recv_finish) = bounded(0); + + rayon::spawn(move || { + let mut arr = vec![0, 1, 1]; + while arr.len() <= 25 { + let n = arr.len(); + let ans = arr[n - 2] + arr[n - 1]; + arr.push(ans); + assert_eq!(ans, boc_fibonacci::fibonacci(n)); + } + + send_finish.send(()).unwrap(); + }); + + recv_finish.recv().unwrap(); + } + + #[test] + fn merge_sort_basic_test() { + let (send_finish, recv_finish) = bounded(0); + + rayon::spawn(move || { + let mut arr1 = vec![2, 3, 1, 4]; + let res1 = boc_merge_sort::merge_sort(arr1.clone(), 2); + arr1.sort(); + assert_eq!(arr1, res1); + + let mut arr2 = vec![3, 4, 2, 1, 8, 5, 6, 7]; + let res2 = boc_merge_sort::merge_sort(arr2.clone(), 3); + arr2.sort(); + assert_eq!(arr2, res2); + + let res2_ = boc_merge_sort::merge_sort(arr2.clone(), 3); + assert_eq!(arr2, res2_); + + let mut arr3 = arr2.clone(); + arr3.append(&mut arr3.clone()); + arr3.append(&mut arr3.clone()); + arr3.append(&mut arr3.clone()); + arr3.append(&mut arr3.clone()); + arr3.append(&mut arr3.clone()); + let res3 = boc_merge_sort::merge_sort(arr3.clone(), 8); + arr3.sort(); + assert_eq!(arr3, res3); + + let mut arr4: Vec<_> = (0..1024).rev().collect(); + let res4 = boc_merge_sort::merge_sort(arr4.clone(), 10); + arr4.sort(); + assert_eq!(arr4, res4); + + send_finish.send(()).unwrap(); + }); + + recv_finish.recv().unwrap(); + } + + #[test] + fn banking_basic_test() { + let (send_finish, recv_finish) = bounded(0); + + rayon::spawn(move || { + boc_banking::run_transactions(20, 20, true); + send_finish.send(()).unwrap(); + }); + + recv_finish.recv().unwrap(); + } +} + +mod stress_test { + use crate::{boc_banking, boc_fibonacci, boc_merge_sort}; + use crossbeam_channel::bounded; + use cs431_homework::test::RandGen; + use rand::thread_rng; + + #[test] + fn fibonacci_stress_test() { + let (send_finish, recv_finish) = bounded(0); + + rayon::spawn(move || { + assert_eq!(boc_fibonacci::fibonacci(32), 2178309); + send_finish.send(()).unwrap(); + }); + + recv_finish.recv().unwrap(); + } + + #[test] + fn banking_stress_test() { + let (send_finish, recv_finish) = bounded(0); + + rayon::spawn(move || { + boc_banking::run_transactions(1234, 100000, false); + send_finish.send(()).unwrap(); + }); + + recv_finish.recv().unwrap(); + } + + #[test] + fn merge_sort_stress_test() { + const ITER: usize = 10; + const LOGSZ_LO: usize = 10; + const LOGSZ_HI: usize = 14; + + let channels: Vec<_> = (0..ITER).map(|_| bounded(1)).collect(); + let mut rng = thread_rng(); + + for i in 0..ITER { + let sender = channels[i].0.clone(); + let logsize = LOGSZ_LO + i % (LOGSZ_HI - LOGSZ_LO); + let len = 1 << logsize; + let mut arr: Vec<_> = (0..len).map(|_| usize::rand_gen(&mut rng)).collect(); + rayon::spawn(move || { + let res = boc_merge_sort::merge_sort(arr.clone(), logsize); + arr.sort(); + assert_eq!(arr, res); + sender.send(()).unwrap(); + }); + } + + for i in 0..ITER { + channels[i].1.recv().unwrap(); + } + } +}