Skip to content

Work Queue support with Async #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 29 commits into from
Feb 13, 2025
Merged

Work Queue support with Async #45

merged 29 commits into from
Feb 13, 2025

Conversation

d3zd3z
Copy link
Collaborator

@d3zd3z d3zd3z commented Dec 20, 2024

Zephyr work queue support. This initial version focuses on using work queues to implement a basic async executor for Zephyr. This is able to schedule work using spawn, provides a spawn_local function that can spawn from a Future a task to run on the same work queue (allowing non-Send types, such as Rc). The primitives Semaphore, and Queue (and hence channels) have async versions of the blocking methods, and they are fully compatible with the regular Zephyr primitives. There is also a special Mutex implementation that usable entirely from the async context (it is implemented with a Semaphore).

Copy link

@theotherjimmy theotherjimmy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nits and requests for unsafe docs, mostly.

Comment on lines 29 to 46
WorkBuilder::new()
.set_worker(worker)
.set_name(name)
.start(future)
}

/// Run an async future on the current worker thread.
///
/// Arrange to have the given future run on the current worker thread. The resulting `JoinHandle`
/// has `join` and `join_async` methods that can be used to wait for the given thread.
///
/// The main use for this is to allow work threads to use `Rc` and `Rc<RefCell<T>>` within async
/// tasks. The main constraint is that references inside cannot be held across an `.await`.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either remove the "main" from here, or add the other constraints please

/// Arrange to have the given future run on the current worker thread. The resulting `JoinHandle`
/// has `join` and `join_async` methods that can be used to wait for the given thread.
///
/// The main use for this is to allow work threads to use `Rc` and `Rc<RefCell<T>>` within async

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this in opposition to Arc and Arc<Refcell<T>>?

It would be nice to mention that this omits the need for Send from the future (but not the output, why not the output?)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the Send of the output is independent of the Send of the Future itself. At least if I did it right, the async wait on the Answer should properly have Send based on the output.

}

/// Yield the current thread, returning it to the work queue to be run after other work on that
/// queue. (This has to be called `yield_now` in Rust, because `yield` is a keyword.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit : missing )

_nosend: PhantomData<UnsafeCell<()>>,
}

// unsafe impl<T: ?Sized + Sync> Sync for MutexGuard<'_, T> {}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

??? Same as below?

//! WORKER_STACK: ThreadStack<2048>;
//! }
//! // ...
//! let main_worker = Box::new(j

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extraneous j (do you perhaps use vim ;)

doc tests might catch this, if that's possible.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eventually, perhaps. I'm looking into how regular tests might be possible (at least with nightly) as we can provide our own test runner. The doc tests are generally just compiled though, so that might not be as difficult, but is still likely to require nightly.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think doc tests are on stable, and should work during cargo test.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a 'todo' to get cargo test. It involves implementing a test running. Currently, it seems to require nightly, though.

/// Create a new `Signal`.
///
/// The Signal will be in the non-signaled state.
pub fn new() -> Result<Signal, Infallible> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did I miss why this must be a Result<Self, Infallible>? It seems like Self would also work here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More about trying to maintain compatibility with other constructors, but realistically, it probably makes more sense to just have it return Self.

}

/// Add an event that represents waiting for a semaphore to be available for "take".
pub unsafe fn add_semaphore<'a>(&'a mut self, sem: &'a Semaphore) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not clear from the documentation what safety invariant the caller must uphold.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has been rearranged, and these functions are now safe. The main safety concern with all of this is that these async methods can only be used with this executor. When I work to get another executor to be available, I'll try to address that.

}

/// Add an event that represents waiting for a signal.
pub unsafe fn add_signal<'a>(&'a mut self, signal: &'a Signal) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Document caller requirements.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto, now safe.

}

/// Add an event that represents waiting for a queue to have a message.
pub unsafe fn add_queue<'a>(&'a mut self, queue: &'a Queue) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here too.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also now safe.

/// // Leak a work queue in a Box.
/// let wq = Box::new(WorkQueueBuilder::new().start(...));
/// let _ = Box::leak(wq);
///

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close the code block.

@d3zd3z d3zd3z marked this pull request as ready for review January 14, 2025 17:40
@d3zd3z d3zd3z marked this pull request as draft January 14, 2025 17:40
@d3zd3z d3zd3z force-pushed the work branch 2 times, most recently from 85fe22d to 6abda1a Compare January 17, 2025 19:34
@d3zd3z d3zd3z force-pushed the work branch 2 times, most recently from cb86f9e to cccd8a6 Compare January 29, 2025 22:35
@d3zd3z d3zd3z marked this pull request as ready for review January 31, 2025 07:16
d3zd3z added 21 commits February 4, 2025 14:18
Accept both space and semicolon separated paths and defines from cmake.
The add_custom_command really wants to replace the semicolons with
spaces, whereas the 'file' command doesn't.  To make this work, have the
build.rs split on either space or semicolon.

This fixes a problem where `cargo check` and `rust-analyzer` fail to run
correctly when the Zephyr build is configured for release, and they are
trying to do a debug build.  With this fix, the debug build from cargo
will work, allowing the analysis or verification.

Signed-off-by: David Brown <david.brown@linaro.org>
This provides a first pass an an implementation of management of Zephyr
work queues, and an executor that schedules work using Zephyr's work
queues.

TODO: Clean up how Futures can use the Context to indicate scheduling.
TODO: Move a few things to make the used modules a bit cleaner.

Signed-off-by: David Brown <david.brown@linaro.org>
If `CONFIG_SEGGER_SYSVIEW` is enabled, add a few calls to the workqueue
scheduling to record these as task switches.  There is still a problem
with the task names showing up, but this does generally show scheduled
work as separate tasks in System View.

The main issues where this lacks is that if the Zephyr scheduler
schedules something else, and then returns to the work queue, that will
be shown as being done on the work queue thread instead of the
particular work.

Signed-off-by: David Brown <david.brown@linaro.org>
This is intended to be a demonstration of implementing the dining
philosopher's problem using work queues.  At this point, it just serves
as a test of the workqueue and async mechanism.

Signed-off-by: David Brown <david.brown@linaro.org>
These are starting to get messy, but for now, add to at least allow code
to compile with alloc off.

Signed-off-by: David Brown <david.brown@linaro.org>
Because Zephyr's workqueue re-scheduling is based on the Context struct,
not the waker, we need a way to get back to our `WorkInfo` struct that
this Context is embedded in.

Create a `ContextExt` that adds some methods to the `Context` to be able
to indicate when the scheduler should reschedule this work.

Signed-off-by: David Brown <david.brown@linaro.org>
Fix various broken documentation links.

Signed-off-by: David Brown <david.brown@linaro.org>
The join handle can bew used across multiple threads, as long as the
output of the Future is Send.

Signed-off-by: David Brown <david.brown@linaro.org>
Implement the async Semaphore based solution to the philospher problem.
This demonstrates a few things:

- The use of multiple async tasks created with `spawn` and `spawn_local`.
- How to use `spawn_local` to allow a group of tasks all running on the
  same worker to use Rc instead of Arc for sharing.
- A check that completed work (from a spawn, after calling join or
  join_async) drops all of its data properly.

Signed-off-by: David Brown <david.brown@linaro.org>
All hand-written work to directly schedule timeouts.

Signed-off-by: David Brown <david.brown@linaro.org>
The newly added code needs some fixups to comply with rustfmt.  Apply
these.

Signed-off-by: David Brown <david.brown@linaro.org>
This runs various sync primitives in a basic benchmark.  This is useful
to compare performance between native threads and work-queue based
async.  Initial results suggest that, unexpectedly, there is additional
overhead with async.

Signed-off-by: David Brown <david.brown@linaro.org>
The semaphore API shouldn't have `&mut self` declarations, as the
purpose of the API is to be that thread synchronization.  Even the less
save ones, such as `count_get` and `reset` have well defined
multi-threaded behavior.

Signed-off-by: David Brown <david.brown@linaro.org>
Perform a simple ping-ping semaphore benchmark across multiple threads
and multiple work-queues to compare the performance of these.

Signed-off-by: David Brown <david.brown@linaro.org>
Make sure the sample matches, including target selection.

Signed-off-by: David Brown <david.brown@linaro.org>
Provode a `size_of` utility that can take a Future and return how much
memory is needed to schedule it.

Signed-off-by: David Brown <david.brown@linaro.org>
Schedule a possibly large number of workers to measure how performance
degrades.

Signed-off-by: David Brown <david.brown@linaro.org>
Add target restrictions to this sample as well.  Without these, nightly
runs on zephyr's main try to build for unsupported targets.

Signed-off-by: David Brown <david.brown@linaro.org>
Update the code to the rustfmt suggested formatting.

Signed-off-by: David Brown <david.brown@linaro.org>
Setting these to always inline has a rather noticeable impact on
performance of most calls to Zephyr, at the cost of a bit more code
space.

Signed-off-by: David Brown <david.brown@linaro.org>
Change the old Work implementation, which was really more of an early
attempt at futures into a useful simple Work model for Zephyr Work
queues.  These do not use triggered work, but are simply re-submitted as
needed to the work queues.  Currently, they are always contained within
an Arc.

Signed-off-by: David Brown <david.brown@linaro.org>
Create a benchmark that has various work entities that schedule each
other to tease out lots of workqueue scheduling.

Signed-off-by: David Brown <david.brown@linaro.org>
Make sure the benchmark is fully optimized, even in dev mode.

Signed-off-by: David Brown <david.brown@linaro.org>
@d3zd3z d3zd3z requested review from teburd and cfriedt February 11, 2025 14:36
@d3zd3z d3zd3z changed the title Start of Zephyr work-q support Work Queue support with Async Feb 11, 2025
This is a comparison benchmark of implementing deferred initialization
of Zephyr kernel objects from Rust.  The idea here is that instead of
the current `Fixed` enum, which contains either a reference to a static,
or a Pin<Box<T>> of the object, we pair the objects with an atomic
pointer variable indicating the pointer state.  It starts as a null
pointer, and will result in the object being initialized when it is
first used.  The pointer is then replaced with the address of `Self` so
that later code can detect an attempt to more the object.

Signed-off-by: David Brown <david.brown@linaro.org>
Update code formatting.

Signed-off-by: David Brown <david.brown@linaro.org>
Update formatting to match rustfmt.

Signed-off-by: David Brown <david.brown@linaro.org>
Copy link
Collaborator

@teburd teburd left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a fairly large PR so I can't say I looked deeply at this, but the samples look fine to me and the async bits seem ok, I wouldn't label this kio though myself as Zephyr has an async I/O framework already in it that closely matches io_uring called rtio as I've probably noted before.

The kio support is behind `CONFIG_RUST_ALLOC`. This needs to be
conditional in addition to the code that uses it.

Signed-off-by: David Brown <david.brown@linaro.org>
Instead of `Result<Self, Infallible>`, just return Self, as this cannot
ever fail.

Signed-off-by: David Brown <david.brown@linaro.org>
Fix some typos and clarify safety in a few places.

Signed-off-by: David Brown <david.brown@linaro.org>
Copy link

@inthehack inthehack left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you so much for such addition to Rust bindings. I look forward to hearing from you on the next steps.

I set this review as request changes, sorry I don't want to be rude, especially at one day of code freeze. Feel free to bypass this time if you need it.

zephyr-build = "0.1.0"

[profile.release]
debug-assertions = true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe you could add codegen-units = 1 here to speed up the build?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can give it a try.

lto = true

[profile.dev]
debug = 2

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here?

/// Construct a new semaphore, with the given initial_count and limit. There is a bit of
/// trickery to pass the initial values through to the initializer, but otherwise this is just a
/// basic initialization.
pub fn new(initial_count: c_uint, limit: c_uint) -> Semaphore {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why using the C types here instead of Rust type (e.g. u32) for any other reason than convenience with the k_sem API? Am I missing something?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case, this was intended to just be a quick test to see if the "base sem" model will be workable. Given that this has similar performance to using Fixed, I will likely make a larger change across the codebase to have the zephyr objects directly, with an init atomic. I won't be changing the APIs to the existing types.

pub fn new(initial_count: c_uint, limit: c_uint) -> Semaphore {
let this = Self {
state: AtomicUsize::new(0),
item: unsafe { UnsafeCell::new(mem::zeroed()) },

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't be more reliable to use UnsafeCell<MaybeUninit<k_kem>> here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The behavior is different, though. I'm not certain if the Zephyr init actually requires the items to be zero initialized, but I wanted to keep the semantics.

}

/// Get the raw pointer, initializing the `k_sem` if needed.
fn get(&self) -> *mut k_sem {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't be better to return a Fixed<k_sem> here instead of a raw pointer, so the type is more explicit on what's behind?

Talking about safety, I would say that types must give insights to the developer about actual confidence one could have in the data behind the raw pointer.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The point of this little exercise (basesem, which will go away at some point) is to see about the feasibility of eliminating the Fixed type entirely. In this case, the items are always just used directly, and the atomic protects against things being moved (which only happens when there is a single reference anyway).

///
/// # Panics
/// If this is called other than from a worker task running on a work thread, it will panic.
pub fn spawn_local<F>(future: F, name: &'static CStr) -> JoinHandle<F>

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here.

/// A mutual exclusion primitive useful for protecting shared data. Async version.
///
/// This mutex will block a task waiting for the lock to become available.
pub struct Mutex<T: ?Sized> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for my misunderstanding. What is the semantic difference with zephyr::sync::Mutex?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Zephyr's Mutex type is only blocking, and using from async would block the entire work queue. This is kind of an interim solution to provide mutual exclusion (without condvar) that works from the workqueue-based async.

@@ -13,13 +13,19 @@
pub mod align;
pub mod device;
pub mod error;
#[cfg(CONFIG_RUST_ALLOC)]
pub mod kio;
Copy link

@inthehack inthehack Feb 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general question: would it be possible to have a no-alloc version with heapless or do you think that wouldn't be relevant?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it'd be a good idea, and a good thing, too, and a lot more reasonable to do after I get rid of Fixed. But, I think that is better to do as some changes to come after this.

///
/// T must also implement Send, since although 'get' always retrieves the current thread's data,
/// `insert` will typically need to move `T` across threads.
pub struct SimpleTls<T: Copy + Send> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the need for not too long name. Although, I think this could be helpful for the developer experience to have something more explicit like SimpleLocalStorage. Even more, we can agree that a simple implementation could be named ThreadLocalStorage and more advanced ones with specific names. In the end, if some developer prefer shorter names, they can alias to Tls internally in their project.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps pub(crate)? It really isn't intended for general consumption.

///
/// The AtomicPtr is either Null, or contains a raw pointer to the underlying Mutex holding the
/// data.
data: AtomicPtr<Mutex<SimpleTls<T>>>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why we need an atomic on a Sync type like Mutex here. If this is because of the null possible value, maybe the type could be Mutex<MaybeUninit<SimpleTls<T>>> or something equivalent?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem is that currently, Mutex cannot be declared statically (the constructor is not const), as it requires an allocation. My proposal suggested in basesem will fix that.

@d3zd3z
Copy link
Collaborator Author

d3zd3z commented Feb 13, 2025

Thank you so much for such addition to Rust bindings. I look forward to hearing from you on the next steps.

I set this review as request changes, sorry I don't want to be rude, especially at one day of code freeze. Feel free to bypass this time if you need it.

Given the closeness to the feature freeze, I'm going to take the above as a kind of ok, to proceed with this, and make the changes in subsequent PRs. I think it will help to have this code in mainline for this release.

@d3zd3z d3zd3z merged commit 49c6712 into main Feb 13, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants