Skip to content

Commit 4e6dbd4

Browse files
committed
Add ability to timeout to SSE
1 parent 942ba2c commit 4e6dbd4

File tree

2 files changed

+37
-2
lines changed

2 files changed

+37
-2
lines changed

okhttp-sse/src/main/kotlin/okhttp3/sse/EventSourceListener.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,11 @@ package okhttp3.sse
1818
import okhttp3.Response
1919

2020
abstract class EventSourceListener {
21+
/**
22+
* Seconds elapsed between 2 events until connection failed. Doesn't timeout if null
23+
*/
24+
open var idleTimeoutMillis: Long? = null
25+
2126
/**
2227
* Invoked when an event source has been accepted by the remote peer and may begin transmitting
2328
* events.

okhttp-sse/src/main/kotlin/okhttp3/sse/internal/RealEventSource.kt

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
package okhttp3.sse.internal
1717

1818
import java.io.IOException
19+
import kotlin.time.Duration
20+
import kotlin.time.Duration.Companion.seconds
1921
import okhttp3.Call
2022
import okhttp3.Callback
2123
import okhttp3.Request
@@ -24,6 +26,8 @@ import okhttp3.ResponseBody
2426
import okhttp3.internal.stripBody
2527
import okhttp3.sse.EventSource
2628
import okhttp3.sse.EventSourceListener
29+
import okio.AsyncTimeout
30+
import okio.Timeout.Companion.timeout
2731

2832
internal class RealEventSource private constructor(
2933
private val call: Call?,
@@ -38,6 +42,25 @@ internal class RealEventSource private constructor(
3842

3943
@Volatile private var canceled = false
4044

45+
fun connect(callFactory: Call.Factory) {
46+
call =
47+
callFactory.newCall(request).apply {
48+
enqueue(this@RealEventSource)
49+
}
50+
}
51+
52+
private fun updateTimeout(call: Call?, duration: Duration) {
53+
if (call?.timeout() is AsyncTimeout) {
54+
(call.timeout() as AsyncTimeout).apply {
55+
if (this.timeoutNanos() > 0L) {
56+
exit()
57+
}
58+
timeout(duration)
59+
enter()
60+
}
61+
}
62+
}
63+
4164
override fun onResponse(
4265
call: Call,
4366
response: Response,
@@ -63,8 +86,11 @@ internal class RealEventSource private constructor(
6386
return
6487
}
6588

66-
// This is a long-lived response. Cancel full-call timeouts.
67-
call?.timeout()?.cancel()
89+
// This is a long-lived response. Cancel full-call timeouts if no timeout has been set
90+
listener.idleTimeoutMillis?.let {
91+
// We spend at most timeout seconds if set
92+
updateTimeout(call, it.seconds)
93+
} ?: call?.timeout()?.cancel()
6894

6995
// Replace the body with a stripped one so the callbacks can't see real data.
7096
val response = response.stripBody()
@@ -74,6 +100,10 @@ internal class RealEventSource private constructor(
74100
if (!canceled) {
75101
listener.onOpen(this, response)
76102
while (!canceled && reader.processNextEvent()) {
103+
listener.idleTimeoutMillis?.let {
104+
// We spend at most timeout seconds if set
105+
updateTimeout(call, it.seconds)
106+
}
77107
}
78108
}
79109
} catch (e: Exception) {

0 commit comments

Comments
 (0)