From ee2578392d261adbb2741c5543c7dcb58159e541 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Mon, 31 Jul 2023 17:11:31 -0700 Subject: [PATCH 1/4] checkpoint, compiles --- src/queue.rs | 45 +++++++++++++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 12 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index 0cfe9f53..bdb3f4a6 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -12,6 +12,14 @@ use crossbeam_channel::{unbounded as channel, Receiver, Sender}; #[cfg(not(feature = "crossbeam-channel"))] use std::sync::mpsc::{channel, Receiver, Sender}; +type BoxedSource = Box + Send>; + +trait InputQueue { + fn keep_alive_if_empty(&self) -> bool; + fn has_next(&self) -> bool; + fn next(&self) -> Option<(BoxedSource, Option>)>; +} + /// Builds a new queue. It consists of an input and an output. /// /// The input can be used to add sounds to the end of the queue, while the output implements @@ -45,7 +53,7 @@ where /// The input of the queue. pub struct SourcesQueueInput { - next_sounds: Mutex + Send>, Option>)>>, + next_sounds: Mutex, Option>)>>, // See constructor. keep_alive_if_empty: AtomicBool, @@ -101,6 +109,24 @@ where len } } + +impl InputQueue for SourcesQueueInput +where + S: Sample + Send + 'static, +{ + fn keep_alive_if_empty(&self) -> bool { + self.keep_alive_if_empty.load(Ordering::Acquire) + } + + fn has_next(&self) -> bool { + false + } + + fn next(&self) -> Option<(BoxedSource, Option>)> { + None + } +} + /// The output of the queue. Implements `Source`. pub struct SourcesQueueOutput { // The current iterator that produces samples. @@ -110,7 +136,7 @@ pub struct SourcesQueueOutput { signal_after_end: Option>, // The next sounds. - input: Arc>, + input: Arc + Send + Sync>, } const THRESHOLD: usize = 512; @@ -135,9 +161,7 @@ where if let Some(val) = self.current.current_frame_len() { if val != 0 { return Some(val); - } else if self.input.keep_alive_if_empty.load(Ordering::Acquire) - && self.input.next_sounds.lock().unwrap().is_empty() - { + } else if self.input.keep_alive_if_empty() && self.input.has_next() { // The next source will be a filler silence which will have the length of `THRESHOLD` return Some(THRESHOLD); } @@ -212,19 +236,16 @@ where let _ = signal_after_end.send(()); } - let (next, signal_after_end) = { - let mut next = self.input.next_sounds.lock().unwrap(); - - if next.len() == 0 { + let (next, signal_after_end) = match self.input.next() { + Some(t) => t, + None => { let silence = Box::new(Zero::::new_samples(1, 44100, THRESHOLD)) as Box<_>; - if self.input.keep_alive_if_empty.load(Ordering::Acquire) { + if self.input.keep_alive_if_empty() { // Play a short silence in order to avoid spinlocking. (silence, None) } else { return Err(()); } - } else { - next.remove(0) } }; From f17903523078a6c19d7ad38577817a460cf261ab Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Mon, 31 Jul 2023 17:19:01 -0700 Subject: [PATCH 2/4] actually implement for existing queue --- src/queue.rs | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index bdb3f4a6..5b9bfc74 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -119,11 +119,16 @@ where } fn has_next(&self) -> bool { - false + self.next_sounds.lock().unwrap().len() > 0 } fn next(&self) -> Option<(BoxedSource, Option>)> { - None + let mut next = self.next_sounds.lock().unwrap(); + if next.len() > 0 { + Some(next.remove(0)) + } else { + None + } } } From 597cc748e7f886eea58f0547f230ecad72fc6f97 Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 1 Aug 2023 10:37:31 -0700 Subject: [PATCH 3/4] add id_queue with some tests --- src/queue.rs | 174 ++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 171 insertions(+), 3 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index 5b9bfc74..949122a3 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -13,11 +13,12 @@ use crossbeam_channel::{unbounded as channel, Receiver, Sender}; use std::sync::mpsc::{channel, Receiver, Sender}; type BoxedSource = Box + Send>; +type QueueNextItem = (BoxedSource, Option>); trait InputQueue { fn keep_alive_if_empty(&self) -> bool; fn has_next(&self) -> bool; - fn next(&self) -> Option<(BoxedSource, Option>)>; + fn next(&self) -> Option>; } /// Builds a new queue. It consists of an input and an output. @@ -49,11 +50,30 @@ where (input, output) } +pub fn id_queue(keep_alive_if_empty: bool) -> (Arc>, SourcesQueueOutput) +where + S: Sample + Send + 'static, + I: Eq + PartialEq + Send + 'static, +{ + let input = Arc::new(SourcesIdQueueInput { + next_sounds: Mutex::new(Vec::new()), + keep_alive_if_empty: AtomicBool::new(keep_alive_if_empty), + }); + + let output = SourcesQueueOutput { + current: Box::new(Empty::::new()) as Box<_>, + signal_after_end: None, + input: input.clone(), + }; + + (input, output) +} + // TODO: consider reimplementing this with `from_factory` /// The input of the queue. pub struct SourcesQueueInput { - next_sounds: Mutex, Option>)>>, + next_sounds: Mutex>>, // See constructor. keep_alive_if_empty: AtomicBool, @@ -122,7 +142,7 @@ where self.next_sounds.lock().unwrap().len() > 0 } - fn next(&self) -> Option<(BoxedSource, Option>)> { + fn next(&self) -> Option> { let mut next = self.next_sounds.lock().unwrap(); if next.len() > 0 { Some(next.remove(0)) @@ -132,6 +152,100 @@ where } } +/// A queue input that can associate ids with each item. This allows the queue to be used more like +/// a playlist where items can be removed and/or reordered +pub struct SourcesIdQueueInput { + next_sounds: Mutex)>>, + + // See constructor. + keep_alive_if_empty: AtomicBool, +} + +impl SourcesIdQueueInput +where + S: Sample + Send + 'static, + I: Eq + PartialEq, +{ + /// Adds a new source to the end of the queue. + #[inline] + pub fn append(&self, id: I, source: T) + where + T: Source + Send + 'static, + { + self.next_sounds + .lock() + .unwrap() + .push((id, (Box::new(source) as Box<_>, None))); + } + + /// Adds a new source to the end of the queue. + /// + /// The `Receiver` will be signalled when the sound has finished playing. + /// + /// Enable the feature flag `crossbeam-channel` in rodio to use a `crossbeam_channel::Receiver` instead. + #[inline] + pub fn append_with_signal(&self, id: I, source: T) -> Receiver<()> + where + T: Source + Send + 'static, + { + let (tx, rx) = channel(); + self.next_sounds + .lock() + .unwrap() + .push((id, (Box::new(source) as Box<_>, Some(tx)))); + rx + } + + /// Remove the item with id `id` from the queue + #[inline] + pub fn remove(&self, id: I) { + let mut next = self.next_sounds.lock().unwrap(); + next.retain(|i| i.0 != id); + } + + /// Swap item having id `id_a` with the item having id `id_b`. If either item does not exist, + /// this is a no-op + pub fn swap(&self, id_a: I, id_b: I) { + let mut next_sounds = self.next_sounds.lock().unwrap(); + let mut index_a = None; + let mut index_b = None; + let mut p = 0; + for (id, _) in next_sounds.iter() { + if index_a.is_none() && *id == id_a { + index_a = Some(p); + } else if index_b.is_none() && *id == id_b { + index_b = Some(p); + } + p+=1; + } + if let (Some(index_a), Some(index_b)) = (index_a, index_b) { + next_sounds.swap(index_a, index_b); + } + } +} + +impl InputQueue for SourcesIdQueueInput +where + S: Sample + Send + 'static, +{ + fn keep_alive_if_empty(&self) -> bool { + self.keep_alive_if_empty.load(Ordering::Acquire) + } + + fn has_next(&self) -> bool { + self.next_sounds.lock().unwrap().len() > 0 + } + + fn next(&self) -> Option> { + let mut next = self.next_sounds.lock().unwrap(); + if next.len() > 0 { + Some(next.remove(0).1) + } else { + None + } + } +} + /// The output of the queue. Implements `Source`. pub struct SourcesQueueOutput { // The current iterator that produces samples. @@ -289,6 +403,60 @@ mod tests { assert_eq!(rx.next(), None); } + #[test] + fn simple_id() { + let (tx, mut rx) = queue::id_queue(false); + let s1 = "sb1".to_string(); + let s2 = "sb2".to_string(); + let v1 = vec![10i16, -10, 10, -10]; + let v2 = vec![10i16, -9, 9, -9]; + tx.append(s1, SamplesBuffer::new(1, 48000, v1.clone())); + tx.append(s2, SamplesBuffer::new(1, 48000, v2.clone())); + assert_eq!(rx.channels(), 1); + assert_eq!(rx.sample_rate(), 48000); + for v in v1.into_iter().chain(v2.into_iter()) { + assert_eq!(rx.next(), Some(v)); + } + } + + #[test] + fn id_with_remove() { + let (tx, mut rx) = queue::id_queue(false); + let s1 = "sb1".to_string(); + let s2 = "sb2".to_string(); + tx.append(s1, SamplesBuffer::new(1, 48000, vec![10i16, -10, 10, -10])); + tx.append(s2, SamplesBuffer::new(1, 48000, vec![10i16, -9, 9, -9])); + assert_eq!(rx.channels(), 1); + assert_eq!(rx.sample_rate(), 48000); + assert_eq!(rx.next(), Some(10)); + assert_eq!(rx.next(), Some(-10)); + tx.remove("sb2".to_string()); + assert_eq!(rx.next(), Some(10)); + assert_eq!(rx.next(), Some(-10)); + assert_eq!(rx.next(), None); + } + + #[test] + fn id_swap() { + let (tx, mut rx) = queue::id_queue(false); + let s1 = "sb1".to_string(); + let s2 = "sb2".to_string(); + let s3 = "sb3".to_string(); + let v1 = vec![10i16, -10, 10, -10]; + let v2 = vec![10i16, -9, 9, -9]; + let v3 = vec![12i16, -12, 12, -12]; + tx.append(s1, SamplesBuffer::new(1, 48000, v1.clone())); + tx.append(s2.clone(), SamplesBuffer::new(1, 48000, v2.clone())); + tx.append(s3.clone(), SamplesBuffer::new(1, 48000, v3.clone())); + assert_eq!(rx.channels(), 1); + assert_eq!(rx.sample_rate(), 48000); + tx.swap(s3, s2); + for v in v1.into_iter().chain(v3.into_iter()).chain(v2.into_iter()) { + assert_eq!(rx.next(), Some(v)); + } + } + + #[test] fn immediate_end() { let (_, mut rx) = queue::queue::(false); From 1cb2d91d3f600bdbfba786bfbf089a33c9aa792b Mon Sep 17 00:00:00 2001 From: Nick Lanham Date: Tue, 1 Aug 2023 10:45:44 -0700 Subject: [PATCH 4/4] fmt --- src/queue.rs | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index 949122a3..44e667f3 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -50,10 +50,12 @@ where (input, output) } -pub fn id_queue(keep_alive_if_empty: bool) -> (Arc>, SourcesQueueOutput) +pub fn id_queue( + keep_alive_if_empty: bool, +) -> (Arc>, SourcesQueueOutput) where - S: Sample + Send + 'static, - I: Eq + PartialEq + Send + 'static, + S: Sample + Send + 'static, + I: Eq + PartialEq + Send + 'static, { let input = Arc::new(SourcesIdQueueInput { next_sounds: Mutex::new(Vec::new()), @@ -137,7 +139,7 @@ where fn keep_alive_if_empty(&self) -> bool { self.keep_alive_if_empty.load(Ordering::Acquire) } - + fn has_next(&self) -> bool { self.next_sounds.lock().unwrap().len() > 0 } @@ -216,7 +218,7 @@ where } else if index_b.is_none() && *id == id_b { index_b = Some(p); } - p+=1; + p += 1; } if let (Some(index_a), Some(index_b)) = (index_a, index_b) { next_sounds.swap(index_a, index_b); @@ -231,7 +233,7 @@ where fn keep_alive_if_empty(&self) -> bool { self.keep_alive_if_empty.load(Ordering::Acquire) } - + fn has_next(&self) -> bool { self.next_sounds.lock().unwrap().len() > 0 } @@ -456,7 +458,6 @@ mod tests { } } - #[test] fn immediate_end() { let (_, mut rx) = queue::queue::(false);