Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1068,6 +1068,7 @@
<type fullname="System.OperationCanceledException" />
<type fullname="System.Runtime.CompilerServices.AsyncTaskMethodBuilder" />
<type fullname="System.Runtime.CompilerServices.AsyncTaskMethodBuilder`1" />
<type fullname="System.Runtime.CompilerServices.AsyncVoidMethodBuilder" />
<type fullname="System.Runtime.CompilerServices.ConfiguredAsyncDisposable" />
<type fullname="System.Threading.CancellationTokenSource" />
<type fullname="System.Threading.Tasks.TaskAsyncEnumerableExtensions" />
Expand Down
114 changes: 45 additions & 69 deletions tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ internal class DataStreamsWriter : IDataStreamsWriter
private readonly SemaphoreSlim _flushSemaphore = new(1, 1);
private MemoryStream? _serializationBuffer;
private long _pointsDropped;
private int _flushRequested;
private Task? _processTask;
private Timer? _flushTimer;
private TaskCompletionSource<bool>? _currentFlushTcs;

private int _isSupported = SupportState.Unknown;
private bool _isInitialized;
Expand Down Expand Up @@ -87,17 +85,18 @@ public static DataStreamsWriter Create(

private void Initialize()
{
Log.Warning("ROBC Custom .NET tracer branch with flush logic changes");
lock (_initLock)
{
if (_processTask != null)
{
return;
}

_processTask = Task.Run(ProcessQueueLoopAsync);
_processTask = Task.Factory.StartNew(ProcessQueueLoopAsync, TaskCreationOptions.LongRunning);
_processTask.ContinueWith(t => Log.Error(t.Exception, "Error in processing task"), TaskContinuationOptions.OnlyOnFaulted);
_flushTimer = new Timer(
x => ((DataStreamsWriter)x!).RequestFlush(),
async x => await ((DataStreamsWriter)x!).FlushAsync().ConfigureAwait(false),
this,
dueTime: _bucketDurationMs,
period: _bucketDurationMs);
Expand Down Expand Up @@ -159,6 +158,7 @@ public async Task DisposeAsync()

private async Task FlushAndCloseAsync()
{
Log.Debug("ROBC Flush and close...");
if (!_processExit.TrySetResult(true))
{
return;
Expand All @@ -170,12 +170,6 @@ private async Task FlushAndCloseAsync()
return;
}

// request a final flush - as the _processExit flag is now set
// this ensures we will definitely flush all the stats
// (and sets the mutex if it isn't already set)
RequestFlush();

// wait for the processing loop to complete
var completedTask = await Task.WhenAny(
_processTask,
Task.Delay(TimeSpan.FromSeconds(20)))
Expand All @@ -185,54 +179,43 @@ private async Task FlushAndCloseAsync()
{
Log.Error("Could not flush all data streams stats before process exit");
}

await FlushAsync().ConfigureAwait(false);
}

public async Task FlushAsync()
{
await _flushSemaphore.WaitAsync().ConfigureAwait(false);
try
Log.Debug("ROB Flushing Async");
if (!Volatile.Read(ref _isInitialized) || _processTask == null)
{
var timeout = TimeSpan.FromSeconds(5);

if (_processExit.Task.IsCompleted)
{
return;
}

if (!Volatile.Read(ref _isInitialized) || _processTask == null)
{
return;
}

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Interlocked.Exchange(ref _currentFlushTcs, tcs);

RequestFlush();
return;
}

var completedTask = await Task.WhenAny(
tcs.Task,
_processExit.Task,
Task.Delay(timeout)).ConfigureAwait(false);
if (!await _flushSemaphore.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false))
{
Log.Error("Data streams flush timeout");
return;
}

if (completedTask != tcs.Task)
{
Log.Error("Data streams flush timeout after {Timeout}ms", timeout.TotalMilliseconds);
}
try
{
Log.Debug("ROB Write API async");
await WriteToApiAsync().ConfigureAwait(false);
FlushComplete?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
Log.Error(ex, "Error during flush");
}
finally
{
_currentFlushTcs = null;
_flushSemaphore.Release();
}
}

private void RequestFlush()
{
Interlocked.Exchange(ref _flushRequested, 1);
}

private async Task WriteToApiAsync()
{
Log.Debug("ROBC Writing to API Async");
// This method blocks ingestion of new stats points into the aggregator,
// but they will continue to be added to the queue, and will be processed later
// Default buffer capacity matches Java implementation:
Expand Down Expand Up @@ -269,13 +252,27 @@ private async Task WriteToApiAsync()
}
}

private async Task ProcessQueueLoopAsync()
private void ProcessQueueLoopAsync()
{
var isFinalFlush = false;
while (true)
{
Log.Debug("ROBC Processing Queue Loop - Sleep");
Thread.Sleep(_waitTimeSpan);

if (!_flushSemaphore.Wait(TimeSpan.FromSeconds(10)))
{
Log.Warning("Queue Loop Semaphore timeout - continuing");
if (_processExit.Task.IsCompleted)
{
return;
}

continue;
}

try
{
Log.Debug("ROBC Adding points to aggregator");
while (_buffer.TryDequeue(out var statsPoint))
{
_aggregator.Add(in statsPoint);
Expand All @@ -285,40 +282,19 @@ private async Task ProcessQueueLoopAsync()
{
_aggregator.AddBacklog(in backlogPoint);
}

var flushRequested = Interlocked.CompareExchange(ref _flushRequested, 0, 1);
if (flushRequested == 1)
{
await WriteToApiAsync().ConfigureAwait(false);
var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);
currentFlushTcs?.TrySetResult(true);
FlushComplete?.Invoke(this, EventArgs.Empty);
}
}
catch (Exception ex)
{
Log.Error(ex, "An error occured in the processing thread");
}

if (_processExit.Task.IsCompleted)
finally
{
if (isFinalFlush)
{
return;
}

// do one more loop to make sure everything is flushed
RequestFlush();
isFinalFlush = true;
continue;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might need to call the submit API inline here a final time

_flushSemaphore.Release();
}

// The logic is copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs#L26
// and modified to avoid dealing with exceptions
var tcs = new TaskCompletionSource<bool>();
using (new Timer(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs, _waitTimeSpan, Timeout.InfiniteTimeSpan))
if (_processExit.Task.IsCompleted)
{
await Task.WhenAny(_processExit.Task, tcs.Task).ConfigureAwait(false);
return;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public void WhenEnabled_SetCheckpoint_SetsSpanTags()
var dsm = GetDataStreamManager(true, out _);
var span = new Span(new SpanContext(traceId: 123, spanId: 456), DateTimeOffset.UtcNow);

span.SetDataStreamsCheckpoint(dsm, CheckpointKind.Produce, new[] { "direction:out" }, 100, 0);
span.SetDataStreamsCheckpoint(dsm, CheckpointKind.Produce, new[] { "direction:out" }, 100, 0);
span.Tags.GetTag("pathway.hash").Should().NotBeNull();
}

Expand Down
Loading