Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import datadog.trace.bootstrap.instrumentation.api.Tags
import datadog.trace.common.writer.ListWriter
import datadog.trace.core.DDSpan
import datadog.trace.core.datastreams.StatsGroup
import datadog.trace.test.util.Flaky
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
Expand Down Expand Up @@ -104,6 +103,32 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
}
}

/**
 * Orders traces for assertion: the producer ("parent") trace always sorts
 * first, and each consumer trace is ordered by the producer span it was
 * propagated from (see {@code batchSortKey}).
 */
private static class SortBatchKafkaTraces implements Comparator<List<DDSpan>> {
  @Override
  int compare(List<DDSpan> left, List<DDSpan> right) {
    // Groovy spaceship operator is equivalent to Long.compare for longs.
    return batchSortKey(left) <=> batchSortKey(right)
  }
}

/**
 * Sort key for a trace: the producer ("parent") trace gets Long.MIN_VALUE so it
 * always sorts first; consumer traces are keyed by the parent span id of their
 * "kafka.deliver" span (falling back to the first span's parent id), which ties
 * each consumer trace back to the producer span that created its message.
 */
private static long batchSortKey(List<DDSpan> trace) {
  assert !trace.isEmpty()
  def firstSpan = trace.get(0)
  if ("parent" == firstSpan.localRootSpan.operationName.toString()) {
    return Long.MIN_VALUE
  }
  def delivery = trace.find { "kafka.deliver" == it.operationName.toString() }
  // Elvis operator: use the delivery span when present, else the first span.
  return (delivery ?: firstSpan).parentId
}

/**
 * Extracts the producer-kind spans from the producer ("parent") trace, sorted
 * by span id so callers can match them deterministically against the consumer
 * records they produced.
 */
private static List<DDSpan> producerSpans(List<List<DDSpan>> traces) {
  def parentTrace = traces.find {
    !it.isEmpty() && "parent" == it.get(0).localRootSpan.operationName.toString()
  }
  assert parentTrace != null
  def producers = parentTrace.findAll { span ->
    span.getTag(Tags.SPAN_KIND) == Tags.SPAN_KIND_PRODUCER
  }
  return producers.sort { it.spanId }
}


static {
PRODUCER_PATHWAY_EDGE_TAGS = new LinkedHashMap<>(3)
Expand Down Expand Up @@ -835,7 +860,6 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
producer.close()
}

@Flaky("Repeatedly fails with a partition set to 1 but expects 0 https://github.com/DataDog/dd-trace-java/issues/3864")
def "test spring kafka template produce and batch consume"() {
setup:
def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString())
Expand All @@ -857,14 +881,14 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
def container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties)
def records = new LinkedBlockingQueue<ConsumerRecord<String, String>>()
container.setupMessageListener(new BatchMessageListener<String, String>() {
@Override
void onMessage(List<ConsumerRecord<String, String>> consumerRecords) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
consumerRecords.each {
records.add(it)
}
@Override
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reformats here and below are probably due to a spotless trigger on commit. I'm not sure what has changed.

void onMessage(List<ConsumerRecord<String, String>> consumerRecords) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
consumerRecords.each {
records.add(it)
}
})
}
})
container.start()
ContainerTestUtils.waitForAssignment(container, embeddedKafka.getPartitionsPerTopic())

