Skip to content

Commit

Permalink
Make librespot_playback work
Browse files Browse the repository at this point in the history
  • Loading branch information
Johannesd3 committed Jan 25, 2021
1 parent 6c9d8c8 commit fe37186
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 68 deletions.
8 changes: 5 additions & 3 deletions playback/src/audio_backend/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ pub trait Sink {
fn write(&mut self, data: &[i16]) -> io::Result<()>;
}

fn mk_sink<S: Sink + Open + 'static>(device: Option<String>) -> Box<dyn Sink> {
pub type SinkBuilder = fn(Option<String>) -> Box<dyn Sink + Send>;

fn mk_sink<S: Sink + Open + Send + 'static>(device: Option<String>) -> Box<dyn Sink + Send> {
Box::new(S::open(device))
}

Expand Down Expand Up @@ -54,7 +56,7 @@ use self::pipe::StdoutSink;
mod subprocess;
use self::subprocess::SubprocessSink;

pub const BACKENDS: &'static [(&'static str, fn(Option<String>) -> Box<dyn Sink>)] = &[
pub const BACKENDS: &'static [(&'static str, SinkBuilder)] = &[
#[cfg(feature = "alsa-backend")]
("alsa", mk_sink::<AlsaSink>),
#[cfg(feature = "portaudio-backend")]
Expand All @@ -73,7 +75,7 @@ pub const BACKENDS: &'static [(&'static str, fn(Option<String>) -> Box<dyn Sink>
("subprocess", mk_sink::<SubprocessSink>),
];

pub fn find(name: Option<String>) -> Option<fn(Option<String>) -> Box<dyn Sink>> {
pub fn find(name: Option<String>) -> Option<SinkBuilder> {
if let Some(name) = name {
BACKENDS
.iter()
Expand Down
2 changes: 1 addition & 1 deletion playback/src/audio_backend/pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::io::{self, Write};
use std::mem;
use std::slice;

pub struct StdoutSink(Box<dyn Write>);
pub struct StdoutSink(Box<dyn Write + Send>);

impl Open for StdoutSink {
fn open(path: Option<String>) -> StdoutSink {
Expand Down
122 changes: 59 additions & 63 deletions playback/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::audio::{
};
use crate::audio_backend::Sink;
use crate::config::{Bitrate, PlayerConfig};
use crate::librespot_core::tokio;
use crate::metadata::{AudioItem, FileFormat};
use crate::mixer::AudioFilter;
use librespot_core::session::Session;
Expand All @@ -19,7 +20,6 @@ use futures::{
};
use std::io::{Read, Seek, SeekFrom};
use std::mem;
use std::thread;
use std::time::{Duration, Instant};
use std::{borrow::Cow, io};
use std::{
Expand All @@ -32,7 +32,7 @@ const PRELOAD_NEXT_TRACK_BEFORE_END_DURATION_MS: u32 = 30000;

pub struct Player {
commands: Option<mpsc::UnboundedSender<PlayerCommand>>,
thread_handle: Option<thread::JoinHandle<()>>,
task_handle: Option<tokio::task::JoinHandle<()>>,
play_request_id_generator: SeqGenerator<u64>,
}

Expand All @@ -52,7 +52,7 @@ struct PlayerInternal {

state: PlayerState,
preload: PlayerPreload,
sink: Box<dyn Sink>,
sink: Box<dyn Sink + Send>,
sink_status: SinkStatus,
sink_event_callback: Option<SinkEventCallback>,
audio_filter: Option<Box<dyn AudioFilter + Send>>,
Expand Down Expand Up @@ -242,38 +242,38 @@ impl Player {
sink_builder: F,
) -> (Player, PlayerEventChannel)
where
F: FnOnce() -> Box<dyn Sink> + Send + 'static,
F: FnOnce() -> Box<dyn Sink + Send> + Send + 'static,
{
let (cmd_tx, cmd_rx) = mpsc::unbounded();
let (event_sender, event_receiver) = mpsc::unbounded();

let handle = thread::spawn(move || {
debug!("new Player[{}]", session.session_id());

let internal = PlayerInternal {
session: session,
config: config,
commands: cmd_rx,

state: PlayerState::Stopped,
preload: PlayerPreload::None,
sink: sink_builder(),
sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter,
event_senders: [event_sender].to_vec(),
};
debug!("new Player[{}]", session.session_id());

let internal = PlayerInternal {
session: session,
config: config,
commands: cmd_rx,

state: PlayerState::Stopped,
preload: PlayerPreload::None,
sink: sink_builder(),
sink_status: SinkStatus::Closed,
sink_event_callback: None,
audio_filter: audio_filter,
event_senders: [event_sender].to_vec(),
};

// While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread.
todo!("How to block in futures 0.3?");
// While PlayerInternal is written as a future, it still contains blocking code.
// It must be run by using wait() in a dedicated thread.
let handle = tokio::spawn(async move {
internal.await;
debug!("PlayerInternal thread finished.");
});

(
Player {
commands: Some(cmd_tx),
thread_handle: Some(handle),
task_handle: Some(handle),
play_request_id_generator: SeqGenerator::new(0),
},
event_receiver,
Expand Down Expand Up @@ -347,11 +347,13 @@ impl Drop for Player {
fn drop(&mut self) {
debug!("Shutting down player thread ...");
self.commands = None;
if let Some(handle) = self.thread_handle.take() {
match handle.join() {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
}
if let Some(handle) = self.task_handle.take() {
tokio::spawn(async {
match handle.await {
Ok(_) => (),
Err(_) => error!("Player thread panicked!"),
}
});
}
}
}
Expand All @@ -369,11 +371,11 @@ enum PlayerPreload {
None,
Loading {
track_id: SpotifyId,
loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>>>>,
loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>> + Send>>,
},
Ready {
track_id: SpotifyId,
loaded_track: PlayerLoadedTrackData,
loaded_track: Box<PlayerLoadedTrackData>,
},
}

Expand All @@ -385,7 +387,7 @@ enum PlayerState {
track_id: SpotifyId,
play_request_id: u64,
start_playback: bool,
loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>>>>,
loader: Pin<Box<dyn Future<Output = Result<PlayerLoadedTrackData, ()>> + Send>>,
},
Paused {
track_id: SpotifyId,
Expand Down Expand Up @@ -430,23 +432,15 @@ impl PlayerState {

#[allow(dead_code)]
fn is_stopped(&self) -> bool {
use self::PlayerState::*;
match *self {
Stopped => true,
_ => false,
}
matches!(self, Self::Stopped)
}

fn is_loading(&self) -> bool {
use self::PlayerState::*;
match *self {
Loading { .. } => true,
_ => false,
}
matches!(self, Self::Loading { .. })
}

fn decoder(&mut self) -> Option<&mut Decoder> {
use self::PlayerState::*;
use PlayerState::*;
match *self {
Stopped | EndOfTrack { .. } | Loading { .. } => None,
Paused {
Expand Down Expand Up @@ -575,10 +569,10 @@ struct PlayerTrackLoader {
}

impl PlayerTrackLoader {
async fn find_available_alternative<'a>(
&self,
audio: &'a AudioItem,
) -> Option<Cow<'a, AudioItem>> {
async fn find_available_alternative<'a, 'b>(
&'a self,
audio: &'b AudioItem,
) -> Option<Cow<'b, AudioItem>> {
if audio.available {
Some(Cow::Borrowed(audio))
} else if let Some(alternatives) = &audio.alternatives {
Expand Down Expand Up @@ -716,7 +710,7 @@ impl PlayerTrackLoader {
}
Err(_) => {
warn!("Unable to extract normalisation data, using default value.");
1.0 as f32
1.0_f32
}
};

Expand Down Expand Up @@ -811,7 +805,7 @@ impl Future for PlayerInternal {
self.send_event(PlayerEvent::Preloading { track_id });
self.preload = PlayerPreload::Ready {
track_id,
loaded_track,
loaded_track: Box::new(loaded_track),
};
}
Poll::Ready(Err(_)) => {
Expand Down Expand Up @@ -1061,7 +1055,7 @@ impl PlayerInternal {
fn handle_packet(&mut self, packet: Option<VorbisPacket>, normalisation_factor: f32) {
match packet {
Some(mut packet) => {
if packet.data().len() > 0 {
if !packet.data().is_empty() {
if let Some(ref editor) = self.audio_filter {
editor.modify_stream(&mut packet.data_mut())
};
Expand Down Expand Up @@ -1216,10 +1210,9 @@ impl PlayerInternal {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking.
// But most likely the track is fully
// loaded already because we played
// to the end of it.
let _ = tokio::task::block_in_place(|| {
loaded_track.decoder.seek(position_ms as i64)
});
loaded_track.stream_loader_controller.set_stream_mode();
loaded_track.stream_position_pcm = Self::position_ms_to_pcm(position_ms);
}
Expand Down Expand Up @@ -1252,7 +1245,7 @@ impl PlayerInternal {
// we can use the current decoder. Ensure it's at the correct position.
if Self::position_ms_to_pcm(position_ms) != *stream_position_pcm {
stream_loader_controller.set_random_access_mode();
let _ = decoder.seek(position_ms as i64); // This may be blocking.
let _ = tokio::task::block_in_place(|| decoder.seek(position_ms as i64));
stream_loader_controller.set_stream_mode();
*stream_position_pcm = Self::position_ms_to_pcm(position_ms);
}
Expand Down Expand Up @@ -1320,10 +1313,12 @@ impl PlayerInternal {
loaded_track
.stream_loader_controller
.set_random_access_mode();
let _ = loaded_track.decoder.seek(position_ms as i64); // This may be blocking
let _ = tokio::task::block_in_place(|| {
loaded_track.decoder.seek(position_ms as i64)
});
loaded_track.stream_loader_controller.set_stream_mode();
}
self.start_playback(track_id, play_request_id, loaded_track, play);
self.start_playback(track_id, play_request_id, *loaded_track, play);
return;
} else {
unreachable!();
Expand Down Expand Up @@ -1539,7 +1534,7 @@ impl PlayerInternal {
&self,
spotify_id: SpotifyId,
position_ms: u32,
) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + 'static {
) -> impl Future<Output = Result<PlayerLoadedTrackData, ()>> + Send + 'static {
// This method creates a future that returns the loaded stream and associated info.
// Ideally all work should be done using asynchronous code. However, seek() on the
// audio stream is implemented in a blocking fashion. Thus, we can't turn it into future
Expand All @@ -1554,11 +1549,10 @@ impl PlayerInternal {

let (result_tx, result_rx) = oneshot::channel();

std::thread::spawn(move || {
todo!("How to block in futures 0.3?")
/*if let Some(data) = block_on(loader.load_track(spotify_id, position_ms)) {
tokio::spawn(async move {
if let Some(data) = loader.load_track(spotify_id, position_ms).await {
let _ = result_tx.send(data);
}*/
}
});

result_rx.await.map_err(|_| ())
Expand Down Expand Up @@ -1588,7 +1582,9 @@ impl PlayerInternal {
* bytes_per_second as f64) as usize,
(READ_AHEAD_BEFORE_PLAYBACK_SECONDS * bytes_per_second as f64) as usize,
);
stream_loader_controller.fetch_next_blocking(wait_for_data_length);
tokio::task::block_in_place(|| {
stream_loader_controller.fetch_next_blocking(wait_for_data_length)
});
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
#![cfg_attr(feature = "cargo-clippy", allow(unused_io_amount))]

pub extern crate librespot_audio as audio;
pub extern crate librespot_connect as connect;
// pub extern crate librespot_connect as connect;
pub extern crate librespot_core as core;
pub extern crate librespot_metadata as metadata;
pub extern crate librespot_playback as playback;
Expand Down

0 comments on commit fe37186

Please sign in to comment.