diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java
index 2e1c807edf80..ced681c930e8 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/WorkflowEventBusFireWorker.java
@@ -156,6 +156,25 @@ private void doFireSingleEvent(final IWorkflowExecutionRunnable workflowExecutio
             throw new RuntimeException("No EventHandler found for event: " + event.getEventType());
         }
         lifecycleEventHandler.handle(workflowExecutionRunnable, event);
+
+        recordTaskInstanceMetrics(event);
+    }
+
+    /**
+     * Record the task instance metric for a lifecycle event that has just been handled.
+     *
+     * Events whose type is not a {@code TaskLifecycleEventType} are skipped. The mapping from event type to counter
+     * is delegated to {@code TaskMetrics.incTaskInstanceByLifecycleEvent} so that it is defined in a single place.
+     *
+     * @param event the lifecycle event that was handled
+     */
+    private void recordTaskInstanceMetrics(final AbstractLifecycleEvent event) {
+        // Read the event type once; it is held as Object so that no additional import is required here.
+        final Object eventType = event.getEventType();
+        if (eventType instanceof org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType) {
+            org.apache.dolphinscheduler.server.master.metrics.TaskMetrics.incTaskInstanceByLifecycleEvent(
+                    (org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType) eventType);
+        }
     }
 
 }
diff --git 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java
index 2f5b5841283a..b5bc3aeab45d 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/runnable/WorkflowExecutionRunnableFactory.java
@@ -23,6 +23,7 @@
 import org.apache.dolphinscheduler.dao.repository.WorkflowInstanceDao;
 import org.apache.dolphinscheduler.server.master.engine.command.ICommandHandler;
 import org.apache.dolphinscheduler.server.master.engine.exceptions.CommandDuplicateHandleException;
+import org.apache.dolphinscheduler.server.master.metrics.WorkflowInstanceMetrics;
 
 import java.util.List;
 
