diff --git a/src/config.rs b/src/config.rs index b9bad418c..44d589b33 100644 --- a/src/config.rs +++ b/src/config.rs @@ -124,7 +124,7 @@ impl Default for Config { #[cfg(feature = "driver")] preallocated_tracks: 1, #[cfg(feature = "driver")] - driver_retry: Default::default(), + driver_retry: Retry::default(), #[cfg(feature = "driver")] driver_timeout: Some(Duration::from_secs(10)), #[cfg(feature = "driver")] @@ -138,36 +138,42 @@ impl Default for Config { #[cfg(feature = "driver")] impl Config { /// Sets this `Config`'s chosen cryptographic tagging scheme. + #[must_use] pub fn crypto_mode(mut self, crypto_mode: CryptoMode) -> Self { self.crypto_mode = crypto_mode; self } /// Sets this `Config`'s received packet decryption/decoding behaviour. + #[must_use] pub fn decode_mode(mut self, decode_mode: DecodeMode) -> Self { self.decode_mode = decode_mode; self } /// Sets this `Config`'s audio mixing channel count. + #[must_use] pub fn mix_mode(mut self, mix_mode: MixMode) -> Self { self.mix_mode = mix_mode; self } /// Sets this `Config`'s number of tracks to preallocate. + #[must_use] pub fn preallocated_tracks(mut self, preallocated_tracks: usize) -> Self { self.preallocated_tracks = preallocated_tracks; self } /// Sets this `Config`'s timeout for establishing a voice connection. + #[must_use] pub fn driver_timeout(mut self, driver_timeout: Option) -> Self { self.driver_timeout = driver_timeout; self } /// Sets this `Config`'s voice connection retry configuration. + #[must_use] pub fn driver_retry(mut self, driver_retry: Retry) -> Self { self.driver_retry = driver_retry; self @@ -184,6 +190,7 @@ impl Config { #[cfg(feature = "gateway")] impl Config { /// Sets this `Config`'s timeout for joining a voice channel. + #[must_use] pub fn gateway_timeout(mut self, gateway_timeout: Option) -> Self { self.gateway_timeout = gateway_timeout; self diff --git a/src/driver/connection/error.rs b/src/driver/connection/error.rs index 0dc23d362..3b01eb6c9 100644 --- a/src/driver/connection/error.rs +++ b/src/driver/connection/error.rs @@ -94,21 +94,21 @@ impl From for Error { impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "failed to connect to Discord RTP server: ")?; - use Error::*; match self { - AttemptDiscarded => write!(f, "connection attempt was aborted/discarded"), - Crypto(e) => e.fmt(f), - CryptoModeInvalid => write!(f, "server changed negotiated encryption mode"), - CryptoModeUnavailable => write!(f, "server did not offer chosen encryption mode"), - EndpointUrl => write!(f, "endpoint URL received from gateway was invalid"), - ExpectedHandshake => write!(f, "voice initialisation protocol was violated"), - IllegalDiscoveryResponse => write!(f, "IP discovery/NAT punching response was invalid"), - IllegalIp => write!(f, "IP discovery/NAT punching response had bad IP value"), - Io(e) => e.fmt(f), - Json(e) => e.fmt(f), - InterconnectFailure(e) => write!(f, "failed to contact other task ({:?})", e), - Ws(e) => write!(f, "websocket issue ({:?}).", e), - TimedOut => write!(f, "connection attempt timed out"), + Self::AttemptDiscarded => write!(f, "connection attempt was aborted/discarded"), + Self::Crypto(e) => e.fmt(f), + Self::CryptoModeInvalid => write!(f, "server changed negotiated encryption mode"), + Self::CryptoModeUnavailable => write!(f, "server did not offer chosen encryption mode"), + Self::EndpointUrl => write!(f, "endpoint URL received from gateway was invalid"), + Self::ExpectedHandshake => write!(f, "voice initialisation protocol was violated"), + Self::IllegalDiscoveryResponse => + write!(f, "IP discovery/NAT punching response was invalid"), + Self::IllegalIp => write!(f, "IP discovery/NAT punching response had bad IP value"), + Self::Io(e) => e.fmt(f), + Self::Json(e) => e.fmt(f), + Self::InterconnectFailure(e) => write!(f, "failed to contact other task ({:?})", e), + Self::Ws(e) => write!(f, "websocket issue ({:?}).", e), + Self::TimedOut => write!(f, "connection attempt timed out"), } } } @@ -116,19 +116,19 @@ impl fmt::Display for Error { impl StdError for Error { fn source(&self) -> Option<&(dyn StdError + 'static)> { match self { - Error::AttemptDiscarded => None, + Error::AttemptDiscarded + | Error::CryptoModeInvalid + | Error::CryptoModeUnavailable + | Error::EndpointUrl + | Error::ExpectedHandshake + | Error::IllegalDiscoveryResponse + | Error::IllegalIp + | Error::InterconnectFailure(_) + | Error::Ws(_) + | Error::TimedOut => None, Error::Crypto(e) => e.source(), - Error::CryptoModeInvalid => None, - Error::CryptoModeUnavailable => None, - Error::EndpointUrl => None, - Error::ExpectedHandshake => None, - Error::IllegalDiscoveryResponse => None, - Error::IllegalIp => None, Error::Io(e) => e.source(), Error::Json(e) => e.source(), - Error::InterconnectFailure(_) => None, - Error::Ws(_) => None, - Error::TimedOut => None, } } } diff --git a/src/driver/crypto.rs b/src/driver/crypto.rs index 71cd3ce1c..d12795095 100644 --- a/src/driver/crypto.rs +++ b/src/driver/crypto.rs @@ -12,7 +12,7 @@ use xsalsa20poly1305::{ TAG_SIZE, }; -/// Variants of the XSalsa20Poly1305 encryption scheme. +/// Variants of the `XSalsa20Poly1305` encryption scheme. #[derive(Clone, Copy, Debug, Eq, PartialEq)] #[non_exhaustive] pub enum CryptoMode { @@ -35,57 +35,58 @@ pub enum CryptoMode { impl From for CryptoMode { fn from(val: CryptoState) -> Self { - use CryptoState::*; match val { - Normal => CryptoMode::Normal, - Suffix => CryptoMode::Suffix, - Lite(_) => CryptoMode::Lite, + CryptoState::Normal => Self::Normal, + CryptoState::Suffix => Self::Suffix, + CryptoState::Lite(_) => Self::Lite, } } } impl CryptoMode { /// Returns the name of a mode as it will appear during negotiation. + #[must_use] pub fn to_request_str(self) -> &'static str { - use CryptoMode::*; match self { - Normal => "xsalsa20_poly1305", - Suffix => "xsalsa20_poly1305_suffix", - Lite => "xsalsa20_poly1305_lite", + Self::Normal => "xsalsa20_poly1305", + Self::Suffix => "xsalsa20_poly1305_suffix", + Self::Lite => "xsalsa20_poly1305_lite", } } /// Returns the number of bytes each nonce is stored as within /// a packet. + #[must_use] pub fn nonce_size(self) -> usize { - use CryptoMode::*; match self { - Normal => RtpPacket::minimum_packet_size(), - Suffix => NONCE_SIZE, - Lite => 4, + Self::Normal => RtpPacket::minimum_packet_size(), + Self::Suffix => NONCE_SIZE, + Self::Lite => 4, } } /// Returns the number of bytes occupied by the encryption scheme /// which fall before the payload. - pub fn payload_prefix_len(self) -> usize { + #[must_use] + pub fn payload_prefix_len() -> usize { TAG_SIZE } /// Returns the number of bytes occupied by the encryption scheme /// which fall after the payload. + #[must_use] pub fn payload_suffix_len(self) -> usize { - use CryptoMode::*; match self { - Normal => 0, - Suffix | Lite => self.nonce_size(), + Self::Normal => 0, + Self::Suffix | Self::Lite => self.nonce_size(), } } /// Calculates the number of additional bytes required compared /// to an unencrypted payload. + #[must_use] pub fn payload_overhead(self) -> usize { - self.payload_prefix_len() + self.payload_suffix_len() + Self::payload_prefix_len() + self.payload_suffix_len() } /// Extracts the byte slice in a packet used as the nonce, and the remaining mutable @@ -95,10 +96,9 @@ impl CryptoMode { header: &'a [u8], body: &'a mut [u8], ) -> Result<(&'a [u8], &'a mut [u8]), CryptoError> { - use CryptoMode::*; match self { - Normal => Ok((header, body)), - Suffix | Lite => { + Self::Normal => Ok((header, body)), + Self::Suffix | Self::Lite => { let len = body.len(); if len < self.payload_suffix_len() { Err(CryptoError) @@ -135,7 +135,7 @@ impl CryptoMode { &nonce }; - let body_start = self.payload_prefix_len(); + let body_start = Self::payload_prefix_len(); let body_tail = self.payload_suffix_len(); if body_start > body_remaining.len() { @@ -183,7 +183,7 @@ impl CryptoMode { } } -/// State used in nonce generation for the XSalsa20Poly1305 encryption variants +/// State used in nonce generation for the `XSalsa20Poly1305` encryption variants /// in [`CryptoMode`]. #[derive(Clone, Copy, Debug, Eq, PartialEq)] #[non_exhaustive] @@ -206,11 +206,10 @@ pub enum CryptoState { impl From for CryptoState { fn from(val: CryptoMode) -> Self { - use CryptoMode::*; match val { - Normal => CryptoState::Normal, - Suffix => CryptoState::Suffix, - Lite => CryptoState::Lite(Wrapping(rand::random::())), + CryptoMode::Normal => CryptoState::Normal, + CryptoMode::Suffix => CryptoState::Suffix, + CryptoMode::Lite => CryptoState::Lite(Wrapping(rand::random::())), } } } @@ -225,12 +224,11 @@ impl CryptoState { let mode = self.kind(); let endpoint = payload_end + mode.payload_suffix_len(); - use CryptoState::*; match self { - Suffix => { + Self::Suffix => { rand::thread_rng().fill(&mut packet.payload_mut()[payload_end..endpoint]); }, - Lite(mut i) => { + Self::Lite(mut i) => { (&mut packet.payload_mut()[payload_end..endpoint]) .write_u32::(i.0) .expect( @@ -245,8 +243,8 @@ impl CryptoState { } /// Returns the underlying (stateless) type of the active crypto mode. - pub fn kind(&self) -> CryptoMode { - CryptoMode::from(*self) + pub fn kind(self) -> CryptoMode { + CryptoMode::from(self) } } diff --git a/src/driver/decode_mode.rs b/src/driver/decode_mode.rs index 55c03891d..172159260 100644 --- a/src/driver/decode_mode.rs +++ b/src/driver/decode_mode.rs @@ -26,6 +26,7 @@ pub enum DecodeMode { impl DecodeMode { /// Returns whether this mode will decrypt received packets. + #[must_use] pub fn should_decrypt(self) -> bool { self != DecodeMode::Pass } diff --git a/src/driver/mix_mode.rs b/src/driver/mix_mode.rs index 7223043af..4fdcdd1da 100644 --- a/src/driver/mix_mode.rs +++ b/src/driver/mix_mode.rs @@ -18,34 +18,30 @@ pub enum MixMode { impl MixMode { pub(crate) const fn to_opus(self) -> Channels { - use MixMode::*; match self { - Mono => Channels::Mono, - Stereo => Channels::Stereo, + Self::Mono => Channels::Mono, + Self::Stereo => Channels::Stereo, } } pub(crate) const fn sample_count_in_frame(self) -> usize { - use MixMode::*; match self { - Mono => MONO_FRAME_SIZE, - Stereo => STEREO_FRAME_SIZE, + Self::Mono => MONO_FRAME_SIZE, + Self::Stereo => STEREO_FRAME_SIZE, } } pub(crate) const fn channels(self) -> usize { - use MixMode::*; match self { - Mono => 1, - Stereo => 2, + Self::Mono => 1, + Self::Stereo => 2, } } pub(crate) const fn symph_layout(self) -> Layout { - use MixMode::*; match self { - Mono => Layout::Mono, - Stereo => Layout::Stereo, + Self::Mono => Layout::Mono, + Self::Stereo => Layout::Stereo, } } } diff --git a/src/driver/mod.rs b/src/driver/mod.rs index 53f912c81..d567671f2 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -65,6 +65,7 @@ impl Driver { /// /// This will create the core voice tasks in the background. #[inline] + #[must_use] pub fn new(config: Config) -> Self { let sender = Self::start_inner(config.clone()); @@ -73,7 +74,7 @@ impl Driver { self_mute: false, sender, #[cfg(feature = "builtin-queue")] - queue: Default::default(), + queue: TrackQueue::default(), } } @@ -188,20 +189,20 @@ impl Driver { /// Alternatively, `Auto` and `Max` remain available. #[instrument(skip(self))] pub fn set_bitrate(&mut self, bitrate: Bitrate) { - self.send(CoreMessage::SetBitrate(bitrate)) + self.send(CoreMessage::SetBitrate(bitrate)); } /// Stops playing audio from all sources, if any are set. #[instrument(skip(self))] pub fn stop(&mut self) { - self.send(CoreMessage::SetTrack(None)) + self.send(CoreMessage::SetTrack(None)); } /// Sets the configuration for this driver (and parent `Call`, if applicable). #[instrument(skip(self))] pub fn set_config(&mut self, config: Config) { self.config = config.clone(); - self.send(CoreMessage::SetConfig(config)) + self.send(CoreMessage::SetConfig(config)); } /// Returns a view of this driver's configuration. @@ -252,6 +253,7 @@ impl Driver { /// Requires the `"builtin-queue"` feature. /// Queue additions should be made via [`Driver::enqueue`] and /// [`Driver::enqueue_input`]. + #[must_use] pub fn queue(&self) -> &TrackQueue { &self.queue } @@ -275,7 +277,7 @@ impl Driver { impl Default for Driver { fn default() -> Self { - Self::new(Default::default()) + Self::new(Config::default()) } } @@ -283,7 +285,7 @@ impl Drop for Driver { /// Leaves the current connected voice channel, if connected to one, and /// forgets all configurations relevant to this Handler. fn drop(&mut self) { - let _ = self.sender.send(CoreMessage::Poison); + drop(self.sender.send(CoreMessage::Poison)); } } diff --git a/src/driver/retry/mod.rs b/src/driver/retry/mod.rs index e25374ba3..bd7e8a669 100644 --- a/src/driver/retry/mod.rs +++ b/src/driver/retry/mod.rs @@ -28,7 +28,7 @@ pub struct Retry { impl Default for Retry { fn default() -> Self { Self { - strategy: Strategy::Backoff(Default::default()), + strategy: Strategy::Backoff(ExponentialBackoff::default()), retry_limit: Some(5), } } @@ -40,7 +40,7 @@ impl Retry { last_wait: Option, attempts: usize, ) -> Option { - if self.retry_limit.map(|a| attempts < a).unwrap_or(true) { + if self.retry_limit.map_or(true, |a| attempts < a) { Some(self.strategy.retry_in(last_wait)) } else { None diff --git a/src/driver/retry/strategy.rs b/src/driver/retry/strategy.rs index 6de58e7ca..45017288a 100644 --- a/src/driver/retry/strategy.rs +++ b/src/driver/retry/strategy.rs @@ -58,7 +58,7 @@ impl Default for ExponentialBackoff { impl ExponentialBackoff { pub(crate) fn retry_in(&self, last_wait: Option) -> Duration { - let attempt = last_wait.map(|t| 2 * t).unwrap_or(self.min); + let attempt = last_wait.map_or(self.min, |t| 2 * t); let perturb = (1.0 - (self.jitter * 2.0 * (random::() - 1.0))) .max(0.0) .min(2.0); diff --git a/src/driver/tasks/error.rs b/src/driver/tasks/error.rs index c56319b8f..10a1bb796 100644 --- a/src/driver/tasks/error.rs +++ b/src/driver/tasks/error.rs @@ -32,9 +32,7 @@ impl Error { pub(crate) fn should_trigger_connect(&self) -> bool { matches!( self, - Error::InterconnectFailure(Recipient::AuxNetwork) - | Error::InterconnectFailure(Recipient::UdpRx) - | Error::InterconnectFailure(Recipient::UdpTx) + Error::InterconnectFailure(Recipient::AuxNetwork | Recipient::UdpRx | Recipient::UdpTx) ) } diff --git a/src/driver/tasks/events.rs b/src/driver/tasks/events.rs index 99b45afba..884d08397 100644 --- a/src/driver/tasks/events.rs +++ b/src/driver/tasks/events.rs @@ -15,13 +15,12 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver = vec![]; loop { - use EventMessage::*; match evt_rx.recv_async().await { - Ok(AddGlobalEvent(data)) => { + Ok(EventMessage::AddGlobalEvent(data)) => { info!("Global event added."); global.add_event(data); }, - Ok(AddTrackEvent(i, data)) => { + Ok(EventMessage::AddTrackEvent(i, data)) => { info!("Adding event to track {}.", i); let event_store = events @@ -33,7 +32,7 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + Ok(EventMessage::FireCoreEvent(ctx)) => { let ctx = ctx.to_user_context(); let evt = ctx .to_core_event() @@ -43,19 +42,17 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + Ok(EventMessage::RemoveGlobalEvents) => { global.remove_handlers(); }, - Ok(AddTrack(store, state, handle)) => { + Ok(EventMessage::AddTrack(store, state, handle)) => { events.push(store); states.push(state); handles.push(handle); info!("Event state for track {} added", events.len()); }, - Ok(ChangeState(i, change)) => { - use TrackStateChange::*; - + Ok(EventMessage::ChangeState(i, change)) => { let max_states = states.len(); debug!( "Changing state for track {} of {}: {:?}", @@ -67,50 +64,50 @@ pub(crate) async fn runner(_interconnect: Interconnect, evt_rx: Receiver { + TrackStateChange::Mode(mut mode) => { std::mem::swap(&mut state.playing, &mut mode); if state.playing != mode { global.fire_track_event(state.playing.as_track_event(), i); } }, - Volume(vol) => { + TrackStateChange::Volume(vol) => { state.volume = vol; }, - Position(pos) => { + TrackStateChange::Position(pos) => { // Currently, only Tick should fire time events. state.position = pos; }, - Loops(loops, user_set) => { + TrackStateChange::Loops(loops, user_set) => { state.loops = loops; if !user_set { global.fire_track_event(TrackEvent::Loop, i); } }, - Total(new) => { + TrackStateChange::Total(new) => { // Massive, unprecedented state changes. *state = new; }, } }, - Ok(RemoveTrack(i)) => { + Ok(EventMessage::RemoveTrack(i)) => { info!("Event state for track {} of {} removed.", i, events.len()); events.swap_remove(i); states.swap_remove(i); handles.swap_remove(i); }, - Ok(RemoveAllTracks) => { + Ok(EventMessage::RemoveAllTracks) => { info!("Event state for all tracks removed."); events.clear(); states.clear(); handles.clear(); }, - Ok(Tick) => { + Ok(EventMessage::Tick) => { // NOTE: this should fire saved up blocks of state change evts. global.tick(&mut events, &mut states, &mut handles).await; }, - Err(_) | Ok(Poison) => { + Err(_) | Ok(EventMessage::Poison) => { break; }, } diff --git a/src/driver/tasks/message/mixer.rs b/src/driver/tasks/message/mixer.rs index 8dd9f0679..e1221c1bd 100644 --- a/src/driver/tasks/message/mixer.rs +++ b/src/driver/tasks/message/mixer.rs @@ -19,8 +19,8 @@ pub struct MixerConnection { impl Drop for MixerConnection { fn drop(&mut self) { - let _ = self.udp_rx.send(UdpRxMessage::Poison); - let _ = self.udp_tx.send(UdpTxMessage::Poison); + drop(self.udp_rx.send(UdpRxMessage::Poison)); + drop(self.udp_tx.send(UdpTxMessage::Poison)); } } diff --git a/src/driver/tasks/message/mod.rs b/src/driver/tasks/message/mod.rs index 54b27babc..0009c177a 100644 --- a/src/driver/tasks/message/mod.rs +++ b/src/driver/tasks/message/mod.rs @@ -23,11 +23,11 @@ pub struct Interconnect { impl Interconnect { pub fn poison(&self) { - let _ = self.events.send(EventMessage::Poison); + drop(self.events.send(EventMessage::Poison)); } pub fn poison_all(&self) { - let _ = self.mixer.send(MixerMessage::Poison); + drop(self.mixer.send(MixerMessage::Poison)); self.poison(); } @@ -46,8 +46,9 @@ impl Interconnect { }); // Make mixer aware of new targets... - let _ = self - .mixer - .send(MixerMessage::ReplaceInterconnect(self.clone())); + drop( + self.mixer + .send(MixerMessage::ReplaceInterconnect(self.clone())), + ); } } diff --git a/src/driver/tasks/mixer/mix_logic.rs b/src/driver/tasks/mixer/mix_logic.rs index c6b2d44a0..26d23afa8 100644 --- a/src/driver/tasks/mixer/mix_logic.rs +++ b/src/driver/tasks/mixer/mix_logic.rs @@ -105,7 +105,21 @@ pub fn mix_symph_indiv( let in_rate = source_packet.spec().rate; - if in_rate != SAMPLE_RATE_RAW as u32 { + if in_rate == SAMPLE_RATE_RAW as u32 { + // No need to resample: mix as standard. + let samples_marched = mix_over_ref( + &source_packet, + symph_mix, + local_state.inner_pos, + samples_written, + volume, + ); + + samples_written += samples_marched; + + local_state.inner_pos += samples_marched; + local_state.inner_pos %= source_packet.frames(); + } else { // NOTE: this should NEVER change in one stream. let chan_c = source_packet.spec().channels.count(); let (_, resampler, rs_out_buf) = local_state.resampler.get_or_insert_with(|| { @@ -155,7 +169,7 @@ pub fn mix_symph_indiv( resampler .process_into_buffer(&*refs, rs_out_buf, None) - .unwrap() + .unwrap(); } else { unreachable!() } @@ -178,11 +192,7 @@ pub fn mix_symph_indiv( local_state.inner_pos += frames_to_take; local_state.inner_pos %= pkt_frames; - if resample_scratch.frames() != needed_in_frames { - // Not enough data to fill the resampler: fetch more. - buf_in_progress = true; - continue; - } else { + if resample_scratch.frames() == needed_in_frames { resampler .process_into_buffer( &resample_scratch.planes().planes()[..chan_c], @@ -192,26 +202,16 @@ pub fn mix_symph_indiv( .unwrap(); resample_scratch.clear(); buf_in_progress = false; + } else { + // Not enough data to fill the resampler: fetch more. + buf_in_progress = true; + continue; } }; let samples_marched = mix_resampled(rs_out_buf, symph_mix, samples_written, volume); samples_written += samples_marched; - } else { - // No need to resample: mix as standard. - let samples_marched = mix_over_ref( - &source_packet, - symph_mix, - local_state.inner_pos, - samples_written, - volume, - ); - - samples_written += samples_marched; - - local_state.inner_pos += samples_marched; - local_state.inner_pos %= source_packet.frames(); } } @@ -226,19 +226,17 @@ fn mix_over_ref( dest_pos: usize, volume: f32, ) -> usize { - use AudioBufferRef::*; - match source { - U8(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - U16(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - U24(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - U32(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - S8(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - S16(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - S24(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - S32(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - F32(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), - F64(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::U8(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::U16(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::U24(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::U32(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::S8(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::S16(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::S24(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::S32(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::F32(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), + AudioBufferRef::F64(v) => mix_symph_buffer(v, target, source_pos, dest_pos, volume), } } @@ -280,7 +278,7 @@ where } else if target_mono { let vol_adj = 1.0 / (source_chans as f32); let mut t_planes = target.planes_mut(); - let d_plane = &mut t_planes.planes()[0]; + let d_plane = &mut *t_planes.planes()[0]; for s_plane in source_raw_planes[..].iter() { for (d, s) in d_plane[dest_pos..dest_pos + mix_ct] .iter_mut() @@ -333,7 +331,7 @@ fn mix_resampled( } else if target_mono { let vol_adj = 1.0 / (source_chans as f32); let mut t_planes = target.planes_mut(); - let d_plane = &mut t_planes.planes()[0]; + let d_plane = &mut *t_planes.planes()[0]; for s_plane in source[..].iter() { for (d, s) in d_plane[dest_pos..dest_pos + mix_ct].iter_mut().zip(s_plane) { *d += volume * vol_adj * s; @@ -361,19 +359,17 @@ fn copy_into_resampler( dest_pos: usize, len: usize, ) -> usize { - use AudioBufferRef::*; - match source { - U8(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - U16(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - U24(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - U32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - S8(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - S16(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - S24(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - S32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - F32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - F64(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::U8(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::U16(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::U24(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::U32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::S8(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::S16(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::S24(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::S32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::F32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::F64(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), } } diff --git a/src/driver/tasks/mixer/mod.rs b/src/driver/tasks/mixer/mod.rs index a6de613e0..64c4426c5 100644 --- a/src/driver/tasks/mixer/mod.rs +++ b/src/driver/tasks/mixer/mod.rs @@ -259,11 +259,9 @@ impl Mixer { let mut conn_failure = false; let mut should_exit = false; - use MixerMessage::*; - let error = match msg { - AddTrack(t) => self.add_track(t), - SetTrack(t) => { + MixerMessage::AddTrack(t) => self.add_track(t), + MixerMessage::SetTrack(t) => { self.tracks.clear(); let mut out = self.fire_event(EventMessage::RemoveAllTracks); @@ -278,18 +276,18 @@ impl Mixer { out }, - SetBitrate(b) => { + MixerMessage::SetBitrate(b) => { self.bitrate = b; if let Err(e) = self.set_bitrate(b) { error!("Failed to update bitrate {:?}", e); } Ok(()) }, - SetMute(m) => { + MixerMessage::SetMute(m) => { self.muted = m; Ok(()) }, - SetConn(conn, ssrc) => { + MixerMessage::SetConn(conn, ssrc) => { self.conn_active = Some(conn); let mut rtp = MutableRtpPacket::new(&mut self.packet[..]).expect( "Too few bytes in self.packet for RTP header.\ @@ -301,11 +299,11 @@ impl Mixer { self.deadline = Instant::now(); Ok(()) }, - DropConn => { + MixerMessage::DropConn => { self.conn_active = None; Ok(()) }, - ReplaceInterconnect(i) => { + MixerMessage::ReplaceInterconnect(i) => { self.prevent_events = false; if let Some(ws) = &self.ws { conn_failure |= ws.send(WsMessage::ReplaceInterconnect(i.clone())).is_err(); @@ -321,7 +319,7 @@ impl Mixer { self.rebuild_tracks() }, - SetConfig(new_config) => { + MixerMessage::SetConfig(new_config) => { if new_config.mix_mode != self.config.mix_mode { self.soft_clip = SoftClip::new(new_config.mix_mode.to_opus()); if let Ok(enc) = new_encoder(self.bitrate, new_config.mix_mode) { @@ -359,7 +357,7 @@ impl Mixer { Ok(()) }, - RebuildEncoder => match new_encoder(self.bitrate, self.config.mix_mode) { + MixerMessage::RebuildEncoder => match new_encoder(self.bitrate, self.config.mix_mode) { Ok(encoder) => { self.encoder = encoder; Ok(()) @@ -372,11 +370,11 @@ impl Mixer { Ok(()) }, }, - Ws(new_ws_handle) => { + MixerMessage::Ws(new_ws_handle) => { self.ws = new_ws_handle; Ok(()) }, - Poison => { + MixerMessage::Poison => { should_exit = true; Ok(()) }, @@ -396,10 +394,9 @@ impl Mixer { // it's responsible for not forcibly recreating said context repeatedly. if !self.prevent_events { self.interconnect.events.send(event)?; - Ok(()) - } else { - Ok(()) } + + Ok(()) } #[inline] @@ -418,7 +415,7 @@ impl Mixer { #[inline] fn rebuild_tracks(&mut self) -> Result<()> { for (track, handle) in self.tracks.iter().zip(self.track_handles.iter()) { - let evts = Default::default(); + let evts = EventStore::default(); let state = track.state(); let handle = handle.clone(); @@ -517,11 +514,12 @@ impl Mixer { if track.playing.is_done() { let p_state = track.playing.clone(); let to_drop = self.tracks.swap_remove(i); - let _ = self - .disposer - .send(DisposalMessage::Track(Box::new(to_drop))); + drop( + self.disposer + .send(DisposalMessage::Track(Box::new(to_drop))), + ); let to_drop = self.track_handles.swap_remove(i); - let _ = self.disposer.send(DisposalMessage::Handle(to_drop)); + drop(self.disposer.send(DisposalMessage::Handle(to_drop))); to_remove.push(i); self.fire_event(EventMessage::ChangeState( @@ -598,7 +596,7 @@ impl Mixer { // A full reconnect might cause an inner closed connection. // It's safer to leave the central task to clean this up and // pass the mixer a new channel. - let _ = ws.send(WsMessage::Speaking(false)); + drop(ws.send(WsMessage::Speaking(false))); } self.march_deadline(); @@ -762,18 +760,15 @@ impl Mixer { opus_slot, ); - let return_here = match mix_type { - MixType::MixedPcm(pcm_len) => { - len = len.max(pcm_len); - false - }, - _ => { - if mix_state.passthrough == Passthrough::Inactive { - input.decoder.reset(); - } - mix_state.passthrough = Passthrough::Active; - true - }, + let return_here = if let MixType::MixedPcm(pcm_len) = mix_type { + len = len.max(pcm_len); + false + } else { + if mix_state.passthrough == Passthrough::Inactive { + input.decoder.reset(); + } + mix_state.passthrough = Passthrough::Active; + true }; // FIXME: allow Ended to trigger a seek/loop/revisit in the same mix cycle? @@ -784,16 +779,16 @@ impl Mixer { MixStatus::Errored(e) => track.playing = PlayMode::Errored(PlayError::Decode(e.into())), MixStatus::Ended if track.do_loop() => { - let _ = self.track_handles[i].seek_time(Default::default()); + let _ = self.track_handles[i].seek_time(Duration::default()); if !self.prevent_events { // position update is sent out later, when the seek concludes. - let _ = self.interconnect.events.send(EventMessage::ChangeState( + drop(self.interconnect.events.send(EventMessage::ChangeState( i, TrackStateChange::Loops(track.loops, false), - )); + ))); } }, - _ => { + MixStatus::Ended => { track.end(); }, } @@ -823,5 +818,5 @@ pub(crate) fn runner( mixer.run(); - let _ = mixer.disposer.send(DisposalMessage::Poison); + drop(mixer.disposer.send(DisposalMessage::Poison)); } diff --git a/src/driver/tasks/mixer/pool.rs b/src/driver/tasks/mixer/pool.rs index fd9edad83..a9feae88d 100644 --- a/src/driver/tasks/mixer/pool.rs +++ b/src/driver/tasks/mixer/pool.rs @@ -74,7 +74,7 @@ impl BlockyTaskPool { self.parse(config, callback, LiveInput::Raw(o), Some(rec), seek_time); }, Err(e) => { - let _ = callback.send(MixerInputResultMessage::CreateErr(e)); + drop(callback.send(MixerInputResultMessage::CreateErr(e))); }, } } @@ -96,11 +96,11 @@ impl BlockyTaskPool { if let Some(seek_time) = seek_time { pool_clone.seek(callback, parsed, rec, seek_time, false, config); } else { - let _ = callback.send(MixerInputResultMessage::Built(parsed, rec)); + drop(callback.send(MixerInputResultMessage::Built(parsed, rec))); }, Ok(_) => unreachable!(), Err(e) => { - let _ = callback.send(MixerInputResultMessage::ParseErr(e)); + drop(callback.send(MixerInputResultMessage::ParseErr(e))); }, }, ); @@ -119,12 +119,12 @@ impl BlockyTaskPool { let pool = self.pool.read(); pool.execute(move || { - let res = input + let seek_result = input .format .seek(SeekMode::Coarse, copy_seek_to(&seek_time)); let backseek_needed = matches!( - res, + seek_result, Err(SymphoniaError::SeekError(SeekErrorKind::ForwardOnly)) ); @@ -134,7 +134,7 @@ impl BlockyTaskPool { }, _ => { input.decoder.reset(); - let _ = callback.send(MixerInputResultMessage::Seek(input, rec, res)); + drop(callback.send(MixerInputResultMessage::Seek(input, rec, seek_result))); }, } }); diff --git a/src/driver/tasks/mixer/state.rs b/src/driver/tasks/mixer/state.rs index 4341ca8c9..eb613c71f 100644 --- a/src/driver/tasks/mixer/state.rs +++ b/src/driver/tasks/mixer/state.rs @@ -39,12 +39,10 @@ impl From for InputState { impl From<&InputState> for ReadyState { fn from(val: &InputState) -> Self { - use InputState::*; - match val { - NotReady(_) => Self::Uninitialised, - Preparing(_) => Self::Preparing, - Ready(_, _) => Self::Playable, + InputState::NotReady(_) => Self::Uninitialised, + InputState::Preparing(_) => Self::Preparing, + InputState::Ready(_, _) => Self::Playable, } } } diff --git a/src/driver/tasks/mixer/track.rs b/src/driver/tasks/mixer/track.rs index dbb0dfb48..0ccda1543 100644 --- a/src/driver/tasks/mixer/track.rs +++ b/src/driver/tasks/mixer/track.rs @@ -24,9 +24,9 @@ impl<'a> InternalTrack { playing: track.playing, volume: track.volume, input: InputState::from(track.input), - mix_state: Default::default(), - position: Default::default(), - play_time: Default::default(), + mix_state: DecodeState::default(), + position: Duration::default(), + play_time: Duration::default(), commands: receiver, loops: track.loops, }; @@ -71,73 +71,61 @@ impl<'a> InternalTrack { // In correct operation, the event thread should never panic, // but it receiving status updates is secondary do actually // doing the work. - loop { - match self.commands.try_recv() { - Ok(cmd) => { - use TrackCommand::*; - match cmd { - Play => { - self.playing.change_to(PlayMode::Play); - let _ = ic.events.send(EventMessage::ChangeState( - index, - TrackStateChange::Mode(self.playing.clone()), - )); - }, - Pause => { - self.playing.change_to(PlayMode::Pause); - let _ = ic.events.send(EventMessage::ChangeState( - index, - TrackStateChange::Mode(self.playing.clone()), - )); - }, - Stop => { - self.playing.change_to(PlayMode::Stop); - let _ = ic.events.send(EventMessage::ChangeState( - index, - TrackStateChange::Mode(self.playing.clone()), - )); - }, - Volume(vol) => { - self.volume = vol; - let _ = ic.events.send(EventMessage::ChangeState( - index, - TrackStateChange::Volume(self.volume), - )); - }, - Seek(time) => action.seek_point = Some(time), - AddEvent(evt) => { - let _ = ic.events.send(EventMessage::AddTrackEvent(index, evt)); - }, - Do(func) => { - if let Some(indiv_action) = func(self.view()) { - action.combine(indiv_action); - } - - let _ = ic.events.send(EventMessage::ChangeState( - index, - TrackStateChange::Total(self.state()), - )); - }, - Request(tx) => { - let _ = tx.send(self.state()); - }, - Loop(loops) => { - self.loops = loops; - let _ = ic.events.send(EventMessage::ChangeState( - index, - TrackStateChange::Loops(self.loops, true), - )); - }, - MakePlayable => action.make_playable = true, + while let Ok(cmd) = self.commands.try_recv() { + match cmd { + TrackCommand::Play => { + self.playing.change_to(PlayMode::Play); + drop(ic.events.send(EventMessage::ChangeState( + index, + TrackStateChange::Mode(self.playing.clone()), + ))); + }, + TrackCommand::Pause => { + self.playing.change_to(PlayMode::Pause); + drop(ic.events.send(EventMessage::ChangeState( + index, + TrackStateChange::Mode(self.playing.clone()), + ))); + }, + TrackCommand::Stop => { + self.playing.change_to(PlayMode::Stop); + drop(ic.events.send(EventMessage::ChangeState( + index, + TrackStateChange::Mode(self.playing.clone()), + ))); + }, + TrackCommand::Volume(vol) => { + self.volume = vol; + drop(ic.events.send(EventMessage::ChangeState( + index, + TrackStateChange::Volume(self.volume), + ))); + }, + TrackCommand::Seek(time) => action.seek_point = Some(time), + TrackCommand::AddEvent(evt) => { + drop(ic.events.send(EventMessage::AddTrackEvent(index, evt))); + }, + TrackCommand::Do(func) => { + if let Some(indiv_action) = func(self.view()) { + action.combine(indiv_action); } + + drop(ic.events.send(EventMessage::ChangeState( + index, + TrackStateChange::Total(self.state()), + ))); }, - Err(TryRecvError::Disconnected) => { - // this branch will never be visited. - break; + TrackCommand::Request(tx) => { + drop(tx.send(self.state())); }, - Err(TryRecvError::Empty) => { - break; + TrackCommand::Loop(loops) => { + self.loops = loops; + drop(ic.events.send(EventMessage::ChangeState( + index, + TrackStateChange::Loops(self.loops, true), + ))); }, + TrackCommand::MakePlayable => action.make_playable = true, } } @@ -178,8 +166,6 @@ impl<'a> InternalTrack { config: &Arc, prevent_events: bool, ) -> StdResult<(&'a mut Parsed, &'a mut DecodeState), InputReadyingError> { - use InputReadyingError::*; - let input = &mut self.input; let local = &mut self.mix_state; @@ -205,7 +191,7 @@ impl<'a> InternalTrack { _ => unreachable!(), } - Err(Waiting) + Err(InputReadyingError::Waiting) }, InputState::Preparing(info) => { let queued_seek = info.queued_seek.take(); @@ -237,10 +223,10 @@ impl<'a> InternalTrack { self.position = std::time::Duration::from_secs_f64(time_in_float); if !prevent_events { - let _ = interconnect.events.send(EventMessage::ChangeState( + drop(interconnect.events.send(EventMessage::ChangeState( id, TrackStateChange::Position(self.position), - )); + ))); } local.reset(); @@ -252,13 +238,14 @@ impl<'a> InternalTrack { unreachable!() } }, - Err(e) => Err(Seeking(e)), + Err(e) => Err(InputReadyingError::Seeking(e)), } }, - Err(TryRecvError::Empty) => Err(Waiting), - Ok(MixerInputResultMessage::CreateErr(e)) => Err(Creation(e)), - Ok(MixerInputResultMessage::ParseErr(e)) => Err(Parsing(e)), - Err(TryRecvError::Disconnected) => Err(Dropped), + Ok(MixerInputResultMessage::CreateErr(e)) => + Err(InputReadyingError::Creation(e)), + Ok(MixerInputResultMessage::ParseErr(e)) => Err(InputReadyingError::Parsing(e)), + Err(TryRecvError::Disconnected) => Err(InputReadyingError::Dropped), + Err(TryRecvError::Empty) => Err(InputReadyingError::Waiting), }; let orig_out = orig_out.map(|a| (a, &mut self.mix_state)); diff --git a/src/driver/tasks/mod.rs b/src/driver/tasks/mod.rs index c11a1e7cb..275d3a318 100644 --- a/src/driver/tasks/mod.rs +++ b/src/driver/tasks/mod.rs @@ -74,19 +74,17 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { config = if let Some(new_config) = next_config.take() { - let _ = interconnect - .mixer - .send(MixerMessage::SetConfig(new_config.clone())); + drop( + interconnect + .mixer + .send(MixerMessage::SetConfig(new_config.clone())), + ); new_config } else { config }; - if connection - .as_ref() - .map(|conn| conn.info != info) - .unwrap_or(true) - { + if connection.as_ref().map_or(true, |conn| conn.info != info) { // Only *actually* reconnect if the conn info changed, or we don't have an // active connection. // This allows the gateway component to keep sending join requests independent @@ -97,7 +95,7 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { @@ -112,17 +110,17 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { let last_conn = connection.take(); - let _ = interconnect.mixer.send(MixerMessage::DropConn); - let _ = interconnect.mixer.send(MixerMessage::RebuildEncoder); + drop(interconnect.mixer.send(MixerMessage::DropConn)); + drop(interconnect.mixer.send(MixerMessage::RebuildEncoder)); if let Some(conn) = last_conn { - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::DriverDisconnect(InternalDisconnect { kind: DisconnectKind::Runtime, reason: None, info: conn.info.clone(), }), - )); + ))); } }, Ok(CoreMessage::SignalWsClosure(ws_idx, ws_info, mut reason)) => { @@ -130,46 +128,46 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { - let _ = interconnect.mixer.send(MixerMessage::SetTrack(s)); + drop(interconnect.mixer.send(MixerMessage::SetTrack(s))); }, Ok(CoreMessage::AddTrack(s)) => { - let _ = interconnect.mixer.send(MixerMessage::AddTrack(s)); + drop(interconnect.mixer.send(MixerMessage::AddTrack(s))); }, Ok(CoreMessage::SetBitrate(b)) => { - let _ = interconnect.mixer.send(MixerMessage::SetBitrate(b)); + drop(interconnect.mixer.send(MixerMessage::SetBitrate(b))); }, Ok(CoreMessage::SetConfig(mut new_config)) => { next_config = Some(new_config.clone()); new_config.make_safe(&config, connection.is_some()); - let _ = interconnect.mixer.send(MixerMessage::SetConfig(new_config)); + drop(interconnect.mixer.send(MixerMessage::SetConfig(new_config))); }, Ok(CoreMessage::AddEvent(evt)) => { - let _ = interconnect.events.send(EventMessage::AddGlobalEvent(evt)); + drop(interconnect.events.send(EventMessage::AddGlobalEvent(evt))); }, Ok(CoreMessage::RemoveGlobalEvents) => { - let _ = interconnect.events.send(EventMessage::RemoveGlobalEvents); + drop(interconnect.events.send(EventMessage::RemoveGlobalEvents)); }, Ok(CoreMessage::Mute(m)) => { - let _ = interconnect.mixer.send(MixerMessage::SetMute(m)); + drop(interconnect.mixer.send(MixerMessage::SetMute(m))); }, Ok(CoreMessage::Reconnect) => { if let Some(mut conn) = connection.take() { @@ -201,12 +199,12 @@ async fn runner(mut config: Config, rx: Receiver, tx: Sender { // Other side may not be listening: this is fine. - let _ = tx.send(Ok(())); + drop(tx.send(Ok(()))); - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::DriverConnect(InternalConnect { info: connection.info.clone(), ssrc: connection.ssrc, }), - )); + ))); }, ConnectionFlavour::Reconnect => { - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::DriverReconnect(InternalConnect { info: connection.info.clone(), ssrc: connection.ssrc, }), - )); + ))); }, } @@ -303,9 +301,8 @@ impl ConnectionRetryData { let idx = self.idx; spawn(async move { - let _ = &remote_ic; tsleep(t).await; - let _ = remote_ic.core.send(CoreMessage::RetryConnect(idx)); + drop(remote_ic.core.send(CoreMessage::RetryConnect(idx))); }); self.attempts += 1; @@ -326,24 +323,24 @@ impl ConnectionRetryData { match self.flavour { ConnectionFlavour::Connect(tx) => { // See above. - let _ = tx.send(Err(why)); + drop(tx.send(Err(why))); - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::DriverDisconnect(InternalDisconnect { kind: DisconnectKind::Connect, reason, info: self.info, }), - )); + ))); }, ConnectionFlavour::Reconnect => { - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::DriverDisconnect(InternalDisconnect { kind: DisconnectKind::Reconnect, reason, info: self.info, }), - )); + ))); }, } } diff --git a/src/driver/tasks/udp_rx.rs b/src/driver/tasks/udp_rx.rs index 9a89c9deb..bd024fc4a 100644 --- a/src/driver/tasks/udp_rx.rs +++ b/src/driver/tasks/udp_rx.rs @@ -5,7 +5,7 @@ use super::{ }; use crate::{ constants::*, - driver::DecodeMode, + driver::{CryptoMode, DecodeMode}, events::{internal_data::*, CoreContext}, }; use audiopus::{ @@ -53,27 +53,25 @@ enum PacketDecodeSize { impl PacketDecodeSize { fn bump_up(self) -> Self { - use PacketDecodeSize::*; match self { - TwentyMillis => ThirtyMillis, - ThirtyMillis => FortyMillis, - FortyMillis => SixtyMillis, - SixtyMillis | Max => Max, + Self::TwentyMillis => Self::ThirtyMillis, + Self::ThirtyMillis => Self::FortyMillis, + Self::FortyMillis => Self::SixtyMillis, + Self::SixtyMillis | Self::Max => Self::Max, } } fn can_bump_up(self) -> bool { - self != PacketDecodeSize::Max + self != Self::Max } fn len(self) -> usize { - use PacketDecodeSize::*; match self { - TwentyMillis => STEREO_FRAME_SIZE, - ThirtyMillis => (STEREO_FRAME_SIZE / 2) * 3, - FortyMillis => 2 * STEREO_FRAME_SIZE, - SixtyMillis => 3 * STEREO_FRAME_SIZE, - Max => 6 * STEREO_FRAME_SIZE, + Self::TwentyMillis => STEREO_FRAME_SIZE, + Self::ThirtyMillis => (STEREO_FRAME_SIZE / 2) * 3, + Self::FortyMillis => 2 * STEREO_FRAME_SIZE, + Self::SixtyMillis => 3 * STEREO_FRAME_SIZE, + Self::Max => 6 * STEREO_FRAME_SIZE, } } } @@ -86,7 +84,7 @@ enum SpeakingDelta { } impl SsrcState { - fn new(pkt: RtpPacket<'_>) -> Self { + fn new(pkt: &RtpPacket<'_>) -> Self { Self { silent_frame_count: 5, // We do this to make the first speech packet fire an event. decoder: OpusDecoder::new(SAMPLE_RATE, Channels::Stereo) @@ -98,7 +96,7 @@ impl SsrcState { fn process( &mut self, - pkt: RtpPacket<'_>, + pkt: &RtpPacket<'_>, data_offset: usize, data_trailer: usize, decode_mode: DecodeMode, @@ -254,15 +252,14 @@ impl UdpRx { self.process_udp_message(interconnect, len); } msg = self.rx.recv_async() => { - use UdpRxMessage::*; match msg { - Ok(ReplaceInterconnect(i)) => { + Ok(UdpRxMessage::ReplaceInterconnect(i)) => { *interconnect = i; }, - Ok(SetConfig(c)) => { + Ok(UdpRxMessage::SetConfig(c)) => { self.config = c; }, - Ok(Poison) | Err(_) => break, + Ok(UdpRxMessage::Poison) | Err(_) => break, } } } @@ -282,7 +279,7 @@ impl UdpRx { match demux::demux_mut(packet) { DemuxedMut::Rtp(mut rtp) => { - if !rtp_valid(rtp.to_immutable()) { + if !rtp_valid(&rtp.to_immutable()) { error!("Illegal RTP message received."); return; } @@ -301,9 +298,10 @@ impl UdpRx { None }; + let rtp = rtp.to_immutable(); let (rtp_body_start, rtp_body_tail, decrypted) = packet_data.unwrap_or_else(|| { ( - crypto_mode.payload_prefix_len(), + CryptoMode::payload_prefix_len(), crypto_mode.payload_suffix_len(), false, ) @@ -312,10 +310,10 @@ impl UdpRx { let entry = self .decoder_map .entry(rtp.get_ssrc()) - .or_insert_with(|| SsrcState::new(rtp.to_immutable())); + .or_insert_with(|| SsrcState::new(&rtp)); if let Ok((delta, audio)) = entry.process( - rtp.to_immutable(), + &rtp, rtp_body_start, rtp_body_tail, self.config.decode_mode, @@ -323,32 +321,32 @@ impl UdpRx { ) { match delta { SpeakingDelta::Start => { - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::SpeakingUpdate(InternalSpeakingUpdate { ssrc: rtp.get_ssrc(), speaking: true, }), - )); + ))); }, SpeakingDelta::Stop => { - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::SpeakingUpdate(InternalSpeakingUpdate { ssrc: rtp.get_ssrc(), speaking: false, }), - )); + ))); }, - _ => {}, + SpeakingDelta::Same => {}, } - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::VoicePacket(InternalVoicePacket { audio, packet: rtp.from_packet(), payload_offset: rtp_body_start, payload_end_pad: rtp_body_tail, }), - )); + ))); } else { warn!("RTP decoding/processing failed."); } @@ -368,26 +366,23 @@ impl UdpRx { let (start, tail) = packet_data.unwrap_or_else(|| { ( - crypto_mode.payload_prefix_len(), + CryptoMode::payload_prefix_len(), crypto_mode.payload_suffix_len(), ) }); - let _ = - interconnect - .events - .send(EventMessage::FireCoreEvent(CoreContext::RtcpPacket( - InternalRtcpPacket { - packet: rtcp.from_packet(), - payload_offset: start, - payload_end_pad: tail, - }, - ))); + drop(interconnect.events.send(EventMessage::FireCoreEvent( + CoreContext::RtcpPacket(InternalRtcpPacket { + packet: rtcp.from_packet(), + payload_offset: start, + payload_end_pad: tail, + }), + ))); }, DemuxedMut::FailedParse(t) => { warn!("Failed to parse message of type {:?}.", t); }, - _ => { + DemuxedMut::TooSmall => { warn!("Illegal UDP packet from voice server."); }, } @@ -406,7 +401,7 @@ pub(crate) async fn runner( let mut state = UdpRx { cipher, - decoder_map: Default::default(), + decoder_map: HashMap::new(), config, packet_buffer: [0u8; VOICE_PACKET_MAX], rx, @@ -419,6 +414,6 @@ pub(crate) async fn runner( } #[inline] -fn rtp_valid(packet: RtpPacket<'_>) -> bool { +fn rtp_valid(packet: &RtpPacket<'_>) -> bool { packet.get_version() == RTP_VERSION && packet.get_payload_type() == RTP_PROFILE_TYPE } diff --git a/src/driver/tasks/udp_tx.rs b/src/driver/tasks/udp_tx.rs index 4fd2c3c77..88b685725 100644 --- a/src/driver/tasks/udp_tx.rs +++ b/src/driver/tasks/udp_tx.rs @@ -25,7 +25,6 @@ impl UdpTx { let mut ka_time = Instant::now() + UDP_KEEPALIVE_GAP; loop { - use UdpTxMessage::*; match timeout_at(ka_time, self.rx.recv_async()).await { Err(_) => { trace!("Sending UDP Keepalive."); @@ -35,7 +34,7 @@ impl UdpTx { } ka_time += UDP_KEEPALIVE_GAP; }, - Ok(Ok(Packet(p))) => + Ok(Ok(UdpTxMessage::Packet(p))) => if let Err(e) = self.udp_tx.send(&p[..]).await { error!("Fatal UDP packet send error: {:?}.", e); break; @@ -44,7 +43,7 @@ impl UdpTx { error!("Fatal UDP packet receive error: {:?}.", e); break; }, - Ok(Ok(Poison)) => { + Ok(Ok(UdpTxMessage::Poison)) => { break; }, } diff --git a/src/driver/tasks/ws.rs b/src/driver/tasks/ws.rs index 65d2763ef..c9137f361 100644 --- a/src/driver/tasks/ws.rs +++ b/src/driver/tasks/ws.rs @@ -151,13 +151,13 @@ impl AuxNetwork { self.dont_send = true; if should_reconnect { - let _ = interconnect.core.send(CoreMessage::Reconnect); + drop(interconnect.core.send(CoreMessage::Reconnect)); } else { - let _ = interconnect.core.send(CoreMessage::SignalWsClosure( + drop(interconnect.core.send(CoreMessage::SignalWsClosure( self.attempt_idx, self.info.clone(), ws_reason, - )); + ))); break; } } @@ -186,17 +186,17 @@ impl AuxNetwork { fn process_ws(&mut self, interconnect: &Interconnect, value: GatewayEvent) { match value { GatewayEvent::Speaking(ev) => { - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::SpeakingStateUpdate(ev), - )); + ))); }, GatewayEvent::ClientConnect(ev) => { debug!("Received discontinued ClientConnect: {:?}", ev); }, GatewayEvent::ClientDisconnect(ev) => { - let _ = interconnect.events.send(EventMessage::FireCoreEvent( + drop(interconnect.events.send(EventMessage::FireCoreEvent( CoreContext::ClientDisconnect(ev), - )); + ))); }, GatewayEvent::HeartbeatAck(ev) => { if let Some(nonce) = self.last_heartbeat_nonce.take() { diff --git a/src/events/context/data/disconnect.rs b/src/events/context/data/disconnect.rs index 89e49bc2d..6fbbba0ae 100644 --- a/src/events/context/data/disconnect.rs +++ b/src/events/context/data/disconnect.rs @@ -84,21 +84,19 @@ pub enum DisconnectReason { impl From<&ConnectionError> for DisconnectReason { fn from(e: &ConnectionError) -> Self { - use ConnectionError::*; - match e { - AttemptDiscarded => Self::AttemptDiscarded, - CryptoModeInvalid - | CryptoModeUnavailable - | EndpointUrl - | ExpectedHandshake - | IllegalDiscoveryResponse - | IllegalIp - | Json(_) => Self::ProtocolViolation, - Io(_) => Self::Io, - Crypto(_) | InterconnectFailure(_) => Self::Internal, - Ws(ws) => ws.into(), - TimedOut => Self::TimedOut, + ConnectionError::AttemptDiscarded => Self::AttemptDiscarded, + ConnectionError::CryptoModeInvalid + | ConnectionError::CryptoModeUnavailable + | ConnectionError::EndpointUrl + | ConnectionError::ExpectedHandshake + | ConnectionError::IllegalDiscoveryResponse + | ConnectionError::IllegalIp + | ConnectionError::Json(_) => Self::ProtocolViolation, + ConnectionError::Io(_) => Self::Io, + ConnectionError::Crypto(_) | ConnectionError::InterconnectFailure(_) => Self::Internal, + ConnectionError::Ws(ws) => ws.into(), + ConnectionError::TimedOut => Self::TimedOut, } } } diff --git a/src/events/context/mod.rs b/src/events/context/mod.rs index 98d2e5cd2..693dc034c 100644 --- a/src/events/context/mod.rs +++ b/src/events/context/mod.rs @@ -62,17 +62,17 @@ pub enum CoreContext { impl<'a> CoreContext { pub(crate) fn to_user_context(&'a self) -> EventContext<'a> { - use CoreContext::*; - match self { - SpeakingStateUpdate(evt) => EventContext::SpeakingStateUpdate(*evt), - SpeakingUpdate(evt) => EventContext::SpeakingUpdate(SpeakingUpdateData::from(evt)), - VoicePacket(evt) => EventContext::VoicePacket(VoiceData::from(evt)), - RtcpPacket(evt) => EventContext::RtcpPacket(RtcpData::from(evt)), - ClientDisconnect(evt) => EventContext::ClientDisconnect(*evt), - DriverConnect(evt) => EventContext::DriverConnect(ConnectData::from(evt)), - DriverReconnect(evt) => EventContext::DriverReconnect(ConnectData::from(evt)), - DriverDisconnect(evt) => EventContext::DriverDisconnect(DisconnectData::from(evt)), + Self::SpeakingStateUpdate(evt) => EventContext::SpeakingStateUpdate(*evt), + Self::SpeakingUpdate(evt) => + EventContext::SpeakingUpdate(SpeakingUpdateData::from(evt)), + Self::VoicePacket(evt) => EventContext::VoicePacket(VoiceData::from(evt)), + Self::RtcpPacket(evt) => EventContext::RtcpPacket(RtcpData::from(evt)), + Self::ClientDisconnect(evt) => EventContext::ClientDisconnect(*evt), + Self::DriverConnect(evt) => EventContext::DriverConnect(ConnectData::from(evt)), + Self::DriverReconnect(evt) => EventContext::DriverReconnect(ConnectData::from(evt)), + Self::DriverDisconnect(evt) => + EventContext::DriverDisconnect(DisconnectData::from(evt)), } } } @@ -80,18 +80,17 @@ impl<'a> CoreContext { impl EventContext<'_> { /// Retreive the event class for an event (i.e., when matching) /// an event against the registered listeners. + #[must_use] pub fn to_core_event(&self) -> Option { - use EventContext::*; - match self { - SpeakingStateUpdate(_) => Some(CoreEvent::SpeakingStateUpdate), - SpeakingUpdate(_) => Some(CoreEvent::SpeakingUpdate), - VoicePacket(_) => Some(CoreEvent::VoicePacket), - RtcpPacket(_) => Some(CoreEvent::RtcpPacket), - ClientDisconnect(_) => Some(CoreEvent::ClientDisconnect), - DriverConnect(_) => Some(CoreEvent::DriverConnect), - DriverReconnect(_) => Some(CoreEvent::DriverReconnect), - DriverDisconnect(_) => Some(CoreEvent::DriverDisconnect), + Self::SpeakingStateUpdate(_) => Some(CoreEvent::SpeakingStateUpdate), + Self::SpeakingUpdate(_) => Some(CoreEvent::SpeakingUpdate), + Self::VoicePacket(_) => Some(CoreEvent::VoicePacket), + Self::RtcpPacket(_) => Some(CoreEvent::RtcpPacket), + Self::ClientDisconnect(_) => Some(CoreEvent::ClientDisconnect), + Self::DriverConnect(_) => Some(CoreEvent::DriverConnect), + Self::DriverReconnect(_) => Some(CoreEvent::DriverReconnect), + Self::DriverDisconnect(_) => Some(CoreEvent::DriverDisconnect), _ => None, } } diff --git a/src/events/core.rs b/src/events/core.rs index f88e4401e..b73ac0b0d 100644 --- a/src/events/core.rs +++ b/src/events/core.rs @@ -9,7 +9,7 @@ /// when a client leaves the session ([`ClientDisconnect`]), voice packets ([`VoicePacket`]), and /// telemetry data ([`RtcpPacket`]). The format of voice packets is described by [`VoiceData`]. /// -/// To detect when a user connects, you must correlate gateway (e.g., VoiceStateUpdate) events +/// To detect when a user connects, you must correlate gateway (e.g., `VoiceStateUpdate`) events /// from the main part of your bot. /// /// To obtain a user's SSRC, you must use [`SpeakingStateUpdate`] events. diff --git a/src/events/store.rs b/src/events/store.rs index 1846158c0..1a6f919bf 100644 --- a/src/events/store.rs +++ b/src/events/store.rs @@ -24,8 +24,9 @@ pub struct EventStore { impl EventStore { /// Creates a new event store to be used globally. + #[must_use] pub fn new() -> Self { - Default::default() + Self::default() } /// Creates a new event store to be used within a [`Track`]. @@ -34,6 +35,7 @@ impl EventStore { /// a track has been registered. /// /// [`Track`]: crate::tracks::Track + #[must_use] pub fn new_local() -> Self { EventStore { local_only: true, @@ -53,21 +55,20 @@ impl EventStore { return; } - use Event::*; match evt.event { - Core(c) => { + Event::Core(c) => { self.untimed .entry(c.into()) .or_insert_with(Vec::new) .push(evt); }, - Track(t) => { + Event::Track(t) => { self.untimed .entry(t.into()) .or_insert_with(Vec::new) .push(evt); }, - Delayed(_) | Periodic(_, _) => { + Event::Delayed(_) | Event::Periodic(_, _) => { self.timed.push(evt); }, _ => { @@ -105,15 +106,12 @@ impl EventStore { /// Processes all events due up to and including `now`. pub(crate) fn timed_event_ready(&self, now: Duration) -> bool { - self.timed - .peek() - .map(|evt| { - evt.fire_time - .as_ref() - .expect("Timed event must have a fire_time.") - <= &now - }) - .unwrap_or(false) + self.timed.peek().map_or(false, |evt| { + evt.fire_time + .as_ref() + .expect("Timed event must have a fire_time.") + <= &now + }) } /// Processes all events attached to the given track event. @@ -215,7 +213,7 @@ impl GlobalEvents { } } - for (evt, indices) in self.awaiting_tick.iter() { + for (evt, indices) in &self.awaiting_tick { let untimed = (*evt).into(); if !indices.is_empty() { @@ -223,7 +221,7 @@ impl GlobalEvents { } // Local untimed track events. - for &i in indices.iter() { + for &i in indices { let event_store = events .get_mut(i) .expect("Missing store index for Tick (local untimed)."); @@ -261,12 +259,12 @@ impl GlobalEvents { self.store .process_untimed(self.time, untimed, EventContext::Track(&global_ctx[..])) - .await + .await; } } // Now drain vecs. - for (_evt, indices) in self.awaiting_tick.iter_mut() { + for indices in self.awaiting_tick.values_mut() { indices.clear(); } } diff --git a/src/handler.rs b/src/handler.rs index 8f2119e38..68bd2ab10 100644 --- a/src/handler.rs +++ b/src/handler.rs @@ -73,12 +73,7 @@ impl Call { G: Into + Debug, U: Into + Debug, { - Self::new_raw_cfg( - guild_id.into(), - Some(ws), - user_id.into(), - Default::default(), - ) + Self::new_raw_cfg(guild_id.into(), Some(ws), user_id.into(), Config::default()) } /// Creates a new Call, configuring the driver as specified. @@ -107,7 +102,7 @@ impl Call { G: Into + Debug, U: Into + Debug, { - Self::new_raw_cfg(guild_id.into(), None, user_id.into(), Default::default()) + Self::new_raw_cfg(guild_id.into(), None, user_id.into(), Config::default()) } /// Creates a new standalone Call from the given configuration file. @@ -141,7 +136,7 @@ impl Call { match &self.connection { Some((ConnectionProgress::Complete(c), Return::Info(tx))) => { // It's okay if the receiver hung up. - let _ = tx.send(c.clone()); + drop(tx.send(c.clone())); }, #[cfg(feature = "driver")] Some((ConnectionProgress::Complete(c), Return::Conn(first_tx, driver_tx))) => { @@ -195,7 +190,7 @@ impl Call { self.leave().await?; true } else if conn.0.channel_id() == channel_id { - let _ = tx.send(completion_generator(self)); + drop(tx.send(completion_generator(self))); false } else { // not in progress, and/or a channel change. @@ -419,7 +414,7 @@ impl Call { where C: Into + Debug, { - self._update_state(session_id, channel_id.map(|c| c.into())) + self._update_state(session_id, channel_id.map(Into::into)); } fn _update_state(&mut self, session_id: String, channel_id: Option) { diff --git a/src/info.rs b/src/info.rs index 15e564f11..e5abda03d 100644 --- a/src/info.rs +++ b/src/info.rs @@ -18,10 +18,10 @@ impl ConnectionProgress { } pub(crate) fn get_connection_info(&self) -> Option<&ConnectionInfo> { - use ConnectionProgress::*; - match self { - Complete(c) => Some(c), - _ => None, + if let Self::Complete(c) = self { + Some(c) + } else { + None } } @@ -53,10 +53,7 @@ impl ConnectionProgress { } pub(crate) fn info(&self) -> Option { - match self { - ConnectionProgress::Complete(conn_info) => Some(conn_info.clone()), - _ => None, - } + self.get_connection_info().cloned() } pub(crate) fn apply_state_update(&mut self, session_id: String, channel_id: ChannelId) -> bool { @@ -65,26 +62,24 @@ impl ConnectionProgress { *self = ConnectionProgress::new(self.guild_id(), self.user_id(), channel_id); } - use ConnectionProgress::*; match self { - Complete(c) => { + Self::Complete(c) => { let should_reconn = c.session_id != session_id; c.session_id = session_id; should_reconn }, - Incomplete(i) => i + Self::Incomplete(i) => i .apply_state_update(session_id, channel_id) .map(|info| { - *self = Complete(info); + *self = Self::Complete(info); }) .is_some(), } } pub(crate) fn apply_server_update(&mut self, endpoint: String, token: String) -> bool { - use ConnectionProgress::*; match self { - Complete(c) => { + Self::Complete(c) => { let should_reconn = c.endpoint != endpoint || c.token != token; c.endpoint = endpoint; @@ -92,10 +87,10 @@ impl ConnectionProgress { should_reconn }, - Incomplete(i) => i + Self::Incomplete(i) => i .apply_server_update(endpoint, token) .map(|info| { - *self = Complete(info); + *self = Self::Complete(info); }) .is_some(), } diff --git a/src/input/adapters/async_adapter.rs b/src/input/adapters/async_adapter.rs index 7ebb31cfa..2d1438480 100644 --- a/src/input/adapters/async_adapter.rs +++ b/src/input/adapters/async_adapter.rs @@ -53,7 +53,7 @@ impl AsyncAdapterSink { if let Ok(n) = self.stream.read(&mut inner_buf).await { read_region = 0..n; if n == 0 { - let _ = self.resp_tx.send_async(AdapterResponse::ReadZero).await; + drop(self.resp_tx.send_async(AdapterResponse::ReadZero).await); hit_end = true; } seen_bytes += n as u64; @@ -104,22 +104,24 @@ impl AsyncAdapterSink { match msg { AdapterRequest::Wake => blocked = false, AdapterRequest::ByteLen => { - let _ = self - .resp_tx - .send_async(AdapterResponse::ByteLen(self.stream.byte_len().await)) - .await; + drop( + self.resp_tx + .send_async(AdapterResponse::ByteLen(self.stream.byte_len().await)) + .await, + ); }, AdapterRequest::Seek(pos) => { pause_buf_moves = true; - let _ = self.resp_tx.send_async(AdapterResponse::SeekClear).await; + drop(self.resp_tx.send_async(AdapterResponse::SeekClear).await); seek_res = Some(self.stream.seek(pos).await); }, AdapterRequest::SeekCleared => { if let Some(res) = seek_res.take() { - let _ = self - .resp_tx - .send_async(AdapterResponse::SeekResult(res)) - .await; + drop( + self.resp_tx + .send_async(AdapterResponse::SeekResult(res)) + .await, + ); } pause_buf_moves = false; }, @@ -148,6 +150,7 @@ pub struct AsyncAdapterStream { impl AsyncAdapterStream { /// Wrap and pull from an async file stream, with an intermediate ring-buffer of size `buf_len` /// between the async and sync halves. + #[must_use] pub fn new(stream: Box, buf_len: usize) -> AsyncAdapterStream { let (bytes_in, bytes_out) = RingBuffer::new(buf_len).split(); let (resp_tx, resp_rx) = flume::unbounded(); @@ -215,7 +218,7 @@ impl Read for AsyncAdapterStream { // This needs to remain blocking or spin loopy // Mainly because this is at odds with "keep CPU low." loop { - let _ = self.handle_messages(false); + drop(self.handle_messages(false)); match self.bytes_out.read(buf) { Ok(n) => { @@ -227,10 +230,10 @@ impl Read for AsyncAdapterStream { self.notify_tx.notify_one(); if self.finalised.load(Ordering::Relaxed) { return Ok(0); - } else { - self.check_dropped()?; - std::hint::spin_loop(); } + + self.check_dropped()?; + std::hint::spin_loop(); }, a => { println!("Misc err {:?}", a); diff --git a/src/input/adapters/cached/compressed.rs b/src/input/adapters/cached/compressed.rs index 3e99313b1..4674d1829 100644 --- a/src/input/adapters/cached/compressed.rs +++ b/src/input/adapters/cached/compressed.rs @@ -33,6 +33,7 @@ use streamcatcher::{ TxCatcher, }; use symphonia_core::{ + audio::Channels as SChannels, codecs::CodecRegistry, io::MediaSource, meta::{MetadataRevision, StandardTagKey, Value}, @@ -66,7 +67,7 @@ impl Default for Config { Self { codec_registry: &CODEC_REGISTRY, format_registry: &PROBE, - streamcatcher: Default::default(), + streamcatcher: ScConfig::default(), } } } @@ -163,7 +164,7 @@ impl Compressed { // } let track_info = parsed.decoder.codec_params(); - let chan_count = track_info.channels.map(|v| v.count()).unwrap_or(2); + let chan_count = track_info.channels.map_or(2, SChannels::count); let (channels, stereo) = if chan_count >= 2 { (Channels::Stereo, true) @@ -212,6 +213,7 @@ impl Compressed { /// Acquire a new handle to this object, creating a new /// view of the existing cached data from the beginning. + #[must_use] pub fn new_handle(&self) -> Self { Self { raw: self.raw.new_handle(), @@ -242,8 +244,6 @@ fn create_metadata( Bitrate::Max => Some(510_000), }; - let vbr = opus.vbr()?; - let mode = match opus.application()? { Application::Voip => "voip", Application::Audio => "music", @@ -258,7 +258,7 @@ fn create_metadata( sample_rate, frame_size: MONO_FRAME_BYTE_SIZE as u64, abr, - vbr, + vbr: opus.vbr()?, channels: channels.min(2), }; @@ -357,7 +357,7 @@ impl OpusCompressor { last_frame: Vec::with_capacity(4000), stereo_input, frame_pos: 0, - audio_bytes: Default::default(), + audio_bytes: AtomicUsize::default(), } } } diff --git a/src/input/adapters/cached/decompressed.rs b/src/input/adapters/cached/decompressed.rs index 1baac8890..43badec65 100644 --- a/src/input/adapters/cached/decompressed.rs +++ b/src/input/adapters/cached/decompressed.rs @@ -5,7 +5,7 @@ use crate::{ }; use std::io::{Read, Seek}; use streamcatcher::Catcher; -use symphonia_core::io::MediaSource; +use symphonia_core::{audio::Channels, io::MediaSource}; /// A wrapper around an existing [`Input`] which caches /// the decoded and converted audio data locally in memory @@ -85,7 +85,7 @@ impl Decompressed { let track_info = parsed.decoder.codec_params(); let chan_count = track_info .channels - .map(|v| v.count()) + .map(Channels::count) .ok_or(CodecCacheError::UnknownChannelCount)?; let sample_rate = SAMPLE_RATE_RAW as u32; @@ -102,6 +102,7 @@ impl Decompressed { /// Acquire a new handle to this object, creating a new /// view of the existing cached data from the beginning. + #[must_use] pub fn new_handle(&self) -> Self { Self { raw: self.raw.new_handle(), diff --git a/src/input/adapters/cached/memory.rs b/src/input/adapters/cached/memory.rs index cb3fd1fe0..7fb9ccc03 100644 --- a/src/input/adapters/cached/memory.rs +++ b/src/input/adapters/cached/memory.rs @@ -69,6 +69,7 @@ impl Memory { /// Acquire a new handle to this object, creating a new /// view of the existing cached data from the beginning. + #[must_use] pub fn new_handle(&self) -> Self { Self { raw: self.raw.new_handle(), diff --git a/src/input/adapters/cached/mod.rs b/src/input/adapters/cached/mod.rs index 4e4d4cfd6..4a341ea39 100644 --- a/src/input/adapters/cached/mod.rs +++ b/src/input/adapters/cached/mod.rs @@ -18,6 +18,7 @@ use std::{mem, time::Duration}; use streamcatcher::{Config, GrowthStrategy}; /// Estimates the cost, in B/s, of audio data compressed at the given bitrate. +#[must_use] pub fn compressed_cost_per_sec(bitrate: Bitrate) -> usize { let framing_cost_per_sec = AUDIO_FRAME_RATE * mem::size_of::(); @@ -31,6 +32,7 @@ pub fn compressed_cost_per_sec(bitrate: Bitrate) -> usize { } /// Calculates the cost, in B/s, of raw floating-point audio data. +#[must_use] pub fn raw_cost_per_sec(stereo: bool) -> usize { utils::timestamp_to_byte_count(Duration::from_secs(1), stereo) } @@ -41,6 +43,7 @@ pub fn raw_cost_per_sec(stereo: bool) -> usize { /// a constant chunk size of 5s worth of audio at the given bitrate estimate. /// /// [`streamcatcher`]: https://docs.rs/streamcatcher/0.1.0/streamcatcher/struct.Config.html +#[must_use] pub fn default_config(cost_per_sec: usize) -> Config { Config::new().chunk_size(GrowthStrategy::Constant(5 * cost_per_sec)) } diff --git a/src/input/adapters/cached/util.rs b/src/input/adapters/cached/util.rs index 4e5a73edc..0e47971a9 100644 --- a/src/input/adapters/cached/util.rs +++ b/src/input/adapters/cached/util.rs @@ -67,7 +67,7 @@ impl ToAudioBytes { let chan_limit = chan_limit.unwrap_or(chan_count); - let resample = if sample_rate != SAMPLE_RATE_RAW as u32 { + let resample = (sample_rate != SAMPLE_RATE_RAW as u32).then(|| { let spec = if let Some(chans) = maybe_chans { SignalSpec::new(SAMPLE_RATE_RAW as u32, chans) } else if let Some(layout) = maybe_layout { @@ -90,15 +90,13 @@ impl ToAudioBytes { let resampled_data = resampler.output_buffer_allocate(); - Some(ResampleState { + ResampleState { resampled_data, resampler, scratch, resample_pos: 0..0, - }) - } else { - None - }; + } + }); Self { chan_count, @@ -120,11 +118,9 @@ impl ToAudioBytes { fn is_done(&self) -> bool { self.done && self.inner_pos.is_empty() - && self - .resample - .as_ref() - .map(|v| v.scratch.frames() == 0 && v.resample_pos.is_empty()) - .unwrap_or(true) + && self.resample.as_ref().map_or(true, |v| { + v.scratch.frames() == 0 && v.resample_pos.is_empty() + }) && self.interrupted_byte_pos.is_empty() } } @@ -263,7 +259,7 @@ impl Read for ToAudioBytes { resample .resampler .process_into_buffer(&*refs, &mut resample.resampled_data, None) - .unwrap() + .unwrap(); } else { unreachable!() } @@ -286,9 +282,7 @@ impl Read for ToAudioBytes { self.inner_pos.start += frames_to_take; - if resample.scratch.frames() != needed_in_frames { - continue; - } else { + if resample.scratch.frames() == needed_in_frames { resample .resampler .process_into_buffer( @@ -298,6 +292,8 @@ impl Read for ToAudioBytes { ) .unwrap(); resample.scratch.clear(); + } else { + continue; } } @@ -346,19 +342,27 @@ fn write_out( spill_range: &mut Range, num_chans: usize, ) -> usize { - use AudioBufferRef::*; - match source { - U8(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - U16(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - U24(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - U32(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - S8(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - S16(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - S24(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - S32(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - F32(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), - F64(v) => write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::U8(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::U16(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::U24(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::U32(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::S8(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::S16(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::S24(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::S32(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::F32(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), + AudioBufferRef::F64(v) => + write_symph_buffer(v, target, source_pos, spillover, spill_range, num_chans), } } @@ -462,19 +466,17 @@ fn copy_into_resampler( dest_pos: usize, len: usize, ) -> usize { - use AudioBufferRef::*; - match source { - U8(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - U16(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - U24(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - U32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - S8(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - S16(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - S24(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - S32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - F32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), - F64(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::U8(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::U16(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::U24(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::U32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::S8(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::S16(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::S24(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::S32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::F32(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), + AudioBufferRef::F64(v) => copy_symph_buffer(v, target, source_pos, dest_pos, len), } } diff --git a/src/input/adapters/child.rs b/src/input/adapters/child.rs index dd10b963e..e2b02c513 100644 --- a/src/input/adapters/child.rs +++ b/src/input/adapters/child.rs @@ -27,6 +27,7 @@ impl Read for ChildContainer { impl ChildContainer { /// Create a new [`ChildContainer`] from a child process + #[must_use] pub fn new(children: Vec) -> Self { Self(children) } diff --git a/src/input/codecs/dca/mod.rs b/src/input/codecs/dca/mod.rs index 030c96f81..4a0d6a827 100644 --- a/src/input/codecs/dca/mod.rs +++ b/src/input/codecs/dca/mod.rs @@ -37,7 +37,7 @@ struct SeekAccel { } impl SeekAccel { - fn new(options: &FormatOptions, first_frame_byte_pos: u64) -> Self { + fn new(options: FormatOptions, first_frame_byte_pos: u64) -> Self { let per_s = options.seek_index_fill_rate; let next_ts = (per_s as u64) * (SAMPLE_RATE_RAW as u64); @@ -61,7 +61,7 @@ impl SeekAccel { } } -/// [DCA[0/1]](https://github.com/bwmarrin/dca) Format reader for Symphonia. +/// [DCA\[0/1\]](https://github.com/bwmarrin/dca) Format reader for Symphonia. pub struct DcaReader { source: MediaSourceStream, track: Option, @@ -176,7 +176,7 @@ impl FormatReader for DcaReader { codec_params, }), metas, - seek_accel: SeekAccel::new(options, bytes_read), + seek_accel: SeekAccel::new(*options, bytes_read), curr_ts: 0, max_ts: None, held_packet: None, diff --git a/src/input/codecs/opus.rs b/src/input/codecs/opus.rs index f56187d43..65c71ff29 100644 --- a/src/input/codecs/opus.rs +++ b/src/input/codecs/opus.rs @@ -119,7 +119,7 @@ impl Decoder for OpusDecoder { } fn finalize(&mut self) -> FinalizeResult { - Default::default() + FinalizeResult::default() } fn last_decoded(&self) -> AudioBufferRef { diff --git a/src/input/codecs/raw.rs b/src/input/codecs/raw.rs index fb5f1fe36..f7577747e 100644 --- a/src/input/codecs/raw.rs +++ b/src/input/codecs/raw.rs @@ -47,13 +47,10 @@ impl FormatReader for RawReader { let mut magic = [0u8; 8]; ReadBytes::read_buf_exact(&mut source, &mut magic[..])?; - match &magic { - b"SbirdRaw" => {}, - _ => { - source.seek_buffered_rel(-(magic.len() as isize)); - return symph_err::decode_error("rawf32: illegal magic byte sequence."); - }, - }; + if &magic != b"SbirdRaw" { + source.seek_buffered_rel(-(magic.len() as isize)); + return symph_err::decode_error("rawf32: illegal magic byte sequence."); + } let sample_rate = source.read_u32()?; let n_chans = source.read_u32()?; @@ -86,7 +83,7 @@ impl FormatReader for RawReader { language: None, codec_params, }, - meta: Default::default(), + meta: MetadataLog::default(), curr_ts: 0, max_ts: None, }) diff --git a/src/input/live_input.rs b/src/input/live_input.rs index 8a4733fdd..947043a40 100644 --- a/src/input/live_input.rs +++ b/src/input/live_input.rs @@ -1,9 +1,11 @@ use super::{AudioStream, Metadata, MetadataError, Parsed}; use symphonia_core::{ - codecs::{CodecRegistry, Decoder}, + codecs::{CodecRegistry, Decoder, DecoderOptions}, errors::Error as SymphError, - io::{MediaSource, MediaSourceStream}, + formats::FormatOptions, + io::{MediaSource, MediaSourceStream, MediaSourceStreamOptions}, + meta::MetadataOptions, probe::Probe, }; @@ -38,12 +40,13 @@ impl LiveInput { /// [`Wrapped`]: Self::Wrapped /// [`Read`]: https://doc.rust-lang.org/std/io/trait.Read.html /// [`Seek`]: https://doc.rust-lang.org/std/io/trait.Seek.html + #[allow(clippy::missing_panics_doc)] // Logic ensures panic doesn't occur pub fn promote(self, codecs: &CodecRegistry, probe: &Probe) -> Result { let mut out = self; if let LiveInput::Raw(r) = out { // TODO: allow passing in of MSS options? - let mss = MediaSourceStream::new(r.input, Default::default()); + let mss = MediaSourceStream::new(r.input, MediaSourceStreamOptions::default()); out = LiveInput::Wrapped(AudioStream { input: mss, hint: r.hint, @@ -54,8 +57,12 @@ impl LiveInput { let hint = w.hint.unwrap_or_default(); let input = w.input; - let probe_data = - probe.format(&hint, input, &Default::default(), &Default::default())?; + let probe_data = probe.format( + &hint, + input, + &FormatOptions::default(), + &MetadataOptions::default(), + )?; let format = probe_data.format; let meta = probe_data.metadata; @@ -72,7 +79,7 @@ impl LiveInput { continue; } - let this_decoder = codecs.make(&track.codec_params, &Default::default())?; + let this_decoder = codecs.make(&track.codec_params, &DecoderOptions::default())?; decoder = Some(this_decoder); default_track_id = Some(track.id); @@ -95,6 +102,7 @@ impl LiveInput { /// Returns a reference to the data parsed from this input stream, if it has /// been made available via [`Self::promote`]. + #[must_use] pub fn parsed(&self) -> Option<&Parsed> { if let Self::Parsed(parsed) = self { Some(parsed) @@ -115,6 +123,7 @@ impl LiveInput { /// Returns whether this stream's headers have been fully parsed, and so whether /// the track can be played or have its metadata read. + #[must_use] pub fn is_playable(&self) -> bool { self.parsed().is_some() } diff --git a/src/input/metadata.rs b/src/input/metadata.rs index e4e9527c0..a8f474426 100644 --- a/src/input/metadata.rs +++ b/src/input/metadata.rs @@ -8,7 +8,7 @@ use super::Parsed; /// Extra information about an [`Input`] which is acquired without /// parsing the file itself (e.g., from a webpage). /// -/// YOu can access this via [`Input::aux_metadata`] and [`Compose::aux_metadata`]. +/// You can access this via [`Input::aux_metadata`] and [`Compose::aux_metadata`]. /// /// [`Input`]: crate::input::Input /// [`Input::aux_metadata`]: crate::input::Input::aux_metadata @@ -79,7 +79,7 @@ impl AuxMetadata { let stream = value .as_object() .and_then(|m| m.get("streams")) - .and_then(|v| v.as_array()) + .and_then(Value::as_array) .and_then(|v| { v.iter() .find(|line| line.get("codec_type").and_then(Value::as_str) == Some("audio")) @@ -184,12 +184,12 @@ impl AuxMetadata { } /// Move all fields from a `Metadata` object into a new one. + #[must_use] pub fn take(&mut self) -> Self { Self { track: self.track.take(), artist: self.artist.take(), date: self.date.take(), - channels: self.channels.take(), channel: self.channel.take(), start_time: self.start_time.take(), diff --git a/src/input/mod.rs b/src/input/mod.rs index 6212dc908..e51e13088 100644 --- a/src/input/mod.rs +++ b/src/input/mod.rs @@ -199,7 +199,7 @@ impl Input { Self::Lazy(ref mut composer) => composer.aux_metadata().await.map_err(Into::into), Self::Live(_, Some(ref mut composer)) => composer.aux_metadata().await.map_err(Into::into), - _ => Err(AuxMetadataError::NoCompose), + Self::Live(_, None) => Err(AuxMetadataError::NoCompose), } } @@ -224,32 +224,27 @@ impl Input { /// must do so via [`Self::make_live_async`].* /// /// This is a no-op for an [`Input::Live`]. - pub fn make_live(self, handle: TokioHandle) -> Result { - use Input::*; + pub fn make_live(self, handle: &TokioHandle) -> Result { + if let Self::Lazy(mut lazy) = self { + let (created, lazy) = if lazy.should_create_async() { + let (tx, rx) = flume::bounded(1); + handle.spawn(async move { + let out = lazy.create_async().await; + drop(tx.send_async((out, lazy))); + }); + rx.recv().map_err(|_| { + let err_msg: Box = + "async Input create handler panicked".into(); + AudioStreamError::Fail(err_msg) + })? + } else { + (lazy.create(), lazy) + }; - let out = match self { - Lazy(mut lazy) => { - let (created, lazy) = if lazy.should_create_async() { - let (tx, rx) = flume::bounded(1); - handle.spawn(async move { - let out = lazy.create_async().await; - let _ = tx.send_async((out, lazy)); - }); - rx.recv().map_err(|_| { - let err_msg: Box = - "async Input create handler panicked".into(); - AudioStreamError::Fail(err_msg) - })? - } else { - (lazy.create(), lazy) - }; - - Live(LiveInput::Raw(created?), Some(lazy)) - }, - other => other, - }; - - Ok(out) + Ok(Self::Live(LiveInput::Raw(created?), Some(lazy))) + } else { + Ok(self) + } } /// Initialises (but does not parse) an [`Input::Lazy`] into an [`Input::Live`], @@ -257,28 +252,23 @@ impl Input { /// /// This is a no-op for an [`Input::Live`]. pub async fn make_live_async(self) -> Result { - use Input::*; - - let out = match self { - Lazy(mut lazy) => { - let (created, lazy) = if lazy.should_create_async() { - (lazy.create_async().await, lazy) - } else { - tokio::task::spawn_blocking(move || (lazy.create(), lazy)) - .await - .map_err(|_| { - let err_msg: Box = - "synchronous Input create handler panicked".into(); - AudioStreamError::Fail(err_msg) - })? - }; - - Live(LiveInput::Raw(created?), Some(lazy)) - }, - other => other, - }; + if let Self::Lazy(mut lazy) = self { + let (created, lazy) = if lazy.should_create_async() { + (lazy.create_async().await, lazy) + } else { + tokio::task::spawn_blocking(move || (lazy.create(), lazy)) + .await + .map_err(|_| { + let err_msg: Box = + "synchronous Input create handler panicked".into(); + AudioStreamError::Fail(err_msg) + })? + }; - Ok(out) + Ok(Self::Live(LiveInput::Raw(created?), Some(lazy))) + } else { + Ok(self) + } } /// Initialises and parses an [`Input::Lazy`] into an [`Input::Live`], @@ -296,7 +286,7 @@ impl Input { self, codecs: &CodecRegistry, probe: &Probe, - handle: TokioHandle, + handle: &TokioHandle, ) -> Result { let out = self.make_live(handle)?; match out { @@ -329,15 +319,18 @@ impl Input { /// Returns whether this audio stream is full initialised, parsed, and /// ready to play (e.g., `Self::Live(LiveInput::Parsed(p), _)`). + #[must_use] pub fn is_playable(&self) -> bool { - match self { - Self::Live(input, _) => input.is_playable(), - _ => false, + if let Self::Live(input, _) = self { + input.is_playable() + } else { + false } } /// Returns a reference to the live input, if it has been created via /// [`Self::make_live`] or [`Self::make_live_async`]. + #[must_use] pub fn live(&self) -> Option<&LiveInput> { if let Self::Live(input, _) = self { Some(input) @@ -358,14 +351,15 @@ impl Input { /// Returns a reference to the data parsed from this input stream, if it has /// been made available via [`Self::make_playable`] or [`LiveInput::promote`]. + #[must_use] pub fn parsed(&self) -> Option<&Parsed> { - self.live().and_then(|v| v.parsed()) + self.live().and_then(LiveInput::parsed) } /// Returns a mutable reference to the data parsed from this input stream, if it /// has been made available via [`Self::make_playable`] or [`LiveInput::promote`]. pub fn parsed_mut(&mut self) -> Option<&mut Parsed> { - self.live_mut().and_then(|v| v.parsed_mut()) + self.live_mut().and_then(LiveInput::parsed_mut) } } diff --git a/src/input/sources/file.rs b/src/input/sources/file.rs index 9ea76508d..f5fbe7d20 100644 --- a/src/input/sources/file.rs +++ b/src/input/sources/file.rs @@ -1,5 +1,5 @@ use crate::input::{AudioStream, AudioStreamError, Compose, Input}; -use std::{error::Error, path::Path}; +use std::{error::Error, ffi::OsStr, path::Path}; use symphonia_core::{io::MediaSource, probe::Hint}; /// A lazily instantiated local file. @@ -41,7 +41,7 @@ impl + Send + Sync> Compose for File

{ let input = Box::new(file.into_std().await); let mut hint = Hint::default(); - if let Some(ext) = self.path.as_ref().extension().and_then(|s| s.to_str()) { + if let Some(ext) = self.path.as_ref().extension().and_then(OsStr::to_str) { hint.with_extension(ext); } diff --git a/src/input/sources/ytdl.rs b/src/input/sources/ytdl.rs index 0625246c9..3aa7b0fea 100644 --- a/src/input/sources/ytdl.rs +++ b/src/input/sources/ytdl.rs @@ -11,7 +11,7 @@ const YOUTUBE_DL_COMMAND: &str = "yt-dlp"; /// A lazily instantiated call to download a file, finding its URL via youtube-dl. /// /// By default, this uses yt-dlp and is backed by an [`HttpRequest`]. This handler -/// attempts to find the best audio-only source (typically WebM, enabling low-cost +/// attempts to find the best audio-only source (typically `WebM`, enabling low-cost /// Opus frame passthrough). /// /// [`HttpRequest`]: super::HttpRequest @@ -28,6 +28,7 @@ impl YoutubeDl { /// /// This requires a reqwest client: ideally, one should be created and shared between /// all requests. + #[must_use] pub fn new(client: Client, url: String) -> Self { Self::new_ytdl_like(YOUTUBE_DL_COMMAND, client, url) } @@ -35,6 +36,7 @@ impl YoutubeDl { /// Creates a lazy request to select an audio stream from `url` as in [`new`], using `program`. /// /// [`new`]: Self::new + #[must_use] pub fn new_ytdl_like(program: &'static str, client: Client, url: String) -> Self { Self { program, @@ -83,7 +85,7 @@ impl Compose for YoutubeDl { let url = stdout .as_object() .and_then(|top| top.get("url")) - .and_then(|url| url.as_str()) + .and_then(Value::as_str) .ok_or_else(|| { let msg: Box = "URL field not found on youtube-dl output.".into(); @@ -109,7 +111,7 @@ impl Compose for YoutubeDl { return Ok(meta.clone()); } - let _ = self.query().await?; + self.query().await?; self.metadata.clone().ok_or_else(|| { let msg: Box = diff --git a/src/input/utils.rs b/src/input/utils.rs index d6072da29..282545f94 100644 --- a/src/input/utils.rs +++ b/src/input/utils.rs @@ -4,26 +4,30 @@ use crate::constants::*; use audiopus::{coder::Decoder, Channels, Result as OpusResult, SampleRate}; use std::{mem, time::Duration}; -/// Calculates the sample position in a FloatPCM stream from a timestamp. +/// Calculates the sample position in a `FloatPCM` stream from a timestamp. +#[must_use] pub fn timestamp_to_sample_count(timestamp: Duration, stereo: bool) -> usize { ((timestamp.as_millis() as usize) * (MONO_FRAME_SIZE / FRAME_LEN_MS)) << stereo as usize } -/// Calculates the time position in a FloatPCM stream from a sample index. +/// Calculates the time position in a `FloatPCM` stream from a sample index. +#[must_use] pub fn sample_count_to_timestamp(amt: usize, stereo: bool) -> Duration { Duration::from_millis((((amt * FRAME_LEN_MS) / MONO_FRAME_SIZE) as u64) >> stereo as u64) } -/// Calculates the byte position in a FloatPCM stream from a timestamp. +/// Calculates the byte position in a `FloatPCM` stream from a timestamp. /// /// Each sample is sized by `mem::size_of::() == 4usize`. +#[must_use] pub fn timestamp_to_byte_count(timestamp: Duration, stereo: bool) -> usize { timestamp_to_sample_count(timestamp, stereo) * mem::size_of::() } -/// Calculates the time position in a FloatPCM stream from a byte index. +/// Calculates the time position in a `FloatPCM` stream from a byte index. /// /// Each sample is sized by `mem::size_of::() == 4usize`. +#[must_use] pub fn byte_count_to_timestamp(amt: usize, stereo: bool) -> Duration { sample_count_to_timestamp(amt / mem::size_of::(), stereo) } diff --git a/src/lib.rs b/src/lib.rs index 0635fbb70..0b3bb8884 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -58,6 +58,21 @@ //! [codecs and formats provided by Symphonia]: https://github.com/pdeljanov/Symphonia#formats-demuxers //! [audiopus]: https://github.com/lakelezz/audiopus +#![warn(clippy::pedantic)] +#![allow( + // Allowed as they are too pedantic + clippy::module_name_repetitions, + clippy::wildcard_imports, + clippy::too_many_lines, + clippy::cast_lossless, + clippy::cast_sign_loss, + clippy::cast_possible_wrap, + clippy::cast_precision_loss, + clippy::cast_possible_truncation, + // TODO: would require significant rewriting of all existing docs + clippy::missing_errors_doc, +)] + mod config; pub mod constants; #[cfg(feature = "driver")] diff --git a/src/manager.rs b/src/manager.rs index e09b00fb5..33891f3a9 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "serenity")] +use crate::shards::SerenitySharder; use crate::{ error::{JoinError, JoinResult}, id::{ChannelId, GuildId, UserId}, @@ -57,8 +59,9 @@ impl Songbird { /// This must be [registered] after creation. /// /// [registered]: crate::serenity::register_with + #[must_use] pub fn serenity() -> Arc { - Self::serenity_from_config(Default::default()) + Self::serenity_from_config(Config::default()) } #[cfg(feature = "serenity")] @@ -67,11 +70,12 @@ impl Songbird { /// This must be [registered] after creation. /// /// [registered]: crate::serenity::register_with + #[must_use] pub fn serenity_from_config(config: Config) -> Arc { Arc::new(Self { - client_data: Default::default(), - calls: Default::default(), - sharder: Sharder::Serenity(Default::default()), + client_data: PRwLock::new(ClientData::default()), + calls: DashMap::new(), + sharder: Sharder::Serenity(SerenitySharder::default()), config: Some(config).into(), }) } @@ -88,7 +92,7 @@ impl Songbird { where U: Into, { - Self::twilight_from_config(cluster, user_id, Default::default()) + Self::twilight_from_config(cluster, user_id, Config::default()) } #[cfg(feature = "twilight")] @@ -109,7 +113,7 @@ impl Songbird { initialised: true, user_id: user_id.into(), }), - calls: Default::default(), + calls: DashMap::new(), sharder: Sharder::TwilightCluster(cluster), config: Some(config).into(), } diff --git a/src/serenity.rs b/src/serenity.rs index 92d66e1f4..eb9872d1b 100644 --- a/src/serenity.rs +++ b/src/serenity.rs @@ -11,9 +11,10 @@ use serenity::{ use std::sync::Arc; /// Zero-size type used to retrieve the registered [`Songbird`] instance -/// from serenity's inner TypeMap. +/// from serenity's inner [`TypeMap`]. /// /// [`Songbird`]: Songbird +/// [`TypeMap`]: serenity::prelude::TypeMap pub struct SongbirdKey; impl TypeMapKey for SongbirdKey { @@ -63,10 +64,13 @@ pub trait SerenityInit { /// access via [`get`]. /// /// [`get`]: get + #[must_use] fn register_songbird(self) -> Self; /// Registers a given Songbird voice system with serenity, as above. + #[must_use] fn register_songbird_with(self, voice: Arc) -> Self; /// Registers a Songbird voice system serenity, based on the given configuration. + #[must_use] fn register_songbird_from_config(self, config: Config) -> Self; } diff --git a/src/shards.rs b/src/shards.rs index 92613ab23..8073ba90e 100644 --- a/src/shards.rs +++ b/src/shards.rs @@ -69,17 +69,19 @@ impl Sharder { impl Sharder { #[allow(unreachable_patterns)] pub(crate) fn register_shard_handle(&self, shard_id: u64, sender: Sender) { - match self { - Sharder::Serenity(s) => s.register_shard_handle(shard_id, sender), - _ => error!("Called serenity management function on a non-serenity Songbird instance."), + if let Sharder::Serenity(s) = self { + s.register_shard_handle(shard_id, sender); + } else { + error!("Called serenity management function on a non-serenity Songbird instance."); } } #[allow(unreachable_patterns)] pub(crate) fn deregister_shard_handle(&self, shard_id: u64) { - match self { - Sharder::Serenity(s) => s.deregister_shard_handle(shard_id), - _ => error!("Called serenity management function on a non-serenity Songbird instance."), + if let Sharder::Serenity(s) = self { + s.deregister_shard_handle(shard_id); + } else { + error!("Called serenity management function on a non-serenity Songbird instance."); } } } @@ -120,7 +122,7 @@ impl SerenitySharder { } } -#[derive(Derivative)] +#[derive(Derivative, Clone)] #[derivative(Debug)] #[non_exhaustive] /// A reference to an individual websocket connection. @@ -138,22 +140,6 @@ pub enum Shard { Generic(#[derivative(Debug = "ignore")] Arc), } -impl Clone for Shard { - fn clone(&self) -> Self { - use Shard::*; - - match self { - #[cfg(feature = "serenity")] - Serenity(handle) => Serenity(Arc::clone(handle)), - #[cfg(feature = "twilight")] - TwilightCluster(handle, id) => TwilightCluster(Arc::clone(handle), *id), - #[cfg(feature = "twilight")] - TwilightShard(handle) => TwilightShard(Arc::clone(handle)), - Generic(handle) => Generic(Arc::clone(handle)), - } - } -} - #[async_trait] impl VoiceUpdate for Shard { async fn update_voice_state( diff --git a/src/tracks/command.rs b/src/tracks/command.rs index 3286afa55..e72d7c3e7 100644 --- a/src/tracks/command.rs +++ b/src/tracks/command.rs @@ -36,21 +36,20 @@ pub enum TrackCommand { impl std::fmt::Debug for TrackCommand { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { - use TrackCommand::*; write!( f, "TrackCommand::{}", match self { - Play => "Play".to_string(), - Pause => "Pause".to_string(), - Stop => "Stop".to_string(), - Volume(vol) => format!("Volume({})", vol), - Seek(d) => format!("Seek({:?})", d), - AddEvent(evt) => format!("AddEvent({:?})", evt), - Do(_f) => "Do([function])".to_string(), - Request(tx) => format!("Request({:?})", tx), - Loop(loops) => format!("Loop({:?})", loops), - MakePlayable => "MakePlayable".to_string(), + Self::Play => "Play".to_string(), + Self::Pause => "Pause".to_string(), + Self::Stop => "Stop".to_string(), + Self::Volume(vol) => format!("Volume({})", vol), + Self::Seek(d) => format!("Seek({:?})", d), + Self::AddEvent(evt) => format!("AddEvent({:?})", evt), + Self::Do(_f) => "Do([function])".to_string(), + Self::Request(tx) => format!("Request({:?})", tx), + Self::Loop(loops) => format!("Loop({:?})", loops), + Self::MakePlayable => "MakePlayable".to_string(), } ) } diff --git a/src/tracks/handle.rs b/src/tracks/handle.rs index 4722cc487..91e55bd09 100644 --- a/src/tracks/handle.rs +++ b/src/tracks/handle.rs @@ -43,6 +43,7 @@ impl TrackHandle { /// the underlying [`Input`] supports seek operations. /// /// [`Input`]: crate::input::Input + #[must_use] pub fn new(command_channel: Sender, uuid: Uuid) -> Self { let inner = Arc::new(InnerHandle { command_channel, @@ -164,17 +165,19 @@ impl TrackHandle { } /// Returns this handle's (and track's) unique identifier. + #[must_use] pub fn uuid(&self) -> Uuid { self.inner.uuid } - /// Allows access to this track's attached TypeMap. + /// Allows access to this track's attached [`TypeMap`]. /// - /// TypeMaps allow additional, user-defined data shared by all handles + /// [`TypeMap`]s allow additional, user-defined data shared by all handles /// to be attached to any track. /// /// Driver code will never attempt to lock access to this map, /// preventing deadlock/stalling. + #[must_use] pub fn typemap(&self) -> &RwLock { &self.inner.typemap } diff --git a/src/tracks/mod.rs b/src/tracks/mod.rs index a9d7eca04..c05826275 100644 --- a/src/tracks/mod.rs +++ b/src/tracks/mod.rs @@ -108,6 +108,7 @@ pub struct Track { impl Track { /// Create a new track directly from an [`Input`] and a random [`Uuid`]. + #[must_use] pub fn new(input: Input) -> Self { let uuid = Uuid::new_v4(); @@ -115,9 +116,10 @@ impl Track { } /// Create a new track directly from an [`Input`] with a custom [`Uuid`]. + #[must_use] pub fn new_with_uuid(input: Input, uuid: Uuid) -> Self { Self { - playing: Default::default(), + playing: PlayMode::default(), volume: 1.0, input, events: EventStore::new_local(), diff --git a/src/tracks/mode.rs b/src/tracks/mode.rs index f620ea328..3df14ffd7 100644 --- a/src/tracks/mode.rs +++ b/src/tracks/mode.rs @@ -31,14 +31,12 @@ impl PlayMode { } #[must_use] - pub(crate) fn next_state(self, other: Self) -> PlayMode { - use PlayMode::*; - + pub(crate) fn next_state(self, other: Self) -> Self { // Idea: a finished track cannot be restarted -- this action is final. // We may want to change this in future so that seekable tracks can uncancel // themselves, perhaps, but this requires a bit more machinery to readd... match self { - Play | Pause => other, + Self::Play | Self::Pause => other, state => state, } } @@ -49,12 +47,11 @@ impl PlayMode { #[must_use] pub(crate) fn as_track_event(&self) -> TrackEvent { - use PlayMode::*; match self { - Play => TrackEvent::Play, - Pause => TrackEvent::Pause, - Stop | End => TrackEvent::End, - Errored(_) => TrackEvent::Error, + Self::Play => TrackEvent::Play, + Self::Pause => TrackEvent::Pause, + Self::Stop | Self::End => TrackEvent::End, + Self::Errored(_) => TrackEvent::Error, } } } diff --git a/src/tracks/queue.rs b/src/tracks/queue.rs index 89beda6f3..b28502f24 100644 --- a/src/tracks/queue.rs +++ b/src/tracks/queue.rs @@ -75,6 +75,7 @@ impl Deref for Queued { impl Queued { /// Clones the inner handle + #[must_use] pub fn handle(&self) -> TrackHandle { self.0.clone() } @@ -154,6 +155,7 @@ impl EventHandler for SongPreloader { impl TrackQueue { /// Create a new, empty, track queue. + #[must_use] pub fn new() -> Self { Self { inner: Arc::new(Mutex::new(TrackQueueCore { @@ -181,7 +183,7 @@ impl TrackQueue { let meta = match track.input { Input::Lazy(ref mut rec) => rec.aux_metadata().await.ok(), Input::Live(_, Some(ref mut rec)) => rec.aux_metadata().await.ok(), - _ => None, + Input::Live(_, None) => None, }; meta.and_then(|meta| meta.duration) @@ -231,10 +233,11 @@ impl TrackQueue { } /// Returns a handle to the currently playing track. + #[must_use] pub fn current(&self) -> Option { let inner = self.inner.lock(); - inner.tracks.front().map(|h| h.handle()) + inner.tracks.front().map(Queued::handle) } /// Attempts to remove a track from the specified index. @@ -242,11 +245,13 @@ impl TrackQueue { /// The returned entry can be readded to *this* queue via [`modify_queue`]. /// /// [`modify_queue`]: TrackQueue::modify_queue + #[must_use] pub fn dequeue(&self, index: usize) -> Option { self.modify_queue(|vq| vq.remove(index)) } /// Returns the number of tracks currently in the queue. + #[must_use] pub fn len(&self) -> usize { let inner = self.inner.lock(); @@ -254,6 +259,7 @@ impl TrackQueue { } /// Returns whether there are no tracks currently in the queue. + #[must_use] pub fn is_empty(&self) -> bool { let inner = self.inner.lock(); @@ -319,10 +325,11 @@ impl TrackQueue { /// Use [`modify_queue`] for direct modification of the queue. /// /// [`modify_queue`]: TrackQueue::modify_queue + #[must_use] pub fn current_queue(&self) -> Vec { let inner = self.inner.lock(); - inner.tracks.iter().map(|q| q.handle()).collect() + inner.tracks.iter().map(Queued::handle).collect() } }