@@ -105,10 +105,10 @@ private void handleIncomingMessages(Function<Mono<JSONRPCMessage>, Mono<JSONRPCM
105105 .flatMap (message -> Mono .just (message )
106106 .transform (inboundMessageHandler )
107107 .contextWrite (ctx -> ctx .put ("observation" , "myObservation" )))
108- .doOnComplete (() -> {
108+ .doOnTerminate (() -> {
109+ // The outbound processing will dispose its scheduler upon completion
109110 this .outboundSink .tryEmitComplete ();
110111 this .inboundScheduler .dispose ();
111- this .outboundScheduler .dispose ();
112112 })
113113 .subscribe ();
114114 }
@@ -208,13 +208,13 @@ else if (isClosing) {
208208 })
209209 .doOnComplete (() -> {
210210 isClosing = true ;
211- outboundSink . tryEmitComplete ();
211+ outboundScheduler . dispose ();
212212 })
213213 .doOnError (e -> {
214214 if (!isClosing ) {
215215 logger .error ("Error in outbound processing" , e );
216216 isClosing = true ;
217- outboundSink . tryEmitComplete ();
217+ outboundScheduler . dispose ();
218218 }
219219 })
220220 .map (msg -> (JSONRPCMessage ) msg );
@@ -224,26 +224,15 @@ else if (isClosing) {
224224
225225 @ Override
226226 public Mono <Void > closeGracefully () {
227-
228- return Mono .fromRunnable (() -> {
227+ return Mono .<Void >defer (() -> {
229228 isClosing = true ;
230229 logger .debug ("Initiating graceful shutdown" );
231- }). then ( Mono . defer (() -> {
232- // First complete the sinks to stop processing
230+ // Completing the inbound causes the outbound to be completed as well, so
231+ // we only close the inbound.
233232 inboundSink .tryEmitComplete ();
234- outboundSink .tryEmitComplete ();
235- return Mono .delay (Duration .ofMillis (100 ));
236- })).then (Mono .fromRunnable (() -> {
237- try {
238- // Dispose schedulers with longer timeout
239- inboundScheduler .dispose ();
240- outboundScheduler .dispose ();
241- logger .info ("Graceful shutdown completed" );
242- }
243- catch (Exception e ) {
244- logger .error ("Error during graceful shutdown" , e );
245- }
246- })).then ().subscribeOn (Schedulers .boundedElastic ());
233+ logger .info ("Graceful shutdown complete" );
234+ return Mono .empty ();
235+ }).subscribeOn (Schedulers .boundedElastic ());
247236 }
248237
249238 @ Override
0 commit comments