Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ChannelMessageHandlers: Make RegisterHandler() not remove the existing handler if another one with same id is given #952

Merged
merged 7 commits into from
Nov 6, 2022

Conversation

ibc
Copy link
Member

@ibc ibc commented Nov 6, 2022

  • Do not remove existing handler if an attempt to register a new one with same id happens. Tis prevents the following issue discovered while adding a new unit test:
FAIL  node/tests/test-PipeTransport.js
 ✓ router.pipeToRouter() succeeds with audio (24 ms)
 ✓ router.pipeToRouter() succeeds with video (21 ms)
 ### NEW TEST:
 ✓ router.pipeToRouter() fails if both Routers belong to the same Worker (32 ms)
 ✓ router.createPipeTransport() with enableRtx succeeds (6 ms)
 ✓ router.createPipeTransport() with invalid srtpParameters must fail (3 ms)
 ✓ router.createPipeTransport() with enableSrtp succeeds (9 ms)
 ✓ router.createPipeTransport() with fixed port succeeds (8 ms)
 ✓ transport.consume() for a pipe Producer succeeds (6 ms)
 ### UNRELATED FAILING TEST:
 ✕ producer.pause() and producer.resume() are transmitted to pipe Consumer (2 ms)
 ✓ producer.close() is transmitted to pipe Consumer (2 ms)
 ✓ router.pipeToRouter() succeeds with data (7 ms)
 ✓ transport.dataConsume() for a pipe DataProducer succeeds (2 ms)
 ✓ dataProducer.close() is transmitted to pipe DataConsumer (2 ms)
 ✓ router.pipeToRouter() called twice generates a single PipeTransport pair (14 ms)
 ✓ router.pipeToRouter() called in two Routers passing one to each other as argument generates a single a single PipeTransport pair (10 ms)

 ● producer.pause() and producer.resume() are transmitted to pipe Consumer

   Channel request handler with ID c9cba815-1128-4500-aa4f-7edbc135b889 not found [method:producer.resume]

It doesn't make sense that such a test fails with that error (producer not found in the worker). To simplify the error, I've added a temporal code in the new test that clearly shows the issue:

test('router.pipeToRouter() fails if both Routers belong to the same Worker', async () =>
{
	const router1bis = await worker1.createRouter({ mediaCodecs });

	await expect(router1.pipeToRouter(
		{
			producerId : videoProducer.id,
			router     : router1bis
		}))
		.rejects
		.toThrow(Error);

	// TODO: Temporal code to show an unexpected error.
	{
		// This is ok.
		expect(videoProducer.closed).toBe(false);

		// However these will fail since the handler Id does not exist in the worker.
		// This is impossible and must be a bug.
		await videoProducer.resume();
		await videoProducer.pause();
	}

	router1bis.close();
}, 2000);

Add a test to assert than calling `router1.pipeToRouter(router2)` fails if both `Routers` are in the same `Worker`.

### TODO

The new test succeed however it **breaks** an unrelated test that runs later that should NOT be affected at all:

```
FAIL  node/tests/test-PipeTransport.js
 ✓ router.pipeToRouter() succeeds with audio (24 ms)
 ✓ router.pipeToRouter() succeeds with video (21 ms)
 ### NEW TEST:
 ✓ router.pipeToRouter() fails if both Routers belong to the same Worker (32 ms)
 ✓ router.createPipeTransport() with enableRtx succeeds (6 ms)
 ✓ router.createPipeTransport() with invalid srtpParameters must fail (3 ms)
 ✓ router.createPipeTransport() with enableSrtp succeeds (9 ms)
 ✓ router.createPipeTransport() with fixed port succeeds (8 ms)
 ✓ transport.consume() for a pipe Producer succeeds (6 ms)
 ### UNRELATED FAILING TEST:
 ✕ producer.pause() and producer.resume() are transmitted to pipe Consumer (2 ms)
 ✓ producer.close() is transmitted to pipe Consumer (2 ms)
 ✓ router.pipeToRouter() succeeds with data (7 ms)
 ✓ transport.dataConsume() for a pipe DataProducer succeeds (2 ms)
 ✓ dataProducer.close() is transmitted to pipe DataConsumer (2 ms)
 ✓ router.pipeToRouter() called twice generates a single PipeTransport pair (14 ms)
 ✓ router.pipeToRouter() called in two Routers passing one to each other as argument generates a single a single PipeTransport pair (10 ms)

 ● producer.pause() and producer.resume() are transmitted to pipe Consumer

   Channel request handler with ID c9cba815-1128-4500-aa4f-7edbc135b889 not found [method:producer.resume]
```

It doesn't make sense that such a test fails with that error (producer not found in the worker).

To simplify the error, I've added a temporal code in the new test that clearly shows the issue:

```ts
test('router.pipeToRouter() fails if both Routers belong to the same Worker', async () =>
{
	const router1bis = await worker1.createRouter({ mediaCodecs });

	await expect(router1.pipeToRouter(
		{
			producerId : videoProducer.id,
			router     : router1bis
		}))
		.rejects
		.toThrow(Error);

	// TODO: Temporal code to show an unexpected error.
	{
		// This is ok.
		expect(videoProducer.closed).toBe(false);

		// However these will fail since the handler Id does not exist in the worker.
		// This is impossible and must be a bug.
		await videoProducer.resume();
		await videoProducer.pause();
	}

	router1bis.close();
}, 2000);
```

In addition, Rust test is missing but I don't want to deal with that until the issue is solved.
@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

