Skip to content

Commit a8b064d

Browse files
committed
Merge pull request #905 from ruhkopf/1.4.x_ObservableCollapser_example
observable collapser example
2 parents 881a4c9 + 1c1cfd3 commit a8b064d

File tree

2 files changed

+349
-0
lines changed

2 files changed

+349
-0
lines changed
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
package com.netflix.hystrix.examples.basic;
2+
3+
import static org.hamcrest.CoreMatchers.equalTo;
4+
import static org.hamcrest.CoreMatchers.is;
5+
import static org.junit.Assert.assertThat;
6+
import static org.junit.Assert.assertTrue;
7+
8+
import java.util.ArrayList;
9+
import java.util.Collection;
10+
import java.util.HashMap;
11+
import java.util.List;
12+
import java.util.Map;
13+
import java.util.Map.Entry;
14+
import java.util.concurrent.TimeUnit;
15+
import java.util.concurrent.atomic.AtomicInteger;
16+
17+
import org.junit.After;
18+
import org.junit.Before;
19+
import org.junit.Test;
20+
21+
import rx.Observable;
22+
import rx.functions.Func0;
23+
import rx.functions.Func1;
24+
import rx.observers.TestSubscriber;
25+
import rx.schedulers.Schedulers;
26+
27+
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
28+
import com.netflix.hystrix.HystrixObservableCollapser;
29+
import com.netflix.hystrix.HystrixObservableCommand;
30+
import com.netflix.hystrix.HystrixRequestLog;
31+
import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords.NumberWord;
32+
import com.netflix.hystrix.strategy.concurrency.HystrixContextScheduler;
33+
import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
34+
35+
/**
36+
* Example that uses {@link HystrixObservableCollapser} to batch multiple {@link ObservableCommandNumbersToWords} requests.
37+
*
38+
* @author Patrick Ruhkopf
39+
*/
40+
public class ObservableCollapserGetWordForNumber extends HystrixObservableCollapser<Integer, NumberWord, String, Integer>
41+
{
42+
private final Integer number;
43+
44+
private final static AtomicInteger counter = new AtomicInteger();
45+
46+
public static void resetCmdCounter()
47+
{
48+
counter.set(0);
49+
}
50+
51+
public static int getCmdCount()
52+
{
53+
return counter.get();
54+
}
55+
56+
public ObservableCollapserGetWordForNumber(final Integer number)
57+
{
58+
this.number = number;
59+
}
60+
61+
@Override
62+
public Integer getRequestArgument()
63+
{
64+
return number;
65+
}
66+
67+
@SuppressWarnings("boxing")
68+
@Override
69+
protected HystrixObservableCommand<NumberWord> createCommand(final Collection<CollapsedRequest<String, Integer>> requests)
70+
{
71+
final int count = counter.incrementAndGet();
72+
System.out.println("Creating batch for " + requests.size() + " requests. Total invocations so far: " + count);
73+
74+
final List<Integer> numbers = new ArrayList<Integer>();
75+
for (final CollapsedRequest<String, Integer> request : requests)
76+
{
77+
numbers.add(request.getArgument());
78+
}
79+
80+
return new ObservableCommandNumbersToWords(numbers);
81+
}
82+
83+
@Override
84+
protected Func1<NumberWord, Integer> getBatchReturnTypeKeySelector()
85+
{
86+
// Java 8: (final NumberWord nw) -> nw.getNumber();
87+
88+
return new Func1<NumberWord, Integer>()
89+
{
90+
@Override
91+
public Integer call(final NumberWord nw)
92+
{
93+
return nw.getNumber();
94+
}
95+
};
96+
}
97+
98+
@Override
99+
protected Func1<Integer, Integer> getRequestArgumentKeySelector()
100+
{
101+
// Java 8: return (final Integer no) -> no;
102+
103+
return new Func1<Integer, Integer>()
104+
{
105+
@Override
106+
public Integer call(final Integer no)
107+
{
108+
return no;
109+
}
110+
111+
};
112+
}
113+
114+
@Override
115+
protected Func1<NumberWord, String> getBatchReturnTypeToResponseTypeMapper()
116+
{
117+
// Java 8: return (final NumberWord nw) -> nw.getWord();
118+
119+
return new Func1<NumberWord, String>()
120+
{
121+
@Override
122+
public String call(final NumberWord nw)
123+
{
124+
return nw.getWord();
125+
}
126+
};
127+
}
128+
129+
@Override
130+
protected void onMissingResponse(final CollapsedRequest<String, Integer> request)
131+
{
132+
request.setException(new Exception("No word"));
133+
}
134+
135+
public static class ObservableCollapserGetWordForNumberTest
136+
{
137+
private HystrixRequestContext ctx;
138+
139+
@Before
140+
public void before()
141+
{
142+
ctx = HystrixRequestContext.initializeContext();
143+
ObservableCollapserGetWordForNumber.resetCmdCounter();
144+
}
145+
146+
@After
147+
public void after()
148+
{
149+
System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommandsAsString());
150+
ctx.shutdown();
151+
}
152+
153+
/**
154+
* Example where we subscribe without using a specific scheduler. That means we run the actions on the same thread.
155+
*/
156+
@Test
157+
public void shouldCollapseRequestsSync()
158+
{
159+
final int noOfRequests = 10;
160+
final Map<Integer, TestSubscriber<String>> subscribersByNumber = new HashMap<Integer, TestSubscriber<String>>(
161+
noOfRequests);
162+
163+
TestSubscriber<String> subscriber;
164+
for (int number = 0; number < noOfRequests; number++)
165+
{
166+
subscriber = new TestSubscriber<String>();
167+
new ObservableCollapserGetWordForNumber(number).toObservable().subscribe(subscriber);
168+
subscribersByNumber.put(number, subscriber);
169+
170+
// wait a little bit after running half of the requests so that we don't collapse all of them into one batch
171+
// TODO this can probably be improved by using a test scheduler
172+
if (number == noOfRequests / 2)
173+
sleep(1000);
174+
175+
}
176+
177+
assertThat(subscribersByNumber.size(), is(noOfRequests));
178+
for (final Entry<Integer, TestSubscriber<String>> subscriberByNumber : subscribersByNumber.entrySet())
179+
{
180+
subscriber = subscriberByNumber.getValue();
181+
subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);
182+
183+
assertThat(subscriber.getOnErrorEvents().toString(), subscriber.getOnErrorEvents().size(), is(0));
184+
assertThat(subscriber.getOnNextEvents().size(), is(1));
185+
186+
final String word = subscriber.getOnNextEvents().get(0);
187+
System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word);
188+
assertThat(word, equalTo(numberToWord(subscriberByNumber.getKey())));
189+
}
190+
191+
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() > 1);
192+
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() < noOfRequests);
193+
}
194+
195+
/**
196+
* Example where we subscribe on the computation scheduler. For this we need the {@link HystrixContextScheduler}, that
197+
* passes the {@link HystrixRequestContext} to the thread that runs the action.
198+
*/
199+
@Test
200+
public void shouldCollapseRequestsAsync()
201+
{
202+
final HystrixContextScheduler contextAwareScheduler = new HystrixContextScheduler(Schedulers.computation());
203+
204+
final int noOfRequests = 10;
205+
final Map<Integer, TestSubscriber<String>> subscribersByNumber = new HashMap<Integer, TestSubscriber<String>>(
206+
noOfRequests);
207+
208+
TestSubscriber<String> subscriber;
209+
for (int number = 0; number < noOfRequests; number++)
210+
{
211+
subscriber = new TestSubscriber<String>();
212+
final int finalNumber = number;
213+
214+
// defer and subscribe on specific scheduler
215+
Observable.defer(new Func0<Observable<String>>()
216+
{
217+
@Override
218+
public Observable<String> call()
219+
{
220+
return new ObservableCollapserGetWordForNumber(finalNumber).toObservable();
221+
}
222+
}).subscribeOn(contextAwareScheduler).subscribe(subscriber);
223+
224+
subscribersByNumber.put(number, subscriber);
225+
226+
// wait a little bit after running half of the requests so that we don't collapse all of them into one batch
227+
// TODO this can probably be improved by using a test scheduler
228+
if (number == noOfRequests / 2)
229+
sleep(1000);
230+
}
231+
232+
assertThat(subscribersByNumber.size(), is(noOfRequests));
233+
for (final Entry<Integer, TestSubscriber<String>> subscriberByNumber : subscribersByNumber.entrySet())
234+
{
235+
subscriber = subscriberByNumber.getValue();
236+
subscriber.awaitTerminalEvent(10, TimeUnit.SECONDS);
237+
238+
assertThat(subscriber.getOnErrorEvents().toString(), subscriber.getOnErrorEvents().size(), is(0));
239+
assertThat(subscriber.getOnNextEvents().size(), is(1));
240+
241+
final String word = subscriber.getOnNextEvents().get(0);
242+
System.out.println("Translated " + subscriberByNumber.getKey() + " to " + word);
243+
assertThat(word, equalTo(numberToWord(subscriberByNumber.getKey())));
244+
}
245+
246+
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() > 1);
247+
assertTrue(ObservableCollapserGetWordForNumber.getCmdCount() < noOfRequests);
248+
}
249+
250+
private String numberToWord(final int number)
251+
{
252+
return ObservableCommandNumbersToWords.dict.get(number);
253+
}
254+
255+
private void sleep(final long ms)
256+
{
257+
try
258+
{
259+
Thread.sleep(1000);
260+
}
261+
catch (final InterruptedException e)
262+
{
263+
throw new IllegalStateException(e);
264+
}
265+
}
266+
267+
}
268+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package com.netflix.hystrix.examples.basic;
2+
3+
import java.util.HashMap;
4+
import java.util.List;
5+
import java.util.Map;
6+
7+
import rx.Observable;
8+
import rx.functions.Func1;
9+
10+
import com.netflix.hystrix.HystrixCommandGroupKey;
11+
import com.netflix.hystrix.HystrixObservableCommand;
12+
import com.netflix.hystrix.examples.basic.ObservableCommandNumbersToWords.NumberWord;
13+
14+
/**
15+
* A simple Hystrix Observable command that translates a number (<code>Integer</code>) into an English text.
16+
*/
17+
class ObservableCommandNumbersToWords extends HystrixObservableCommand<NumberWord>
18+
{
19+
private final List<Integer> numbers;
20+
21+
// in the real world you'd probably want to replace this very simple code by using ICU or similar
22+
static Map<Integer, String> dict = new HashMap<Integer, String>(11);
23+
static
24+
{
25+
dict.put(0, "zero");
26+
dict.put(1, "one");
27+
dict.put(2, "two");
28+
dict.put(3, "three");
29+
dict.put(4, "four");
30+
dict.put(5, "five");
31+
dict.put(6, "six");
32+
dict.put(7, "seven");
33+
dict.put(8, "eight");
34+
dict.put(9, "nine");
35+
dict.put(10, "ten");
36+
}
37+
38+
public ObservableCommandNumbersToWords(final List<Integer> numbers)
39+
{
40+
super(HystrixCommandGroupKey.Factory.asKey(ObservableCommandNumbersToWords.class.getName()));
41+
this.numbers = numbers;
42+
}
43+
44+
@Override
45+
protected Observable<NumberWord> construct()
46+
{
47+
return Observable.from(numbers).map(new Func1<Integer, NumberWord>()
48+
{
49+
@Override
50+
public NumberWord call(final Integer number)
51+
{
52+
return new NumberWord(number, dict.get(number));
53+
}
54+
55+
});
56+
}
57+
58+
static class NumberWord
59+
{
60+
private final Integer number;
61+
private final String word;
62+
63+
public NumberWord(final Integer number, final String word)
64+
{
65+
super();
66+
this.number = number;
67+
this.word = word;
68+
}
69+
70+
public Integer getNumber()
71+
{
72+
return number;
73+
}
74+
75+
public String getWord()
76+
{
77+
return word;
78+
}
79+
}
80+
81+
}

0 commit comments

Comments
 (0)