Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make pools pollable #10

Merged
merged 20 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions .github/workflows/wasmtime.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ jobs:
- name: Install wasmtime (ubuntu)
if: startsWith(matrix.os, 'ubuntu-')
run: |
curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasmtime-v14.0.4-x86_64-linux.tar.xz
tar xvJf wasmtime-v14.0.4-x86_64-linux.tar.xz
echo "WASMTIME=$(pwd -P)/wasmtime-v14.0.4-x86_64-linux/wasmtime" >> ${GITHUB_ENV}
curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasmtime-v15.0.1-x86_64-linux.tar.xz
tar xvJf wasmtime-v15.0.1-x86_64-linux.tar.xz
echo "WASMTIME=$(pwd -P)/wasmtime-v15.0.1-x86_64-linux/wasmtime" >> ${GITHUB_ENV}

- name: Install wasmtime (macOS)
if: startsWith(matrix.os, 'macos-')
run: |
curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasmtime-v14.0.4-x86_64-macos.tar.xz
tar xvzf wasmtime-v14.0.4-x86_64-macos.tar.xz
echo "WASMTIME=$(pwd -P)/wasmtime-v14.0.4-x86_64-macos/wasmtime" >> ${GITHUB_ENV}
curl -O -L https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasmtime-v15.0.1-x86_64-macos.tar.xz
tar xvzf wasmtime-v15.0.1-x86_64-macos.tar.xz
echo "WASMTIME=$(pwd -P)/wasmtime-v15.0.1-x86_64-macos/wasmtime" >> ${GITHUB_ENV}

