Skip to content

Commit db44f5b

Browse files
authored
Clean up LibuvOutputConsumer (#1744)
* Clean up LibuvOutputConsumer - Added UvShutdownReq.ShutdownAsync - Added Debug.Assert in LibuvAwaitable since it should never race.
1 parent 650a3cc commit db44f5b

File tree

3 files changed

+54
-42
lines changed

3 files changed

+54
-42
lines changed

src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvAwaitable.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22
// Licensed under the MIT license. See LICENSE file in the project root for full license information.
33

44
using System;
5+
using System.Diagnostics;
56
using System.Runtime.CompilerServices;
67
using System.Threading;
7-
using System.Threading.Tasks;
88
using Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking;
99

1010
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal
@@ -49,10 +49,16 @@ public UvWriteResult GetResult()
4949

5050
public void OnCompleted(Action continuation)
5151
{
52+
// There should never be a race between IsCompleted and OnCompleted since both operations
53+
// should always be on the libuv thread
54+
5255
if (_callback == _callbackCompleted ||
5356
Interlocked.CompareExchange(ref _callback, continuation, null) == _callbackCompleted)
5457
{
55-
Task.Run(continuation);
58+
Debug.Fail($"{typeof(LibuvAwaitable<TRequest>)}.{nameof(OnCompleted)} raced with {nameof(IsCompleted)}, running callback inline.");
59+
60+
// Just run it inline
61+
continuation();
5662
}
5763
}
5864

src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/LibuvOutputConsumer.cs

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,6 @@ public class LibuvOutputConsumer
1414
private readonly UvStreamHandle _socket;
1515
private readonly string _connectionId;
1616
private readonly ILibuvTrace _log;
17-
18-
private readonly WriteReqPool _writeReqPool;
1917
private readonly IPipeReader _pipe;
2018

2119
public LibuvOutputConsumer(
@@ -26,17 +24,16 @@ public LibuvOutputConsumer(
2624
ILibuvTrace log)
2725
{
2826
_pipe = pipe;
29-
// We need to have empty pipe at this moment so callback
30-
// get's scheduled
3127
_thread = thread;
3228
_socket = socket;
3329
_connectionId = connectionId;
3430
_log = log;
35-
_writeReqPool = thread.WriteReqPool;
3631
}
3732

3833
public async Task WriteOutputAsync()
3934
{
35+
var pool = _thread.WriteReqPool;
36+
4037
while (true)
4138
{
4239
var result = await _pipe.ReadAsync();
@@ -46,7 +43,7 @@ public async Task WriteOutputAsync()
4643
{
4744
if (!buffer.IsEmpty)
4845
{
49-
var writeReq = _writeReqPool.Allocate();
46+
var writeReq = pool.Allocate();
5047

5148
try
5249
{
@@ -62,14 +59,23 @@ public async Task WriteOutputAsync()
6259
finally
6360
{
6461
// Make sure we return the writeReq to the pool
65-
_writeReqPool.Return(writeReq);
62+
pool.Return(writeReq);
6663
}
6764
}
6865

6966
if (result.IsCancelled)
7067
{
7168
// Send a FIN
72-
await ShutdownAsync();
69+
_log.ConnectionWriteFin(_connectionId);
70+
71+
using (var shutdownReq = new UvShutdownReq(_log))
72+
{
73+
shutdownReq.Init(_thread);
74+
var shutdownResult = await shutdownReq.ShutdownAsync(_socket);
75+
76+
_log.ConnectionWroteFin(_connectionId, shutdownResult.Status);
77+
}
78+
7379
// Ensure no data is written after uv_shutdown
7480
break;
7581
}
@@ -105,32 +111,5 @@ private void LogWriteInfo(int status, Exception error)
105111
}
106112
}
107113
}
108-
109-
private Task ShutdownAsync()
110-
{
111-
var tcs = new TaskCompletionSource<object>();
112-
_log.ConnectionWriteFin(_connectionId);
113-
114-
var shutdownReq = new UvShutdownReq(_log);
115-
try
116-
{
117-
shutdownReq.Init(_thread);
118-
shutdownReq.Shutdown(_socket, (req, status, state) =>
119-
{
120-
req.Dispose();
121-
_log.ConnectionWroteFin(_connectionId, status);
122-
123-
tcs.TrySetResult(null);
124-
},
125-
this);
126-
}
127-
catch (Exception)
128-
{
129-
shutdownReq.Dispose();
130-
throw;
131-
}
132-
133-
return tcs.Task;
134-
}
135114
}
136115
}

src/Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv/Internal/Networking/UvShutdownReq.cs

Lines changed: 32 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// Licensed under the Apache License, Version 2.0. See License.txt in the project root for license information.
33

44
using System;
5+
using Microsoft.Extensions.Logging;
56

67
namespace Microsoft.AspNetCore.Server.Kestrel.Transport.Libuv.Internal.Networking
78
{
@@ -12,10 +13,11 @@ public class UvShutdownReq : UvRequest
1213
{
1314
private readonly static LibuvFunctions.uv_shutdown_cb _uv_shutdown_cb = UvShutdownCb;
1415

15-
private Action<UvShutdownReq, int, object> _callback;
16+
private Action<UvShutdownReq, int, Exception, object> _callback;
1617
private object _state;
18+
private LibuvAwaitable<UvShutdownReq> _awaitable = new LibuvAwaitable<UvShutdownReq>();
1719

18-
public UvShutdownReq(ILibuvTrace logger) : base (logger)
20+
public UvShutdownReq(ILibuvTrace logger) : base(logger)
1921
{
2022
}
2123

@@ -24,14 +26,20 @@ public override void Init(LibuvThread thread)
2426
var loop = thread.Loop;
2527

2628
CreateMemory(
27-
loop.Libuv,
29+
loop.Libuv,
2830
loop.ThreadId,
2931
loop.Libuv.req_size(LibuvFunctions.RequestType.SHUTDOWN));
3032

3133
base.Init(thread);
3234
}
3335

34-
public void Shutdown(UvStreamHandle handle, Action<UvShutdownReq, int, object> callback, object state)
36+
public LibuvAwaitable<UvShutdownReq> ShutdownAsync(UvStreamHandle handle)
37+
{
38+
Shutdown(handle, LibuvAwaitable<UvShutdownReq>.Callback, _awaitable);
39+
return _awaitable;
40+
}
41+
42+
public void Shutdown(UvStreamHandle handle, Action<UvShutdownReq, int, Exception, object> callback, object state)
3543
{
3644
_callback = callback;
3745
_state = state;
@@ -41,9 +49,28 @@ public void Shutdown(UvStreamHandle handle, Action<UvShutdownReq, int, object> c
4149
private static void UvShutdownCb(IntPtr ptr, int status)
4250
{
4351
var req = FromIntPtr<UvShutdownReq>(ptr);
44-
req._callback(req, status, req._state);
52+
53+
var callback = req._callback;
4554
req._callback = null;
55+
56+
var state = req._state;
4657
req._state = null;
58+
59+
Exception error = null;
60+
if (status < 0)
61+
{
62+
req.Libuv.Check(status, out error);
63+
}
64+
65+
try
66+
{
67+
callback(req, status, error, state);
68+
}
69+
catch (Exception ex)
70+
{
71+
req._log.LogError(0, ex, "UvShutdownCb");
72+
throw;
73+
}
4774
}
4875
}
4976
}

0 commit comments

Comments
 (0)