Skip to content
Merged
7 changes: 6 additions & 1 deletion core/src/main/java/io/undertow/UndertowOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -342,10 +342,15 @@ public class UndertowOptions {
*/
public static final Option<Integer> MAX_CACHED_HEADER_SIZE = Option.simple(UndertowOptions.class, "MAX_CACHED_HEADER_SIZE", Integer.class);

/**
* Default value of {@link #HTTP_HEADERS_CACHE_SIZE} option.
*/
public static final int DEFAULT_HTTP_HEADERS_CACHE_SIZE = 15;

/**
* The maximum number of headers that are cached per connection. Defaults to 15. If this is set to zero the cache is disabled.
* The maximum number of headers that are cached per connection. If this is set to zero the cache is disabled.
* <p>
* Defaults to {@link #DEFAULT_HTTP_HEADERS_CACHE_SIZE}
*/
public static final Option<Integer> HTTP_HEADERS_CACHE_SIZE = Option.simple(UndertowOptions.class, "HTTP_HEADERS_CACHE_SIZE", Integer.class);

Expand Down
115 changes: 106 additions & 9 deletions core/src/main/java/io/undertow/conduits/ChunkedStreamSinkConduit.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.undertow.util.HeaderValues;
import io.undertow.util.Headers;
import io.undertow.util.ImmediatePooledByteBuffer;
import org.xnio.Buffers;
import org.xnio.IoUtils;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;
Expand Down Expand Up @@ -128,6 +129,109 @@ public int write(final ByteBuffer src) throws IOException {
return doWrite(src);
}

long doWrite(final ByteBuffer[] srcs, int offset, int length) throws IOException {
if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
throw new ClosedChannelException();
}
// Write as many buffers as possible without a chunk-size overflowing an integer.
long totalRemaining = 0;
for (int i = 0; i < length; i++) {
ByteBuffer buf = srcs[i + offset];
int remaining = buf.remaining();
if (totalRemaining + remaining > Integer.MAX_VALUE) {
// Avoid producing chunks too large for clients by reducing the number of buffers
// until total remaining fits within a 32-bit signed integer value. This is safe
// because a single java ByteBuffer has a capacity represented by an integer.
length = i;
break;
}
totalRemaining += remaining;
}
if(totalRemaining == 0) {
return 0;
}
int remaining = (int) totalRemaining;
this.state |= FLAG_FIRST_DATA_WRITTEN;
int oldLimit = srcs[length - 1].limit();
boolean dataRemaining = false; //set to true if there is data in src that still needs to be written out
if (chunkleft == 0 && !chunkingSepBuffer.hasRemaining()) {
chunkingBuffer.clear();
putIntAsHexString(chunkingBuffer, remaining);
chunkingBuffer.put(CRLF);
chunkingBuffer.flip();
chunkingSepBuffer.clear();
chunkingSepBuffer.put(CRLF);
chunkingSepBuffer.flip();
state |= FLAG_WRITTEN_FIRST_CHUNK;
chunkleft = remaining;
} else {
int maxRemaining = chunkleft;
for (int i = 0; i < length; i++) {
ByteBuffer buf = srcs[offset + i];
int bufRemaining = buf.remaining();
if (bufRemaining >= maxRemaining) {
length = i + 1;
oldLimit = buf.limit();
dataRemaining = true;
buf.limit(buf.position() + maxRemaining);
break;
}
maxRemaining -= bufRemaining;
}
}
try {
int chunkingSize = chunkingBuffer.remaining();
int chunkingSepSize = chunkingSepBuffer.remaining();
if (chunkingSize > 0 || chunkingSepSize > 0 || lastChunkBuffer != null) {
int originalRemaining = (int) Buffers.remaining(srcs, offset, length);
long result;
if (lastChunkBuffer == null || dataRemaining) {
// chunkingBuffer
// srcs (taking into account offset+length)
// chunkingSepBuffer
final ByteBuffer[] buf = new ByteBuffer[2 + length];
buf[0] = chunkingBuffer;
System.arraycopy(srcs, offset , buf, 1, length);
buf[length + 1] = chunkingSepBuffer;
result = next.write(buf, 0, buf.length);
} else {
// chunkingBuffer
// srcs (taking into account offset+length)
// lastChunkBuffer
final ByteBuffer[] buf = new ByteBuffer[2 + length];
buf[0] = chunkingBuffer;
System.arraycopy(srcs, offset , buf, 1, length);
buf[length + 1] = lastChunkBuffer.getBuffer();
if (anyAreSet(state, CONF_FLAG_PASS_CLOSE)) {
result = next.writeFinal(buf, 0, buf.length);
} else {
result = next.write(buf, 0, buf.length);
}
if (Buffers.remaining(srcs, offset, length) == 0) {
state |= FLAG_WRITES_SHUTDOWN;
}
if (!lastChunkBuffer.getBuffer().hasRemaining()) {
state |= FLAG_NEXT_SHUTDOWN;
lastChunkBuffer.close();
}
}
int srcWritten = originalRemaining - (int) Buffers.remaining(srcs, offset, length);
chunkleft -= srcWritten;
if (result < chunkingSize) {
return 0;
} else {
return srcWritten;
}
} else {
long result = next.write(srcs, offset, length);
chunkleft -= result;
return result;

}
} finally {
srcs[length - 1].limit(oldLimit);
}
}

