Skip to content

Parallel&InclusiveGateway数据合并部分有Bug #76

@cfanlu

Description

@cfanlu

好像并行网关在数据合并的时候有 bug,只保留了最后一个 task 的输出,改了下就好了。
/**
 * Join strategy that merges the instance data of every arriving branch into the data
 * attached to the join node, so the join does not end up holding only the output of the
 * last branch to arrive.
 */
public class TurboBranchMergeJoinAll extends BranchMergeJoinAll {

    /**
     * Merges the arriving branch's data into the join node's accumulated data, persists the
     * result, and then either completes the join node or suspends until the remaining
     * branches arrive.
     *
     * @param runtimeContext     runtime context of the arriving branch; supplies the flow instance id,
     *                           the branch's instance data id and the current node model
     * @param joinNodeInstancePo persisted join node instance; its {@code executeId} and
     *                           {@code instanceDataId} are read, and it is updated in place
     * @param currentNodeInstance node instance of the branch currently arriving at the join
     * @param parentExecuteId    execute id of the parent branch (not used by this implementation)
     * @param currentExecuteId   execute id of the arriving branch
     * @param allExecuteIdSet    execute ids of all branches expected at the join
     * @param dataMergeStrategy  strategy used to combine the accumulated data with the branch data
     * @throws SuspendException when not every expected branch has arrived yet
     */
    @Override
    public void joinMerge(RuntimeContext runtimeContext, NodeInstancePO joinNodeInstancePo, NodeInstanceBO currentNodeInstance,
                          String parentExecuteId, String currentExecuteId, Set<String> allExecuteIdSet, DataMergeStrategy dataMergeStrategy) {
        Set<String> arrivedExecuteIds = ExecutorUtil.getExecuteIdSet((String) joinNodeInstancePo.get("executeId"));
        arrivedExecuteIds.add(currentExecuteId);

        // 1. Load the data of the branch that has just arrived.
        // NOTE(review): select() can return null (see the null check below for the join data);
        // confirm the merge strategy tolerates a null branch record.
        InstanceDataPO currentBranchData = instanceDataDAO.select(runtimeContext.getFlowInstanceId(), runtimeContext.getInstanceDataId());

        // 2. Load the data merged so far (the combined result of the branches that arrived earlier).
        InstanceDataPO mergedData = instanceDataDAO.select(runtimeContext.getFlowInstanceId(), joinNodeInstancePo.getInstanceDataId());
        // Remember whether the record had to be created here: once an id is generated, the id alone
        // can no longer tell a brand-new record from an existing one.
        boolean newlyCreated = mergedData == null;
        if (newlyCreated) {
            mergedData = new InstanceDataPO();
            mergedData.setInstanceDataId(genId());
            joinNodeInstancePo.setInstanceDataId(mergedData.getInstanceDataId());
        }

        // 3. Merge the arriving branch's data into the accumulated data.
        InstanceDataPO mergePo = dataMergeStrategy.merge(runtimeContext, mergedData, currentBranchData);

        // 4. Decide whether the join is complete before writing anything.
        boolean allArrived = ExecutorUtil.allArrived(allExecuteIdSet, arrivedExecuteIds);

        // 5. Persist the merged data: insert when no record existed yet, otherwise update it.
        if (newlyCreated || StringUtils.isBlank(mergedData.getInstanceDataId())) {
            instanceDataDAO.insert(mergePo);
        } else {
            instanceDataDAO.updateData(mergePo);
        }

        if (allArrived) {
            // Every branch has arrived: mark the join node as completed.
            buildParallelNodeInstancePo(joinNodeInstancePo, currentNodeInstance, NodeInstanceStatus.COMPLETED);
            nodeInstanceDAO.updateById(joinNodeInstancePo);
            nodeInstanceLogDAO.insert(buildCurrentNodeInstanceLogPO(currentNodeInstance, currentExecuteId, joinNodeInstancePo));
            return;
        }

        // Some branches are still missing: mark the join node as waiting and suspend execution.
        buildParallelNodeInstancePo(joinNodeInstancePo, currentNodeInstance, ParallelNodeInstanceStatus.WAITING);
        nodeInstanceDAO.updateById(joinNodeInstancePo);
        nodeInstanceLogDAO.insert(buildNodeInstanceLogPO(joinNodeInstancePo));

        throw new SuspendException(ParallelErrorEnum.WAITING_SUSPEND.getErrNo(), MessageFormat.format(Constants.NODE_INSTANCE_FORMAT,
                runtimeContext.getCurrentNodeModel().getKey(),
                runtimeContext.getCurrentNodeModel().getProperties().getOrDefault(Constants.ELEMENT_PROPERTIES.NAME, StringUtils.EMPTY),
                currentNodeInstance.getNodeInstanceId()));
    }

}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions