Skip to content

Commit ebf3550

Browse files
authored
Refactoring to support DSM default state (#7082)
## Summary of changes This PR adds undefined state to `DATA_STREAMS_ENABLED` flag. The end-user behavior remains unchanged, as `undefined` defaults to `false` for now. This will change in followup PRs when DSM is enabled by default. ## Reason for change See details in [this](https://docs.google.com/document/d/111QQyLxz5q5G3wccj5Q2Zu7e3NemlXDSt6HcchA2LKU/edit?tab=t.0) doc. ## Implementation details DSM state is now represented by enum. The heaviest parts of DSM logic will be now disabled if DSM is in the default (undefined) state. ## Test coverage All existing tests were updated to support the change. ## Other details [Jira](https://datadoghq.atlassian.net/browse/DSMON-777)
1 parent dc5795e commit ebf3550

File tree

10 files changed

+44
-16
lines changed

10 files changed

+44
-16
lines changed

tracer/src/Datadog.Trace/Activity/Handlers/AzureServiceBusActivityHandler.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ public void ActivityStopped<T>(string sourceName, T activity)
6464

6565
string namespaceString = span.Tags.GetTag("messaging.destination.name");
6666
long? payloadSize = null;
67-
if (AzureServiceBusCommon.TryGetMessage(applicationProperties, out var message)
68-
&& message.TryDuckCast<IServiceBusMessage>(out var serviceBusMessage))
67+
if (!dataStreamsManager.IsInDefaultState && (AzureServiceBusCommon.TryGetMessage(applicationProperties, out var message)
68+
&& message.TryDuckCast<IServiceBusMessage>(out var serviceBusMessage)))
6969
{
7070
payloadSize = AzureServiceBusCommon.GetMessageSize(serviceBusMessage);
7171
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Azure/ServiceBus/ProcessMessageIntegration.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ internal static CallTargetState OnMethodBegin<TTarget, TMessage>(TTarget instanc
8585
var edgeTags = string.IsNullOrEmpty(namespaceString)
8686
? new[] { "direction:in", "type:servicebus" }
8787
: new[] { "direction:in", $"topic:{namespaceString}", "type:servicebus" };
88-
88+
var msgSize = dataStreamsManager.IsInDefaultState ? 0 : AzureServiceBusCommon.GetMessageSize(message);
8989
span.SetDataStreamsCheckpoint(
9090
dataStreamsManager,
9191
CheckpointKind.Consume,
9292
edgeTags,
93-
AzureServiceBusCommon.GetMessageSize(message),
93+
msgSize,
9494
(long)messageQueueTimeMs,
9595
pathwayContext);
9696
}

tracer/src/Datadog.Trace/ClrProfiler/AutoInstrumentation/Kafka/KafkaHelper.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ private static long GetMessageSize<T>(T message)
252252
dataStreamsManager,
253253
CheckpointKind.Consume,
254254
edgeTags,
255-
message is null ? 0 : GetMessageSize(message),
255+
message is null || dataStreamsManager.IsInDefaultState ? 0 : GetMessageSize(message),
256256
tags.MessageQueueTimeMs == null ? 0 : (long)tags.MessageQueueTimeMs,
257257
pathwayContext);
258258

@@ -341,8 +341,9 @@ internal static void TryInjectHeaders<TTopicPartitionMarker, TMessage>(
341341
var edgeTags = string.IsNullOrEmpty(topic)
342342
? defaultProduceEdgeTags
343343
: new[] { "direction:out", $"topic:{topic}", "type:kafka" };
344+
var msgSize = dataStreamsManager.IsInDefaultState ? 0 : GetMessageSize(message);
344345
// produce is always the start of the edge, so defaultEdgeStartMs is always 0
345-
span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, GetMessageSize(message), 0);
346+
span.SetDataStreamsCheckpoint(dataStreamsManager, CheckpointKind.Produce, edgeTags, msgSize, 0);
346347
dataStreamsManager.InjectPathwayContext(span.Context.PathwayContext, adapter);
347348
}
348349
}

tracer/src/Datadog.Trace/Configuration/TracerSettings.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ public record TracerSettings
5252
private readonly bool _traceEnabled;
5353
private readonly bool _apmTracingEnabled;
5454
private readonly bool _isDataStreamsMonitoringEnabled;
55+
private readonly bool _isDataStreamsMonitoringInDefaultState;
5556
private readonly ReadOnlyDictionary<string, string> _headerTags;
5657
private readonly ReadOnlyDictionary<string, string> _serviceNameMappings;
5758
private readonly ReadOnlyDictionary<string, string> _globalTags;
@@ -651,6 +652,9 @@ _ when x.ToBoolean() is { } boolean => boolean,
651652
_isDataStreamsMonitoringEnabled = config
652653
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.Enabled)
653654
.AsBool(false);
655+
_isDataStreamsMonitoringInDefaultState = config
656+
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.Enabled)
657+
.AsBool() == null;
654658

