diff --git a/core/src/main/java/io/grpc/internal/CloseWithHeadersMarker.java b/core/src/main/java/io/grpc/internal/CloseWithHeadersMarker.java deleted file mode 100644 index 376b9edb614..00000000000 --- a/core/src/main/java/io/grpc/internal/CloseWithHeadersMarker.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2025 The gRPC Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.grpc.internal; - -import io.grpc.Status; - -/** - * Marker to be used for Status sent to {@link ServerStream#cancel(Status)} to signal that stream - * should be closed by sending headers. - */ -public class CloseWithHeadersMarker extends Throwable { - private static final long serialVersionUID = 0L; - - @Override - public synchronized Throwable fillInStackTrace() { - return this; - } -} diff --git a/core/src/main/java/io/grpc/internal/ServerCallImpl.java b/core/src/main/java/io/grpc/internal/ServerCallImpl.java index 1c1f76cbb12..e224384ce8f 100644 --- a/core/src/main/java/io/grpc/internal/ServerCallImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerCallImpl.java @@ -279,17 +279,6 @@ private void handleInternalError(Throwable internalError) { serverCallTracer.reportCallEnded(false); // error so always false } - /** - * Close the {@link ServerStream} because parsing request message failed. - * Similar to {@link #handleInternalError(Throwable)}. - */ - private void handleParseError(StatusRuntimeException parseError) { - cancelled = true; - log.log(Level.WARNING, "Cancelling the stream because of parse error", parseError); - stream.cancel(parseError.getStatus().withCause(new CloseWithHeadersMarker())); - serverCallTracer.reportCallEnded(false); // error so always false - } - /** * All of these callbacks are assumed to called on an application thread, and the caller is * responsible for handling thrown exceptions. @@ -338,23 +327,18 @@ private void messagesAvailableInternal(final MessageProducer producer) { return; } - InputStream message = null; + InputStream message; try { while ((message = producer.next()) != null) { - ReqT parsed; try { - parsed = call.method.parseRequest(message); - } catch (StatusRuntimeException e) { + listener.onMessage(call.method.parseRequest(message)); + } catch (Throwable t) { GrpcUtil.closeQuietly(message); - GrpcUtil.closeQuietly(producer); - call.handleParseError(e); - return; + throw t; } message.close(); - listener.onMessage(parsed); } } catch (Throwable t) { - GrpcUtil.closeQuietly(message); GrpcUtil.closeQuietly(producer); Throwables.throwIfUnchecked(t); throw new RuntimeException(t); diff --git a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java index 028f1ac93cd..7394c83eab2 100644 --- a/core/src/test/java/io/grpc/internal/ServerCallImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerCallImplTest.java @@ -48,11 +48,9 @@ import io.grpc.SecurityLevel; import io.grpc.ServerCall; import io.grpc.Status; -import io.grpc.StatusRuntimeException; import io.grpc.internal.ServerCallImpl.ServerStreamListenerImpl; import io.perfmark.PerfMark; import java.io.ByteArrayInputStream; -import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import org.junit.Before; @@ -71,8 +69,6 @@ public class ServerCallImplTest { @Mock private ServerStream stream; @Mock private ServerCall.Listener callListener; - @Mock private StreamListener.MessageProducer messageProducer; - @Mock private InputStream message; private final CallTracer serverCallTracer = CallTracer.getDefaultFactory().create(); private ServerCallImpl call; @@ -497,44 +493,6 @@ public void streamListener_unexpectedRuntimeException() { assertThat(e).hasMessageThat().isEqualTo("unexpected exception"); } - @Test - public void streamListener_statusRuntimeException() throws IOException { - MethodDescriptor failingParseMethod = MethodDescriptor.newBuilder() - .setType(MethodType.UNARY) - .setFullMethodName("service/method") - .setRequestMarshaller(new LongMarshaller() { - @Override - public Long parse(InputStream stream) { - throw new StatusRuntimeException(Status.RESOURCE_EXHAUSTED - .withDescription("Decompressed gRPC message exceeds maximum size")); - } - }) - .setResponseMarshaller(new LongMarshaller()) - .build(); - - call = new ServerCallImpl<>(stream, failingParseMethod, requestHeaders, context, - DecompressorRegistry.getDefaultInstance(), CompressorRegistry.getDefaultInstance(), - serverCallTracer, PerfMark.createTag()); - - ServerStreamListenerImpl streamListener = - new ServerCallImpl.ServerStreamListenerImpl<>(call, callListener, context); - - when(messageProducer.next()).thenReturn(message, (InputStream) null); - streamListener.messagesAvailable(messageProducer); - ArgumentCaptor statusCaptor = ArgumentCaptor.forClass(Status.class); - verify(stream).cancel(statusCaptor.capture()); - Status status = statusCaptor.getValue(); - assertEquals(Status.Code.RESOURCE_EXHAUSTED, status.getCode()); - assertEquals("Decompressed gRPC message exceeds maximum size", status.getDescription()); - - streamListener.halfClosed(); - verify(callListener, never()).onHalfClose(); - - when(messageProducer.next()).thenReturn(message, (InputStream) null); - streamListener.messagesAvailable(messageProducer); - verify(callListener, never()).onMessage(any()); - } - private static class LongMarshaller implements Marshaller { @Override public InputStream stream(Long value) { diff --git a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java index f5cd111a5b1..51295281a90 100644 --- a/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java +++ b/interop-testing/src/main/java/io/grpc/testing/integration/AbstractInteropTest.java @@ -2024,7 +2024,7 @@ private void assertPayload(Payload expected, Payload actual) { } } - protected static void assertCodeEquals(Status.Code expected, Status actual) { + private static void assertCodeEquals(Status.Code expected, Status actual) { assertWithMessage("Unexpected status: %s", actual).that(actual.getCode()).isEqualTo(expected); } diff --git a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java index 33cd624aebb..b9692383254 100644 --- a/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java +++ b/interop-testing/src/test/java/io/grpc/testing/integration/TransportCompressionTest.java @@ -17,7 +17,6 @@ package io.grpc.testing.integration; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import com.google.protobuf.ByteString; @@ -38,8 +37,6 @@ import io.grpc.ServerCall.Listener; import io.grpc.ServerCallHandler; import io.grpc.ServerInterceptor; -import io.grpc.Status.Code; -import io.grpc.StatusRuntimeException; import io.grpc.internal.GrpcUtil; import io.grpc.netty.InternalNettyChannelBuilder; import io.grpc.netty.InternalNettyServerBuilder; @@ -56,9 +53,7 @@ import java.io.OutputStream; import org.junit.Before; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -89,16 +84,10 @@ public static void registerCompressors() { compressors.register(Codec.Identity.NONE); } - @Rule - public final TestName currentTest = new TestName(); - @Override protected ServerBuilder getServerBuilder() { NettyServerBuilder builder = NettyServerBuilder.forPort(0, InsecureServerCredentials.create()) - .maxInboundMessageSize( - DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME.equals(currentTest.getMethodName()) - ? 1000 - : AbstractInteropTest.MAX_MESSAGE_SIZE) + .maxInboundMessageSize(AbstractInteropTest.MAX_MESSAGE_SIZE) .compressorRegistry(compressors) .decompressorRegistry(decompressors) .intercept(new ServerInterceptor() { @@ -137,22 +126,6 @@ public void compresses() { assertTrue(FZIPPER.anyWritten); } - private static final String DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME = - "decompressedMessageTooLong"; - - @Test - public void decompressedMessageTooLong() { - assertEquals(DECOMPRESSED_MESSAGE_TOO_LONG_METHOD_NAME, currentTest.getMethodName()); - final SimpleRequest bigRequest = SimpleRequest.newBuilder() - .setPayload(Payload.newBuilder().setBody(ByteString.copyFrom(new byte[10_000]))) - .build(); - StatusRuntimeException e = assertThrows(StatusRuntimeException.class, - () -> blockingStub.withCompression("gzip").unaryCall(bigRequest)); - assertCodeEquals(Code.RESOURCE_EXHAUSTED, e.getStatus()); - assertEquals("Decompressed gRPC message exceeds maximum size 1000", - e.getStatus().getDescription()); - } - @Override protected NettyChannelBuilder createChannelBuilder() { NettyChannelBuilder builder = NettyChannelBuilder.forAddress(getListenAddress()) diff --git a/netty/src/main/java/io/grpc/netty/NettyServerStream.java b/netty/src/main/java/io/grpc/netty/NettyServerStream.java index 681e649d1ef..836f39ddf19 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerStream.java @@ -23,7 +23,6 @@ import io.grpc.Metadata; import io.grpc.Status; import io.grpc.internal.AbstractServerStream; -import io.grpc.internal.CloseWithHeadersMarker; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import io.grpc.internal.WritableBuffer; @@ -131,11 +130,7 @@ public void writeTrailers(Metadata trailers, boolean headersSent, Status status) @Override public void cancel(Status status) { try (TaskCloseable ignore = PerfMark.traceTask("NettyServerStream$Sink.cancel")) { - CancelServerStreamCommand cmd = - status.getCause() instanceof CloseWithHeadersMarker - ? CancelServerStreamCommand.withReason(transportState(), status) - : CancelServerStreamCommand.withReset(transportState(), status); - writeQueue.enqueue(cmd, true); + writeQueue.enqueue(CancelServerStreamCommand.withReset(transportState(), status), true); } } }