Skip to content

Commit 26ad45d

Browse files
committed
Merge remote-tracking branch 'origin/dev' into dev
2 parents 0169adc + 5050d2a commit 26ad45d

File tree

13 files changed

+444
-59
lines changed

13 files changed

+444
-59
lines changed

.github/ISSUE_TEMPLATE/bug-report.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ body:
103103
- 3.2.x
104104
- 3.3.0-alpha
105105
- 3.3.1
106+
- 3.3.2
106107
validations:
107108
required: true
108109

.github/workflows/lychee.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ exclude = [
2020
'^http://who-t\.blogspot\.com/2009/12/on-commit-messages\.html',
2121
'^https://openmldb\.ai',
2222
'^https://archive\.apache\.org/dist',
23+
'^https://www\.bilibili\.com/video/BV1d64y1s7eZ'
2324
]
2425

2526
# Exclude paths from getting checked. The values are treated as regular expressions

docs/configs/index.md.jsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ import docs321Config from '../../../site_config/docs3-2-1';
6666
import docs322Config from '../../../site_config/docs3-2-2';
6767
import docs330Config from '../../../site_config/docs3-3-0-alpha';
6868
import docs331Config from '../../../site_config/docs3-3-1';
69+
import docs332Config from '../../../site_config/docs3-3-2';
6970
import docsDevConfig from '../../../site_config/docsdev';
7071

