Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Tokio migration] Make RodioSink Send and other improvements #601

Merged
merged 4 commits into from
Feb 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion core/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn connect(addr: String, proxy: &Option<Url>) -> io::Result<Transport>
.map_err(|e| {
io::Error::new(io::ErrorKind::InvalidInput, format!("Invalid port: {}", e))
})?;

let host = split
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "Missing port"))?;
Expand Down
9 changes: 6 additions & 3 deletions playback/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,19 +29,22 @@ libpulse-binding = { version = "2.13", optional = true, default-features
libpulse-simple-binding = { version = "2.13", optional = true, default-features = false }
jack = { version = "0.6", optional = true }
libc = { version = "0.2", optional = true }
rodio = { version = "0.13", optional = true, default-features = false }
cpal = { version = "0.13", optional = true }
sdl2 = { version = "0.34", optional = true }
gstreamer = { version = "0.16", optional = true }
gstreamer-app = { version = "0.16", optional = true }
glib = { version = "0.10", optional = true }
zerocopy = { version = "0.3", optional = true }

# Rodio dependencies
rodio = { version = "0.13", optional = true, default-features = false }
cpal = { version = "0.13", optional = true }
thiserror = { version = "1", optional = true }

[features]
alsa-backend = ["alsa"]
portaudio-backend = ["portaudio-rs"]
pulseaudio-backend = ["libpulse-binding", "libpulse-simple-binding"]
jackaudio-backend = ["jack"]
rodio-backend = ["rodio", "cpal"]
rodio-backend = ["rodio", "cpal", "thiserror"]
sdl-backend = ["sdl2"]
gstreamer-backend = ["gstreamer", "gstreamer-app", "glib", "zerocopy"]
198 changes: 127 additions & 71 deletions playback/src/audio_backend/rodio.rs
Original file line number Diff line number Diff line change
@@ -1,109 +1,165 @@
use super::{Open, Sink};
extern crate cpal;
extern crate rodio;
use cpal::traits::{DeviceTrait, HostTrait};
use std::process::exit;
use std::{convert::Infallible, sync::mpsc};
use std::{io, thread, time};

use cpal::traits::{DeviceTrait, HostTrait};
use thiserror::Error;

use super::{Open, Sink};

#[derive(Debug, Error)]
pub enum RodioError {
#[error("Rodio: no device available")]
NoDeviceAvailable,
#[error("Rodio: device \"{0}\" is not available")]
DeviceNotAvailable(String),
#[error("Rodio play error: {0}")]
PlayError(#[from] rodio::PlayError),
#[error("Rodio stream error: {0}")]
StreamError(#[from] rodio::StreamError),
#[error("Cannot get audio devices: {0}")]
DevicesError(#[from] cpal::DevicesError),
}

pub struct RodioSink {
rodio_sink: rodio::Sink,
// We have to keep hold of this object, or the Sink can't play...
#[allow(dead_code)]
stream: rodio::OutputStream,

// will produce a TryRecvError on the receiver side when it is dropped.
_close_tx: mpsc::SyncSender<Infallible>,
}

fn list_formats(ref device: &rodio::Device) {
let default_fmt = match device.default_output_config() {
Ok(fmt) => cpal::SupportedStreamConfig::from(fmt),
Err(e) => {
warn!("Error getting default rodio::Sink config: {}", e);
return;
fn list_formats(device: &rodio::Device) {
match device.default_output_config() {
Ok(cfg) => {
debug!(" Default config:");
debug!(" {:?}", cfg);
}
};
debug!(" Default config:");
debug!(" {:?}", default_fmt);

let mut output_configs = match device.supported_output_configs() {
Ok(f) => f.peekable(),
Err(e) => {
warn!("Error getting supported rodio::Sink configs: {}", e);
return;
// Use loglevel debug, since even the output is only debug
debug!("Error getting default rodio::Sink config: {}", e);
}
};

if output_configs.peek().is_some() {
debug!(" Available configs:");
for format in output_configs {
debug!(" {:?}", format);
match device.supported_output_configs() {
Ok(mut cfgs) => {
if let Some(first) = cfgs.next() {
debug!(" Available configs:");
debug!(" {:?}", first);
} else {
return;
}

for cfg in cfgs {
debug!(" {:?}", cfg);
}
}
Err(e) => {
debug!("Error getting supported rodio::Sink configs: {}", e);
}
}
}

fn list_outputs() {
let default_device = get_default_device();
let default_device_name = default_device.name().expect("cannot get output name");
println!("Default Audio Device:\n {}", default_device_name);
list_formats(&default_device);

println!("Other Available Audio Devices:");
for device in cpal::default_host()
.output_devices()
.expect("cannot get list of output devices")
{
let device_name = device.name().expect("cannot get output name");
if device_name != default_device_name {
println!(" {}", device_name);
list_formats(&device);
fn list_outputs() -> Result<(), cpal::DevicesError> {
let mut default_device_name = None;

if let Some(default_device) = get_default_device() {
default_device_name = default_device.name().ok();
println!(
"Default Audio Device:\n {}",
default_device_name.as_deref().unwrap_or("[unknown name]")
);

list_formats(&default_device);

println!("Other Available Audio Devices:");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was println! before, but would this not make more sense to be info!? Like in general have no println! in the lib because it can not be filtered?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just executed in the case someone specifies "?" as device name and explicitly wishes to see the available devices.

} else {
warn!("No default device was found");
}

for device in cpal::default_host().output_devices()? {
match device.name() {
Ok(name) if Some(&name) == default_device_name.as_ref() => (),
Ok(name) => {
println!(" {}", name);
list_formats(&device);
}
Err(e) => {
warn!("Cannot get device name: {}", e);
println!(" [unknown name]");
list_formats(&device);
}
}
}

Ok(())
}

fn get_default_device() -> rodio::Device {
cpal::default_host()
.default_output_device()
.expect("no default output device available")
fn get_default_device() -> Option<rodio::Device> {
cpal::default_host().default_output_device()
}

fn match_device(device: Option<String>) -> rodio::Device {
match device {
Some(device_name) => {
if device_name == "?".to_string() {
list_outputs();
exit(0)
}
for d in cpal::default_host()
.output_devices()
.expect("cannot get list of output devices")
{
if d.name().expect("cannot get output name") == device_name {
return d;
fn create_sink(device: Option<String>) -> Result<(rodio::Sink, rodio::OutputStream), RodioError> {
let rodio_device = match device {
Some(ask) if &ask == "?" => {
let exit_code = match list_outputs() {
Ok(()) => 0,
Err(e) => {
error!("{}", e);
1
}
}
println!("No output sink matching '{}' found.", device_name);
exit(0)
};
exit(exit_code)
}
None => return get_default_device(),
}
Some(device_name) => {
cpal::default_host()
.output_devices()?
.find(|d| d.name().ok().map_or(false, |name| name == device_name)) // Ignore devices for which getting name fails
.ok_or(RodioError::DeviceNotAvailable(device_name))?
}
None => get_default_device().ok_or(RodioError::NoDeviceAvailable)?,
};

let name = rodio_device.name().ok();
info!(
"Using audio device: {}",
name.as_deref().unwrap_or("[unknown name]")
);

let (stream, handle) = rodio::OutputStream::try_from_device(&rodio_device)?;
let sink = rodio::Sink::try_new(&handle)?;
Ok((sink, stream))
}

impl Open for RodioSink {
fn open(device: Option<String>) -> RodioSink {
debug!(
"Using rodio sink with cpal host: {:?}",
cpal::default_host().id()
cpal::default_host().id().name()
);

let rodio_device = match_device(device);
debug!("Using cpal device");
let stream = rodio::OutputStream::try_from_device(&rodio_device)
.expect("Couldn't open output stream.");
debug!("Using rodio stream");
let sink = rodio::Sink::try_new(&stream.1).expect("Couldn't create output sink.");
debug!("Using rodio sink");
let (sink_tx, sink_rx) = mpsc::sync_channel(1);
let (close_tx, close_rx) = mpsc::sync_channel(1);

std::thread::spawn(move || match create_sink(device) {
Ok((sink, stream)) => {
sink_tx.send(Ok(sink)).unwrap();

close_rx.recv().unwrap_err(); // This will fail as soon as the sender is dropped
debug!("drop rodio::OutputStream");
drop(stream);
}
Err(e) => {
sink_tx.send(Err(e)).unwrap();
}
});

// Instead of the second `unwrap`, better error handling could be introduced
let sink = sink_rx.recv().unwrap().unwrap();

debug!("Rodio sink was created");
RodioSink {
rodio_sink: sink,
stream: stream.0,
_close_tx: close_tx,
}
}
}
Expand Down
34 changes: 17 additions & 17 deletions playback/src/player.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,15 +327,15 @@ impl Player {
}

pub async fn get_end_of_track_future(&self) {
self.get_player_event_channel()
.filter(|event| {
future::ready(matches!(
event,
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. }
))
})
.for_each(|_| future::ready(()))
.await
let mut channel = self.get_player_event_channel();
while let Some(event) = channel.next().await {
if matches!(
event,
PlayerEvent::EndOfTrack { .. } | PlayerEvent::Stopped { .. }
) {
return;
}
}
}

pub fn set_sink_event_callback(&self, callback: Option<SinkEventCallback>) {
Expand Down Expand Up @@ -676,14 +676,6 @@ impl PlayerTrackLoader {
let bytes_per_second = self.stream_data_rate(format);
let play_from_beginning = position_ms == 0;

let key = match self.session.audio_key().request(spotify_id, file_id).await {
Ok(key) => key,
Err(_) => {
error!("Unable to load decryption key");
return None;
}
};

// This is only a loop to be able to reload the file if an error occured
// while opening a cached file.
loop {
Expand Down Expand Up @@ -713,6 +705,14 @@ impl PlayerTrackLoader {
stream_loader_controller.set_random_access_mode();
}

let key = match self.session.audio_key().request(spotify_id, file_id).await {
Ok(key) => key,
Err(_) => {
error!("Unable to load decryption key");
return None;
}
};

let mut decrypted_file = AudioDecrypt::new(key, encrypted_file);

let normalisation_factor = match NormalisationData::parse_from_file(&mut decrypted_file)
Expand Down