655659
IsDataStreamsLegacyHeadersEnabled = config
656660
.WithKeys(ConfigurationKeys.DataStreamsMonitoring.LegacyHeadersEnabled)
@@ -1184,6 +1188,11 @@ public bool DiagnosticSourceEnabled
11841188
/// </summary>
11851189
internal bool IsDataStreamsMonitoringEnabled => DynamicSettings.DataStreamsMonitoringEnabled ?? _isDataStreamsMonitoringEnabled;
11861190

1191+
/// <summary>
1192+
/// Gets a value indicating whether data streams configuration is present or not (set to true or false).
1193+
/// </summary>
1194+
internal bool IsDataStreamsMonitoringInDefaultState => DynamicSettings.DataStreamsMonitoringEnabled == null && _isDataStreamsMonitoringInDefaultState;
1195+
11871196
/// <summary>
11881197
/// Gets a value indicating whether to inject legacy binary headers for Data Streams.
11891198
/// </summary>

tracer/src/Datadog.Trace/DataStreamsMonitoring/Aggregation/DataStreamsMessagePackFormatter.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ internal class DataStreamsMessagePackFormatter
2020
private readonly byte[] _environmentValueBytes;
2121
private readonly byte[] _serviceBytes = StringEncoding.UTF8.GetBytes("Service");
2222
private readonly long _productMask;
23+
private readonly bool _isInDefaultState;
2324

2425
private readonly byte[] _serviceValueBytes;
2526

@@ -48,6 +49,7 @@ internal class DataStreamsMessagePackFormatter
4849
private readonly byte[] _backlogTagsBytes = StringEncoding.UTF8.GetBytes("Tags");
4950
private readonly byte[] _backlogValueBytes = StringEncoding.UTF8.GetBytes("Value");
5051
private readonly byte[] _productMaskBytes = StringEncoding.UTF8.GetBytes("ProductMask");
52+
private readonly byte[] _isInDefaultStateBytes = StringEncoding.UTF8.GetBytes("IsInDefaultState");
5153

5254
public DataStreamsMessagePackFormatter(TracerSettings tracerSettings, string defaultServiceName)
5355
{
@@ -59,8 +61,10 @@ public DataStreamsMessagePackFormatter(TracerSettings tracerSettings, string def
5961
: StringEncoding.UTF8.GetBytes(env);
6062
_serviceValueBytes = StringEncoding.UTF8.GetBytes(defaultServiceName);
6163
_productMask = GetProductsMask(tracerSettings);
64+
_isInDefaultState = tracerSettings.IsDataStreamsMonitoringInDefaultState;
6265
}
6366

67+
// should be the same across all languages
6468
[Flags]
6569
private enum Products : long
6670
{
@@ -95,7 +99,7 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
9599
// https://github.com/DataDog/dd-trace-java/blob/a4b7a7b177709e6bdfd9261904cb9a777e4febbe/dd-trace-core/src/main/java/datadog/trace/core/datastreams/MsgPackDatastreamsPayloadWriter.java#L35
96100
// -1 because we don't have a primary tag
97101
// -1 because service name override is not supported
98-
bytesWritten += MessagePackBinary.WriteMapHeader(stream, 6);
102+
bytesWritten += MessagePackBinary.WriteMapHeader(stream, 7);
99103

100104
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _environmentBytes);
101105
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _environmentValueBytes);
@@ -189,6 +193,9 @@ public int Serialize(Stream stream, long bucketDurationNs, List<SerializableStat
189193
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _productMaskBytes);
190194
bytesWritten += MessagePackBinary.WriteInt64(stream, _productMask);
191195

196+
bytesWritten += MessagePackBinary.WriteStringBytes(stream, _isInDefaultStateBytes);
197+
bytesWritten += MessagePackBinary.WriteBoolean(stream, _isInDefaultState);
198+
192199
return bytesWritten;
193200
}
194201