7172
const docsSource = {
@@ -108,6 +109,7 @@ const docsSource = {
108109
'3.2.2': docs322Config,
109110
'3.3.0-alpha': docs330Config,
110111
'3.3.1': docs331Config,
112+
'3.3.2': docs332Config,
111113
dev: docsDevConfig,
112114
};
113115

docs/configs/site.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ export default {
2424
port: 8080,
2525
domain: 'dolphinscheduler.apache.org',
2626
copyToDist: ['asset', 'img', 'file', '.asf.yaml', 'sitemap.xml', '.nojekyll', '.htaccess', 'googled0df7b96f277a143.html'],
27-
docsLatest: '3.3.1',
27+
docsLatest: '3.3.2',
2828
defaultSearch: 'google', // default search engine
2929
defaultLanguage: 'en-us',
3030
'en-us': {
@@ -45,7 +45,7 @@ export default {
4545
children: [
4646
{
4747
key: 'docs0',
48-
text: 'latest(3.3.1)',
48+
text: 'latest(3.3.2)',
4949
link: '/en-us/docs/latest/user_doc/about/introduction.html',
5050
},
5151
{
@@ -173,7 +173,7 @@ export default {
173173
children: [
174174
{
175175
key: 'docs0',
176-
text: '最新版本latest(3.3.1)',
176+
text: '最新版本latest(3.3.2)',
177177
link: '/zh-cn/docs/latest/user_doc/about/introduction.html',
178178
},
179179
{

docs/docs/en/history-versions.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
#### Setup instructions, are available for each stable version of Apache DolphinScheduler below:
66

7+
### Versions: 3.3.2
8+
9+
#### Links: [3.3.2 Document](../3.3.2/user_doc/about/introduction.md)
10+
711
### Versions: 3.3.1
812

913
#### Links: [3.3.1 Document](../3.3.1/user_doc/about/introduction.md)

docs/docs/zh/history-versions.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@
44

55
#### 以下是Apache DolphinScheduler每个稳定版本的设置说明。
66

7+
### Versions: 3.3.2
8+
9+
#### Links: [3.3.2 Document](../3.3.2/user_doc/about/introduction.md)
10+
711
### Versions: 3.3.1
812

913
#### Links: [3.3.1 Document](../3.3.1/user_doc/about/introduction.md)

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/command/CommandEngine.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
2929
import org.apache.dolphinscheduler.meter.metrics.MetricsProvider;
3030
import org.apache.dolphinscheduler.meter.metrics.SystemMetrics;
31+
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
3132
import org.apache.dolphinscheduler.server.master.config.MasterConfig;
3233
import org.apache.dolphinscheduler.server.master.config.MasterServerLoadProtection;
3334
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
@@ -129,10 +130,15 @@ public void run() {
129130

130131
List<CompletableFuture<Void>> allCompleteFutures = new ArrayList<>();
131132
for (Command command : commands) {
132-
CompletableFuture<Void> completableFuture = bootstrapCommand(command)
133+
CompletableFuture<Void> completableFuture = supplyAsync(() -> {
134+
LogUtils.setWorkflowInstanceIdMDC(command.getWorkflowInstanceId());
135+
return command;
136+
}, commandHandleThreadPool)
137+
.thenApply(this::bootstrapCommand)
133138
.thenAccept(this::bootstrapWorkflowExecutionRunnable)
134139
.thenAccept((unused) -> bootstrapSuccess(command))
135-
.exceptionally(throwable -> bootstrapError(command, throwable));
140+
.exceptionally(throwable -> bootstrapError(command, throwable))
141+
.whenComplete((result, throwable) -> LogUtils.removeWorkflowInstanceIdMDC());
136142
allCompleteFutures.add(completableFuture);
137143
}
138144
CompletableFuture.allOf(allCompleteFutures.toArray(new CompletableFuture[0])).join();
@@ -148,14 +154,14 @@ public void run() {
148154
}
149155
}
150156

151-
private CompletableFuture<IWorkflowExecutionRunnable> bootstrapCommand(Command command) {
152-
return supplyAsync(
153-
() -> workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command), commandHandleThreadPool);
157+
private IWorkflowExecutionRunnable bootstrapCommand(Command command) {
158+
return workflowExecutionRunnableFactory.createWorkflowExecuteRunnable(command);
154159
}
155160

156161
private CompletableFuture<Void> bootstrapWorkflowExecutionRunnable(IWorkflowExecutionRunnable workflowExecutionRunnable) {
157162
final WorkflowInstance workflowInstance =
158163
workflowExecutionRunnable.getWorkflowExecuteContext().getWorkflowInstance();
164+
159165
if (workflowInstance.getState() == WorkflowExecutionStatus.SERIAL_WAIT) {
160166
log.info("The workflow {} state is: {} will not be trigger now",
161167
workflowInstance.getName(),

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/graph/WorkflowExecutionGraph.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,8 @@ public boolean isEndOfTaskChain(final ITaskExecutionRunnable taskExecutionRunnab
290290
return successors.get(taskExecutionRunnable.getName()).isEmpty()
291291
|| isTaskExecutionRunnableKilled(taskExecutionRunnable)
292292
|| isTaskExecutionRunnablePaused(taskExecutionRunnable)
293-
|| isTaskExecutionRunnableFailed(taskExecutionRunnable);
293+
|| (isTaskExecutionRunnableFailed(taskExecutionRunnable)
294+
&& !isAllSuccessorsAreConditionTask(taskExecutionRunnable));
294295
}
295296

296297
@Override
@@ -335,7 +336,8 @@ public boolean isAllSuccessorsAreConditionTask(final ITaskExecutionRunnable task
335336
}
336337
return successors.stream().allMatch(
337338
successor -> isTaskExecutionRunnableSkipped(successor)
338-
|| TaskTypeUtils.isConditionTask(taskExecutionRunnable.getTaskInstance().getTaskType()));
339+
|| (TaskTypeUtils.isConditionTask(successor.getTaskDefinition().getTaskType())
340+
&& !isTaskExecutionRunnableForbidden(successor)));
339341
}
340342

341343
private void assertTaskExecutionRunnableState(final ITaskExecutionRunnable taskExecutionRunnable,

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ public void onFailedEvent(final IWorkflowExecutionRunnable workflowExecutionRunn
178178
// And the DAG will continue to execute.
179179
final IWorkflowExecutionGraph workflowExecutionGraph = taskExecutionRunnable.getWorkflowExecutionGraph();
180180
if (workflowExecutionGraph.isAllSuccessorsAreConditionTask(taskExecutionRunnable)) {
181+
mergeTaskVarPoolToWorkflow(workflowExecutionRunnable, taskExecutionRunnable);
181182
publishWorkflowInstanceTopologyLogicalTransitionEvent(taskExecutionRunnable);
182183
return;
183184
}

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java

Lines changed: 89 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.dolphinscheduler.server.master.rpc;
1919

2020
import org.apache.dolphinscheduler.extract.master.ITaskExecutorEventListener;
21+
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
2122
import org.apache.dolphinscheduler.server.master.engine.IWorkflowRepository;
2223
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskDispatchedLifecycleEvent;
2324
import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.event.TaskFailedLifecycleEvent;
@@ -53,79 +54,118 @@ public class TaskExecutorEventListenerImpl implements ITaskExecutorEventListener
5354

5455
@Override
5556
public void onTaskExecutorDispatched(final TaskExecutorDispatchedLifecycleEvent taskExecutorDispatchedLifecycleEvent) {
56-
final ITaskExecutionRunnable taskExecutionRunnable =
57-
getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent);
58-
final TaskDispatchedLifecycleEvent taskDispatchedLifecycleEvent = TaskDispatchedLifecycleEvent.builder()
59-
.taskExecutionRunnable(taskExecutionRunnable)
60-
.executorHost(taskExecutorDispatchedLifecycleEvent.getTaskInstanceHost())
61-
.build();
62-
63-
taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent);
57+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorDispatchedLifecycleEvent.getWorkflowInstanceId());
58+
try {
59+
final ITaskExecutionRunnable taskExecutionRunnable =
60+
getTaskExecutionRunnable(taskExecutorDispatchedLifecycleEvent);
61+
final TaskDispatchedLifecycleEvent taskDispatchedLifecycleEvent = TaskDispatchedLifecycleEvent.builder()
62+
.taskExecutionRunnable(taskExecutionRunnable)
63+
.executorHost(taskExecutorDispatchedLifecycleEvent.getTaskInstanceHost())
64+
.build();
65+
66+
taskExecutionRunnable.getWorkflowEventBus().publish(taskDispatchedLifecycleEvent);
67+
} finally {
68+
LogUtils.removeWorkflowInstanceIdMDC();
69+
}
6470
}
6571

6672
@Override
6773
public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent taskExecutorStartedLifecycleEvent) {
68-
final ITaskExecutionRunnable taskExecutionRunnable =
69-
getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent);
70-
final TaskRunningLifecycleEvent taskRunningEvent = TaskRunningLifecycleEvent.builder()
71-
.taskExecutionRunnable(taskExecutionRunnable)
72-
.startTime(new Date(taskExecutorStartedLifecycleEvent.getStartTime()))
73-
.logPath(taskExecutorStartedLifecycleEvent.getLogPath())
74-
.build();
75-
76-
taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent);
74+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorStartedLifecycleEvent.getWorkflowInstanceId());
75+
try {
76+
final ITaskExecutionRunnable taskExecutionRunnable =
77+
getTaskExecutionRunnable(taskExecutorStartedLifecycleEvent);
78+
final TaskRunningLifecycleEvent taskRunningEvent = TaskRunningLifecycleEvent.builder()
79+
.taskExecutionRunnable(taskExecutionRunnable)
80+
.startTime(new Date(taskExecutorStartedLifecycleEvent.getStartTime()))
81+
.logPath(taskExecutorStartedLifecycleEvent.getLogPath())
82+
.build();
83+
84+
taskExecutionRunnable.getWorkflowEventBus().publish(taskRunningEvent);
85+
} finally {
86+
LogUtils.removeWorkflowInstanceIdMDC();
87+
}
7788
}
7889

7990
@Override
8091
public void onTaskExecutorRuntimeContextChanged(final TaskExecutorRuntimeContextChangedLifecycleEvent taskExecutorRuntimeContextChangedLifecycleEventr) {
81-
final ITaskExecutionRunnable taskExecutionRunnable =
82-
getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr);
83-
84-
final TaskRuntimeContextChangedEvent taskRuntimeContextChangedEvent = TaskRuntimeContextChangedEvent.builder()
85-
.taskExecutionRunnable(taskExecutionRunnable)
86-
.runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds())
87-
.build();
88-
89-
taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent);
92+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorRuntimeContextChangedLifecycleEventr.getWorkflowInstanceId());
93+
try {
94+
final ITaskExecutionRunnable taskExecutionRunnable =
95+
getTaskExecutionRunnable(taskExecutorRuntimeContextChangedLifecycleEventr);
96+
97+
final TaskRuntimeContextChangedEvent taskRuntimeContextChangedEvent =
98+
TaskRuntimeContextChangedEvent.builder()
99+
.taskExecutionRunnable(taskExecutionRunnable)
100+
.runtimeContext(taskExecutorRuntimeContextChangedLifecycleEventr.getAppIds())
101+
.build();
102+
103+
taskExecutionRunnable.getWorkflowEventBus().publish(taskRuntimeContextChangedEvent);
104+
} finally {
105+
LogUtils.removeWorkflowInstanceIdMDC();
106+
}
90107
}
91108

92109
@Override
93110
public void onTaskExecutorSuccess(final TaskExecutorSuccessLifecycleEvent taskExecutorSuccessLifecycleEvent) {
94-
final ITaskExecutionRunnable taskExecutionRunnable =
95-
getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent);
96-
final TaskSuccessLifecycleEvent taskSuccessEvent = TaskSuccessLifecycleEvent.builder()
97-
.taskExecutionRunnable(taskExecutionRunnable)
98-
.endTime(new Date(taskExecutorSuccessLifecycleEvent.getEndTime()))
99-
.varPool(taskExecutorSuccessLifecycleEvent.getVarPool())
100-
.build();
101-
taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent);
111+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorSuccessLifecycleEvent.getWorkflowInstanceId());
112+
try {
113+
final ITaskExecutionRunnable taskExecutionRunnable =
114+
getTaskExecutionRunnable(taskExecutorSuccessLifecycleEvent);
115+
final TaskSuccessLifecycleEvent taskSuccessEvent = TaskSuccessLifecycleEvent.builder()
116+
.taskExecutionRunnable(taskExecutionRunnable)
117+
.endTime(new Date(taskExecutorSuccessLifecycleEvent.getEndTime()))
118+
.varPool(taskExecutorSuccessLifecycleEvent.getVarPool())
119+
.build();
120+
taskExecutionRunnable.getWorkflowEventBus().publish(taskSuccessEvent);
121+
} finally {
122+
LogUtils.removeWorkflowInstanceIdMDC();
123+
}
102124
}
103125

