diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java index bdb2bb3c..c81dc78d 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskProcessingIntTest.java @@ -4,6 +4,7 @@ import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; +import com.google.common.util.concurrent.RateLimiter; import com.transferwise.common.baseutils.ExceptionUtils; import com.transferwise.common.baseutils.UuidUtils; import com.transferwise.common.context.Criticality; @@ -444,6 +445,75 @@ public void freeSpace(IBaseTask task) { })); } + private static class TestConcurrencyPolicyWithRateLimiter implements ITaskConcurrencyPolicy { + + private final RateLimiter rateLimiter; + private final ITaskConcurrencyPolicy concurrencyLimitingPolicy; + + TestConcurrencyPolicyWithRateLimiter(int maxConcurrency, double requestsPerSecond) { + this.rateLimiter = RateLimiter.create(requestsPerSecond); + this.concurrencyLimitingPolicy = new SimpleTaskConcurrencyPolicy(maxConcurrency); + } + + @Override + public @NonNull BookSpaceResponse bookSpace(IBaseTask task) { + BookSpaceResponse concurrencyLimitingResponse = concurrencyLimitingPolicy.bookSpace(task); + if (!concurrencyLimitingResponse.isHasRoom()) { + return concurrencyLimitingResponse; + } + + if (rateLimiter.tryAcquire()) { + return concurrencyLimitingResponse; + } + concurrencyLimitingPolicy.freeSpace(task); + + // Most optimal would be to set `tryAgainTime` when a new permit becomes available. + // Unfortunately Guava's implementation does not expose it's inner method for it, so we just try again after 100 millis. + // 100 millis worst case latency here and there is not a problem for us. + // There probably is a better rate limiting library. + return new BookSpaceResponse(false).setTryAgainTime(Instant.now().plusMillis(100)); + } + + @Override + public void freeSpace(IBaseTask task) { + concurrencyLimitingPolicy.freeSpace(task); + } + } + + + @Test + void taskWithRateLimitingConcurrencyPolicyWillNotHangWithManyTasks() { + AtomicInteger counter = new AtomicInteger(); + testTaskHandlerAdapter.setProcessor((ISyncTaskProcessor) task -> { + log.info("Processing task {}", counter.incrementAndGet()); + return new ProcessResult().setResultCode(ResultCode.DONE); + }); + + testTaskHandlerAdapter.setConcurrencyPolicy(new TestConcurrencyPolicyWithRateLimiter(2, 0.5d)); + + int allTasks = 10000; + int waitFor = 1000; + + for (int i = 0; i < allTasks; i++) { + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest().setType("test")) + ); + } + + log.info("Waiting for {} tasks to be processed.", waitFor); + + await().atMost(Duration.ofMinutes(10)).until(() -> transactionsHelper.withTransaction().asNew().call(() -> { + try { + int finished = testTasksService.getFinishedTasks("test", null).size(); + log.info("Finished tasks: {}", finished); + return finished >= waitFor; + } catch (Throwable t) { + log.error(t.getMessage(), t); + } + return false; + })); + } + @Test void taskProcessingWillHandlePoisonPillAttack() { // given: