Skip to content

Commit 1813369

Browse files
Fixing DSM flush logic & CPU usage (#7827)
## Summary of changes Moves DSM flush work to a dedicated thread to improve thread pool performance. Also simplifies the flushing logic to more closely match the AgentWriter logic. (See below for test results) ## Reason for change A = DSM enabled after the fix B = DSM enabled before the fix <img width="1168" height="573" alt="Screenshot 2025-12-01 at 5 23 44 pm" src="https://github.com/user-attachments/assets/95d77f11-927c-4e7a-bfa6-451a1cf527a9" /> DSM flush logic was previously running entirely on the thread pool, which includes the task to frequently move DSM checkpoints off of the buffer and add them to the backlog. This was causing thread contention shown in the above profile. ## Implementation details Moves the DSM buffer flushing logic into it's own thread. Below is a profile after the fix (DSM disabled on the right). <img width="1422" height="622" alt="Screenshot 2025-12-01 at 5 26 53 pm" src="https://github.com/user-attachments/assets/4a0cb28a-633f-47c8-9e04-cff9ce6829b9" /> ## Test coverage Mostly manual testing. I ran a set of three apps: 1. DSM enabled, after the fix 2. DSM disabled, after the fix 3. DSM enabled, before the fix The test app just forwards a Kafka message every 0.5 seconds. And run those via docker-compose with a restricted CPU count of 0.1 and 0.5 in the below screenshots. In both cases, DSM disabled is the lowest CPU usage, and the line just above is DSM enabled after the fix. The highest CPU is DSM enabled, prior to the fix. The performance is a lot better, but I'm not sure if the gap can be closed further. <img width="1046" height="523" alt="Screenshot 2025-12-01 at 6 30 10 pm" src="https://github.com/user-attachments/assets/09cc8cdb-ba8d-4b44-be6e-b866cccb022d" /> <img width="694" height="379" alt="Screenshot 2025-12-01 at 5 55 28 pm" src="https://github.com/user-attachments/assets/f4e1c24d-4800-46ed-8e86-42eec3def22d" /> ## Other details
1 parent 24ae76f commit 1813369

File tree

2 files changed

+99
-63
lines changed

2 files changed

+99
-63
lines changed

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsWriter.cs

Lines changed: 98 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,29 @@ namespace Datadog.Trace.DataStreamsMonitoring;
2121

2222
internal class DataStreamsWriter : IDataStreamsWriter
2323
{
24+
private const TaskCreationOptions TaskOptions = TaskCreationOptions.RunContinuationsAsynchronously;
25+
2426
private static readonly IDatadogLogger Log = DatadogLogging.GetLoggerFor<DataStreamsWriter>();
2527

2628
private readonly object _initLock = new();
2729
private readonly long _bucketDurationMs;
2830
private readonly BoundedConcurrentQueue<StatsPoint> _buffer = new(queueLimit: 10_000);
2931
private readonly BoundedConcurrentQueue<BacklogPoint> _backlogBuffer = new(queueLimit: 10_000);
3032
private readonly TimeSpan _waitTimeSpan = TimeSpan.FromMilliseconds(10);
33+
private readonly TimeSpan _flushSemaphoreWaitTime = TimeSpan.FromSeconds(1);
3134
private readonly DataStreamsAggregator _aggregator;
3235
private readonly IDiscoveryService _discoveryService;
3336
private readonly IDataStreamsApi _api;
3437
private readonly bool _isInDefaultState;
3538

36-
private readonly TaskCompletionSource<bool> _processExit = new(TaskCreationOptions.RunContinuationsAsynchronously);
39+
private readonly TaskCompletionSource<bool> _processExit = new(TaskOptions);
3740
private readonly SemaphoreSlim _flushSemaphore = new(1, 1);
3841
private MemoryStream? _serializationBuffer;
3942
private long _pointsDropped;
40-
private int _flushRequested;
4143
private Task? _processTask;
44+
private Task? _flushTask;
4245
private Timer? _flushTimer;
43-
private TaskCompletionSource<bool>? _currentFlushTcs;
46+
private TaskCompletionSource<bool> _forceFlush = new(TaskOptions);
4447

4548
private int _isSupported = SupportState.Unknown;
4649
private bool _isInitialized;
@@ -93,8 +96,12 @@ private void Initialize()
9396
return;
9497
}
9598

96-
_processTask = Task.Run(ProcessQueueLoopAsync);
99+
_processTask = Task.Factory.StartNew(ProcessQueueLoop, TaskCreationOptions.LongRunning);
97100
_processTask.ContinueWith(t => Log.Error(t.Exception, "Error in processing task"), TaskContinuationOptions.OnlyOnFaulted);
101+
102+
_flushTask = Task.Run(FlushTaskLoopAsync);
103+
_flushTask.ContinueWith(t => Log.Error(t.Exception, "Error in data streams flush task"), TaskContinuationOptions.OnlyOnFaulted);
104+
98105
_flushTimer = new Timer(
99106
x => ((DataStreamsWriter)x!).RequestFlush(),
100107
this,
@@ -169,67 +176,106 @@ private async Task FlushAndCloseAsync()
169176
return;
170177
}
171178

172-
// request a final flush - as the _processExit flag is now set
173-
// this ensures we will definitely flush all the stats
174-
// (and sets the mutex if it isn't already set)
175-
RequestFlush();
176-
177-
// wait for the processing loop to complete
178179
var completedTask = await Task.WhenAny(
179180
_processTask,
180-
Task.Delay(TimeSpan.FromSeconds(20)))
181+
Task.Delay(TimeSpan.FromSeconds(1)))
181182
.ConfigureAwait(false);
182183