int doWrite(final ByteBuffer src) throws IOException {
if (anyAreSet(state, FLAG_WRITES_SHUTDOWN)) {
Expand Down Expand Up @@ -195,7 +299,6 @@ int doWrite(final ByteBuffer src) throws IOException {
} finally {
src.limit(oldLimit);
}

}

@Override
Expand All @@ -217,13 +320,7 @@ public void truncateWrites() throws IOException {

@Override
public long write(final ByteBuffer[] srcs, final int offset, final int length) throws IOException {
for (int i = 0; i < length; i++) {
ByteBuffer srcBuffer = srcs[offset + i];
if (srcBuffer.hasRemaining()) {
return write(srcBuffer);
}
}
return 0;
return doWrite(srcs, offset, length);
}

@Override
Expand Down Expand Up @@ -382,7 +479,7 @@ private void createLastChunk(final boolean writeFinal) throws UnsupportedEncodin
lastChunkBuffer.put(CRLF);
}
//horrible hack
//there is a situation where we can get a buffer leak here if the connection is terminated abnormaly
//there is a situation where we can get a buffer leak here if the connection is terminated abnormally
//this should be fixed once this channel has its lifecycle tied to the connection, same as fixed length
lastChunkBuffer.flip();
ByteBuffer data = ByteBuffer.allocate(lastChunkBuffer.remaining());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.net.URLConnection;
import java.nio.ByteBuffer;
import java.nio.file.DirectoryStream;
import java.nio.file.FileSystemNotFoundException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand Down Expand Up @@ -299,8 +300,15 @@ public Path getFilePath() {
} catch (URISyntaxException e) {
return null;
}
} else {
//deffer to Paths/FS --> ServiceLoader for java.nio.file.spi.FileSystemProvider
//NOTE: FS has to be installed: java.nio.file.FileSystems#newFileSystem
try {
return Paths.get(url.toURI());
} catch(FileSystemNotFoundException|IllegalArgumentException|URISyntaxException e) {
return null;
}
}
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,31 @@ public long transferTo(long position, long count, FileChannel target) throws IOE
return 0;
}
try {
final PooledByteBuffer localData = data;
if (frameDataRemaining == 0 && anyAreSet(state, STATE_LAST_FRAME)) {
synchronized (lock) {
state |= STATE_RETURNED_MINUS_ONE;
return -1;
}
} else if (data != null) {
int old = data.getBuffer().limit();
} else if (localData != null) {
try {
if (count < data.getBuffer().remaining()) {
data.getBuffer().limit((int) (data.getBuffer().position() + count));
final int old = localData.getBuffer().limit();
try {
if (count < localData.getBuffer().remaining()) {
localData.getBuffer().limit((int) (localData.getBuffer().position() + count));
}
return target.write(localData.getBuffer(), position);
} finally {
localData.getBuffer().limit(old);
decrementFrameDataRemaining();
}
} catch (IllegalStateException e) {
// NPE should be covered. ISE in case of closed buffer
if (anyAreSet(state, STATE_DONE | STATE_CLOSED | STATE_STREAM_BROKEN)) {
return -1;
} else {
throw e;
}
return target.write(data.getBuffer(), position);
} finally {
data.getBuffer().limit(old);
decrementFrameDataRemaining();
}
}
return 0;
Expand All @@ -146,7 +156,8 @@ public long transferTo(long position, long count, FileChannel target) throws IOE
}

private void decrementFrameDataRemaining() {
if(!data.getBuffer().hasRemaining()) {
final PooledByteBuffer localData = data;
if(localData != null && !localData.getBuffer().hasRemaining()) {
frameDataRemaining -= currentDataOriginalSize;
}
}
Expand All @@ -162,36 +173,44 @@ public long transferTo(long count, ByteBuffer throughBuffer, StreamSinkChannel s
return 0;
}
try {
final PooledByteBuffer localData = data;
if (frameDataRemaining == 0 && anyAreSet(state, STATE_LAST_FRAME)) {
synchronized (lock) {
state |= STATE_RETURNED_MINUS_ONE;
return -1;
}
} else if (data != null && data.getBuffer().hasRemaining()) {
int old = data.getBuffer().limit();
} else if (localData != null && localData.getBuffer().hasRemaining()) {
int old = localData.getBuffer().limit();
try {
if (count < data.getBuffer().remaining()) {
data.getBuffer().limit((int) (data.getBuffer().position() + count));
if (count < localData.getBuffer().remaining()) {
localData.getBuffer().limit((int) (localData.getBuffer().position() + count));
}
int written = streamSinkChannel.write(data.getBuffer());
if(data.getBuffer().hasRemaining()) {
int written = streamSinkChannel.write(localData.getBuffer());
if(localData.getBuffer().hasRemaining()) {
//we can still add more data
//stick it it throughbuffer, otherwise transfer code will continue to attempt to use this method
throughBuffer.clear();
Buffers.copy(throughBuffer, data.getBuffer());
Buffers.copy(throughBuffer, localData.getBuffer());
throughBuffer.flip();
} else {
throughBuffer.position(throughBuffer.limit());
}
return written;
} finally {
data.getBuffer().limit(old);
localData.getBuffer().limit(old);
decrementFrameDataRemaining();
}
} else {
throughBuffer.position(throughBuffer.limit());
}
return 0;
} catch (IllegalStateException e) {
// NPE should be covered. ISE in case of closed buffer
if (anyAreSet(state, STATE_DONE | STATE_CLOSED | STATE_STREAM_BROKEN)) {
return -1;
} else {
throw e;
}
} finally {
exitRead();
}
Expand Down Expand Up @@ -589,29 +608,29 @@ private void beforeRead() throws IOException {
}

private void exitRead() throws IOException {
if (data != null && !data.getBuffer().hasRemaining()) {
data.close();
data = null;
}
if (frameDataRemaining == 0) {
try {
synchronized (lock) {
synchronized (lock) {
if (data != null && !data.getBuffer().hasRemaining()) {
data.close();
data = null;
}
if (frameDataRemaining == 0) {
try {
readFrameCount++;
if (pendingFrameData.isEmpty()) {
if (anyAreSet(state, STATE_RETURNED_MINUS_ONE)) {
state |= STATE_DONE;
complete();
close();
} else if(anyAreSet(state, STATE_LAST_FRAME)) {
} else if (anyAreSet(state, STATE_LAST_FRAME)) {
state |= STATE_WAITNG_MINUS_ONE;
} else {
waitingForFrame = true;
}
}
}
} finally {
if (pendingFrameData.isEmpty()) {
framedChannel.notifyFrameReadComplete(this);
} finally {
if (pendingFrameData.isEmpty()) {
framedChannel.notifyFrameReadComplete(this);
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import io.undertow.UndertowOptions;
import io.undertow.conduits.BytesReceivedStreamSourceConduit;
import io.undertow.conduits.BytesSentStreamSinkConduit;
import io.undertow.conduits.ReadTimeoutStreamSourceConduit;
import io.undertow.conduits.WriteTimeoutStreamSinkConduit;
import io.undertow.protocols.http2.Http2Channel;
import io.undertow.server.ConnectorStatistics;
import io.undertow.server.ConnectorStatisticsImpl;
Expand All @@ -32,12 +34,14 @@
import org.xnio.ChannelListener;
import org.xnio.IoUtils;
import org.xnio.OptionMap;
import org.xnio.Options;
import io.undertow.connector.ByteBufferPool;
import io.undertow.connector.PooledByteBuffer;

import org.xnio.Pool;
import org.xnio.StreamConnection;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.Set;
Expand Down Expand Up @@ -120,6 +124,22 @@ public void handleEvent(final StreamConnection channel, PooledByteBuffer buffer)
if (idleTimeout != null && idleTimeout > 0) {
http2Channel.setIdleTimeout(idleTimeout);
}
try {
Integer readTimeout = channel.getOption(Options.READ_TIMEOUT);
if (readTimeout != null && readTimeout > 0) {
channel.getSourceChannel().setConduit(new ReadTimeoutStreamSourceConduit(channel.getSourceChannel().getConduit(), channel, this));
}
} catch (IOException e) {
UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
}
try {
Integer writeTimeout = channel.getOption(Options.WRITE_TIMEOUT);
if (writeTimeout != null && writeTimeout > 0) {
channel.getSinkChannel().setConduit(new WriteTimeoutStreamSinkConduit(channel.getSinkChannel().getConduit(), channel, this));
}
} catch (IOException e) {
UndertowLogger.REQUEST_IO_LOGGER.ioException(e);
}
if(statisticsEnabled) {
channel.getSinkChannel().setConduit(new BytesSentStreamSinkConduit(channel.getSinkChannel().getConduit(), connectorStatistics.sentAccumulator()));
channel.getSourceChannel().setConduit(new BytesReceivedStreamSourceConduit(channel.getSourceChannel().getConduit(), connectorStatistics.receivedAccumulator()));
Expand Down
Loading