104126
@Override
105127
public void onTaskExecutorFailed(final TaskExecutorFailedLifecycleEvent taskExecutorFailedLifecycleEvent) {
106-
final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent);
107-
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
108-
.taskExecutionRunnable(taskExecutionRunnable)
109-
.endTime(new Date(taskExecutorFailedLifecycleEvent.getEndTime()))
110-
.build();
111-
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
128+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorFailedLifecycleEvent.getWorkflowInstanceId());
129+
try {
130+
final ITaskExecutionRunnable taskExecutionRunnable =
131+
getTaskExecutionRunnable(taskExecutorFailedLifecycleEvent);
132+
final TaskFailedLifecycleEvent taskFailedEvent = TaskFailedLifecycleEvent.builder()
133+
.taskExecutionRunnable(taskExecutionRunnable)
134+
.endTime(new Date(taskExecutorFailedLifecycleEvent.getEndTime()))
135+
.build();
136+
taskExecutionRunnable.getWorkflowEventBus().publish(taskFailedEvent);
137+
} finally {
138+
LogUtils.removeWorkflowInstanceIdMDC();
139+
}
112140
}
113141

114142
@Override
115143
public void onTaskExecutorKilled(final TaskExecutorKilledLifecycleEvent taskExecutorKilledLifecycleEvent) {
116-
final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent);
117-
final TaskKilledLifecycleEvent taskKilledEvent = TaskKilledLifecycleEvent.builder()
118-
.taskExecutionRunnable(taskExecutionRunnable)
119-
.endTime(new Date(taskExecutorKilledLifecycleEvent.getEndTime()))
120-
.build();
121-
taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent);
144+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorKilledLifecycleEvent.getWorkflowInstanceId());
145+
try {
146+
final ITaskExecutionRunnable taskExecutionRunnable =
147+
getTaskExecutionRunnable(taskExecutorKilledLifecycleEvent);
148+
final TaskKilledLifecycleEvent taskKilledEvent = TaskKilledLifecycleEvent.builder()
149+
.taskExecutionRunnable(taskExecutionRunnable)
150+
.endTime(new Date(taskExecutorKilledLifecycleEvent.getEndTime()))
151+
.build();
152+
taskExecutionRunnable.getWorkflowEventBus().publish(taskKilledEvent);
153+
} finally {
154+
LogUtils.removeWorkflowInstanceIdMDC();
155+
}
122156
}
123157

