diff --git a/feign-reactor-core/pom.xml b/feign-reactor-core/pom.xml index 8f415583..0b6ce3b4 100644 --- a/feign-reactor-core/pom.xml +++ b/feign-reactor-core/pom.xml @@ -31,6 +31,17 @@ Feign Reactive Core + + org.jetbrains.kotlin + kotlin-stdlib + provided + + + + org.jetbrains.kotlinx + kotlinx-coroutines-reactor + provided + io.projectreactor @@ -196,6 +207,37 @@ + + org.jetbrains.kotlin + kotlin-maven-plugin + + + compile + process-sources + + compile + + + + src/main/java + target/generated-sources/annotations + + + + + + + -Xjsr305=strict + + + + + org.jetbrains.kotlin + kotlin-maven-allopen + ${kotlin.version} + + + diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java index 2e7c57ab..c4964760 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java @@ -15,6 +15,8 @@ import feign.Contract; import feign.MethodMetadata; +import kotlinx.coroutines.flow.Flow; +import reactivefeign.utils.KtCoroutinesUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -48,6 +50,12 @@ public List parseAndValidateMetadata(final Class targetType) for (final MethodMetadata metadata : methodsMetadata) { final Type type = metadata.returnType(); + + if (KtCoroutinesUtils.isSuspend(metadata.method())) { + modifySuspendMethodMetadata(metadata); + continue; + } + if (!isReactorType(type)) { throw new IllegalArgumentException(String.format( "Method %s of contract %s doesn't returns reactor.core.publisher.Mono or reactor.core.publisher.Flux", @@ -64,7 +72,24 @@ public List parseAndValidateMetadata(final Class targetType) return methodsMetadata; } - private static final Set REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class)); + private static void modifySuspendMethodMetadata(MethodMetadata metadata) { + Type kotlinMethodReturnType = KtCoroutinesUtils.suspendReturnType(metadata.method()); + if (kotlinMethodReturnType == null) { + throw new IllegalArgumentException(String.format( + "Method %s can't have continuation argument, only kotlin method is allowed", + metadata.configKey())); + } + metadata.returnType(kotlinMethodReturnType); + + int continuationIndex = metadata.method().getParameterCount() - 1; + metadata.ignoreParamater(continuationIndex); + + if(metadata.bodyIndex() != null && metadata.bodyIndex().equals(continuationIndex)) { + metadata.bodyIndex(null); + } + } + + private static final Set REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class, Flow.class)); public static boolean isReactorType(final Type type) { return (type instanceof ParameterizedType) diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java index 2bc74210..a77d6b5b 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java +++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java @@ -18,6 +18,7 @@ import feign.InvocationHandlerFactory; import feign.MethodMetadata; import feign.Target; +import kotlinx.coroutines.flow.Flow; import org.reactivestreams.Publisher; import reactivefeign.client.ReactiveErrorMapper; import reactivefeign.client.ReactiveHttpClient; @@ -43,6 +44,7 @@ import reactivefeign.publisher.retry.FluxRetryPublisherHttpClient; import reactivefeign.publisher.retry.MonoRetryPublisherHttpClient; import reactivefeign.retry.ReactiveRetryPolicy; +import reactivefeign.utils.KtCoroutinesUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -316,9 +318,9 @@ public static PublisherHttpClient retry( MethodMetadata methodMetadata, ReactiveRetryPolicy retryPolicy) { Type returnPublisherType = returnPublisherType(methodMetadata); - if(returnPublisherType == Mono.class){ + if (returnPublisherType == Mono.class || KtCoroutinesUtils.isSuspend(methodMetadata.method())) { return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retryPolicy); - } else if(returnPublisherType == Flux.class) { + } else if(returnPublisherType == Flux.class || returnPublisherType == Flow.class) { return new FluxRetryPublisherHttpClient(publisherClient, methodMetadata, retryPolicy); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType); @@ -331,9 +333,9 @@ protected PublisherHttpClient toPublisher(ReactiveHttpClient reactiveHttpClient, } Class returnPublisherType = returnPublisherType(methodMetadata); - if(returnPublisherType == Mono.class){ + if(returnPublisherType == Mono.class || KtCoroutinesUtils.isSuspend(methodMetadata.method())) { return new MonoPublisherHttpClient(reactiveHttpClient); - } else if(returnPublisherType == Flux.class){ + } else if(returnPublisherType == Flux.class || returnPublisherType == Flow.class) { return new FluxPublisherHttpClient(reactiveHttpClient); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType); diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java new file mode 100644 index 00000000..3ee78306 --- /dev/null +++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java @@ -0,0 +1,30 @@ +package reactivefeign.methodhandler; + +import kotlinx.coroutines.flow.Flow; +import kotlinx.coroutines.reactive.ReactiveFlowKt; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Flux; + +public class FlowMethodHandler implements MethodHandler { + + private final MethodHandler methodHandler; + + public FlowMethodHandler(MethodHandler methodHandler) { + this.methodHandler = methodHandler; + } + + @Override + public Flow invoke(final Object[] argv) { + return ReactiveFlowKt.asFlow(invokeFlux(argv)); + } + + @SuppressWarnings("unchecked") + protected Flux invokeFlux(final Object[] argv) { + try { + return Flux.from((Publisher) methodHandler.invoke(argv)); + } catch (Throwable throwable) { + return Flux.error(throwable); + } + } + +} diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java index f7996dd8..279ae216 100644 --- a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java +++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/ReactiveMethodHandlerFactory.java @@ -2,8 +2,11 @@ import feign.MethodMetadata; import feign.Target; +import kotlin.coroutines.Continuation; +import kotlinx.coroutines.flow.Flow; import reactivefeign.publisher.PublisherClientFactory; import reactivefeign.publisher.ResponsePublisherHttpClient; +import reactivefeign.utils.KtCoroutinesUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -44,6 +47,10 @@ public MethodHandler create(MethodMetadata metadata) { return new MonoMethodHandler(methodHandler); } else if(returnPublisherType == Flux.class) { return new FluxMethodHandler(methodHandler); + } else if(KtCoroutinesUtils.isSuspend(metadata.method())){ + return new SuspendMethodHandler(methodHandler); + } else if(returnPublisherType == Flow.class) { + return new FlowMethodHandler(methodHandler); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType); } @@ -57,6 +64,10 @@ public MethodHandler createDefault(Method method) { return new MonoMethodHandler(defaultMethodHandler); } else if(method.getReturnType() == Flux.class) { return new FluxMethodHandler(defaultMethodHandler); + } else if(method.getReturnType() == Continuation.class){ + return new SuspendMethodHandler(defaultMethodHandler); + } else if(method.getReturnType() == Flow.class) { + return new FlowMethodHandler(defaultMethodHandler); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + method.getReturnType()); } diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/SuspendMethodHandler.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/SuspendMethodHandler.java new file mode 100644 index 00000000..a6084b54 --- /dev/null +++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/SuspendMethodHandler.java @@ -0,0 +1,33 @@ +package reactivefeign.methodhandler; + +import kotlin.coroutines.Continuation; +import kotlinx.coroutines.reactor.MonoKt; +import org.reactivestreams.Publisher; +import reactor.core.publisher.Mono; + +import java.util.Arrays; + +public class SuspendMethodHandler implements MethodHandler { + + private final MethodHandler methodHandler; + + public SuspendMethodHandler(MethodHandler methodHandler) { + this.methodHandler = methodHandler; + } + + @Override + public Object invoke(final Object[] argv) { + Object[] args = Arrays.copyOf(argv, argv.length - 1); + Continuation continuation = (Continuation) argv[argv.length - 1]; + return MonoKt.awaitSingleOrNull(invokeMono(args), continuation); + } + + @SuppressWarnings("unchecked") + public Mono invokeMono(final Object[] argv) { + try { + return Mono.from((Publisher) methodHandler.invoke(argv)); + } catch (Throwable throwable) { + return Mono.error(throwable); + } + } +} diff --git a/feign-reactor-core/src/main/java/reactivefeign/utils/KtCoroutinesUtils.kt b/feign-reactor-core/src/main/java/reactivefeign/utils/KtCoroutinesUtils.kt new file mode 100644 index 00000000..05049cf7 --- /dev/null +++ b/feign-reactor-core/src/main/java/reactivefeign/utils/KtCoroutinesUtils.kt @@ -0,0 +1,19 @@ +@file:JvmName("KtCoroutinesUtils") + +package reactivefeign.utils + +import java.lang.reflect.Method +import java.lang.reflect.ParameterizedType +import java.lang.reflect.Type +import kotlin.coroutines.Continuation + +internal fun Method.isSuspend(): Boolean { + val classes: Array> = parameterTypes + return classes.isNotEmpty() && classes[classes.size - 1] == Continuation::class.java +} + +internal fun Method.suspendReturnType() = if (isSuspend()) { + val types: Array = genericParameterTypes + val conType = types[types.size - 1] as ParameterizedType + conType +} else returnType \ No newline at end of file diff --git a/feign-reactor-parent/pom.xml b/feign-reactor-parent/pom.xml index 53500c58..b67767da 100644 --- a/feign-reactor-parent/pom.xml +++ b/feign-reactor-parent/pom.xml @@ -14,6 +14,7 @@ 17 + 1.8.22 UTF-8 UTF-8 diff --git a/feign-reactor-webclient-core/pom.xml b/feign-reactor-webclient-core/pom.xml index 02b7c978..6db3f3be 100644 --- a/feign-reactor-webclient-core/pom.xml +++ b/feign-reactor-webclient-core/pom.xml @@ -12,6 +12,17 @@ feign-reactor-webclient-core + + org.jetbrains.kotlin + kotlin-stdlib + provided + + + org.jetbrains.kotlinx + kotlinx-coroutines-core + provided + + com.playtika.reactivefeign feign-reactor-core diff --git a/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java b/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java index 544fd9aa..984663eb 100644 --- a/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java +++ b/feign-reactor-webclient-core/src/main/java/reactivefeign/webclient/client/WebReactiveHttpResponse.java @@ -1,5 +1,7 @@ package reactivefeign.webclient.client; +import kotlin.coroutines.Continuation; +import kotlinx.coroutines.flow.Flow; import org.reactivestreams.Publisher; import org.springframework.core.ParameterizedTypeReference; import org.springframework.core.io.buffer.DataBuffer; @@ -49,9 +51,9 @@ public Map> headers() { @Override public P body() { - if (returnPublisherType == Mono.class) { + if (returnPublisherType == Mono.class || returnPublisherType == Continuation.class) { return (P)clientResponse.bodyToMono(returnActualType); - } else if(returnPublisherType == Flux.class){ + } else if(returnPublisherType == Flux.class || returnPublisherType == Flow.class) { return (P)clientResponse.bodyToFlux(returnActualType); } else { throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType);