High-performance lockless spsc/mpsc/mpmc channels.
It supports async contexts, as well as communication between async and blocking contexts.
The low-level implementation is based on crossbeam-queue.
For the design concepts, please refer to the wiki.
- V1.0: Released in 2022.12 and used in production.
- V2.0: Released in 2025.6. Refactored the codebase and API, removing the generic type parameters from the ChannelShared type, which makes it easier to code with.
- V2.1: Released in 2025.9. Removed the dependency on crossbeam-channel and reimplemented the core with a modified version of crossbeam-queue, which brings performance improvements in both async and blocking contexts.
Being a lockless channel, crossfire outperforms other async-capable channels. Thanks to a lighter notification mechanism, in blocking contexts some cases are even faster than the original crossbeam-channel.
More benchmark data is posted on the wiki.
Also, being a lockless channel, the algorithm relies on spinning and yielding. Spinning works well on multi-core systems, but is unfriendly to single-core systems (such as virtual machines). For that case we provide the function detect_backoff_cfg() to detect the running platform; calling it in the initialization section of your code can yield a 2x performance boost on a VPS.
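A minimal sketch of how this might be wired up (assuming detect_backoff_cfg() is exported at the crate root, as the name above suggests):

```rust
use crossfire::*;

fn main() {
    // Probe the running platform once during initialization so that the
    // spin/yield backoff strategy matches the available cores
    // (assumed crate-root export of the function named above).
    detect_backoff_cfg();

    // ... create channels and start threads / runtimes afterwards ...
}
```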
The benchmarks are written with the criterion framework. You can run them with:
`cargo bench --bench crossfire`
NOTE: Because v2.1 pushes throughput to a level not reached before, it puts real pressure on the async runtime itself, and hidden bugs (especially around atomic operations on platforms with weaker memory ordering) may surface:
arch | runtime | workflow | status |
---|---|---|---|
x86_64 | threaded | cron_master_threaded_x86 | PASSED |
x86_64 | tokio 1.47.1 | cron_master_tokio_x86 | PASSED |
x86_64 | async-std | cron_master_async_std_x86 | PASSED |
x86_64 | smol | cron_master_smol-x86 | PASSED |
arm | threaded | cron_master_threaded_arm | PASSED |
arm | tokio 1.47.1 (tokio issue 7632 opened, tokio PR #7622 unreleased) | cron_dev_arm, cron_dev_arm with trace_log | DEADLOCK (not resolved) |
arm | async-std | cron_master_async_std_arm | PASSED |
arm | smol | cron_master_smol_arm | PASSED |
miri (emulation) | threaded | miri_dev | PASSED |
miri (emulation) | tokio 1.47.1 (tokio PR #7622 unreleased) | | DEBUGGING |
miri (emulation) | async-std | - | NOT supported by miri |
v2.0.26 (legacy):
arch | runtime | workflow | status |
---|---|---|---|
x86_64 | threaded | cron_2.0_x86 | PASSED |
x86_64 | tokio 1.47.1 | cron_2.0_x86 | PASSED |
x86_64 | async-std | cron_2.0_x86 | PASSED |
arm | threaded | cron_2.0_arm | PASSED |
arm | tokio 1.47.1 | cron_2.0_arm | PASSED |
arm | async-std | cron_2.0_arm | PASSED |
There are 3 modules: spsc, mpsc, mpmc, providing functions to allocate different types of channels.
The SP or SC interface is only for non-concurrent operation. It's more memory-efficient than MP or MC implementations, and sometimes slightly faster.
The return types in these 3 modules are different:
- `mpmc::bounded_blocking()`: (tx blocking, rx blocking)
- `mpmc::bounded_async()`: (tx async, rx async)
- `mpmc::bounded_tx_async_rx_blocking()`: (tx async, rx blocking)
- `mpmc::bounded_tx_blocking_rx_async()`: (tx blocking, rx async)
- `mpmc::unbounded_blocking()`: (tx non-blocking, rx blocking)
- `mpmc::unbounded_async()`: (tx non-blocking, rx async)
NOTE: For bounded channels, a capacity of 0 is not supported yet (it is currently rewritten to a capacity of 1).
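As a rough usage sketch of the constructors above (assuming send/recv signatures follow crossbeam-channel, as noted in the error-type section below):

```rust
use crossfire::*;

fn main() {
    // Bounded channel with both ends blocking, for plain threads.
    let (tx, rx) = mpmc::bounded_blocking::<u64>(100);
    std::thread::spawn(move || {
        let _ = tx.send(1);
    });
    assert_eq!(rx.recv().unwrap(), 1);

    // Unbounded channel: sending never blocks, receiving blocks.
    let (tx, rx) = mpmc::unbounded_blocking::<u64>();
    let _ = tx.send(2);
    assert_eq!(rx.recv().unwrap(), 2);
}
```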
Context | Sender trait | Single producer | Multiple producers | Receiver trait | Single consumer | Multiple consumers |
---|---|---|---|---|---|---|
Blocking | BlockingTxTrait | Tx | MTx | BlockingRxTrait | Rx | MRx |
Async | AsyncTxTrait | AsyncTx | MAsyncTx | AsyncRxTrait | AsyncRx | MAsyncRx |
For the SP / SC version, `AsyncTx`, `AsyncRx`, `Tx`, and `Rx` are not `Clone`, and not `Sync`.
Although they can be moved to another thread, send/recv is not allowed while they are held inside an `Arc`
(refer to the compile_fail examples in the type documentation).
The benefit of using the SP / SC API is completely lockless waker registration: you trade the flexibility of cloning for a performance boost.
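A sketch of the single-producer, single-consumer flavor (assuming `spsc::bounded_async()` mirrors the mpmc constructors listed above):

```rust
use crossfire::*;

#[tokio::main]
async fn main() {
    // AsyncTx / AsyncRx are not Clone: exactly one sending task
    // and one receiving task own them.
    let (tx, rx) = spsc::bounded_async::<usize>(64);
    tokio::spawn(async move {
        for i in 0..10 {
            let _ = tx.send(i).await;
        }
    });
    while let Ok(i) = rx.recv().await {
        println!("got {}", i);
    }
}
```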
The sender/receiver can use the `From` trait to convert between blocking and async context counterparts.
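For example, something along these lines (a sketch only; the exact direction of the `From` impls is assumed here, check the type documentation):

```rust
use crossfire::*;

fn main() {
    // Create an async pair, then convert the sender into its blocking
    // counterpart so a plain thread can feed the async side.
    let (atx, arx) = spsc::bounded_async::<u32>(16);
    let tx: Tx<u32> = atx.into(); // assumed From<AsyncTx<T>> for Tx<T>
    std::thread::spawn(move || {
        let _ = tx.send(42);
    });
    // arx.recv().await would be called from an async task here.
    drop(arx);
}
```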
Error types are the same as in crossbeam-channel: `TrySendError`, `SendError`, `TryRecvError`, `RecvError`.
Optional feature flags:

- `tokio`: Enables the send_timeout / recv_timeout APIs for async contexts, based on tokio, and detects the right backoff strategy for the flavor of runtime (multi-threaded / current-thread).
- `async_std`: Enables the send_timeout / recv_timeout APIs for async contexts, based on async-std.
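For instance, in Cargo.toml (a sketch; feature names as listed above):

```toml
[dependencies]
crossfire = { version = "2.1", features = ["tokio"] }
```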
Tested on tokio-1.x and async-std-1.x, crossfire is runtime-agnostic.
The following scenarios are considered:
- The `AsyncTx::send()` and `AsyncRx::recv()` operations are cancellation-safe in an async context. You can safely use the select! macro and the timeout() function in tokio/futures in combination with recv(). On cancellation, [SendFuture] and [RecvFuture] trigger drop(), which cleans up the waker state, making sure there is no memory leak or deadlock. However, you cannot know the true result from a SendFuture, since it is dropped upon cancellation; thus, we suggest using `AsyncTx::send_timeout()` instead.
- When the "tokio" or "async_std" feature is enabled, we also provide two additional functions (a usage sketch follows below):
    - `AsyncTx::send_timeout()`, which returns the message that failed to be sent in [SendTimeoutError]. We guarantee the result is atomic. Alternatively, you can use `AsyncTx::send_with_timer()`.
    - `AsyncRx::recv_timeout()`, for which we also guarantee the result is atomic. Alternatively, you can use `crate::AsyncRx::recv_with_timer()`.
- Communication between a blocking context and an async context, and between different async runtime instances.
- The async waker footprint: in multi-producer / multi-consumer scenarios, there is a small memory overhead to pass along a `Weak` reference to the waker. Because we aim to be lockless, when a sending/receiving future is canceled (e.g. by tokio::time::timeout()), the waker is cleaned up immediately if the try-lock succeeds; otherwise cleanup is lazy. (This is not an issue in practice, because stale weak wakers are consumed by subsequent send and recv operations.) In an idle-select scenario, such as waiting for a close notification, the waker is reused as much as possible when poll() returns Pending.
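As mentioned above, a sketch of the timeout helpers (assuming they take a std Duration and return crossbeam-style timeout errors; built with the "tokio" feature here):

```rust
use crossfire::*;
use std::time::Duration;

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::bounded_async::<String>(8);

    // send_timeout() reports the unsent message inside the error,
    // so nothing is silently lost when the deadline passes.
    match tx.send_timeout("hello".to_string(), Duration::from_millis(50)).await {
        Ok(()) => println!("sent"),
        Err(e) => println!("send timed out or channel closed: {:?}", e),
    }

    // recv_timeout() resolves with a message, a timeout, or disconnection.
    match rx.recv_timeout(Duration::from_millis(50)).await {
        Ok(msg) => println!("got {}", msg),
        Err(e) => println!("recv timed out or channel closed: {:?}", e),
    }
}
```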
Cargo.toml:
```toml
[dependencies]
crossfire = "2.1"
```
```rust
use crossfire::*;
use tokio::time::{sleep, Duration};

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::bounded_async::<i32>(100);
    for _ in 0..10 {
        let _tx = tx.clone();
        tokio::spawn(async move {
            for i in 0i32..10 {
                let _ = _tx.send(i).await;
                sleep(Duration::from_millis(100)).await;
                println!("sent {}", i);
            }
        });
    }
    // Drop the original sender so rx.recv() returns Err once all producers finish.
    drop(tx);
    let mut inv = tokio::time::interval(Duration::from_millis(500));
    loop {
        tokio::select! {
            _ = inv.tick() => {
                println!("tick");
            }
            r = rx.recv() => {
                if let Ok(_i) = r {
                    println!("recv {}", _i);
                } else {
                    println!("rx closed");
                    break;
                }
            }
        }
    }
}
```
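To mix a blocking producer with an async consumer, a rough sketch using the constructor listed earlier, `mpmc::bounded_tx_blocking_rx_async()` (send/recv signatures assumed to follow the crossbeam-style conventions above):

```rust
use crossfire::*;

#[tokio::main]
async fn main() {
    // Blocking sender half, async receiver half.
    let (tx, rx) = mpmc::bounded_tx_blocking_rx_async::<i32>(100);

    // The producer runs on a plain OS thread and uses the blocking send().
    std::thread::spawn(move || {
        for i in 0i32..10 {
            let _ = tx.send(i);
        }
    });

    // The consumer awaits messages inside the async runtime.
    while let Ok(i) = rx.recv().await {
        println!("recv {}", i);
    }
    println!("tx closed");
}
```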