183184
if (completedTask != _processTask)
184185
{
185186
Log.Error("Could not flush all data streams stats before process exit");
186187
}
188+
189+
await FlushAsync().ConfigureAwait(false);
187190
}
188191

189-
public async Task FlushAsync()
192+
private void RequestFlush()
190193
{
191-
await _flushSemaphore.WaitAsync().ConfigureAwait(false);
192-
try
194+
_forceFlush.TrySetResult(true);
195+
}
196+
197+
private async Task FlushTaskLoopAsync()
198+
{
199+
Task[] tasks = new Task[2];
200+
tasks[0] = _processTask!;
201+
tasks[1] = _forceFlush.Task;
202+
203+
while (true)
193204
{
194-
var timeout = TimeSpan.FromSeconds(5);
205+
await Task.WhenAny(tasks).ConfigureAwait(false);
195206

196-
if (_processExit.Task.IsCompleted)
207+
if (_forceFlush.Task.IsCompleted)
197208
{
198-
return;
209+
_forceFlush = new TaskCompletionSource<bool>(TaskOptions);
210+
tasks[1] = _forceFlush.Task;
199211
}
200212

201-
if (!Volatile.Read(ref _isInitialized) || _processTask == null)
213+
await FlushAggregatorAsync().ConfigureAwait(false);
214+
215+
if (_processTask!.IsCompleted)
202216
{
203217
return;
204218
}
219+
}
220+
}
205221

206-
var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
207-
Interlocked.Exchange(ref _currentFlushTcs, tcs);
208-
209-
RequestFlush();
222+
public async Task FlushAsync()
223+
{
224+
if (!Volatile.Read(ref _isInitialized) || _processTask == null)
225+
{
226+
return;
227+
}
210228

211-
var completedTask = await Task.WhenAny(
212-
tcs.Task,
213-
_processExit.Task,
214-
Task.Delay(timeout)).ConfigureAwait(false);
229+
if (await _flushSemaphore.WaitAsync(TimeSpan.FromMilliseconds(100)).ConfigureAwait(false))
230+
{
231+
try
232+
{
233+
while (_buffer.TryDequeue(out var statsPoint))
234+
{
235+
_aggregator.Add(in statsPoint);
236+
}
215237

216-
if (completedTask != tcs.Task)
238+
while (_backlogBuffer.TryDequeue(out var backlogPoint))
239+
{
240+
_aggregator.AddBacklog(in backlogPoint);
241+
}
242+
}
243+
catch (Exception ex)
244+
{
245+
Log.Error(ex, "An error occured while processing data streams buffers");
246+
}
247+
finally
217248
{
218-
Log.Error("Data streams flush timeout after {Timeout}ms", timeout.TotalMilliseconds);
249+
_flushSemaphore.Release();
219250
}
220251
}
252+
253+
await FlushAggregatorAsync().ConfigureAwait(false);
254+
}
255+
256+
private async Task FlushAggregatorAsync()
257+
{
258+
if (!await _flushSemaphore.WaitAsync(_flushSemaphoreWaitTime).ConfigureAwait(false))
259+
{
260+
Log.Warning("Data streams flush timeout");
261+
return;
262+
}
263+
264+
try
265+
{
266+
await WriteToApiAsync().ConfigureAwait(false);
267+
FlushComplete?.Invoke(this, EventArgs.Empty);
268+
}
269+
catch (Exception ex)
270+
{
271+
Log.Error(ex, "An error occured while writing data streams");
272+
}
221273
finally
222274
{
223-
_currentFlushTcs = null;
224275
_flushSemaphore.Release();
225276
}
226277
}
227278

