|
23 | 23 | import static org.junit.Assert.fail; |
24 | 24 |
|
25 | 25 | import java.io.IOException; |
| 26 | +import java.util.ArrayList; |
26 | 27 | import java.util.Iterator; |
27 | 28 | import java.util.List; |
28 | 29 | import java.util.concurrent.ArrayBlockingQueue; |
@@ -4306,6 +4307,75 @@ protected Integer run() throws Exception { |
4306 | 4307 | assertTrue(semaphoreExceptionEncountered.get()); |
4307 | 4308 | } |
4308 | 4309 |
|
| 4310 | + @Test |
| 4311 | + public void testCommandConcurrencyViaObserveExceedsQueueSizeButNotThreadPoolSize() { |
| 4312 | + List<Observable<Boolean>> cmdResults = new ArrayList<Observable<Boolean>>(); |
| 4313 | + |
| 4314 | + HystrixThreadPool threadPool = null; |
| 4315 | + |
| 4316 | + //thread pool size is 20, so we have room for concurrent execution of all 20 commands |
| 4317 | + //but queue size is 2 - do we see any queue rejections? - we should not |
| 4318 | + for (int i = 0; i < 20; i++) { |
| 4319 | + HystrixCommand<Boolean> cmd = new CommandWithLargeThreadPoolSmallQueue(); |
| 4320 | + if (threadPool == null) { |
| 4321 | + threadPool = cmd.threadPool; |
| 4322 | + } |
| 4323 | + cmdResults.add(cmd.toObservable()); |
| 4324 | + } |
| 4325 | + |
| 4326 | + Observable<Boolean> allObservables = Observable.merge(cmdResults); |
| 4327 | + |
| 4328 | + TestSubscriber<Boolean> subscriber = new TestSubscriber<Boolean>(); |
| 4329 | + |
| 4330 | + allObservables.subscribe(subscriber); |
| 4331 | + |
| 4332 | + subscriber.awaitTerminalEvent(1, TimeUnit.SECONDS); |
| 4333 | + if (subscriber.getOnErrorEvents().size() > 0) { |
| 4334 | + subscriber.getOnErrorEvents().get(0).printStackTrace(); |
| 4335 | + } |
| 4336 | + |
| 4337 | + subscriber.assertCompleted(); |
| 4338 | + subscriber.assertNoErrors(); |
| 4339 | + subscriber.assertValueCount(20); |
| 4340 | + } |
| 4341 | + |
| 4342 | + @Test |
| 4343 | + public void stressTestLargeThreadPoolSmallQueueUsingObserve() { |
| 4344 | + for (int n = 0; n < 20; n++) { |
| 4345 | + testCommandConcurrencyViaObserveExceedsQueueSizeButNotThreadPoolSize(); |
| 4346 | + Hystrix.reset(); |
| 4347 | + } |
| 4348 | + } |
| 4349 | + |
| 4350 | + @Test |
| 4351 | + public void testCommandConcurrencyViaQueueExceedsQueueSizeButNotThreadPoolSize() throws Exception { |
| 4352 | + List<Future<Boolean>> cmdResults = new ArrayList<Future<Boolean>>(); |
| 4353 | + |
| 4354 | + HystrixThreadPool threadPool = null; |
| 4355 | + |
| 4356 | + //thread pool size is 20, so we have room for concurrent execution of all 20 commands |
| 4357 | + //but queue size is 2 - do we see any queue rejections? - we should not |
| 4358 | + for (int i = 0; i < 20; i++) { |
| 4359 | + HystrixCommand<Boolean> cmd = new CommandWithLargeThreadPoolSmallQueue(); |
| 4360 | + if (threadPool == null) { |
| 4361 | + threadPool = cmd.threadPool; |
| 4362 | + } |
| 4363 | + cmdResults.add(cmd.queue()); |
| 4364 | + } |
| 4365 | + |
| 4366 | + for (Future<Boolean> f: cmdResults) { |
| 4367 | + f.get(1, TimeUnit.SECONDS); |
| 4368 | + } |
| 4369 | + } |
| 4370 | + |
| 4371 | + @Test |
| 4372 | + public void stressTestLargeThreadPoolSmallQueueUsingQueue() throws Exception { |
| 4373 | + for (int n = 0; n < 20; n++) { |
| 4374 | + testCommandConcurrencyViaQueueExceedsQueueSizeButNotThreadPoolSize(); |
| 4375 | + Hystrix.reset(); |
| 4376 | + } |
| 4377 | + } |
| 4378 | + |
4309 | 4379 | /* ******************************************************************************** */ |
4310 | 4380 | /* ******************************************************************************** */ |
4311 | 4381 | /* private HystrixCommand class implementations for unit testing */ |
@@ -5176,4 +5246,17 @@ protected Boolean getFallback() { |
5176 | 5246 | return false; |
5177 | 5247 | } |
5178 | 5248 | } |
| 5249 | + |
| 5250 | + private static class CommandWithLargeThreadPoolSmallQueue extends TestHystrixCommand<Boolean> { |
| 5251 | + |
| 5252 | + public CommandWithLargeThreadPoolSmallQueue() { |
| 5253 | + super(testPropsBuilder().setThreadPoolPropertiesDefaults( |
| 5254 | + HystrixThreadPoolProperties.Setter().withMaxQueueSize(2).withQueueSizeRejectionThreshold(2).withCoreSize(20))); |
| 5255 | + } |
| 5256 | + |
| 5257 | + @Override |
| 5258 | + protected Boolean run() throws Exception { |
| 5259 | + return true; |
| 5260 | + } |
| 5261 | + } |
5179 | 5262 | } |
0 commit comments