diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java index f5ac4020c7d3..cb8040ccf232 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/LoggerController.java @@ -21,6 +21,7 @@ import static org.apache.dolphinscheduler.api.enums.Status.QUERY_TASK_INSTANCE_LOG_ERROR; import org.apache.dolphinscheduler.api.exceptions.ApiException; +import org.apache.dolphinscheduler.api.executor.logging.TaskLogType; import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; @@ -71,11 +72,12 @@ public class LoggerController extends BaseController { @GetMapping(value = "/detail") @ResponseStatus(HttpStatus.OK) @ApiException(QUERY_TASK_INSTANCE_LOG_ERROR) - public Result queryLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "taskInstanceId") int taskInstanceId, - @RequestParam(value = "skipLineNum") int skipNum, - @RequestParam(value = "limit") int limit) { - return loggerService.queryLog(loginUser, taskInstanceId, skipNum, limit); + public Result queryTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, + @RequestParam(value = "taskInstanceId") int taskInstanceId, + @RequestParam(value = "skipLineNum") int skipNum, + @RequestParam(value = "limit") int limit, + @RequestParam(value = "logType", defaultValue = "LOG") TaskLogType logType) { + return loggerService.queryLog(loginUser, taskInstanceId, skipNum, limit, logType); } /** @@ -93,12 +95,14 @@ public Result queryLog(@Parameter(hidden = true) @RequestAttrib @ResponseBody @ApiException(DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR) public ResponseEntity downloadTaskLog(@Parameter(hidden = true) @RequestAttribute(value = Constants.SESSION_USER) User loginUser, - @RequestParam(value = "taskInstanceId") int taskInstanceId) { - byte[] logBytes = loggerService.getLogBytes(loginUser, taskInstanceId); + @RequestParam(value = "taskInstanceId") int taskInstanceId, + @RequestParam(value = "logType", defaultValue = "LOG") TaskLogType logType) { + byte[] logBytes = loggerService.getLogBytes(loginUser, taskInstanceId, logType); + String fileName = logType == TaskLogType.LOG ? "task.log" : "task.out"; return ResponseEntity .ok() .header(HttpHeaders.CONTENT_DISPOSITION, - "attachment; filename=\"" + System.currentTimeMillis() + ".log" + "\"") + "attachment; filename=\"" + fileName + "\"") .body(logBytes); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java index ebe8b2e9fea2..e73947706527 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClient.java @@ -24,6 +24,7 @@ import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogFileDownloadResponse; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryRequest; import org.apache.dolphinscheduler.extract.common.transportor.TaskInstanceLogPageQueryResponse; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider; import lombok.extern.slf4j.Slf4j; @@ -41,8 +42,8 @@ public class LocalLogClient { * @param taskInstance The task instance object, containing information needed to retrieve the log. * @return The complete log file download response of the task instance, including log content and metadata. */ - public TaskInstanceLogFileDownloadResponse getWholeLog(TaskInstance taskInstance) { - return getLocalWholeLog(taskInstance); + public TaskInstanceLogFileDownloadResponse getLog(TaskInstance taskInstance, TaskLogType taskLogType) { + return getLocalWholeLog(taskInstance, taskLogType); } /** @@ -55,23 +56,27 @@ public TaskInstanceLogFileDownloadResponse getWholeLog(TaskInstance taskInstance * @param limit The maximum number of lines to read, indicating the maximum number of lines to retrieve in this query. * @return The partial log query response, including log content within the specified range and metadata. */ - public TaskInstanceLogPageQueryResponse getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) { - return getLocalPartLog(taskInstance, skipLineNum, limit); + public TaskInstanceLogPageQueryResponse getLog(TaskInstance taskInstance, int skipLineNum, int limit, + TaskLogType taskLogType) { + return getLocalPartLog(taskInstance, skipLineNum, limit, taskLogType); } - private TaskInstanceLogFileDownloadResponse getLocalWholeLog(TaskInstance taskInstance) { + private TaskInstanceLogFileDownloadResponse getLocalWholeLog(TaskInstance taskInstance, TaskLogType taskLogType) { TaskInstanceLogFileDownloadRequest request = new TaskInstanceLogFileDownloadRequest( taskInstance.getId(), - taskInstance.getLogPath()); + TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(), + TaskLogFileTypeMapping.toTaskLogFileType(taskLogType))); return getProxyLogService(taskInstance).getTaskInstanceWholeLogFileBytes(request); } private TaskInstanceLogPageQueryResponse getLocalPartLog(TaskInstance taskInstance, int skipLineNum, - int limit) { + int limit, TaskLogType taskLogType) { + String logFilePath = TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(), + TaskLogFileTypeMapping.toTaskLogFileType(taskLogType)); TaskInstanceLogPageQueryRequest request = TaskInstanceLogPageQueryRequest .builder() .taskInstanceId(taskInstance.getId()) - .taskInstanceLogAbsolutePath(taskInstance.getLogPath()) + .taskInstanceLogAbsolutePath(logFilePath) .skipLineNum(skipLineNum) .limit(limit) .build(); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java index ac5b6ecb2ba4..76dff39ab053 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegate.java @@ -50,19 +50,21 @@ public class LogClientDelegate { * @param limit The maximum number of log lines to retrieve. * @return A string containing the specified portion of the log. */ - public String getPartLogString(TaskInstance taskInstance, int skipLineNum, int limit) { + + public String getLogString(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) { checkArgs(taskInstance); if (checkNodeExists(taskInstance)) { - TaskInstanceLogPageQueryResponse response = localLogClient.getPartLog(taskInstance, skipLineNum, limit); + TaskInstanceLogPageQueryResponse response = localLogClient.getLog(taskInstance, skipLineNum, limit, + taskLogType); if (response.getCode() == LogResponseStatus.SUCCESS) { return response.getLogContent(); } else { log.warn("get part log string is not success for task instance {}; reason :{}", taskInstance.getId(), response.getMessage()); - return remoteLogClient.getPartLog(taskInstance, skipLineNum, limit); + return remoteLogClient.getLogString(taskInstance, skipLineNum, limit, taskLogType); } } else { - return remoteLogClient.getPartLog(taskInstance, skipLineNum, limit); + return remoteLogClient.getLogString(taskInstance, skipLineNum, limit, taskLogType); } } @@ -73,19 +75,19 @@ public String getPartLogString(TaskInstance taskInstance, int skipLineNum, int l * @param taskInstance The task instance object, containing information needed for log retrieval. * @return A byte array containing the complete log content. */ - public byte[] getWholeLogBytes(TaskInstance taskInstance) { + public byte[] getLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) { checkArgs(taskInstance); if (checkNodeExists(taskInstance)) { - TaskInstanceLogFileDownloadResponse response = localLogClient.getWholeLog(taskInstance); + TaskInstanceLogFileDownloadResponse response = localLogClient.getLog(taskInstance, taskLogType); if (response.getCode() == LogResponseStatus.SUCCESS) { return response.getLogBytes(); } else { log.warn("get whole log bytes is not success for task instance {}; reason :{}", taskInstance.getId(), response.getMessage()); - return remoteLogClient.getWholeLog(taskInstance); + return remoteLogClient.getLogBytes(taskInstance, taskLogType); } } else { - return remoteLogClient.getWholeLog(taskInstance); + return remoteLogClient.getLogBytes(taskInstance, taskLogType); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java index 1b3542e96209..938107e3c824 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/RemoteLogClient.java @@ -19,6 +19,7 @@ import org.apache.dolphinscheduler.common.utils.LogUtils; import org.apache.dolphinscheduler.dao.entity.TaskInstance; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider; import org.springframework.stereotype.Component; @@ -32,8 +33,8 @@ public class RemoteLogClient { * @param taskInstance The task instance object, containing information such as the task ID and log path. * @return Returns the log content in byte array format. */ - public byte[] getWholeLog(TaskInstance taskInstance) { - return LogUtils.getFileContentBytesFromRemote(taskInstance.getLogPath()); + public byte[] getLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) { + return getWholeLog(taskInstance, taskLogType); } /** @@ -45,10 +46,24 @@ public byte[] getWholeLog(TaskInstance taskInstance) { * @param limit The maximum number of lines to read. * @return Returns the specified part of the log content in string format. */ - public String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit) { + + public String getLogString(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) { + return getPartLog(taskInstance, skipLineNum, limit, taskLogType); + } + + private byte[] getWholeLog(TaskInstance taskInstance, TaskLogType taskLogType) { + return LogUtils + .getFileContentBytesFromRemote(TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(), + TaskLogFileTypeMapping.toTaskLogFileType(taskLogType))); + } + + private String getPartLog(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) { // todo We can optimize requests by the actual range, reducing disk usage and network traffic. return LogUtils.rollViewLogLines( - LogUtils.readPartFileContentFromRemote(taskInstance.getLogPath(), skipLineNum, limit)); + LogUtils.readPartFileContentFromRemote( + TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(), + TaskLogFileTypeMapping.toTaskLogFileType(taskLogType)), + skipLineNum, limit)); } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogFileTypeMapping.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogFileTypeMapping.java new file mode 100644 index 000000000000..145816f1509e --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogFileTypeMapping.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.logging; + +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileType; + +import lombok.experimental.UtilityClass; + +@UtilityClass +public class TaskLogFileTypeMapping { + + public static TaskLogFileType toTaskLogFileType(TaskLogType taskLogType) { + switch (taskLogType) { + case LOG: + return TaskLogFileType.TASK_LOG; + case OUTPUT: + return TaskLogFileType.TASK_OUTPUT; + default: + throw new IllegalArgumentException("Unsupported task log type: " + taskLogType); + } + } +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogType.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogType.java new file mode 100644 index 000000000000..4fc8c5c63d91 --- /dev/null +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/executor/logging/TaskLogType.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.api.executor.logging; + +public enum TaskLogType { + LOG, + OUTPUT +} diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java index a31b73eefc29..fd74af951341 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/LoggerService.java @@ -17,6 +17,7 @@ package org.apache.dolphinscheduler.api.service; +import org.apache.dolphinscheduler.api.executor.logging.TaskLogType; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog; import org.apache.dolphinscheduler.dao.entity.User; @@ -32,7 +33,8 @@ public interface LoggerService { * @param limit limit * @return log string data */ - Result queryLog(User loginUser, int taskInstId, int skipLineNum, int limit); + Result queryLog(User loginUser, int taskInstId, int skipLineNum, int limit, + TaskLogType taskLogType); /** * get log size @@ -41,27 +43,6 @@ public interface LoggerService { * @param taskInstId task instance id * @return log byte array */ - byte[] getLogBytes(User loginUser, int taskInstId); + byte[] getLogBytes(User loginUser, int taskInstId, TaskLogType taskLogType); - /** - * query log - * - * @param loginUser login user - * @param projectCode project code - * @param taskInstId task instance id - * @param skipLineNum skip line number - * @param limit limit - * @return log string data - */ - String queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit); - - /** - * get log bytes - * - * @param loginUser login user - * @param projectCode project code - * @param taskInstId task instance id - * @return log byte array - */ - byte[] getLogBytes(User loginUser, long projectCode, int taskInstId); } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java index 44a91275074e..4fdbdb3b3bd0 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/LoggerServiceImpl.java @@ -23,18 +23,19 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.executor.logging.LogClientDelegate; +import org.apache.dolphinscheduler.api.executor.logging.TaskLogFileTypeMapping; +import org.apache.dolphinscheduler.api.executor.logging.TaskLogType; import org.apache.dolphinscheduler.api.service.LoggerService; import org.apache.dolphinscheduler.api.service.ProjectService; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; import org.apache.dolphinscheduler.dao.entity.Project; import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider; import org.apache.commons.lang3.StringUtils; @@ -62,9 +63,6 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService @Autowired private ProjectService projectService; - @Autowired - private TaskDefinitionMapper taskDefinitionMapper; - @Autowired private LogClientDelegate logClientDelegate; @@ -79,8 +77,11 @@ public class LoggerServiceImpl extends BaseServiceImpl implements LoggerService */ @Override @SuppressWarnings("unchecked") - public Result queryLog(User loginUser, int taskInstId, int skipLineNum, int limit) { - + public Result queryLog(User loginUser, + int taskInstId, + int skipLineNum, + int limit, + TaskLogType taskLogType) { TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId); if (taskInstance == null) { @@ -93,7 +94,7 @@ public Result queryLog(User loginUser, int taskInstId, int skip } projectService.checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); Result result = new Result<>(Status.SUCCESS.getCode(), Status.SUCCESS.getMsg()); - String log = queryLog(taskInstance, skipLineNum, limit); + String log = queryLog(taskInstance, skipLineNum, limit, taskLogType); int lineNum = log.split("\\r\\n").length; result.setData(new ResponseTaskLog(lineNum, log)); return result; @@ -107,68 +108,14 @@ public Result queryLog(User loginUser, int taskInstId, int skip * @return log byte array */ @Override - public byte[] getLogBytes(User loginUser, int taskInstId) { + public byte[] getLogBytes(User loginUser, int taskInstId, TaskLogType taskLogType) { TaskInstance taskInstance = taskInstanceDao.queryById(taskInstId); if (taskInstance == null || StringUtils.isBlank(taskInstance.getHost())) { throw new ServiceException("task instance is null or host is null"); } Project project = projectMapper.queryProjectByTaskInstanceId(taskInstId); projectService.checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); - return getLogBytes(taskInstance); - } - - /** - * query log - * - * @param loginUser login user - * @param projectCode project code - * @param taskInstId task instance id - * @param skipLineNum skip line number - * @param limit limit - * @return log string data - */ - @Override - @SuppressWarnings("unchecked") - public String queryLog(User loginUser, long projectCode, int taskInstId, int skipLineNum, int limit) { - // check user access for project - projectService.checkProjectAndAuthThrowException(loginUser, projectCode, VIEW_LOG); - // check whether the task instance can be found - TaskInstance task = taskInstanceDao.queryById(taskInstId); - if (task == null || StringUtils.isBlank(task.getHost())) { - throw new ServiceException(Status.TASK_INSTANCE_NOT_FOUND); - } - - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); - if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { - throw new ServiceException(Status.TASK_INSTANCE_NOT_FOUND, taskInstId); - } - return queryLog(task, skipLineNum, limit); - } - - /** - * get log bytes - * - * @param loginUser login user - * @param projectCode project code - * @param taskInstId task instance id - * @return log byte array - */ - @Override - public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) { - // check user access for project - projectService.checkProjectAndAuthThrowException(loginUser, projectCode, DOWNLOAD_LOG); - - // check whether the task instance can be found - TaskInstance task = taskInstanceDao.queryById(taskInstId); - if (task == null || StringUtils.isBlank(task.getHost())) { - throw new ServiceException("task instance is null or host is null"); - } - - TaskDefinition taskDefinition = taskDefinitionMapper.queryByCode(task.getTaskCode()); - if (taskDefinition != null && projectCode != taskDefinition.getProjectCode()) { - throw new ServiceException("task instance does not exist in project"); - } - return getLogBytes(task); + return getLogBytes(taskInstance, taskLogType); } /** @@ -179,8 +126,8 @@ public byte[] getLogBytes(User loginUser, long projectCode, int taskInstId) { * @param limit limit * @return log string data */ - private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { - final String logPath = taskInstance.getLogPath(); + private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit, TaskLogType taskLogType) { + String logPath = getLogPath(taskInstance, taskLogType); log.info("Query task instance log, taskInstanceId:{}, taskInstanceName:{}, host: {}, logPath:{}", taskInstance.getId(), taskInstance.getName(), taskInstance.getHost(), logPath); if (StringUtils.isBlank(logPath)) { @@ -189,7 +136,7 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { } StringBuilder sb = new StringBuilder(); - if (skipLineNum == 0) { + if (shouldAppendLogHead(taskLogType) && skipLineNum == 0) { String head = String.format(LOG_HEAD_FORMAT, logPath, taskInstance.getHost(), @@ -198,7 +145,7 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { } try { - String logContent = logClientDelegate.getPartLogString(taskInstance, skipLineNum, limit); + String logContent = logClientDelegate.getLogString(taskInstance, skipLineNum, limit, taskLogType); if (logContent != null) { sb.append(logContent); } @@ -214,9 +161,9 @@ private String queryLog(TaskInstance taskInstance, int skipLineNum, int limit) { * @param taskInstance task instance * @return log byte array */ - private byte[] getLogBytes(TaskInstance taskInstance) { + private byte[] getLogBytes(TaskInstance taskInstance, TaskLogType taskLogType) { String host = taskInstance.getHost(); - String logPath = taskInstance.getLogPath(); + String logPath = getLogPath(taskInstance, taskLogType); byte[] head = String.format(LOG_HEAD_FORMAT, logPath, @@ -226,11 +173,23 @@ private byte[] getLogBytes(TaskInstance taskInstance) { byte[] logBytes; try { - logBytes = logClientDelegate.getWholeLogBytes(taskInstance); + logBytes = logClientDelegate.getLogBytes(taskInstance, taskLogType); + if (!shouldAppendLogHead(taskLogType)) { + return logBytes; + } return Bytes.concat(head, logBytes); } catch (Exception ex) { log.error("Download TaskInstance: {} Log Error", taskInstance.getName(), ex); throw new ServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR); } } + + private boolean shouldAppendLogHead(TaskLogType taskLogType) { + return taskLogType == TaskLogType.LOG; + } + + private String getLogPath(TaskInstance taskInstance, TaskLogType taskLogType) { + return TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(), + TaskLogFileTypeMapping.toTaskLogFileType(taskLogType)); + } } diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java index b5abbc86a6a7..62ae050707f2 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/TaskInstanceServiceImpl.java @@ -46,6 +46,8 @@ import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointRequest; import org.apache.dolphinscheduler.extract.worker.transportor.TaskInstanceTriggerSavepointResponse; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileType; import org.apache.dolphinscheduler.service.process.ProcessService; import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillRequest; import org.apache.dolphinscheduler.task.executor.operations.TaskExecutorKillResponse; @@ -306,13 +308,18 @@ public void deleteByWorkflowInstanceId(Integer workflowInstanceId) { return; } for (TaskInstance taskInstance : needToDeleteTaskInstances) { - if (StringUtils.isNotBlank(taskInstance.getLogPath())) { + if (StringUtils.isNotBlank(taskInstance.getTaskLogsRootPath())) { try { // Remove task instance log failed will not affect the deletion of task instance - Clients + ILogService logService = Clients .withService(ILogService.class) - .withHost(taskInstance.getHost()) - .removeTaskInstanceLog(taskInstance.getLogPath()); + .withHost(taskInstance.getHost()); + logService.removeTaskInstanceLog( + TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(), + TaskLogFileType.TASK_LOG)); + logService.removeTaskInstanceLog( + TaskLogFileProvider.getFilePath(taskInstance.getTaskLogsRootPath(), + TaskLogFileType.TASK_OUTPUT)); } catch (Exception ex) { log.error("Remove task instance log error", ex); } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java index 38ed86900790..f160886ccf0d 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LocalLogClientTest.java @@ -116,7 +116,7 @@ public void testGetWholeLogSuccess() { taskInstance.setId(1); taskInstance.setLogPath("/path/to/log"); - TaskInstanceLogFileDownloadResponse actualResponse = localLogClient.getWholeLog(taskInstance); + TaskInstanceLogFileDownloadResponse actualResponse = localLogClient.getLog(taskInstance, TaskLogType.LOG); assertNotNull(actualResponse); assertArrayEquals("".getBytes(), actualResponse.getLogBytes()); @@ -129,7 +129,8 @@ public void testGetPartLogSuccess() { taskInstance.setHost("127.0.0.1:" + nettyServerPort); taskInstance.setLogPath("/path/to/log"); - TaskInstanceLogPageQueryResponse actualResponse = localLogClient.getPartLog(taskInstance, 0, 10); + TaskInstanceLogPageQueryResponse actualResponse = localLogClient.getLog(taskInstance, 0, 10, + TaskLogType.LOG); assertNotNull(actualResponse); assertEquals("Partial log content", actualResponse.getLogContent()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegateTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegateTest.java index edf85268a723..f9923f48e448 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegateTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/executor/logging/LogClientDelegateTest.java @@ -54,7 +54,8 @@ public class LogClientDelegateTest { @Test public void testGetPartLogStringTaskInstanceNullThrowsException() { - assertThrows(IllegalArgumentException.class, () -> logClientDelegate.getPartLogString(null, 0, 10)); + assertThrows(IllegalArgumentException.class, + () -> logClientDelegate.getLogString(null, 0, 10, TaskLogType.LOG)); } @Test @@ -64,12 +65,26 @@ public void testGetPartLogStringNodeExistsLocalSuccess() { taskInstance.setHost("localhost"); taskInstance.setTaskType("SHELL"); when(registryClient.checkNodeExists(eq(taskInstance.getHost()), any())).thenReturn(true); - when(localLogClient.getPartLog(taskInstance, 0, 10)) + when(localLogClient.getLog(taskInstance, 0, 10, TaskLogType.LOG)) .thenReturn(new TaskInstanceLogPageQueryResponse("logContent", LogResponseStatus.SUCCESS, "")); - String result = logClientDelegate.getPartLogString(taskInstance, 0, 10); + String result = logClientDelegate.getLogString(taskInstance, 0, 10, TaskLogType.LOG); assertEquals("logContent", result); } + @Test + public void testGetTaskOutputStringNodeExistsLocalSuccess() { + TaskInstance taskInstance = new TaskInstance(); + taskInstance.setId(1); + taskInstance.setHost("localhost"); + taskInstance.setTaskType("SHELL"); + when(registryClient.checkNodeExists(eq(taskInstance.getHost()), any())).thenReturn(true); + when(localLogClient.getLog(taskInstance, 0, 10, TaskLogType.OUTPUT)) + .thenReturn(new TaskInstanceLogPageQueryResponse("outputContent", LogResponseStatus.SUCCESS, "")); + + String result = logClientDelegate.getLogString(taskInstance, 0, 10, TaskLogType.OUTPUT); + assertEquals("outputContent", result); + } + @Test public void testGetPartLogStringNodeExistsLocalFailure() { TaskInstance taskInstance = new TaskInstance(); @@ -78,11 +93,11 @@ public void testGetPartLogStringNodeExistsLocalFailure() { taskInstance.setTaskType("SHELL"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.WORKER)).thenReturn(true); - when(localLogClient.getPartLog(taskInstance, 0, 10)).thenReturn( + when(localLogClient.getLog(taskInstance, 0, 10, TaskLogType.LOG)).thenReturn( new TaskInstanceLogPageQueryResponse(null, LogResponseStatus.ERROR, "error")); - when(remoteLogClient.getPartLog(taskInstance, 0, 10)).thenReturn("remoteLogContent"); + when(remoteLogClient.getLogString(taskInstance, 0, 10, TaskLogType.LOG)).thenReturn("remoteLogContent"); - String result = logClientDelegate.getPartLogString(taskInstance, 0, 10); + String result = logClientDelegate.getLogString(taskInstance, 0, 10, TaskLogType.LOG); assertEquals("remoteLogContent", result); } @@ -94,15 +109,15 @@ public void testGetPartLogStringNodeNotExists() { taskInstance.setTaskType("SHELL"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.WORKER)).thenReturn(false); - when(remoteLogClient.getPartLog(taskInstance, 0, 10)).thenReturn("remoteLogContent"); + when(remoteLogClient.getLogString(taskInstance, 0, 10, TaskLogType.LOG)).thenReturn("remoteLogContent"); - String result = logClientDelegate.getPartLogString(taskInstance, 0, 10); + String result = logClientDelegate.getLogString(taskInstance, 0, 10, TaskLogType.LOG); assertEquals("remoteLogContent", result); } @Test public void testGetWholeLogBytesTaskInstanceNullThrowsException() { - assertThrows(IllegalArgumentException.class, () -> logClientDelegate.getWholeLogBytes(null)); + assertThrows(IllegalArgumentException.class, () -> logClientDelegate.getLogBytes(null, TaskLogType.LOG)); } @Test @@ -113,10 +128,10 @@ public void testGetWholeLogBytesNodeExistsLocalSuccess() { taskInstance.setTaskType("SWITCH"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.MASTER)).thenReturn(true); - when(localLogClient.getWholeLog(taskInstance)).thenReturn( + when(localLogClient.getLog(taskInstance, TaskLogType.LOG)).thenReturn( new TaskInstanceLogFileDownloadResponse("logBytes".getBytes(), LogResponseStatus.SUCCESS, null)); - byte[] result = logClientDelegate.getWholeLogBytes(taskInstance); + byte[] result = logClientDelegate.getLogBytes(taskInstance, TaskLogType.LOG); assertArrayEquals("logBytes".getBytes(), result); } @@ -128,11 +143,11 @@ public void testGetWholeLogBytesNodeExistsLocalFailure() { taskInstance.setTaskType("SWITCH"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.MASTER)).thenReturn(true); - when(localLogClient.getWholeLog(taskInstance)).thenReturn( + when(localLogClient.getLog(taskInstance, TaskLogType.LOG)).thenReturn( new TaskInstanceLogFileDownloadResponse(null, LogResponseStatus.ERROR, "error")); - when(remoteLogClient.getWholeLog(taskInstance)).thenReturn("remoteLogBytes".getBytes()); + when(remoteLogClient.getLogBytes(taskInstance, TaskLogType.LOG)).thenReturn("remoteLogBytes".getBytes()); - byte[] result = logClientDelegate.getWholeLogBytes(taskInstance); + byte[] result = logClientDelegate.getLogBytes(taskInstance, TaskLogType.LOG); assertArrayEquals("remoteLogBytes".getBytes(), result); } @@ -144,9 +159,9 @@ public void testGetWholeLogBytesNodeNotExists() { taskInstance.setTaskType("SWITCH"); when(registryClient.checkNodeExists("localhost", RegistryNodeType.MASTER)).thenReturn(false); - when(remoteLogClient.getWholeLog(taskInstance)).thenReturn("remoteLogBytes".getBytes()); + when(remoteLogClient.getLogBytes(taskInstance, TaskLogType.LOG)).thenReturn("remoteLogBytes".getBytes()); - byte[] result = logClientDelegate.getWholeLogBytes(taskInstance); + byte[] result = logClientDelegate.getLogBytes(taskInstance, TaskLogType.LOG); assertArrayEquals("remoteLogBytes".getBytes(), result); } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java index 8788696cbf9c..064cfed85ff6 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/LoggerServiceTest.java @@ -17,11 +17,9 @@ package org.apache.dolphinscheduler.api.service; -import static org.apache.dolphinscheduler.api.AssertionsHelper.assertDoesNotThrow; import static org.apache.dolphinscheduler.api.AssertionsHelper.assertThrowsServiceException; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.DOWNLOAD_LOG; import static org.apache.dolphinscheduler.api.constants.ApiFuncIdentificationConstant.VIEW_LOG; -import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.doNothing; @@ -32,21 +30,18 @@ import org.apache.dolphinscheduler.api.enums.Status; import org.apache.dolphinscheduler.api.exceptions.ServiceException; import org.apache.dolphinscheduler.api.executor.logging.LogClientDelegate; +import org.apache.dolphinscheduler.api.executor.logging.TaskLogType; import org.apache.dolphinscheduler.api.service.impl.LoggerServiceImpl; import org.apache.dolphinscheduler.api.utils.Result; import org.apache.dolphinscheduler.common.constants.Constants; -import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.dao.entity.Project; -import org.apache.dolphinscheduler.dao.entity.TaskDefinition; +import org.apache.dolphinscheduler.dao.entity.ResponseTaskLog; import org.apache.dolphinscheduler.dao.entity.TaskInstance; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.mapper.ProjectMapper; -import org.apache.dolphinscheduler.dao.mapper.TaskDefinitionMapper; import org.apache.dolphinscheduler.dao.repository.TaskInstanceDao; -import java.text.MessageFormat; -import java.util.HashMap; -import java.util.Map; +import java.nio.charset.StandardCharsets; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -77,9 +72,6 @@ public class LoggerServiceTest { @Mock private ProjectService projectService; - @Mock - private TaskDefinitionMapper taskDefinitionMapper; - @Mock private LogClientDelegate logClientDelegate; @@ -93,13 +85,13 @@ public void testQueryLog() { TaskInstance taskInstance = new TaskInstance(); taskInstance.setExecutorId(loginUser.getId() + 1); when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - Result result = loggerService.queryLog(loginUser, 2, 1, 1); + Result result = loggerService.queryLog(loginUser, 2, 1, 1, TaskLogType.LOG); // TASK_INSTANCE_NOT_FOUND Assertions.assertEquals(Status.TASK_INSTANCE_NOT_FOUND.getCode(), result.getCode().intValue()); try { // HOST NOT FOUND OR ILLEGAL - result = loggerService.queryLog(loginUser, 1, 1, 1); + result = loggerService.queryLog(loginUser, 1, 1, 1, TaskLogType.LOG); } catch (RuntimeException e) { Assertions.assertTrue(true); logger.error("testQueryDataSourceList error {}", e.getMessage()); @@ -112,27 +104,27 @@ public void testQueryLog() { doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); AssertionsHelper.assertThrowsServiceException(Status.PROJECT_NOT_EXIST, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.queryLog(loginUser, 1, 1, 1, TaskLogType.LOG)); // USER_NO_OPERATION_PERM doThrow(new ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService) .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); AssertionsHelper.assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.queryLog(loginUser, 1, 1, 1, TaskLogType.LOG)); // SUCCESS doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - result = loggerService.queryLog(loginUser, 1, 1, 1); + result = loggerService.queryLog(loginUser, 1, 1, 1, TaskLogType.LOG); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); - result = loggerService.queryLog(loginUser, 1, 0, 1); + result = loggerService.queryLog(loginUser, 1, 0, 1, TaskLogType.LOG); Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); taskInstance.setLogPath(""); assertThrowsServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.queryLog(loginUser, 1, 1, 1, TaskLogType.LOG)); } @Test @@ -140,6 +132,8 @@ public void testGetLogBytes() { User loginUser = new User(); loginUser.setId(1); + Project project = new Project(); + project.setCode(1L); TaskInstance taskInstance = new TaskInstance(); taskInstance.setId(1); taskInstance.setExecutorId(loginUser.getId() + 1); @@ -147,7 +141,7 @@ public void testGetLogBytes() { // task instance is null try { - loggerService.getLogBytes(loginUser, 2); + loggerService.getLogBytes(loginUser, 2, TaskLogType.LOG); } catch (ServiceException e) { Assertions.assertEquals(new ServiceException("task instance is null or host is null").getMessage(), e.getMessage()); @@ -156,7 +150,7 @@ public void testGetLogBytes() { // task instance host is null try { - loggerService.getLogBytes(loginUser, 1); + loggerService.getLogBytes(loginUser, 1, TaskLogType.LOG); } catch (ServiceException e) { Assertions.assertEquals(new ServiceException("task instance is null or host is null").getMessage(), e.getMessage()); @@ -166,138 +160,56 @@ public void testGetLogBytes() { // PROJECT_NOT_EXIST taskInstance.setHost("127.0.0.1:" + nettyServerPort); taskInstance.setLogPath("/temp/log"); + when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project); doThrow(new ServiceException(Status.PROJECT_NOT_EXIST)).when(projectService) - .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); + .checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); AssertionsHelper.assertThrowsServiceException(Status.PROJECT_NOT_EXIST, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.getLogBytes(loginUser, 1, TaskLogType.LOG)); // USER_NO_OPERATION_PERM doThrow(new ServiceException(Status.USER_NO_OPERATION_PERM)).when(projectService) - .checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), VIEW_LOG); + .checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); AssertionsHelper.assertThrowsServiceException(Status.USER_NO_OPERATION_PERM, - () -> loggerService.queryLog(loginUser, 1, 1, 1)); + () -> loggerService.getLogBytes(loginUser, 1, TaskLogType.LOG)); // SUCCESS - when(logClientDelegate.getWholeLogBytes(any())).thenReturn(new byte[0]); - doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), - DOWNLOAD_LOG); - when(logClientDelegate.getWholeLogBytes(any())).thenReturn(new byte[0]); - byte[] logBytes = loggerService.getLogBytes(loginUser, 1); - Assertions.assertEquals(42, logBytes.length - String.valueOf(nettyServerPort).length()); + when(logClientDelegate.getLogBytes(any(), any())).thenReturn(new byte[0]); + doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); + when(logClientDelegate.getLogBytes(any(), any())).thenReturn(new byte[0]); + byte[] logBytes = loggerService.getLogBytes(loginUser, 1, TaskLogType.LOG); + String expectedHead = String.format("[LOG-PATH]: %s, [HOST]: %s%s", + taskInstance.getLogPath(), taskInstance.getHost(), Constants.SYSTEM_LINE_SEPARATOR); + Assertions.assertArrayEquals(expectedHead.getBytes(StandardCharsets.UTF_8), logBytes); } @Test - public void testQueryLogInSpecifiedProject() { - long projectCode = 1L; + public void testQueryTaskOutputAndGetOutputBytes() { User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); + loginUser.setId(1); + Project project = new Project(); + project.setCode(1L); TaskInstance taskInstance = new TaskInstance(); - when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - when(taskInstanceDao.queryById(10)).thenReturn(null); - - assertThrowsServiceException(Status.TASK_INSTANCE_NOT_FOUND, - () -> loggerService.queryLog(loginUser, projectCode, 10, 1, 1)); - - TaskDefinition taskDefinition = new TaskDefinition(); - taskDefinition.setProjectCode(projectCode); - taskDefinition.setCode(1L); - - // SUCCESS - taskInstance.setTaskCode(1L); taskInstance.setId(1); + taskInstance.setExecutorId(loginUser.getId() + 1); taskInstance.setHost("127.0.0.1:" + nettyServerPort); - taskInstance.setLogPath("/temp/log"); - doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, projectCode, VIEW_LOG); + taskInstance.setTaskOutputLogPath("/temp/output.log"); when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); - assertDoesNotThrow(() -> loggerService.queryLog(loginUser, projectCode, 1, 1, 1)); - - taskDefinition.setProjectCode(10); - assertThrowsServiceException(Status.TASK_INSTANCE_NOT_FOUND, - () -> loggerService.queryLog(loginUser, projectCode, 1, 1, 1)); - - taskDefinition.setProjectCode(1); - taskInstance.setId(10); - when(taskInstanceDao.queryById(10)).thenReturn(taskInstance); - - when(logClientDelegate.getPartLogString(any(), anyInt(), anyInt())).thenReturn("log content"); - - String result = loggerService.queryLog(loginUser, projectCode, 10, 1, 1); - assertEquals("log content", result); - taskInstance.setId(100); - when(taskInstanceDao.queryById(100)).thenReturn(taskInstance); - doThrow(new ServiceException("query log error")).when(logClientDelegate).getPartLogString(any(), anyInt(), - anyInt()); - assertThrowsServiceException(Status.QUERY_TASK_INSTANCE_LOG_ERROR, - () -> loggerService.queryLog(loginUser, projectCode, 10, 1, 1)); - } - - @Test - public void testGetLogBytesInSpecifiedProject() { - long projectCode = 1L; - when(projectMapper.queryByCode(projectCode)).thenReturn(getProject(projectCode)); - - User loginUser = new User(); - loginUser.setId(-1); - loginUser.setUserType(UserType.GENERAL_USER); - Map result = new HashMap<>(); - putMsg(result, Status.SUCCESS, projectCode); - TaskInstance taskInstance = new TaskInstance(); - TaskDefinition taskDefinition = new TaskDefinition(); - taskDefinition.setProjectCode(projectCode); - taskDefinition.setCode(1L); - // SUCCESS - taskInstance.setTaskCode(1L); - taskInstance.setId(1); - taskInstance.setHost("127.0.0.1:" + nettyServerPort); - taskInstance.setLogPath("/temp/log"); - doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, projectCode, DOWNLOAD_LOG); + doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, taskInstance.getProjectCode(), + VIEW_LOG); + when(logClientDelegate.getLogString(any(), anyInt(), anyInt(), any())).thenReturn("output content"); - when(taskInstanceDao.queryById(1)).thenReturn(null); - assertThrowsServiceException( - Status.INTERNAL_SERVER_ERROR_ARGS, () -> loggerService.getLogBytes(loginUser, projectCode, 1)); + Result result = loggerService.queryLog(loginUser, 1, 1, 1, TaskLogType.OUTPUT); + Assertions.assertEquals(Status.SUCCESS.getCode(), result.getCode().intValue()); + Assertions.assertEquals("output content", result.getData().getMessage()); - when(taskInstanceDao.queryById(1)).thenReturn(taskInstance); - when(taskDefinitionMapper.queryByCode(taskInstance.getTaskCode())).thenReturn(taskDefinition); - when(logClientDelegate.getWholeLogBytes(any())).thenReturn(new byte[0]); - assertDoesNotThrow(() -> loggerService.getLogBytes(loginUser, projectCode, 1)); - - taskDefinition.setProjectCode(2L); - assertThrowsServiceException(Status.INTERNAL_SERVER_ERROR_ARGS, - () -> loggerService.getLogBytes(loginUser, projectCode, 1)); - - taskDefinition.setProjectCode(1L); - taskInstance.setId(100); - when(taskInstanceDao.queryById(100)).thenReturn(taskInstance); - doThrow(new ServiceException("download error")).when(logClientDelegate).getWholeLogBytes(any()); - assertThrowsServiceException(Status.DOWNLOAD_TASK_INSTANCE_LOG_FILE_ERROR, - () -> loggerService.getLogBytes(loginUser, projectCode, 100)); - } + when(projectMapper.queryProjectByTaskInstanceId(1)).thenReturn(project); + doNothing().when(projectService).checkProjectAndAuthThrowException(loginUser, project, DOWNLOAD_LOG); + when(logClientDelegate.getLogBytes(any(), any())).thenReturn(new byte[0]); - /** - * get mock Project - * - * @param projectCode projectCode - * @return Project - */ - private Project getProject(long projectCode) { - Project project = new Project(); - project.setCode(projectCode); - project.setId(1); - project.setName("test"); - project.setUserId(1); - return project; + byte[] outputBytes = loggerService.getLogBytes(loginUser, 1, TaskLogType.OUTPUT); + Assertions.assertEquals(0, outputBytes.length); } - private void putMsg(Map result, Status status, Object... statusParams) { - result.put(Constants.STATUS, status); - if (statusParams != null && statusParams.length > 0) { - result.put(Constants.MSG, MessageFormat.format(status.getMsg(), statusParams)); - } else { - result.put(Constants.MSG, status.getMsg()); - } - } } diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java index 790a23955b28..5a051aa886d1 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkflowInstanceServiceTest.java @@ -478,7 +478,6 @@ public void testQueryTaskListByWorkflowInstanceId() throws IOException { .thenReturn(Optional.of(workflowInstance)); when(taskInstanceDao.queryValidTaskListByWorkflowInstanceId(workflowInstance.getId())) .thenReturn(taskInstanceList); - when(loggerService.queryLog(loginUser, taskInstance.getId(), 0, 4098)).thenReturn(res); when(taskInstanceContextDao.batchQueryByTaskInstanceIdsAndContextType(taskInstanceIdList, ContextType.DEPENDENT_RESULT_CONTEXT)) .thenReturn(Lists.asList(taskInstanceContext, new TaskInstanceContext[0])); diff --git a/dolphinscheduler-bom/pom.xml b/dolphinscheduler-bom/pom.xml index 22f5e23a3be2..f1de0e6f4ba4 100644 --- a/dolphinscheduler-bom/pom.xml +++ b/dolphinscheduler-bom/pom.xml @@ -1015,6 +1015,12 @@ com.dolphindb jdbc ${dolphindb-jdbc.version} + + + org.slf4j + slf4j-simple + + diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java index f94e88b59513..5270903d24c9 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java @@ -21,14 +21,19 @@ import org.apache.dolphinscheduler.common.enums.Priority; import org.apache.dolphinscheduler.common.enums.TaskExecuteType; import org.apache.dolphinscheduler.plugin.task.api.enums.TaskExecutionStatus; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileType; import java.io.Serializable; import java.util.Date; +import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableField; @@ -79,7 +84,8 @@ public class TaskInstance implements Serializable { private String executePath; - private String logPath; + @TableField("task_logs_root_path") + private String taskLogsRootPath; private int retryTimes; @@ -138,4 +144,48 @@ public class TaskInstance implements Serializable { private TaskExecuteType taskExecuteType; + @Getter(AccessLevel.NONE) + @Setter(AccessLevel.NONE) + @Builder.Default + @TableField(exist = false) + private transient String legacyLogPath = null; + + @Getter(AccessLevel.NONE) + @Setter(AccessLevel.NONE) + @Builder.Default + @TableField(exist = false) + private transient String legacyTaskOutputLogPath = null; + + public String getLogPath() { + return TaskLogFileProvider.getFilePath(taskLogsRootPath, TaskLogFileType.TASK_LOG); + } + + public void setLogPath(String logPath) { + legacyLogPath = logPath; + if (logPath == null) { + if (legacyTaskOutputLogPath == null) { + taskLogsRootPath = null; + } + return; + } + taskLogsRootPath = TaskLogFileProvider.getTaskLogsRootPathFromFilePath(logPath); + } + + public String getTaskOutputLogPath() { + return TaskLogFileProvider.getFilePath(taskLogsRootPath, TaskLogFileType.TASK_OUTPUT); + } + + public void setTaskOutputLogPath(String taskOutputLogPath) { + legacyTaskOutputLogPath = taskOutputLogPath; + if (taskOutputLogPath == null) { + if (legacyLogPath == null) { + taskLogsRootPath = null; + } + return; + } + if (taskLogsRootPath == null) { + taskLogsRootPath = TaskLogFileProvider.getTaskLogsRootPathFromFilePath(taskOutputLogPath); + } + } + } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml index cc0c10c4b719..633525b740ee 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/TaskInstanceMapper.xml @@ -20,13 +20,13 @@ id, name, task_type, workflow_instance_id, workflow_instance_name, project_code, task_code, task_definition_version, state, submit_time, - start_time, end_time, host, execute_path, log_path, alert_flag, retry_times, pid, app_link, + start_time, end_time, host, execute_path, task_logs_root_path, alert_flag, retry_times, pid, app_link, flag, retry_interval, max_retry_times, task_instance_priority, worker_group,environment_code , executor_id, executor_name, first_submit_time, delay_time, task_params, var_pool, dry_run, task_group_id, cpu_quota, memory_max, task_execute_type ${alias}.id, ${alias}.name, ${alias}.task_type, ${alias}.task_code, ${alias}.task_definition_version, ${alias}.workflow_instance_id, ${alias}.state, ${alias}.submit_time, - ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.log_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link, + ${alias}.start_time, ${alias}.end_time, ${alias}.host, ${alias}.execute_path, ${alias}.task_logs_root_path, ${alias}.alert_flag, ${alias}.retry_times, ${alias}.pid, ${alias}.app_link, ${alias}.flag, ${alias}.retry_interval, ${alias}.max_retry_times, ${alias}.task_instance_priority, ${alias}.worker_group,${alias}.environment_code , ${alias}.executor_id, ${alias}.first_submit_time, ${alias}.delay_time, ${alias}.task_params, ${alias}.var_pool, ${alias}.dry_run, ${alias}.task_group_id, ${alias}.task_execute_type diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 996fce0732b9..57b7daaf4b89 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -914,7 +914,7 @@ CREATE TABLE t_ds_task_instance end_time datetime DEFAULT NULL, host varchar(135) DEFAULT NULL, execute_path varchar(200) DEFAULT NULL, - log_path longtext DEFAULT NULL, + task_logs_root_path varchar(1024) DEFAULT NULL, alert_flag tinyint(4) DEFAULT NULL, retry_times int(4) DEFAULT '0', pid int(4) DEFAULT NULL, diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 7067cd7429aa..9b5eff76db61 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -913,7 +913,7 @@ CREATE TABLE `t_ds_task_instance` ( `end_time` datetime DEFAULT NULL COMMENT 'task end time', `host` varchar(135) DEFAULT NULL COMMENT 'host of task running on', `execute_path` varchar(200) DEFAULT NULL COMMENT 'task execute path in the host', - `log_path` longtext DEFAULT NULL COMMENT 'task log path', + `task_logs_root_path` varchar(1024) DEFAULT NULL COMMENT 'task output directory path', `alert_flag` tinyint(4) DEFAULT NULL COMMENT 'whether alert', `retry_times` int(4) DEFAULT '0' COMMENT 'task retry times', `pid` int(4) DEFAULT NULL COMMENT 'pid of task', diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 36261d7a8c2d..3fc715832b36 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -833,7 +833,7 @@ CREATE TABLE t_ds_task_instance ( end_time timestamp DEFAULT NULL , host varchar(135) DEFAULT NULL , execute_path varchar(200) DEFAULT NULL , - log_path text DEFAULT NULL , + task_logs_root_path varchar(1024) DEFAULT NULL , alert_flag int DEFAULT NULL , retry_times int DEFAULT '0' , pid int DEFAULT NULL , diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_ddl.sql index 30f6f2be6947..959c8eaa18ae 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_ddl.sql @@ -16,4 +16,22 @@ */ ALTER TABLE `t_ds_serial_command` -MODIFY COLUMN `workflow_definition_code` BIGINT(20) NOT NULL COMMENT 'workflow definition code'; \ No newline at end of file +MODIFY COLUMN `workflow_definition_code` BIGINT(20) NOT NULL COMMENT 'workflow definition code'; + +drop PROCEDURE if EXISTS add_task_output_log_path_to_t_ds_task_instance; +delimiter d// +CREATE PROCEDURE add_task_output_log_path_to_t_ds_task_instance() +BEGIN + IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='task_output_log_path') + THEN + ALTER TABLE `t_ds_task_instance` + ADD COLUMN `task_output_log_path` longtext DEFAULT NULL COMMENT 'task output log path' AFTER `log_path`; + END IF; +END; +d// +delimiter ; +call add_task_output_log_path_to_t_ds_task_instance(); +drop PROCEDURE if EXISTS add_task_output_log_path_to_t_ds_task_instance; \ No newline at end of file diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_ddl.sql index d25618977331..957ec5183ac3 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_ddl.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_ddl.sql @@ -18,3 +18,6 @@ CREATE SEQUENCE IF NOT EXISTS t_ds_task_instance_context_id_seq; ALTER TABLE t_ds_task_instance_context ALTER COLUMN id SET DEFAULT nextval('t_ds_task_instance_context_id_seq'::regclass); + +ALTER TABLE t_ds_task_instance +ADD COLUMN IF NOT EXISTS task_output_log_path text DEFAULT NULL; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000000..84ca5b7ca557 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +drop PROCEDURE if EXISTS add_task_logs_root_path_to_t_ds_task_instance; +delimiter d// +CREATE PROCEDURE add_task_logs_root_path_to_t_ds_task_instance() +BEGIN + IF NOT EXISTS (SELECT 1 FROM INFORMATION_SCHEMA.COLUMNS + WHERE TABLE_NAME='t_ds_task_instance' + AND TABLE_SCHEMA=(SELECT DATABASE()) + AND COLUMN_NAME='task_logs_root_path') + THEN + ALTER TABLE `t_ds_task_instance` + ADD COLUMN `task_logs_root_path` varchar(1024) DEFAULT NULL COMMENT 'task logs root path' AFTER `execute_path`; + END IF; +END; +d// +delimiter ; +call add_task_logs_root_path_to_t_ds_task_instance(); +drop PROCEDURE if EXISTS add_task_logs_root_path_to_t_ds_task_instance; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000000..499643662339 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.2_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +ALTER TABLE t_ds_task_instance +ADD COLUMN IF NOT EXISTS task_logs_root_path varchar(1024) DEFAULT NULL; diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImplTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImplTest.java index 226409b19760..824291ae0314 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImplTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/repository/impl/TaskInstanceDaoImplTest.java @@ -95,7 +95,7 @@ private void insertTaskInstance(long taskCode, TaskExecutionStatus state, Date e .endTime(endTime) .host("192.168.1.50:5678") .executePath("/tmp/dolphinscheduler/exec/" + WORKFLOW_INSTANCE_ID + "/" + taskCode) - .logPath("/tmp/dolphinscheduler/logs/" + WORKFLOW_INSTANCE_ID + "/" + taskCode + ".log") + .taskLogsRootPath("/tmp/dolphinscheduler/logs/" + WORKFLOW_INSTANCE_ID + "/" + taskCode) .build(); taskInstanceDao.upsertTaskInstance(ti); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorFactory.java index 355fdf76bca0..d38762cceb6c 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/executor/LogicTaskExecutorFactory.java @@ -18,7 +18,7 @@ package org.apache.dolphinscheduler.server.master.engine.executor; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider; import org.apache.dolphinscheduler.server.master.engine.executor.plugin.LogicTaskPluginFactoryBuilder; import org.apache.dolphinscheduler.task.executor.ITaskExecutor; import org.apache.dolphinscheduler.task.executor.ITaskExecutorFactory; @@ -39,7 +39,7 @@ public LogicTaskExecutorFactory(final LogicTaskPluginFactoryBuilder logicTaskPlu @Override public ITaskExecutor createTaskExecutor(final TaskExecutionContext taskExecutionContext) { - assemblyTaskLogPath(taskExecutionContext); + assemblyTaskLogsRootPath(taskExecutionContext); final LogicTaskExecutorBuilder logicTaskExecutorBuilder = LogicTaskExecutorBuilder.builder() .taskExecutionContext(taskExecutionContext) @@ -48,8 +48,8 @@ public ITaskExecutor createTaskExecutor(final TaskExecutionContext taskExecution return new LogicTaskExecutor(logicTaskExecutorBuilder); } - private void assemblyTaskLogPath(final TaskExecutionContext taskExecutionContext) { - taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); + private void assemblyTaskLogsRootPath(final TaskExecutionContext taskExecutionContext) { + taskExecutionContext.setTaskLogsRootPath(TaskLogFileProvider.getTaskLogsRootPath(taskExecutionContext)); } } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/AbstractTaskInstanceFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/AbstractTaskInstanceFactory.java index 4a399bf3fbbc..e30c5edd7a90 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/AbstractTaskInstanceFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/AbstractTaskInstanceFactory.java @@ -50,7 +50,7 @@ protected TaskInstance cloneTaskInstance(TaskInstance originTaskInstance) { result.setEndTime(originTaskInstance.getEndTime()); result.setHost(originTaskInstance.getHost()); result.setExecutePath(originTaskInstance.getExecutePath()); - result.setLogPath(originTaskInstance.getLogPath()); + result.setTaskLogsRootPath(originTaskInstance.getTaskLogsRootPath()); result.setRetryTimes(originTaskInstance.getRetryTimes()); result.setAlertFlag(originTaskInstance.getAlertFlag()); result.setPid(originTaskInstance.getPid()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FailedRecoverTaskInstanceFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FailedRecoverTaskInstanceFactory.java index 1e0b4107a4e6..23ca3726c0db 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FailedRecoverTaskInstanceFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FailedRecoverTaskInstanceFactory.java @@ -51,7 +51,7 @@ public TaskInstance createTaskInstance(FailedRecoverTaskInstanceBuilder builder) taskInstance.setHost(null); taskInstance.setVarPool(null); taskInstance.setSubmitTime(new Date()); - taskInstance.setLogPath(null); + taskInstance.setTaskLogsRootPath(null); taskInstance.setExecutePath(null); taskInstanceDao.insert(taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FailoverTaskInstanceFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FailoverTaskInstanceFactory.java index 56ce22ada48b..7aa47d78ddec 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FailoverTaskInstanceFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FailoverTaskInstanceFactory.java @@ -54,7 +54,7 @@ public TaskInstance createTaskInstance(FailoverTaskInstanceBuilder builder) { taskInstance.setHost(null); taskInstance.setVarPool(null); taskInstance.setSubmitTime(new Date()); - taskInstance.setLogPath(null); + taskInstance.setTaskLogsRootPath(null); taskInstance.setExecutePath(null); taskInstanceDao.insert(taskInstance); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FirstRunTaskInstanceFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FirstRunTaskInstanceFactory.java index 22caadb79e4d..c214ed19661a 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FirstRunTaskInstanceFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/FirstRunTaskInstanceFactory.java @@ -59,7 +59,7 @@ public TaskInstance createTaskInstance(FirstRunTaskInstanceBuilder builder) { taskInstance.setEndTime(null); taskInstance.setHost(null); taskInstance.setExecutePath(null); - taskInstance.setLogPath(null); + taskInstance.setTaskLogsRootPath(null); taskInstance.setRetryTimes(0); taskInstance.setAlertFlag(Flag.NO); taskInstance.setFlag(Flag.YES); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/RetryTaskInstanceFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/RetryTaskInstanceFactory.java index 0f97cfa696c3..5bed00b229ee 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/RetryTaskInstanceFactory.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/RetryTaskInstanceFactory.java @@ -50,7 +50,7 @@ public TaskInstance createTaskInstance(RetryTaskInstanceBuilder builder) { taskInstance.setPid(0); taskInstance.setHost(null); taskInstance.setExecutePath(null); - taskInstance.setLogPath(null); + taskInstance.setTaskLogsRootPath(null); taskInstance.setStartTime(null); taskInstance.setEndTime(null); taskInstance.setSubmitTime(new Date()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/TaskExecutionContextBuilder.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/TaskExecutionContextBuilder.java index 69765d690e18..344b90d839ef 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/TaskExecutionContextBuilder.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/execution/TaskExecutionContextBuilder.java @@ -60,7 +60,7 @@ public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(final TaskInstan taskExecutionContext.setFirstSubmitTime(DateUtils.dateToTimeStamp(taskInstance.getFirstSubmitTime())); taskExecutionContext.setStartTime(DateUtils.dateToTimeStamp(taskInstance.getStartTime())); taskExecutionContext.setTaskType(taskInstance.getTaskType()); - taskExecutionContext.setLogPath(taskInstance.getLogPath()); + taskExecutionContext.setTaskLogsRootPath(taskInstance.getTaskLogsRootPath()); taskExecutionContext.setWorkerGroup(taskInstance.getWorkerGroup()); taskExecutionContext.setHost(taskInstance.getHost()); taskExecutionContext.setDryRun(taskInstance.getDryRun()); diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskRunningLifecycleEvent.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskRunningLifecycleEvent.java index 403c695785c0..8b5a7082b79f 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskRunningLifecycleEvent.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/lifecycle/event/TaskRunningLifecycleEvent.java @@ -35,7 +35,7 @@ public class TaskRunningLifecycleEvent extends AbstractTaskLifecycleEvent { private final ITaskExecution taskExecution; - private final String logPath; + private final String taskLogsRootPath; private final Date startTime; @@ -48,7 +48,7 @@ public ILifecycleEventType getEventType() { public String toString() { return "TaskRunningLifecycleEvent{" + "task=" + taskExecution.getName() + - ", logPath='" + logPath + '\'' + + ", taskLogsRootPath='" + taskLogsRootPath + '\'' + ", startTime=" + startTime + '}'; } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java index c7c52ab24225..c48726c6473e 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/task/statemachine/AbstractTaskStateAction.java @@ -159,7 +159,7 @@ protected void persistentTaskInstanceStartedEventToDB(final ITaskExecution taskE final TaskInstance taskInstance = taskExecution.getTaskInstance(); taskInstance.setState(TaskExecutionStatus.RUNNING_EXECUTION); taskInstance.setStartTime(taskRunningEvent.getStartTime()); - taskInstance.setLogPath(taskRunningEvent.getLogPath()); + taskInstance.setTaskLogsRootPath(taskRunningEvent.getTaskLogsRootPath()); taskInstanceDao.updateById(taskInstance); } diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java index 278f5160dc80..e4c77b86f096 100644 --- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java +++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/rpc/TaskExecutorEventListenerImpl.java @@ -78,7 +78,7 @@ public void onTaskExecutorRunning(final TaskExecutorStartedLifecycleEvent taskEx final TaskRunningLifecycleEvent taskRunningEvent = TaskRunningLifecycleEvent.builder() .taskExecution(taskExecution) .startTime(new Date(taskExecutorStartedLifecycleEvent.getStartTime())) - .logPath(taskExecutorStartedLifecycleEvent.getLogPath()) + .taskLogsRootPath(taskExecutorStartedLifecycleEvent.getTaskLogsRootPath()) .build(); taskExecution.getWorkflowEventBus().publish(taskRunningEvent); diff --git a/dolphinscheduler-master/src/main/resources/logback-spring.xml b/dolphinscheduler-master/src/main/resources/logback-spring.xml index a4c3a4f22dfe..2b995f757e9f 100644 --- a/dolphinscheduler-master/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-master/src/main/resources/logback-spring.xml @@ -33,7 +33,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -51,6 +53,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + ${log.base}/dolphinscheduler-master.log @@ -77,4 +98,8 @@ + + + + diff --git a/dolphinscheduler-master/src/test/resources/logback.xml b/dolphinscheduler-master/src/test/resources/logback.xml index 1490f5b55684..70dee6aa3c80 100644 --- a/dolphinscheduler-master/src/test/resources/logback.xml +++ b/dolphinscheduler-master/src/test/resources/logback.xml @@ -33,7 +33,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -51,6 +53,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + ${log.base}/dolphinscheduler-master.log @@ -73,4 +94,8 @@ + + + + diff --git a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml index 1380995f9032..6e4488970bc5 100644 --- a/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-standalone-server/src/main/resources/logback-spring.xml @@ -54,7 +54,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -72,6 +74,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + @@ -83,4 +104,8 @@ + + + + diff --git a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java index 94126bb39d25..3b55d27a50d2 100644 --- a/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java +++ b/dolphinscheduler-task-executor/src/main/java/org/apache/dolphinscheduler/task/executor/events/TaskExecutorStartedLifecycleEvent.java @@ -41,7 +41,7 @@ public class TaskExecutorStartedLifecycleEvent extends AbstractTaskExecutorLifec private long startTime; - private String logPath; + private String taskLogsRootPath; private String executePath; @@ -55,7 +55,7 @@ public static TaskExecutorStartedLifecycleEvent of(final ITaskExecutor taskExecu .workflowInstanceId(taskExecutionContext.getWorkflowInstanceId()) .taskInstanceHost(taskExecutionContext.getHost()) .startTime(taskExecutor.getTaskExecutionContext().getStartTime()) - .logPath(taskExecutionContext.getLogPath()) + .taskLogsRootPath(taskExecutionContext.getTaskLogsRootPath()) .executePath(taskExecutionContext.getExecutePath()) .type(TaskExecutorLifecycleEventType.RUNNING) .build(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index 7abd3a98c509..7f8a98fac570 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ShellUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskOutputLogWriter; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -193,7 +194,10 @@ private Optional> collectPodLogIfNeeded() { final CompletableFuture collectPodLogFuture = CompletableFuture.runAsync(() -> { // wait for launching (driver) pod ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L); + LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); try ( + LogUtils.MDCAutoClosableContext ignored = + LogUtils.withTaskOutputLogPathMDC(taskRequest.getTaskOutputLogPath()); LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId(), "")) { if (watcher == null) { @@ -202,13 +206,15 @@ private Optional> collectPodLogIfNeeded() { String line; try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { while ((line = reader.readLine()) != null) { - log.info("[K8S-pod-log-{}]: {}", taskRequest.getTaskName(), line); + TaskOutputLogWriter.writeTaskOutput(taskRequest, line); } } } } catch (Exception e) { log.error("Collect pod log error", e); throw new RuntimeException(e); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); } }, collectPodLogExecutorService); @@ -224,15 +230,18 @@ private CompletableFuture collectProcessLog(Process process) { TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); - String line; - while ((line = inReader.readLine()) != null) { - log.info(" -> {}", line); - taskOutputParameterParser.appendParseLog(line); + try ( + LogUtils.MDCAutoClosableContext ignored = + LogUtils.withTaskOutputLogPathMDC(taskRequest.getTaskOutputLogPath())) { + for (String line : (Iterable) inReader.lines()::iterator) { + TaskOutputLogWriter.writeTaskOutput(taskRequest, line); + taskOutputParameterParser.appendParseLog(line); + } + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); } } catch (Exception e) { log.error("Parse var pool error", e); - } finally { - LogUtils.removeTaskInstanceLogFullPathMDC(); } taskOutputParams = taskOutputParameterParser.getTaskOutputParams(); }, collectProcessLogService); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java index 85d6f682196b..6acc9371b67e 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/TaskExecutionContext.java @@ -21,15 +21,20 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.resource.ResourceParametersHelper; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileType; import java.io.Serializable; import java.util.List; import java.util.Map; +import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.Getter; import lombok.NoArgsConstructor; +import lombok.Setter; import com.fasterxml.jackson.annotation.JsonInclude; @@ -61,7 +66,7 @@ public class TaskExecutionContext implements Serializable { private String executePath; - private String logPath; + private String taskLogsRootPath; private String appInfoPath; @@ -130,7 +135,49 @@ public class TaskExecutionContext implements Serializable { private final long firstDispatchTime = System.currentTimeMillis(); + @Getter(AccessLevel.NONE) + @Setter(AccessLevel.NONE) + @Builder.Default + private transient String legacyLogPath = null; + + @Getter(AccessLevel.NONE) + @Setter(AccessLevel.NONE) + @Builder.Default + private transient String legacyTaskOutputLogPath = null; + public int increaseDispatchFailTimes() { return ++dispatchFailTimes; } + + public String getLogPath() { + return TaskLogFileProvider.getFilePath(taskLogsRootPath, TaskLogFileType.TASK_LOG); + } + + public void setLogPath(String logPath) { + legacyLogPath = logPath; + if (logPath == null) { + if (legacyTaskOutputLogPath == null) { + taskLogsRootPath = null; + } + return; + } + taskLogsRootPath = TaskLogFileProvider.getTaskLogsRootPathFromFilePath(logPath); + } + + public String getTaskOutputLogPath() { + return TaskLogFileProvider.getFilePath(taskLogsRootPath, TaskLogFileType.TASK_OUTPUT); + } + + public void setTaskOutputLogPath(String taskOutputLogPath) { + legacyTaskOutputLogPath = taskOutputLogPath; + if (taskOutputLogPath == null) { + if (legacyLogPath == null) { + taskLogsRootPath = null; + } + return; + } + if (taskLogsRootPath == null) { + taskLogsRootPath = TaskLogFileProvider.getTaskLogsRootPathFromFilePath(taskOutputLogPath); + } + } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java index f48169c5e150..0d5223d0bedc 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/k8s/impl/K8sTaskExecutor.java @@ -63,6 +63,10 @@ import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.api.model.Affinity; import io.fabric8.kubernetes.api.model.AffinityBuilder; import io.fabric8.kubernetes.api.model.EnvVar; @@ -84,6 +88,8 @@ @Slf4j public class K8sTaskExecutor extends AbstractK8sTaskExecutor { + private static final Logger TASK_OUTPUT_LOGGER = LoggerFactory.getLogger(LogUtils.TASK_OUTPUT_LOGGER_NAME); + private Job job; protected boolean podLogOutputIsFinished = false; protected Future podLogOutputFuture; @@ -270,12 +276,18 @@ private void parsePodLogOutput() { taskRequest.getTaskInstanceId()); LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath()); try ( + LogUtils.MDCAutoClosableContext ignored = + LogUtils.withTaskOutputLogPathMDC(taskRequest.getTaskOutputLogPath()); LogWatch watcher = ProcessUtils.getPodLogWatcher(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId(), containerName)) { String line; try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) { while ((line = reader.readLine()) != null) { - log.info("[K8S-pod-log] {}", line); + if (StringUtils.isBlank(taskRequest.getTaskOutputLogPath())) { + log.info("[K8S-pod-log] {}", line); + } else { + TASK_OUTPUT_LOGGER.info(line); + } taskOutputParameterParser.appendParseLog(line); } } @@ -283,6 +295,7 @@ private void parsePodLogOutput() { throw new RuntimeException(e); } finally { LogUtils.removeTaskInstanceLogFullPathMDC(); + LogUtils.removeWorkflowAndTaskInstanceIdMDC(); podLogOutputIsFinished = true; } taskOutputParams = taskOutputParameterParser.getTaskOutputParams(); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java index 005cabb82ffc..92894de6a1b4 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminator.java @@ -17,14 +17,9 @@ package org.apache.dolphinscheduler.plugin.task.api.log; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; - import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; - -import org.slf4j.MDC; - import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.sift.AbstractDiscriminator; @@ -39,10 +34,11 @@ public class TaskLogDiscriminator extends AbstractDiscriminator { @Override public String getDiscriminatingValue(ILoggingEvent event) { - String taskInstanceLogPath = MDC.get(LogUtils.TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); - if (taskInstanceLogPath == null) { - log.error("The task instance log path is null, please check the logback configuration, log: {}", event); + String taskLogPath = event.getMDCPropertyMap().get(key); + if (taskLogPath == null) { + log.error("The task log path in MDC key {} is null, please check the logback configuration, log: {}", + key, event); } - return taskInstanceLogPath; + return taskLogPath; } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java index 6f9d9ec90b40..29e5d72964e6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilter.java @@ -17,13 +17,12 @@ package org.apache.dolphinscheduler.plugin.task.api.log; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; - import org.apache.commons.lang3.StringUtils; +import lombok.Getter; +import lombok.Setter; import lombok.extern.slf4j.Slf4j; -import org.slf4j.MDC; import org.slf4j.Marker; import ch.qos.logback.classic.spi.ILoggingEvent; @@ -34,13 +33,17 @@ * This class is used to filter the log of the task instance. */ @Slf4j +@Getter +@Setter public class TaskLogFilter extends Filter { + private String key; + @Override public FilterReply decide(ILoggingEvent event) { - String taskInstanceLogPath = MDC.get(LogUtils.TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); - // If the taskInstanceLogPath is empty, it means that the log is not related to a task instance. - if (StringUtils.isEmpty(taskInstanceLogPath)) { + String taskLogPath = event.getMDCPropertyMap().get(key); + // If the taskLogPath is empty, it means that the log is not related to the current log file. + if (StringUtils.isEmpty(taskLogPath)) { return FilterReply.DENY; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java index b251b33f65aa..621d353c108a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/LogUtils.java @@ -59,6 +59,8 @@ public class LogUtils { private static final Path TASK_INSTANCE_LOG_BASE_PATH = getTaskInstanceLogBasePath(); public static final String TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY = "taskInstanceLogFullPath"; + public static final String TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY = "taskOutputLogFullPath"; + public static final String TASK_OUTPUT_LOGGER_NAME = "TaskOutput"; private static final Pattern APPLICATION_REGEX = Pattern.compile(TaskConstants.YARN_APPLICATION_REGEX); @@ -86,13 +88,13 @@ public List getAppIds(String logPath, String appInfoPath, String fetchWa * @param taskExecutionContext task execution context. * @return task instance log full path. */ - public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext) { + public static String getTaskInstanceLogFullPath(TaskExecutionContext taskExecutionContext, String logType) { return getTaskInstanceLogFullPath( DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), taskExecutionContext.getWorkflowDefinitionCode(), taskExecutionContext.getWorkflowDefinitionVersion(), taskExecutionContext.getWorkflowInstanceId(), - taskExecutionContext.getTaskInstanceId()); + taskExecutionContext.getTaskInstanceId(), logType); } /** @@ -110,7 +112,8 @@ public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime, Long workflowDefinitionCode, int workflowDefinitionVersion, int workflowInstanceId, - int taskInstanceId) { + int taskInstanceId, + String logType) { if (TASK_INSTANCE_LOG_BASE_PATH == null) { throw new IllegalArgumentException( "Cannot find the task instance log base path, please check your logback.xml file"); @@ -119,7 +122,7 @@ public static String getTaskInstanceLogFullPath(Date taskFirstSubmitTime, String.valueOf(workflowDefinitionCode), String.valueOf(workflowDefinitionVersion), String.valueOf(workflowInstanceId), - String.format("%s.log", taskInstanceId)).toString(); + String.format("%s.%s", taskInstanceId, logType)).toString(); return TASK_INSTANCE_LOG_BASE_PATH .resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null)) .resolve(taskLogFileName) @@ -187,6 +190,10 @@ public static String getTaskInstanceLogFullPathMdc() { return MDC.get(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); } + public static String getTaskOutputLogFullPathMdc() { + return MDC.get(TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY); + } + public static void setTaskInstanceLogFullPathMDC(String taskInstanceLogFullPath) { if (taskInstanceLogFullPath == null) { log.warn("taskInstanceLogFullPath is null"); @@ -199,6 +206,37 @@ public static void removeTaskInstanceLogFullPathMDC() { MDC.remove(TASK_INSTANCE_LOG_FULL_PATH_MDC_KEY); } + public static void setTaskOutputLogFullPathMDC(String taskOutputLogFullPath) { + if (taskOutputLogFullPath == null) { + log.warn("taskOutputLogFullPath is null"); + return; + } + MDC.put(TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY, taskOutputLogFullPath); + } + + public static void removeTaskOutputLogFullPathMDC() { + MDC.remove(TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY); + } + + public static MDCAutoClosableContext withTaskOutputLogPathMDC(String taskOutputLogFullPath) { + final String originalTaskOutputLogFullPath = getTaskOutputLogFullPathMdc(); + if (taskOutputLogFullPath == null) { + removeTaskOutputLogFullPathMDC(); + } else { + setTaskOutputLogFullPathMDC(taskOutputLogFullPath); + } + return new MDCAutoClosableContext( + () -> restoreMDC(TASK_OUTPUT_LOG_FULL_PATH_MDC_KEY, originalTaskOutputLogFullPath)); + } + + private static void restoreMDC(String key, String value) { + if (value == null) { + MDC.remove(key); + return; + } + MDC.put(key, value); + } + public static void setWorkflowAndTaskInstanceIDMDC(Integer workflowInstanceId, Integer taskInstanceId) { MDC.put(TaskConstants.WORKFLOW_INSTANCE_ID_MDC_KEY, String.valueOf(workflowInstanceId)); diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskLogFileProvider.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskLogFileProvider.java new file mode 100644 index 000000000000..2d21c7e5fa34 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskLogFileProvider.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.utils; + +import org.apache.dolphinscheduler.common.constants.DateConstants; +import org.apache.dolphinscheduler.common.utils.DateUtils; +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +import org.apache.commons.lang3.StringUtils; + +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Date; + +import lombok.NonNull; +import lombok.experimental.UtilityClass; + +@UtilityClass +public class TaskLogFileProvider { + + public static String getTaskLogsRootPath(@NonNull TaskExecutionContext taskExecutionContext) { + return getTaskLogsRootPath( + DateUtils.timeStampToDate(taskExecutionContext.getFirstSubmitTime()), + taskExecutionContext.getWorkflowDefinitionCode(), + taskExecutionContext.getWorkflowDefinitionVersion(), + taskExecutionContext.getWorkflowInstanceId(), + taskExecutionContext.getTaskInstanceId()); + } + + public static String getTaskLogsRootPath(Date taskFirstSubmitTime, + Long workflowDefinitionCode, + int workflowDefinitionVersion, + int workflowInstanceId, + int taskInstanceId) { + final Path taskInstanceLogBasePath = LogUtils.getTaskInstanceLogBasePath(); + if (taskInstanceLogBasePath == null) { + throw new IllegalArgumentException( + "Cannot find the task instance log base path, please check your logback.xml file"); + } + return taskInstanceLogBasePath + .resolve(DateUtils.format(taskFirstSubmitTime, DateConstants.YYYYMMDD, null)) + .resolve(Paths.get( + String.valueOf(workflowDefinitionCode), + String.valueOf(workflowDefinitionVersion), + String.valueOf(workflowInstanceId), + String.valueOf(taskInstanceId))) + .toString(); + } + + public static String getFilePath(String taskLogsRootPath, TaskLogFileType taskLogFileType) { + if (StringUtils.isBlank(taskLogsRootPath)) { + return null; + } + return Paths.get(taskLogsRootPath, taskLogFileType.getFileName()).toString(); + } + + public static String getFilePath(@NonNull TaskExecutionContext taskExecutionContext, + TaskLogFileType taskLogFileType) { + return getFilePath(taskExecutionContext.getTaskLogsRootPath(), taskLogFileType); + } + + public static String getTaskLogsRootPathFromFilePath(String taskLogFilePath) { + if (StringUtils.isBlank(taskLogFilePath)) { + return null; + } + final Path parent = Paths.get(taskLogFilePath).getParent(); + return parent == null ? null : parent.toString(); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskLogFileType.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskLogFileType.java new file mode 100644 index 000000000000..ab5566268e33 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskLogFileType.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.utils; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +@Getter +@RequiredArgsConstructor +public enum TaskLogFileType { + + TASK_LOG("task.log"), + TASK_OUTPUT("task.out"), + STDERR_LOG("stderr.log"); + + private final String fileName; +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskOutputLogWriter.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskOutputLogWriter.java new file mode 100644 index 000000000000..2a0c07a84569 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/TaskOutputLogWriter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.utils; + +import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; + +import org.apache.commons.lang3.StringUtils; + +import lombok.experimental.UtilityClass; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@UtilityClass +public class TaskOutputLogWriter { + + private static final Logger TASK_OUTPUT_LOGGER = LoggerFactory.getLogger(LogUtils.TASK_OUTPUT_LOGGER_NAME); + + public static void writeTaskOutput(TaskExecutionContext taskExecutionContext, String content) { + writeTaskOutput(taskExecutionContext, () -> TASK_OUTPUT_LOGGER.info(content)); + } + + public static void writeTaskOutput(String taskLogPath, String taskOutputLogPath, String content) { + writeTaskOutput(taskLogPath, taskOutputLogPath, () -> TASK_OUTPUT_LOGGER.info(content)); + } + + public static void writeTaskOutput(TaskExecutionContext taskExecutionContext, String message, Object... args) { + writeTaskOutput(taskExecutionContext, () -> TASK_OUTPUT_LOGGER.info(message, args)); + } + + private static void writeTaskOutput(TaskExecutionContext taskExecutionContext, Runnable runnable) { + writeTaskOutput(taskExecutionContext.getLogPath(), taskExecutionContext.getTaskOutputLogPath(), runnable); + } + + private static void writeTaskOutput(String taskLogPath, String taskOutputLogPath, Runnable runnable) { + validatePath(taskLogPath, "task log"); + validatePath(taskOutputLogPath, "task output"); + LogUtils.setTaskInstanceLogFullPathMDC(taskLogPath); + try ( + LogUtils.MDCAutoClosableContext ignored = + LogUtils.withTaskOutputLogPathMDC(taskOutputLogPath)) { + runnable.run(); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + } + } + + private static void validatePath(String path, String pathType) { + if (StringUtils.isBlank(path)) { + throw new IllegalStateException(String.format("The %s log path cannot be blank", pathType)); + } + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminatorTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminatorTest.java new file mode 100644 index 000000000000..10e67175c20b --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogDiscriminatorTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.junit.jupiter.api.Test; + +import ch.qos.logback.classic.spi.ILoggingEvent; + +class TaskLogDiscriminatorTest { + + @Test + void shouldGetDiscriminatingValueByConfiguredMdcKey() { + TaskLogDiscriminator discriminator = new TaskLogDiscriminator(); + discriminator.setKey("taskOutputLogFullPath"); + + ILoggingEvent event = mock(ILoggingEvent.class); + when(event.getMDCPropertyMap()) + .thenReturn(Collections.singletonMap("taskOutputLogFullPath", "/tmp/task-output.log")); + + assertEquals("/tmp/task-output.log", discriminator.getDiscriminatingValue(event)); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilterTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilterTest.java new file mode 100644 index 000000000000..4900a61ce568 --- /dev/null +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/log/TaskLogFilterTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.dolphinscheduler.plugin.task.api.log; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Collections; + +import org.junit.jupiter.api.Test; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.spi.FilterReply; + +class TaskLogFilterTest { + + @Test + void shouldAcceptLogWhenConfiguredMdcKeyExists() { + TaskLogFilter filter = new TaskLogFilter(); + filter.setKey("taskOutputLogFullPath"); + + ILoggingEvent event = mock(ILoggingEvent.class); + when(event.getMDCPropertyMap()) + .thenReturn(Collections.singletonMap("taskOutputLogFullPath", "/tmp/task-output.log")); + + assertEquals(FilterReply.ACCEPT, filter.decide(event)); + } + + @Test + void shouldDenyLogWhenConfiguredMdcKeyMissing() { + TaskLogFilter filter = new TaskLogFilter(); + filter.setKey("taskOutputLogFullPath"); + + ILoggingEvent event = mock(ILoggingEvent.class); + when(event.getMDCPropertyMap()).thenReturn(Collections.emptyMap()); + + assertEquals(FilterReply.DENY, filter.decide(event)); + } +} diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java index 0cfacf3ae57b..c4ee5f0dcbb8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-http/src/main/java/org/apache/dolphinscheduler/plugin/task/http/HttpTask.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskOutputLogWriter; import org.apache.commons.lang3.StringUtils; @@ -126,6 +127,7 @@ private void validateResponse(String body, int statusCode) { // default success log log.info("http request success, url: {}, statusCode: {}, body: {}", httpParameters.getUrl(), statusCode, body); + TaskOutputLogWriter.writeTaskOutput(taskExecutionContext, body); exitStatusCode = TaskConstants.EXIT_CODE_SUCCESS; } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java index 8a0d571fde30..ab8dfd6741d6 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java @@ -35,6 +35,7 @@ import org.apache.dolphinscheduler.plugin.task.api.model.Property; import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskOutputLogWriter; import org.apache.dolphinscheduler.spi.datasource.ConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -234,47 +235,44 @@ private Object getOutputParameter(CallableStatement stmt, int index, String prop Object value = null; switch (dataType) { case VARCHAR: - log.info("out parameter varchar key : {} , value : {}", prop, stmt.getString(index)); value = stmt.getString(index); break; case INTEGER: - log.info("out parameter integer key : {} , value : {}", prop, stmt.getInt(index)); value = stmt.getInt(index); break; case LONG: - log.info("out parameter long key : {} , value : {}", prop, stmt.getLong(index)); value = stmt.getLong(index); break; case FLOAT: - log.info("out parameter float key : {} , value : {}", prop, stmt.getFloat(index)); value = stmt.getFloat(index); break; case DOUBLE: - log.info("out parameter double key : {} , value : {}", prop, stmt.getDouble(index)); value = stmt.getDouble(index); break; case DATE: - log.info("out parameter date key : {} , value : {}", prop, stmt.getDate(index)); value = stmt.getDate(index); break; case TIME: - log.info("out parameter time key : {} , value : {}", prop, stmt.getTime(index)); value = stmt.getTime(index); break; case TIMESTAMP: - log.info("out parameter timestamp key : {} , value : {}", prop, stmt.getTimestamp(index)); value = stmt.getTimestamp(index); break; case BOOLEAN: - log.info("out parameter boolean key : {} , value : {}", prop, stmt.getBoolean(index)); value = stmt.getBoolean(index); break; default: break; } + logOutputParameter(prop, dataType, value); return value; } + private void logOutputParameter(String prop, DataType dataType, Object value) { + String message = String.format("out parameter %s key : {} , value : {}", dataType.name().toLowerCase()); + TaskOutputLogWriter.writeTaskOutput(taskExecutionContext, message, prop, value); + } + @Override public AbstractParameters getParameters() { return procedureParameters; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java index 7271e4524f27..9e1eb21e04a5 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java @@ -23,7 +23,9 @@ import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHConnectionParam; import org.apache.dolphinscheduler.plugin.task.api.TaskException; import org.apache.dolphinscheduler.plugin.task.api.parser.TaskOutputParameterParser; +import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; import org.apache.dolphinscheduler.plugin.task.api.utils.ProcessUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskOutputLogWriter; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.math.NumberUtils; @@ -61,14 +63,21 @@ public class RemoteExecutor implements AutoCloseable { protected Map taskOutputParams = new HashMap<>(); private SshClient sshClient; private ClientSession session; - private SSHConnectionParam sshConnectionParam; + private final SSHConnectionParam sshConnectionParam; + private final String taskLogPath; + private final String taskOutputLogPath; public RemoteExecutor(SSHConnectionParam sshConnectionParam) { + this(sshConnectionParam, null, null); + } + + public RemoteExecutor(SSHConnectionParam sshConnectionParam, String taskLogPath, String taskOutputLogPath) { this.sshConnectionParam = sshConnectionParam; + this.taskLogPath = taskLogPath; + this.taskOutputLogPath = taskOutputLogPath; initClient(); } - private void initClient() { sshClient = SshClient.setUpDefaultClient(); sshClient.start(); @@ -111,20 +120,25 @@ public void track(String taskId) throws Exception { String pid; log.info("Remote shell task log:"); TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser(); - do { - pid = getTaskPid(taskId); - String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId); - int readLines = runRemoteAndProcessLines(trackCommand, line -> { - log.info(line); - taskOutputParameterParser.appendParseLog(line); - }); - if (readLines > 0) { - logN += readLines; - - } else { - Thread.sleep(TRACK_INTERVAL); - } - } while (StringUtils.isNotEmpty(pid)); + LogUtils.setTaskInstanceLogFullPathMDC(taskLogPath); + try (LogUtils.MDCAutoClosableContext ignored = LogUtils.withTaskOutputLogPathMDC(taskOutputLogPath)) { + do { + pid = getTaskPid(taskId); + String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId); + int readLines = runRemoteAndProcessLines(trackCommand, line -> { + TaskOutputLogWriter.writeTaskOutput(taskLogPath, taskOutputLogPath, line); + taskOutputParameterParser.appendParseLog(line); + }); + if (readLines > 0) { + logN += readLines; + + } else { + Thread.sleep(TRACK_INTERVAL); + } + } while (StringUtils.isNotEmpty(pid)); + } finally { + LogUtils.removeTaskInstanceLogFullPathMDC(); + } taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams()); } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java index 6dc82d7d1522..b1221ef8db0a 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteShellTask.java @@ -176,6 +176,7 @@ public void initRemoteExecutor() { SSHConnectionParam sshConnectionParam = (SSHConnectionParam) DataSourceUtils.buildConnectionParams( DbType.valueOf(remoteShellParameters.getType()), dbSource.getConnectionParams()); - remoteExecutor = new RemoteExecutor(sshConnectionParam); + remoteExecutor = new RemoteExecutor(sshConnectionParam, taskExecutionContext.getLogPath(), + taskExecutionContext.getTaskOutputLogPath()); } } diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java index 113dfbcd9eb8..6367695196d8 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java @@ -37,6 +37,7 @@ import org.apache.dolphinscheduler.plugin.task.api.parameters.SqlParameters; import org.apache.dolphinscheduler.plugin.task.api.resource.ResourceContext; import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskOutputLogWriter; import org.apache.dolphinscheduler.spi.datasource.BaseConnectionParam; import org.apache.dolphinscheduler.spi.enums.DbType; @@ -67,6 +68,7 @@ import lombok.extern.slf4j.Slf4j; import com.fasterxml.jackson.databind.node.ArrayNode; +import com.fasterxml.jackson.databind.node.JsonNodeType; import com.fasterxml.jackson.databind.node.ObjectNode; @Slf4j @@ -280,11 +282,8 @@ private String resultProcess(ResultSet resultSet) throws Exception { int displayRows = sqlParameters.getDisplayRows() > 0 ? sqlParameters.getDisplayRows() : TaskConstants.DEFAULT_DISPLAY_ROWS; displayRows = Math.min(displayRows, resultJSONArray.size()); - log.info("display sql result {} rows as follows:", displayRows); - for (int i = 0; i < displayRows; i++) { - String row = JSONUtils.toJsonString(resultJSONArray.get(i)); - log.info("row {} : {}", i + 1, row); - } + + logSqlResultPreview(columnLabels, resultJSONArray, displayRows); } String result = resultJSONArray.isEmpty() ? JSONUtils.toJsonString(generateEmptyRow(resultSet)) @@ -319,6 +318,39 @@ private ArrayNode generateEmptyRow(ResultSet resultSet) throws SQLException { return resultJSONArray; } + private void logSqlResultPreview(String[] columnLabels, ArrayNode resultJSONArray, int displayRows) { + doLogSqlResultPreview(columnLabels, resultJSONArray, displayRows); + } + + private void doLogSqlResultPreview(String[] columnLabels, ArrayNode resultJSONArray, int displayRows) { + logSqlResultLine(String.join("\t", columnLabels)); + for (int i = 0; i < displayRows; i++) { + ObjectNode rowNode = (ObjectNode) resultJSONArray.get(i); + List rowValues = new ArrayList<>(columnLabels.length); + for (String columnLabel : columnLabels) { + rowValues.add(formatSqlResultValue(rowNode.get(columnLabel))); + } + logSqlResultLine(String.join("\t", rowValues)); + } + } + + private void logSqlResultLine(String line) { + TaskOutputLogWriter.writeTaskOutput(taskExecutionContext, line); + } + + private String formatSqlResultValue(com.fasterxml.jackson.databind.JsonNode valueNode) { + if (valueNode == null || valueNode.isNull()) { + return "NULL"; + } + if (valueNode.getNodeType() == JsonNodeType.STRING) { + return valueNode.asText(); + } + if (valueNode.isValueNode()) { + return valueNode.asText(); + } + return JSONUtils.toJsonString(valueNode); + } + /** * send alert as an attachment * diff --git a/dolphinscheduler-ui/src/components/log-modal/index.tsx b/dolphinscheduler-ui/src/components/log-modal/index.tsx index acc3cdbdb892..36e77deade61 100644 --- a/dolphinscheduler-ui/src/components/log-modal/index.tsx +++ b/dolphinscheduler-ui/src/components/log-modal/index.tsx @@ -56,6 +56,10 @@ const props = { showDownloadLog: { type: Boolean as PropType, default: false + }, + title: { + type: String as PropType, + default: '' } } @@ -126,7 +130,7 @@ export default defineComponent({ return ( { + variables.showOutputModalRef = false + } + const getLogs = (row: any) => { const { state } = useAsyncState( queryLog({ taskInstanceId: Number(row.id), limit: variables.limit, - skipLineNum: variables.skipLineNum + skipLineNum: variables.skipLineNum, + logType: 'LOG' }).then((res: any) => { variables.logRef += res.message || '' if (res && res.message !== '') { @@ -132,6 +137,28 @@ const BatchTaskInstance = defineComponent({ return state } + const getOutputs = (row: any) => { + const { state } = useAsyncState( + queryLog({ + taskInstanceId: Number(row.id), + limit: variables.limit, + skipLineNum: variables.skipLineNum, + logType: 'OUTPUT' + }).then((res: any) => { + variables.outputRef += res.message || '' + if (res && res.message !== '') { + variables.skipLineNum += res.lineNum + getOutputs(row) + } else { + variables.outputLoadingRef = false + } + }), + {} + ) + + return state + } + const refreshLogs = (row: any) => { variables.logRef = '' variables.limit = 1000 @@ -139,6 +166,13 @@ const BatchTaskInstance = defineComponent({ getLogs(row) } + const refreshOutputs = (row: any) => { + variables.outputRef = '' + variables.limit = 1000 + variables.skipLineNum = 0 + getOutputs(row) + } + const trim = getCurrentInstance()?.appContext.config.globalProperties.trim onMounted(() => { @@ -156,7 +190,6 @@ const BatchTaskInstance = defineComponent({ if (variables.showModalRef) { getLogs(variables.row) } else { - variables.row = {} variables.logRef = '' variables.logLoadingRef = true variables.skipLineNum = 0 @@ -165,6 +198,20 @@ const BatchTaskInstance = defineComponent({ } ) + watch( + () => variables.showOutputModalRef, + () => { + if (variables.showOutputModalRef) { + getOutputs(variables.row) + } else { + variables.outputRef = '' + variables.outputLoadingRef = true + variables.skipLineNum = 0 + variables.limit = 1000 + } + } + ) + return { t, ...toRefs(variables), @@ -179,7 +226,9 @@ const BatchTaskInstance = defineComponent({ onClearSearchStateType, onClearSearchTime, onConfirmModal, + onConfirmOutputModal, refreshLogs, + refreshOutputs, trim } }, @@ -190,8 +239,10 @@ const BatchTaskInstance = defineComponent({ onUpdatePageSize, onSearch, onConfirmModal, + onConfirmOutputModal, loadingRef, - refreshLogs + refreshLogs, + refreshOutputs } = this return ( @@ -295,6 +346,14 @@ const BatchTaskInstance = defineComponent({ onConfirmModal={onConfirmModal} onRefreshLogs={refreshLogs} /> + ) } diff --git a/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx b/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx index 5fd162e1377e..bc0955ca8285 100644 --- a/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx +++ b/dolphinscheduler-ui/src/views/projects/task/instance/stream-task.tsx @@ -93,12 +93,17 @@ const BatchTaskInstance = defineComponent({ variables.showModalRef = false } + const onConfirmOutputModal = () => { + variables.showOutputModalRef = false + } + const getLogs = (row: any) => { const { state } = useAsyncState( queryLog({ taskInstanceId: Number(row.id), limit: variables.limit, - skipLineNum: variables.skipLineNum + skipLineNum: variables.skipLineNum, + logType: 'LOG' }).then((res: any) => { variables.logRef += res.message || '' if (res && res.message !== '') { @@ -114,6 +119,28 @@ const BatchTaskInstance = defineComponent({ return state } + const getOutputs = (row: any) => { + const { state } = useAsyncState( + queryLog({ + taskInstanceId: Number(row.id), + limit: variables.limit, + skipLineNum: variables.skipLineNum, + logType: 'OUTPUT' + }).then((res: any) => { + variables.outputRef += res.message || '' + if (res && res.message !== '') { + variables.skipLineNum += res.lineNum + getOutputs(row) + } else { + variables.outputLoadingRef = false + } + }), + {} + ) + + return state + } + const refreshLogs = (row: any) => { variables.logRef = '' variables.limit = 1000 @@ -121,6 +148,13 @@ const BatchTaskInstance = defineComponent({ getLogs(row) } + const refreshOutputs = (row: any) => { + variables.outputRef = '' + variables.limit = 1000 + variables.skipLineNum = 0 + getOutputs(row) + } + const trim = getCurrentInstance()?.appContext.config.globalProperties.trim onMounted(() => { @@ -145,7 +179,6 @@ const BatchTaskInstance = defineComponent({ if (variables.showModalRef) { getLogs(variables.row) } else { - variables.row = {} variables.logRef = '' variables.logLoadingRef = true variables.skipLineNum = 0 @@ -154,6 +187,20 @@ const BatchTaskInstance = defineComponent({ } ) + watch( + () => variables.showOutputModalRef, + () => { + if (variables.showOutputModalRef) { + getOutputs(variables.row) + } else { + variables.outputRef = '' + variables.outputLoadingRef = true + variables.skipLineNum = 0 + variables.limit = 1000 + } + } + ) + return { t, ...toRefs(variables), @@ -167,7 +214,9 @@ const BatchTaskInstance = defineComponent({ onClearSearchStateType, onClearSearchTime, onConfirmModal, + onConfirmOutputModal, refreshLogs, + refreshOutputs, trim } }, @@ -178,8 +227,10 @@ const BatchTaskInstance = defineComponent({ onUpdatePageSize, onSearch, onConfirmModal, + onConfirmOutputModal, loadingRef, - refreshLogs + refreshLogs, + refreshOutputs } = this return ( @@ -274,6 +325,14 @@ const BatchTaskInstance = defineComponent({ onConfirmModal={onConfirmModal} onRefreshLogs={refreshLogs} /> + ) } diff --git a/dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts b/dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts index 1aff5c4d6712..4dbc48c5c4dd 100644 --- a/dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts +++ b/dolphinscheduler-ui/src/views/projects/task/instance/use-stream-table.ts @@ -19,11 +19,12 @@ import { useI18n } from 'vue-i18n' import { h, reactive, ref } from 'vue' import { downloadLog, + downloadOutput, queryTaskListPaging, savePoint, streamTaskStop } from '@/service/modules/task-instances' -import { NButton, NIcon, NSpace, NTooltip, NSpin } from 'naive-ui' +import { NButton, NDropdown, NIcon, NSpace, NTooltip, NSpin } from 'naive-ui' import { AlignLeftOutlined, DownloadOutlined, @@ -63,10 +64,13 @@ export function useTable() { workflowDefinitionName: null, totalPage: 1, showModalRef: false, + showOutputModalRef: false, row: {}, loadingRef: false, logRef: '', + outputRef: '', logLoadingRef: true, + outputLoadingRef: true, skipLineNum: 0, limit: 1000 }) @@ -146,6 +150,17 @@ export function useTable() { key: 'operation', ...COLUMN_WIDTH_CONFIG['operation'](5), render(row: any) { + const logOptions = [ + { + label: t('project.task.log'), + key: 'log' + }, + { + label: t('project.task.output'), + key: 'output' + } + ] + return h(NSpace, null, { default: () => [ h( @@ -197,49 +212,80 @@ export function useTable() { } ), h( - NTooltip, - {}, + NDropdown, { - trigger: () => - h( - NButton, - { - circle: true, - type: 'info', - size: 'small', - disabled: !row.host, - onClick: () => handleLog(row) - }, - { - icon: () => - h(NIcon, null, { - default: () => h(AlignLeftOutlined) - }) - } - ), - default: () => t('project.task.view_log') - } + trigger: 'click', + options: logOptions, + disabled: !row.host, + onSelect: (key: string) => { + if (key === 'log') { + handleLog(row) + return + } + handleOutput(row) + } + }, + () => + h( + NTooltip, + {}, + { + trigger: () => + h( + NButton, + { + circle: true, + type: 'info', + size: 'small', + disabled: !row.host + }, + { + icon: () => + h(NIcon, null, { + default: () => h(AlignLeftOutlined) + }) + } + ), + default: () => t('project.task.view_log') + } + ) ), h( - NTooltip, - {}, + NDropdown, { - trigger: () => - h( - NButton, - { - circle: true, - type: 'info', - size: 'small', - onClick: () => downloadLog(row.id) - }, - { - icon: () => - h(NIcon, null, { default: () => h(DownloadOutlined) }) - } - ), - default: () => t('project.task.download_log') - } + trigger: 'click', + options: logOptions, + disabled: !row.host, + onSelect: (key: string) => { + if (key === 'log') { + downloadLog(row.id) + return + } + downloadOutput(row.id) + } + }, + () => + h( + NTooltip, + {}, + { + trigger: () => + h( + NButton, + { + circle: true, + type: 'info', + size: 'small', + disabled: !row.host + }, + { + icon: () => + h(NIcon, null, { default: () => h(DownloadOutlined) }) + } + ), + default: () => t('project.task.download_log') + } + ) ), h( NTooltip, @@ -280,6 +326,11 @@ export function useTable() { variables.row = row } + const handleOutput = (row: any) => { + variables.showOutputModalRef = true + variables.row = row + } + const getTableData = () => { if (variables.loadingRef) return variables.loadingRef = true diff --git a/dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts b/dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts index ba42abc16ff0..e19dc47cf121 100644 --- a/dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts +++ b/dolphinscheduler-ui/src/views/projects/task/instance/use-table.ts @@ -21,9 +21,10 @@ import { useAsyncState } from '@vueuse/core' import { queryTaskListPaging, forceSuccess, - downloadLog + downloadLog, + downloadOutput } from '@/service/modules/task-instances' -import { NButton, NIcon, NSpace, NTooltip, NSpin, NEllipsis } from 'naive-ui' +import { NButton, NDropdown, NIcon, NSpace, NTooltip, NSpin, NEllipsis } from 'naive-ui' import ButtonLink from '@/components/button-link' import { AlignLeftOutlined, @@ -67,10 +68,13 @@ export function useTable() { workflowInstanceName: ref(null), totalPage: ref(1), showModalRef: ref(false), + showOutputModalRef: ref(false), row: {}, loadingRef: ref(false), logRef: '', + outputRef: '', logLoadingRef: ref(true), + outputLoadingRef: ref(true), skipLineNum: ref(0), limit: ref(1000) }) @@ -194,6 +198,17 @@ export function useTable() { key: 'operation', ...COLUMN_WIDTH_CONFIG['operation'](3), render(row: any) { + const logOptions = [ + { + label: t('project.task.log'), + key: 'log' + }, + { + label: t('project.task.output'), + key: 'output' + } + ] + return h(NSpace, null, { default: () => [ h( @@ -228,50 +243,82 @@ export function useTable() { } ), h( - NTooltip, - {}, + NDropdown, { - trigger: () => - h( - NButton, - { - circle: true, - type: 'info', - size: 'small', - disabled: !row.host, - onClick: () => handleLog(row) - }, - { - icon: () => - h(NIcon, null, { - default: () => h(AlignLeftOutlined) - }) - } - ), - default: () => t('project.task.view_log') - } + trigger: 'click', + options: logOptions, + disabled: !row.host, + onSelect: (key: string) => { + if (key === 'log') { + handleLog(row) + return + } + handleOutput(row) + } + }, + () => + h( + NTooltip, + {}, + { + trigger: () => + h( + NButton, + { + circle: true, + type: 'info', + size: 'small', + disabled: !row.host + }, + { + icon: () => + h(NIcon, null, { + default: () => h(AlignLeftOutlined) + }) + } + ), + default: () => t('project.task.view_log') + } + ) ), h( - NTooltip, - {}, + NDropdown, { - trigger: () => - h( - NButton, - { - circle: true, - type: 'info', - size: 'small', - disabled: !row.host, - onClick: () => downloadLog(row.id) - }, - { - icon: () => - h(NIcon, null, { default: () => h(DownloadOutlined) }) - } - ), - default: () => t('project.task.download_log') - } + trigger: 'click', + options: logOptions, + disabled: !row.host, + onSelect: (key: string) => { + if (key === 'log') { + downloadLog(row.id) + return + } + downloadOutput(row.id) + } + }, + () => + h( + NTooltip, + {}, + { + trigger: () => + h( + NButton, + { + circle: true, + type: 'info', + size: 'small', + disabled: !row.host + }, + { + icon: () => + h(NIcon, null, { + default: () => h(DownloadOutlined) + }) + } + ), + default: () => t('project.task.download_log') + } + ) ) ] }) @@ -288,6 +335,11 @@ export function useTable() { variables.row = row } + const handleOutput = (row: any) => { + variables.showOutputModalRef = true + variables.row = row + } + const handleForcedSuccess = (row: any) => { forceSuccess({ id: row.id }, { projectCode }).then(() => { getTableData({ diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java index 7330bddcc270..11df9e2bcc9c 100644 --- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java +++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/executor/PhysicalTaskExecutorFactory.java @@ -19,7 +19,7 @@ import org.apache.dolphinscheduler.plugin.storage.api.StorageOperator; import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext; -import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils; +import org.apache.dolphinscheduler.plugin.task.api.utils.TaskLogFileProvider; import org.apache.dolphinscheduler.server.worker.config.WorkerConfig; import org.apache.dolphinscheduler.task.executor.ITaskExecutor; import org.apache.dolphinscheduler.task.executor.ITaskExecutorFactory; @@ -48,7 +48,7 @@ public PhysicalTaskExecutorFactory(final WorkerConfig workerConfig, @Override public ITaskExecutor createTaskExecutor(final TaskExecutionContext taskExecutionContext) { - assemblyTaskLogPath(taskExecutionContext); + assemblyTaskLogsRootPath(taskExecutionContext); final PhysicalTaskExecutorBuilder physicalTaskExecutorBuilder = PhysicalTaskExecutorBuilder.builder() .taskExecutionContext(taskExecutionContext) @@ -59,8 +59,8 @@ public ITaskExecutor createTaskExecutor(final TaskExecutionContext taskExecution return new PhysicalTaskExecutor(physicalTaskExecutorBuilder); } - private void assemblyTaskLogPath(final TaskExecutionContext taskExecutionContext) { - taskExecutionContext.setLogPath(LogUtils.getTaskInstanceLogFullPath(taskExecutionContext)); + private void assemblyTaskLogsRootPath(final TaskExecutionContext taskExecutionContext) { + taskExecutionContext.setTaskLogsRootPath(TaskLogFileProvider.getTaskLogsRootPath(taskExecutionContext)); } } diff --git a/dolphinscheduler-worker/src/main/resources/logback-spring.xml b/dolphinscheduler-worker/src/main/resources/logback-spring.xml index 762076c03cd6..f84a27359881 100644 --- a/dolphinscheduler-worker/src/main/resources/logback-spring.xml +++ b/dolphinscheduler-worker/src/main/resources/logback-spring.xml @@ -33,7 +33,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -51,6 +53,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + ${log.base}/dolphinscheduler-worker.log @@ -77,4 +98,8 @@ + + + + diff --git a/dolphinscheduler-worker/src/test/resources/logback.xml b/dolphinscheduler-worker/src/test/resources/logback.xml index 916d79e35489..7ab6138cd408 100644 --- a/dolphinscheduler-worker/src/test/resources/logback.xml +++ b/dolphinscheduler-worker/src/test/resources/logback.xml @@ -32,7 +32,9 @@ - + + taskInstanceLogFullPath + taskInstanceLogFullPath ${log.base} @@ -50,6 +52,25 @@ + + + taskOutputLogFullPath + + + taskOutputLogFullPath + ${log.base} + + + + ${taskOutputLogFullPath} + + %msg%n + UTF-8 + + true + + + ${log.base}/dolphinscheduler-worker.log @@ -72,4 +93,8 @@ + + + +