To be clear, it looks like this code is closing/deleting videoProducer in the worker (it shouldn't):

router1.pipeToRouter(
	{
		producerId : videoProducer.id,
		router     : router1bis
	});

Ok, the code throws because both routers are in the same worker, but that should NOT close the given producer.

@nazar-pc I cannot find anything that could cause this error. I've verified tat Router::pipeToRouter() method in Router.ts does not close the given producer at all.

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

I know where the bug is.

@ibc ibc changed the title Add extra pipeToRouter() test ChannelMessageHandlers: Make RegisterHandler() not remove the existing handler if another one with same id is given Nov 6, 2022
@ibc ibc requested a review from nazar-pc November 6, 2022 15:28
@ibc ibc marked this pull request as ready for review November 6, 2022 15:28
@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

Adding test in Rust.

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

@nazar-pc I definitely need your help in the Rust test:

failures:

---- pipe_transport::pipe_to_router_fails_if_both_routers_belong_to_the_same_worker stdout ----
Err(ProduceFailed(Request(Response { reason: "Channel request handler with ID d41aa2bc-588c-467f-941f-4b7bfe1e8f63 already exists [method:transport.produce]" })))

thread 'pipe_transport::pipe_to_router_fails_if_both_routers_belong_to_the_same_worker' panicked at 'assertion failed: matches!(result,\n    Err(PipeProducerToRouterError ::\n    ProduceFailed(ProduceError :: AlreadyExists(producer_id))))', rust/tests/integration/pipe_transport.rs:584:9
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace


failures:
    pipe_transport::pipe_to_router_fails_if_both_routers_belong_to_the_same_worker

@nazar-pc
Copy link
Collaborator

nazar-pc commented Nov 6, 2022

The error comes from

if let Some(id) = &producer_options.id {
if self.router().has_producer(id) {
return Err(ProduceError::AlreadyExists(*id));
}
}

As far as Rust side is concerned, router certainly contains such producer already and it doesn't make any sense to produce it again.

I don't think I fully understand what you're trying to do in this PR.

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

The error comes from

No, it doesn't. The error comes from router1.pipe_producer_to_router() as expected given that I'm calling it by passing as argument a router1bis Router which was created in the same worker. Remember the limitation due to the global id handler?

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

As far as Rust side is concerned, router certainly contains such producer already and it doesn't make any sense to produce it again.

No, this is not the error.

@nazar-pc
Copy link
Collaborator

nazar-pc commented Nov 6, 2022

I see. You'll need to inspect error message then and convert it to ProduceError::AlreadyExists (and I think similarly for data producer as well). Otherwise it returns generic error straight from the worker as a reason string.

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

CleanShot-2022-11-06-at-18 28 31@2x

  • 1 works fine. No error here.
  • 2 fails (as expected) since there is already a handler with same id the the worker.
  • 3 fails because definitely I don't know what to put here. This is an error generated by the Worker rather than by Rust. However when doing assert!(matches!(result, xxxxx) is seems that xxxxx MUST be an enum of PipeProducerToRouterError, but none of those are being generated because the error is NOT generated by Rust but later by the Worker.

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

I see. You'll need to inspect error message then and convert it to ProduceError::AlreadyExists (and I think similarly for data producer as well). Otherwise it returns generic error straight from the worker as a reason string.

Please help here, I've spent lot long time already trying to figure out the syntax here without success. I don't know how to inspect the error message.

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

I've already searched for all error matching in Rust tests. None is similar to what it's needed here. And there is nothing about data producers that can be taken as guide.

@nazar-pc
Copy link
Collaborator

nazar-pc commented Nov 6, 2022

As you figured out, the root error is obviously PipeProducerToRouterError, which is an enum. Specifically it failed because with PipeProducerToRouterError::ProduceFailed, which in turn contains enum that explains why produce failed, in this case variant is ProduceError::Request, meaning request failed, which failed because worker sent corresponding response, hence RequestError::Response with an error message Channel request handler with ID d91991e8-fcbe-4069-8660-668cb94dc6dd already exists [method:transport.produce]. If you want to catch that, you can check if error matches that and then inspect internal reason message.

So it is just a few enums nested in one another with narrowing reason of an error.

I have pushed corresponding change, let me know if that is sufficient or I should still intercept it and replace with ProduceError::AlreadyExists instead.

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

Thanks, IMHO looks good, we don't need to be super error specific in corner cases like this.

Said that, I don't think this is ok (pipe_producer_to_router() method):

CleanShot-2022-11-06-at-19 00 25@2x

In case the pipe_producer creation fails, the method should close the pipe_consumer (as we do in TS).

@ibc ibc merged commit b74f34e into v3 Nov 6, 2022
@ibc ibc deleted the add-pipe-to-router-test branch November 6, 2022 18:02
@nazar-pc
Copy link
Collaborator

nazar-pc commented Nov 6, 2022

In case the pipe_producer creation fails, the method should close the pipe_consumer (as we do in TS).

We have RAII in Rust, since that consumer is not stored anywhere (due to error exiting early) it'll be destroyed on drop automatically with destructor (Drop trait implementation in Rust). So it works the same way as TypeScript, just doesn't require explicit code for that.

@ibc
Copy link
Member Author

ibc commented Nov 6, 2022

lovely!

impl Drop for Inner {
    fn drop(&mut self) {
        debug!("drop()");

        self.close(true);
    }
}

piranna pushed a commit to dyte-in/mediasoup that referenced this pull request Feb 9, 2023
…g handler if another one with same id is given (versatica#952)

Co-authored-by: Nazar Mokrynskyi <nazar@mokrynskyi.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Development

Successfully merging this pull request may close these issues.

2 participants