Skip to content

Commit

Permalink
Test
Browse files Browse the repository at this point in the history
  • Loading branch information
vertexclique committed Oct 17, 2019
1 parent a846834 commit 7deedca
Show file tree
Hide file tree
Showing 8 changed files with 569 additions and 19 deletions.
5 changes: 5 additions & 0 deletions lightproc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,9 @@ categories = []
edition = "2018"

[dependencies]
crossbeam-utils = "0.6.6"
rustc-hash = "1.0.1"

[dev-dependencies]
crossbeam = "0.7.1"
futures-preview = "0.3.0-alpha.17"
57 changes: 57 additions & 0 deletions lightproc/examples/proc_run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
//! A function that runs a future to completion on a dedicated thread.
use std::future::Future;
use std::sync::Arc;
use std::thread;

use crossbeam::channel;
use futures::executor;
use lightproc::prelude::*;

/// Spawns a future on a new dedicated thread.
///
/// The returned handle can be used to await the output of the future.
fn spawn_on_thread<F, R>(future: F) -> ProcHandle<R, ()>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
// Create a channel that holds the task when it is scheduled for running.
let (sender, receiver) = channel::unbounded();
let sender = Arc::new(sender);
let s = Arc::downgrade(&sender);

// Wrap the future into one that disconnects the channel on completion.
let future = async move {
// When the inner future completes, the sender gets dropped and disconnects the channel.
let _sender = sender;
future.await
};

// Create a task that is scheduled by sending itself into the channel.
let schedule = move |t| s.upgrade().unwrap().send(t).unwrap();
let (proc, handle) = LightProc::<()>::new()
.with_future(future)
.with_schedule(schedule)
.with_stack(ProcStack::default())
.returning();

// Schedule the task by sending it into the channel.
proc.schedule();

// Spawn a thread running the task to completion.
thread::spawn(move || {
// Keep taking the task from the channel and running it until completion.
for proc in receiver {
proc.run();
}
});

handle
}

fn main() {
executor::block_on(spawn_on_thread(async {
println!("Hello, world!");
}));
}
2 changes: 2 additions & 0 deletions lightproc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,7 @@ pub mod state;

pub mod prelude {
pub use crate::lightproc::*;
pub use crate::stack::*;
pub use crate::proc_layout::*;
pub use crate::proc_handle::*;
}
83 changes: 71 additions & 12 deletions lightproc/src/lightproc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::alloc;
use std::{alloc, mem, ptr};
use std::future::Future;
use std::marker::PhantomData as marker;
use std::ptr::NonNull;
Expand All @@ -11,6 +11,7 @@ use std::alloc::Layout;
use crate::raw_proc::RawProc;
use crate::proc_handle::ProcHandle;
use crate::stack::ProcStack;
use crate::proc_data::ProcData;

