diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/SpanFinishWithStatsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/SpanFinishWithStatsBenchmark.java new file mode 100644 index 00000000000..a44c9e3fcaa --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/SpanFinishWithStatsBenchmark.java @@ -0,0 +1,155 @@ +package datadog.trace.common.metrics; + +import static java.util.concurrent.TimeUnit.MICROSECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.api.WellKnownTags; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.monitor.HealthMetrics; +import datadog.trace.util.Strings; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Measures the foreground thread cost of publishing span stats. With the background-stats + * optimization, the foreground thread should only extract lightweight SpanStatsData and offer to + * the inbox queue, while the expensive MetricKey construction and HashMap operations happen on the + * background aggregator thread. 
+ */ +@State(Scope.Benchmark) +@Warmup(iterations = 3, time = 5, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 5, timeUnit = SECONDS) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(MICROSECONDS) +@Fork(value = 1) +public class SpanFinishWithStatsBenchmark { + + private static final Set PEER_TAGS = Collections.singleton("peer.hostname"); + + private final DDAgentFeaturesDiscovery featuresDiscovery = + new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + PEER_TAGS, Collections.emptySet()); + + private ConflatingMetricsAggregator aggregator; + + private final List> smallTrace = generateTrace(4); + private final List> mediumTrace = generateTrace(16); + private final List> largeTrace = generateTrace(64); + + @Setup(Level.Trial) + public void setup() { + aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + featuresDiscovery, + HealthMetrics.NO_OP, + new NullSink(), + 2048, + 2048, + false); + aggregator.start(); + } + + @TearDown(Level.Trial) + public void teardown() { + if (aggregator != null) { + aggregator.close(); + } + } + + static List> generateTrace(int len) { + final List> trace = new ArrayList<>(); + for (int i = 0; i < len; i++) { + SimpleSpan span = new SimpleSpan("", "", "", "", true, true, false, 0, 10, -1); + span.setTag("peer.hostname", Strings.random(10)); + trace.add(span); + } + return trace; + } + + static class NullSink implements Sink { + @Override + public void register(EventListener listener) {} + + @Override + public void accept(int messageCount, ByteBuffer buffer) {} + } + + @Benchmark + public void publishSmallTrace(Blackhole blackhole) { + blackhole.consume(aggregator.publish(smallTrace)); + } + + @Benchmark + public void publishMediumTrace(Blackhole blackhole) { + blackhole.consume(aggregator.publish(mediumTrace)); + } + + @Benchmark + public void publishLargeTrace(Blackhole blackhole) { + blackhole.consume(aggregator.publish(largeTrace)); + } 
+ + /** Multi-threaded benchmark to measure contention under concurrent publishing. */ + @State(Scope.Benchmark) + @Warmup(iterations = 3, time = 5, timeUnit = SECONDS) + @Measurement(iterations = 5, time = 5, timeUnit = SECONDS) + @BenchmarkMode(Mode.Throughput) + @OutputTimeUnit(MICROSECONDS) + @Threads(8) + @Fork(value = 1) + public static class ConcurrentPublish { + + private ConflatingMetricsAggregator aggregator; + private final List> trace = generateTrace(16); + + @Setup(Level.Trial) + public void setup() { + DDAgentFeaturesDiscovery features = + new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + PEER_TAGS, Collections.emptySet()); + aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + features, + HealthMetrics.NO_OP, + new NullSink(), + 2048, + 2048, + false); + aggregator.start(); + } + + @TearDown(Level.Trial) + public void teardown() { + if (aggregator != null) { + aggregator.close(); + } + } + + @Benchmark + public void publishConcurrent(Blackhole blackhole) { + blackhole.consume(aggregator.publish(trace)); + } + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java index 478ff520a37..8a8b7225e8f 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java @@ -46,6 +46,28 @@ public AggregateMetric recordDurations(int count, AtomicLongArray durations) { return this; } + /** + * Record a single duration value with embedded tags. Called from the background aggregator thread + * when processing SpanStatsData (no Batch intermediary needed since the aggregation is + * single-threaded). 
+ */ + public void recordDuration(long taggedDuration) { + this.hitCount++; + long duration = taggedDuration; + if ((duration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + duration ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((duration & ERROR_TAG) == ERROR_TAG) { + duration ^= ERROR_TAG; + errorLatencies.accept(duration); + ++errorCount; + } else { + okLatencies.accept(duration); + } + this.duration += duration; + } + public int getErrorCount() { return errorCount; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index dd87406ce7f..714f67faf8b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -1,15 +1,32 @@ package datadog.trace.common.metrics; +import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; +import static java.util.Collections.unmodifiableSet; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.api.cache.DDCaches; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.StopSignal; +import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.util.LRUCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; +import java.util.List; import java.util.Map; import java.util.Set; -import 
java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,12 +37,32 @@ final class Aggregator implements Runnable { private static final Logger log = LoggerFactory.getLogger(Aggregator.class); - private final MessagePassingQueue batchPool; + static final Set ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION = + unmodifiableSet( + new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); + + private static final DDCache SERVICE_NAMES = + DDCaches.newFixedSizeCache(32); + + private static final DDCache SPAN_KINDS = + DDCaches.newFixedSizeCache(16); + private static final DDCache< + String, Pair, Function>> + PEER_TAGS_CACHE = DDCaches.newFixedSizeCache(64); + private static final Function< + String, Pair, Function>> + PEER_TAGS_CACHE_ADDER = + key -> + Pair.of( + DDCaches.newFixedSizeCache(512), + value -> UTF8BytesString.create(key + ":" + value)); + private final MessagePassingQueue inbox; private final LRUCache aggregates; - private final ConcurrentMap pending; - private final Set commonKeys; + // Downgraded from ConcurrentHashMap: only accessed on the aggregator thread + private final HashMap keys; private final MetricWriter writer; + private final HealthMetrics healthMetrics; // the reporting interval controls how much history will be buffered // when the agent is unresponsive (only 10 pending requests will be // buffered by OkHttpSink) @@ -40,45 +77,43 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, - TimeUnit reportingIntervalTimeUnit) { + TimeUnit reportingIntervalTimeUnit, + HealthMetrics healthMetrics, + DDAgentFeaturesDiscovery features, + boolean includeEndpointInMetrics) { this( writer, - 
batchPool, inbox, - pending, - commonKeys, maxAggregates, reportingInterval, reportingIntervalTimeUnit, - DEFAULT_SLEEP_MILLIS); + DEFAULT_SLEEP_MILLIS, + healthMetrics, + features, + includeEndpointInMetrics); } Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, - long sleepMillis) { + long sleepMillis, + HealthMetrics healthMetrics, + DDAgentFeaturesDiscovery features, + boolean includeEndpointInMetrics) { this.writer = writer; - this.batchPool = batchPool; this.inbox = inbox; - this.commonKeys = commonKeys; + this.keys = new HashMap<>(); this.aggregates = - new LRUCache<>( - new CommonKeyCleaner(commonKeys), maxAggregates * 4 / 3, 0.75f, maxAggregates); - this.pending = pending; + new LRUCache<>(new CommonKeyCleaner(keys), maxAggregates * 4 / 3, 0.75f, maxAggregates); this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; + this.healthMetrics = healthMetrics; } public void clearAggregates() { @@ -122,20 +157,73 @@ public void accept(InboxItem item) { } else { signal.ignore(); } - } else if (item instanceof Batch && !stopped) { - Batch batch = (Batch) item; - MetricKey key = batch.getKey(); - // important that it is still *this* batch pending, must not remove otherwise - pending.remove(key, batch); - AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); - batch.contributeTo(aggregate); - dirty = true; - // return the batch for reuse - batchPool.offer(batch); + } else if (item instanceof TraceStatsData && !stopped) { + processTraceStats((TraceStatsData) item); } } } + /** Process all span stats from a trace on the background thread. 
*/ + private void processTraceStats(TraceStatsData traceData) { + int counted = traceData.spans.length; + for (SpanStatsData spanData : traceData.spans) { + publishSpan(spanData); + } + healthMetrics.onClientStatTraceComputed(counted, traceData.totalSpanCount, !traceData.hasError); + } + + /** + * Construct MetricKey from SpanStatsData and aggregate -- all on the background thread. This is + * the expensive work that was previously done on the foreground span-finish thread. + */ + private void publishSpan(SpanStatsData span) { + List peerTags = buildPeerTags(span.peerTagValues); + + MetricKey newKey = + new MetricKey( + span.resourceName, + SERVICE_NAMES.computeIfAbsent(span.serviceName, UTF8_ENCODE), + span.operationName, + span.serviceNameSource, + span.spanType, + span.httpStatusCode, + span.synthetic, + span.traceRoot, + SPAN_KINDS.computeIfAbsent(span.spanKind, UTF8BytesString::create), + peerTags, + span.httpMethod, + span.httpEndpoint, + span.grpcStatusCode); + MetricKey key = keys.putIfAbsent(newKey, newKey); + if (null == key) { + key = newKey; + } + long tag = + (span.error > 0 ? AggregateMetric.ERROR_TAG : 0L) + | (span.topLevel ? AggregateMetric.TOP_LEVEL_TAG : 0L); + long durationNanos = span.durationNano; + + AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); + aggregate.recordDuration(tag | durationNanos); + dirty = true; + } + + /** Build UTF8BytesString peer tags from the flat [name, value, name, value, ...] array. 
*/ + private static List buildPeerTags(Object[] peerTagValues) { + if (peerTagValues == null || peerTagValues.length == 0) { + return Collections.emptyList(); + } + List peerTags = new ArrayList<>(peerTagValues.length / 2); + for (int i = 0; i < peerTagValues.length; i += 2) { + String tagName = (String) peerTagValues[i]; + String tagValue = (String) peerTagValues[i + 1]; + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(tagName, PEER_TAGS_CACHE_ADDER); + peerTags.add(cacheAndCreator.getLeft().computeIfAbsent(tagValue, cacheAndCreator.getRight())); + } + return peerTags; + } + private void report(long when, SignalItem signal) { boolean skipped = true; if (dirty) { @@ -170,7 +258,7 @@ private void expungeStaleAggregates() { AggregateMetric metric = pair.getValue(); if (metric.getHitCount() == 0) { it.remove(); - commonKeys.remove(pair.getKey()); + keys.remove(pair.getKey()); } } } @@ -182,15 +270,15 @@ private long wallClockTime() { private static final class CommonKeyCleaner implements LRUCache.ExpiryListener { - private final Set commonKeys; + private final Map keys; - private CommonKeyCleaner(Set commonKeys) { - this.commonKeys = commonKeys; + private CommonKeyCleaner(Map keys) { + this.keys = keys; } @Override public void accept(Map.Entry expired) { - commonKeys.remove(expired.getKey()); + keys.remove(expired.getKey()); } } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 010993efe50..45ab5b649ff 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -2,18 +2,13 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT; import static datadog.trace.api.DDSpanTypes.RPC; -import static 
datadog.trace.api.DDTags.BASE_SERVICE; -import static datadog.trace.api.Functions.UTF8_ENCODE; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CONSUMER; -import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_INTERNAL; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_PRODUCER; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_SERVER; -import static datadog.trace.common.metrics.AggregateMetric.ERROR_TAG; -import static datadog.trace.common.metrics.AggregateMetric.TOP_LEVEL_TAG; import static datadog.trace.common.metrics.SignalItem.ReportSignal.REPORT; import static datadog.trace.common.metrics.SignalItem.StopSignal.STOP; import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; @@ -26,19 +21,14 @@ import datadog.communication.ddagent.DDAgentFeaturesDiscovery; import datadog.communication.ddagent.SharedCommunicationObjects; import datadog.trace.api.Config; -import datadog.trace.api.Pair; import datadog.trace.api.WellKnownTags; -import datadog.trace.api.cache.DDCache; -import datadog.trace.api.cache.DDCaches; import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags; -import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.ReportSignal; import datadog.trace.common.writer.ddagent.DDAgentApi; import datadog.trace.core.CoreSpan; import datadog.trace.core.DDTraceCoreInfo; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -46,10 
+36,8 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.function.Function; import javax.annotation.Nonnull; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; @@ -62,24 +50,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Map DEFAULT_HEADERS = Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - private static final DDCache SERVICE_NAMES = - DDCaches.newFixedSizeCache(32); - - private static final DDCache SPAN_KINDS = - DDCaches.newFixedSizeCache(16); - private static final DDCache< - String, Pair, Function>> - PEER_TAGS_CACHE = - DDCaches.newFixedSizeCache( - 64); // it can be unbounded since those values are returned by the agent and should be - // under control. 64 entries is enough in this case to contain all the peer tags. 
- private static final Function< - String, Pair, Function>> - PEER_TAGS_CACHE_ADDER = - key -> - Pair.of( - DDCaches.newFixedSizeCache(512), - value -> UTF8BytesString.create(key + ":" + value)); private static final CharSequence SYNTHETICS_ORIGIN = "synthetics"; private static final Set ELIGIBLE_SPAN_KINDS_FOR_METRICS = @@ -88,14 +58,7 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve Arrays.asList( SPAN_KIND_SERVER, SPAN_KIND_CLIENT, SPAN_KIND_CONSUMER, SPAN_KIND_PRODUCER))); - private static final Set ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION = - unmodifiableSet( - new HashSet<>(Arrays.asList(SPAN_KIND_CLIENT, SPAN_KIND_PRODUCER, SPAN_KIND_CONSUMER))); - private final Set ignoredResources; - private final MessagePassingQueue batchPool; - private final ConcurrentHashMap pending; - private final ConcurrentHashMap keys; private final Thread thread; private final MessagePassingQueue inbox; private final Sink sink; @@ -189,22 +152,19 @@ public ConflatingMetricsAggregator( this.ignoredResources = ignoredResources; this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); - this.batchPool = Queues.spmcArrayQueue(maxAggregates); - this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3); - this.keys = new ConcurrentHashMap<>(); this.features = features; this.healthMetrics = healthMetric; this.sink = sink; this.aggregator = new Aggregator( metricWriter, - batchPool, inbox, - pending, - keys.keySet(), maxAggregates, reportingInterval, - timeUnit); + timeUnit, + healthMetric, + features, + includeEndpointInMetrics); this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator); this.reportingInterval = reportingInterval; this.reportingIntervalTimeUnit = timeUnit; @@ -284,8 +244,11 @@ public Future forceReport() { @Override public boolean publish(List> trace) { boolean forceKeep = false; - int counted = 0; if (features.supportsMetrics()) { + // Pre-size to trace size; most spans will be 
eligible + SpanStatsData[] buffer = new SpanStatsData[trace.size()]; + int counted = 0; + boolean hasError = false; for (CoreSpan span : trace) { boolean isTopLevel = span.isTopLevel(); final CharSequence spanKind = span.unsafeGetTag(SPAN_KIND, ""); @@ -294,13 +257,33 @@ public boolean publish(List> trace) { if (resourceName != null && ignoredResources.contains(resourceName.toString())) { // skip publishing all children forceKeep = false; + counted = 0; break; } - counted++; - forceKeep |= publish(span, isTopLevel, spanKind); + int error = span.getError(); + if (error > 0) { + forceKeep = true; + hasError = true; + } + buffer[counted++] = extractSpanData(span, isTopLevel, spanKind); } } - healthMetrics.onClientStatTraceComputed(counted, trace.size(), !forceKeep); + if (counted > 0) { + SpanStatsData[] spans; + if (counted == buffer.length) { + spans = buffer; + } else { + spans = new SpanStatsData[counted]; + System.arraycopy(buffer, 0, spans, 0, counted); + } + TraceStatsData traceData = new TraceStatsData(spans, trace.size(), hasError); + inbox.offer(traceData); + } else { + // Nothing counted -- still report to health metrics on background thread, + // but avoid allocating spans array + TraceStatsData traceData = new TraceStatsData(new SpanStatsData[0], trace.size(), false); + inbox.offer(traceData); + } } return forceKeep; } @@ -317,8 +300,12 @@ private boolean spanKindEligible(@Nonnull CharSequence spanKind) { return ELIGIBLE_SPAN_KINDS_FOR_METRICS.contains(spanKind.toString()); } - private boolean publish(CoreSpan span, boolean isTopLevel, CharSequence spanKind) { - // Extract HTTP method and endpoint only if the feature is enabled + /** + * Extract lightweight data from a span on the foreground thread. Only reads cheap volatile/final + * fields and tag lookups. The expensive MetricKey construction happens on the background thread. 
+ */ + private SpanStatsData extractSpanData( + CoreSpan span, boolean isTopLevel, CharSequence spanKind) { String httpMethod = null; String httpEndpoint = null; if (includeEndpointInMetrics) { @@ -334,96 +321,68 @@ private boolean publish(CoreSpan span, boolean isTopLevel, CharSequence spanK Object grpcStatusObj = span.unsafeGetTag(InstrumentationTags.GRPC_STATUS_CODE); grpcStatusCode = grpcStatusObj != null ? grpcStatusObj.toString() : null; } - MetricKey newKey = - new MetricKey( - span.getResourceName(), - SERVICE_NAMES.computeIfAbsent(span.getServiceName(), UTF8_ENCODE), - span.getOperationName(), - span.getServiceNameSource(), - spanType, - span.getHttpStatusCode(), - isSynthetic(span), - span.getParentId() == 0, - SPAN_KINDS.computeIfAbsent( - spanKind, UTF8BytesString::create), // save repeated utf8 conversions - getPeerTags(span, spanKind.toString()), - httpMethod, - httpEndpoint, - grpcStatusCode); - MetricKey key = keys.putIfAbsent(newKey, newKey); - if (null == key) { - key = newKey; - } - long tag = (span.getError() > 0 ? ERROR_TAG : 0L) | (isTopLevel ? 
TOP_LEVEL_TAG : 0L); - long durationNanos = span.getDurationNano(); - Batch batch = pending.get(key); - if (null != batch) { - // there is a pending batch, try to win the race to add to it - // returning false means that either the batch can't take any - // more data, or it has already been consumed - if (batch.add(tag, durationNanos)) { - // added to a pending batch prior to consumption, - // so skip publishing to the queue (we also know - // the key isn't rare enough to override the sampler) - return false; - } - // recycle the older key - key = batch.getKey(); - } - batch = newBatch(key); - batch.add(tag, durationNanos); - // overwrite the last one if present, it was already full - // or had been consumed by the time we tried to add to it - pending.put(key, batch); - // must offer to the queue after adding to pending - inbox.offer(batch); - // force keep keys if there are errors - return span.getError() > 0; + + // Extract peer tag values as raw objects -- the background thread will resolve them + Object[] peerTagValues = extractPeerTagValues(span, spanKind.toString()); + + return new SpanStatsData( + span.getResourceName(), + span.getServiceName(), + span.getOperationName(), + span.getServiceNameSource(), + spanType, + spanKind, + span.getHttpStatusCode(), + isSynthetic(span), + span.getParentId() == 0, + span.getError(), + isTopLevel, + span.getDurationNano(), + peerTagValues, + httpMethod, + httpEndpoint, + grpcStatusCode); } - private List getPeerTags(CoreSpan span, String spanKind) { - if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { + /** + * Extract peer tag values as a flat array of [tagName, tagValue, tagName, tagValue, ...]. This + * avoids building UTF8BytesString or doing cache lookups on the foreground thread. 
+ */ + private Object[] extractPeerTagValues(CoreSpan span, String spanKind) { + if (Aggregator.ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION.contains(spanKind)) { final Set eligiblePeerTags = features.peerTags(); - List peerTags = new ArrayList<>(eligiblePeerTags.size()); + // Worst case: 2 entries per peer tag (name + value) + Object[] buffer = new Object[eligiblePeerTags.size() * 2]; + int idx = 0; for (String peerTag : eligiblePeerTags) { Object value = span.unsafeGetTag(peerTag); if (value != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); - peerTags.add( - cacheAndCreator - .getLeft() - .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); + buffer[idx++] = peerTag; + buffer[idx++] = value.toString(); } } - return peerTags; - } else if (SPAN_KIND_INTERNAL.equals(spanKind)) { - // in this case only the base service should be aggregated if present - final Object baseService = span.unsafeGetTag(BASE_SERVICE); + if (idx == 0) { + return null; + } + if (idx < buffer.length) { + Object[] result = new Object[idx]; + System.arraycopy(buffer, 0, result, 0, idx); + return result; + } + return buffer; + } else if ("internal".equals(spanKind)) { + final Object baseService = span.unsafeGetTag("_dd.base_service"); if (baseService != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); - return Collections.singletonList( - cacheAndCreator - .getLeft() - .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); + return new Object[] {"_dd.base_service", baseService.toString()}; } } - return Collections.emptyList(); + return null; } private static boolean isSynthetic(CoreSpan span) { return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } - private Batch newBatch(MetricKey key) { - Batch batch = batchPool.poll(); - if (null == batch) { - return new Batch(key); - } - return 
batch.reset(key); - } - public void stop() { if (null != cancellation) { cancellation.cancel(); @@ -466,8 +425,6 @@ private void disable() { features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - this.pending.clear(); - this.batchPool.clear(); this.inbox.clear(); this.aggregator.clearAggregates(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanStatsData.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanStatsData.java new file mode 100644 index 00000000000..843683fb677 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanStatsData.java @@ -0,0 +1,61 @@ +package datadog.trace.common.metrics; + +/** + * Immutable DTO carrying the minimal data needed from a CoreSpan for MetricKey construction on the + * background aggregator thread. All fields are extracted from simple span getters (cheap + * volatile/final field reads) on the foreground thread. 
+ */ +final class SpanStatsData { + final CharSequence resourceName; + final String serviceName; + final CharSequence operationName; + final CharSequence serviceNameSource; + final CharSequence spanType; + final CharSequence spanKind; + final short httpStatusCode; + final boolean synthetic; + final boolean traceRoot; + final int error; + final boolean topLevel; + final long durationNano; + // Flat array of [tagName, tagValue, ...] pairs as raw Strings; resolved to UTF8BytesString on the background aggregator thread + final Object[] peerTagValues; + final String httpMethod; + final String httpEndpoint; + final String grpcStatusCode; + + SpanStatsData( + CharSequence resourceName, + String serviceName, + CharSequence operationName, + CharSequence serviceNameSource, + CharSequence spanType, + CharSequence spanKind, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + int error, + boolean topLevel, + long durationNano, + Object[] peerTagValues, + String httpMethod, + String httpEndpoint, + String grpcStatusCode) { + this.resourceName = resourceName; + this.serviceName = serviceName; + this.operationName = operationName; + this.serviceNameSource = serviceNameSource; + this.spanType = spanType; + this.spanKind = spanKind; + this.httpStatusCode = httpStatusCode; + this.synthetic = synthetic; + this.traceRoot = traceRoot; + this.error = error; + this.topLevel = topLevel; + this.durationNano = durationNano; + this.peerTagValues = peerTagValues; + this.httpMethod = httpMethod; + this.httpEndpoint = httpEndpoint; + this.grpcStatusCode = grpcStatusCode; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/TraceStatsData.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TraceStatsData.java new file mode 100644 index 00000000000..74da363d4cb --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/TraceStatsData.java @@ -0,0 +1,18 @@ +package datadog.trace.common.metrics; + +/** + * InboxItem that carries extracted span data from the foreground thread 
to the background + * aggregator thread. The expensive MetricKey construction, HashMap operations, and Batch management + * happen on the background thread after receiving this item. + */ +final class TraceStatsData implements InboxItem { + final SpanStatsData[] spans; + final int totalSpanCount; + final boolean hasError; + + TraceStatsData(SpanStatsData[] spans, int totalSpanCount, boolean hasError) { + this.spans = spans; + this.totalSpanCount = totalSpanCount; + this.hasError = hasError; + } +}