implement non-blocking stream server handler#503
implement non-blocking stream server handler#503naoh87 wants to merge 4 commits intotypelevel:mainfrom
Conversation
f0e2687 to
2be5402
Compare
2be5402 to
e5e1b57
Compare
|
@naoh87 I have been busy with other stuff and have not forgotten about this PR. I'll look more closely into it when I find the time. |
e5e1b57 to
876b4ab
Compare
876b4ab to
32595a3
Compare
|
@naoh87 I have not forgotten this, however upcoming work with queues in cats-effect 3.4.x might give us better building blocks. One question, does your work here take into account back pressure? For instance, a server can't keep up with a client streaming too much. |
|
|
||
| def requestOnPull[F[_]]: Pipe[F, Request, Request] = | ||
| _.mapChunks { chunk => | ||
| call.request(chunk.size) |
There was a problem hiding this comment.
I suppose, call.request might be impure. Wouldn't it be better to delay the effect like this:
import cats.syntax.functor._
def requestOnPull[F[_]](implicit F: Sync[F]): Pipe[F, Request, Request] =
_.chunks.flatMap(chunk =>
Stream.evalUnChunk(F.delay(call.request(chunk.size)).as(chunk))
)
There was a problem hiding this comment.
It looks good. thank you.
| call <- Fs2ServerCall.setup(opt, call) | ||
| _ <- call.request(1) // prefetch size | ||
| channel <- OneShotChannel.empty[Request] | ||
| cancel <- start(call, impl(channel.stream.through(call.requestOnPull), headers)) |
There was a problem hiding this comment.
@ahjohannessen back pressure is controlled from here of call.requsetOnPull .
Server requests next messages from the client each stream chunk is pulled.
Prefetch and buffer size is declarabled by 2 lines ago. _ <- call.request(1)
There was a problem hiding this comment.
Server to client message back pressure implementation is still missing as main branch.
There was a problem hiding this comment.
I think this server to client back pressure implementation may cause performance issue.
grpc-java says free to ignore this and main branch does.
https://github.com/grpc/grpc-java/blob/v1.46.0/api/src/main/java/io/grpc/ServerCall.java#L100
I think this feature should be opt-in.
|
I think new cats-effect 3.4.x queue will now work as good as this PR, because new CE3 queue is implemented as MPMC, but we just need SPSC. and also new implementation does not reduce Dispatcher[F] cost at onMessage. |
460bdcf to
6b2e2db
Compare
Ok. I think we should try to use as much code from cats-effect/fs2 where possible. That reduces the amount of code we have to maintain - I am thinking mostly about |
What this PR do
ClientStreaming Benchmark
method implement
Request 100 messages with
ghq --cpu=4 -z 60s --connections=5 ...#503
main