228-
private void RequestFlush()
229-
{
230-
Interlocked.Exchange(ref _flushRequested, 1);
231-
}
232-
233279
private async Task WriteToApiAsync()
234280
{
235281
// This method blocks ingestion of new stats points into the aggregator,
@@ -268,11 +314,22 @@ private async Task WriteToApiAsync()
268314
}
269315
}
270316

271-
private async Task ProcessQueueLoopAsync()
317+
private void ProcessQueueLoop()
272318
{
273-
var isFinalFlush = false;
274319
while (true)
275320
{
321+
Thread.Sleep(_waitTimeSpan);
322+
323+
if (!_flushSemaphore.Wait(_flushSemaphoreWaitTime))
324+
{
325+
if (_processExit.Task.IsCompleted)
326+
{
327+
return;
328+
}
329+
330+
continue;
331+
}
332+
276333
try
277334
{
278335
while (_buffer.TryDequeue(out var statsPoint))
@@ -284,40 +341,19 @@ private async Task ProcessQueueLoopAsync()
284341
{
285342
_aggregator.AddBacklog(in backlogPoint);
286343
}
287-
288-
var flushRequested = Interlocked.CompareExchange(ref _flushRequested, 0, 1);
289-
if (flushRequested == 1)
290-
{
291-
await WriteToApiAsync().ConfigureAwait(false);
292-
var currentFlushTcs = Volatile.Read(ref _currentFlushTcs);
293-
currentFlushTcs?.TrySetResult(true);
294-
FlushComplete?.Invoke(this, EventArgs.Empty);
295-
}
296344
}
297345
catch (Exception ex)
298346
{
299-
Log.Error(ex, "An error occured in the processing thread");
347+
Log.Error(ex, "An error occured in the data streams processing thread");
300348
}
301-
302-
if (_processExit.Task.IsCompleted)
349+
finally
303350
{
304-
if (isFinalFlush)
305-
{
306-
return;
307-
}
308-
309-
// do one more loop to make sure everything is flushed
310-
RequestFlush();
311-
isFinalFlush = true;
312-
continue;
351+
_flushSemaphore.Release();
313352
}
314353

315-
// The logic is copied from https://github.com/dotnet/runtime/blob/main/src/libraries/Common/tests/System/Threading/Tasks/TaskTimeoutExtensions.cs#L26
316-
// and modified to avoid dealing with exceptions
317-
var tcs = new TaskCompletionSource<bool>();
318-
using (new Timer(s => ((TaskCompletionSource<bool>)s!).SetResult(true), tcs, _waitTimeSpan, Timeout.InfiniteTimeSpan))
354+
if (_processExit.Task.IsCompleted)
319355
{
320-
await Task.WhenAny(_processExit.Task, tcs.Task).ConfigureAwait(false);
356+
return;
321357
}
322358
}
323359
}

tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ public void WhenEnabled_SetCheckpoint_SetsSpanTags()
218218
var dsm = GetDataStreamManager(true, out _);
219219
var span = new Span(new SpanContext(traceId: 123, spanId: 456), DateTimeOffset.UtcNow);
220220

221-
span.SetDataStreamsCheckpoint(dsm, CheckpointKind.Produce, new[] { "direction:out" }, 100, 0);
221+
span.SetDataStreamsCheckpoint(dsm, CheckpointKind.Produce, new[] { "direction:out" }, 100, 0);
222222
span.Tags.GetTag("pathway.hash").Should().NotBeNull();
223223
}
224224

0 commit comments

Comments
 (0)