#[derive(Debug)]
pub struct LightProc<T> {
Expand Down Expand Up @@ -80,17 +81,16 @@ impl<T> LightProc<T> {
self
}

pub fn with_stack(mut self, st: T) -> Self
where T: Send + 'static,
pub fn with_stack(mut self, st: ProcStack) -> Self
{
let stack_mem = Layout::new::<T>();
let stack_mem = Layout::new::<ProcStack>();
let (new_layout, offset_st) = extend(self.proc_layout.layout, stack_mem);
self.proc_layout.offset_table.insert("stack", offset_st);

self.reallocate(new_layout);

let rawp =
RawProc::<usize, usize, usize, T>::from_ptr(
RawProc::<usize, usize, usize, ProcStack>::from_ptr(
self.raw_proc.as_ptr(), &self.proc_layout);

unsafe {
Expand All @@ -104,7 +104,7 @@ impl<T> LightProc<T> {
let raw_proc = self.raw_proc;
let proc = LightProc {
raw_proc,
proc_layout: self.proc_layout,
proc_layout: self.proc_layout.clone(),
_private: marker,
};
let handle = ProcHandle {
Expand All @@ -114,16 +114,75 @@ impl<T> LightProc<T> {
(proc, handle)
}

fn reallocate(&mut self, added: Layout) {
pub fn schedule(self) {
let ptr = self.raw_proc.as_ptr();
let header = ptr as *const ProcData;
mem::forget(self);

unsafe {
((*header).vtable.schedule)(ptr);
}
}

pub fn run(self) {
let ptr = self.raw_proc.as_ptr();
let header = ptr as *const ProcData;
mem::forget(self);

unsafe {
((*header).vtable.run)(ptr);
}
}

pub fn cancel(&self) {
let ptr = self.raw_proc.as_ptr();
let header = ptr as *const ProcData;

unsafe {
(*header).cancel();
}
}

pub fn stack(&self) -> &T {
let offset = ProcData::offset_tag::<T>();
let ptr = self.raw_proc.as_ptr();

unsafe {
let pointer = alloc::realloc(
self.raw_proc.as_ptr() as *mut u8,
let raw = (ptr as *mut u8).add(offset) as *const T;
&*raw
}
}

fn reallocate(&mut self, enlarged: Layout) {
unsafe {
let old = self.raw_proc.as_ptr() as *mut u8;
let bigger = alloc::realloc(
old,
self.proc_layout.layout,
added.size(),
enlarged.size(),
);
self.raw_proc = NonNull::new(pointer as *mut ()).unwrap()
ptr::copy(old, bigger, self.proc_layout.layout.size());
self.raw_proc = NonNull::new(bigger as *mut ()).unwrap()
}

self.proc_layout.layout = added;
self.proc_layout.layout = enlarged;
}
}

impl<T> Drop for LightProc<T> {
fn drop(&mut self) {
let ptr = self.raw_proc.as_ptr();
let header = ptr as *const ProcData;

unsafe {
// Cancel the task.
(*header).cancel();

// Drop the future.
((*header).vtable.drop_future)(ptr);

// Drop the task reference.
((*header).vtable.decrement)(ptr);
}
}
}
132 changes: 129 additions & 3 deletions lightproc/src/proc_data.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,138 @@
use crate::proc_vtable::ProcVTable;
use std::cell::UnsafeCell;
use std::sync::atomic::AtomicUsize;
use std::cell::Cell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::Waker;
use std::fmt;
use crate::state::*;
use std::alloc::Layout;
use crate::layout_helpers::*;
use crossbeam_utils::Backoff;

pub(crate) struct ProcData {
pub(crate) state: AtomicUsize,

pub(crate) awaiter: UnsafeCell<Option<Waker>>,
pub(crate) awaiter: Cell<Option<Waker>>,

pub(crate) vtable: &'static ProcVTable,
}


impl ProcData {
/// Cancels the task.
///
/// This method will only mark the task as closed and will notify the awaiter, but it won't
/// reschedule the task if it's not completed.
pub(crate) fn cancel(&self) {
let mut state = self.state.load(Ordering::Acquire);

loop {
// If the task has been completed or closed, it can't be cancelled.
if state & (COMPLETED | CLOSED) != 0 {
break;
}

// Mark the task as closed.
match self.state.compare_exchange_weak(
state,
state | CLOSED,
Ordering::AcqRel,
Ordering::Acquire,
) {
Ok(_) => {
// Notify the awaiter that the task has been closed.
if state & AWAITER != 0 {
self.notify();
}

break;
}
Err(s) => state = s,
}
}
}

/// Notifies the task blocked on the task.
///
/// If there is a registered waker, it will be removed from the header and woken.
#[inline]
pub(crate) fn notify(&self) {
if let Some(waker) = self.swap_awaiter(None) {
waker.wake();
}
}

/// Notifies the task blocked on the task unless its waker matches `current`.
///
/// If there is a registered waker, it will be removed from the header.
#[inline]
pub(crate) fn notify_unless(&self, current: &Waker) {
if let Some(waker) = self.swap_awaiter(None) {
if !waker.will_wake(current) {
waker.wake();
}
}
}

/// Swaps the awaiter and returns the previous value.
#[inline]
pub(crate) fn swap_awaiter(&self, new: Option<Waker>) -> Option<Waker> {
let new_is_none = new.is_none();

// We're about to try acquiring the lock in a loop. If it's already being held by another
// thread, we'll have to spin for a while so it's best to employ a backoff strategy.
let backoff = Backoff::new();
loop {
// Acquire the lock. If we're storing an awaiter, then also set the awaiter flag.
let state = if new_is_none {
self.state.fetch_or(LOCKED, Ordering::Acquire)
} else {
self.state.fetch_or(LOCKED | AWAITER, Ordering::Acquire)
};

// If the lock was acquired, break from the loop.
if state & LOCKED == 0 {
break;
}

// Snooze for a little while because the lock is held by another thread.
backoff.snooze();
}

// Replace the awaiter.
let old = self.awaiter.replace(new);

// Release the lock. If we've cleared the awaiter, then also unset the awaiter flag.
if new_is_none {
self.state.fetch_and(!LOCKED & !AWAITER, Ordering::Release);
} else {
self.state.fetch_and(!LOCKED, Ordering::Release);
}

old
}

/// Returns the offset at which the tag of type `T` is stored.
#[inline]
pub(crate) fn offset_tag<T>() -> usize {
let layout_proc_data = Layout::new::<ProcData>();
let layout_t = Layout::new::<T>();
let (_, offset_t) = extend(layout_proc_data, layout_t);
offset_t
}
}

impl fmt::Debug for ProcData {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let state = self.state.load(Ordering::SeqCst);

f.debug_struct("ProcData")
.field("scheduled", &(state & SCHEDULED != 0))
.field("running", &(state & RUNNING != 0))
.field("completed", &(state & COMPLETED != 0))
.field("closed", &(state & CLOSED != 0))
.field("awaiter", &(state & AWAITER != 0))
.field("handle", &(state & HANDLE != 0))
.field("ref_count", &(state / REFERENCE))
.finish()
}
}
Loading

0 comments on commit 7deedca

Please sign in to comment.