From 8f54e23216aa64275d1875e397992d78aabbcc84 Mon Sep 17 00:00:00 2001 From: peterborbas-wise Date: Wed, 2 Apr 2025 11:21:48 +0200 Subject: [PATCH 1/2] finished count stuck at 0 --- .../tasks/testapp/TaskProcessingIntTest.java | 70 +++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java index bdb2bb3c..1162abae 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java @@ -4,6 +4,7 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; +import com.google.common.util.concurrent.RateLimiter; import com.transferwise.common.baseutils.ExceptionUtils; import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.common.context.Criticality; @@ -444,6 +445,75 @@ public void freeSpace(IBaseTask task) { })); } + private static class TestConcurrencyPolicyWithRateLimiter implements ITaskConcurrencyPolicy { + + private final RateLimiter rateLimiter; + private final ITaskConcurrencyPolicy concurrencyLimitingPolicy; + + TestConcurrencyPolicyWithRateLimiter(int maxConcurrency, double requestsPerSecond) { + this.rateLimiter = RateLimiter.create(requestsPerSecond); + this.concurrencyLimitingPolicy = new SimpleTaskConcurrencyPolicy(maxConcurrency); + } + + @Override + public @NonNull BookSpaceResponse bookSpace(IBaseTask task) { + BookSpaceResponse concurrencyLimitingResponse = concurrencyLimitingPolicy.bookSpace(task); + if (!concurrencyLimitingResponse.isHasRoom()) { + return concurrencyLimitingResponse; + } + + if (rateLimiter.tryAcquire()) { + return concurrencyLimitingResponse; + } + concurrencyLimitingPolicy.freeSpace(task); + + // Most optimal would be to set `tryAgainTime` when a new permit becomes available. + // Unfortunately Guava's implementation does not expose it's inner method for it, so we just try again after 100 millis. + // 100 millis worst case latency here and there is not a problem for us. + // There probably is a better rate limiting library. + return new BookSpaceResponse(false).setTryAgainTime(Instant.now().plusMillis(100)); + } + + @Override + public void freeSpace(IBaseTask task) { + concurrencyLimitingPolicy.freeSpace(task); + } + } + + + @Test + void taskWithRateLimitingConcurrencyPolicyWillNotHang2() { + AtomicInteger counter = new AtomicInteger(); + testTaskHandlerAdapter.setProcessor((ISyncTaskProcessor) task -> { + log.info("Processing task {}", counter.incrementAndGet()); + return new ProcessResult().setResultCode(ResultCode.DONE); + }); + + testTaskHandlerAdapter.setConcurrencyPolicy(new TestConcurrencyPolicyWithRateLimiter(2, 0.5d)); + + int allTasks = 10000; + int waitFor = 1000; + + for (int i = 0; i < allTasks; i++) { + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest().setType("test")) + ); + } + + log.info("Waiting for {} tasks to be processed.", waitFor); + + await().atMost(Duration.ofMinutes(10)).until(() -> transactionsHelper.withTransaction().asNew().call(() -> { + try { + int finished = testTasksService.getFinishedTasks("test", null).size(); + log.info("Finished tasks: {}", finished); + return finished >= waitFor; + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + return false; + })); + } + @Test void taskProcessingWillHandlePoisonPillAttack() { // given: From b6b323e1d98979fcf386095c2e96a63f14e7ab1d Mon Sep 17 00:00:00 2001 From: peterborbas-wise Date: Wed, 2 Apr 2025 11:24:50 +0200 Subject: [PATCH 2/2] finished count stuck at 0 --- .../com/transferwise/tasks/testapp/TaskProcessingIntTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java index 1162abae..c81dc78d 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java @@ -482,7 +482,7 @@ public void freeSpace(IBaseTask task) { @Test - void taskWithRateLimitingConcurrencyPolicyWillNotHang2() { + void taskWithRateLimitingConcurrencyPolicyWillNotHangWithManyTasks() { AtomicInteger counter = new AtomicInteger(); testTaskHandlerAdapter.setProcessor((ISyncTaskProcessor) task -> { log.info("Processing task {}", counter.incrementAndGet());