diff --git a/.changes/next-release/bugfix-NettyNIOHTTPClient-97abd66.json b/.changes/next-release/bugfix-NettyNIOHTTPClient-97abd66.json new file mode 100644 index 00000000000..79211f6ba6c --- /dev/null +++ b/.changes/next-release/bugfix-NettyNIOHTTPClient-97abd66.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "Netty NIO HTTP Client", + "contributor": "", + "description": "Added idle body write detection to proactively close connections when no request body data is written within the write timeout period." +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java index 20c89850f71..75799c7216a 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListener.java @@ -15,6 +15,7 @@ package software.amazon.awssdk.http.nio.netty.internal; +import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME; import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.removeIfExists; import io.netty.channel.Channel; @@ -52,7 +53,9 @@ public void channelReleased(Channel channel) { FlushOnReadHandler.class, ResponseHandler.class, ReadTimeoutHandler.class, - WriteTimeoutHandler.class); + WriteTimeoutHandler.class, + WriteIdleTimeoutHandler.class); + removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME); } } } diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java index c49124ec6da..627addee730 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/NettyRequestExecutor.java @@ -27,6 +27,8 @@ import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ; import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY; import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken; +import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.HTTP_STREAMS_HANDLER_NAME; +import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; @@ -39,6 +41,7 @@ import io.netty.handler.codec.http.HttpMethod; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpVersion; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; import io.netty.util.Attribute; @@ -221,7 +224,7 @@ private void configurePipeline() throws IOException { if (protocol == Protocol.HTTP2) { pipeline.addLast(FlushOnReadHandler.getInstance()); } - pipeline.addLast(new HttpStreamsClientHandler()); + pipeline.addLast(HTTP_STREAMS_HANDLER_NAME, new HttpStreamsClientHandler()); pipeline.addLast(ResponseHandler.getInstance()); // It's possible that the channel could become inactive between checking it out from the pool, and adding our response @@ -238,14 +241,14 @@ private void makeRequest() { } private void writeRequest(HttpRequest request) { - channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(), - TimeUnit.MILLISECONDS)); + addWriteTimeoutHandlers(); StreamedRequest streamedRequest = new StreamedRequest(request, context.executeRequest().requestContentPublisher()); + channel.writeAndFlush(streamedRequest) .addListener(wireCall -> { // Done writing so remove the idle write timeout handler - ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class); + removeWriteTimeoutHandlers(); if (wireCall.isSuccess()) { NettyRequestMetrics.publishHttp2StreamMetrics(context.metricCollector(), channel); @@ -270,7 +273,7 @@ private void writeRequest(HttpRequest request) { // Add before HttpStreamsClientHandler so that raw TLS handshake bytes cannot // prematurely remove this one-time handler. See Expect100ContinueReadTimeoutTest. channel.pipeline().addBefore( - channel.pipeline().context(HttpStreamsClientHandler.class).name(), null, + HTTP_STREAMS_HANDLER_NAME, null, new OneTimeReadTimeoutHandler(Duration.ofMillis(context.configuration().readTimeoutMillis()))); } else { channel.pipeline().addFirst(new ReadTimeoutHandler(context.configuration().readTimeoutMillis(), @@ -281,6 +284,22 @@ private void writeRequest(HttpRequest request) { } } + private void removeWriteTimeoutHandlers() { + ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class, + WriteIdleTimeoutHandler.class); + ChannelUtils.removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME); + } + + private void addWriteTimeoutHandlers() { + channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(), + TimeUnit.MILLISECONDS)); + channel.pipeline().addBefore(HTTP_STREAMS_HANDLER_NAME, WRITE_IDLE_STATE_HANDLER_NAME, + new IdleStateHandler(0, context.configuration().writeTimeoutMillis(), 0, + TimeUnit.MILLISECONDS)); + channel.pipeline().addBefore(HTTP_STREAMS_HANDLER_NAME, null, + new WriteIdleTimeoutHandler(context.configuration().writeTimeoutMillis())); + } + /** * It should explicitly trigger Read for the following situations: * diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/WriteIdleTimeoutHandler.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/WriteIdleTimeoutHandler.java new file mode 100644 index 00000000000..0cd5419b557 --- /dev/null +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/WriteIdleTimeoutHandler.java @@ -0,0 +1,63 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.errorMessageWithChannelDiagnostics; + +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import java.io.IOException; +import software.amazon.awssdk.annotations.SdkInternalApi; + +/** + * Handles writer idle events from IdleStateHandler to detect idle body write gaps. + */ +@SdkInternalApi +public final class WriteIdleTimeoutHandler extends ChannelDuplexHandler { + private final long timeoutMillis; + private boolean closed; + + public WriteIdleTimeoutHandler(long timeoutMillis) { + this.timeoutMillis = timeoutMillis; + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof IdleStateEvent) { + IdleStateEvent e = (IdleStateEvent) evt; + if (e.state() == IdleState.WRITER_IDLE) { + writeTimeout(ctx); + } + } + super.userEventTriggered(ctx, evt); + } + + private void writeTimeout(ChannelHandlerContext ctx) { + if (!closed) { + String message = String.format("No data was written to the request body within %dms. " + + "This can occur if the request body publisher is not producing data, " + + "for example when using AsyncRequestBody.fromInputStream() with an " + + "executor that has fewer threads than concurrent requests. " + + "Verify that the request body publisher is sending data or " + + "investigate why it may be blocked.", timeoutMillis); + ctx.fireExceptionCaught(new IOException(errorMessageWithChannelDiagnostics(ctx.channel(), message))); + ctx.close(); + closed = true; + } + } +} diff --git a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/ChannelUtils.java b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/ChannelUtils.java index 0901fa8e059..000804dffbc 100644 --- a/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/ChannelUtils.java +++ b/http-clients/netty-nio-client/src/main/java/software/amazon/awssdk/http/nio/netty/internal/utils/ChannelUtils.java @@ -26,6 +26,9 @@ @SdkInternalApi public final class ChannelUtils { + public static final String WRITE_IDLE_STATE_HANDLER_NAME = "WriteIdleStateHandler"; + public static final String HTTP_STREAMS_HANDLER_NAME = "HttpStreamsClientHandler"; + private ChannelUtils() { } @@ -50,6 +53,20 @@ public static void removeIfExists(ChannelPipeline pipeline, Class future = sendRequest(request, new NeverWritesContentPublisher(1024)); + + assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS)) + .hasCauseInstanceOf(IOException.class) + .hasStackTraceContaining("No data was written to the request body within 500ms"); + } + + private CompletableFuture sendRequest(SdkHttpFullRequest request, SdkHttpContentPublisher contentPublisher) { + return netty.execute(AsyncExecuteRequest.builder() + .responseHandler(new SdkAsyncHttpResponseHandler() { + @Override + public void onHeaders(SdkHttpResponse headers) { + } + + @Override + public void onStream(Publisher stream) { + } + + @Override + public void onError(Throwable error) { + } + }) + .request(request) + .requestContentPublisher(contentPublisher) + .build()); + } + + /** + * A content publisher that accepts a subscription but never calls onNext/onComplete, + * simulating a stalled body write. + */ + private static class NeverWritesContentPublisher implements SdkHttpContentPublisher { + private final long contentLength; + + NeverWritesContentPublisher(long contentLength) { + this.contentLength = contentLength; + } + + @Override + public Optional contentLength() { + return Optional.of(contentLength); + } + + @Override + public void subscribe(Subscriber s) { + // Request subscription but never produce data + s.onSubscribe(new Subscription() { + @Override + public void request(long n) { + // intentionally do nothing + } + + @Override + public void cancel() { + } + }); + } + } + + private static class Server extends ChannelInitializer { + private ServerBootstrap bootstrap; + private ServerSocketChannel serverSock; + private final NioEventLoopGroup group = new NioEventLoopGroup(); + private SslContext sslCtx; + + void init() throws Exception { + SelfSignedCertificate ssc = new SelfSignedCertificate(); + sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); + + bootstrap = new ServerBootstrap() + .channel(NioServerSocketChannel.class) + .group(group) + .childHandler(this); + + serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel(); + } + + @Override + protected void initChannel(Channel ch) { + ChannelPipeline pipeline = ch.pipeline(); + pipeline.addLast(sslCtx.newHandler(ch.alloc())); + } + + void shutdown() throws InterruptedException { + group.shutdownGracefully().await(); + serverSock.close(); + } + + int port() { + return serverSock.localAddress().getPort(); + } + + } +} diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListenerTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListenerTest.java index 7a08755a9cb..e6ca011c690 100644 --- a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListenerTest.java +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/HandlerRemovingChannelPoolListenerTest.java @@ -26,6 +26,7 @@ import io.netty.channel.nio.NioEventLoopGroup; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; +import io.netty.handler.timeout.IdleStateHandler; import io.netty.handler.timeout.ReadTimeoutHandler; import io.netty.handler.timeout.WriteTimeoutHandler; import org.junit.After; @@ -37,6 +38,7 @@ import software.amazon.awssdk.http.async.AsyncExecuteRequest; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.nio.netty.internal.nrs.HttpStreamsClientHandler; +import software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils; @RunWith(MockitoJUnitRunner.class) public class HandlerRemovingChannelPoolListenerTest { @@ -67,10 +69,12 @@ public void setup() throws Exception { mockChannel.attr(REQUEST_CONTEXT_KEY).set(requestContext); mockChannel.attr(RESPONSE_COMPLETE_KEY).set(true); - pipeline.addLast(new HttpStreamsClientHandler()); + pipeline.addLast(ChannelUtils.HTTP_STREAMS_HANDLER_NAME, new HttpStreamsClientHandler()); pipeline.addLast(ResponseHandler.getInstance()); pipeline.addLast(new ReadTimeoutHandler(10)); pipeline.addLast(new WriteTimeoutHandler(10)); + pipeline.addLast(ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME, new IdleStateHandler(0, 10, 0)); + pipeline.addLast(new WriteIdleTimeoutHandler(10000)); handler = HandlerRemovingChannelPoolListener.create(); } @@ -107,16 +111,20 @@ public void release_deregisteredOpenChannel_handlerShouldBeRemovedFromChannelPoo } private void assertHandlersRemoved() { - assertThat(pipeline.get(HttpStreamsClientHandler.class)).isNull(); + assertThat(pipeline.get(ChannelUtils.HTTP_STREAMS_HANDLER_NAME)).isNull(); assertThat(pipeline.get(ResponseHandler.class)).isNull(); assertThat(pipeline.get(ReadTimeoutHandler.class)).isNull(); assertThat(pipeline.get(WriteTimeoutHandler.class)).isNull(); + assertThat(pipeline.get(ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME)).isNull(); + assertThat(pipeline.get(WriteIdleTimeoutHandler.class)).isNull(); } private void assertHandlersNotRemoved() { - assertThat(pipeline.get(HttpStreamsClientHandler.class)).isNotNull(); + assertThat(pipeline.get(ChannelUtils.HTTP_STREAMS_HANDLER_NAME)).isNotNull(); assertThat(pipeline.get(ResponseHandler.class)).isNotNull(); assertThat(pipeline.get(ReadTimeoutHandler.class)).isNotNull(); assertThat(pipeline.get(WriteTimeoutHandler.class)).isNotNull(); + assertThat(pipeline.get(ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME)).isNotNull(); + assertThat(pipeline.get(WriteIdleTimeoutHandler.class)).isNotNull(); } } diff --git a/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/WriteIdleTimeoutHandlerTest.java b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/WriteIdleTimeoutHandlerTest.java new file mode 100644 index 00000000000..de02dfb4d25 --- /dev/null +++ b/http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/internal/WriteIdleTimeoutHandlerTest.java @@ -0,0 +1,59 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.nio.netty.internal; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import io.netty.channel.embedded.EmbeddedChannel; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import java.io.IOException; +import org.junit.jupiter.api.Test; + +class WriteIdleTimeoutHandlerTest { + + @Test + void writerIdleEvent_shouldFireExceptionAndCloseChannel() { + EmbeddedChannel channel = new EmbeddedChannel(new WriteIdleTimeoutHandler(30000)); + + channel.pipeline().fireUserEventTriggered(IdleStateEvent.WRITER_IDLE_STATE_EVENT); + + assertThat(channel.isOpen()).isFalse(); + assertThatThrownBy(channel::checkException) + .isInstanceOf(IOException.class) + .hasMessageContaining("No data was written to the request body within 30000ms"); + } + + @Test + void readerIdleEvent_shouldBeIgnored() { + EmbeddedChannel channel = new EmbeddedChannel(new WriteIdleTimeoutHandler(30000)); + + channel.pipeline().fireUserEventTriggered(IdleStateEvent.READER_IDLE_STATE_EVENT); + + assertThat(channel.isOpen()).isTrue(); + } + + @Test + void duplicateWriterIdleEvent_shouldNotFireTwice() { + EmbeddedChannel channel = new EmbeddedChannel(new WriteIdleTimeoutHandler(30000)); + + channel.pipeline().fireUserEventTriggered(IdleStateEvent.WRITER_IDLE_STATE_EVENT); + + // Channel is already closed; second event should not throw + channel.pipeline().fireUserEventTriggered(IdleStateEvent.WRITER_IDLE_STATE_EVENT); + } +}