Skip to content

Commit

Permalink
ChannelMessageHandlers: Make RegisterHandler() not remove the existin…
Browse files Browse the repository at this point in the history
…g handler if another one with same id is given (versatica#952)

Co-authored-by: Nazar Mokrynskyi <nazar@mokrynskyi.com>
  • Loading branch information
2 people authored and piranna committed Feb 8, 2023
1 parent b4ee1bd commit 369023a
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 29 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
# Changelog


### NEXT

* `ChannelMessageHandlers`: Make `RegisterHandler()` not remove the existing handler if another one with same `id` is given (PR #952).


### 3.11.2

* Fix installation issue in Linux due to a bug in ninja latest version 1.11.1 (PR #948).
Expand Down
15 changes: 15 additions & 0 deletions node/tests/test-PipeTransport.js
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,21 @@ test('router.pipeToRouter() succeeds with video', async () =>
expect(pipeProducer.paused).toBe(true);
}, 2000);

test('router.pipeToRouter() fails if both Routers belong to the same Worker', async () =>
{
const router1bis = await worker1.createRouter({ mediaCodecs });

await expect(router1.pipeToRouter(
{
producerId : videoProducer.id,
router : router1bis
}))
.rejects
.toThrow(Error);

router1bis.close();
}, 2000);

test('router.createPipeTransport() with enableRtx succeeds', async () =>
{
const pipeTransport = await router1.createPipeTransport(
Expand Down
34 changes: 34 additions & 0 deletions rust/tests/integration/pipe_transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use mediasoup::rtp_parameters::{
};
use mediasoup::sctp_parameters::SctpStreamParameters;
use mediasoup::srtp_parameters::{SrtpCryptoSuite, SrtpParameters};
use mediasoup::transport::ProduceError;
use mediasoup::webrtc_transport::{TransportListenIps, WebRtcTransport, WebRtcTransportOptions};
use mediasoup::worker::{RequestError, Worker, WorkerSettings};
use mediasoup::worker_manager::WorkerManager;
Expand Down Expand Up @@ -552,6 +553,39 @@ fn pipe_to_router_succeeds_with_video() {
});
}

#[test]
fn pipe_to_router_fails_if_both_routers_belong_to_the_same_worker() {
future::block_on(async move {
let (worker1, _worker2, router1, _router2, transport1, _transport2) = init().await;

let router1bis = worker1
.create_router(RouterOptions::new(media_codecs()))
.await
.expect("Failed to create router");

let video_producer = transport1
.produce(video_producer_options())
.await
.expect("Failed to produce video");

let result = router1
.pipe_producer_to_router(
video_producer.id(),
PipeToRouterOptions::new(router1bis.clone()),
)
.await;

if let Err(PipeProducerToRouterError::ProduceFailed(ProduceError::Request(
RequestError::Response { reason },
))) = result
{
assert!(reason.contains("already exists [method:transport.produce]"));
} else {
panic!("Unexpected result: {:?}", result);
}
});
}

#[test]
fn weak() {
future::block_on(async move {
Expand Down
63 changes: 34 additions & 29 deletions worker/src/ChannelMessageHandlers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,51 +63,56 @@ void ChannelMessageHandlers::RegisterHandler(
{
MS_TRACE();

try
if (channelRequestHandler != nullptr)
{
if (channelRequestHandler != nullptr)
if (
ChannelMessageHandlers::mapChannelRequestHandlers.find(id) !=
ChannelMessageHandlers::mapChannelRequestHandlers.end())
{
if (
ChannelMessageHandlers::mapChannelRequestHandlers.find(id) !=
ChannelMessageHandlers::mapChannelRequestHandlers.end())
{
MS_THROW_ERROR("Channel request handler with ID %s already exists", id.c_str());
}

ChannelMessageHandlers::mapChannelRequestHandlers[id] = channelRequestHandler;
MS_THROW_ERROR("Channel request handler with ID %s already exists", id.c_str());
}

if (payloadChannelRequestHandler != nullptr)
ChannelMessageHandlers::mapChannelRequestHandlers[id] = channelRequestHandler;
}

if (payloadChannelRequestHandler != nullptr)
{
if (
ChannelMessageHandlers::mapPayloadChannelRequestHandlers.find(id) !=
ChannelMessageHandlers::mapPayloadChannelRequestHandlers.end())
{
if (
ChannelMessageHandlers::mapPayloadChannelRequestHandlers.find(id) !=
ChannelMessageHandlers::mapPayloadChannelRequestHandlers.end())
if (channelRequestHandler != nullptr)
{
MS_THROW_ERROR("PayloadChannel request handler with ID %s already exists", id.c_str());
ChannelMessageHandlers::mapChannelRequestHandlers.erase(id);
}

ChannelMessageHandlers::mapPayloadChannelRequestHandlers[id] = payloadChannelRequestHandler;
MS_THROW_ERROR("PayloadChannel request handler with ID %s already exists", id.c_str());
}

if (payloadChannelNotificationHandler != nullptr)
ChannelMessageHandlers::mapPayloadChannelRequestHandlers[id] = payloadChannelRequestHandler;
}

if (payloadChannelNotificationHandler != nullptr)
{
if (
ChannelMessageHandlers::mapPayloadChannelNotificationHandlers.find(id) !=
ChannelMessageHandlers::mapPayloadChannelNotificationHandlers.end())
{
if (
ChannelMessageHandlers::mapPayloadChannelNotificationHandlers.find(id) !=
ChannelMessageHandlers::mapPayloadChannelNotificationHandlers.end())
if (channelRequestHandler != nullptr)
{
MS_THROW_ERROR("PayloadChannel notification handler with ID %s already exists", id.c_str());
ChannelMessageHandlers::mapChannelRequestHandlers.erase(id);
}

ChannelMessageHandlers::mapPayloadChannelNotificationHandlers[id] =
payloadChannelNotificationHandler;
if (payloadChannelRequestHandler != nullptr)
{
ChannelMessageHandlers::mapPayloadChannelRequestHandlers.erase(id);
}

MS_THROW_ERROR("PayloadChannel notification handler with ID %s already exists", id.c_str());
}
}
catch (const MediaSoupError& error)
{
// In case of error unregister everything.
ChannelMessageHandlers::UnregisterHandler(id);

throw;
ChannelMessageHandlers::mapPayloadChannelNotificationHandlers[id] =
payloadChannelNotificationHandler;
}
}

Expand Down

0 comments on commit 369023a

Please sign in to comment.