Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions core/src/main/java/io/grpc/internal/CloseWithHeadersMarker.java

This file was deleted.

24 changes: 4 additions & 20 deletions core/src/main/java/io/grpc/internal/ServerCallImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,17 +279,6 @@ private void handleInternalError(Throwable internalError) {
serverCallTracer.reportCallEnded(false); // error so always false
}

/**
* Close the {@link ServerStream} because parsing request message failed.
* Similar to {@link #handleInternalError(Throwable)}.
*/
private void handleParseError(StatusRuntimeException parseError) {
cancelled = true;
log.log(Level.WARNING, "Cancelling the stream because of parse error", parseError);
stream.cancel(parseError.getStatus().withCause(new CloseWithHeadersMarker()));
serverCallTracer.reportCallEnded(false); // error so always false
}

/**
* All of these callbacks are assumed to called on an application thread, and the caller is
* responsible for handling thrown exceptions.
Expand Down Expand Up @@ -338,23 +327,18 @@ private void messagesAvailableInternal(final MessageProducer producer) {
return;
}

InputStream message = null;
InputStream message;
try {
while ((message = producer.next()) != null) {
ReqT parsed;
try {
parsed = call.method.parseRequest(message);
} catch (StatusRuntimeException e) {
listener.onMessage(call.method.parseRequest(message));
} catch (Throwable t) {
GrpcUtil.closeQuietly(message);
GrpcUtil.closeQuietly(producer);
call.handleParseError(e);
return;
throw t;
}
message.close();
listener.onMessage(parsed);
}
} catch (Throwable t) {
GrpcUtil.closeQuietly(message);
GrpcUtil.closeQuietly(producer);
Throwables.throwIfUnchecked(t);
throw new RuntimeException(t);
Expand Down
42 changes: 0 additions & 42 deletions core/src/test/java/io/grpc/internal/ServerCallImplTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,9 @@
import io.grpc.SecurityLevel;
import io.grpc.ServerCall;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl;
import io.perfmark.PerfMark;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import org.junit.Before;
Expand All @@ -71,8 +69,6 @@ public class ServerCallImplTest {

@Mock private ServerStream stream;
@Mock private ServerCall.Listener<Long> callListener;
@Mock private StreamListener.MessageProducer messageProducer;
@Mock private InputStream message;

private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create();
private ServerCallImpl<Long, Long> call;
Expand Down Expand Up @@ -497,44 +493,6 @@ public void streamListener_unexpectedRuntimeException() {
assertThat(e).hasMessageThat().isEqualTo("unexpected exception");
}

@Test
public void streamListener_statusRuntimeException() throws IOException {
MethodDescriptor<Long, Long> failingParseMethod = MethodDescriptor.<Long, Long>newBuilder()
.setType(MethodType.UNARY)
.setFullMethodName("service/method")
.setRequestMarshaller(new LongMarshaller() {
@Override
public Long parse(InputStream stream) {
throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED
.withDescription("Decompressed gRPC message exceeds maximum size"));
}
})
.setResponseMarshaller(new LongMarshaller())
.build();

call = new ServerCallImpl<>(stream, failingParseMethod, requestHeaders, context,
DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(),
serverCallTracer, PerfMark.createTag());

ServerStreamListenerImpl<Long> streamListener =
new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context);

when(messageProducer.next()).thenReturn(message, (InputStream) null);
streamListener.messagesAvailable(messageProducer);
ArgumentCaptor<Status> statusCaptor = ArgumentCaptor.forClass(Status.class);
verify(stream).cancel(statusCaptor.capture());
Status status = statusCaptor.getValue();
assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode());
assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription());

streamListener.halfClosed();
verify(callListener, never()).onHalfClose();

when(messageProducer.next()).thenReturn(message, (InputStream) null);
streamListener.messagesAvailable(messageProducer);
verify(callListener, never()).onMessage(any());
}

private static class LongMarshaller implements Marshaller<Long> {
@Override
public InputStream stream(Long value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2024,7 +2024,7 @@ private void assertPayload(Payload expected, Payload actual) {
}
}

protected static void assertCodeEquals(Status.Code expected, Status actual) {
private static void assertCodeEquals(Status.Code expected, Status actual) {
assertWithMessage("Unexpected status: %s", actual).that(actual.getCode()).isEqualTo(expected);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package io.grpc.testing.integration;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.ByteString;
Expand All @@ -38,8 +37,6 @@
import io.grpc.ServerCall.Listener;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import io.grpc.internal.GrpcUtil;
import io.grpc.netty.InternalNettyChannelBuilder;
import io.grpc.netty.InternalNettyServerBuilder;
Expand All @@ -56,9 +53,7 @@
import java.io.OutputStream;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

Expand Down Expand Up @@ -89,16 +84,10 @@ public static void registerCompressors() {
compressors.register(Codec.Identity.NONE);
}

@Rule
public final TestName currentTest = new TestName();

@Override
protected ServerBuilder<?> getServerBuilder() {
NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create())
.maxInboundMessageSize(
DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME.equals(currentTest.getMethodName())
? 1000
: AbstractInteropTest.MAX_MESSAGE_SIZE)
.maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE)
.compressorRegistry(compressors)
.decompressorRegistry(decompressors)
.intercept(new ServerInterceptor() {
Expand Down Expand Up @@ -137,22 +126,6 @@ public void compresses() {
assertTrue(FZIPPER.anyWritten);
}

private static final String DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME =
"decompressedMessageTooLong";

@Test
public void decompressedMessageTooLong() {
assertEquals(DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME, currentTest.getMethodName());
final SimpleRequest bigRequest = SimpleRequest.newBuilder()
.setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[10_000])))
.build();
StatusRuntimeException e = assertThrows(StatusRuntimeException.class,
() -> blockingStub.withCompression("gzip").unaryCall(bigRequest));
assertCodeEquals(Code.RESOURCE_EXHAUSTED, e.getStatus());
assertEquals("Decompressed gRPC message exceeds maximum size 1000",
e.getStatus().getDescription());
}

@Override
protected NettyChannelBuilder createChannelBuilder() {
NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress())
Expand Down
7 changes: 1 addition & 6 deletions netty/src/main/java/io/grpc/netty/NettyServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.internal.AbstractServerStream;
import io.grpc.internal.CloseWithHeadersMarker;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.internal.WritableBuffer;
Expand Down Expand Up @@ -131,11 +130,7 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status)
@Override
public void cancel(Status status) {
try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) {
CancelServerStreamCommand cmd =
status.getCause() instanceof CloseWithHeadersMarker
? CancelServerStreamCommand.withReason(transportState(), status)
: CancelServerStreamCommand.withReset(transportState(), status);
writeQueue.enqueue(cmd, true);
writeQueue.enqueue(CancelServerStreamCommand.withReset(transportState(), status), true);
}
}
}
Expand Down
Loading