diff --git a/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml b/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml
index 6611cdd628ff..6c0e028a7efd 100644
--- a/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml
+++ b/tracer/src/Datadog.Trace.Trimming/build/Datadog.Trace.Trimming.xml
@@ -1068,6 +1068,7 @@
+
diff --git a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
index 6c5c47b154d1..5a4a79e8f88a 100644
--- a/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
+++ b/tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
@@ -37,10 +37,8 @@ internal class DataStreamsWriter : IDataStreamsWriter
private readonly SemaphoreSlim _flushSemaphore = new(1, 1);
private MemoryStream? _serializationBuffer;
private long _pointsDropped;
- private int _flushRequested;
private Task? _processTask;
private Timer? _flushTimer;
- private TaskCompletionSource? _currentFlushTcs;
private int _isSupported = SupportState.Unknown;
private bool _isInitialized;
@@ -87,6 +85,7 @@ public static DataStreamsWriter Create(
private void Initialize()
{
+ Log.Warning("ROBC Custom .NET tracer branch with flush logic changes");
lock (_initLock)
{
if (_processTask != null)
@@ -94,10 +93,10 @@ private void Initialize()
return;
}
- _processTask = Task.Run(ProcessQueueLoopAsync);
+ _processTask = Task.Factory.StartNew(ProcessQueueLoopAsync, TaskCreationOptions.LongRunning);
_processTask.ContinueWith(t => Log.Error(t.Exception, "Error in processing task"), TaskContinuationOptions.OnlyOnFaulted);
_flushTimer = new Timer(
- x => ((DataStreamsWriter)x!).RequestFlush(),
+ async x => await ((DataStreamsWriter)x!).FlushAsync().ConfigureAwait(false),
this,
dueTime: _bucketDurationMs,
period: _bucketDurationMs);
@@ -159,6 +158,7 @@ public async Task DisposeAsync()
private async Task FlushAndCloseAsync()
{
+ Log.Debug("ROBC Flush and close...");
if (!_processExit.TrySetResult(true))
{
return;
@@ -170,12 +170,6 @@ private async Task FlushAndCloseAsync()
return;
}
- // request a final flush - as the _processExit flag is now set
- // this ensures we will definitely flush all the stats
- // (and sets the mutex if it isn't already set)
- RequestFlush();
-
- // wait for the processing loop to complete
var completedTask = await Task.WhenAny(
_processTask,
Task.Delay(TimeSpan.FromSeconds(20)))
@@ -185,54 +179,43 @@ private async Task FlushAndCloseAsync()
{
Log.Error("Could not flush all data streams stats before process exit");
}
+
+ await FlushAsync().ConfigureAwait(false);
}
public async Task FlushAsync()
{
- await _flushSemaphore.WaitAsync().ConfigureAwait(false);
- try
+ Log.Debug("ROB Flushing Async");
+ if (!Volatile.Read(ref _isInitialized) || _processTask == null)
{
- var timeout = TimeSpan.FromSeconds(5);
-
- if (_processExit.Task.IsCompleted)
- {
- return;
- }
-
- if (!Volatile.Read(ref _isInitialized) || _processTask == null)
- {
- return;
- }
-
- var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
- Interlocked.Exchange(ref _currentFlushTcs, tcs);
-
- RequestFlush();
+ return;
+ }
- var completedTask = await Task.WhenAny(
- tcs.Task,
- _processExit.Task,
- Task.Delay(timeout)).ConfigureAwait(false);
+ if (!await _flushSemaphore.WaitAsync(TimeSpan.FromSeconds(5)).ConfigureAwait(false))
+ {
+ Log.Error("Data streams flush timeout");
+ return;
+ }
- if (completedTask != tcs.Task)
- {
- Log.Error("Data streams flush timeout after {Timeout}ms", timeout.TotalMilliseconds);
- }
+ try
+ {
+ Log.Debug("ROB Write API async");
+ await WriteToApiAsync().ConfigureAwait(false);
+ FlushComplete?.Invoke(this, EventArgs.Empty);
+ }
+ catch (Exception ex)
+ {
+ Log.Error(ex, "Error during flush");
}
finally
{
- _currentFlushTcs = null;
_flushSemaphore.Release();
}
}
- private void RequestFlush()
- {
- Interlocked.Exchange(ref _flushRequested, 1);
- }
-
private async Task WriteToApiAsync()
{
+ Log.Debug("ROBC Writing to API Async");
// This method blocks ingestion of new stats points into the aggregator,
// but they will continue to be added to the queue, and will be processed later
// Default buffer capacity matches Java implementation:
@@ -269,13 +252,27 @@ private async Task WriteToApiAsync()
}
}
- private async Task ProcessQueueLoopAsync()
+ private void ProcessQueueLoopAsync()
{
- var isFinalFlush = false;
while (true)
{
+ Log.Debug("ROBC Processing Queue Loop - Sleep");
+ Thread.Sleep(_waitTimeSpan);
+
+ if (!_flushSemaphore.Wait(TimeSpan.FromSeconds(10)))
+ {
+ Log.Warning("Queue Loop Semaphore timeout - continuing");
+ if (_processExit.Task.IsCompleted)
+ {
+ return;
+ }
+
+ continue;
+ }
+
try
{
+ Log.Debug("ROBC Adding points to aggregator");
while (_buffer.TryDequeue(out var statsPoint))
{
_aggregator.Add(in statsPoint);
@@ -285,40 +282,19 @@ private async Task ProcessQueueLoopAsync()
{
_aggregator.AddBacklog(in backlogPoint);
}
-
- var flushRequested = Interlocked.CompareExchange(ref _flushRequested, 0, 1);
- if (flushRequested == 1)
- {
- await WriteToApiAsync().ConfigureAwait(false);
- var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);
- currentFlushTcs?.TrySetResult(true);
- FlushComplete?.Invoke(this, EventArgs.Empty);
- }
}
catch (Exception ex)
{
Log.Error(ex, "An error occured in the processing thread");
}
-
- if (_processExit.Task.IsCompleted)
+ finally
{
- if (isFinalFlush)
- {
- return;
- }
-
- // do one more loop to make sure everything is flushed
- RequestFlush();
- isFinalFlush = true;
- continue;
+ _flushSemaphore.Release();
}
- // The logic is copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs#L26
- // and modified to avoid dealing with exceptions
- var tcs = new TaskCompletionSource();
- using (new Timer(s => ((TaskCompletionSource)s!).SetResult(true), tcs, _waitTimeSpan, Timeout.InfiniteTimeSpan))
+ if (_processExit.Task.IsCompleted)
{
- await Task.WhenAny(_processExit.Task, tcs.Task).ConfigureAwait(false);
+ return;
}
}
}
diff --git a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs
index f993e2c936c7..5dcb3f51d80e 100644
--- a/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs
+++ b/tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs
@@ -205,7 +205,7 @@ public void WhenEnabled_SetCheckpoint_SetsSpanTags()
var dsm = GetDataStreamManager(true, out _);
var span = new Span(new SpanContext(traceId: 123, spanId: 456), DateTimeOffset.UtcNow);
- span.SetDataStreamsCheckpoint(dsm, CheckpointKind.Produce, new[] { "direction:out" }, 100, 0);
+ span.SetDataStreamsCheckpoint(dsm, CheckpointKind.Produce, new[] { "direction:out" }, 100, 0);
span.Tags.GetTag("pathway.hash").Should().NotBeNull();
}