Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright 2006-2025 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.batch.core.repository.dao.jdbc;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* Fine-grained lock implementation.
*
* @author Yanming Zhou
*/
class FineGrainedLock<T> {

record Wrapper(Lock lock, AtomicInteger numberOfThreadsInQueue) {

private Wrapper addThreadInQueue() {
numberOfThreadsInQueue.incrementAndGet();
return this;
}

private int removeThreadFromQueue() {
return numberOfThreadsInQueue.decrementAndGet();
}
}

private final ConcurrentHashMap<T, Wrapper> locks = new ConcurrentHashMap<>();

public void lock(T key) {
Wrapper wrapper = locks.compute(key,
(k, v) -> v == null ? new Wrapper(new ReentrantLock(), new AtomicInteger(1)) : v.addThreadInQueue());
wrapper.lock.lock();
}

public void unlock(T key) {
Wrapper wrapper = locks.get(key);
if (wrapper == null) {
throw new IllegalStateException("Lock on '" + key + "' doesn't exist, please lock it first");
}
wrapper.lock.unlock();
if (wrapper.removeThreadFromQueue() == 0) {
locks.remove(key, wrapper);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.springframework.batch.core.job.JobExecution;

Expand All @@ -58,6 +56,7 @@
* @author Michael Minella
* @author David Turanski
* @author Mahmoud Ben Hassine
* @author Yanming Zhou
*/
public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implements ExecutionContextDao {

Expand Down Expand Up @@ -113,7 +112,7 @@ public class JdbcExecutionContextDao extends AbstractJdbcBatchMetadataDao implem

private ExecutionContextSerializer serializer = new DefaultExecutionContextSerializer();

private final Lock lock = new ReentrantLock();
private final FineGrainedLock<Long> lock = new FineGrainedLock<>();

/**
* Setter for {@link Serializer} implementation
Expand Down Expand Up @@ -194,9 +193,9 @@ public void updateExecutionContext(JobExecution jobExecution) {
public void updateExecutionContext(StepExecution stepExecution) {
// Attempt to prevent concurrent modification errors by blocking here if
// someone is already trying to do it.
this.lock.lock();
Long executionId = stepExecution.getId();
this.lock.lock(executionId);
try {
Long executionId = stepExecution.getId();
ExecutionContext executionContext = stepExecution.getExecutionContext();
Assert.notNull(executionId, "ExecutionId must not be null.");
Assert.notNull(executionContext, "The ExecutionContext must not be null.");
Expand All @@ -206,7 +205,7 @@ public void updateExecutionContext(StepExecution stepExecution) {
persistSerializedContext(executionId, serializedContext, UPDATE_STEP_EXECUTION_CONTEXT);
}
finally {
this.lock.unlock();
this.lock.unlock(executionId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -153,7 +151,7 @@ SELECT COUNT(*)

private DataFieldMaxValueIncrementer jobExecutionIncrementer;

private final Lock lock = new ReentrantLock();
private final FineGrainedLock<Long> lock = new FineGrainedLock<>();

/**
* Public setter for the exit message length in database. Do not set this if you
Expand Down Expand Up @@ -250,13 +248,15 @@ public void updateJobExecution(JobExecution jobExecution) {

validateJobExecution(jobExecution);

Assert.notNull(jobExecution.getId(),
Long executionId = jobExecution.getId();

Assert.notNull(executionId,
"JobExecution ID cannot be null. JobExecution must be saved before it can be updated");

Assert.notNull(jobExecution.getVersion(),
"JobExecution version cannot be null. JobExecution must be saved before it can be updated");

this.lock.lock();
this.lock.lock(executionId);
try {

String exitDescription = jobExecution.getExitStatus().getExitDescription();
Expand Down Expand Up @@ -304,7 +304,7 @@ public void updateJobExecution(JobExecution jobExecution) {
jobExecution.incrementVersion();
}
finally {
this.lock.unlock();
this.lock.unlock(executionId);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -127,7 +125,7 @@ SELECT COUNT(*)

private JdbcJobExecutionDao jobExecutionDao;

private final Lock lock = new ReentrantLock();
private final FineGrainedLock<Long> lock = new FineGrainedLock<>();

/**
* Public setter for the exit message length in database. Do not set this if you
Expand Down Expand Up @@ -215,7 +213,10 @@ private void validateStepExecution(StepExecution stepExecution) {
public void updateStepExecution(StepExecution stepExecution) {

validateStepExecution(stepExecution);
Assert.notNull(stepExecution.getId(),

Long executionId = stepExecution.getId();

Assert.notNull(executionId,
"StepExecution Id cannot be null. StepExecution must saved" + " before it can be updated.");

// Do not check for existence of step execution considering
Expand All @@ -225,7 +226,7 @@ public void updateStepExecution(StepExecution stepExecution) {

// Attempt to prevent concurrent modification errors by blocking here if
// someone is already trying to do it.
this.lock.lock();
this.lock.lock(executionId);
try {

Timestamp startTime = stepExecution.getStartTime() == null ? null
Expand Down Expand Up @@ -258,7 +259,7 @@ public void updateStepExecution(StepExecution stepExecution) {

}
finally {
this.lock.unlock();
this.lock.unlock(executionId);
}
}

Expand Down