-
Notifications
You must be signed in to change notification settings - Fork 362
/
Copy pathDiagnosticsEventPipeProcessor.cs
189 lines (161 loc) · 7.26 KB
/
DiagnosticsEventPipeProcessor.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Diagnostics.NETCore.Client;
using Microsoft.Diagnostics.Tracing;
namespace Microsoft.Diagnostics.Monitoring.EventPipe
{
internal partial class DiagnosticsEventPipeProcessor : IAsyncDisposable
{
private readonly MonitoringSourceConfiguration _configuration;
private readonly Func<EventPipeEventSource, Func<Task>, CancellationToken, Task> _onEventSourceAvailable;
private readonly object _lock = new();
private TaskCompletionSource<bool> _initialized;
private TaskCompletionSource<bool> _sessionStarted;
private EventPipeEventSource _eventSource;
private Func<Task> _stopFunc;
private bool _disposed;
// Allows tests to know when the event pipe session has started so that the
// target application can start producing events.
internal Task SessionStarted => _sessionStarted.Task;
public DiagnosticsEventPipeProcessor(
MonitoringSourceConfiguration configuration,
Func<EventPipeEventSource, Func<Task>, CancellationToken, Task> onEventSourceAvailable
)
{
_configuration = configuration ?? throw new ArgumentNullException(nameof(configuration));
_onEventSourceAvailable = onEventSourceAvailable ?? throw new ArgumentNullException(nameof(onEventSourceAvailable));
_initialized = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
_sessionStarted = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
}
public async Task Process(DiagnosticsClient client, TimeSpan duration, bool resumeRuntime, CancellationToken token)
{
//No need to guard against reentrancy here, since the calling pipeline does this already.
IDisposable registration = token.Register(() => TryCancelCompletionSources(token));
await (await Task.Factory.StartNew(async () => {
EventPipeEventSource source = null;
EventPipeStreamProvider streamProvider = null;
Task handleEventsTask = Task.CompletedTask;
try
{
streamProvider = new EventPipeStreamProvider(_configuration);
// Allows the event handling routines to stop processing before the duration expires.
Func<Task> stopFunc = () => Task.Run(() => { streamProvider.StopProcessing(); });
Stream sessionStream = await streamProvider.ProcessEvents(client, duration, resumeRuntime, token).ConfigureAwait(false);
if (!_sessionStarted.TrySetResult(true))
{
token.ThrowIfCancellationRequested();
}
source = new EventPipeEventSource(sessionStream);
handleEventsTask = _onEventSourceAvailable(source, stopFunc, token);
lock (_lock)
{
_eventSource = source;
_stopFunc = stopFunc;
}
registration.Dispose();
if (!_initialized.TrySetResult(true))
{
token.ThrowIfCancellationRequested();
}
source.Process();
token.ThrowIfCancellationRequested();
}
catch (DiagnosticsClientException ex)
{
InvalidOperationException wrappingException = new("Failed to start the event pipe session", ex);
TryFailCompletionSourcesReturnFalse(wrappingException);
throw wrappingException;
}
catch (Exception ex) when (TryFailCompletionSourcesReturnFalse(ex))
{
throw;
}
finally
{
registration.Dispose();
EventPipeEventSource eventSource = null;
lock (_lock)
{
eventSource = _eventSource;
_eventSource = null;
}
eventSource?.Dispose();
if (streamProvider != null)
{
await streamProvider.DisposeAsync().ConfigureAwait(false);
}
}
// Await the task returned by the event handling method AFTER the EventPipeEventSource is disposed.
// The EventPipeEventSource will only raise the Completed event when it is disposed. So if this task
// is waiting for the Completed event to be raised, it will never complete until after EventPipeEventSource
// is diposed.
await handleEventsTask.ConfigureAwait(false);
}, token, TaskCreationOptions.LongRunning, TaskScheduler.Default).ConfigureAwait(false)).ConfigureAwait(false);
}
public async Task StopProcessing()
{
await _initialized.Task.ConfigureAwait(false);
EventPipeEventSource session = null;
Func<Task> stopFunc = null;
lock (_lock)
{
session = _eventSource;
stopFunc = _stopFunc;
}
//TODO This API is not sufficient to stop data flow.
session?.StopProcessing();
if (stopFunc != null)
{
await stopFunc().ConfigureAwait(false);
}
}
public async ValueTask DisposeAsync()
{
lock (_lock)
{
if (_disposed)
{
return;
}
_disposed = true;
}
_initialized.TrySetCanceled();
try
{
await _initialized.Task.ConfigureAwait(false);
}
catch
{
}
_sessionStarted.TrySetCanceled();
_eventSource?.Dispose();
}
// Helper method for observing an exception while processing the trace session
// so that session start task completion source can be failed and the exception handler
// does not catch the exception.
private bool TryFailCompletionSourcesReturnFalse(Exception ex)
{
// Use best-effort to set the completion sources to be cancelled or failed.
if (ex is OperationCanceledException canceledException)
{
TryCancelCompletionSources(canceledException.CancellationToken);
}
else
{
_initialized.TrySetException(ex);
_sessionStarted.TrySetException(ex);
}
// Return false to make the exception handler not handle the exception.
return false;
}
private void TryCancelCompletionSources(CancellationToken token)
{
_initialized.TrySetCanceled(token);
_sessionStarted.TrySetCanceled(token);
}
}
}