@@ -52,8 +53,13 @@ public class WorkflowExecutionRunnableFactory {
      */
     @Transactional
     public IWorkflowExecutionRunnable createWorkflowExecuteRunnable(Command command) {
+        // Measure with the monotonic clock: wall-clock time can be adjusted while this method is running.
+        final long startNanos = System.nanoTime();
         deleteCommandOrThrow(command);
-        return doCreateWorkflowExecutionRunnable(command);
+        final IWorkflowExecutionRunnable workflowExecutionRunnable = doCreateWorkflowExecutionRunnable(command);
+        // Reported in milliseconds, and only reached when the runnable was created without an exception.
+        WorkflowInstanceMetrics.recordWorkflowInstanceGenerateTime((System.nanoTime() - startNanos) / 1_000_000L);
+        return workflowExecutionRunnable;
     }
 
     /**
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
index 59e770c4fb1e..483fce4dd418 100644
--- 
a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/engine/workflow/statemachine/AbstractWorkflowStateAction.java
@@ -176,6 +176,9 @@ protected void transformWorkflowInstanceState(final IWorkflowExecutionRunnable w
             workflowInstanceDao.updateById(workflowInstance);
             log.info("Success set WorkflowExecuteRunnable: {} state from: {} to {}", workflowInstance.getName(),
                     originState.name(), targetState.name());
+            WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(
+                    targetState,
+                    String.valueOf(workflowInstance.getWorkflowDefinitionCode()));
         } catch (Exception ex) {
             workflowInstance.setState(originState);
             throw ex;
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
index 8ea9f13c4dae..00928152ca34 100644
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
+++ b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetrics.java
@@ -17,6 +17,8 @@
 
 package org.apache.dolphinscheduler.server.master.metrics;
 
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
@@ -89,5 +91,41 @@ public void incTaskInstanceByState(final String state) {
         }
         taskInstanceCounters.get(state).increment();
     }
+
+    /**
+     * Increase the task instance counter that corresponds to the given task lifecycle event type.
+     *
+     * A {@code null} event type, and event types without a mapping below, are ignored.
+     *
+     * @param eventType the task lifecycle event type, may be {@code null}
+     */
+    public void incTaskInstanceByLifecycleEvent(final TaskLifecycleEventType eventType) {
+        if (eventType == null) {
+            return;
+        }
+        switch (eventType) {
+            case DISPATCHED:
+                incTaskInstanceByState("dispatch");
+                break;
+            case SUCCEEDED:
+                incTaskInstanceByState("success");
+                break;
+            case FAILED:
+            case FATAL:
+                incTaskInstanceByState("fail");
+                break;
+            case KILLED:
+                incTaskInstanceByState("kill");
+                break;
+            case RETRY:
+                incTaskInstanceByState("retry");
+                break;
+            case TIMEOUT:
+                incTaskInstanceByState("timeout");
+                break;
+            default:
+                break;
+        }
+    }
 
 }
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetricsTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetricsTest.java
new file mode 100644
index 000000000000..cc4f1a068e29
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/TaskMetricsTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+import org.apache.dolphinscheduler.server.master.engine.task.lifecycle.TaskLifecycleEventType;
+
+import org.junit.jupiter.api.Test;
+
+import io.micrometer.core.instrument.Metrics;
+
+class TaskMetricsTest {
+
+    @Test
+    void testRegisterTaskPrepared() {
+        TaskMetrics.registerTaskPrepared(() -> 5);
+        assertNotNull(Metrics.globalRegistry.find("ds.task.prepared").gauge(),
+                "Task prepared gauge should be registered");
+    }
+
+    @Test
+    void testIncTaskInstanceByLifecycleEvent() {
+        // Test that the new method doesn't throw exceptions
+        TaskMetrics.incTaskInstanceByLifecycleEvent(TaskLifecycleEventType.DISPATCHED);
+        TaskMetrics.incTaskInstanceByLifecycleEvent(TaskLifecycleEventType.SUCCEEDED);
+        TaskMetrics.incTaskInstanceByLifecycleEvent(TaskLifecycleEventType.FAILED);
+        TaskMetrics.incTaskInstanceByLifecycleEvent(TaskLifecycleEventType.KILLED);
+        TaskMetrics.incTaskInstanceByLifecycleEvent(TaskLifecycleEventType.RETRY);
+        TaskMetrics.incTaskInstanceByLifecycleEvent(TaskLifecycleEventType.TIMEOUT);
+        TaskMetrics.incTaskInstanceByLifecycleEvent(TaskLifecycleEventType.FATAL);
+        TaskMetrics.incTaskInstanceByLifecycleEvent(null);
+        // Should not throw any exceptions
+    }
+
+}
diff --git a/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetricsTest.java b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetricsTest.java
new file mode 100644
index 000000000000..a45f2715931b
--- /dev/null
+++ b/dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/metrics/WorkflowInstanceMetricsTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.master.metrics;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
+
+import org.junit.jupiter.api.Test;
+
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
+import io.micrometer.core.instrument.Timer;
+
+/**
+ * Tests for {@code WorkflowInstanceMetrics}.
+ *
+ * The tests look meters up in the shared {@code Metrics.globalRegistry}, so the timer assertions compare the count
+ * observed before and after the call instead of assuming that the timer starts at zero.
+ */
+class WorkflowInstanceMetricsTest {
+
+    private static final String INSTANCE_COUNT_METRIC = "ds.workflow.instance.count";
+
+    @Test
+    void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_submitState() {
+        String defCode = "test_submit_1";
+        Counter counter = incrementAndFind(WorkflowExecutionStatus.SUBMITTED_SUCCESS, "submit", defCode);
+        assertNotNull(counter, "Counter should be registered for submit state");
+        assertEquals(1, counter.count(), 0.001);
+    }
+
+    @Test
+    void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_failureState() {
+        String defCode = "test_failure_1";
+        Counter counter = incrementAndFind(WorkflowExecutionStatus.FAILURE, "fail", defCode);
+        assertNotNull(counter, "Counter should be registered for fail state");
+        assertEquals(1, counter.count(), 0.001);
+    }
+
+    @Test
+    void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_successState() {
+        String defCode = "test_success_1";
+        Counter counter = incrementAndFind(WorkflowExecutionStatus.SUCCESS, "success", defCode);
+        assertNotNull(counter, "Counter should be registered for success state");
+        assertEquals(1, counter.count(), 0.001);
+    }
+
+    @Test
+    void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_stopState() {
+        String defCode = "test_stop_1";
+        Counter counter = incrementAndFind(WorkflowExecutionStatus.STOP, "stop", defCode);
+        assertNotNull(counter, "Counter should be registered for stop state");
+        assertEquals(1, counter.count(), 0.001);
+    }
+
+    @Test
+    void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_pauseState() {
+        String defCode = "test_pause_1";
+        Counter counter = incrementAndFind(WorkflowExecutionStatus.PAUSE, "pause", defCode);
+        assertNotNull(counter, "Counter should be registered for pause state");
+        assertEquals(1, counter.count(), 0.001);
+    }
+
+    @Test
+    void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_failoverState() {
+        String defCode = "test_failover_1";
+        Counter counter = incrementAndFind(WorkflowExecutionStatus.FAILOVER, "failover", defCode);
+        assertNotNull(counter, "Counter should be registered for failover state");
+        assertEquals(1, counter.count(), 0.001);
+    }
+
+    @Test
+    void testIncWorkflowInstanceByStateAndWorkflowDefinitionCode_defaultMapping() {
+        String defCode = "test_running_1";
+        Counter counter = incrementAndFind(WorkflowExecutionStatus.RUNNING_EXECUTION, "running_execution", defCode);
+        assertNotNull(counter, "Counter should be registered for default-mapped state");
+        assertEquals(1, counter.count(), 0.001);
+    }
+
+    @Test
+    void testRecordCommandQueryTime() {
+        String timerName = "ds.workflow.command.query.duration";
+        long countBefore = timerCount(timerName);
+        WorkflowInstanceMetrics.recordCommandQueryTime(100L);
+        assertNotNull(Metrics.globalRegistry.find(timerName).timer(), "Command query timer should be registered");
+        assertEquals(countBefore + 1, timerCount(timerName), "Timer should have recorded one event");
+    }
+
+    @Test
+    void testRecordWorkflowInstanceGenerateTime() {
+        String timerName = "ds.workflow.instance.generate.duration";
+        long countBefore = timerCount(timerName);
+        WorkflowInstanceMetrics.recordWorkflowInstanceGenerateTime(200L);
+        assertNotNull(Metrics.globalRegistry.find(timerName).timer(),
+                "Workflow instance generate timer should be registered");
+        assertEquals(countBefore + 1, timerCount(timerName), "Timer should have recorded one event");
+    }
+
+    @Test
+    void testRegisterWorkflowInstanceRunningGauge() {
+        WorkflowInstanceMetrics.registerWorkflowInstanceRunningGauge(() -> 10);
+        assertNotNull(Metrics.globalRegistry.find("ds.workflow.instance.running").gauge(),
+                "Running gauge should be registered");
+    }
+
+    @Test
+    void testRegisterWorkflowInstanceResubmitGauge() {
+        WorkflowInstanceMetrics.registerWorkflowInstanceResubmitGauge(() -> 3);
+        assertNotNull(Metrics.globalRegistry.find("ds.workflow.instance.resubmit").gauge(),
+                "Resubmit gauge should be registered");
+    }
+
+    @Test
+    void testCleanUpWorkflowInstanceCountMetricsByDefinitionCode() {
+        String defCode = "99999";
+        Counter counterBefore = incrementAndFind(WorkflowExecutionStatus.SUCCESS, "success", defCode);
+        assertNotNull(counterBefore, "Counter should exist before cleanup");
+
+        WorkflowInstanceMetrics.cleanUpWorkflowInstanceCountMetricsByDefinitionCode(99999L);
+
+        Counter counterAfter = findInstanceCounter("success", defCode);
+        assertNull(counterAfter, "Counter should be removed after cleanup");
+    }
+
+    /**
+     * Increments the workflow instance counter for {@code status} and looks it up by the expected state tag.
+     *
+     * @return the counter registered for {@code stateTag} and {@code defCode}, or {@code null} if none is found
+     */
+    private static Counter incrementAndFind(WorkflowExecutionStatus status, String stateTag, String defCode) {
+        WorkflowInstanceMetrics.incWorkflowInstanceByStateAndWorkflowDefinitionCode(status, defCode);
+        return findInstanceCounter(stateTag, defCode);
+    }
+
+    /**
+     * Looks up the workflow instance counter with the given state and definition code tags.
+     *
+     * @return the matching counter, or {@code null} if it is not registered
+     */
+    private static Counter findInstanceCounter(String stateTag, String defCode) {
+        return Metrics.globalRegistry.find(INSTANCE_COUNT_METRIC)
+                .tag("state", stateTag)
+                .tag("workflow.definition.code", defCode)
+                .counter();
+    }
+
+    /**
+     * Returns the number of events recorded by the named timer, or {@code 0} if the timer is not registered yet.
+     */
+    private static long timerCount(String timerName) {
+        Timer timer = Metrics.globalRegistry.find(timerName).timer();
+        return timer == null ? 0L : timer.count();
+    }
+
+}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
index e17db2f34663..61f07a90d2c7 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/metrics/WorkerServerMetrics.java
@@ -30,47 +30,41 @@
 @UtilityClass
 public class WorkerServerMetrics {
 
-    private final Counter workerOverloadCounter =
-            Counter.builder("ds.worker.overload.count")
-                    .description("overloaded workers count")
-                    .register(Metrics.globalRegistry);
-
-    private final Counter workerFullSubmitQueueCounter =
-            Counter.builder("ds.worker.full.submit.queue.count")
-                    .description("full worker submit queues count")
-                    .register(Metrics.globalRegistry);
-
-    private final Counter workerResourceDownloadSuccessCounter =
-            Counter.builder("ds.worker.resource.download.count")
-                    .tag("status", "success")
-                    .description("worker resource download success count")
-                    .register(Metrics.globalRegistry);
-
-    private final Counter workerResourceDownloadFailCounter =
-            Counter.builder("ds.worker.resource.download.count")
-                    .tag("status", "fail")
-                    .description("worker resource download failure count")
-                    .register(Metrics.globalRegistry);
-
-    private final Counter workerHeartBeatCounter =
-            Counter.builder("ds.worker.heartbeat.count")
-                    .description("worker heartbeat count")
-                    .register(Metrics.globalRegistry);
-
-    private final Timer workerResourceDownloadDurationTimer =
-            Timer.builder("ds.worker.resource.download.duration")
-                    .publishPercentiles(0.5, 0.75, 0.95, 0.99)
-                    .publishPercentileHistogram()
-                    .description("time cost of resource download on workers")
-                    .register(Metrics.globalRegistry);
-
-    private final DistributionSummary workerResourceDownloadSizeDistribution =
-            DistributionSummary.builder("ds.worker.resource.download.size")
-                    .baseUnit("bytes")
-                    .publishPercentiles(0.5, 0.75, 0.95, 0.99)
-                    .publishPercentileHistogram()
-                    .description("size of downloaded resource files on worker")
-                    .register(Metrics.globalRegistry);
+    private final Counter workerOverloadCounter = Counter.builder("ds.worker.overload.count")
+            .description("overloaded workers count")
+            .register(Metrics.globalRegistry);
+
+    private final Counter workerFullSubmitQueueCounter = Counter.builder("ds.worker.full.submit.queue.count")
+            .description("full worker submit queues count")
+            .register(Metrics.globalRegistry);
+
+    private final Counter workerResourceDownloadSuccessCounter = Counter.builder("ds.worker.resource.download.count")
+            .tag("status", "success")
+            .description("worker resource download success count")
+            .register(Metrics.globalRegistry);
+
+    private final Counter workerResourceDownloadFailCounter = Counter.builder("ds.worker.resource.download.count")
+            .tag("status", "fail")
+            .description("worker resource download failure count")
+            .register(Metrics.globalRegistry);
+
+    private final Counter workerHeartBeatCounter = Counter.builder("ds.worker.heartbeat.count")
+            .description("worker heartbeat count")
+            .register(Metrics.globalRegistry);
+
+    private final Timer workerResourceDownloadDurationTimer = Timer.builder("ds.worker.resource.download.duration")
+            .publishPercentiles(0.5, 0.75, 0.95, 0.99)
+            .publishPercentileHistogram()
+            .description("time cost of resource download on workers")
+            .register(Metrics.globalRegistry);
+
+    private final DistributionSummary workerResourceDownloadSizeDistribution = DistributionSummary
+            .builder("ds.worker.resource.download.size")
+            .baseUnit("bytes")
+            .publishPercentiles(0.5, 0.75, 0.95, 0.99)
+            .publishPercentileHistogram()
+            .description("size of downloaded resource files on worker")
+            .register(Metrics.globalRegistry);
 
     public void incWorkerOverloadCount() {
         workerOverloadCounter.increment();