diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java index 0afe0db4721..50f8a0275b7 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java +++ b/dd-java-agent/instrumentation/spark/spark-common/src/main/java/datadog/trace/instrumentation/spark/AbstractSparkInstrumentation.java @@ -40,7 +40,8 @@ public String[] knownMatchingTypes() { "org.apache.spark.util.SparkClassUtils", "org.apache.spark.scheduler.LiveListenerBus", "org.apache.spark.sql.execution.SparkPlanInfo$", - "org.apache.spark.sql.SparkSession" + "org.apache.spark.sql.SparkSession", + "org.apache.spark.sql.execution.QueryExecution" }; } @@ -77,6 +78,14 @@ public void methodAdvice(MethodTransformer transformer) { .and(isDeclaredBy(named("org.apache.spark.sql.SparkSession"))), AbstractSparkInstrumentation.class.getName() + "$SparkSqlFailureAdvice"); + // QueryExecution.assertAnalyzed() — catch all Catalyst analysis failures regardless of + // entry point (SparkSession.sql, Dataset.select, Dataset.filter, etc.) + transformer.applyAdvice( + isMethod() + .and(named("assertAnalyzed")) + .and(isDeclaredBy(named("org.apache.spark.sql.execution.QueryExecution"))), + AbstractSparkInstrumentation.class.getName() + "$QueryExecutionFailureAdvice"); + // LiveListenerBus class is used to manage spark listeners transformer.applyAdvice( isMethod() @@ -141,6 +150,15 @@ public static void exit(@Advice.Thrown Throwable throwable) { } } + public static class QueryExecutionFailureAdvice { + @Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class) + public static void exit(@Advice.Thrown Throwable throwable) { + if (throwable != null && AbstractDatadogSparkListener.listener != null) { + AbstractDatadogSparkListener.listener.onSqlFailure(throwable); + } + } + } + public static class LiveListenerBusAdvice { @Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnNonDefaultValue.class) // If OL is disabled in tracer config but user set it up manually don't interfere diff --git a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy index 592c29a6047..95748c054d3 100644 --- a/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy +++ b/dd-java-agent/instrumentation/spark/spark-common/src/testFixtures/groovy/datadog/trace/instrumentation/spark/AbstractSparkTest.groovy @@ -223,6 +223,38 @@ abstract class AbstractSparkTest extends InstrumentationSpecification { } } + def "DataFrame analysis failure on unresolved column marks application span as error"() { + setup: + def sparkSession = SparkSession.builder() + .config("spark.master", "local[2]") + .getOrCreate() + + try { + // Triggers AnalysisException via Dataset.select() -> QueryExecution.assertAnalyzed(), + // NOT through SparkSession.sql(). This exercises the QueryExecutionFailureAdvice. + sparkSession.range(1).toDF("id").select("nonexistent_column") + } catch (Exception ignored) { + // Expected: AnalysisException thrown by Catalyst analysis + } + sparkSession.stop() + + expect: + assertTraces(1) { + trace(1) { + span { + operationName "spark.application" + resourceName "spark.application" + spanType "spark" + errored true + parent() + assert span.tags["error.type"] == "Spark SQL Failed" + assert span.tags["error.message"] =~ /(?i).*nonexistent_column.*/ + assert span.tags["error.stack"] =~ /(?s).*AnalysisException.*/ + } + } + } + } + def "capture SparkSubmit.runMain() errors"() { setup: def sparkSession = SparkSession.builder()