Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrency issue with Subscriber sockets (m_subscriptions) #15

Open
wiz0u opened this issue Nov 2, 2017 · 3 comments
Open

Concurrency issue with Subscriber sockets (m_subscriptions) #15

wiz0u opened this issue Nov 2, 2017 · 3 comments

Comments

@wiz0u
Copy link

wiz0u commented Nov 2, 2017

Hello
I have a NetMQPoller thread, started by RunAsync(), polling on a SubscriberSocket, receiving messages.
On my Main Thread, I randomly call subscriber.Subscribe(...) on new topics

At some point, this will crash, usually with the following stack: (release build)

System.IndexOutOfRangeException
   at NetMQ.Core.Patterns.Utils.Trie.Check(Byte[], Int32, Int32)
   at NetMQ.Core.Patterns.XSub.XHasIn()
   at NetMQ.Core.SocketBase.GetSocketOption(NetMQ.Core.ZmqSocketOption)
   at NetMQ.Core.Utils.Selector.Select(NetMQ.Core.Utils.SelectItem[], Int32, Int64)
   at NetMQ.NetMQPoller.Run()
   at System.Threading.ExecutionContext.RunInternal(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object, Boolean)
   at System.Threading.ExecutionContext.Run(System.Threading.ExecutionContext, System.Threading.ContextCallback, System.Object)
   at System.Threading.ThreadHelper.ThreadStart()

As far as I analyzed NetMQ source code, it is due to the NetMQPollerThread trying to Check() the m_subscriptions Trie, while the main thread (going synchronously through Subscribe/SetSocketOption/XSetSocketOption/XSend) is trying to Add() a new topic to the m_subscriptions Trie.
Access to m_subscriptions seems not MT safe.

If this is by-design, what is the proper way to call Subscribe ?
Shouldn't the XSetSocketOption/XSend be marshalled via pipes through the Selector (or something like that) so it is handled in the same thread as NetMQPoller?

@wiz0u
Copy link
Author

wiz0u commented Nov 6, 2017

According to somdoron in zeromq/netmq#564 (comment)

NetMQSocket is not thread safe, this kind of exception usually indicate you are using the socket outside of the poller thread.
Once you added a socket to a poller and the poller is running you can only use the socket from the poller thread (ReceiveReady event or other events raised by the poller, like NetMQTimer or NetMQQueue).

So I guess, we must really be on the NetMQPoller thread for every (Un)Subscribe...
It's a pain but alright. Here is my solution:

		internal static void ExecuteOnPoll(Action action) // inspired by NetMQPoller.Run(action)
		{
			if (Poller.CanExecuteTaskInline)
				action();
			else
				new Task(action).Start(Poller);
		}

		// when adequate:
		ExecuteOnPoll(() => subscriber.Subscribe(topic));

I let you close this issue if that's the correct way of solving the problem.

@lakhansrn
Copy link

@wiz0u : Did the above code, fix the Concurrency issue.?

@wiz0u
Copy link
Author

wiz0u commented Sep 2, 2020

Yes, the solution I presented is a good workaround for concurrency issues with (Un)Subscribe

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants