Skip to content

Commit 3188ac5

Browse files
committed
mcp: add StreamableHTTPOptions.SessionTimeout
Add a timeout option for the streamable HTTP handler that automatically cleans up idle sessions. Also, fix a bug in the streamable client, where we hang on a request even though the client can never get a response (because the HTTP request terminated without a response or Last-Event-Id). Fixes #499
1 parent a5dae3e commit 3188ac5

File tree

5 files changed

+302
-51
lines changed

5 files changed

+302
-51
lines changed

mcp/client.go

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"iter"
1212
"slices"
1313
"sync"
14+
"sync/atomic"
1415
"time"
1516

1617
"github.com/google/jsonschema-go/jsonschema"
@@ -177,7 +178,11 @@ func (c *Client) Connect(ctx context.Context, t Transport, _ *ClientSessionOptio
177178
// Call [ClientSession.Close] to close the connection, or await server
178179
// termination with [ClientSession.Wait].
179180
type ClientSession struct {
180-
onClose func()
181+
// Ensure that onClose is called at most once.
182+
// We defensively use an atomic CompareAndSwap rather than a sync.Once, in case the
183+
// onClose callback triggers a re-entrant call to Close.
184+
calledOnClose atomic.Bool
185+
onClose func()
181186

182187
conn *jsonrpc2.Connection
183188
client *Client
@@ -205,6 +210,8 @@ func (cs *ClientSession) ID() string {
205210
// Close performs a graceful close of the connection, preventing new requests
206211
// from being handled, and waiting for ongoing requests to return. Close then
207212
// terminates the connection.
213+
//
214+
// Close is idempotent and concurrency safe.
208215
func (cs *ClientSession) Close() error {
209216
// Note: keepaliveCancel access is safe without a mutex because:
210217
// 1. keepaliveCancel is only written once during startKeepalive (happens-before all Close calls)
@@ -216,7 +223,7 @@ func (cs *ClientSession) Close() error {
216223
}
217224
err := cs.conn.Close()
218225

219-
if cs.onClose != nil {
226+
if cs.onClose != nil && cs.calledOnClose.CompareAndSwap(false, true) {
220227
cs.onClose()
221228
}
222229

mcp/server.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import (
1919
"reflect"
2020
"slices"
2121
"sync"
22+
"sync/atomic"
2223
"time"
2324

2425
"github.com/google/jsonschema-go/jsonschema"
@@ -825,7 +826,7 @@ func (s *Server) disconnect(cc *ServerSession) {
825826
type ServerSessionOptions struct {
826827
State *ServerSessionState
827828

828-
onClose func()
829+
onClose func() // used to clean up associated resources
829830
}
830831

831832
// Connect connects the MCP server over the given transport and starts handling
@@ -920,7 +921,11 @@ func newServerRequest[P Params](ss *ServerSession, params P) *ServerRequest[P] {
920921
// Call [ServerSession.Close] to close the connection, or await client
921922
// termination with [ServerSession.Wait].
922923
type ServerSession struct {
923-
onClose func()
924+
// Ensure that onClose is called at most once.
925+
// We defensively use an atomic CompareAndSwap rather than a sync.Once, in case the
926+
// onClose callback triggers a re-entrant call to Close.
927+
calledOnClose atomic.Bool
928+
onClose func()
924929

925930
server *Server
926931
conn *jsonrpc2.Connection
@@ -1185,6 +1190,8 @@ func (ss *ServerSession) setLevel(_ context.Context, params *SetLoggingLevelPara
11851190
// Close performs a graceful shutdown of the connection, preventing new
11861191
// requests from being handled, and waiting for ongoing requests to return.
11871192
// Close then terminates the connection.
1193+
//
1194+
// Close is idempotent and concurrency safe.
11881195
func (ss *ServerSession) Close() error {
11891196
if ss.keepaliveCancel != nil {
11901197
// Note: keepaliveCancel access is safe without a mutex because:
@@ -1196,7 +1203,7 @@ func (ss *ServerSession) Close() error {
11961203
}
11971204
err := ss.conn.Close()
11981205

1199-
if ss.onClose != nil {
1206+
if ss.onClose != nil && ss.calledOnClose.CompareAndSwap(false, true) {
12001207
ss.onClose()
12011208
}
12021209

0 commit comments

Comments
 (0)