11package io .kestra .plugin .airflow .dags ;
22
33import com .fasterxml .jackson .core .JsonProcessingException ;
4+ import io .kestra .core .exceptions .IllegalVariableEvaluationException ;
45import io .kestra .core .models .annotations .Example ;
56import io .kestra .core .models .annotations .Plugin ;
67import io .kestra .core .models .annotations .PluginProperty ;
1718
1819import java .time .Duration ;
1920import java .time .LocalDateTime ;
20- import java .time .ZonedDateTime ;
2121import java .util .Map ;
22- import java .util .concurrent .atomic .AtomicInteger ;
2322
2423import static io .kestra .core .utils .Rethrow .throwSupplier ;
2524
@@ -99,12 +98,6 @@ public class TriggerDagRun extends AirflowConnection implements RunnableTask<Tri
9998 @ PluginProperty (dynamic = true )
10099 private String dagId ;
101100
102- @ Schema (
103- title = "The job ID to check status for."
104- )
105- @ PluginProperty (dynamic = true )
106- private String jobId ;
107-
108101 @ Schema (
109102 title = "The maximum total wait duration."
110103 )
@@ -130,7 +123,7 @@ public class TriggerDagRun extends AirflowConnection implements RunnableTask<Tri
130123 @ Schema (
131124 title = "Overrides the default configuration payload"
132125 )
133- @ PluginProperty
126+ @ PluginProperty ( dynamic = true )
134127 private Map <String , Object > body ;
135128
136129 @ Override
@@ -175,21 +168,21 @@ public Output run(RunContext runContext) throws Exception {
175168 .build ();
176169 }
177170
178- private String buildBody (RunContext runContext ) throws JsonProcessingException {
171+ private String buildBody (RunContext runContext ) throws JsonProcessingException , IllegalVariableEvaluationException {
179172 RunContext .FlowInfo flowInfo = runContext .flowInfo ();
180173
181- Map <String , Object > conf = this .body ;
182-
183- if (this .body == null ) {
184- conf = Map .of (
185- "source" , "kestra" ,
186- "flow" , flowInfo .id (),
187- "namespace" , flowInfo .namespace (),
188- "task" , this .id ,
189- "execution" , runContext .getTriggerExecutionId ()
190- );
174+ if (this .body != null ) {
175+ return objectMapper .writeValueAsString (runContext .render (this .body ));
191176 }
192177
178+ Map <String , Object > conf = Map .of (
179+ "source" , "kestra" ,
180+ "flow" , flowInfo .id (),
181+ "namespace" , flowInfo .namespace (),
182+ "task" , this .id ,
183+ "execution" , runContext .getTriggerExecutionId ()
184+ );
185+
193186 return objectMapper .writeValueAsString (conf );
194187 }
195188
0 commit comments