Skip to content

Commit 11bdc03

Browse files
committed
feat: allow defining custom topology nodes from tasks
Part-of: kestra-io/kestra-ee#4729
1 parent 86a1fa7 commit 11bdc03

File tree

4 files changed

+69
-0
lines changed

4 files changed

+69
-0
lines changed
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package io.kestra.core.models.hierarchies;
2+
3+
import lombok.Getter;
4+
5+
import java.util.List;
6+
7+
@SuppressWarnings("this-escape")
8+
@Getter
9+
public class CustomGraphCluster extends GraphCluster {
10+
public CustomGraphCluster(String uid, GraphTask rootTask, List<CustomGraphNode> nodes) {
11+
super(rootTask, uid, RelationType.SEQUENTIAL); // TODO should we add a custom relation type?
12+
13+
this.getGraph().addNode(rootTask);
14+
this.addEdge(this.getRoot(), rootTask, new Relation());
15+
16+
this.getGraph().removeEdge(rootTask, this.getFinally());
17+
this.getGraph().removeEdge(rootTask, this.getAfterExecution());
18+
this.getGraph().removeNode(this.getFinally());
19+
this.getGraph().removeNode(this.getAfterExecution());
20+
21+
nodes.forEach(node -> {
22+
this.getGraph().addNode(node);
23+
this.addEdge(rootTask, node, new Relation(RelationType.SEQUENTIAL, null));
24+
this.addEdge(node, this.getEnd(), new Relation());
25+
});
26+
}
27+
}
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.kestra.core.models.hierarchies;
2+
3+
import io.kestra.core.models.Plugin;
4+
5+
public class CustomGraphNode extends AbstractGraph {
6+
private final String label;
7+
private final Plugin plugin;
8+
9+
public CustomGraphNode(String uid, String label, Plugin plugin) {
10+
super(uid);
11+
12+
this.label = label;
13+
this.plugin = plugin;
14+
}
15+
16+
@Override
17+
public String getLabel() {
18+
return label;
19+
}
20+
21+
public Plugin getPlugin() {
22+
return plugin;
23+
}
24+
}

core/src/main/java/io/kestra/core/models/tasks/RunnableTask.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,14 @@
22

33
import io.kestra.core.models.Plugin;
44
import io.kestra.core.models.WorkerJobLifecycle;
5+
import io.kestra.core.models.executions.TaskRun;
6+
import io.kestra.core.models.hierarchies.AbstractGraph;
7+
import io.kestra.core.models.hierarchies.GraphTask;
8+
import io.kestra.core.models.hierarchies.RelationType;
59
import io.kestra.core.runners.RunContext;
610

11+
import java.util.List;
12+
713
/**
814
* Interface for tasks that are run in the Worker.
915
*/
@@ -12,4 +18,13 @@ public interface RunnableTask <T extends Output> extends Plugin, WorkerJobLifecy
1218
* This method is called inside the Worker to run (execute) the task.
1319
*/
1420
T run(RunContext runContext) throws Exception;
21+
22+
/**
23+
* Create the topology representation of a runnable task.
24+
* <p>
25+
* By default, it returns a single GraphTask, tasks may override it to provide a custom topology representation.
26+
*/
27+
default AbstractGraph graph(TaskRun taskRun, List<String> values, RelationType relationType) {
28+
return new GraphTask((Task) this, taskRun, values, relationType);
29+
}
1530
}

core/src/main/java/io/kestra/core/utils/GraphUtils.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.kestra.core.models.hierarchies.*;
88
import io.kestra.core.models.tasks.ExecutableTask;
99
import io.kestra.core.models.tasks.FlowableTask;
10+
import io.kestra.core.models.tasks.RunnableTask;
1011
import io.kestra.core.models.tasks.Task;
1112
import io.kestra.core.models.triggers.AbstractTrigger;
1213
import io.kestra.core.models.triggers.Trigger;
@@ -391,6 +392,8 @@ private static void fillGraph(
391392
currentGraph = flowableTask.tasksTree(execution, currentTaskRun, parentValues);
392393
} else if (currentTask instanceof ExecutableTask<?> subflowTask) {
393394
currentGraph = new SubflowGraphTask(subflowTask, currentTaskRun, parentValues, relationType);
395+
} else if (currentTask instanceof RunnableTask<?> runnableTask) {
396+
currentGraph = runnableTask.graph(currentTaskRun, parentValues, relationType);
394397
} else {
395398
currentGraph = new GraphTask(currentTask, currentTaskRun, parentValues, relationType);
396399
}

0 commit comments

Comments
 (0)