Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(Fix: DeviceClient): Fix the concurrency issue in MQTT stack #2234

Merged
merged 9 commits into from
Dec 9, 2021

Conversation

azabbasi
Copy link
Contributor

@azabbasi azabbasi commented Nov 8, 2021

This PR fixes #2232

In a multithreaded solution, there is a chance of Enqueuing and Dequeuing happening simultaneously and we need to guard against those cases.
We have two approaches available to us to take advantage of,
1- Use concurrent collections
2- Use regular collections with a semaphore lock.

Based on an interesting article here, It's recommended to use a regular collection with an external lock instead of a concurrent collection.

I quote:

Mixed producer-consumer scenario: 
Any given thread is both adding and removing elements.

In mixed producer-consumer scenarios, when the processing time is very small, a Queue<T> that has an external lock scales better than ConcurrentQueue<T> does. However, when processing time is around 500 FLOPS or more, then the ConcurrentQueue<T> scales better.

After discussing this more, I went back on my decision to use external locks to protect the queue.

Even though scalability may play a role here, it seems like it won't be worth the hassle and complexity of making multiple classes disposable since we have had many issues with disposing in the past.
Concurrent collections come with default thread-safety behaviors and seem to be the way to go.

Feedback and insights are more than welcome :)

@azabbasi azabbasi changed the title Fix the concurrency issue in MQTT stack (Fix: DeviceClient): Fix the concurrency issue in MQTT stack Nov 8, 2021
@azabbasi azabbasi force-pushed the azabbasi/fixConcurrentIssueInMqtt branch from 2eea45e to c0297be Compare November 8, 2021 22:04
Make MqttIotHubAdatper disposable.
@Azure Azure deleted a comment from azure-pipelines bot Nov 8, 2021
@azabbasi
Copy link
Contributor Author

azabbasi commented Nov 8, 2021

/azp run

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@azabbasi azabbasi marked this pull request as ready for review November 8, 2021 22:50
@timtay-microsoft
Copy link
Member

There's something to be said about the simplicity of just using concurrency safe data structures, but considering the downsides the article brings up and the fact that you've already done the work of building out these locks, I'm fine with this approach.

@azabbasi
Copy link
Contributor Author

azabbasi commented Nov 9, 2021

/azp run

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@@ -46,11 +46,13 @@ public Task CompleteWorkAsync(IChannelHandlerContext context, TWorkId workId)
throw new IotHubException("Nothing to complete.", isTransient: false);
}

