From 80d384e00164b61f4ef9c52acee345300e091341 Mon Sep 17 00:00:00 2001 From: johannesd3 Date: Thu, 21 Jan 2021 22:12:35 +0100 Subject: [PATCH] Migrated audio crate to futures 0.3 --- audio/Cargo.toml | 9 +- audio/src/decrypt.rs | 4 +- audio/src/fetch.rs | 759 +++++++++++++++++++++++-------------------- audio/src/lib.rs | 7 +- 4 files changed, 415 insertions(+), 364 deletions(-) diff --git a/audio/Cargo.toml b/audio/Cargo.toml index cde907c1a..f9d232b37 100644 --- a/audio/Cargo.toml +++ b/audio/Cargo.toml @@ -11,16 +11,17 @@ path = "../core" version = "0.1.3" [dependencies] +aes-ctr = "0.6" bit-set = "0.5" -byteorder = "1.3" -bytes = "0.4" -futures = "0.1" +byteorder = "1.4" +bytes = "1.0" +futures = "0.3" lewton = "0.9" log = "0.4" num-bigint = "0.3" num-traits = "0.2" +pin-project = "1.0" tempfile = "3.1" -aes-ctr = "0.3" librespot-tremor = { version = "0.1.0", optional = true } vorbis = { version ="0.0.14", optional = true } diff --git a/audio/src/decrypt.rs b/audio/src/decrypt.rs index 818eb34e2..616ef4f68 100644 --- a/audio/src/decrypt.rs +++ b/audio/src/decrypt.rs @@ -1,7 +1,7 @@ use std::io; -use aes_ctr::stream_cipher::generic_array::GenericArray; -use aes_ctr::stream_cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek}; +use aes_ctr::cipher::generic_array::GenericArray; +use aes_ctr::cipher::{NewStreamCipher, SyncStreamCipher, SyncStreamCipherSeek}; use aes_ctr::Aes128Ctr; use librespot_core::audio_key::AudioKey; diff --git a/audio/src/fetch.rs b/audio/src/fetch.rs index c47cb4d3b..5d15866c6 100644 --- a/audio/src/fetch.rs +++ b/audio/src/fetch.rs @@ -1,17 +1,25 @@ use crate::range_set::{Range, RangeSet}; use byteorder::{BigEndian, ByteOrder, WriteBytesExt}; use bytes::Bytes; -use futures::sync::{mpsc, oneshot}; -use futures::Stream; -use futures::{Async, Future, Poll}; -use std::cmp::{max, min}; +use futures::{ + channel::{mpsc, oneshot}, + future, +}; +use futures::{Future, Stream, StreamExt, TryFutureExt, TryStreamExt}; + use std::fs; use std::io::{self, Read, Seek, SeekFrom, Write}; use std::sync::{Arc, Condvar, Mutex}; +use std::task::Poll; use std::time::{Duration, Instant}; +use std::{ + cmp::{max, min}, + pin::Pin, + task::Context, +}; use tempfile::NamedTempFile; -use futures::sync::mpsc::unbounded; +use futures::channel::mpsc::unbounded; use librespot_core::channel::{Channel, ChannelData, ChannelError, ChannelHeaders}; use librespot_core::session::Session; use librespot_core::spotify_id::FileId; @@ -88,22 +96,6 @@ pub enum AudioFile { Streaming(AudioFileStreaming), } -pub enum AudioFileOpen { - Cached(Option), - Streaming(AudioFileOpenStreaming), -} - -pub struct AudioFileOpenStreaming { - session: Session, - initial_data_rx: Option, - initial_data_length: Option, - initial_request_sent_time: Instant, - headers: ChannelHeaders, - file_id: FileId, - complete_tx: Option>, - streaming_data_rate: usize, -} - enum StreamLoaderCommand { Fetch(Range), // signal the stream loader to fetch a range of the file RandomAccessMode(), // optimise download strategy for random access @@ -120,45 +112,36 @@ pub struct StreamLoaderController { impl StreamLoaderController { pub fn len(&self) -> usize { - return self.file_size; + self.file_size + } + + pub fn is_empty(&self) -> bool { + self.file_size == 0 } pub fn range_available(&self, range: Range) -> bool { if let Some(ref shared) = self.stream_shared { let download_status = shared.download_status.lock().unwrap(); - if range.length + range.length <= download_status .downloaded .contained_length_from_value(range.start) - { - return true; - } else { - return false; - } } else { - if range.length <= self.len() - range.start { - return true; - } else { - return false; - } + range.length <= self.len() - range.start } } pub fn range_to_end_available(&self) -> bool { - if let Some(ref shared) = self.stream_shared { + self.stream_shared.as_ref().map_or(true, |shared| { let read_position = shared.read_position.load(atomic::Ordering::Relaxed); self.range_available(Range::new(read_position, self.len() - read_position)) - } else { - true - } + }) } pub fn ping_time_ms(&self) -> usize { - if let Some(ref shared) = self.stream_shared { - return shared.ping_time_ms.load(atomic::Ordering::Relaxed); - } else { - return 0; - } + self.stream_shared.as_ref().map_or(0, |shared| { + shared.ping_time_ms.load(atomic::Ordering::Relaxed) + }) } fn send_stream_loader_command(&mut self, command: StreamLoaderCommand) { @@ -216,27 +199,23 @@ impl StreamLoaderController { } pub fn fetch_next(&mut self, length: usize) { - let range: Range = if let Some(ref shared) = self.stream_shared { - Range { + if let Some(ref shared) = self.stream_shared { + let range = Range { start: shared.read_position.load(atomic::Ordering::Relaxed), length: length, - } - } else { - return; - }; - self.fetch(range); + }; + self.fetch(range) + } } pub fn fetch_next_blocking(&mut self, length: usize) { - let range: Range = if let Some(ref shared) = self.stream_shared { - Range { + if let Some(ref shared) = self.stream_shared { + let range = Range { start: shared.read_position.load(atomic::Ordering::Relaxed), length: length, - } - } else { - return; - }; - self.fetch_blocking(range); + }; + self.fetch_blocking(range); + } } pub fn set_random_access_mode(&mut self) { @@ -288,12 +267,100 @@ struct AudioFileShared { read_position: AtomicUsize, } -impl AudioFileOpenStreaming { - fn finish(&mut self, size: usize) -> AudioFileStreaming { +impl AudioFile { + pub async fn open( + session: &Session, + file_id: FileId, + bytes_per_second: usize, + play_from_beginning: bool, + ) -> Result { + if let Some(file) = session.cache().and_then(|cache| cache.file(file_id)) { + debug!("File {} already in cache", file_id); + return Ok(AudioFile::Cached(file)); + } + + debug!("Downloading file {}", file_id); + + let (complete_tx, complete_rx) = oneshot::channel(); + let mut initial_data_length = if play_from_beginning { + INITIAL_DOWNLOAD_SIZE + + max( + (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, + (INITIAL_PING_TIME_ESTIMATE_SECONDS + * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS + * bytes_per_second as f64) as usize, + ) + } else { + INITIAL_DOWNLOAD_SIZE + }; + if initial_data_length % 4 != 0 { + initial_data_length += 4 - (initial_data_length % 4); + } + let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); + + let streaming = AudioFileStreaming::open( + session.clone(), + data, + initial_data_length, + Instant::now(), + headers, + file_id, + complete_tx, + bytes_per_second, + ); + + let session_ = session.clone(); + session.spawn(complete_rx.map_ok(move |mut file| { + if let Some(cache) = session_.cache() { + cache.save_file(file_id, &mut file); + debug!("File {} complete, saving to cache", file_id); + } else { + debug!("File {} complete", file_id); + } + })); + + Ok(AudioFile::Streaming(streaming.await?)) + } + + pub fn get_stream_loader_controller(&self) -> StreamLoaderController { + match self { + AudioFile::Streaming(ref stream) => StreamLoaderController { + channel_tx: Some(stream.stream_loader_command_tx.clone()), + stream_shared: Some(stream.shared.clone()), + file_size: stream.shared.file_size, + }, + AudioFile::Cached(ref file) => StreamLoaderController { + channel_tx: None, + stream_shared: None, + file_size: file.metadata().unwrap().len() as usize, + }, + } + } +} + +impl AudioFileStreaming { + pub async fn open( + session: Session, + initial_data_rx: ChannelData, + initial_data_length: usize, + initial_request_sent_time: Instant, + headers: ChannelHeaders, + file_id: FileId, + complete_tx: oneshot::Sender, + streaming_data_rate: usize, + ) -> Result { + let (_, data) = headers + .try_filter(|(id, _)| future::ready(*id == 0x3)) + .next() + .await + .unwrap()?; + + let size = BigEndian::read_u32(&data) as usize * 4; + let shared = Arc::new(AudioFileShared { - file_id: self.file_id, + file_id: file_id, file_size: size, - stream_data_rate: self.streaming_data_rate, + stream_data_rate: streaming_data_rate, cond: Condvar::new(), download_status: Mutex::new(AudioFileDownloadStatus { requested: RangeSet::new(), @@ -311,153 +378,29 @@ impl AudioFileOpenStreaming { let read_file = write_file.reopen().unwrap(); - let initial_data_rx = self.initial_data_rx.take().unwrap(); - let initial_data_length = self.initial_data_length.take().unwrap(); - let complete_tx = self.complete_tx.take().unwrap(); //let (seek_tx, seek_rx) = mpsc::unbounded(); let (stream_loader_command_tx, stream_loader_command_rx) = mpsc::unbounded::(); let fetcher = AudioFileFetch::new( - self.session.clone(), + session.clone(), shared.clone(), initial_data_rx, - self.initial_request_sent_time, + initial_request_sent_time, initial_data_length, write_file, stream_loader_command_rx, complete_tx, ); - self.session.spawn(move |_| fetcher); - AudioFileStreaming { + session.spawn(fetcher); + Ok(AudioFileStreaming { read_file: read_file, - position: 0, //seek: seek_tx, stream_loader_command_tx: stream_loader_command_tx, - shared: shared, - } - } -} - -impl Future for AudioFileOpen { - type Item = AudioFile; - type Error = ChannelError; - - fn poll(&mut self) -> Poll { - match *self { - AudioFileOpen::Streaming(ref mut open) => { - let file = try_ready!(open.poll()); - Ok(Async::Ready(AudioFile::Streaming(file))) - } - AudioFileOpen::Cached(ref mut file) => { - let file = file.take().unwrap(); - Ok(Async::Ready(AudioFile::Cached(file))) - } - } - } -} - -impl Future for AudioFileOpenStreaming { - type Item = AudioFileStreaming; - type Error = ChannelError; - - fn poll(&mut self) -> Poll { - loop { - let (id, data) = try_ready!(self.headers.poll()).unwrap(); - - if id == 0x3 { - let size = BigEndian::read_u32(&data) as usize * 4; - let file = self.finish(size); - - return Ok(Async::Ready(file)); - } - } - } -} - -impl AudioFile { - pub fn open( - session: &Session, - file_id: FileId, - bytes_per_second: usize, - play_from_beginning: bool, - ) -> AudioFileOpen { - let cache = session.cache().cloned(); - - if let Some(file) = cache.as_ref().and_then(|cache| cache.file(file_id)) { - debug!("File {} already in cache", file_id); - return AudioFileOpen::Cached(Some(file)); - } - - debug!("Downloading file {}", file_id); - - let (complete_tx, complete_rx) = oneshot::channel(); - let mut initial_data_length = if play_from_beginning { - INITIAL_DOWNLOAD_SIZE - + max( - (READ_AHEAD_DURING_PLAYBACK_SECONDS * bytes_per_second as f64) as usize, - (INITIAL_PING_TIME_ESTIMATE_SECONDS - * READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS - * bytes_per_second as f64) as usize, - ) - } else { - INITIAL_DOWNLOAD_SIZE - }; - if initial_data_length % 4 != 0 { - initial_data_length += 4 - (initial_data_length % 4); - } - let (headers, data) = request_range(session, file_id, 0, initial_data_length).split(); - - let open = AudioFileOpenStreaming { - session: session.clone(), - file_id: file_id, - - headers: headers, - initial_data_rx: Some(data), - initial_data_length: Some(initial_data_length), - initial_request_sent_time: Instant::now(), - - complete_tx: Some(complete_tx), - streaming_data_rate: bytes_per_second, - }; - - let session_ = session.clone(); - session.spawn(move |_| { - complete_rx - .map(move |mut file| { - if let Some(cache) = session_.cache() { - cache.save_file(file_id, &mut file); - debug!("File {} complete, saving to cache", file_id); - } else { - debug!("File {} complete", file_id); - } - }) - .or_else(|oneshot::Canceled| Ok(())) - }); - - return AudioFileOpen::Streaming(open); - } - - pub fn get_stream_loader_controller(&self) -> StreamLoaderController { - match self { - AudioFile::Streaming(ref stream) => { - return StreamLoaderController { - channel_tx: Some(stream.stream_loader_command_tx.clone()), - stream_shared: Some(stream.shared.clone()), - file_size: stream.shared.file_size, - }; - } - AudioFile::Cached(ref file) => { - return StreamLoaderController { - channel_tx: None, - stream_shared: None, - file_size: file.metadata().unwrap().len() as usize, - }; - } - } + }) } } @@ -502,141 +445,261 @@ enum ReceivedData { Data(PartialFileData), } -struct AudioFileFetchDataReceiver { +async fn audio_file_fetch_receive_data( shared: Arc, file_data_tx: mpsc::UnboundedSender, data_rx: ChannelData, initial_data_offset: usize, initial_request_length: usize, - data_offset: usize, - request_length: usize, - request_sent_time: Option, - measure_ping_time: bool, -} - -impl AudioFileFetchDataReceiver { - fn new( - shared: Arc, - file_data_tx: mpsc::UnboundedSender, - data_rx: ChannelData, - data_offset: usize, - request_length: usize, - request_sent_time: Instant, - ) -> AudioFileFetchDataReceiver { - let measure_ping_time = shared - .number_of_open_requests - .load(atomic::Ordering::SeqCst) - == 0; - - shared - .number_of_open_requests - .fetch_add(1, atomic::Ordering::SeqCst); - - AudioFileFetchDataReceiver { - shared: shared, - data_rx: data_rx, - file_data_tx: file_data_tx, - initial_data_offset: data_offset, - initial_request_length: request_length, - data_offset: data_offset, - request_length: request_length, - request_sent_time: Some(request_sent_time), - measure_ping_time: measure_ping_time, - } + request_sent_time: Instant, +) { + let mut data_offset = initial_data_offset; + let mut request_length = initial_request_length; + let mut measure_ping_time = shared + .number_of_open_requests + .load(atomic::Ordering::SeqCst) + == 0; + + shared + .number_of_open_requests + .fetch_add(1, atomic::Ordering::SeqCst); + + enum TryFoldErr { + ChannelError, + FinishEarly, } -} -impl AudioFileFetchDataReceiver { - fn finish(&mut self) { - if self.request_length > 0 { - let missing_range = Range::new(self.data_offset, self.request_length); + let result = data_rx + .map_err(|_| TryFoldErr::ChannelError) + .try_for_each(|data| { + if measure_ping_time { + let duration = Instant::now() - request_sent_time; + let duration_ms: u64; + if 0.001 * (duration.as_millis() as f64) + > MAXIMUM_ASSUMED_PING_TIME_SECONDS + { + duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; + } else { + duration_ms = duration.as_millis() as u64; + } + let _ = file_data_tx + .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); + measure_ping_time = false; + } + let data_size = data.len(); + let _ = file_data_tx + .unbounded_send(ReceivedData::Data(PartialFileData { + offset: data_offset, + data: data, + })); + data_offset += data_size; + if request_length < data_size { + warn!("Data receiver for range {} (+{}) received more data from server than requested.", initial_data_offset, initial_request_length); + request_length = 0; + } else { + request_length -= data_size; + } - let mut download_status = self.shared.download_status.lock().unwrap(); - download_status.requested.subtract_range(&missing_range); - self.shared.cond.notify_all(); - } + future::ready(if request_length == 0 { + Err(TryFoldErr::FinishEarly) + } else { + Ok(()) + }) + }) + .await; - self.shared - .number_of_open_requests - .fetch_sub(1, atomic::Ordering::SeqCst); + if request_length > 0 { + let missing_range = Range::new(data_offset, request_length); + + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.subtract_range(&missing_range); + shared.cond.notify_all(); + } + + shared + .number_of_open_requests + .fetch_sub(1, atomic::Ordering::SeqCst); + + if let Err(TryFoldErr::ChannelError) = result { + warn!( + "Error from channel for data receiver for range {} (+{}).", + initial_data_offset, initial_request_length + ); + } else if request_length > 0 { + warn!( + "Data receiver for range {} (+{}) received less data from server than requested.", + initial_data_offset, initial_request_length + ); } } +/* +async fn audio_file_fetch( + session: Session, + shared: Arc, + initial_data_rx: ChannelData, + initial_request_sent_time: Instant, + initial_data_length: usize, -impl Future for AudioFileFetchDataReceiver { - type Item = (); - type Error = (); + output: NamedTempFile, + stream_loader_command_rx: mpsc::UnboundedReceiver, + complete_tx: oneshot::Sender, +) { + let (file_data_tx, file_data_rx) = unbounded::(); + + let requested_range = Range::new(0, initial_data_length); + let mut download_status = shared.download_status.lock().unwrap(); + download_status.requested.add_range(&requested_range); + + session.spawn(audio_file_fetch_receive_data( + shared.clone(), + file_data_tx.clone(), + initial_data_rx, + 0, + initial_data_length, + initial_request_sent_time, + )); + + let mut network_response_times_ms: Vec::new(); + + let f1 = file_data_rx.map(|x| Ok::<_, ()>(x)).try_for_each(|x| { + match x { + ReceivedData::ResponseTimeMs(response_time_ms) => { + trace!("Ping time estimated as: {} ms.", response_time_ms); + + // record the response time + network_response_times_ms.push(response_time_ms); + + // prune old response times. Keep at most three. + while network_response_times_ms.len() > 3 { + network_response_times_ms.remove(0); + } - fn poll(&mut self) -> Poll<(), ()> { - loop { - match self.data_rx.poll() { - Ok(Async::Ready(Some(data))) => { - if self.measure_ping_time { - if let Some(request_sent_time) = self.request_sent_time { - let duration = Instant::now() - request_sent_time; - let duration_ms: u64; - if 0.001 * (duration.as_millis() as f64) - > MAXIMUM_ASSUMED_PING_TIME_SECONDS - { - duration_ms = (MAXIMUM_ASSUMED_PING_TIME_SECONDS * 1000.0) as u64; - } else { - duration_ms = duration.as_millis() as u64; - } - let _ = self - .file_data_tx - .unbounded_send(ReceivedData::ResponseTimeMs(duration_ms as usize)); - self.measure_ping_time = false; - } + // stats::median is experimental. So we calculate the median of up to three ourselves. + let ping_time_ms: usize = match network_response_times_ms.len() { + 1 => network_response_times_ms[0] as usize, + 2 => { + ((network_response_times_ms[0] + network_response_times_ms[1]) / 2) as usize } - let data_size = data.len(); - let _ = self - .file_data_tx - .unbounded_send(ReceivedData::Data(PartialFileData { - offset: self.data_offset, - data: data, - })); - self.data_offset += data_size; - if self.request_length < data_size { - warn!("Data receiver for range {} (+{}) received more data from server than requested.", self.initial_data_offset, self.initial_request_length); - self.request_length = 0; - } else { - self.request_length -= data_size; + 3 => { + let mut times = network_response_times_ms.clone(); + times.sort(); + times[1] } - if self.request_length == 0 { - self.finish(); - return Ok(Async::Ready(())); + _ => unreachable!(), + }; + + // store our new estimate for everyone to see + shared + .ping_time_ms + .store(ping_time_ms, atomic::Ordering::Relaxed); + } + ReceivedData::Data(data) => { + output + .as_mut() + .unwrap() + .seek(SeekFrom::Start(data.offset as u64)) + .unwrap(); + output + .as_mut() + .unwrap() + .write_all(data.data.as_ref()) + .unwrap(); + + let mut full = false; + + { + let mut download_status = shared.download_status.lock().unwrap(); + + let received_range = Range::new(data.offset, data.data.len()); + download_status.downloaded.add_range(&received_range); + shared.cond.notify_all(); + + if download_status.downloaded.contained_length_from_value(0) + >= shared.file_size + { + full = true; } + + drop(download_status); } - Ok(Async::Ready(None)) => { - if self.request_length > 0 { - warn!("Data receiver for range {} (+{}) received less data from server than requested.", self.initial_data_offset, self.initial_request_length); - } + + if full { self.finish(); - return Ok(Async::Ready(())); + return future::ready(Err(())); } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); - } - Err(ChannelError) => { - warn!( - "Error from channel for data receiver for range {} (+{}).", - self.initial_data_offset, self.initial_request_length + } + } + future::ready(Ok(())) + }); + + let f2 = stream_loader_command_rx.map(Ok::<_, ()>).try_for_each(|x| { + match cmd { + StreamLoaderCommand::Fetch(request) => { + self.download_range(request.start, request.length); + } + StreamLoaderCommand::RandomAccessMode() => { + *(shared.download_strategy.lock().unwrap()) = DownloadStrategy::RandomAccess(); + } + StreamLoaderCommand::StreamMode() => { + *(shared.download_strategy.lock().unwrap()) = DownloadStrategy::Streaming(); + } + StreamLoaderCommand::Close() => return future::ready(Err(())), + } + Ok(()) + }); + + let f3 = future::poll_fn(|_| { + if let DownloadStrategy::Streaming() = self.get_download_strategy() { + let number_of_open_requests = shared + .number_of_open_requests + .load(atomic::Ordering::SeqCst); + let max_requests_to_send = + MAX_PREFETCH_REQUESTS - min(MAX_PREFETCH_REQUESTS, number_of_open_requests); + + if max_requests_to_send > 0 { + let bytes_pending: usize = { + let download_status = shared.download_status.lock().unwrap(); + download_status + .requested + .minus(&download_status.downloaded) + .len() + }; + + let ping_time_seconds = + 0.001 * shared.ping_time_ms.load(atomic::Ordering::Relaxed) as f64; + let download_rate = session.channel().get_download_rate_estimate(); + + let desired_pending_bytes = max( + (PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * shared.stream_data_rate as f64) + as usize, + (FAST_PREFETCH_THRESHOLD_FACTOR * ping_time_seconds * download_rate as f64) + as usize, + ); + + if bytes_pending < desired_pending_bytes { + self.pre_fetch_more_data( + desired_pending_bytes - bytes_pending, + max_requests_to_send, ); - self.finish(); - return Ok(Async::Ready(())); } } } - } -} + Poll::Pending + }); + future::select_all(vec![f1, f2, f3]).await +}*/ +#[pin_project] struct AudioFileFetch { session: Session, shared: Arc, output: Option, file_data_tx: mpsc::UnboundedSender, + #[pin] file_data_rx: mpsc::UnboundedReceiver, + #[pin] stream_loader_command_rx: mpsc::UnboundedReceiver, complete_tx: Option>, network_response_times_ms: Vec, @@ -662,16 +725,14 @@ impl AudioFileFetch { download_status.requested.add_range(&requested_range); } - let initial_data_receiver = AudioFileFetchDataReceiver::new( + session.spawn(audio_file_fetch_receive_data( shared.clone(), file_data_tx.clone(), initial_data_rx, 0, initial_data_length, initial_request_sent_time, - ); - - session.spawn(move |_| initial_data_receiver); + )); AudioFileFetch { session: session, @@ -701,7 +762,7 @@ impl AudioFileFetch { return; } - if length <= 0 { + if length == 0 { return; } @@ -737,16 +798,14 @@ impl AudioFileFetch { download_status.requested.add_range(range); - let receiver = AudioFileFetchDataReceiver::new( + self.session.spawn(audio_file_fetch_receive_data( self.shared.clone(), self.file_data_tx.clone(), data, range.start, range.length, Instant::now(), - ); - - self.session.spawn(move |_| receiver); + )); } } @@ -794,13 +853,13 @@ impl AudioFileFetch { } } - fn poll_file_data_rx(&mut self) -> Poll<(), ()> { + + + fn poll_file_data_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { loop { - match self.file_data_rx.poll() { - Ok(Async::Ready(None)) => { - return Ok(Async::Ready(())); - } - Ok(Async::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms)))) => { + match Pin::new(&mut self.file_data_rx).poll_next(cx) { + Poll::Ready(None) => return Poll::Ready(()), + Poll::Ready(Some(ReceivedData::ResponseTimeMs(response_time_ms))) => { trace!("Ping time estimated as: {} ms.", response_time_ms); // record the response time @@ -821,7 +880,7 @@ impl AudioFileFetch { } 3 => { let mut times = self.network_response_times_ms.clone(); - times.sort(); + times.sort_unstable(); times[1] } _ => unreachable!(), @@ -832,7 +891,7 @@ impl AudioFileFetch { .ping_time_ms .store(ping_time_ms, atomic::Ordering::Relaxed); } - Ok(Async::Ready(Some(ReceivedData::Data(data)))) => { + Poll::Ready(Some(ReceivedData::Data(data))) => { self.output .as_mut() .unwrap() @@ -864,39 +923,40 @@ impl AudioFileFetch { if full { self.finish(); - return Ok(Async::Ready(())); + return Poll::Ready(()) } } - Ok(Async::NotReady) => { - return Ok(Async::NotReady); + Poll::Pending => { + return Poll::Pending } - Err(()) => unreachable!(), } } } - fn poll_stream_loader_command_rx(&mut self) -> Poll<(), ()> { + fn poll_stream_loader_command_rx(&mut self, cx: &mut Context<'_>) -> Poll<()> { loop { - match self.stream_loader_command_rx.poll() { - Ok(Async::Ready(None)) => { - return Ok(Async::Ready(())); - } - Ok(Async::Ready(Some(StreamLoaderCommand::Fetch(request)))) => { - self.download_range(request.start, request.length); - } - Ok(Async::Ready(Some(StreamLoaderCommand::RandomAccessMode()))) => { - *(self.shared.download_strategy.lock().unwrap()) = - DownloadStrategy::RandomAccess(); - } - Ok(Async::Ready(Some(StreamLoaderCommand::StreamMode()))) => { - *(self.shared.download_strategy.lock().unwrap()) = - DownloadStrategy::Streaming(); - } - Ok(Async::Ready(Some(StreamLoaderCommand::Close()))) => { - return Ok(Async::Ready(())); + match Pin::new(&mut self.stream_loader_command_rx).poll_next(cx) { + Poll::Ready(None) => + return Poll::Ready(()), + Poll::Ready(Some(cmd)) => { + match cmd { + StreamLoaderCommand::Fetch(request) => { + self.download_range(request.start, request.length); + } + StreamLoaderCommand::RandomAccessMode() => { + *(self.shared.download_strategy.lock().unwrap()) = + DownloadStrategy::RandomAccess(); + } + StreamLoaderCommand::StreamMode() => { + + *(self.shared.download_strategy.lock().unwrap()) = + DownloadStrategy::Streaming(); + } + StreamLoaderCommand::Close() => return Poll::Ready(()) + + } } - Ok(Async::NotReady) => return Ok(Async::NotReady), - Err(()) => unreachable!(), + Poll::Pending => return Poll::Pending } } } @@ -909,26 +969,16 @@ impl AudioFileFetch { let _ = complete_tx.send(output); } } - impl Future for AudioFileFetch { - type Item = (); - type Error = (); - - fn poll(&mut self) -> Poll<(), ()> { - match self.poll_stream_loader_command_rx() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(_)) => { - return Ok(Async::Ready(())); - } - Err(()) => unreachable!(), + type Output = (); + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> { + if let Poll::Ready(()) = self.poll_stream_loader_command_rx(cx) { + return Poll::Ready(()) } - match self.poll_file_data_rx() { - Ok(Async::NotReady) => (), - Ok(Async::Ready(_)) => { - return Ok(Async::Ready(())); - } - Err(()) => unreachable!(), + if let Poll::Ready(()) = self.poll_file_data_rx(cx) { + return Poll::Ready(()) } if let DownloadStrategy::Streaming() = self.get_download_strategy() { @@ -968,8 +1018,7 @@ impl Future for AudioFileFetch { } } } - - return Ok(Async::NotReady); + Poll::Pending } } @@ -1009,9 +1058,9 @@ impl Read for AudioFileStreaming { ranges_to_request.subtract_range_set(&download_status.downloaded); ranges_to_request.subtract_range_set(&download_status.requested); - for range in ranges_to_request.iter() { + for &range in ranges_to_request.iter() { self.stream_loader_command_tx - .unbounded_send(StreamLoaderCommand::Fetch(range.clone())) + .unbounded_send(StreamLoaderCommand::Fetch(range)) .unwrap(); } @@ -1058,7 +1107,7 @@ impl Read for AudioFileStreaming { .read_position .store(self.position as usize, atomic::Ordering::Relaxed); - return Ok(read_len); + Ok(read_len) } } diff --git a/audio/src/lib.rs b/audio/src/lib.rs index 3e13c079a..695558870 100644 --- a/audio/src/lib.rs +++ b/audio/src/lib.rs @@ -1,12 +1,13 @@ #[macro_use] -extern crate futures; -#[macro_use] extern crate log; +#[macro_use] +extern crate pin_project; extern crate aes_ctr; extern crate bit_set; extern crate byteorder; extern crate bytes; +extern crate futures; extern crate num_bigint; extern crate num_traits; extern crate tempfile; @@ -24,7 +25,7 @@ mod libvorbis_decoder; mod range_set; pub use decrypt::AudioDecrypt; -pub use fetch::{AudioFile, AudioFileOpen, StreamLoaderController}; +pub use fetch::{AudioFile, StreamLoaderController}; pub use fetch::{ READ_AHEAD_BEFORE_PLAYBACK_ROUNDTRIPS, READ_AHEAD_BEFORE_PLAYBACK_SECONDS, READ_AHEAD_DURING_PLAYBACK_ROUNDTRIPS, READ_AHEAD_DURING_PLAYBACK_SECONDS,