Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion buffer-netty/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {

annotationProcessor(projects.micronautInjectJava)

testRuntimeOnly(projects.micronautHttpTck) // leak detection module
testRuntimeOnly(libs.micronaut.test.netty.leak)
}

tasks {
Expand Down
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ micronaut-build-plugins="7.6.4"
micronaut-groovy = "4.7.0"
micronaut-session = "4.6.0"
micronaut-sql = "5.3.0"
micronaut-test = "4.6.2"
micronaut-test = "4.10.1"
micronaut-validation = "4.9.0"
micronaut-rxjava2 = "2.7.0"
micronaut-rxjava3 = "3.7.0"
Expand Down Expand Up @@ -264,6 +264,7 @@ micronaut-test-junit5 = { module = "io.micronaut.test:micronaut-test-junit5", ve
micronaut-test-kotest5 = { module = "io.micronaut.test:micronaut-test-kotest5", version.ref = "micronaut-test" }
micronaut-test-spock = { module = "io.micronaut.test:micronaut-test-spock", version.ref = "micronaut-test" }
micronaut-test-type-pollution = { module = "io.micronaut.test:micronaut-test-type-pollution", version.ref = "micronaut-test" }
micronaut-test-netty-leak = { module = "io.micronaut.test:micronaut-test-netty-leak", version.ref = "micronaut-test" }

micronaut-sql-jdbc = { module = "io.micronaut.sql:micronaut-jdbc", version.ref = "micronaut-sql" }
micronaut-sql-jdbc-tomcat = { module = "io.micronaut.sql:micronaut-jdbc-tomcat", version.ref = "micronaut-sql" }
Expand Down
3 changes: 2 additions & 1 deletion http-server-netty/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ dependencies {
testImplementation(libs.bcpkix)
testImplementation(libs.managed.netty.pkitesting)
testImplementation(projects.micronautJacksonDatabind)
testImplementation(projects.micronautHttpTck)
// Add Micronaut Jackson XML after v4 Migration
// testImplementation(libs.managed.micronaut.xml) {
// exclude module:'micronaut-inject'
Expand Down Expand Up @@ -125,12 +124,14 @@ dependencies {
exclude(group = "io.micronaut")
}
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.micronaut.test.netty.leak)
}

tasks.withType<Test>().configureEach {
forkEvery = 100
maxParallelForks = 4
useJUnitPlatform()
systemProperty("junit.jupiter.extensions.autodetection.enabled", "true")
}

//tasks.withType(Test).configureEach {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.micronaut.http.server.ResponseLifecycle;
import io.netty.buffer.ByteBufAllocator;
import io.netty.handler.codec.http.HttpContent;
import io.netty.util.LeakPresenceDetector;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

Expand Down Expand Up @@ -97,7 +98,7 @@ private NettyByteBodyFactory byteBodyFactory() {
}

private static class NettyConcatenatingSubscriber extends ConcatenatingSubscriber implements BufferConsumer {
static final Separators JSON_NETTY = Separators.jsonSeparators(NettyReadBufferFactory.of(ByteBufAllocator.DEFAULT));
static final Separators JSON_NETTY = LeakPresenceDetector.staticInitializer(() -> Separators.jsonSeparators(NettyReadBufferFactory.of(ByteBufAllocator.DEFAULT)));

private final EventLoopFlow flow;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,8 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
if (idle.state() == IdleState.ALL_IDLE) {
ctx.close();
}
super.userEventTriggered(ctx, evt);
}
super.userEventTriggered(ctx, evt);
}