tracer/src/Datadog.Trace/DataStreamsMonitoring/DataStreamsManager.cs

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@
1212
using Datadog.Trace.Configuration;
1313
using Datadog.Trace.DataStreamsMonitoring.Aggregation;
1414
using Datadog.Trace.DataStreamsMonitoring.Hashes;
15-
using Datadog.Trace.ExtensionMethods;
1615
using Datadog.Trace.Headers;
1716
using Datadog.Trace.Logging;
1817
using Datadog.Trace.Vendors.Serilog.Events;
@@ -29,21 +28,26 @@ internal class DataStreamsManager
2928
private readonly ConcurrentDictionary<string, RateLimiter> _schemaRateLimiters = new();
3029
private readonly NodeHashBase _nodeHashBase;
3130
private bool _isEnabled;
31+
private bool _isInDefaultState;
3232
private IDataStreamsWriter? _writer;
3333

3434
public DataStreamsManager(
3535
string? env,
3636
string defaultServiceName,
37-
IDataStreamsWriter? writer)
37+
IDataStreamsWriter? writer,
38+
bool isInDefaultState)
3839
{
3940
// We don't yet support primary tag in .NET yet
4041
_nodeHashBase = HashHelper.CalculateNodeHashBase(defaultServiceName, env, primaryTag: null);
4142
_isEnabled = writer is not null;
4243
_writer = writer;
44+
_isInDefaultState = isInDefaultState;
4345
}
4446

4547
public bool IsEnabled => Volatile.Read(ref _isEnabled);
4648

49+
public bool IsInDefaultState => Volatile.Read(ref _isInDefaultState);
50+
4751
public static DataStreamsManager Create(
4852
TracerSettings settings,
4953
IDiscoveryService discoveryService,
@@ -53,7 +57,7 @@ public static DataStreamsManager Create(
5357
? DataStreamsWriter.Create(settings, discoveryService, defaultServiceName)
5458
: null;
5559

56-
return new DataStreamsManager(settings.Environment, defaultServiceName, writer);
60+
return new DataStreamsManager(settings.Environment, defaultServiceName, writer, settings.IsDataStreamsMonitoringInDefaultState);
5761
}
5862

5963
public async Task DisposeAsync()

tracer/src/Datadog.Trace/DataStreamsMonitoring/SpanExtensions.cs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
// This product includes software developed at Datadog (https://www.datadoghq.com/). Copyright 2017 Datadog, Inc.
44
// </copyright>
55
#nullable enable
6+
using System.Globalization;
7+
68
namespace Datadog.Trace.DataStreamsMonitoring;
79

810
internal static class SpanExtensions
@@ -25,10 +27,13 @@ internal static void SetDataStreamsCheckpoint(this Span span, DataStreamsManager
2527
}
2628

2729
span.Context.SetCheckpoint(manager, checkpointKind, edgeTags, payloadSizeBytes, timeInQueueMs, parent);
28-
var hash = span.Context.PathwayContext?.Hash.Value ?? 0;
29-
if (hash != 0)
30+
if (!manager.IsInDefaultState)
3031
{
31-
span.SetTag("pathway.hash", hash.ToString());
32+
var hash = span.Context.PathwayContext?.Hash.Value ?? 0;
33+
if (hash != 0)
34+
{
35+
span.SetTag("pathway.hash", hash.ToString(CultureInfo.InvariantCulture));
36+
}
3237
}
3338
}
3439
}

tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/DataStreamsManagerTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -303,7 +303,8 @@ private static DataStreamsManager GetDataStreamManager(bool enabled, out DataStr
303303
return new DataStreamsManager(
304304
env: "foo",
305305
defaultServiceName: "bar",
306-
writer);
306+
writer,
307+
isInDefaultState: false);
307308
}
308309

309310
internal class DataStreamsWriterMock : IDataStreamsWriter

tracer/test/Datadog.Trace.Tests/DataStreamsMonitoring/SpanContextDataStreamsManagerTests.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ private static DataStreamsManager GetEnabledDataStreamManager()
7373
var dsm = new DataStreamsManager(
7474
env: "env",
7575
defaultServiceName: "service",
76-
new Mock<IDataStreamsWriter>().Object);
76+
new Mock<IDataStreamsWriter>().Object,
77+
isInDefaultState: false);
7778
return dsm;
7879
}
7980
}

tracer/test/Datadog.Trace.Tests/TracerManagerFactoryTests.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ private static TracerManager CreateTracerManager(TracerSettings settings)
8383
BuildLogSubmissionManager(),
8484
Mock.Of<ITelemetryController>(),
8585
Mock.Of<IDiscoveryService>(),
86-
new DataStreamsManager("env", "service", Mock.Of<IDataStreamsWriter>()),
86+
new DataStreamsManager("env", "service", Mock.Of<IDataStreamsWriter>(), isInDefaultState: false),
8787
remoteConfigurationManager: null,
8888
dynamicConfigurationManager: null,
8989
tracerFlareManager: null,

0 commit comments

Comments
 (0)