diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/FineGrainedLock.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/FineGrainedLock.java new file mode 100644 index 0000000000..4e00b89838 --- /dev/null +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/FineGrainedLock.java @@ -0,0 +1,62 @@ +/* + * Copyright 2006-2025 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.batch.core.repository.dao.jdbc; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * Fine-grained lock implementation. + * + * @author Yanming Zhou + */ +class FineGrainedLock { + + record Wrapper(Lock lock, AtomicInteger numberOfThreadsInQueue) { + + private Wrapper addThreadInQueue() { + numberOfThreadsInQueue.incrementAndGet(); + return this; + } + + private int removeThreadFromQueue() { + return numberOfThreadsInQueue.decrementAndGet(); + } + } + + private final ConcurrentHashMap locks = new ConcurrentHashMap<>(); + + public void lock(T key) { + Wrapper wrapper = locks.compute(key, + (k, v) -> v == null ? new Wrapper(new ReentrantLock(), new AtomicInteger(1)) : v.addThreadInQueue()); + wrapper.lock.lock(); + } + + public void unlock(T key) { + Wrapper wrapper = locks.get(key); + if (wrapper == null) { + throw new IllegalStateException("Lock on '" + key + "' doesn't exist, please lock it first"); + } + wrapper.lock.unlock(); + if (wrapper.removeThreadFromQueue() == 0) { + locks.remove(key, wrapper); + } + } + +} diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java index 1feaf76ef7..d45702267b 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcExecutionContextDao.java @@ -30,8 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.springframework.batch.core.job.JobExecution; @@ -58,6 +56,7 @@ * @author Michael Minella * @author David Turanski * @author Mahmoud Ben Hassine + * @author Yanming Zhou */ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implements ExecutionContextDao { @@ -113,7 +112,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem private ExecutionContextSerializer serializer = new DefaultExecutionContextSerializer(); - private final Lock lock = new ReentrantLock(); + private final FineGrainedLock lock = new FineGrainedLock<>(); /** * Setter for {@link Serializer} implementation @@ -194,9 +193,9 @@ public void updateExecutionContext(JobExecution jobExecution) { public void updateExecutionContext(StepExecution stepExecution) { // Attempt to prevent concurrent modification errors by blocking here if // someone is already trying to do it. - this.lock.lock(); + Long executionId = stepExecution.getId(); + this.lock.lock(executionId); try { - Long executionId = stepExecution.getId(); ExecutionContext executionContext = stepExecution.getExecutionContext(); Assert.notNull(executionId, "ExecutionId must not be null."); Assert.notNull(executionContext, "The ExecutionContext must not be null."); @@ -206,7 +205,7 @@ public void updateExecutionContext(StepExecution stepExecution) { persistSerializedContext(executionId, serializedContext, UPDATE_STEP_EXECUTION_CONTEXT); } finally { - this.lock.unlock(); + this.lock.unlock(executionId); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java index e0a712b041..d0a0cc41a3 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcJobExecutionDao.java @@ -24,8 +24,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -153,7 +151,7 @@ SELECT COUNT(*) private DataFieldMaxValueIncrementer jobExecutionIncrementer; - private final Lock lock = new ReentrantLock(); + private final FineGrainedLock lock = new FineGrainedLock<>(); /** * Public setter for the exit message length in database. Do not set this if you @@ -250,13 +248,15 @@ public void updateJobExecution(JobExecution jobExecution) { validateJobExecution(jobExecution); - Assert.notNull(jobExecution.getId(), + Long executionId = jobExecution.getId(); + + Assert.notNull(executionId, "JobExecution ID cannot be null. JobExecution must be saved before it can be updated"); Assert.notNull(jobExecution.getVersion(), "JobExecution version cannot be null. JobExecution must be saved before it can be updated"); - this.lock.lock(); + this.lock.lock(executionId); try { String exitDescription = jobExecution.getExitStatus().getExitDescription(); @@ -304,7 +304,7 @@ public void updateJobExecution(JobExecution jobExecution) { jobExecution.incrementVersion(); } finally { - this.lock.unlock(); + this.lock.unlock(executionId); } } diff --git a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java index 884bc446b9..af58b035b9 100644 --- a/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java +++ b/spring-batch-core/src/main/java/org/springframework/batch/core/repository/dao/jdbc/JdbcStepExecutionDao.java @@ -22,8 +22,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -127,7 +125,7 @@ SELECT COUNT(*) private JdbcJobExecutionDao jobExecutionDao; - private final Lock lock = new ReentrantLock(); + private final FineGrainedLock lock = new FineGrainedLock<>(); /** * Public setter for the exit message length in database. Do not set this if you @@ -215,7 +213,10 @@ private void validateStepExecution(StepExecution stepExecution) { public void updateStepExecution(StepExecution stepExecution) { validateStepExecution(stepExecution); - Assert.notNull(stepExecution.getId(), + + Long executionId = stepExecution.getId(); + + Assert.notNull(executionId, "StepExecution Id cannot be null. StepExecution must saved" + " before it can be updated."); // Do not check for existence of step execution considering @@ -225,7 +226,7 @@ public void updateStepExecution(StepExecution stepExecution) { // Attempt to prevent concurrent modification errors by blocking here if // someone is already trying to do it. - this.lock.lock(); + this.lock.lock(executionId); try { Timestamp startTime = stepExecution.getStartTime() == null ? null @@ -258,7 +259,7 @@ public void updateStepExecution(StepExecution stepExecution) { } finally { - this.lock.unlock(); + this.lock.unlock(executionId); } }