22
33import com .fasterxml .jackson .annotation .JsonSetter ;
44import io .kestra .core .exceptions .IllegalVariableEvaluationException ;
5- import io .kestra .core .models .annotations . PluginProperty ;
5+ import io .kestra .core .models .property . Property ;
66import io .kestra .core .models .tasks .*;
77import io .kestra .core .models .tasks .runners .AbstractLogConsumer ;
88import io .kestra .core .models .tasks .runners .ScriptService ;
@@ -43,108 +43,92 @@ public abstract class AbstractDbt extends Task implements RunnableTask<ScriptOut
4343 @ Schema (
4444 title = "Stop execution at the first failure."
4545 )
46- @ PluginProperty
47- Boolean failFast = false ;
46+ Property <Boolean > failFast = Property .of (false );
4847
4948 @ Builder .Default
5049 @ Schema (
5150 title = "When dbt would normally warn, raise an exception." ,
5251 description = "Examples include --models that selects nothing, deprecations, configurations with no " +
5352 "associated models, invalid test configurations, and missing sources/refs in tests."
5453 )
55- @ PluginProperty
56- Boolean warnError = false ;
54+ Property <Boolean > warnError = Property .of (false );
5755
5856 @ Builder .Default
5957 @ Schema (
6058 title = "Display debug logging during dbt execution." ,
6159 description = "Useful for debugging and making bug reports."
6260 )
63- @ PluginProperty
64- Boolean debug = false ;
61+ Property <Boolean > debug = Property .of (false );
6562
6663 @ Schema (
6764 title = "Which directory to look in for the dbt_project.yml file." ,
6865 description = "Default is the current working directory and its parents."
6966 )
70- @ PluginProperty
71- String projectDir ;
67+ Property <String > projectDir ;
7268
7369 @ Builder .Default
7470 @ Schema (
7571 title = "The path to the dbt CLI"
7672 )
77- @ PluginProperty (dynamic = true )
78- String dbtPath = "./bin/dbt" ;
73+ Property <String > dbtPath = Property .of ("./bin/dbt" );
7974
8075 @ Schema (
8176 title = "The `profiles.yml` file content" ,
8277 description = "If a `profile.yml` file already exist in the current working directory, it will be overridden."
8378 )
84- @ PluginProperty (dynamic = true )
85- private String profiles ;
79+ Property <String > profiles ;
8680
8781 @ Schema (
88- title = "The task runner to use." ,
89- description = """
82+ title = "The task runner to use." ,
83+ description = """
9084 Task runners are provided by plugins, each have their own properties.
9185 If you change from the default one, be careful to also configure the entrypoint to an empty list if needed."""
9286 )
93- @ PluginProperty
9487 @ Builder .Default
9588 @ Valid
96- protected TaskRunner taskRunner = Docker .builder ()
97- .type (Docker .class .getName ())
98- .entryPoint (Collections .emptyList ())
99- .build ();
89+ protected Property < TaskRunner > taskRunner = Property . of ( Docker .builder ()
90+ .type (Docker .class .getName ())
91+ .entryPoint (Collections .emptyList ())
92+ .build () );
10093
10194 @ Schema (title = "The task runner container image, only used if the task runner is container-based." )
102- @ PluginProperty (dynamic = true )
10395 @ Builder .Default
104- protected String containerImage = DEFAULT_IMAGE ;
96+ protected Property < String > containerImage = Property . of ( DEFAULT_IMAGE ) ;
10597
10698 @ Schema (
10799 title = "The runner type." ,
108100 description = "Deprecated, use 'taskRunner' instead."
109101 )
110102 @ Deprecated
111- @ PluginProperty
112- protected RunnerType runner ;
103+ protected Property <RunnerType > runner ;
113104
114105 @ Schema (
115106 title = "Deprecated, use 'taskRunner' instead"
116107 )
117- @ PluginProperty
118108 @ Deprecated
119- private DockerOptions docker ;
109+ private Property < DockerOptions > docker ;
120110
121111 @ Schema (title = "Deprecated, use the `docker` property instead" , deprecated = true )
122- @ PluginProperty
123112 @ Deprecated
124- private DockerOptions dockerOptions ;
113+ private Property < DockerOptions > dockerOptions ;
125114
126115 @ JsonSetter
127- public void setDockerOptions (DockerOptions dockerOptions ) {
116+ public void setDockerOptions (Property < DockerOptions > dockerOptions ) {
128117 this .dockerOptions = dockerOptions ;
129118 this .docker = dockerOptions ;
130119 }
131120
132121 @ Schema (
133122 title = "Additional environment variables for the current process."
134123 )
135- @ PluginProperty (
136- additionalProperties = String .class ,
137- dynamic = true
138- )
139- protected Map <String , String > env ;
124+ protected Property <Map <String , String >> env ;
140125
141126 @ Builder .Default
142127 @ Schema (
143128 title = "Parse run result" ,
144129 description = "Parsing run result to display duration of each task inside dbt"
145130 )
146- @ PluginProperty
147- protected Boolean parseRunResults = true ;
131+ protected Property <Boolean > parseRunResults = Property .of (Boolean .TRUE );
148132
149133 private NamespaceFiles namespaceFiles ;
150134
@@ -157,14 +141,14 @@ public void setDockerOptions(DockerOptions dockerOptions) {
157141 @ Override
158142 public ScriptOutput run (RunContext runContext ) throws Exception {
159143 CommandsWrapper commandsWrapper = new CommandsWrapper (runContext )
160- .withEnv (this .getEnv ())
144+ .withEnv (this .getEnv (). asMap ( runContext , String . class , String . class ) )
161145 .withNamespaceFiles (namespaceFiles )
162146 .withInputFiles (inputFiles )
163147 .withOutputFiles (outputFiles )
164- .withRunnerType (this .getRunner ())
165- .withDockerOptions (this .getDocker ())
166- .withContainerImage (this .containerImage )
167- .withTaskRunner (this .taskRunner )
148+ .withRunnerType (this .getRunner (). as ( runContext , RunnerType . class ) )
149+ .withDockerOptions (this .getDocker (). as ( runContext , DockerOptions . class ) )
150+ .withContainerImage (this .containerImage . as ( runContext , String . class ) )
151+ .withTaskRunner (this .taskRunner . as ( runContext , TaskRunner . class ) )
168152 .withLogConsumer (new AbstractLogConsumer () {
169153 @ Override
170154 public void accept (String line , Boolean isStdErr ) {
@@ -174,14 +158,15 @@ public void accept(String line, Boolean isStdErr) {
174158 .withEnableOutputDirectory (true ); //force output files on task runners
175159 Path workingDirectory = commandsWrapper .getWorkingDirectory ();
176160
177- if (profiles != null && !profiles .isEmpty ()) {
161+ String profileString = profiles .as (runContext , String .class );
162+ if (profileString != null && !profileString .isEmpty ()) {
178163 if (Files .exists (Path .of (".profiles/profiles.yml" ))) {
179164 runContext .logger ().warn ("A 'profiles.yml' file already exist in the task working directory, it will be overridden." );
180165 }
181166
182167 FileUtils .writeStringToFile (
183168 new File (workingDirectory .resolve (".profile" ).toString (), "profiles.yml" ),
184- runContext . render ( profiles ) ,
169+ profileString ,
185170 StandardCharsets .UTF_8
186171 );
187172 }
@@ -207,26 +192,26 @@ public void accept(String line, Boolean isStdErr) {
207192
208193 private String createDbtCommand (RunContext runContext ) throws IllegalVariableEvaluationException {
209194 List <String > commands = new ArrayList <>(List .of (
210- runContext . render ( dbtPath ),
195+ dbtPath . as ( runContext , String . class ),
211196 "--log-format json"
212197 ));
213198
214- if (this .debug ) {
199+ if (Boolean . TRUE . equals ( this .debug . as ( runContext , Boolean . class )) ) {
215200 commands .add ("--debug" );
216201 }
217202
218- if (this .failFast ) {
203+ if (Boolean . TRUE . equals ( this .failFast . as ( runContext , Boolean . class )) ) {
219204 commands .add ("--fail-fast" );
220205 }
221206
222- if (this .warnError ) {
207+ if (Boolean . TRUE . equals ( this .warnError . as ( runContext , Boolean . class )) ) {
223208 commands .add ("--warn-error" );
224209 }
225210
226211 commands .addAll (dbtCommands (runContext ));
227212
228213 if (this .projectDir != null ) {
229- commands .add ("--project-dir {{" + ScriptService .VAR_WORKING_DIR + "}}" + runContext . render ( this .projectDir ));
214+ commands .add ("--project-dir {{" + ScriptService .VAR_WORKING_DIR + "}}" + this .projectDir . as ( runContext , String . class ));
230215 } else {
231216 commands .add ("--project-dir {{" + ScriptService .VAR_WORKING_DIR + "}}" );
232217 }
@@ -235,11 +220,11 @@ private String createDbtCommand(RunContext runContext) throws IllegalVariableEva
235220 }
236221
237222 protected void parseResults (RunContext runContext , Path workingDirectory , ScriptOutput scriptOutput ) throws IllegalVariableEvaluationException , IOException {
238- String baseDir = this .projectDir != null ? runContext . render ( this .projectDir ) : "" ;
223+ String baseDir = this .projectDir != null ? this .projectDir . as ( runContext , String . class ) : "" ;
239224
240225 File runResults = workingDirectory .resolve (baseDir + "target/run_results.json" ).toFile ();
241226
242- if (this .parseRunResults && runResults .exists ()) {
227+ if (this .parseRunResults . as ( runContext , Boolean . class ) && runResults .exists ()) {
243228 URI results = ResultParser .parseRunResult (runContext , runResults );
244229 scriptOutput .getOutputFiles ().put ("run_results.json" , results );
245230 }
0 commit comments