IncompleteWorkItem incompleteWorkItem = _incompleteQueue.Peek();
if (incompleteWorkItem.Id.Equals(workId))
if (_incompleteQueue.TryPeek(out IncompleteWorkItem incompleteWorkItem)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you are supposed to TryPeek and then assume when you call TryDequeue that you get the same item. In the intervening time another thread could have dequeued it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some questions I have here are, is this really a queue model since we're not dealing with FIFO? Are the workid's guaranteed to be unique? If so couldn't we do this with a dictionary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, messages have to be processed in the order they came in, so it is in fact FIFO in this MQTT messaging model.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have removed the TryPeek approach and dequeuing it instead, if TryPeek returned the wrong workitem, we would have thrown a non-transient exception so it doesn't seem to matter whether or not we dequeue the message or not at that point, we can have the same functionality using dequeue and that should resolve the issue.

Copy link
Contributor

@drwill-ms drwill-ms left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm concerned about the TryPeek/TryDequeue logic.

@azabbasi
Copy link
Contributor Author

/azp run

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@WillooWisp
Copy link

@azabbasi will this solve this "deadlock" issue, #2237?

@WillooWisp
Copy link

@azabbasi just verified that this branch (PR) does not solve the deadlock referenced here, #2237.

In fact disconnecting from internet not even triggers the ConnectionStatusChangesHandler sometimes using this branch.

@azabbasi
Copy link
Contributor Author

@azabbasi just verified that this branch (PR) does not solve the deadlock referenced here, #2237.

In fact disconnecting from internet not even triggers the ConnectionStatusChangesHandler sometimes using this branch.

This PR doesn't address any deadlock issues, this helps fix concurrency issue with the internal queue we have for MQTT

@azabbasi
Copy link
Contributor Author

azabbasi commented Dec 9, 2021

/azp run

@azure-pipelines
Copy link

Azure Pipelines successfully started running 1 pipeline(s).

@azabbasi azabbasi merged commit 4618ef3 into main Dec 9, 2021
@azabbasi azabbasi deleted the azabbasi/fixConcurrentIssueInMqtt branch December 9, 2021 19:20
abhipsaMisra added a commit that referenced this pull request Feb 14, 2022
* feat(shared): Add common resources for convention-based operations

* feat(iot-device): Add support for convention-based telemetry operation

* feat(iot-device): Add support for convention-based command operations

* * feat(iot-device): Add support for convention-based properties operations

* feat(iot-device): Add support for convention-based properties operations

Co-authored-by: James Davis ⛺️🏔 <jamdavi@microsoft.com>

* feat(e2e-tests): Add telemetry E2E tests

* feat(e2e-tests): Add command E2E tests

* fix(iot-device): Updating client property collection to handle no convention

* feat(samples): Add thermostat and temperature controller sample

* fix(doc, samples): Update API design doc and move SystemTextJson helper to samples

* fix(iot-device): Separate out root-level and component-level property addition operations

* feat(tests): Add unit tests for ClientPropertyCollection

feat(tests): Add unit tests for ClientPropertyCollection

Co-authored-by: Abhipsa Misra <abhipsa.misra@microsoft.com>

* feat(e2e-tests): Add properties E2E tests

Co-authored-by: Abhipsa Misra <abhipsa.misra@microsoft.com>

* feat(e2e-tests): Add fault injection tests for properties operations (#2001)

* fix(iot-device, shared, samples): Rename StatusCodes to CommonClientResponseCodes and add a comment to highlight ClientOptions behavior

* fix(iot-device): Fix enumerator implementation to return key-value pairs

* fix(iot-device): Make ClientOptions.PayloadConvention readonly

* fix(shared): Fix from merge conflict

* fix(samples): Update the pnp samples readme (#2025)

* refactor(samples): Move preview samples to samples repository (#2057)

* fix(e2e-tests): Fix file renaming Configutaion to TestConfiguration

* refactor(iot-device): Rename property add method

* * fix(iot-device): TryGetValue methods should check for the component identifier when applicable and should not throw any exceptions (#2111)

* refactor(iot-device): Separate client reported properties into a separate accessor

* fix(iot-device): Add checks to verify that client property update response has version and requestId (#2115)

* refactor(iot-device): Update the internal method names as per functionality

* feat(iot-device): Add convenience method for acknowledging writable property update requests

* fix(iot-device): Update PayloadCollection to accept dictionary of values (#2171)

* refactor(iot-device): Merge flows for twin and client property operations (#2180)

* fix(e2e-tests): Update E2E tests to initialize and dispose resources correctly

* doc(client, service): Update documents about the Proxy property for various clients (#2248)

Update our code docs to better explain proxy settings

* Cleanup IDE warnings. From 442 down to 31. (#2254)

* Fix documented exception types thrown by methods in device client. (#2256)

* (Fix: DeviceClient): Fix the concurrency issue in MQTT stack (#2234)

* Update RegistryManager bulk API unit tests (#2258)

* feat(hub-svc): add support and tests for configurations export (#2250)

* Temporarily disable import devices/configs test (#2263)

* fix(iot-svc): update debug assert (#2264)

* fix(iot-svc): update debug assert

* Temporarily disable import devices/configs test (#2263)

* fix(iot-svc): update debug assert

* Reduce modules in ModulesClient_GetModulesOnDevice to 2

* Consolidate keyvault writes, and fix error about null (#2266)

* Cleanup code in Hub RTAC (#2268)

* fix(deviceClient): Fix issue with AMQP connection pool and TokenReferesher disposal. (#2260)

* Fix issue with AMQP connection pool

* Dispose the connection holder appropriately

* Add unit testing capability to connection pool

* Move more LA steps under conditional (#2270)

* Disable import/export of config until bug fix (#2273)

* Adding RBAC support for provisioning SDK (#2262)

* Added types for different credentials

* Altered code to work with new types

* Fixed Managers

* removed unneeded #if !NET451

* Filled in method summaries

* refactored GetHeaderProvider

* Added documentation

* Formatting fixes to address comments

* Adding Common 0.7.1 (#2272)

* Fix doc comment list bullets (#2275)

* Fix doc comment list bullets

* Change to use description tag

* sdl(all): Create SBOM for net packages  (#2261)

* Streamline an RBAC test (#2274)

* Exclude low pri .net targets in PR builds (#2277)

* fix(tests): Fix device client test to use updated API

* Update IDeviceIdentity interface to add doc comments. (#2282)

* PnP doc comment updates (#2278)

* Fix remaining doc comment bullets (#2285)

* Give e2e appId permission on the DPS instance (#2283)

* fix(DeviceClient): Avoid NRE after client dispose (#2286)

* refactor(iot-device): Update API surface for property callback API

* refactor(iot-device): Update API surface for command callback API

* fix(e2e-tests): Update E2E tests to use fluent assertions

* fix(iot-device): Update API surface to remove redundant private setters

* Revert "Adding RBAC support for provisioning SDK (#2262)" (#2289)

This reverts commit 7a26eda.

* Version bump for 2021-01-26 release (#2291)

* Update service clients instantiation doc comments (#2290)

* Update RegistryManager instantiation doc comments

* Update other service client doc comments.

* Fix class field constants casing (#2292)

* Disable checks for config in import e2e test (#2293)

* Update iteration path in vsts.yaml (#2296)

Co-authored-by: James Davis ⛺️🏔 <jamdavi@microsoft.com>
Co-authored-by: jamdavi <73593426+jamdavi@users.noreply.github.com>
Co-authored-by: Azad Abbasi <azabbasi@microsoft.com>
Co-authored-by: David R. Williamson <drwill@microsoft.com>
Co-authored-by: dylanbulfinMS <95251881+dylanbulfinMS@users.noreply.github.com>
timstewartm pushed a commit to timstewartm/azure-iot-sdk-csharp that referenced this pull request May 30, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

DeviceClient-SendEventAsync throwing InvalidOperationException
7 participants