|
29 | 29 | @NoArgsConstructor |
30 | 30 | @Slf4j |
31 | 31 | @Schema( |
32 | | - title = "Trigger Airflow DAG", |
33 | | - description = "Trigger an Airflow DAG run and wait for its completion." |
| 32 | + title = "Trigger an Airflow DAG with custom inputs and wait for its completion.", |
| 33 | + description = "Launch a DAG run, optionally wait for its completion and return the final state of the DAG run." |
34 | 34 | ) |
35 | 35 | @Plugin( |
36 | 36 | examples = { |
37 | 37 | @Example( |
38 | | - title = "Basic authorization", |
39 | | - code = { |
40 | | - " - id: trigger_dag", |
41 | | - " type: io.kestra.plugin.airflow.TriggerDagRun", |
42 | | - " baseUrl: http://airflow.example.com", |
43 | | - " dagId: example_dag", |
44 | | - " checkFrequency: PT30S", |
45 | | - " interval: PT30S", |
46 | | - " maxIterations: 100", |
47 | | - " maxDuration: PT1H", |
48 | | - " options:", |
49 | | - " basicAuthUser: myusername", |
50 | | - " basicAuthPassword: mypassword" |
51 | | - } |
| 38 | + title = "Trigger a DAG run with custom inputs, and authenticate with basic authentication", |
| 39 | + full = true, |
| 40 | + code = """ |
| 41 | +id: airflow |
| 42 | +namespace: company.team |
| 43 | +
|
| 44 | +tasks: |
| 45 | + - id: run_dag |
| 46 | + type: io.kestra.plugin.airflow.dags.TriggerDagRun |
| 47 | + baseUrl: http://host.docker.internal:8080 |
| 48 | + dagId: example_astronauts |
| 49 | + wait: true |
| 50 | + pollFrequency: PT1S |
| 51 | + options: |
| 52 | + basicAuthUser: "{{ secret('AIRFLOW_USERNAME') }}" |
| 53 | + basicAuthPassword: "{{ secret('AIRFLOW_PASSWORD') }}" |
| 54 | + body: |
| 55 | + conf: |
| 56 | + source: kestra |
| 57 | + namespace: "{{ flow.namespace }}" |
| 58 | + flow: "{{ flow.id }}" |
| 59 | + task: "{{ task.id }}" |
| 60 | + execution: "{{ execution.id }}" |
| 61 | +""" |
52 | 62 | ), |
53 | 63 | @Example( |
54 | | - title = "Bearer authorization", |
55 | | - code = { |
56 | | - " - id: trigger_dag", |
57 | | - " type: io.kestra.plugin.airflow.TriggerDagRun", |
58 | | - " baseUrl: http://airflow.example.com", |
59 | | - " dagId: example_dag", |
60 | | - " checkFrequency: PT30S", |
61 | | - " interval: PT30S", |
62 | | - " headers:", |
63 | | - " authorization: 'Bearer {{ TOKEN }}'" |
64 | | - } |
65 | | - ), |
66 | | - @Example( |
67 | | - title = "Basic authorization. Custom body", |
68 | | - code = { |
69 | | - " - id: trigger_dag", |
70 | | - " type: io.kestra.plugin.airflow.TriggerDagRun", |
71 | | - " baseUrl: http://airflow.example.com", |
72 | | - " dagId: example_dag", |
73 | | - " checkFrequency: PT30S", |
74 | | - " interval: PT30S", |
75 | | - " options:", |
76 | | - " basicAuthUser: myusername", |
77 | | - " basicAuthPassword: mypassword", |
78 | | - " body: |", |
79 | | - " {", |
80 | | - " \"conf\": {", |
81 | | - " \"source\": \"kestra\",", |
82 | | - " \"flow\": \"{{ flow.id }}\",", |
83 | | - " \"namespace\": \"{{ flow.namespace }}\",", |
84 | | - " \"task\": \"{{ task.id }}\",", |
85 | | - " \"execution\": \"{{ execution.id }}\"", |
86 | | - " }", |
87 | | - " }" |
88 | | - } |
89 | | - ), |
| 64 | + title = "Trigger a DAG run with custom inputs, and authenticate with a Bearer token", |
| 65 | + full = true, |
| 66 | + code = """ |
| 67 | +id: airflow_header_authorization |
| 68 | +namespace: company.team |
| 69 | +
|
| 70 | +tasks: |
| 71 | + - id: run_dag |
| 72 | + type: io.kestra.plugin.airflow.dags.TriggerDagRun |
| 73 | + baseUrl: http://host.docker.internal:8080 |
| 74 | + dagId: example_astronauts |
| 75 | + wait: true |
| 76 | + headers: |
| 77 | + authorization: "Bearer {{ secret('AIRFLOW_TOKEN') }}" |
| 78 | +""" |
| 79 | + ) |
90 | 80 | } |
91 | 81 | ) |
92 | 82 | public class TriggerDagRun extends AirflowConnection implements RunnableTask<TriggerDagRun.Output> { |
|
0 commit comments