-
Notifications
You must be signed in to change notification settings - Fork 221
Open
Description
并行网关在合并数据时好像有 bug:只保留了最后一个 task 的输出。按下面这样修改后就好了。
/**
 * Join strategy for a parallel gateway that folds the data of every arriving
 * branch into the data record attached to the join node, instead of keeping
 * only the data of the branch that arrived last.
 */
public class TurboBranchMergeJoinAll extends BranchMergeJoinAll {

    /**
     * Merges the current branch's instance data into the join node's accumulated
     * data, persists the result, and then either completes the join node (all
     * branches arrived) or marks it as waiting and suspends execution.
     *
     * @param runtimeContext      runtime context of the arriving branch; supplies the flow
     *                            instance id, the branch's instance data id and the current node model
     * @param joinNodeInstancePo  persisted node instance of the join node; its instance data id is
     *                            set here when no merged record exists yet
     * @param currentNodeInstance node instance of the branch that is arriving
     * @param parentExecuteId     not used by this implementation
     * @param currentExecuteId    execute id of the arriving branch
     * @param allExecuteIdSet     execute ids of every branch that must arrive
     * @param dataMergeStrategy   strategy that combines the accumulated data with the branch data
     * @throws SuspendException when at least one branch has not arrived yet
     */
    @Override
    public void joinMerge(RuntimeContext runtimeContext, NodeInstancePO joinNodeInstancePo, NodeInstanceBO currentNodeInstance,
                          String parentExecuteId, String currentExecuteId, Set<String> allExecuteIdSet, DataMergeStrategy dataMergeStrategy) {
        Set<String> arrivedExecuteIds = ExecutorUtil.getExecuteIdSet((String) joinNodeInstancePo.get("executeId"));
        arrivedExecuteIds.add(currentExecuteId);

        // 1. Load the data produced by the branch that is arriving now.
        InstanceDataPO currentBranchData = instanceDataDAO.select(runtimeContext.getFlowInstanceId(), runtimeContext.getInstanceDataId());

        // 2. Load the data merged so far (the result of all previously arrived branches).
        InstanceDataPO mergedData = instanceDataDAO.select(runtimeContext.getFlowInstanceId(), joinNodeInstancePo.getInstanceDataId());

        // Remember whether the merged record had to be created here. Once a generated id is
        // assigned, a blank-id check can no longer tell a brand-new record from a stored one.
        boolean mergedDataCreatedHere = (mergedData == null);
        if (mergedDataCreatedHere) {
            mergedData = new InstanceDataPO();
            mergedData.setInstanceDataId(genId());
            joinNodeInstancePo.setInstanceDataId(mergedData.getInstanceDataId());
        }

        // 3. Merge the branch data into the accumulated data.
        InstanceDataPO mergePo = dataMergeStrategy.merge(runtimeContext, mergedData, currentBranchData);

        // 4. Decide whether the join completes or keeps waiting.
        boolean allBranchesArrived = ExecutorUtil.allArrived(allExecuteIdSet, arrivedExecuteIds);

        // The merged data is persisted in both outcomes: insert a record that does not exist
        // yet, update one that does.
        // NOTE(review): assumes updateData does not create missing rows - confirm against the DAO.
        if (mergedDataCreatedHere || StringUtils.isBlank(mergedData.getInstanceDataId())) {
            instanceDataDAO.insert(mergePo);
        } else {
            instanceDataDAO.updateData(mergePo);
        }

        if (allBranchesArrived) {
            // Every branch has arrived: mark the join node as completed.
            buildParallelNodeInstancePo(joinNodeInstancePo, currentNodeInstance, NodeInstanceStatus.COMPLETED);
            nodeInstanceDAO.updateById(joinNodeInstancePo);
            nodeInstanceLogDAO.insert(buildCurrentNodeInstanceLogPO(currentNodeInstance, currentExecuteId, joinNodeInstancePo));
            return;
        }

        // Some branches are still outstanding: mark the join node as waiting and suspend.
        buildParallelNodeInstancePo(joinNodeInstancePo, currentNodeInstance, ParallelNodeInstanceStatus.WAITING);
        nodeInstanceDAO.updateById(joinNodeInstancePo);
        nodeInstanceLogDAO.insert(buildNodeInstanceLogPO(joinNodeInstancePo));
        throw new SuspendException(ParallelErrorEnum.WAITING_SUSPEND.getErrNo(), MessageFormat.format(Constants.NODE_INSTANCE_FORMAT,
                runtimeContext.getCurrentNodeModel().getKey(),
                runtimeContext.getCurrentNodeModel().getProperties().getOrDefault(Constants.ELEMENT_PROPERTIES.NAME, StringUtils.EMPTY),
                currentNodeInstance.getNodeInstanceId()));
    }
}
Metadata
Metadata
Assignees
Labels
No labels