Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Concurrent Dictionary for thread safety #921

Merged
merged 4 commits into from
Oct 7, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 24 additions & 16 deletions src/NATS.Client/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ public class Connection : IConnection

private ServerInfo serverInfo = null;

private Dictionary<Int64, Subscription> subs =
new Dictionary<Int64, Subscription>();
private ConcurrentDictionary<Int64, Subscription> subs =
new ConcurrentDictionary<Int64, Subscription>();

private readonly ConcurrentQueue<SingleUseChannel<bool>> pongs = new ConcurrentQueue<SingleUseChannel<bool>>();

Expand All @@ -155,10 +155,10 @@ public class Connection : IConnection
private readonly string globalRequestInbox;

// used to map replies to requests from client (should lock)
private long nextRequestId = 0;
private InterlockedLong nextRequestId = new InterlockedLong(0);

private readonly Dictionary<string, InFlightRequest> waitingRequests
= new Dictionary<string, InFlightRequest>(StringComparer.OrdinalIgnoreCase);
private readonly ConcurrentDictionary<string, InFlightRequest> waitingRequests
= new ConcurrentDictionary<string, InFlightRequest>(StringComparer.OrdinalIgnoreCase);

// Prepare protocol messages for efficiency
private byte[] PING_P_BYTES = null;
Expand Down Expand Up @@ -2930,7 +2930,8 @@ private void RemoveOutstandingRequest(string requestId)
{
lock (mu)
{
waitingRequests.Remove(requestId);
InFlightRequest ignored;
waitingRequests.TryRemove(requestId, out ignored);
}
}

Expand Down Expand Up @@ -2959,7 +2960,9 @@ private void RequestResponseHandler(object sender, MsgHandlerEventArgs args)
// _INBOX.<nuid>.<requestId>
var requestId = subject.Substring(globalRequestInbox.Length + 1);
if (!waitingRequests.TryGetValue(requestId, out request))
{
return;
}
}
else
{
Expand Down Expand Up @@ -3003,25 +3006,29 @@ private void RequestResponseHandler(object sender, MsgHandlerEventArgs args)

private InFlightRequest setupRequest(int timeout, CancellationToken token)
{
var requestId = Interlocked.Increment(ref nextRequestId);
if (requestId < 0) //Check if recycled
requestId = (requestId + long.MaxValue + 1);

var requestId = nextRequestId.Increment();
var request = new InFlightRequest(requestId.ToString(CultureInfo.InvariantCulture), token, timeout, RemoveOutstandingRequest);
request.Waiter.Task.ContinueWith(t => GC.KeepAlive(t.Exception), TaskContinuationOptions.OnlyOnFaulted);

lock (mu)
{
// We shouldn't ever get an Argument exception because the ID is incrementing
// and since this is performant sensitive code, skipping an existence check.
waitingRequests.Add(request.Id, request);
if (!waitingRequests.TryAdd(request.Id, request))
{

}

if (globalRequestSubscription != null)
{
return request;
}

if (globalRequestSubscription == null)
globalRequestSubscription = subscribeAsync(string.Concat(globalRequestInbox, ".*"), null,
RequestResponseHandler);
{
globalRequestSubscription =
subscribeAsync(string.Concat(globalRequestInbox, ".*"), null, RequestResponseHandler);
}
}

return request;
Expand Down Expand Up @@ -4186,7 +4193,8 @@ internal void AddSubscription(Subscription s)

internal void RemoveSubscription(Subscription s)
{
subs.Remove(s.sid);
Subscription ignore;
subs.TryRemove(s.sid, out ignore);
}

// caller must lock
Expand Down Expand Up @@ -4481,15 +4489,15 @@ internal void removeSubSafe(Subscription s)
// caller must lock
internal virtual void removeSub(Subscription s)
{
subs.Remove(s.sid);
Subscription ignore;
subs.TryRemove(s.sid, out ignore);
Copy link
Contributor

@mtmk mtmk Oct 7, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you could use subs.TryRemove(s.sid, out _); if possible

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

if (s.mch != null)
{
if (s.ownsChannel)
s.mch.close();

s.mch = null;
}

s.closed = true;
}

Expand Down
Loading