From 0ffc95a4537633a9916ac8879e953399a9f35bac Mon Sep 17 00:00:00 2001 From: giangndm <45644921+giangndm@users.noreply.github.com> Date: Thu, 21 Nov 2024 23:47:21 +0700 Subject: [PATCH] fix: wrong usage of smallmap cause server crash. switched to indexmap (#457) --- Cargo.lock | 167 ++++++++++++++++-- Cargo.toml | 1 - packages/audio_mixer/Cargo.toml | 1 + packages/audio_mixer/src/lib.rs | 7 +- packages/media_core/Cargo.toml | 3 +- packages/media_core/src/cluster.rs | 8 +- .../src/cluster/room/audio_mixer.rs | 22 +-- .../src/cluster/room/audio_mixer/manual.rs | 11 +- .../src/cluster/room/audio_mixer/publisher.rs | 6 +- .../src/cluster/room/media_track/publisher.rs | 21 +-- .../cluster/room/media_track/subscriber.rs | 22 +-- .../cluster/room/message_channel/publisher.rs | 33 ++-- .../room/message_channel/subscriber.rs | 33 ++-- .../media_core/src/cluster/room/metadata.rs | 78 ++++---- packages/media_core/src/endpoint/internal.rs | 6 +- .../internal/bitrate_allocator/egress.rs | 9 +- .../internal/bitrate_allocator/ingress.rs | 7 +- packages/media_runner/Cargo.toml | 1 + packages/media_runner/src/worker.rs | 36 +++- packages/media_utils/Cargo.toml | 11 +- packages/media_utils/benches/map_bench.rs | 26 +++ .../src/{small_2dmap.rs => indexmap_2d.rs} | 22 +-- packages/media_utils/src/lib.rs | 4 +- packages/transport_rtpengine/Cargo.toml | 1 - packages/transport_webrtc/Cargo.toml | 2 +- packages/transport_webrtc/src/media/mod.rs | 9 +- packages/transport_webrtc/src/shared_port.rs | 27 +-- packages/transport_webrtc/src/transport.rs | 9 +- 28 files changed, 392 insertions(+), 191 deletions(-) create mode 100644 packages/media_utils/benches/map_bench.rs rename packages/media_utils/src/{small_2dmap.rs => indexmap_2d.rs} (66%) diff --git a/Cargo.lock b/Cargo.lock index aab95f4d..8dedfc3f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,6 +171,12 @@ dependencies = [ "libc", ] +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + [[package]] name = "anstream" version = "0.6.15" @@ -601,6 +607,7 @@ checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" name = "audio-mixer" version = "0.1.0" dependencies = [ + "indexmap", "log", ] @@ -924,6 +931,12 @@ version = "1.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + [[package]] name = "cc" version = "1.1.30" @@ -999,6 +1012,33 @@ dependencies = [ "phf_codegen", ] +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + [[package]] name = "cipher" version = "0.2.5" @@ -1250,6 +1290,42 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "is-terminal", + "itertools 0.10.5", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools 0.10.5", +] + [[package]] name = "crossbeam-deque" version = "0.8.5" @@ -2149,6 +2225,16 @@ dependencies = [ "tracing", ] +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + [[package]] name = "hash32" version = "0.3.1" @@ -2718,6 +2804,17 @@ dependencies = [ "serde", ] +[[package]] +name = "is-terminal" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" +dependencies = [ + "hermit-abi 0.4.0", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -2747,6 +2844,15 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itertools" version = "0.11.0" @@ -3243,7 +3349,6 @@ dependencies = [ "mockall", "num_enum", "sans-io-runtime", - "smallmap", "test-log", "tracing-subscriber", ] @@ -3335,6 +3440,7 @@ dependencies = [ "atm0s-sdn", "atm0s-sdn-network", "convert-enum", + "indexmap", "log", "media-server-connector", "media-server-core", @@ -3361,13 +3467,14 @@ dependencies = [ name = "media-server-utils" version = "0.1.0" dependencies = [ + "criterion", "derive_more", + "indexmap", "log", "once_cell", "pin-project-lite", "serde", "serde-querystring", - "smallmap", "sorted-vec", "spin", "uriparse", @@ -3762,6 +3869,12 @@ version = "1.20.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + [[package]] name = "opaque-debug" version = "0.3.1" @@ -4154,6 +4267,34 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "953ec861398dccce10c670dfeaf3ec4911ca479e9c02154b3a215178c5f566f2" +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + [[package]] name = "poem" version = "3.1.1" @@ -5826,15 +5967,6 @@ dependencies = [ "futures-io", ] -[[package]] -name = "smallmap" -version = "1.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6d0205d551de9bbfea07e51ace307c7f1ca224958a3df56fef6c2cfd244c199" -dependencies = [ - "rustc_version 0.4.1", -] - [[package]] name = "smallvec" version = "1.13.2" @@ -6549,6 +6681,16 @@ dependencies = [ "crunchy", ] +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + [[package]] name = "tinyvec" version = "1.8.0" @@ -6748,7 +6890,6 @@ dependencies = [ "rtp-rs", "sans-io-runtime", "sdp-rs", - "smallmap", ] [[package]] @@ -6756,6 +6897,7 @@ name = "transport-webrtc" version = "0.1.0" dependencies = [ "derive_more", + "indexmap", "log", "media-server-core", "media-server-protocol", @@ -6764,7 +6906,6 @@ dependencies = [ "num_enum", "prost", "sans-io-runtime", - "smallmap", "str0m", ] diff --git a/Cargo.toml b/Cargo.toml index 1486341a..4a3804e8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,6 @@ convert-enum = "0.1" clap = "4.5" num_enum = "0.7" log = "0.4" -smallmap = "1.4" serde = "1.0" derivative = "2.2" derive_more = "1.0" diff --git a/packages/audio_mixer/Cargo.toml b/packages/audio_mixer/Cargo.toml index b59f3b83..7e610117 100644 --- a/packages/audio_mixer/Cargo.toml +++ b/packages/audio_mixer/Cargo.toml @@ -5,3 +5,4 @@ edition = "2021" [dependencies] log.workspace = true +indexmap.workspace = true \ No newline at end of file diff --git a/packages/audio_mixer/src/lib.rs b/packages/audio_mixer/src/lib.rs index ae926a73..10e68cdd 100644 --- a/packages/audio_mixer/src/lib.rs +++ b/packages/audio_mixer/src/lib.rs @@ -1,10 +1,11 @@ use std::{ - collections::HashMap, fmt::Debug, hash::Hash, time::{Duration, Instant}, }; +use indexmap::IndexMap; + const SILENT_LEVEL: i8 = -127; const SWITCH_AUDIO_THRESHOLD: i16 = 30; /// if no audio pkt received in AUDIO_SLOT_TIMEOUT, set audio level to SILENT_LEVEL @@ -28,7 +29,7 @@ struct OutputSlotState { #[derive(Debug)] pub struct AudioMixer { len: usize, - sources: HashMap, + sources: IndexMap, outputs: Vec>>, } @@ -38,7 +39,7 @@ impl AudioMixer { Self { len: 0, - sources: HashMap::new(), + sources: Default::default(), outputs: vec![None; output], } } diff --git a/packages/media_core/Cargo.toml b/packages/media_core/Cargo.toml index b73c983d..3467ae10 100644 --- a/packages/media_core/Cargo.toml +++ b/packages/media_core/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] log = { workspace = true } num_enum = { workspace = true } -smallmap = { workspace = true } +indexmap = { workspace = true } derivative = { workspace = true } derive_more = { workspace = true, features = ["full"] } mockall = { workspace = true } @@ -17,7 +17,6 @@ atm0s-sdn = { workspace = true } media-server-protocol = { path = "../protocol" } media-server-utils = { path = "../media_utils" } audio-mixer = { path = "../audio_mixer" } -indexmap = { workspace = true } [dev-dependencies] tracing-subscriber = { workspace = true } diff --git a/packages/media_core/src/cluster.rs b/packages/media_core/src/cluster.rs index 194922cc..1edc9860 100644 --- a/packages/media_core/src/cluster.rs +++ b/packages/media_core/src/cluster.rs @@ -6,9 +6,9 @@ //! use derive_more::{AsRef, Display, From}; +use indexmap::IndexMap; use sans_io_runtime::{return_if_none, TaskGroup, TaskGroupOutput, TaskSwitcherChild}; use std::{ - collections::HashMap, fmt::Debug, hash::{Hash, Hasher}, time::Instant, @@ -131,7 +131,7 @@ pub enum Output { } pub struct MediaCluster { - rooms_map: HashMap, + rooms_map: IndexMap, rooms: TaskGroup, room::Output, ClusterRoom, 16>, shutdown: bool, } @@ -139,7 +139,7 @@ pub struct MediaCluster { impl Default for MediaCluster { fn default() -> Self { Self { - rooms_map: HashMap::new(), + rooms_map: IndexMap::new(), rooms: TaskGroup::default(), shutdown: false, } @@ -197,7 +197,7 @@ impl TaskSwitcherChild Some(Output::Endpoint(endpoints, event)), room::Output::OnResourceEmpty(room) => { log::info!("[MediaCluster] remove room index {index}, hash {room}"); - self.rooms_map.remove(&room).expect("Should have room with index"); + self.rooms_map.swap_remove(&room).expect("Should have room with index"); self.rooms.remove_task(index); Some(Output::Continue) } diff --git a/packages/media_core/src/cluster/room/audio_mixer.rs b/packages/media_core/src/cluster/room/audio_mixer.rs index a232f075..9aa14f89 100644 --- a/packages/media_core/src/cluster/room/audio_mixer.rs +++ b/packages/media_core/src/cluster/room/audio_mixer.rs @@ -8,13 +8,13 @@ //TODO refactor multiple subscriber mode to array instead of manual implement with subscriber1, subscriber2, subscriber3 use std::{ - collections::HashMap, fmt::Debug, hash::Hash, time::{Duration, Instant}, }; use atm0s_sdn::features::pubsub::{self, ChannelId}; +use indexmap::IndexMap; use manual::ManualMixer; use media_server_protocol::{ endpoint::{AudioMixerConfig, PeerId, TrackName}, @@ -60,9 +60,9 @@ pub struct AudioMixer { room: ClusterRoomHash, mix_channel_id: ChannelId, //store number of outputs - auto_mode: HashMap, - manual_mode: HashMap, - manual_channels: HashMap>, + auto_mode: IndexMap, + manual_mode: IndexMap, + manual_channels: IndexMap>, publisher: TaskSwitcherBranch, Output>, subscriber1: TaskSwitcherBranch, Output>, subscriber2: TaskSwitcherBranch, Output>, @@ -77,9 +77,9 @@ impl AudioMixer { Self { room, mix_channel_id, - auto_mode: HashMap::new(), - manual_mode: HashMap::new(), - manual_channels: HashMap::new(), + auto_mode: IndexMap::new(), + manual_mode: IndexMap::new(), + manual_channels: IndexMap::new(), publisher: TaskSwitcherBranch::new(AudioMixerPublisher::new(mix_channel_id), TaskType::Publisher), subscriber1: TaskSwitcherBranch::new(AudioMixerSubscriber::new(mix_channel_id), TaskType::Subscriber1), subscriber2: TaskSwitcherBranch::new(AudioMixerSubscriber::new(mix_channel_id), TaskType::Subscriber2), @@ -138,7 +138,7 @@ impl AudioMixer { } pub fn on_leave(&mut self, now: Instant, endpoint: Endpoint) { - if let Some(outputs) = self.auto_mode.remove(&endpoint) { + if let Some(outputs) = self.auto_mode.swap_remove(&endpoint) { match outputs { 1 => self.subscriber1.input(&mut self.switcher).on_endpoint_leave(now, endpoint), 2 => self.subscriber2.input(&mut self.switcher).on_endpoint_leave(now, endpoint), @@ -147,9 +147,9 @@ impl AudioMixer { log::warn!("[ClusterRoomAudioMixer] unsupported mixer with {} outputs", outputs); } } - } else if let Some(index) = self.manual_mode.remove(&endpoint) { + } else if let Some(index) = self.manual_mode.swap_remove(&endpoint) { log::info!("[ClusterRoomAudioMixer] endpoint {:?} leave from manual mode", endpoint); - self.manual_mode.remove(&endpoint); + self.manual_mode.swap_remove(&endpoint); self.manuals.input(&mut self.switcher).on_event(now, index, manual::Input::LeaveRoom); } } @@ -270,7 +270,7 @@ impl TaskSwitcherChild> fo let (slot_index, _) = slot.iter().enumerate().find(|(_, task_i)| **task_i == index).expect("Subscribed task not found"); slot.swap_remove(slot_index); if slot.is_empty() { - self.manual_channels.remove(&channel_id); + self.manual_channels.swap_remove(&channel_id); return Some(out); } } diff --git a/packages/media_core/src/cluster/room/audio_mixer/manual.rs b/packages/media_core/src/cluster/room/audio_mixer/manual.rs index 2e5f1a86..a31a605b 100644 --- a/packages/media_core/src/cluster/room/audio_mixer/manual.rs +++ b/packages/media_core/src/cluster/room/audio_mixer/manual.rs @@ -4,12 +4,13 @@ //! to determine which source is sent to client. //! -use std::{collections::HashMap, fmt::Debug, time::Instant}; +use std::{fmt::Debug, time::Instant}; use atm0s_sdn::{ features::pubsub::{self, ChannelId}, NodeId, }; +use indexmap::{map::Entry, IndexMap}; use media_server_protocol::{ endpoint::TrackSource, media::{MediaMeta, MediaPacket}, @@ -36,7 +37,7 @@ pub struct ManualMixer { endpoint: Endpoint, room: ClusterRoomHash, outputs: Vec, - sources: HashMap, + sources: IndexMap, queue: DynamicDeque, 4>, mixer: audio_mixer::AudioMixer, } @@ -49,14 +50,14 @@ impl ManualMixer { room, mixer: audio_mixer::AudioMixer::new(outputs.len()), outputs, - sources: HashMap::new(), + sources: Default::default(), queue: Default::default(), } } fn attach(&mut self, _now: Instant, source: TrackSource) { let channel_id = id_generator::gen_track_channel_id(self.room, &source.peer, &source.track); - if let std::collections::hash_map::Entry::Vacant(e) = self.sources.entry(channel_id) { + if let Entry::Vacant(e) = self.sources.entry(channel_id) { log::info!("[ClusterManualMixer] add source {:?} => sub {channel_id}", source); e.insert(source); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::SubAuto))); @@ -89,7 +90,7 @@ impl ManualMixer { fn detach(&mut self, _now: Instant, source: TrackSource) { let channel_id = id_generator::gen_track_channel_id(self.room, &source.peer, &source.track); - if self.sources.remove(&channel_id).is_some() { + if self.sources.swap_remove(&channel_id).is_some() { log::info!("[ClusterManualMixer] remove source {:?} => unsub {channel_id}", source); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, pubsub::ChannelControl::UnsubAuto))); } diff --git a/packages/media_core/src/cluster/room/audio_mixer/publisher.rs b/packages/media_core/src/cluster/room/audio_mixer/publisher.rs index e72f3325..1f99da9d 100644 --- a/packages/media_core/src/cluster/room/audio_mixer/publisher.rs +++ b/packages/media_core/src/cluster/room/audio_mixer/publisher.rs @@ -1,11 +1,11 @@ use std::{ - collections::HashMap, fmt::Debug, hash::Hash, time::{Duration, Instant}, }; use atm0s_sdn::features::pubsub::{self, ChannelId}; +use indexmap::IndexMap; use media_server_protocol::{ endpoint::{AudioMixerPkt, PeerHashCode, PeerId, TrackName}, media::{MediaMeta, MediaPacket}, @@ -35,7 +35,7 @@ struct OutputSlot { pub struct AudioMixerPublisher { _c: Count, channel_id: pubsub::ChannelId, - tracks: HashMap<(Endpoint, RemoteTrackId), TrackSlot>, + tracks: IndexMap<(Endpoint, RemoteTrackId), TrackSlot>, mixer: audio_mixer::AudioMixer<(Endpoint, RemoteTrackId)>, slots: [Option; 3], queue: DynamicDeque, 4>, @@ -114,7 +114,7 @@ impl AudioMixerPublisher { log::debug!("[ClusterAudioMixerPublisher] on track unpublish {track}"); let key = (endpoint, track); assert!(self.tracks.contains_key(&key)); - self.tracks.remove(&key); + self.tracks.swap_remove(&key); if self.tracks.is_empty() { log::info!("[ClusterAudioMixerPublisher] last track leave ind Auto mode => unpublish channel {}", self.channel_id); self.queue.push_back(Output::Pubsub(pubsub::Control(self.channel_id, pubsub::ChannelControl::PubStop))); diff --git a/packages/media_core/src/cluster/room/media_track/publisher.rs b/packages/media_core/src/cluster/room/media_track/publisher.rs index 600d9772..8c342b5f 100644 --- a/packages/media_core/src/cluster/room/media_track/publisher.rs +++ b/packages/media_core/src/cluster/room/media_track/publisher.rs @@ -2,13 +2,10 @@ //! Channel Publisher will takecare of pubsub channel for sending data and handle when received channel feedback //! -use std::{ - collections::{HashMap, HashSet, VecDeque}, - fmt::Debug, - hash::Hash, -}; +use std::{collections::VecDeque, fmt::Debug, hash::Hash}; use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId, Feedback}; +use indexmap::{IndexMap, IndexSet}; use media_server_protocol::{ endpoint::{PeerId, TrackName}, media::MediaPacket, @@ -43,8 +40,8 @@ impl TryFrom for FeedbackKind { pub struct RoomChannelPublisher { _c: Count, room: ClusterRoomHash, - tracks: HashMap<(Endpoint, RemoteTrackId), (PeerId, TrackName, ChannelId)>, - tracks_source: HashMap>, // We allow multi sources here for avoiding crash + tracks: IndexMap<(Endpoint, RemoteTrackId), (PeerId, TrackName, ChannelId)>, + tracks_source: IndexMap>, // We allow multi sources here for avoiding crash queue: VecDeque>, } @@ -53,8 +50,8 @@ impl RoomChannelPublisher { Self { _c: Default::default(), room, - tracks: HashMap::new(), - tracks_source: HashMap::new(), + tracks: Default::default(), + tracks_source: Default::default(), queue: VecDeque::new(), } } @@ -107,12 +104,12 @@ impl RoomChannelPublisher { } pub fn on_track_unpublish(&mut self, endpoint: Endpoint, track: RemoteTrackId) { - let (peer, name, channel_id) = return_if_none!(self.tracks.remove(&(endpoint, track))); + let (peer, name, channel_id) = return_if_none!(self.tracks.swap_remove(&(endpoint, track))); let sources = self.tracks_source.get_mut(&channel_id).expect("Should have track_source"); - let removed = sources.remove(&(endpoint, track)); + let removed = sources.swap_remove(&(endpoint, track)); assert!(removed, "Should remove source child on unpublish"); if sources.is_empty() { - self.tracks_source.remove(&channel_id).expect("Should remove source channel on unpublish"); + self.tracks_source.swap_remove(&channel_id).expect("Should remove source channel on unpublish"); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))); } log::info!("[ClusterRoom {}/Publishers] peer ({peer} stopped track {name})", self.room); diff --git a/packages/media_core/src/cluster/room/media_track/subscriber.rs b/packages/media_core/src/cluster/room/media_track/subscriber.rs index 67776329..34cf3de1 100644 --- a/packages/media_core/src/cluster/room/media_track/subscriber.rs +++ b/packages/media_core/src/cluster/room/media_track/subscriber.rs @@ -2,18 +2,14 @@ //! Channel Subscriber handle logic for viewer. This module takecare sending Sub or Unsub, and also feedback //! -use std::{ - collections::{HashMap, VecDeque}, - fmt::Debug, - hash::Hash, - time::Instant, -}; +use std::{collections::VecDeque, fmt::Debug, hash::Hash, time::Instant}; use atm0s_sdn::{ features::pubsub::{self, ChannelControl, ChannelId, Feedback}, NodeId, }; use derivative::Derivative; +use indexmap::IndexMap; use media_server_protocol::{ endpoint::{PeerId, TrackName}, media::MediaPacket, @@ -41,15 +37,15 @@ const KEYFRAME_FEEDBACK_KIND: u8 = 1; #[derivative(Default(bound = ""))] struct ChannelContainer { endpoints: Vec<(Endpoint, LocalTrackId)>, - bitrate_fbs: HashMap, + bitrate_fbs: IndexMap, } #[derive(Debug)] pub struct RoomChannelSubscribe { _c: Count, room: ClusterRoomHash, - channels: HashMap>, - subscribers: HashMap<(Endpoint, LocalTrackId), (ChannelId, PeerId, TrackName)>, + channels: IndexMap>, + subscribers: IndexMap<(Endpoint, LocalTrackId), (ChannelId, PeerId, TrackName)>, queue: VecDeque>, } @@ -58,8 +54,8 @@ impl RoomChannelSubscribe Self { _c: Default::default(), room, - channels: HashMap::new(), - subscribers: HashMap::new(), + channels: IndexMap::new(), + subscribers: IndexMap::new(), queue: VecDeque::new(), } } @@ -146,7 +142,7 @@ impl RoomChannelSubscribe } pub fn on_track_unsubscribe(&mut self, endpoint: Endpoint, track: LocalTrackId) { - let (channel_id, target_peer, target_track) = return_if_none!(self.subscribers.remove(&(endpoint, track))); + let (channel_id, target_peer, target_track) = return_if_none!(self.subscribers.swap_remove(&(endpoint, track))); log::info!( "[ClusterRoom {}/Subscribers] endpoint {:?} track {track} unsubscribe from source {target_peer} {target_track}, channel {channel_id}", self.room, @@ -157,7 +153,7 @@ impl RoomChannelSubscribe channel_container.endpoints.swap_remove(index); if channel_container.endpoints.is_empty() { - self.channels.remove(&channel_id); + self.channels.swap_remove(&channel_id); log::info!("[ClusterRoom {}/Subscribers] last unsubscriber => Unsub channel {channel_id}", self.room); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))); } diff --git a/packages/media_core/src/cluster/room/message_channel/publisher.rs b/packages/media_core/src/cluster/room/message_channel/publisher.rs index 7b571b00..75e7bf65 100644 --- a/packages/media_core/src/cluster/room/message_channel/publisher.rs +++ b/packages/media_core/src/cluster/room/message_channel/publisher.rs @@ -1,10 +1,7 @@ -use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - fmt::Debug, - hash::Hash, -}; +use std::{collections::VecDeque, fmt::Debug, hash::Hash}; use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId}; +use indexmap::{map::Entry, IndexMap, IndexSet}; use media_server_protocol::message_channel::MessageChannelPacket; use media_server_utils::Count; use sans_io_runtime::{return_if_none, TaskSwitcherChild}; @@ -18,15 +15,15 @@ use super::Output; #[derive(Debug)] struct ChannelContainer { - publishers: HashSet, + publishers: IndexSet, } #[derive(Debug)] pub struct MessageChannelPublisher { _c: Count, room: ClusterRoomHash, - channels: HashMap>, - publishers: HashMap>, + channels: IndexMap>, + publishers: IndexMap>, queue: VecDeque>, } @@ -36,8 +33,8 @@ impl MessageChannelPublisher { _c: Default::default(), room, queue: VecDeque::new(), - channels: HashMap::new(), - publishers: HashMap::new(), + channels: IndexMap::new(), + publishers: IndexMap::new(), } } @@ -51,7 +48,7 @@ impl MessageChannelPublisher { o.get_mut().publishers.insert(endpoint); } Entry::Vacant(v) => { - let mut channel = ChannelContainer { publishers: HashSet::new() }; + let mut channel = ChannelContainer { publishers: IndexSet::new() }; channel.publishers.insert(endpoint); v.insert(channel); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStart))); @@ -67,29 +64,29 @@ impl MessageChannelPublisher { let channel_id: ChannelId = id_generator::gen_msg_channel_id(self.room, label); let channel = return_if_none!(self.channels.get_mut(&channel_id)); - channel.publishers.remove(&endpoint); + channel.publishers.swap_remove(&endpoint); if let Some(publisher) = self.publishers.get_mut(&endpoint) { - publisher.remove(&channel_id); + publisher.swap_remove(&channel_id); if publisher.is_empty() { - self.publishers.remove(&endpoint); + self.publishers.swap_remove(&endpoint); } } if channel.publishers.is_empty() { - self.channels.remove(&channel_id); + self.channels.swap_remove(&channel_id); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::PubStop))); } } pub fn on_leave(&mut self, endpoint: Endpoint) { log::info!("[ClusterRoomDataChannel {}/Publishers] user leaves, clean up", self.room); - if let Some(channels) = self.publishers.remove(&endpoint) { + if let Some(channels) = self.publishers.swap_remove(&endpoint) { for c in channels { if let Some(channel) = self.channels.get_mut(&c) { - channel.publishers.remove(&endpoint); + channel.publishers.swap_remove(&endpoint); if channel.publishers.is_empty() { - self.channels.remove(&c); + self.channels.swap_remove(&c); self.queue.push_back(Output::Pubsub(pubsub::Control(c, ChannelControl::PubStop))); } } diff --git a/packages/media_core/src/cluster/room/message_channel/subscriber.rs b/packages/media_core/src/cluster/room/message_channel/subscriber.rs index 659146ed..f28086d4 100644 --- a/packages/media_core/src/cluster/room/message_channel/subscriber.rs +++ b/packages/media_core/src/cluster/room/message_channel/subscriber.rs @@ -1,10 +1,7 @@ -use std::{ - collections::{hash_map::Entry, HashMap, HashSet, VecDeque}, - fmt::Debug, - hash::Hash, -}; +use std::{collections::VecDeque, fmt::Debug, hash::Hash}; use atm0s_sdn::features::pubsub::{self, ChannelControl, ChannelId}; +use indexmap::{map::Entry, IndexMap, IndexSet}; use media_server_protocol::message_channel::MessageChannelPacket; use media_server_utils::Count; use sans_io_runtime::{return_if_none, TaskSwitcherChild}; @@ -17,7 +14,7 @@ use crate::{ #[derive(Debug)] struct ChannelContainer { - subscribers: HashSet, + subscribers: IndexSet, label: MessageChannelLabel, } @@ -25,8 +22,8 @@ struct ChannelContainer { pub struct MessageChannelSubscriber { _c: Count, room: ClusterRoomHash, - channels: HashMap>, - subscriptions: HashMap>, + channels: IndexMap>, + subscriptions: IndexMap>, queue: VecDeque>, } @@ -36,8 +33,8 @@ impl MessageChannelSubscriber { _c: Default::default(), room, queue: VecDeque::new(), - channels: HashMap::new(), - subscriptions: HashMap::new(), + channels: IndexMap::new(), + subscriptions: IndexMap::new(), } } @@ -52,7 +49,7 @@ impl MessageChannelSubscriber { } Entry::Vacant(v) => { let mut channel = ChannelContainer { - subscribers: HashSet::new(), + subscribers: IndexSet::new(), label: label.clone(), }; channel.subscribers.insert(endpoint); @@ -70,16 +67,16 @@ impl MessageChannelSubscriber { let channel = return_if_none!(self.channels.get_mut(&channel_id)); - channel.subscribers.remove(&endpoint); + channel.subscribers.swap_remove(&endpoint); if let Some(channels) = self.subscriptions.get_mut(&endpoint) { - channels.remove(&channel_id); + channels.swap_remove(&channel_id); if channels.is_empty() { - self.subscriptions.remove(&endpoint); + self.subscriptions.swap_remove(&endpoint); } } if channel.subscribers.is_empty() { - self.channels.remove(&channel_id); + self.channels.swap_remove(&channel_id); self.queue.push_back(Output::Pubsub(pubsub::Control(channel_id, ChannelControl::UnsubAuto))); } } @@ -99,12 +96,12 @@ impl MessageChannelSubscriber { pub fn on_leave(&mut self, endpoint: Endpoint) { log::info!("[ClusterRoomDataChannel {}/Subscribers] user leaves, clean up", self.room); - if let Some(channels) = self.subscriptions.remove(&endpoint) { + if let Some(channels) = self.subscriptions.swap_remove(&endpoint) { for c in channels { if let Some(channel) = self.channels.get_mut(&c) { - channel.subscribers.remove(&endpoint); + channel.subscribers.swap_remove(&endpoint); if channel.subscribers.is_empty() { - self.channels.remove(&c); + self.channels.swap_remove(&c); self.queue.push_back(Output::Pubsub(pubsub::Control(c, ChannelControl::UnsubAuto))); } } diff --git a/packages/media_core/src/cluster/room/metadata.rs b/packages/media_core/src/cluster/room/metadata.rs index 82bde410..1b8f29b8 100644 --- a/packages/media_core/src/cluster/room/metadata.rs +++ b/packages/media_core/src/cluster/room/metadata.rs @@ -10,9 +10,9 @@ use std::{collections::VecDeque, fmt::Debug, hash::Hash}; use atm0s_sdn::features::dht_kv::{self, Map, MapControl, MapEvent}; +use indexmap::{IndexMap, IndexSet}; use media_server_protocol::endpoint::{PeerId, PeerInfo, PeerMeta, RoomInfoPublish, RoomInfoSubscribe, TrackInfo, TrackMeta, TrackName}; use sans_io_runtime::{return_if_none, TaskSwitcherChild}; -use smallmap::{Map as SmallMap, Set as SmallSet}; use crate::{ cluster::{id_generator, ClusterEndpointEvent, ClusterRoomHash}, @@ -23,8 +23,8 @@ use crate::{ struct PeerContainer { peer: PeerId, publish: RoomInfoPublish, - sub_peers: SmallSet, - pub_tracks: SmallMap, + sub_peers: IndexSet, + pub_tracks: IndexMap, } #[derive(Debug, PartialEq, Eq)] @@ -39,13 +39,13 @@ pub struct RoomMetadata { room: ClusterRoomHash, peers_map: Map, tracks_map: Map, - peers: SmallMap, - peers_map_subscribers: SmallSet, - tracks_map_subscribers: SmallSet, + peers: IndexMap, + peers_map_subscribers: IndexSet, + tracks_map_subscribers: IndexSet, //This is for storing list of endpoints subscribe manual a target track - peers_tracks_subs: SmallMap>, - cluster_peers: SmallMap, - cluster_tracks: SmallMap, + peers_tracks_subs: IndexMap>, + cluster_peers: IndexMap, + cluster_tracks: IndexMap, queue: VecDeque>, } @@ -55,13 +55,13 @@ impl RoomMetadata { room, peers_map: id_generator::peers_map(room), tracks_map: id_generator::tracks_map(room), - peers: SmallMap::new(), - peers_map_subscribers: SmallMap::new(), - tracks_map_subscribers: SmallMap::new(), - peers_tracks_subs: SmallMap::new(), - cluster_peers: SmallMap::new(), - cluster_tracks: SmallMap::new(), - queue: VecDeque::new(), + peers: Default::default(), + peers_map_subscribers: Default::default(), + tracks_map_subscribers: Default::default(), + peers_tracks_subs: Default::default(), + cluster_peers: Default::default(), + cluster_tracks: Default::default(), + queue: Default::default(), } } @@ -78,8 +78,8 @@ impl RoomMetadata { PeerContainer { peer: peer.clone(), publish: publish.clone(), - sub_peers: SmallSet::new(), - pub_tracks: SmallMap::new(), + sub_peers: Default::default(), + pub_tracks: Default::default(), }, ); let peer_key = id_generator::peers_key(&peer); @@ -91,7 +91,7 @@ impl RoomMetadata { } // Let Sub to peers_map if need need subscribe.peers if subscribe.peers { - self.peers_map_subscribers.insert(endpoint, ()); + self.peers_map_subscribers.insert(endpoint); log::info!("[ClusterRoom {}] next peer sub peers => restore {} remote peers", self.room, self.cluster_peers.len()); // Restore already added peers @@ -109,7 +109,7 @@ impl RoomMetadata { } // Let Sub to tracks_map if need need subscribe.tracks if subscribe.tracks { - self.tracks_map_subscribers.insert(endpoint, ()); + self.tracks_map_subscribers.insert(endpoint); log::info!("[ClusterRoom {}] next peer sub tracks => restore {} remote tracks", self.room, self.cluster_tracks.len()); // Restore already added tracks @@ -130,7 +130,7 @@ impl RoomMetadata { } pub fn on_leave(&mut self, endpoint: Endpoint) { - let peer = return_if_none!(self.peers.remove(&endpoint)); + let peer = return_if_none!(self.peers.swap_remove(&endpoint)); log::info!("[ClusterRoom {}] leave peer {}", self.room, peer.peer); let peer_key = id_generator::peers_key(&peer.peer); // If remain remote tracks, must to delete from list. @@ -146,23 +146,23 @@ impl RoomMetadata { self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(peer_map, MapControl::Del(track_key)))); } - if self.peers_map_subscribers.remove(&endpoint).is_some() && self.peers_map_subscribers.is_empty() { + if self.peers_map_subscribers.swap_remove(&endpoint) && self.peers_map_subscribers.is_empty() { log::info!("[ClusterRoom {}] last peer unsub peers map => unsubscribe", self.room); self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.peers_map, MapControl::Unsub))); } - if self.tracks_map_subscribers.remove(&endpoint).is_some() && self.tracks_map_subscribers.is_empty() { + if self.tracks_map_subscribers.swap_remove(&endpoint) && self.tracks_map_subscribers.is_empty() { log::info!("[ClusterRoom {}] last peer unsub tracks map => unsubscribe", self.room); self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(self.tracks_map, MapControl::Unsub))); } // check if this peer manual subscribe to some private peer map => need send Unsub - for (target, _) in peer.sub_peers.into_iter() { + for target in peer.sub_peers.into_iter() { let target_peer_map = id_generator::peer_map(self.room, &target); - let subs = self.peers_tracks_subs.get_mut(&target_peer_map).expect("Should have private peer_map"); - subs.remove(&endpoint); + let subs: &mut IndexSet = self.peers_tracks_subs.get_mut(&target_peer_map).expect("Should have private peer_map"); + subs.swap_remove(&endpoint); if subs.is_empty() { - self.peers_tracks_subs.remove(&target_peer_map); + self.peers_tracks_subs.swap_remove(&target_peer_map); self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(target_peer_map, MapControl::Unsub))); } } @@ -173,8 +173,8 @@ impl RoomMetadata { let target_peer_map = id_generator::peer_map(self.room, &target); let subs = self.peers_tracks_subs.entry(target_peer_map).or_default(); let need_sub = subs.is_empty(); - subs.insert(endpoint, ()); - peer.sub_peers.insert(target, ()); + subs.insert(endpoint); + peer.sub_peers.insert(target); if need_sub { self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(target_peer_map, MapControl::Sub))); @@ -185,10 +185,10 @@ impl RoomMetadata { let peer = self.peers.get_mut(&endpoint).expect("Should have peer"); let target_peer_map = id_generator::peer_map(self.room, &target); let subs = self.peers_tracks_subs.entry(target_peer_map).or_default(); - subs.remove(&endpoint); - peer.sub_peers.remove(&target); + subs.swap_remove(&endpoint); + peer.sub_peers.swap_remove(&target); if subs.is_empty() { - self.peers_tracks_subs.remove(&target_peer_map); + self.peers_tracks_subs.swap_remove(&target_peer_map); self.queue.push_back(Output::Kv(dht_kv::Control::MapCmd(target_peer_map, MapControl::Unsub))); } } @@ -212,7 +212,7 @@ impl RoomMetadata { pub fn on_track_unpublish(&mut self, endpoint: Endpoint, track_id: RemoteTrackId) { let peer = return_if_none!(self.peers.get_mut(&endpoint)); - let track = return_if_none!(peer.pub_tracks.remove(&track_id)); + let track = return_if_none!(peer.pub_tracks.swap_remove(&track_id)); let track_key = id_generator::tracks_key(&peer.peer, &track); let peer_map = id_generator::peer_map(self.room, &peer.peer); @@ -250,7 +250,7 @@ impl RoomMetadata { None }; - let subscribers = self.peers_map_subscribers.iter().map(|a| a.0).collect::>(); + let subscribers = self.peers_map_subscribers.iter().copied().collect::>(); if let Some(info) = info { log::info!("[ClusterRoom {}] cluster: peer {} joined => fire event to {:?}", self.room, info.peer, subscribers); self.cluster_peers.insert(peer_key, info.clone()); @@ -258,7 +258,7 @@ impl RoomMetadata { self.queue.push_back(Output::Endpoint(subscribers, ClusterEndpointEvent::PeerJoined(info.peer, info.meta))); } } else { - let info = return_if_none!(self.cluster_peers.remove(&peer_key)); + let info = return_if_none!(self.cluster_peers.swap_remove(&peer_key)); log::info!("[ClusterRoom {}] cluster: peer ({}) leaved => fire event to {:?}", self.room, info.peer, subscribers); if !subscribers.is_empty() { self.queue.push_back(Output::Endpoint(subscribers, ClusterEndpointEvent::PeerLeaved(info.peer, info.meta))); @@ -273,7 +273,7 @@ impl RoomMetadata { None }; - let subscribers = self.tracks_map_subscribers.iter().map(|a| a.0).collect::>(); + let subscribers = self.tracks_map_subscribers.iter().copied().collect::>(); if let Some(info) = info { log::info!( "[ClusterRoom {}] cluster: peer ({}) started track {}) => fire event to {:?}", @@ -288,7 +288,7 @@ impl RoomMetadata { .push_back(Output::Endpoint(subscribers, ClusterEndpointEvent::TrackStarted(info.peer, info.track, info.meta))); } } else { - let info = return_if_none!(self.cluster_tracks.remove(&track)); + let info = return_if_none!(self.cluster_tracks.swap_remove(&track)); log::info!( "[ClusterRoom {}] cluster: peer ({}) stopped track {}) => fire event to {:?}", self.room, @@ -310,7 +310,7 @@ impl RoomMetadata { None }; - let subscribers = return_if_none!(self.peers_tracks_subs.get(&peer_map)).iter().map(|a| a.0).collect::>(); + let subscribers = return_if_none!(self.peers_tracks_subs.get(&peer_map)).iter().copied().collect::>(); if let Some(info) = info { log::info!( "[ClusterRoom {}] cluster: peer ({}) started track {}) => fire event to {:?}", @@ -323,7 +323,7 @@ impl RoomMetadata { self.queue .push_back(Output::Endpoint(subscribers, ClusterEndpointEvent::TrackStarted(info.peer, info.track, info.meta))); } else { - let info = return_if_none!(self.cluster_tracks.remove(&track)); + let info = return_if_none!(self.cluster_tracks.swap_remove(&track)); log::info!( "[ClusterRoom {}] cluster: peer ({}) stopped track {}) => fire event to {:?}", self.room, diff --git a/packages/media_core/src/endpoint/internal.rs b/packages/media_core/src/endpoint/internal.rs index 68422878..ff64ad32 100644 --- a/packages/media_core/src/endpoint/internal.rs +++ b/packages/media_core/src/endpoint/internal.rs @@ -8,7 +8,7 @@ use media_server_protocol::{ record::SessionRecordEvent, transport::RpcError, }; -use media_server_utils::Small2dMap; +use media_server_utils::IndexMap2d; use sans_io_runtime::{return_if_none, return_if_some, TaskGroup, TaskGroupOutput, TaskSwitcher, TaskSwitcherBranch, TaskSwitcherChild}; use crate::{ @@ -54,8 +54,8 @@ pub struct EndpointInternal { state: Option<(Instant, TransportState)>, wait_join: EndpointInternalWaitJoin, joined: Option<(ClusterRoomHash, RoomId, PeerId, Option)>, - local_tracks_id: Small2dMap, - remote_tracks_id: Small2dMap, + local_tracks_id: IndexMap2d, + remote_tracks_id: IndexMap2d, local_tracks: TaskSwitcherBranch, TaskGroupOutput>, remote_tracks: TaskSwitcherBranch, TaskGroupOutput>, bitrate_allocator: TaskSwitcherBranch, diff --git a/packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs b/packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs index 4b883a9b..b07a90d8 100644 --- a/packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs +++ b/packages/media_core/src/endpoint/internal/bitrate_allocator/egress.rs @@ -1,5 +1,6 @@ use std::collections::VecDeque; +use indexmap::IndexMap; use media_server_protocol::endpoint::TrackPriority; use crate::transport::LocalTrackId; @@ -23,7 +24,7 @@ pub struct EgressBitrateAllocator { max_egress_bitrate: u64, changed: bool, egress_bitrate: u64, - tracks: smallmap::Map, + tracks: IndexMap, queue: VecDeque, } @@ -33,8 +34,8 @@ impl EgressBitrateAllocator { max_egress_bitrate, changed: false, egress_bitrate: DEFAULT_BITRATE_BPS, - tracks: smallmap::Map::new(), - queue: VecDeque::new(), + tracks: Default::default(), + queue: Default::default(), } } @@ -55,7 +56,7 @@ impl EgressBitrateAllocator { pub fn del_video_track(&mut self, track: LocalTrackId) { log::info!("[EgressBitrateAllocator] del video track {track}"); - self.tracks.remove(&track); + self.tracks.swap_remove(&track); self.changed = true; } diff --git a/packages/media_core/src/endpoint/internal/bitrate_allocator/ingress.rs b/packages/media_core/src/endpoint/internal/bitrate_allocator/ingress.rs index a4b4395a..76b32e25 100644 --- a/packages/media_core/src/endpoint/internal/bitrate_allocator/ingress.rs +++ b/packages/media_core/src/endpoint/internal/bitrate_allocator/ingress.rs @@ -1,5 +1,6 @@ use std::collections::VecDeque; +use indexmap::IndexMap; use media_server_protocol::endpoint::TrackPriority; use crate::transport::RemoteTrackId; @@ -12,7 +13,7 @@ pub enum Action { pub struct IngressBitrateAllocator { changed: bool, ingress_bitrate: u64, - tracks: smallmap::Map, + tracks: IndexMap, queue: VecDeque<(RemoteTrackId, Action)>, } @@ -21,7 +22,7 @@ impl IngressBitrateAllocator { Self { ingress_bitrate, changed: false, - tracks: smallmap::Map::new(), + tracks: IndexMap::new(), queue: VecDeque::new(), } } @@ -38,7 +39,7 @@ impl IngressBitrateAllocator { pub fn del_video_track(&mut self, track: RemoteTrackId) { log::info!("[IngressBitrateAllocator] del video track {track}"); - self.tracks.remove(&track); + self.tracks.swap_remove(&track); self.changed = true; } diff --git a/packages/media_runner/Cargo.toml b/packages/media_runner/Cargo.toml index 5ceb841b..aef8acbc 100644 --- a/packages/media_runner/Cargo.toml +++ b/packages/media_runner/Cargo.toml @@ -10,6 +10,7 @@ rand = { workspace = true } log = { workspace = true } num_enum = { workspace = true } convert-enum = { workspace = true } +indexmap = { workspace = true } media-server-protocol = { path = "../protocol" } media-server-secure = { path = "../media_secure" } media-server-gateway = { path = "../media_gateway" } diff --git a/packages/media_runner/src/worker.rs b/packages/media_runner/src/worker.rs index 66ce12a1..d2707a10 100644 --- a/packages/media_runner/src/worker.rs +++ b/packages/media_runner/src/worker.rs @@ -14,6 +14,7 @@ use atm0s_sdn::{ ControllerPlaneCfg, DataPlaneCfg, DataWorkerHistory, NetInput, NetOutput, NodeAddr, SdnExtIn, SdnExtOut, SdnWorker, SdnWorkerBusEvent, SdnWorkerCfg, SdnWorkerInput, SdnWorkerOutput, TimePivot, }; use atm0s_sdn_network::data_plane::NetPair; +use indexmap::IndexMap; use media_server_connector::agent_service::ConnectorAgentServiceBuilder; use media_server_core::cluster::{self, MediaCluster}; use media_server_gateway::{agent_service::GatewayAgentServiceBuilder, NodeMetrics, ServiceKind, AGENT_SERVICE_ID}; @@ -127,8 +128,8 @@ pub struct MediaServerWorker { worker: u16, sdn_addr: NodeAddr, sdn_worker: TaskSwitcherBranch, SdnWorkerOutput>, - sdn_backend_addrs: HashMap, - sdn_backend_slots: HashMap, + sdn_backend_addrs: IndexMap, + sdn_backend_slots: IndexMap, media_cluster: TaskSwitcherBranch, cluster::Output>, media_webrtc: TaskSwitcherBranch, transport_webrtc::GroupOutput>, media_rtpengine: TaskSwitcherBranch, @@ -729,3 +730,34 @@ impl MediaServerWorker { } } } + +#[cfg(test)] +mod test { + use transport_rtpengine::RtpEngineSession; + use transport_webrtc::WebrtcSession; + + use super::MediaClusterEndpoint; + + #[test] + fn smallmap_collision() { + for i in 0..1_000_000 { + let mut map = indexmap::IndexMap::new(); + let webrtc = MediaClusterEndpoint::Webrtc(WebrtcSession::from(rand::random::())); + map.insert(webrtc, ()); + assert_eq!(map.len(), 1); + + let rtpengine = MediaClusterEndpoint::RtpEngine(RtpEngineSession::from(rand::random::())); + map.insert(rtpengine, ()); + assert_eq!(map.len(), 2); + + map.swap_remove(&webrtc); + + assert_eq!(map.len(), 1); + assert!(!map.is_empty(), "first failsed, cycle {i} {webrtc:?} {rtpengine:?}"); + + map.swap_remove(&rtpengine); + assert_eq!(map.len(), 0); + assert!(map.is_empty(), "second failsed, cycle {i} {webrtc:?} {rtpengine:?}"); + } + } +} diff --git a/packages/media_utils/Cargo.toml b/packages/media_utils/Cargo.toml index 74ba7783..1e89bc0a 100644 --- a/packages/media_utils/Cargo.toml +++ b/packages/media_utils/Cargo.toml @@ -6,8 +6,8 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -smallmap = { workspace = true } sorted-vec = "0.8" +indexmap = { workspace = true } log = { workspace = true } serde = { version = "1.0", features = ["derive"] } uriparse = "0.6" @@ -15,4 +15,11 @@ serde-querystring = "0.2" pin-project-lite = "0.2" spin = { workspace = true } once_cell = "1.20" -derive_more = "1.0" +derive_more = { version = "1.0", features = ["full"] } + +[dev-dependencies] +criterion = { version = "0.5", features = ["html_reports"] } + +[[bench]] +name = "map_bench" +harness = false \ No newline at end of file diff --git a/packages/media_utils/benches/map_bench.rs b/packages/media_utils/benches/map_bench.rs new file mode 100644 index 00000000..9c3626d9 --- /dev/null +++ b/packages/media_utils/benches/map_bench.rs @@ -0,0 +1,26 @@ +use criterion::{criterion_group, criterion_main, Criterion}; +use std::collections::HashMap; + +fn criterion_benchmark(c: &mut Criterion) { + let mut map = HashMap::new(); + for i in 0..64 { + map.insert(i, i); + } + + let mut map2 = indexmap::IndexMap::new(); + for i in 0..64 { + map2.insert(i, i); + } + + c.bench_function("std::map::iter", |b| b.iter(|| map.iter())); + c.bench_function("indexmap::iter", |b| b.iter(|| map2.iter())); + + c.bench_function("std::map::found", |b| b.iter(|| map.get(&55))); + c.bench_function("indexmap::found", |b| b.iter(|| map2.get(&55))); + + c.bench_function("std::map::notfound", |b| b.iter(|| map.get(&155))); + c.bench_function("indexmap::notfound", |b| b.iter(|| map2.get(&155))); +} + +criterion_group!(benches, criterion_benchmark); +criterion_main!(benches); diff --git a/packages/media_utils/src/small_2dmap.rs b/packages/media_utils/src/indexmap_2d.rs similarity index 66% rename from packages/media_utils/src/small_2dmap.rs rename to packages/media_utils/src/indexmap_2d.rs index 5616561b..356c7ef9 100644 --- a/packages/media_utils/src/small_2dmap.rs +++ b/packages/media_utils/src/indexmap_2d.rs @@ -1,11 +1,13 @@ use std::hash::Hash; -pub struct Small2dMap { - data: smallmap::Map, - reverse: smallmap::Map, +use indexmap::IndexMap; + +pub struct IndexMap2d { + data: IndexMap, + reverse: IndexMap, } -impl Default for Small2dMap { +impl Default for IndexMap2d { fn default() -> Self { Self { data: Default::default(), @@ -14,7 +16,7 @@ impl Default for Small2dMap Small2dMap { +impl IndexMap2d { pub fn insert(&mut self, key: T1, value: T2) { self.data.insert(key.clone(), value.clone()); self.reverse.insert(value, key); @@ -25,7 +27,7 @@ impl Small2dMap { } pub fn pairs(&self) -> Vec<(T1, T2)> { - self.data.iter().cloned().collect::>() + self.data.iter().map(|(k, v)| (k.clone(), v.clone())).collect::>() } pub fn keys2(&self) -> Vec { @@ -41,14 +43,14 @@ impl Small2dMap { } pub fn remove1(&mut self, key: &T1) -> Option { - let value = self.data.remove(key)?; - self.reverse.remove(&value); + let value = self.data.swap_remove(key)?; + self.reverse.swap_remove(&value); Some(value) } pub fn remove2(&mut self, key: &T2) -> Option { - let value = self.reverse.remove(key)?; - self.data.remove(&value); + let value = self.reverse.swap_remove(key)?; + self.data.swap_remove(&value); Some(value) } diff --git a/packages/media_utils/src/lib.rs b/packages/media_utils/src/lib.rs index 6f823579..de1218c3 100644 --- a/packages/media_utils/src/lib.rs +++ b/packages/media_utils/src/lib.rs @@ -1,9 +1,9 @@ mod count; mod f16; +mod indexmap_2d; mod select; mod seq_extend; mod seq_rewrite; -mod small_2dmap; mod state; mod time; mod ts_rewrite; @@ -11,10 +11,10 @@ mod uri; pub use count::{get_all_counts, Count}; pub use f16::{F16i, F16u}; +pub use indexmap_2d::IndexMap2d; pub use select::*; pub use seq_extend::RtpSeqExtend; pub use seq_rewrite::SeqRewrite; -pub use small_2dmap::Small2dMap; pub use state::*; pub use time::now_ms; pub use ts_rewrite::TsRewrite; diff --git a/packages/transport_rtpengine/Cargo.toml b/packages/transport_rtpengine/Cargo.toml index 24e7b066..96800c8a 100644 --- a/packages/transport_rtpengine/Cargo.toml +++ b/packages/transport_rtpengine/Cargo.toml @@ -7,7 +7,6 @@ edition = "2021" log = { workspace = true } num_enum = { workspace = true } derive_more = { workspace = true, features = ["full"] } -smallmap = { workspace = true } sans-io-runtime = { workspace = true, default-features = false } media-server-secure = { path = "../media_secure" } media-server-core = { path = "../media_core" } diff --git a/packages/transport_webrtc/Cargo.toml b/packages/transport_webrtc/Cargo.toml index 2c032bcf..48b9cb6c 100644 --- a/packages/transport_webrtc/Cargo.toml +++ b/packages/transport_webrtc/Cargo.toml @@ -8,7 +8,7 @@ edition = "2021" [dependencies] log = { workspace = true } num_enum = { workspace = true } -smallmap = { workspace = true } +indexmap = { workspace = true } derive_more = { workspace = true, features = ["full"] } sans-io-runtime = { workspace = true, default-features = false } prost = { workspace = true } diff --git a/packages/transport_webrtc/src/media/mod.rs b/packages/transport_webrtc/src/media/mod.rs index 95a66bfd..7d82f546 100644 --- a/packages/transport_webrtc/src/media/mod.rs +++ b/packages/transport_webrtc/src/media/mod.rs @@ -1,3 +1,4 @@ +use indexmap::IndexMap; use media_server_protocol::media::{H264Profile, MediaCodec, MediaLayerBitrate, MediaLayersBitrate, MediaMeta, MediaOrientation, MediaPacket, Vp9Profile}; use str0m::{ format::{CodecConfig, CodecSpec}, @@ -12,9 +13,9 @@ mod vp9; #[derive(Default)] pub struct RemoteMediaConvert { - map: smallmap::Map, - ssrcs_rid: smallmap::Map, - ssrcs_mid: smallmap::Map, + map: IndexMap, + ssrcs_rid: IndexMap, + ssrcs_mid: IndexMap, } impl RemoteMediaConvert { @@ -96,7 +97,7 @@ impl RemoteMediaConvert { #[derive(Default)] pub struct LocalMediaConvert { - map: smallmap::Map, + map: IndexMap, } impl LocalMediaConvert { diff --git a/packages/transport_webrtc/src/shared_port.rs b/packages/transport_webrtc/src/shared_port.rs index 6bfae467..3162298c 100644 --- a/packages/transport_webrtc/src/shared_port.rs +++ b/packages/transport_webrtc/src/shared_port.rs @@ -1,21 +1,22 @@ -use std::{collections::HashMap, fmt::Debug, hash::Hash, net::SocketAddr}; +use indexmap::IndexMap; +use std::{fmt::Debug, hash::Hash, net::SocketAddr}; use str0m::ice::StunMessage; #[derive(Debug)] pub struct SharedUdpPort { - task_remotes: HashMap, - task_remotes_map: HashMap>, - task_ufrags: HashMap, - task_ufrags_reverse: HashMap, + task_remotes: IndexMap, + task_remotes_map: IndexMap>, + task_ufrags: IndexMap, + task_ufrags_reverse: IndexMap, } impl Default for SharedUdpPort { fn default() -> Self { Self { - task_remotes: HashMap::new(), - task_remotes_map: HashMap::new(), - task_ufrags: HashMap::new(), - task_ufrags_reverse: HashMap::new(), + task_remotes: IndexMap::new(), + task_remotes_map: IndexMap::new(), + task_ufrags: IndexMap::new(), + task_ufrags_reverse: IndexMap::new(), } } } @@ -28,13 +29,13 @@ impl SharedUdpPort { } pub fn remove_task(&mut self, task: Task) -> Option<()> { - let ufrag = self.task_ufrags_reverse.remove(&task)?; + let ufrag = self.task_ufrags_reverse.swap_remove(&task)?; log::info!("Remove task {:?} => ufrag {}", task, ufrag); - self.task_ufrags.remove(&ufrag)?; - let remotes = self.task_remotes_map.remove(&task)?; + self.task_ufrags.swap_remove(&ufrag)?; + let remotes = self.task_remotes_map.swap_remove(&task)?; for remote in remotes { log::info!(" Remove remote {:?} => task {:?}", remote, task); - self.task_remotes.remove(&remote); + self.task_remotes.swap_remove(&remote); } Some(()) } diff --git a/packages/transport_webrtc/src/transport.rs b/packages/transport_webrtc/src/transport.rs index 160b1ad9..5916f78e 100644 --- a/packages/transport_webrtc/src/transport.rs +++ b/packages/transport_webrtc/src/transport.rs @@ -6,6 +6,7 @@ use std::{ time::{Duration, Instant}, }; +use indexmap::IndexMap; use media_server_core::{ endpoint::{EndpointEvent, EndpointReqId, EndpointRes}, transport::{Transport, TransportInput, TransportOutput}, @@ -18,7 +19,7 @@ use media_server_protocol::{ transport::{RpcError, RpcResult}, }; use media_server_secure::MediaEdgeSecure; -use media_server_utils::{Count, RtpSeqExtend, Small2dMap}; +use media_server_utils::{Count, IndexMap2d, RtpSeqExtend}; use sans_io_runtime::{ backend::{BackendIncoming, BackendOutgoing}, collections::DynamicDeque, @@ -114,9 +115,9 @@ pub struct TransportWebrtc { rtc: Rtc, rtc_ice_lite: bool, internal: Box, - ports: Small2dMap, + ports: IndexMap2d, local_convert: LocalMediaConvert, - seq_extends: smallmap::Map, + seq_extends: IndexMap, queue: DynamicDeque, 4>, _tmp: PhantomData, } @@ -175,7 +176,7 @@ impl TransportWebrtc { }; rtc.direct_api().enable_twcc_feedback(); - let mut ports = Small2dMap::default(); + let mut ports = IndexMap2d::default(); for (local_addr, slot) in local_addrs { ports.insert(*local_addr, *slot); rtc.add_local_candidate(Candidate::host(*local_addr, Protocol::Udp).expect("Should add local candidate"));