Skip to content
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
a956b84
first pass new flush logic change
robcarlan-datadog Nov 14, 2025
67f73e0
add some more debug logs
robcarlan-datadog Nov 17, 2025
ca33f36
fix
robcarlan-datadog Nov 17, 2025
da8108b
fix
robcarlan-datadog Nov 17, 2025
6071e25
fix
robcarlan-datadog Nov 17, 2025
2fa26ce
noop flush loop
robcarlan-datadog Nov 17, 2025
f6ad692
Revert "noop flush loop"
robcarlan-datadog Nov 17, 2025
29f33b0
use noop methods and log
robcarlan-datadog Nov 17, 2025
dbbe958
add points to bucket but dont await
robcarlan-datadog Nov 18, 2025
a9b8488
flush doing work
robcarlan-datadog Nov 18, 2025
5733876
add semaphore to point aggregation path
robcarlan-datadog Nov 18, 2025
0e0e5c8
starting to clean up
robcarlan-datadog Nov 19, 2025
c67d9d7
starting to clean up
robcarlan-datadog Nov 19, 2025
8b687c3
temp revert sleep for await
robcarlan-datadog Nov 20, 2025
c461d04
make task synchronous
robcarlan-datadog Nov 20, 2025
146a2be
Timer calls async
robcarlan-datadog Nov 21, 2025
b47f447
experiment with Semaphore backoff
robcarlan-datadog Nov 21, 2025
0e7bfae
Revert "experiment with Semaphore backoff"
robcarlan-datadog Nov 21, 2025
d6c6b17
clean up tests
robcarlan-datadog Nov 21, 2025
f82407a
Merge branch 'master' into rob.carlan/apms-17551-dsm-cpu-bottleneck
robcarlan-datadog Dec 1, 2025
2619301
remove test code
robcarlan-datadog Dec 1, 2025
775b826
remove logs
robcarlan-datadog Dec 2, 2025
87cb19d
Merge branch 'master' into rob.carlan/apms-17551-dsm-cpu-bottleneck
robcarlan-datadog Dec 2, 2025
64dfaee
rename ProcessQueueLoop
robcarlan-datadog Dec 2, 2025
497d891
Remove async void from Timer, add Flush task
robcarlan-datadog Dec 3, 2025
c0ca32b
reduce Task timeouts
robcarlan-datadog Dec 5, 2025
aaa23fe
flush buffers on Lambda invocation end
robcarlan-datadog Dec 5, 2025
35aa78e
Merge branch 'master' into rob.carlan/apms-17551-dsm-cpu-bottleneck
robcarlan-datadog Dec 8, 2025
aef6407
Use async await in FlushAsync
robcarlan-datadog Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 96 additions & 58 deletions tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,29 @@ namespace Datadog.Trace.DataStreamsMonitoring;

internal class DataStreamsWriter : IDataStreamsWriter
{
private const TaskCreationOptions TaskOptions = TaskCreationOptions.RunContinuationsAsynchronously;

private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<DataStreamsWriter>();

private readonly object _initLock = new();
private readonly long _bucketDurationMs;
private readonly BoundedConcurrentQueue<StatsPoint> _buffer = new(queueLimit: 10_000);
private readonly BoundedConcurrentQueue<BacklogPoint> _backlogBuffer = new(queueLimit: 10_000);
private readonly TimeSpan _waitTimeSpan = TimeSpan.FromMilliseconds(10);
private readonly TimeSpan _flushSemaphoreWaitTime = TimeSpan.FromSeconds(1);
private readonly DataStreamsAggregator _aggregator;
private readonly IDiscoveryService _discoveryService;
private readonly IDataStreamsApi _api;
private readonly bool _isInDefaultState;

private readonly TaskCompletionSource<bool> _processExit = new(TaskCreationOptions.RunContinuationsAsynchronously);
private readonly TaskCompletionSource<bool> _processExit = new(TaskOptions);
private readonly SemaphoreSlim _flushSemaphore = new(1, 1);
private MemoryStream? _serializationBuffer;
private long _pointsDropped;
private int _flushRequested;
private Task? _processTask;
private Task? _flushTask;
private Timer? _flushTimer;
private TaskCompletionSource<bool>? _currentFlushTcs;
private TaskCompletionSource<bool> _forceFlush = new(TaskOptions);

private int _isSupported = SupportState.Unknown;
private bool _isInitialized;
Expand Down Expand Up @@ -93,8 +96,12 @@ private void Initialize()
return;
}

_processTask = Task.Run(ProcessQueueLoopAsync);
_processTask = Task.Factory.StartNew(ProcessQueueLoop, TaskCreationOptions.LongRunning);
_processTask.ContinueWith(t => Log.Error(t.Exception, "Error in processing task"), TaskContinuationOptions.OnlyOnFaulted);

_flushTask = Task.Run(FlushTaskLoopAsync);
_flushTask.ContinueWith(t => Log.Error(t.Exception, "Error in data streams flush task"), TaskContinuationOptions.OnlyOnFaulted);

_flushTimer = new Timer(
x => ((DataStreamsWriter)x!).RequestFlush(),
this,
Expand Down Expand Up @@ -169,65 +176,106 @@ private async Task FlushAndCloseAsync()
return;
}

// request a final flush - as the _processExit flag is now set
// this ensures we will definitely flush all the stats
// (and sets the mutex if it isn't already set)
RequestFlush();