Expand All @@ -874,7 +898,8 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
for (g in greetings) {
kafkaTemplate.send(SHARED_TOPIC, g).addCallback({
runUnderTrace("producer callback") {}
}, { ex ->
}, {
ex ->
runUnderTrace("producer exception: " + ex) {}
})
}
Expand All @@ -888,17 +913,31 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {

then:
def receivedSet = greetings.toSet()
greetings.eachWithIndex { g, i ->
def receivedRecords = []
greetings.eachWithIndex {
g, i ->
def received = records.poll(5, TimeUnit.SECONDS)
receivedSet.remove(received.value()) //maybe received out of order in case several partitions
assert received.key() == null

def headers = received.headers()
assert headers.iterator().hasNext()
receivedRecords.add(received)
}
assert receivedSet.isEmpty()

assertTraces(4, SORT_TRACES_BY_ID) {
TEST_WRITER.waitForTraces(4)
def traces = Arrays.asList(TEST_WRITER.toArray()) as List<List<DDSpan>>
def produceSpans = producerSpans(traces)
def spanIdToRecord = receivedRecords.collectEntries {
record ->
def header = record.headers().headers("x-datadog-parent-id").iterator()
assert header.hasNext()
[(Long.parseLong(new String(header.next().value(), StandardCharsets.UTF_8))): record]
}

// Batch listener delivery order can vary; match each consumer trace to its producer via the propagated parent ID.
assertTraces(4, new SortBatchKafkaTraces()) {
trace(7) {
basicSpan(it, "parent")
basicSpan(it, "producer callback", span(0))
Expand All @@ -910,46 +949,44 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
}

if (hasQueueSpan()) {
trace(2) {
consumerSpan(it, consumerProperties, trace(1)[1], 0..0)
queueSpan(it, trace(0)[6])
}
trace(2) {
consumerSpan(it, consumerProperties, trace(2)[1], 0..1)
queueSpan(it, trace(0)[4])
}
trace(2) {
consumerSpan(it, consumerProperties, trace(3)[1], 0..1)
queueSpan(it, trace(0)[2])
[0, 1, 2].each {
i ->
def expectedOffset = spanIdToRecord[produceSpans[i].spanId].offset()
trace(2) {
consumerSpan(it, consumerProperties, span(1), expectedOffset..expectedOffset)
queueSpan(it, produceSpans[i])
}
}
} else {
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[6], 0..0)
}
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[4], 0..1)
}
trace(1) {
consumerSpan(it, consumerProperties, trace(0)[2], 0..1)
[0, 1, 2].each {
i ->
def expectedOffset = spanIdToRecord[produceSpans[i].spanId].offset()
trace(1) {
consumerSpan(it, consumerProperties, produceSpans[i], expectedOffset..expectedOffset)
}
}
}
}

if (isDataStreamsEnabled()) {
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == 0 }
StatsGroup first = TEST_DATA_STREAMS_WRITER.groups.find {
it.parentHash == 0
}
verifyAll(first) {
tags.hasAllTags("direction:out", "kafka_cluster_id:$clusterId", "topic:$SHARED_TOPIC".toString(), "type:kafka")
}

StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find { it.parentHash == first.hash }
StatsGroup second = TEST_DATA_STREAMS_WRITER.groups.find {
it.parentHash == first.hash
}
verifyAll(second) {
tags.hasAllTags(
"direction:in",
"group:sender",
"kafka_cluster_id:$clusterId",
"topic:$SHARED_TOPIC".toString(),
"type:kafka"
)
"direction:in",
"group:sender",
"kafka_cluster_id:$clusterId",
"topic:$SHARED_TOPIC".toString(),
"type:kafka"
)
}
}

Expand Down Expand Up @@ -981,16 +1018,16 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {

// setup a Kafka message listener
container.setupMessageListener(new MessageListener<String, String>() {
@Override
void onMessage(ConsumerRecord<String, String> record) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
records.add(record)
if (isDataStreamsEnabled()) {
// even if header propagation is disabled, we want data streams to work.
TEST_DATA_STREAMS_WRITER.waitForGroups(2)
}
@Override
void onMessage(ConsumerRecord<String, String> record) {
TEST_WRITER.waitForTraces(1) // ensure consistent ordering of traces
records.add(record)
if (isDataStreamsEnabled()) {
// even if header propagation is disabled, we want data streams to work.
TEST_DATA_STREAMS_WRITER.waitForGroups(2)
}
})
}
})

// start the container and underlying message listener
container.start()
Expand Down Expand Up @@ -1028,9 +1065,9 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
def existingSpanId = 9876543210987654L
def headers = new RecordHeaders()
headers.add(new RecordHeader("x-datadog-trace-id",
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
String.valueOf(existingTraceId).getBytes(StandardCharsets.UTF_8)))
headers.add(new RecordHeader("x-datadog-parent-id",
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))
String.valueOf(existingSpanId).getBytes(StandardCharsets.UTF_8)))

when:
def record = new ProducerRecord(SHARED_TOPIC, 0, null, "test-context-extraction", headers)
Expand Down Expand Up @@ -1063,16 +1100,16 @@ abstract class KafkaClientTestBase extends VersionedNamingTestBase {
def oldExtractorsByType = extractorsByTypeField.get(TEST_DATA_STREAMS_MONITORING)

def extractor = new DataStreamsTransactionExtractor() {
String getName() {
return "kafka-produce-test"
}
DataStreamsTransactionExtractor.Type getType() {
return DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS
}
String getValue() {
return "x-transaction-id"
}
String getName() {
return "kafka-produce-test"
}
DataStreamsTransactionExtractor.Type getType() {
return DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS
}
String getValue() {
return "x-transaction-id"
}
}
def extractorsByType = new EnumMap<>(DataStreamsTransactionExtractor.Type)
extractorsByType.put(DataStreamsTransactionExtractor.Type.KAFKA_PRODUCE_HEADERS, [extractor])
extractorsByTypeField.set(TEST_DATA_STREAMS_MONITORING, extractorsByType)
Expand Down
Loading