Skip to content

Commit 0e18402

Browse files
committed
Merge pull request #890 from mattrjacobs/multiple-responses-per-collapser-arg
Multiple responses per collapser arg
2 parents b21f8ca + f88bb26 commit 0e18402

File tree

5 files changed

+728
-197
lines changed

5 files changed

+728
-197
lines changed

hystrix-core/src/jmh/java/com/netflix/hystrix/perf/ObservableCollapserPerfTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static class CollapserState {
6161
@Param({"1", "10", "100", "1000"})
6262
int numToCollapse;
6363

64-
@Param({"1"}) //until bugfix for https://github.com/Netflix/Hystrix/issues/865, 1 is only value that works as expected
64+
@Param({"1", "10", "100"})
6565
int numResponsesPerArg;
6666

6767
@Param({"1", "1000", "1000000"})

hystrix-core/src/main/java/com/netflix/hystrix/HystrixCollapser.java

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828
import rx.Observable;
2929
import rx.Scheduler;
30-
import rx.functions.Func1;
30+
import rx.functions.Action1;
3131
import rx.schedulers.Schedulers;
3232
import rx.subjects.ReplaySubject;
3333

@@ -163,16 +163,13 @@ public Observable<BatchReturnType> createObservableCommand(Collection<CollapsedR
163163

164164
@Override
165165
public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchResponse, final Collection<CollapsedRequest<ResponseType, RequestArgumentType>> requests) {
166-
return batchResponse.single().flatMap(new Func1<BatchReturnType, Observable<Void>>() {
167-
166+
return batchResponse.single().doOnNext(new Action1<BatchReturnType>() {
168167
@Override
169-
public Observable<Void> call(BatchReturnType response) {
168+
public void call(BatchReturnType batchReturnType) {
170169
// this is a blocking call in HystrixCollapser
171-
self.mapResponseToRequests(response, requests);
172-
return Observable.empty();
170+
self.mapResponseToRequests(batchReturnType, requests);
173171
}
174-
175-
});
172+
}).ignoreElements().cast(Void.class);
176173
}
177174

178175
@Override
@@ -509,23 +506,40 @@ public interface CollapsedRequest<ResponseType, RequestArgumentType> {
509506
public RequestArgumentType getArgument();
510507

511508
/**
512-
* When set any client thread blocking on get() will immediately be unblocked and receive the response.
513-
*
509+
* This corresponds in a OnNext(Response); OnCompleted pair of emissions. It represents a single-value usecase.
510+
*
514511
* @throws IllegalStateException
515-
* if called more than once or after setException.
512+
* if called more than once or after setException/setComplete.
516513
* @param response
517514
* ResponseType
518515
*/
519516
public void setResponse(ResponseType response);
520517

521518
/**
522-
* When set any client thread blocking on get() will immediately be unblocked and receive the exception.
519+
* When invoked, any Observer will be OnNexted this value
520+
* @throws IllegalStateException
521+
* if called after setException/setResponse/setComplete.
522+
* @param response
523+
*/
524+
public void emitResponse(ResponseType response);
525+
526+
/**
527+
* When set, any Observer will be OnErrored this exception
523528
*
524529
* @param exception exception to set on response
525530
* @throws IllegalStateException
526-
* if called more than once or after setResponse.
531+
* if called more than once or after setResponse/setComplete.
527532
*/
528533
public void setException(Exception exception);
534+
535+
/**
536+
* When set, any Observer will have an OnCompleted emitted.
537+
* The intent is to use if after a series of emitResponses
538+
*
539+
* Note that, unlike the other 3 methods above, this method does not throw an IllegalStateException.
540+
* This allows Hystrix-core to unilaterally call it without knowing the internal state.
541+
*/
542+
public void setComplete();
529543
}
530544

531545
/**

hystrix-core/src/main/java/com/netflix/hystrix/HystrixObservableCollapser.java

Lines changed: 33 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@
1818
import java.util.Collection;
1919
import java.util.Collections;
2020
import java.util.HashMap;
21+
import java.util.HashSet;
2122
import java.util.Map;
23+
import java.util.Set;
2224
import java.util.concurrent.ConcurrentHashMap;
2325

2426
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisherFactory;
@@ -29,6 +31,7 @@
2931
import rx.Observable;
3032
import rx.Scheduler;
3133
import rx.functions.Action0;
34+
import rx.functions.Action1;
3235
import rx.functions.Func1;
3336
import rx.schedulers.Schedulers;
3437
import rx.subjects.ReplaySubject;
@@ -171,32 +174,46 @@ public Observable<Void> mapResponseToRequests(Observable<BatchReturnType> batchR
171174
// index the requests by key
172175
final Map<K, CollapsedRequest<ResponseType, RequestArgumentType>> requestsByKey = new HashMap<K, CollapsedRequest<ResponseType, RequestArgumentType>>(requests.size());
173176
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requests) {
174-
requestsByKey.put(requestKeySelector.call(cr.getArgument()), cr);
177+
K requestArg = requestKeySelector.call(cr.getArgument());
178+
requestsByKey.put(requestArg, cr);
175179
}
180+
final Set<K> seenKeys = new HashSet<K>();
176181

177182
// observe the responses and join with the requests by key
178-
return batchResponse.flatMap(new Func1<BatchReturnType, Observable<Void>>() {
179-
183+
return batchResponse.doOnNext(new Action1<BatchReturnType>() {
180184
@Override
181-
public Observable<Void> call(BatchReturnType r) {
182-
K responseKey = batchResponseKeySelector.call(r);
183-
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
184-
requestForResponse.setResponse(mapBatchTypeToResponseType.call(r));
185-
// now remove from map so we know what wasn't set at end
186-
requestsByKey.remove(responseKey);
187-
return Observable.empty();
185+
public void call(BatchReturnType batchReturnType) {
186+
try {
187+
K responseKey = batchResponseKeySelector.call(batchReturnType);
188+
CollapsedRequest<ResponseType, RequestArgumentType> requestForResponse = requestsByKey.get(responseKey);
189+
if (requestForResponse != null) {
190+
requestForResponse.emitResponse(mapBatchTypeToResponseType.call(batchReturnType));
191+
// now add this to seenKeys, so we can later check what was seen, and what was unseen
192+
seenKeys.add(responseKey);
193+
} else {
194+
logger.warn("Batch Response contained a response key not in request batch : " + responseKey);
195+
}
196+
} catch (Throwable ex) {
197+
logger.warn("Uncaught error during demultiplexing of BatchResponse", ex);
198+
}
188199
}
189-
190200
}).doOnTerminate(new Action0() {
191-
192201
@Override
193202
public void call() {
194-
for (CollapsedRequest<ResponseType, RequestArgumentType> cr : requestsByKey.values()) {
195-
onMissingResponse(cr);
203+
for (K key: requestsByKey.keySet()) {
204+
CollapsedRequest<ResponseType, RequestArgumentType> collapsedReq = requestsByKey.get(key);
205+
if (!seenKeys.contains(key)) {
206+
try {
207+
onMissingResponse(collapsedReq);
208+
} catch (Throwable ex) {
209+
collapsedReq.setException(new RuntimeException("Error in HystrixObservableCollapser.onMissingResponse handler", ex));
210+
}
211+
}
212+
//then unconditionally issue an onCompleted. this ensures the downstream gets a terminal, regardless of how onMissingResponse was implemented
213+
collapsedReq.setComplete();
196214
}
197215
}
198-
199-
});
216+
}).ignoreElements().cast(Void.class);
200217
}
201218

202219
@Override

0 commit comments

Comments
 (0)