- name: Build guest (Rust)
run: |
Expand Down
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
*/target/*
*.lock
2 changes: 1 addition & 1 deletion guest_c/build.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#! /bin/sh
set -e
if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then
curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasi_snapshot_preview1.reactor.wasm
curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasi_snapshot_preview1.reactor.wasm
fi

if [ ! -d libjpeg ]; then
Expand Down
31 changes: 23 additions & 8 deletions guest_c/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -156,24 +156,39 @@ exports_wasi_sensor_interface_main()

sensing_borrow_pool_t borrowed_pool =
wasi_buffer_pool_buffer_pool_borrow_pool(pool);
sensing_own_pollable_t poll =
wasi_buffer_pool_buffer_pool_method_pool_subscribe(
borrowed_pool);
sensing_borrow_pollable_t borrowed_poll =
wasi_io_0_2_0_rc_2023_11_10_poll_borrow_pollable(poll);
int n = 60;
int i;
for (i = 0; i < n; i++) {
wasi_buffer_pool_buffer_pool_frame_info_t frame;
if (!wasi_buffer_pool_buffer_pool_method_pool_block_read_frame(
borrowed_pool, &frame, &buffer_error)) {
for (i = 0; i < n;) {
wasi_io_0_2_0_rc_2023_11_10_poll_method_pollable_block(
borrowed_poll);
sensing_list_wasi_buffer_pool_buffer_pool_frame_info_t frames;
if (!wasi_buffer_pool_buffer_pool_method_pool_read_frames(
borrowed_pool, 1, &frames, &buffer_error)) {
fprintf(stderr, "block-read-frame failed (error %u)\n",
(unsigned int)buffer_error);
return false;
}
fprintf(stderr, "got a frame (%u/%u)\n", i + 1, n);
if (!process_frame_info(&frame)) {
return false;
size_t j;
for (j = 0; j < frames.len; j++) {
wasi_buffer_pool_buffer_pool_frame_info_t *frame =
&frames.ptr[j];
i++;
fprintf(stderr, "got a frame (%u/%u)\n", i, n);
if (!process_frame_info(frame)) {
return false;
}
}
wasi_buffer_pool_buffer_pool_frame_info_free(&frame);
sensing_list_wasi_buffer_pool_buffer_pool_frame_info_free(
&frames);
}

fprintf(stderr, "cleaning up\n");
wasi_io_0_2_0_rc_2023_11_10_poll_pollable_drop_own(poll);
wasi_sensor_sensor_device_drop_own(device);
wasi_buffer_pool_buffer_pool_pool_drop_own(pool);

Expand Down
2 changes: 1 addition & 1 deletion guest_rust/build.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#! /bin/sh
set -e
if [ ! -f wasi_snapshot_preview1.reactor.wasm ]; then
curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v14.0.4/wasi_snapshot_preview1.reactor.wasm
curl --fail -L -O https://github.com/bytecodealliance/wasmtime/releases/download/v15.0.1/wasi_snapshot_preview1.reactor.wasm
fi

release_opt=--release
Expand Down
13 changes: 10 additions & 3 deletions guest_rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,9 +194,16 @@ fn main2() -> Result<()> {

println!("starting sensor {:?}", sensor);
sensor.start(&pool_name)?;
for _ in 0..60 {
let frame = pool.block_read_frame()?;
process_frame(&frame)?;
let poll = pool.subscribe();
let mut n = 0;
while n < 60 {
poll.block();
let frames = pool.read_frames(1)?;
assert!(frames.len() == 1);
for ref frame in &frames {
process_frame(frame)?;
n += 1;
}
}
let stats = pool.get_statistics()?;
println!("pool statistics: {:?}", stats);
Expand Down
6 changes: 4 additions & 2 deletions host_wasmtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ fraction = "0.14"
tracing = { version = "0.1.40", features = ["max_level_trace"] }
tracing-subscriber = "0.3.18"
image = { version = "0.24.7", default-features = false }
async-trait = "0.1.71"
tokio = { version = "1.26.0", default-features = false }

# released versions of nokhwa is a bit broken for avfoundation.
# https://github.com/l1npengtul/nokhwa/pull/151
# https://github.com/l1npengtul/nokhwa/pull/152
nokhwa = {git = "https://github.com/yamt/nokhwa", rev = "0.10+fixes", features = ["input-native", "output-threaded"]}

# preview2 and component-model are still moving targets.
wasmtime = { version = "14.0.4", default-features = false, features = ["component-model", "cranelift"]}
wasmtime-wasi = { version = "14.0.4", default-features = false, features = ["preview2", "sync"] }
wasmtime = { version = "15.0.1", default-features = false, features = ["component-model", "cranelift"]}
wasmtime-wasi = { version = "15.0.1", default-features = false, features = ["preview2", "sync"] }
2 changes: 1 addition & 1 deletion host_wasmtime/src/dummy_device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ impl SensorDevice for DummyDevice {
thread::sleep(next_frame - now);
}
next_frame += config.frame_duration;
match pool.enqueue(Box::new(data), None) {
match pool.try_enqueue(Box::new(data), None) {
Ok(_) => println!("DummyDevice generated frame enqueued"),
_ => println!("DummyDevice generated frame dropped"),
}
Expand Down
71 changes: 47 additions & 24 deletions host_wasmtime/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ use wasmtime::{Config, Engine, Store};
use wasmtime_wasi::ambient_authority;
use wasmtime_wasi::preview2::DirPerms;
use wasmtime_wasi::preview2::FilePerms;
use wasmtime_wasi::preview2::Pollable;
use wasmtime_wasi::preview2::Subscribe;
use wasmtime_wasi::preview2::Table;
use wasmtime_wasi::preview2::WasiCtx;
use wasmtime_wasi::preview2::WasiCtxBuilder;
Expand All @@ -32,6 +34,7 @@ wasmtime::component::bindgen!({
with: {
"wasi:buffer-pool/buffer-pool/pool": Pool,
"wasi:sensor/sensor/device": Device,
"wasi:io/poll": wasmtime_wasi::preview2::bindings::io::poll,
},
});

Expand All @@ -42,8 +45,23 @@ trait WasiSensorView {

pub struct Pool {
name: String,
next_frame: Option<(u64, u64, Box<wasi::buffer_pool::buffer_pool::FrameData>)>,
pool: Arc<dyn traits::BufferPool + Send + Sync>,
}

#[async_trait::async_trait]
impl Subscribe for Pool {
async fn ready(&mut self) {
if self.next_frame.is_some() {
return;
}
// XXX this confuses the flow-control in the pool by 1 frame
let frame = self.pool.dequeue().await;
assert!(self.next_frame.is_none()); /* XXX */
self.next_frame = Some(frame);
}
}