124158
@Override
125159
public void onTaskExecutorPaused(final TaskExecutorPausedLifecycleEvent taskExecutorPausedLifecycleEvent) {
126-
final ITaskExecutionRunnable taskExecutionRunnable = getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent);
127-
final TaskPausedLifecycleEvent taskPausedEvent = TaskPausedLifecycleEvent.of(taskExecutionRunnable);
128-
taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent);
160+
LogUtils.setWorkflowInstanceIdMDC(taskExecutorPausedLifecycleEvent.getWorkflowInstanceId());
161+
try {
162+
final ITaskExecutionRunnable taskExecutionRunnable =
163+
getTaskExecutionRunnable(taskExecutorPausedLifecycleEvent);
164+
final TaskPausedLifecycleEvent taskPausedEvent = TaskPausedLifecycleEvent.of(taskExecutionRunnable);
165+
taskExecutionRunnable.getWorkflowEventBus().publish(taskPausedEvent);
166+
} finally {
167+
LogUtils.removeWorkflowInstanceIdMDC();
168+
}
129169
}
130170

131171
private ITaskExecutionRunnable getTaskExecutionRunnable(final IReportableTaskExecutorLifecycleEvent reportableTaskExecutorLifecycleEvent) {

0 commit comments

Comments
 (0)