// wait for the processing loop to complete
var completedTask = await Task.WhenAny(
_processTask,
Task.Delay(TimeSpan.FromSeconds(20)))
Task.Delay(TimeSpan.FromSeconds(1)))
.ConfigureAwait(false);

if (completedTask != _processTask)
{
Log.Error("Could not flush all data streams stats before process exit");
}

await FlushAsync().ConfigureAwait(false);
}

public async Task FlushAsync()
private void RequestFlush()
{
await _flushSemaphore.WaitAsync().ConfigureAwait(false);
try
_forceFlush.TrySetResult(true);
}

private async Task FlushTaskLoopAsync()
{
Task[] tasks = new Task[2];
tasks[0] = _processTask!;
tasks[1] = _forceFlush.Task;

while (true)
{
var timeout = TimeSpan.FromSeconds(5);
await Task.WhenAny(tasks).ConfigureAwait(false);

if (_processExit.Task.IsCompleted)
if (_forceFlush.Task.IsCompleted)
{
return;
_forceFlush = new TaskCompletionSource<bool>(TaskOptions);
tasks[1] = _forceFlush.Task;
}

if (!Volatile.Read(ref _isInitialized) || _processTask == null)
await FlushAggregatorAsync().ConfigureAwait(false);

if (_processTask!.IsCompleted)
{
return;
}
}
}

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
Interlocked.Exchange(ref _currentFlushTcs, tcs);
public async Task FlushAsync()
{
if (!Volatile.Read(ref _isInitialized) || _processTask == null)
{
return;
}

RequestFlush();
if (!_flushSemaphore.Wait(TimeSpan.FromMilliseconds(100)))
{
return;
}

var completedTask = await Task.WhenAny(
tcs.Task,
_processExit.Task,
Task.Delay(timeout)).ConfigureAwait(false);
try
{
while (_buffer.TryDequeue(out var statsPoint))
{
_aggregator.Add(in statsPoint);
}

if (completedTask != tcs.Task)
while (_backlogBuffer.TryDequeue(out var backlogPoint))
{
Log.Error("Data streams flush timeout after {Timeout}ms", timeout.TotalMilliseconds);
_aggregator.AddBacklog(in backlogPoint);
}
}
catch (Exception ex)
{
Log.Error(ex, "An error occured while processing data streams buffers");
}
finally
{
_currentFlushTcs = null;
_flushSemaphore.Release();
}

await FlushAggregatorAsync().ConfigureAwait(false);
}

private void RequestFlush()
private async Task FlushAggregatorAsync()
{
Interlocked.Exchange(ref _flushRequested, 1);
if (!await _flushSemaphore.WaitAsync(_flushSemaphoreWaitTime).ConfigureAwait(false))
{
Log.Warning("Data streams flush timeout");
return;
}

try
{
await WriteToApiAsync().ConfigureAwait(false);
FlushComplete?.Invoke(this, EventArgs.Empty);
}
catch (Exception ex)
{
Log.Error(ex, "An error occured while writing data streams");
}
finally
{
_flushSemaphore.Release();
}
}

private async Task WriteToApiAsync()
Expand Down Expand Up @@ -268,11 +316,22 @@ private async Task WriteToApiAsync()
}
}

private async Task ProcessQueueLoopAsync()
private void ProcessQueueLoop()
{
var isFinalFlush = false;
while (true)
{
Thread.Sleep(_waitTimeSpan);

if (!_flushSemaphore.Wait(_flushSemaphoreWaitTime))
{
if (_processExit.Task.IsCompleted)
{
return;
}

continue;
}

try
{
while (_buffer.TryDequeue(out var statsPoint))
Expand All @@ -284,40 +343,19 @@ private async Task ProcessQueueLoopAsync()
{
_aggregator.AddBacklog(in backlogPoint);
}

var flushRequested = Interlocked.CompareExchange(ref _flushRequested, 0, 1);
if (flushRequested == 1)
{
await WriteToApiAsync().ConfigureAwait(false);
var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);
currentFlushTcs?.TrySetResult(true);
FlushComplete?.Invoke(this, EventArgs.Empty);
}
}
catch (Exception ex)
{
Log.Error(ex, "An error occured in the processing thread");
Log.Error(ex, "An error occured in the data streams processing thread");
}

if (_processExit.Task.IsCompleted)
finally
{
if (isFinalFlush)
{
return;
}

// do one more loop to make sure everything is flushed
RequestFlush();
isFinalFlush = true;
continue;
_flushSemaphore.Release();
}

// The logic is copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs#L26
// and modified to avoid dealing with exceptions
var tcs = new TaskCompletionSource<bool>();
using (new Timer(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs, _waitTimeSpan, Timeout.InfiniteTimeSpan))
if (_processExit.Task.IsCompleted)
{
await Task.WhenAny(_processExit.Task, tcs.Task).ConfigureAwait(false);
return;
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void WhenEnabled_SetCheckpoint_SetsSpanTags()
var dsm = GetDataStreamManager(true, out _);
var span = new Span(new SpanContext(traceId: 123, spanId: 456), DateTimeOffset.UtcNow);

span.SetDataStreamsCheckpoint(dsm, CheckpointKind.Produce, new[] { "direction:out" }, 100, 0);
span.SetDataStreamsCheckpoint(dsm, CheckpointKind.Produce, new[] { "direction:out" }, 100, 0);
span.Tags.GetTag("pathway.hash").Should().NotBeNull();
}

Expand Down
Loading