Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Netty NIO HTTP Client",
"contributor": "",
"description": "Added idle body write detection to proactively close connections when no request body data is written within the write timeout period."
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME;
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.removeIfExists;

import io.netty.channel.Channel;
Expand Down Expand Up @@ -52,7 +53,9 @@ public void channelReleased(Channel channel) {
FlushOnReadHandler.class,
ResponseHandler.class,
ReadTimeoutHandler.class,
WriteTimeoutHandler.class);
WriteTimeoutHandler.class,
WriteIdleTimeoutHandler.class);
removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.RESPONSE_DATA_READ;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKey.STREAMING_COMPLETE_KEY;
import static software.amazon.awssdk.http.nio.netty.internal.NettyRequestMetrics.measureTimeTaken;
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.HTTP_STREAMS_HANDLER_NAME;
import static software.amazon.awssdk.http.nio.netty.internal.utils.ChannelUtils.WRITE_IDLE_STATE_HANDLER_NAME;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
Expand All @@ -39,6 +41,7 @@
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.handler.timeout.WriteTimeoutHandler;
import io.netty.util.Attribute;
Expand Down Expand Up @@ -221,7 +224,7 @@ private void configurePipeline() throws IOException {
if (protocol == Protocol.HTTP2) {
pipeline.addLast(FlushOnReadHandler.getInstance());
}
pipeline.addLast(new HttpStreamsClientHandler());
pipeline.addLast(HTTP_STREAMS_HANDLER_NAME, new HttpStreamsClientHandler());
pipeline.addLast(ResponseHandler.getInstance());

// It's possible that the channel could become inactive between checking it out from the pool, and adding our response
Expand All @@ -238,14 +241,14 @@ private void makeRequest() {
}

private void writeRequest(HttpRequest request) {
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
TimeUnit.MILLISECONDS));
addWriteTimeoutHandlers();
StreamedRequest streamedRequest = new StreamedRequest(request,
context.executeRequest().requestContentPublisher());

channel.writeAndFlush(streamedRequest)
.addListener(wireCall -> {
// Done writing so remove the idle write timeout handler
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class);
removeWriteTimeoutHandlers();
if (wireCall.isSuccess()) {
NettyRequestMetrics.publishHttp2StreamMetrics(context.metricCollector(), channel);

Expand All @@ -270,7 +273,7 @@ private void writeRequest(HttpRequest request) {
// Add before HttpStreamsClientHandler so that raw TLS handshake bytes cannot
// prematurely remove this one-time handler. See Expect100ContinueReadTimeoutTest.
channel.pipeline().addBefore(
channel.pipeline().context(HttpStreamsClientHandler.class).name(), null,
HTTP_STREAMS_HANDLER_NAME, null,
new OneTimeReadTimeoutHandler(Duration.ofMillis(context.configuration().readTimeoutMillis())));
} else {
channel.pipeline().addFirst(new ReadTimeoutHandler(context.configuration().readTimeoutMillis(),
Expand All @@ -281,6 +284,22 @@ private void writeRequest(HttpRequest request) {
}
}

private void removeWriteTimeoutHandlers() {
ChannelUtils.removeIfExists(channel.pipeline(), WriteTimeoutHandler.class,
WriteIdleTimeoutHandler.class);
ChannelUtils.removeIfExists(channel.pipeline(), WRITE_IDLE_STATE_HANDLER_NAME);
}

private void addWriteTimeoutHandlers() {
channel.pipeline().addFirst(new WriteTimeoutHandler(context.configuration().writeTimeoutMillis(),
TimeUnit.MILLISECONDS));
channel.pipeline().addBefore(HTTP_STREAMS_HANDLER_NAME, WRITE_IDLE_STATE_HANDLER_NAME,
new IdleStateHandler(0, context.configuration().writeTimeoutMillis(), 0,
TimeUnit.MILLISECONDS));
channel.pipeline().addBefore(HTTP_STREAMS_HANDLER_NAME, null,
new WriteIdleTimeoutHandler(context.configuration().writeTimeoutMillis()));
}

/**
* It should explicitly trigger Read for the following situations:
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.internal;

import static software.amazon.awssdk.http.nio.netty.internal.utils.NettyUtils.errorMessageWithChannelDiagnostics;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import java.io.IOException;
import software.amazon.awssdk.annotations.SdkInternalApi;

/**
* Handles writer idle events from IdleStateHandler to detect idle body write gaps.
*/
@SdkInternalApi
public final class WriteIdleTimeoutHandler extends ChannelDuplexHandler {
private final long timeoutMillis;
private boolean closed;

public WriteIdleTimeoutHandler(long timeoutMillis) {
this.timeoutMillis = timeoutMillis;
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.WRITER_IDLE) {
writeTimeout(ctx);
}
}
super.userEventTriggered(ctx, evt);
}

private void writeTimeout(ChannelHandlerContext ctx) {
if (!closed) {
String message = String.format("No data was written to the request body within %dms. "
+ "This can occur if the request body publisher is not producing data, "
+ "for example when using AsyncRequestBody.fromInputStream() with an "
+ "executor that has fewer threads than concurrent requests. "
+ "Verify that the request body publisher is sending data or "
+ "investigate why it may be blocked.", timeoutMillis);
ctx.fireExceptionCaught(new IOException(errorMessageWithChannelDiagnostics(ctx.channel(), message)));
ctx.close();
closed = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@

@SdkInternalApi
public final class ChannelUtils {
public static final String WRITE_IDLE_STATE_HANDLER_NAME = "WriteIdleStateHandler";
public static final String HTTP_STREAMS_HANDLER_NAME = "HttpStreamsClientHandler";

private ChannelUtils() {
}

Expand All @@ -50,6 +53,20 @@ public static void removeIfExists(ChannelPipeline pipeline, Class<? extends Chan
}
}

public static void removeIfExists(ChannelPipeline pipeline, String... handlers) {
for (String handler: handlers) {
if (pipeline.get(handler) != null) {
try {
pipeline.remove(handler);
} catch (NoSuchElementException exception) {
// There could still be race condition when channel gets
// closed right after removeIfExists is invoked. Ignoring
// NoSuchElementException for that edge case.
}
}
}
}

/**
* Retrieve optional attribute of the channel
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.nio.netty.fault;

import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.TRUST_ALL_CERTIFICATES;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.SelfSignedCertificate;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

Check warning on line 36 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused import 'java.util.concurrent.ExecutionException'.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvJ&open=AZ1t2zk8-0BjvfN9zqvJ&pullRequest=6844
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpFullRequest;
import software.amazon.awssdk.http.SdkHttpMethod;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient;
import software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup;
import software.amazon.awssdk.utils.AttributeMap;

/**
* Tests that the write idle timeout handler detects when no request body data is written
* and proactively closes the connection.
*/
public class WriteIdleTimeoutTest {

Check warning on line 61 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvC&open=AZ1t2zk8-0BjvfN9zqvC&pullRequest=6844

private SdkAsyncHttpClient netty;
private Server server;

@BeforeEach
public void setup() throws Exception {

Check warning on line 67 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqu_&open=AZ1t2zk8-0BjvfN9zqu_&pullRequest=6844
server = new Server();
server.init();

netty = NettyNioAsyncHttpClient.builder()
.writeTimeout(Duration.ofMillis(500))
.readTimeout(Duration.ofSeconds(5))
.eventLoopGroup(SdkEventLoopGroup.builder().numberOfThreads(2).build())
.protocol(Protocol.HTTP1_1)
.buildWithDefaults(AttributeMap.builder()
.put(TRUST_ALL_CERTIFICATES, true)
.build());
}

@AfterEach
public void teardown() throws InterruptedException {

Check warning on line 82 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvA&open=AZ1t2zk8-0BjvfN9zqvA&pullRequest=6844
if (server != null) {
server.shutdown();
}
if (netty != null) {
netty.close();
}
}

/**
* A request body publisher that subscribes but never produces any data, simulating
* the thread starvation scenario from the customer issue.
*/
@Test
public void stalledBodyPublisher_shouldTriggerWriteIdleTimeout() throws InterruptedException, TimeoutException {

Check warning on line 96 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'java.util.concurrent.TimeoutException', as it cannot be thrown from method's body.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvE&open=AZ1t2zk8-0BjvfN9zqvE&pullRequest=6844

Check warning on line 96 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove the declaration of thrown exception 'java.lang.InterruptedException', as it cannot be thrown from method's body.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvD&open=AZ1t2zk8-0BjvfN9zqvD&pullRequest=6844

Check warning on line 96 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this 'public' modifier.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvB&open=AZ1t2zk8-0BjvfN9zqvB&pullRequest=6844
SdkHttpFullRequest request = SdkHttpFullRequest.builder()
.method(SdkHttpMethod.PUT)
.protocol("https")
.host("localhost")
.port(server.port())
.putHeader("Content-Length", "1024")
.build();

CompletableFuture<Void> future = sendRequest(request, new NeverWritesContentPublisher(1024));

assertThatThrownBy(() -> future.get(5, TimeUnit.SECONDS))
.hasCauseInstanceOf(IOException.class)
.hasStackTraceContaining("No data was written to the request body within 500ms");
}

private CompletableFuture<Void> sendRequest(SdkHttpFullRequest request, SdkHttpContentPublisher contentPublisher) {
return netty.execute(AsyncExecuteRequest.builder()
.responseHandler(new SdkAsyncHttpResponseHandler() {
@Override
public void onHeaders(SdkHttpResponse headers) {

Check failure on line 116 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvF&open=AZ1t2zk8-0BjvfN9zqvF&pullRequest=6844
}

@Override
public void onStream(Publisher<ByteBuffer> stream) {

Check failure on line 120 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvG&open=AZ1t2zk8-0BjvfN9zqvG&pullRequest=6844
}

@Override
public void onError(Throwable error) {

Check failure on line 124 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvH&open=AZ1t2zk8-0BjvfN9zqvH&pullRequest=6844
}
})
.request(request)
.requestContentPublisher(contentPublisher)
.build());
}

/**
* A content publisher that accepts a subscription but never calls onNext/onComplete,
* simulating a stalled body write.
*/
private static class NeverWritesContentPublisher implements SdkHttpContentPublisher {
private final long contentLength;

NeverWritesContentPublisher(long contentLength) {
this.contentLength = contentLength;
}

@Override
public Optional<Long> contentLength() {
return Optional.of(contentLength);
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> s) {
// Request subscription but never produce data
s.onSubscribe(new Subscription() {
@Override
public void request(long n) {
// intentionally do nothing
}

@Override
public void cancel() {

Check failure on line 158 in http-clients/netty-nio-client/src/test/java/software/amazon/awssdk/http/nio/netty/fault/WriteIdleTimeoutTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Add a nested comment explaining why this method is empty, throw an UnsupportedOperationException or complete the implementation.

See more on https://sonarcloud.io/project/issues?id=aws_aws-sdk-java-v2&issues=AZ1t2zk8-0BjvfN9zqvI&open=AZ1t2zk8-0BjvfN9zqvI&pullRequest=6844
}
});
}
}

private static class Server extends ChannelInitializer<Channel> {
private ServerBootstrap bootstrap;
private ServerSocketChannel serverSock;
private final NioEventLoopGroup group = new NioEventLoopGroup();
private SslContext sslCtx;

void init() throws Exception {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();

bootstrap = new ServerBootstrap()
.channel(NioServerSocketChannel.class)
.group(group)
.childHandler(this);

serverSock = (ServerSocketChannel) bootstrap.bind(0).sync().channel();
}

@Override
protected void initChannel(Channel ch) {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(sslCtx.newHandler(ch.alloc()));
}

void shutdown() throws InterruptedException {
group.shutdownGracefully().await();
serverSock.close();
}

int port() {
return serverSock.localAddress().getPort();
}

}
}
Loading
Loading