diff --git a/feign-reactor-core/pom.xml b/feign-reactor-core/pom.xml
index 8f415583..0b6ce3b4 100644
--- a/feign-reactor-core/pom.xml
+++ b/feign-reactor-core/pom.xml
@@ -31,6 +31,17 @@
Feign Reactive Core
+
+ org.jetbrains.kotlin
+ kotlin-stdlib
+ provided
+
+
+
+ org.jetbrains.kotlinx
+ kotlinx-coroutines-reactor
+ provided
+
io.projectreactor
@@ -196,6 +207,37 @@
+
+ org.jetbrains.kotlin
+ kotlin-maven-plugin
+
+
+ compile
+ process-sources
+
+ compile
+
+
+
+ src/main/java
+ target/generated-sources/annotations
+
+
+
+
+
+
+ -Xjsr305=strict
+
+
+
+
+ org.jetbrains.kotlin
+ kotlin-maven-allopen
+ ${kotlin.version}
+
+
+
diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java
index 2e7c57ab..c4964760 100644
--- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java
+++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveContract.java
@@ -15,6 +15,8 @@
import feign.Contract;
import feign.MethodMetadata;
+import kotlinx.coroutines.flow.Flow;
+import reactivefeign.utils.KtCoroutinesUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -48,6 +50,12 @@ public List parseAndValidateMetadata(final Class> targetType)
for (final MethodMetadata metadata : methodsMetadata) {
final Type type = metadata.returnType();
+
+ if (KtCoroutinesUtils.isSuspend(metadata.method())) {
+ modifySuspendMethodMetadata(metadata);
+ continue;
+ }
+
if (!isReactorType(type)) {
throw new IllegalArgumentException(String.format(
"Method %s of contract %s doesn't returns reactor.core.publisher.Mono or reactor.core.publisher.Flux",
@@ -64,7 +72,24 @@ public List parseAndValidateMetadata(final Class> targetType)
return methodsMetadata;
}
- private static final Set REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class));
+ private static void modifySuspendMethodMetadata(MethodMetadata metadata) {
+ Type kotlinMethodReturnType = KtCoroutinesUtils.suspendReturnType(metadata.method());
+ if (kotlinMethodReturnType == null) {
+ throw new IllegalArgumentException(String.format(
+ "Method %s can't have continuation argument, only kotlin method is allowed",
+ metadata.configKey()));
+ }
+ metadata.returnType(kotlinMethodReturnType);
+
+ int continuationIndex = metadata.method().getParameterCount() - 1;
+ metadata.ignoreParamater(continuationIndex);
+
+ if(metadata.bodyIndex() != null && metadata.bodyIndex().equals(continuationIndex)) {
+ metadata.bodyIndex(null);
+ }
+ }
+
+ private static final Set REACTOR_PUBLISHERS = new HashSet<>(asList(Mono.class, Flux.class, Flow.class));
public static boolean isReactorType(final Type type) {
return (type instanceof ParameterizedType)
diff --git a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java
index 2bc74210..a77d6b5b 100644
--- a/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java
+++ b/feign-reactor-core/src/main/java/reactivefeign/ReactiveFeign.java
@@ -18,6 +18,7 @@
import feign.InvocationHandlerFactory;
import feign.MethodMetadata;
import feign.Target;
+import kotlinx.coroutines.flow.Flow;
import org.reactivestreams.Publisher;
import reactivefeign.client.ReactiveErrorMapper;
import reactivefeign.client.ReactiveHttpClient;
@@ -43,6 +44,7 @@
import reactivefeign.publisher.retry.FluxRetryPublisherHttpClient;
import reactivefeign.publisher.retry.MonoRetryPublisherHttpClient;
import reactivefeign.retry.ReactiveRetryPolicy;
+import reactivefeign.utils.KtCoroutinesUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
@@ -316,9 +318,9 @@ public static PublisherHttpClient retry(
MethodMetadata methodMetadata,
ReactiveRetryPolicy retryPolicy) {
Type returnPublisherType = returnPublisherType(methodMetadata);
- if(returnPublisherType == Mono.class){
+ if (returnPublisherType == Mono.class || KtCoroutinesUtils.isSuspend(methodMetadata.method())) {
return new MonoRetryPublisherHttpClient(publisherClient, methodMetadata, retryPolicy);
- } else if(returnPublisherType == Flux.class) {
+ } else if(returnPublisherType == Flux.class || returnPublisherType == Flow.class) {
return new FluxRetryPublisherHttpClient(publisherClient, methodMetadata, retryPolicy);
} else {
throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType);
@@ -331,9 +333,9 @@ protected PublisherHttpClient toPublisher(ReactiveHttpClient reactiveHttpClient,
}
Class returnPublisherType = returnPublisherType(methodMetadata);
- if(returnPublisherType == Mono.class){
+ if(returnPublisherType == Mono.class || KtCoroutinesUtils.isSuspend(methodMetadata.method())) {
return new MonoPublisherHttpClient(reactiveHttpClient);
- } else if(returnPublisherType == Flux.class){
+ } else if(returnPublisherType == Flux.class || returnPublisherType == Flow.class) {
return new FluxPublisherHttpClient(reactiveHttpClient);
} else {
throw new IllegalArgumentException("Unknown returnPublisherType: " + returnPublisherType);
diff --git a/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java
new file mode 100644
index 00000000..3ee78306
--- /dev/null
+++ b/feign-reactor-core/src/main/java/reactivefeign/methodhandler/FlowMethodHandler.java
@@ -0,0 +1,30 @@
+package reactivefeign.methodhandler;
+
+import kotlinx.coroutines.flow.Flow;
+import kotlinx.coroutines.reactive.ReactiveFlowKt;
+import org.reactivestreams.Publisher;
+import reactor.core.publisher.Flux;
+
+public class FlowMethodHandler implements MethodHandler {
+
+ private final MethodHandler methodHandler;
+
+ public FlowMethodHandler(MethodHandler methodHandler) {
+ this.methodHandler = methodHandler;
+ }
+
+ @Override
+ public Flow