@@ -84,7 +84,6 @@ final class StreamProcessor implements DataSource {
8484 @ VisibleForTesting final URI streamUri ;
8585 @ VisibleForTesting final Duration initialReconnectDelay ;
8686 private final DiagnosticAccumulator diagnosticAccumulator ;
87- private final EventSourceCreator eventSourceCreator ;
8887 private final int threadPriority ;
8988 private final DataStoreStatusProvider .StatusListener statusListener ;
9089 private volatile EventSource es ;
@@ -94,34 +93,9 @@ final class StreamProcessor implements DataSource {
9493
9594 ConnectionErrorHandler connectionErrorHandler = createDefaultConnectionErrorHandler (); // exposed for testing
9695
97- static final class EventSourceParams {
98- final EventHandler handler ;
99- final URI streamUri ;
100- final Duration initialReconnectDelay ;
101- final ConnectionErrorHandler errorHandler ;
102- final Headers headers ;
103- final HttpConfiguration httpConfig ;
104-
105- EventSourceParams (EventHandler handler , URI streamUri , Duration initialReconnectDelay ,
106- ConnectionErrorHandler errorHandler , Headers headers , HttpConfiguration httpConfig ) {
107- this .handler = handler ;
108- this .streamUri = streamUri ;
109- this .initialReconnectDelay = initialReconnectDelay ;
110- this .errorHandler = errorHandler ;
111- this .headers = headers ;
112- this .httpConfig = httpConfig ;
113- }
114- }
115-
116- @ FunctionalInterface
117- static interface EventSourceCreator {
118- EventSource createEventSource (EventSourceParams params );
119- }
120-
12196 StreamProcessor (
12297 HttpConfiguration httpConfig ,
12398 DataSourceUpdates dataSourceUpdates ,
124- EventSourceCreator eventSourceCreator ,
12599 int threadPriority ,
126100 DiagnosticAccumulator diagnosticAccumulator ,
127101 URI streamUri ,
@@ -130,7 +104,6 @@ static interface EventSourceCreator {
130104 this .dataSourceUpdates = dataSourceUpdates ;
131105 this .httpConfig = httpConfig ;
132106 this .diagnosticAccumulator = diagnosticAccumulator ;
133- this .eventSourceCreator = eventSourceCreator != null ? eventSourceCreator : this ::defaultEventSourceCreator ;
134107 this .threadPriority = threadPriority ;
135108 this .streamUri = streamUri ;
136109 this .initialReconnectDelay = initialReconnectDelay ;
@@ -202,13 +175,26 @@ public Future<Void> start() {
202175 };
203176
204177 EventHandler handler = new StreamEventHandler (initFuture );
205-
206- es = eventSourceCreator .createEventSource (new EventSourceParams (handler ,
207- concatenateUriPath (streamUri , STREAM_URI_PATH ),
208- initialReconnectDelay ,
209- wrappedConnectionErrorHandler ,
210- headers ,
211- httpConfig ));
178+ URI endpointUri = concatenateUriPath (streamUri , STREAM_URI_PATH );
179+
180+ EventSource .Builder builder = new EventSource .Builder (handler , endpointUri )
181+ .threadPriority (threadPriority )
182+ .loggerBaseName (Loggers .DATA_SOURCE_LOGGER_NAME )
183+ .clientBuilderActions (new EventSource .Builder .ClientConfigurer () {
184+ public void configure (OkHttpClient .Builder builder ) {
185+ configureHttpClientBuilder (httpConfig , builder );
186+ }
187+ })
188+ .connectionErrorHandler (wrappedConnectionErrorHandler )
189+ .headers (headers )
190+ .reconnectTime (initialReconnectDelay )
191+ .readTimeout (DEAD_CONNECTION_INTERVAL );
192+ // Note that this is not the same read timeout that can be set in LDConfig. We default to a smaller one
193+ // there because we don't expect long delays within any *non*-streaming response that the LD client gets.
194+ // A read timeout on the stream will result in the connection being cycled, so we set this to be slightly
195+ // more than the expected interval between heartbeat signals.
196+
197+ es = builder .build ();
212198 esStarted = System .currentTimeMillis ();
213199 es .start ();
214200 return initFuture ;
@@ -356,27 +342,6 @@ public void onError(Throwable throwable) {
356342 }
357343 }
358344
359- private EventSource defaultEventSourceCreator (EventSourceParams params ) {
360- EventSource .Builder builder = new EventSource .Builder (params .handler , params .streamUri )
361- .threadPriority (threadPriority )
362- .loggerBaseName (Loggers .DATA_SOURCE_LOGGER_NAME )
363- .clientBuilderActions (new EventSource .Builder .ClientConfigurer () {
364- public void configure (OkHttpClient .Builder builder ) {
365- configureHttpClientBuilder (params .httpConfig , builder );
366- }
367- })
368- .connectionErrorHandler (params .errorHandler )
369- .headers (params .headers )
370- .reconnectTime (params .initialReconnectDelay )
371- .readTimeout (DEAD_CONNECTION_INTERVAL );
372- // Note that this is not the same read timeout that can be set in LDConfig. We default to a smaller one
373- // there because we don't expect long delays within any *non*-streaming response that the LD client gets.
374- // A read timeout on the stream will result in the connection being cycled, so we set this to be slightly
375- // more than the expected interval between heartbeat signals.
376-
377- return builder .build ();
378- }
379-
380345 private static Map .Entry <DataKind , String > getKindAndKeyFromStreamApiPath (String path ) throws StreamInputException {
381346 if (path == null ) {
382347 throw new StreamInputException ("missing item path" );
0 commit comments