/**
Expand All @@ -276,6 +276,8 @@ public void handleFakeRequest(io.netty.handler.codec.http2.Http2Stream onStream,
stream.onHeadersRead(fhr, empty);
if (!empty) {
stream.onDataRead(fhr.content(), true);
} else {
fhr.content().release();
}
}
}
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ import io.micronaut.http.hateoas.JsonError
import io.micronaut.http.hateoas.Link
import io.micronaut.http.server.netty.AbstractMicronautSpec
import io.micronaut.json.JsonSyntaxException
import io.micronaut.scheduling.TaskExecutors
import jakarta.inject.Inject
import jakarta.inject.Named
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.scheduler.Schedulers
import spock.lang.Issue

import java.util.concurrent.CompletableFuture
import java.util.concurrent.Executor

class JsonBodyBindingSpec extends AbstractMicronautSpec {

Expand Down Expand Up @@ -388,6 +392,9 @@ class JsonBodyBindingSpec extends AbstractMicronautSpec {
@Controller(value = "/json", produces = io.micronaut.http.MediaType.APPLICATION_JSON)
@Requires(property = "test.controller", value = "JsonController")
static class JsonController {
@Inject
@Named(TaskExecutors.BLOCKING)
Executor blocking

@Post("/params")
String params(String name, int age) {
Expand Down Expand Up @@ -472,7 +479,7 @@ class JsonBodyBindingSpec extends AbstractMicronautSpec {
@Post("/publisher-object")
Publisher<String> publisherObject(@Body Publisher<Foo> publisher) {
return Flux.from(publisher)
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(Schedulers.fromExecutor(blocking))
.map({ Foo foo ->
foo.toString()
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ class ServerRequestContextSpec extends Specification {
Mono.fromCallable({ ->
def request = ServerRequestContext.currentRequest().orElseThrow { -> new RuntimeException("no request") }
request.uri.toString()
}).subscribeOn(Schedulers.boundedElastic())
}).subscribeOn(Schedulers.fromExecutor(executorService))
}

@Get("/reactor-context")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@ import io.micronaut.context.ApplicationContext
import io.micronaut.context.BeanProvider
import io.micronaut.context.annotation.Replaces
import io.micronaut.context.annotation.Requires
import io.micronaut.http.annotation.Body
import io.micronaut.core.annotation.Nullable
import io.micronaut.http.annotation.Body
import io.micronaut.http.annotation.Controller
import io.micronaut.http.annotation.Get
import io.micronaut.http.annotation.Post
import io.micronaut.http.netty.channel.EventLoopGroupConfiguration
import io.micronaut.http.netty.channel.EventLoopGroupRegistry
import io.micronaut.http.server.HttpServerConfiguration
import io.micronaut.http.server.netty.NettyHttpServer
import io.micronaut.http.tck.netty.TestLeakDetector
import io.micronaut.http.server.util.DefaultHttpHostResolver
import io.micronaut.runtime.server.EmbeddedServer
import io.netty.bootstrap.Bootstrap
Expand All @@ -36,8 +35,6 @@ class FuzzyInputSpec extends Specification {

def 'http1 cleartext buffer leaks'() {
given:
TestLeakDetector.startTracking("")

ApplicationContext ctx = ApplicationContext.run([
'spec.name': 'FuzzyInputSpec',
"micronaut.server.port": "-1",
Expand Down Expand Up @@ -67,7 +64,7 @@ class FuzzyInputSpec extends Specification {
channel.closeFuture().sync()

then:
TestLeakDetector.stopTrackingAndReportLeaks()
noExceptionThrown()

cleanup:
embeddedServer.stop()
Expand All @@ -82,7 +79,6 @@ class FuzzyInputSpec extends Specification {
def 'http1 cleartext embedded channel'() {
given:
FlagAppender.clear()
TestLeakDetector.startTracking("")

ApplicationContext ctx = ApplicationContext.run([
'spec.name': 'FuzzyInputSpec',
Expand All @@ -107,7 +103,6 @@ class FuzzyInputSpec extends Specification {
then:
embeddedChannel.checkException()

TestLeakDetector.stopTrackingAndReportLeaks()
FlagAppender.checkTriggered()

where:
Expand All @@ -131,7 +126,6 @@ class FuzzyInputSpec extends Specification {
def 'http2 cleartext embedded channel'() {
given:
FlagAppender.clear()
TestLeakDetector.startTracking("")

ApplicationContext ctx = ApplicationContext.run([
'spec.name': 'FuzzyInputSpec',
Expand All @@ -157,7 +151,6 @@ class FuzzyInputSpec extends Specification {
then:
embeddedChannel.checkException()

TestLeakDetector.stopTrackingAndReportLeaks()
FlagAppender.checkTriggered()

where:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SupportedCipherSuiteFilter
import io.netty.handler.ssl.util.InsecureTrustManagerFactory
import io.netty.util.ReferenceCountUtil
import jakarta.inject.Singleton
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono
Expand Down Expand Up @@ -77,8 +78,10 @@ class AccessLogSpec extends Specification {
server.start()

def responses = new CopyOnWriteArrayList<FullHttpResponse>()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -128,7 +131,8 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand All @@ -144,8 +148,10 @@ class AccessLogSpec extends Specification {
server.start()

def responses = new CopyOnWriteArrayList<FullHttpResponse>()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -194,7 +200,8 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand All @@ -209,8 +216,10 @@ class AccessLogSpec extends Specification {
server.start()

def responses = new CopyOnWriteArrayList<FullHttpResponse>()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -272,7 +281,7 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand Down Expand Up @@ -314,8 +323,10 @@ class AccessLogSpec extends Specification {
ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT,
ApplicationProtocolNames.HTTP_2))
.build()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -386,7 +397,9 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
ReferenceCountUtil.release(ctx)
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand Down Expand Up @@ -415,8 +428,9 @@ class AccessLogSpec extends Specification {
request4.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), ':https')

def responses = new CopyOnWriteArrayList<FullHttpResponse>()
def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -484,7 +498,8 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand Down Expand Up @@ -514,8 +529,10 @@ class AccessLogSpec extends Specification {
request4.headers().add(HttpConversionUtil.ExtensionHeaderNames.SCHEME.text(), ':https')

def responses = new CopyOnWriteArrayList<FullHttpResponse>()

def group = new NioEventLoopGroup(1)
Bootstrap bootstrap = new Bootstrap()
.group(new NioEventLoopGroup(1))
.group(group)
.channel(NioSocketChannel)
.option(ChannelOption.AUTO_READ, true)
.handler(new ChannelInitializer<Channel>() {
Expand Down Expand Up @@ -580,7 +597,8 @@ class AccessLogSpec extends Specification {
responses*.content().forEach(ByteBuf::release)
server.close()
channel.close()
bootstrap.config().group().shutdownGracefully()
ctx.close()
group.shutdownGracefully()
}

@Issue('https://github.com/micronaut-projects/micronaut-core/issues/6782')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ class H2cSpec extends Specification {

private CompletableFuture requestUpgrade(DefaultFullHttpRequest initialRequest) {
def responseFuture = new CompletableFuture()
def group = new NioEventLoopGroup(1)
def bootstrap = new Bootstrap()
.remoteAddress(embeddedServer.host, embeddedServer.port)
.group(new NioEventLoopGroup())
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
Expand Down Expand Up @@ -120,7 +121,7 @@ class H2cSpec extends Specification {
channel.writeAndFlush(initialRequest)
channel.read()

return responseFuture
return responseFuture.whenComplete((r, e) -> group.shutdownGracefully())
}

void 'test using direct netty http2 client'() {
Expand All @@ -129,6 +130,9 @@ class H2cSpec extends Specification {

expect:
responseFuture.get(10, TimeUnit.SECONDS) != null

cleanup:
ReferenceCountUtil.release(responseFuture.getNow(null))
}

void 'test using micronaut http client: retrieve'() {
Expand Down
Loading
Loading