pub struct Device {
device: Box<dyn SensorDevice + Send + Sync>,
}
Expand Down Expand Up @@ -93,44 +111,49 @@ impl<T: WasiSensorView> wasi::buffer_pool::buffer_pool::HostPool for T {
> {
let pool = SimplePool::new(mode, size as usize, buffer_num as usize)?;
let pool = Arc::new(pool);
let idx = self.table().push_resource(Pool {
let idx = self.table().push(Pool {
name: name.clone(),
next_frame: None,
pool: pool.clone(),
})?;
self.pools().insert(name, pool);
Ok(Ok(idx))
}

fn block_read_frame(
fn read_frames(
&mut self,
res: Resource<wasi::buffer_pool::buffer_pool::Pool>,
max_results: u32,
) -> Result<
Result<
wasi::buffer_pool::buffer_pool::FrameInfo,
Vec<wasi::buffer_pool::buffer_pool::FrameInfo>,
wasi::buffer_pool::buffer_pool::BufferError,
>,
> {
let pool = self.table().get_resource_mut(&res)?;
let (sequence_number, timestamp, data) = pool.pool.dequeue();
let pool = self.table().get_mut(&res)?;
if max_results == 0 {
return Ok(Ok(vec![]));
}
let (sequence_number, timestamp, data) = match pool.next_frame.take() {
Some(frame) => frame,
None => match pool.pool.try_dequeue() {
Some(frame) => frame,
None => return Ok(Ok(vec![])),
},
};
let frame = wasi::buffer_pool::buffer_pool::FrameInfo {
sequence_number: sequence_number,
timestamp: timestamp,
data: vec![*data],
};
Ok(Ok(frame))
Ok(Ok(vec![frame]))
}
fn poll_read_frame(

fn subscribe(
&mut self,
res: Resource<wasi::buffer_pool::buffer_pool::Pool>,
) -> Result<
Result<
wasi::buffer_pool::buffer_pool::FrameInfo,
wasi::buffer_pool::buffer_pool::BufferError,
>,
> {
Ok(Err(
wasi::buffer_pool::buffer_pool::BufferError::NotSupported,
))
) -> Result<Resource<Pollable>> {
wasmtime_wasi::preview2::subscribe(self.table(), res)
}

fn get_statistics(
Expand All @@ -142,7 +165,7 @@ impl<T: WasiSensorView> wasi::buffer_pool::buffer_pool::HostPool for T {
wasi::buffer_pool::buffer_pool::BufferError,
>,
> {
let pool = self.table().get_resource(&res)?;
let pool = self.table().get(&res)?;
let stats = pool.pool.get_statistics()?;
Ok(Ok(stats))
}
Expand All @@ -151,9 +174,9 @@ impl<T: WasiSensorView> wasi::buffer_pool::buffer_pool::HostPool for T {
&mut self,
res: Resource<wasi::buffer_pool::buffer_pool::Pool>,
) -> wasmtime::Result<()> {
let pool = self.table().get_resource(&res)?;
let pool = self.table().get(&res)?;
let name = pool.name.clone();
self.table().delete_resource(res)?;
self.table().delete(res)?;
self.pools().remove(&name);
Ok(())
}
Expand All @@ -180,7 +203,7 @@ impl<T: WasiSensorView> wasi::sensor::sensor::HostDevice for T {
let device = Device {
device: device_impl,
};
let idx = self.table().push_resource(device)?;
let idx = self.table().push(device)?;
Ok(Ok(idx))
}

Expand All @@ -198,7 +221,7 @@ impl<T: WasiSensorView> wasi::sensor::sensor::HostDevice for T {
_ => return Ok(Err(wasi::sensor::sensor::DeviceError::NotFound)),
};
let pool = Arc::clone(pool);
let device = self.table().get_resource_mut(&res)?;
let device = self.table().get_mut(&res)?;
Ok(device.device.start_streaming(pool))
}
fn stop(
Expand All @@ -213,7 +236,7 @@ impl<T: WasiSensorView> wasi::sensor::sensor::HostDevice for T {
key: wasi::sensor::property::PropertyKey,
value: wasi::sensor::property::PropertyValue,
) -> Result<Result<(), wasi::sensor::sensor::DeviceError>> {
let device = self.table().get_resource_mut(&res)?;
let device = self.table().get_mut(&res)?;
Ok(device.device.set_property(key, value))
}
fn get_property(
Expand All @@ -222,12 +245,12 @@ impl<T: WasiSensorView> wasi::sensor::sensor::HostDevice for T {
key: wasi::sensor::property::PropertyKey,
) -> Result<Result<wasi::sensor::property::PropertyValue, wasi::sensor::sensor::DeviceError>>
{
let device = self.table().get_resource_mut(&res)?;
let device = self.table().get_mut(&res)?;
Ok(device.device.get_property(key))
}
fn drop(&mut self, res: Resource<wasi::sensor::sensor::Device>) -> wasmtime::Result<()> {
trace!("dropping {:?}", res);
self.table().delete_resource(res)?;
self.table().delete(res)?;
Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion host_wasmtime/src/nokhwa.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl SensorDevice for NokhwaDevice {
let data = wasi::buffer_pool::buffer_pool::FrameData::ByValue(
wasi::buffer_pool::data_types::DataType::Image(image),
);
match pool.enqueue(Box::new(data), None) {
match pool.try_enqueue(Box::new(data), None) {
Ok(_) => println!("NokhwaDevice frame enqueued"),
_ => println!("NokhwaDevice frame dropped"),
}
Expand Down
Loading