Commit 0f0d70d: Global clippy appeasement

FelixMcFelix committed May 20, 2023
1 parent 6cbd7ee commit 0f0d70d

Showing 10 changed files with 108 additions and 48 deletions.
8 changes: 7 additions & 1 deletion src/driver/mod.rs
@@ -30,7 +30,13 @@ pub(crate) use crypto::CryptoState;
 #[cfg(feature = "receive")]
 pub use decode_mode::DecodeMode;
 pub use mix_mode::MixMode;
-pub use scheduler::{LiveStatBlock, ScheduleMode, Scheduler, DEFAULT_SCHEDULER};
+pub use scheduler::{
+    Error as SchedulerError,
+    LiveStatBlock,
+    ScheduleMode,
+    Scheduler,
+    DEFAULT_SCHEDULER,
+};
 #[cfg(test)]
 pub use test_config::*;
 #[cfg(any(test, feature = "internals"))]
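Note: the widened import list re-exports the scheduler's new error type under the alias `SchedulerError`, letting callers name it without clashing with other `Error` types in scope; the same aliasing reappears in `src/error.rs` below. A minimal sketch of the pattern, with illustrative module names rather than songbird's:

```rust
// Sketch: rename-on-re-export so two `Error` types can coexist at one level.
mod scheduler {
    #[derive(Debug)]
    pub enum Error {
        Disconnected,
    }
}

mod connection {
    #[derive(Debug)]
    pub enum Error {
        TimedOut,
    }
}

// Each error keeps its short name inside its own module, but gains a
// distinct, descriptive name at the crate root.
pub use connection::Error as ConnectionError;
pub use scheduler::Error as SchedulerError;

fn main() {
    println!(
        "{:?} / {:?}",
        SchedulerError::Disconnected,
        ConnectionError::TimedOut
    );
}
```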
4 changes: 2 additions & 2 deletions src/driver/scheduler/idle.rs
@@ -30,7 +30,7 @@ impl Idle {
     pub fn new(mode: ScheduleMode) -> (Self, Sender<SchedulerMessage>) {
         let (tx, rx) = flume::unbounded();
 
-        let stats = Default::default();
+        let stats = Arc::default();
         let tasks = HashMap::with_capacity_and_hasher(128, BuildNoHashHasher::default());
 
         // TODO: include heap of keepalive sending times?
@@ -112,7 +112,7 @@ impl Idle {
         // we don't check every task every 20ms.
         let now = TokInstant::now();
 
-        for (id, task) in self.tasks.iter_mut() {
+        for (id, task) in &mut self.tasks {
             // NOTE: this is a non-blocking send so safe from async context.
             if task.tick_and_keepalive(now.into()).is_err() {
                 self.to_cull.push(*id);
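Note: these two edits match clippy's `default_trait_access` and `explicit_iter_loop` pedantic lints (lint names inferred from the shape of the change): `Arc::default()` names the concrete type being constructed, and looping over `&mut map` is the idiomatic spelling of `map.iter_mut()`. A self-contained sketch, not songbird's actual types:

```rust
use std::{collections::HashMap, sync::Arc};

fn main() {
    // default_trait_access: prefer naming the type over a bare trait call,
    // so readers see *what* is being defaulted.
    let stats: Arc<u64> = Arc::default();

    let mut tasks: HashMap<u32, String> = HashMap::new();
    tasks.insert(1, "mixer".into());

    // explicit_iter_loop: `&mut tasks` desugars to `tasks.iter_mut()` anyway.
    for (id, task) in &mut tasks {
        task.push_str(" (ticked)");
        println!("{id}: {task}");
    }

    println!("stats = {stats}");
}
```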
50 changes: 27 additions & 23 deletions src/driver/scheduler/live.rs
@@ -257,7 +257,7 @@ impl Live {
             advance_rtp_counters(packet);
         }
 
-        for mixer in self.tasks.iter_mut() {
+        for mixer in &mut self.tasks {
             mixer.audio_commands_events();
             mixer.check_and_send_keepalive(self.start_of_work);
         }
@@ -324,7 +324,11 @@ impl Live {
                     self.add_task(
                         task,
                         id,
-                        *activation_time.get_or_insert_with(|| (self.deadline - TIMESTEP_LENGTH)),
+                        *activation_time.get_or_insert_with(|| {
+                            self.deadline
+                                .checked_sub(TIMESTEP_LENGTH)
+                                .unwrap_or(self.deadline)
+                        }),
                     );
                 },
                 Err(TryRecvError::Empty) => break,
@@ -448,7 +452,7 @@ impl Live {
 
     #[inline]
     fn remove_excess_blocks(&mut self) {
-        self.packets.truncate(self.needed_blocks())
+        self.packets.truncate(self.needed_blocks());
    }
 
     /// Try to offload excess packet buffers.
@@ -472,22 +476,6 @@
         }
     }
 
-    /// Returns the block index into `self.packets` and the packet number in
-    /// the block for a given worker's index.
-    #[inline]
-    fn get_memory_indices_unscaled(&self, idx: usize) -> (usize, usize) {
-        let block_size = PACKETS_PER_BLOCK;
-        (idx / block_size, idx % block_size)
-    }
-
-    /// Returns the block index into `self.packets` and the byte offset into
-    /// a packet block for a given worker's index.
-    #[inline]
-    fn get_memory_indices(&self, idx: usize) -> (usize, usize) {
-        let (block, inner_unscaled) = self.get_memory_indices_unscaled(idx);
-        (block, inner_unscaled * VOICE_PACKET_MAX)
-    }
-
     #[inline]
     fn add_task(&mut self, task: ParkedMixer, id: TaskId, activation_time: Instant) {
         let idx = self.ids.len();
@@ -503,7 +491,7 @@
         self.packet_lens.push(0);
         self.to_cull.push(false);
 
-        let (block, inner_idx) = self.get_memory_indices(idx);
+        let (block, inner_idx) = get_memory_indices(idx);
 
         while self.packets.len() <= block {
             self.add_packet_block();
@@ -527,7 +515,7 @@
     #[inline]
     fn add_packet_block(&mut self) {
         let n_packets = if let Some(limit) = self.mode.task_limit() {
-            let (block, inner) = self.get_memory_indices_unscaled(limit);
+            let (block, inner) = get_memory_indices_unscaled(limit);
             if self.packets.len() < block || inner == 0 {
                 PACKETS_PER_BLOCK
             } else {
@@ -575,10 +563,10 @@
         let mixer = self.tasks.swap_remove(idx);
         let alive = !self.to_cull.swap_remove(idx);
 
-        let (block, inner_idx) = self.get_memory_indices(idx);
+        let (block, inner_idx) = get_memory_indices(idx);
 
         let (removed, replacement) = if end > idx {
-            let (end_block, end_inner) = self.get_memory_indices(end);
+            let (end_block, end_inner) = get_memory_indices(end);
             let (rest, target_block) = self.packets.split_at_mut(end_block);
             let (last_block, end_pkt) = target_block[0].split_at_mut(end_inner);
 
@@ -648,6 +636,22 @@ fn packet_block(n_packets: usize) -> Box<[u8]> {
     packets
 }
 
+/// Returns the block index into `self.packets` and the packet number in
+/// the block for a given worker's index.
+#[inline]
+fn get_memory_indices_unscaled(idx: usize) -> (usize, usize) {
+    let block_size = PACKETS_PER_BLOCK;
+    (idx / block_size, idx % block_size)
+}
+
+/// Returns the block index into `self.packets` and the byte offset into
+/// a packet block for a given worker's index.
+#[inline]
+fn get_memory_indices(idx: usize) -> (usize, usize) {
+    let (block, inner_unscaled) = get_memory_indices_unscaled(idx);
+    (block, inner_unscaled * VOICE_PACKET_MAX)
+}
+
 #[inline]
 fn advance_rtp_counters(packet: &mut [u8]) {
     let mut rtp = MutableRtpPacket::new(packet).expect(
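Note: three distinct fixes land in this file. `deadline - TIMESTEP_LENGTH` on an `Instant` panics on underflow, so `checked_sub` with a fallback likely appeases `clippy::unchecked_duration_subtraction`; the bare `truncate(...)` expression gains a trailing semicolon (`clippy::semicolon_if_nothing_returned`); and the two index helpers never touch `self`, so they become free functions (`clippy::unused_self`). A sketch of the `checked_sub` pattern, with `TIMESTEP_LENGTH` assumed to be songbird's 20 ms audio frame:

```rust
use std::time::{Duration, Instant};

// Assumed stand-in for songbird's TIMESTEP_LENGTH constant.
const TIMESTEP_LENGTH: Duration = Duration::from_millis(20);

fn main() {
    let deadline = Instant::now();

    // Plain `deadline - TIMESTEP_LENGTH` panics if the result would predate
    // the clock's epoch; `checked_sub` yields None instead, and we fall
    // back to the deadline itself.
    let activation_time = deadline.checked_sub(TIMESTEP_LENGTH).unwrap_or(deadline);

    assert!(activation_time <= deadline);
}
```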
55 changes: 50 additions & 5 deletions src/driver/scheduler/mod.rs
@@ -1,6 +1,6 @@
-use std::{num::NonZeroUsize, sync::Arc};
+use std::{error::Error as StdError, fmt::Display, num::NonZeroUsize, sync::Arc};
 
-use flume::{Receiver, Sender};
+use flume::{Receiver, RecvError, Sender};
 use once_cell::sync::Lazy;
 
 use crate::{constants::TIMESTEP_LENGTH, Config};
@@ -47,6 +47,7 @@ struct InnerScheduler {
 
 impl Scheduler {
     /// Create a new mixer scheduler from the allocation strategy `mode`.
+    #[must_use]
     pub fn new(mode: ScheduleMode) -> Self {
         let (core, tx) = Idle::new(mode);
 
@@ -66,27 +67,38 @@
     }
 
     /// Returns the total number of calls (idle and active) scheduled.
+    #[must_use]
     pub fn total_tasks(&self) -> u64 {
         self.inner.stats.total_mixers()
     }
 
     /// Returns the total number of *active* calls scheduled and processing
     /// audio.
+    #[must_use]
     pub fn live_tasks(&self) -> u64 {
         self.inner.stats.live_mixers()
     }
 
     /// Returns the total number of threads spawned to process live audio sessions.
+    #[must_use]
     pub fn worker_threads(&self) -> u64 {
         self.inner.stats.worker_threads()
     }
 
     /// Request a list of handles to statistics for currently live workers.
-    pub fn worker_thread_stats(&self) -> Result<Vec<Arc<LiveStatBlock>>, ()> {
+    pub async fn worker_thread_stats(&self) -> Result<Vec<Arc<LiveStatBlock>>, Error> {
         let (tx, rx) = flume::bounded(1);
         _ = self.inner.tx.send(SchedulerMessage::GetStats(tx));
 
-        rx.recv().map_err(|_| ())
+        rx.recv_async().await.map_err(Error::from)
     }
+
+    /// Request a list of handles to statistics for currently live workers with a blocking call.
+    pub fn worker_thread_stats_blocking(&self) -> Result<Vec<Arc<LiveStatBlock>>, Error> {
+        let (tx, rx) = flume::bounded(1);
+        _ = self.inner.tx.send(SchedulerMessage::GetStats(tx));
+
+        rx.recv().map_err(Error::from)
+    }
 }

@@ -98,7 +110,7 @@ impl Drop for InnerScheduler {
 
 impl Default for Scheduler {
     fn default() -> Self {
-        Scheduler::new(Default::default())
+        Scheduler::new(ScheduleMode::default())
     }
 }

@@ -123,6 +135,9 @@ impl ScheduleMode {
 
     /// Returns the maximum number of concurrent mixers that a scheduler is
     /// allowed to place on a single thread.
+    ///
+    /// Future scheduling modes may choose to limit *only* on execution cost.
+    #[allow(clippy::unnecessary_wraps)]
     fn task_limit(&self) -> Option<usize> {
         match self {
             Self::MaxPerThread(n) => Some(n.get()),
@@ -156,3 +171,33 @@ pub enum SchedulerMessage {
     /// Cleanup once all `Scheduler` handles are dropped.
     Kill,
 }
+
+/// Errors encountered when communicating with the internals of a [`Scheduler`].
+///
+/// [`Scheduler`]: crate::driver::Scheduler
+#[non_exhaustive]
+#[derive(Debug)]
+pub enum Error {
+    /// The scheduler exited or crashed while awaiting the request.
+    Disconnected,
+}
+
+impl Display for Error {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::Disconnected => f.write_str("the scheduler terminated mid-request"),
+        }
+    }
+}
+
+impl StdError for Error {
+    fn source(&self) -> Option<&(dyn StdError + 'static)> {
+        None
+    }
+}
+
+impl From<RecvError> for Error {
+    fn from(_: RecvError) -> Self {
+        Self::Disconnected
+    }
+}
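Note: the old API returned `Result<_, ()>`, which clippy discourages (likely `clippy::result_unit_err`) and which tells callers nothing. The new `Error` enum follows the std error idiom: `#[non_exhaustive]` reserves room for future variants, `Display` plus `std::error::Error` make it usable with `?` and error-reporting crates, and `From<RecvError>` lets `map_err(Error::from)` translate a dropped channel into `Disconnected`. The split into `worker_thread_stats` (async, via `recv_async().await`) and `worker_thread_stats_blocking` serves both async and sync callers. A reduced sketch of the same shape, with a stand-in `RecvError`:

```rust
use std::{error::Error as StdError, fmt};

// Stand-in for flume::RecvError in this sketch.
#[derive(Debug)]
struct RecvError;

/// Error returned when a request to a background worker cannot complete.
#[non_exhaustive]
#[derive(Debug)]
enum Error {
    Disconnected,
}

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Disconnected => f.write_str("the worker terminated mid-request"),
        }
    }
}

// Display + Debug are implemented, so the std error trait comes for free.
impl StdError for Error {}

// Lets `map_err(Error::from)` absorb channel failures.
impl From<RecvError> for Error {
    fn from(_: RecvError) -> Self {
        Self::Disconnected
    }
}

fn main() {
    let res: Result<u32, RecvError> = Err(RecvError);
    match res.map_err(Error::from) {
        Err(e) => println!("request failed: {e}"),
        Ok(v) => println!("got {v}"),
    }
}
```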
12 changes: 4 additions & 8 deletions src/driver/scheduler/stats.rs
@@ -113,15 +113,11 @@ impl LiveStatBlock {
     pub(crate) fn has_room(&self, strategy: &ScheduleMode, task: &ParkedMixer) -> bool {
         let task_room = strategy
             .task_limit()
-            .map(|limit| self.live_mixers() < limit as u64)
-            .unwrap_or(true);
+            .map_or(true, |limit| self.live_mixers() < limit as u64);
 
-        let exec_room = task
-            .last_cost
-            .map(|cost| cost.as_nanos() as u64 + self.last_compute_cost_ns() < RESCHEDULE_THRESHOLD)
-            .unwrap_or(true);
-
-        println!("{task_room} {exec_room}");
+        let exec_room = task.last_cost.map_or(true, |cost| {
+            cost.as_nanos() as u64 + self.last_compute_cost_ns() < RESCHEDULE_THRESHOLD
+        });
 
         task_room && exec_room
     }
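Note: `map(...).unwrap_or(true)` collapses into one combinator, which is what clippy's `map_unwrap_or` lint asks for, and a leftover debug `println!` leaves the scheduling hot path. The rewrite in miniature:

```rust
fn main() {
    let limit: Option<u64> = Some(8);
    let live_mixers: u64 = 5;

    // Before: limit.map(|l| live_mixers < l).unwrap_or(true)
    // After: a single map_or, with the None default stated up front.
    let task_room = limit.map_or(true, |l| live_mixers < l);

    assert!(task_room);
}
```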
11 changes: 7 additions & 4 deletions src/driver/scheduler/task.rs
@@ -1,4 +1,7 @@
-use std::time::{Duration, Instant};
+use std::{
+    marker::PhantomData,
+    time::{Duration, Instant},
+};
 
 use flume::{Receiver, Sender};
 use nohash_hasher::IsEnabled;
@@ -18,7 +21,7 @@ use super::SchedulerMessage;
 
 /// Typesafe counter used to identify individual mixer/worker instances.
 #[derive(Copy, Clone, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
-pub struct ResId<T>(u64, std::marker::PhantomData<T>);
+pub struct ResId<T>(u64, PhantomData<T>);
 #[allow(missing_docs)]
 pub type TaskId = ResId<TaskMarker>;
 #[allow(missing_docs)]
@@ -36,7 +39,7 @@ impl<T> IsEnabled for ResId<T> {}
 #[allow(missing_docs)]
 impl<T: Copy> ResId<T> {
     pub fn new() -> Self {
-        ResId(0, Default::default())
+        ResId(0, PhantomData)
     }
 
     pub fn incr(&mut self) -> Self {
@@ -140,7 +143,7 @@ impl ParkedMixer {
     /// Handle periodic events attached to this `Mixer`, including timer state
     /// on the event thread and UDP keepalives needed to prevent session termination.
     ///
-    /// As we init our UDP sockets as non-blocking via Tokio -> into_std, it is
+    /// As we init our UDP sockets as non-blocking via Tokio -> `into_std`, it is
     /// safe to call UDP packet sends like this.
     pub fn tick_and_keepalive(&mut self, now: Instant) -> Result<(), ()> {
         // TODO: should we include an atomic which signals whether the event
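Note: importing `PhantomData` once shortens every use of `std::marker::PhantomData<T>`, and constructing the literal `PhantomData` instead of `Default::default()` again names exactly what is built (`clippy::default_trait_access`); the backticks around `into_std` satisfy `clippy::doc_markdown`. A sketch of the zero-cost typed-ID pattern that `ResId<T>` implements; the `incr` body here is an assumption, not copied from songbird:

```rust
use std::marker::PhantomData;

// Marker types: zero-sized, existing only to separate ID namespaces.
#[derive(Copy, Clone, Debug)]
struct TaskMarker;
#[derive(Copy, Clone, Debug)]
struct WorkerMarker;

/// Typesafe counter: a TaskId can never be passed where a WorkerId is due,
/// yet both compile down to a bare u64.
#[derive(Copy, Clone, Debug)]
struct ResId<T>(u64, PhantomData<T>);

type TaskId = ResId<TaskMarker>;
type WorkerId = ResId<WorkerMarker>;

impl<T: Copy> ResId<T> {
    fn new() -> Self {
        // Literal constructor: clearer than `Default::default()`.
        ResId(0, PhantomData)
    }

    // Assumed semantics: hand out the current ID, then advance.
    fn incr(&mut self) -> Self {
        let out = *self;
        self.0 += 1;
        out
    }
}

fn main() {
    let mut tasks = TaskId::new();
    let workers = WorkerId::new();
    println!("{:?}, then {:?}; workers at {:?}", tasks.incr(), tasks, workers);
}
```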
6 changes: 3 additions & 3 deletions src/driver/tasks/mod.rs
@@ -34,7 +34,7 @@ pub(crate) fn start(config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMe
     });
 }
 
-fn start_internals(core: Sender<CoreMessage>, config: Config) -> Interconnect {
+fn start_internals(core: Sender<CoreMessage>, config: &Config) -> Interconnect {
     let (evt_tx, evt_rx) = flume::unbounded();
     let (mix_tx, mix_rx) = flume::unbounded();
 
@@ -52,7 +52,7 @@ fn start_internals(core: Sender<CoreMessage>, config: &Config) -> Interconnect {
     });
 
     let ic = interconnect.clone();
-    config.get_scheduler().new_mixer(&config, ic, mix_rx);
+    config.get_scheduler().new_mixer(config, ic, mix_rx);
 
     interconnect
 }
@@ -61,7 +61,7 @@ fn start_internals(core: Sender<CoreMessage>, config: &Config) -> Interconnect {
 async fn runner(mut config: Config, rx: Receiver<CoreMessage>, tx: Sender<CoreMessage>) {
     let mut next_config: Option<Config> = None;
     let mut connection: Option<Connection> = None;
-    let mut interconnect = start_internals(tx, config.clone());
+    let mut interconnect = start_internals(tx, &config);
     let mut retrying = None;
     let mut attempt_idx = 0;
 
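Note: `start_internals` only reads the `Config`, so borrowing it (`&Config`) removes the `config.clone()` at the call site; this is the usual fix for clippy's `needless_pass_by_value`. In miniature, with a made-up `Config`:

```rust
#[derive(Default)]
struct Config {
    workers: usize,
}

// Borrow: the function reads the config but never needs to own it.
fn start_internals(config: &Config) -> usize {
    config.workers + 1
}

fn main() {
    let config = Config { workers: 4 };
    // Before the change, the caller paid for a full `config.clone()` here.
    let channels = start_internals(&config);
    println!("{channels} channels; config still usable: {}", config.workers);
}
```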
5 changes: 4 additions & 1 deletion src/error.rs
@@ -143,6 +143,9 @@ pub type JoinResult<T> = Result<T, JoinError>;
 
 #[cfg(feature = "driver")]
 pub use crate::{
-    driver::connection::error::{Error as ConnectionError, Result as ConnectionResult},
+    driver::{
+        connection::error::{Error as ConnectionError, Result as ConnectionResult},
+        SchedulerError,
+    },
     tracks::{ControlError, PlayError, TrackResult},
 };
2 changes: 1 addition & 1 deletion src/input/adapters/async_adapter.rs
@@ -177,7 +177,7 @@ impl AsyncAdapterStream {
         };
 
         tokio::spawn(async move {
-            sink.launch().await;
+            Box::pin(sink.launch()).await;
         });
 
         stream
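Note: wrapping the future in `Box::pin` before awaiting moves its state machine to the heap, so the spawned task holds a pointer rather than a potentially multi-kilobyte inline future; this is the remedy clippy's `large_futures` lint suggests (the lint attribution is an inference). A sketch assuming a Tokio runtime is available as a dependency:

```rust
// A future with deliberately large inline state: the buffer is held
// across an await point, so it lives inside the future itself.
async fn launch() {
    let buf = [0u8; 16 * 1024];
    tokio::task::yield_now().await;
    println!("first byte: {}", buf[0]);
}

#[tokio::main]
async fn main() {
    tokio::spawn(async move {
        // Box::pin puts the ~16 KiB state machine on the heap; the
        // enclosing task's future stays pointer-sized.
        Box::pin(launch()).await;
    })
    .await
    .unwrap();
}
```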
3 changes: 3 additions & 0 deletions src/shards.rs
@@ -37,16 +37,19 @@ impl TwilightMap {
     /// Construct a map of shards and command senders to those shards.
     ///
     /// For correctness all shards should be in the map.
+    #[must_use]
     pub fn new(map: std::collections::HashMap<u64, MessageSender>) -> Self {
         TwilightMap { map }
     }
 
     /// Get the message sender for `shard_id`.
+    #[must_use]
     pub fn get(&self, shard_id: u64) -> Option<&MessageSender> {
         self.map.get(&shard_id)
     }
 
     /// Get the total number of shards in the map.
+    #[must_use]
     pub fn shard_count(&self) -> u64 {
         self.map.len() as u64
     }
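Note: `#[must_use]` on side-effect-free getters makes the compiler warn whenever a caller silently discards the result, which is what clippy's `must_use_candidate` lint recommends. A minimal illustration:

```rust
struct ShardMap {
    count: u64,
}

impl ShardMap {
    /// A pure getter: calling it and ignoring the value is almost
    /// certainly a mistake, so the attribute turns that into a warning.
    #[must_use]
    fn shard_count(&self) -> u64 {
        self.count
    }
}

fn main() {
    let map = ShardMap { count: 16 };
    map.shard_count(); // warning: unused return value of `shard_count`
    println!("{}", map.shard_count());
}
```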
