From 9ee3f1ea7dd92e087e7036a1ee269949ac276817 Mon Sep 17 00:00:00 2001 From: Gilbert <109980169+bud-primordium@users.noreply.github.com> Date: Tue, 2 Jun 2026 16:16:05 +0800 Subject: [PATCH 1/2] feat(gateway): add downstream keepalive for non-stream compact responses While waiting for upstream compact responses, periodically write blank lines to the downstream connection and flush, preventing reverse proxies (e.g. Cloudflare Tunnel, Nginx) from closing the connection due to idle timeout. New config: gateway.openai_compact_nonstream_keepalive_interval (seconds) Default 0 (disabled); valid range 5-60 when enabled. Refs #2773, #2243 --- backend/internal/config/config.go | 12 ++ backend/internal/config/config_test.go | 25 +++ .../service/openai_gateway_service.go | 149 +++++++++++++- .../service/openai_oauth_passthrough_test.go | 182 ++++++++++++++++++ deploy/.env.example | 2 + deploy/config.example.yaml | 3 + deploy/docker-compose.local.yml | 1 + 7 files changed, 369 insertions(+), 5 deletions(-) diff --git a/backend/internal/config/config.go b/backend/internal/config/config.go index 2fefe820c99..359c5427de5 100644 --- a/backend/internal/config/config.go +++ b/backend/internal/config/config.go @@ -717,6 +717,10 @@ type GatewayConfig struct { // OpenAIPassthroughAllowTimeoutHeaders: OpenAI 透传模式是否放行客户端超时头 // 关闭(默认)可避免 x-stainless-timeout 等头导致上游提前断流。 OpenAIPassthroughAllowTimeoutHeaders bool `mapstructure:"openai_passthrough_allow_timeout_headers"` + // OpenAICompactNonstreamKeepaliveInterval: /responses/compact 非流式空行 keepalive 间隔(秒) + // 等待上游 compact 响应期间,按此间隔向下游写 \n 并 flush,防止反代/CDN 空闲超时断连。 + // 0 表示禁用;非 0 时必须为 5-60。 + OpenAICompactNonstreamKeepaliveInterval int `mapstructure:"openai_compact_nonstream_keepalive_interval"` // OpenAIWS: OpenAI Responses WebSocket 配置(默认开启,可按需回滚到 HTTP) OpenAIWS GatewayOpenAIWSConfig `mapstructure:"openai_ws"` // OpenAIScheduler: OpenAI 高级调度器粘性逃逸配置 @@ -1829,6 +1833,7 @@ func setDefaults() { viper.SetDefault("gateway.force_codex_cli", false) viper.SetDefault("gateway.codex_image_generation_bridge_enabled", false) viper.SetDefault("gateway.openai_passthrough_allow_timeout_headers", false) + viper.SetDefault("gateway.openai_compact_nonstream_keepalive_interval", 0) // OpenAI Responses WebSocket(默认开启;可通过 force_http 紧急回滚) viper.SetDefault("gateway.openai_ws.enabled", true) viper.SetDefault("gateway.openai_ws.mode_router_v2_enabled", false) @@ -2459,6 +2464,13 @@ func (c *Config) Validate() error { if c.Gateway.OpenAIResponseHeaderTimeout < 0 { return fmt.Errorf("gateway.openai_response_header_timeout must be non-negative") } + if c.Gateway.OpenAICompactNonstreamKeepaliveInterval < 0 { + return fmt.Errorf("gateway.openai_compact_nonstream_keepalive_interval must be non-negative") + } + if c.Gateway.OpenAICompactNonstreamKeepaliveInterval != 0 && + (c.Gateway.OpenAICompactNonstreamKeepaliveInterval < 5 || c.Gateway.OpenAICompactNonstreamKeepaliveInterval > 60) { + return fmt.Errorf("gateway.openai_compact_nonstream_keepalive_interval must be 0 or between 5-60 seconds") + } if strings.TrimSpace(c.Gateway.ConnectionPoolIsolation) != "" { switch c.Gateway.ConnectionPoolIsolation { case ConnectionPoolIsolationProxy, ConnectionPoolIsolationAccount, ConnectionPoolIsolationAccountProxy: diff --git a/backend/internal/config/config_test.go b/backend/internal/config/config_test.go index ecdfaac4d18..0c2a113c616 100644 --- a/backend/internal/config/config_test.go +++ b/backend/internal/config/config_test.go @@ -216,6 +216,31 @@ func TestLoadOpenAIResponseHeaderTimeoutFromEnv(t *testing.T) { require.Equal(t, 1800, cfg.Gateway.OpenAIResponseHeaderTimeout) } +func TestLoadDefaultOpenAICompactNonstreamKeepaliveDisabled(t *testing.T) { + resetViperWithJWTSecret(t) + + cfg, err := Load() + require.NoError(t, err) + require.Equal(t, 0, cfg.Gateway.OpenAICompactNonstreamKeepaliveInterval) +} + +func TestLoadOpenAICompactNonstreamKeepaliveFromEnv(t *testing.T) { + resetViperWithJWTSecret(t) + t.Setenv("GATEWAY_OPENAI_COMPACT_NONSTREAM_KEEPALIVE_INTERVAL", "5") + + cfg, err := Load() + require.NoError(t, err) + require.Equal(t, 5, cfg.Gateway.OpenAICompactNonstreamKeepaliveInterval) +} + +func TestLoadOpenAICompactNonstreamKeepaliveRejectsOutOfRange(t *testing.T) { + resetViperWithJWTSecret(t) + t.Setenv("GATEWAY_OPENAI_COMPACT_NONSTREAM_KEEPALIVE_INTERVAL", "1") + + _, err := Load() + require.ErrorContains(t, err, "gateway.openai_compact_nonstream_keepalive_interval must be 0 or between 5-60 seconds") +} + func TestLoadOpenAIWSStickyTTLCompatibility(t *testing.T) { resetViperWithJWTSecret(t) t.Setenv("GATEWAY_OPENAI_WS_STICKY_RESPONSE_ID_TTL_SECONDS", "0") diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index 2bed9e0394d..d926c47211e 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -3272,18 +3272,35 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( c.Set("openai_passthrough", true) } + stopCompactKeepalive := func() {} + if !reqStream { + stopCompactKeepalive = s.startCompactNonstreamKeepalive(ctx, c) + } + upstreamStart := time.Now() resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) SetOpsLatencyMs(c, OpsUpstreamLatencyMsKey, time.Since(upstreamStart).Milliseconds()) if err != nil { + stopCompactKeepalive() // Transport-level failure (proxy/DNS/TCP/TLS — no HTTP response). Convert to // a failover so the handler switches to a healthy account, and temporarily // unschedule the account on durable faults (e.g. rejected proxy credentials). - return nil, s.handleOpenAIUpstreamTransportError(ctx, c, account, err, true) + transportErr := s.handleOpenAIUpstreamTransportError(ctx, c, account, err, true) + if openAICompactKeepaliveCommitted(c) { + logOpenAICompactKeepaliveCommitted(ctx, c, account, nil) + writeOpenAICommittedTransportError(c) + return nil, fmt.Errorf("upstream request failed after compact keepalive: %s", sanitizeUpstreamErrorMessage(err.Error())) + } + return nil, transportErr } defer func() { _ = resp.Body.Close() }() if resp.StatusCode >= 400 { + stopCompactKeepalive() + if openAICompactKeepaliveCommitted(c) { + logOpenAICompactKeepaliveCommitted(ctx, c, account, resp) + return nil, s.handleErrorResponsePassthrough(ctx, resp, c, account, body) + } // 透传模式默认保持原样代理;但 429/529 属于网关必须兜底的 // 上游容量类错误,应先触发多账号 failover 以维持基础 SLA。 if shouldFailoverOpenAIPassthroughResponse(resp.StatusCode) { @@ -3300,6 +3317,7 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( imageCount := 0 var imageOutputSizes []string if reqStream { + stopCompactKeepalive() result, err := s.handleStreamingResponsePassthrough(ctx, resp, c, account, startTime, reqModel, upstreamPassthroughModel) if err != nil { return nil, err @@ -3310,7 +3328,7 @@ func (s *OpenAIGatewayService) forwardOpenAIPassthrough( imageCount = result.imageCount imageOutputSizes = result.imageOutputSizes } else { - result, err := s.handleNonStreamingResponsePassthrough(ctx, resp, c, reqModel, upstreamPassthroughModel) + result, err := s.handleNonStreamingResponsePassthrough(ctx, resp, c, reqModel, upstreamPassthroughModel, stopCompactKeepalive) if err != nil { return nil, err } @@ -3640,6 +3658,113 @@ func (s *OpenAIGatewayService) isOpenAIPassthroughTimeoutHeadersAllowed() bool { return s != nil && s.cfg != nil && s.cfg.Gateway.OpenAIPassthroughAllowTimeoutHeaders } +// compactNonstreamKeepaliveInterval 返回 compact 非流式空行 keepalive 间隔;0 表示禁用。 +func (s *OpenAIGatewayService) compactNonstreamKeepaliveInterval() time.Duration { + if s == nil || s.cfg == nil { + return 0 + } + seconds := s.cfg.Gateway.OpenAICompactNonstreamKeepaliveInterval + if seconds <= 0 { + return 0 + } + return time.Duration(seconds) * time.Second +} + +// startCompactNonstreamKeepalive 为 compact 非流式请求启动下游空行心跳,防止反代空闲断连。 +func (s *OpenAIGatewayService) startCompactNonstreamKeepalive(ctx context.Context, c *gin.Context) func() { + if s == nil || c == nil || c.Writer == nil || !isOpenAIResponsesCompactPath(c) { + return func() {} + } + interval := s.compactNonstreamKeepaliveInterval() + if interval <= 0 { + return func() {} + } + flusher, ok := c.Writer.(http.Flusher) + if !ok { + return func() {} + } + if ctx == nil { + ctx = context.Background() + } + + headers := c.Writer.Header() + headers.Set("Content-Type", "application/json") + headers.Set("Cache-Control", "no-cache") + headers.Set("X-Accel-Buffering", "no") + headers.Del("Content-Length") + + stopCh := make(chan struct{}) + var stopOnce sync.Once + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-stopCh: + return + case <-ctx.Done(): + return + case <-ticker.C: + _, _ = c.Writer.Write([]byte("\n")) + flusher.Flush() + } + } + }() + + return func() { + stopOnce.Do(func() { + close(stopCh) + }) + wg.Wait() + } +} + +// logOpenAICompactKeepaliveCommitted 记录 keepalive 已提交响应后上游返回错误的诊断日志。 +func logOpenAICompactKeepaliveCommitted(ctx context.Context, c *gin.Context, account *Account, resp *http.Response) { + if ctx == nil { + ctx = context.Background() + } + accountID := int64(0) + accountName := "" + if account != nil { + accountID = account.ID + accountName = strings.TrimSpace(account.Name) + } + statusCode := 0 + upstreamRequestID := "" + if resp != nil { + statusCode = resp.StatusCode + upstreamRequestID = strings.TrimSpace(resp.Header.Get("x-request-id")) + } + requestPath := "" + if c != nil && c.Request != nil && c.Request.URL != nil { + requestPath = strings.TrimSpace(c.Request.URL.Path) + } + logger.FromContext(ctx).With( + zap.String("component", "service.openai_gateway"), + zap.Bool("compact_keepalive_committed", true), + zap.Int64("account_id", accountID), + zap.String("account_name", accountName), + zap.Int("upstream_status", statusCode), + zap.String("upstream_request_id", upstreamRequestID), + zap.String("request_path", requestPath), + ).Warn("OpenAI compact non-stream keepalive committed response before upstream error; proxying error without failover") +} + +func openAICompactKeepaliveCommitted(c *gin.Context) bool { + return c != nil && c.Writer != nil && c.Writer.Written() && isOpenAIResponsesCompactPath(c) +} + +func writeOpenAICommittedTransportError(c *gin.Context) { + if c == nil || c.Writer == nil { + return + } + c.Data(http.StatusBadGateway, "application/json; charset=utf-8", openAITransportFailoverBody) +} + func collectOpenAIPassthroughTimeoutHeaders(h http.Header) []string { if h == nil { return nil @@ -3987,9 +4112,17 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( c *gin.Context, originalModel string, mappedModel string, + stopBeforeWrite func(), ) (*openaiNonStreamingResultPassthrough, error) { - body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError) + if stopBeforeWrite == nil { + stopBeforeWrite = func() {} + } + body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, func(c *gin.Context) { + stopBeforeWrite() + openAITooLargeError(c) + }) if err != nil { + stopBeforeWrite() return nil, err } @@ -3998,7 +4131,7 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( // stream=false was requested. Without this conversion the client would // receive raw SSE text or a terminal event with empty output. if isEventStreamResponse(resp.Header) { - return s.handlePassthroughSSEToJSON(resp, c, body, originalModel, mappedModel) + return s.handlePassthroughSSEToJSON(resp, c, body, originalModel, mappedModel, stopBeforeWrite) } usage := &OpenAIUsage{} @@ -4023,6 +4156,7 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( if originalModel != "" && mappedModel != "" && originalModel != mappedModel { body = s.replaceModelInResponseBody(body, mappedModel, originalModel) } + stopBeforeWrite() c.Data(resp.StatusCode, contentType, body) return &openaiNonStreamingResultPassthrough{ OpenAIUsage: usage, @@ -4037,7 +4171,10 @@ func (s *OpenAIGatewayService) handleNonStreamingResponsePassthrough( // response for the passthrough path. It mirrors handleSSEToJSON while // preserving passthrough payloads, except compact-only model remapping may // rewrite model fields back to the original requested model. -func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel string, mappedModel string) (*openaiNonStreamingResultPassthrough, error) { +func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel string, mappedModel string, stopBeforeWrite func()) (*openaiNonStreamingResultPassthrough, error) { + if stopBeforeWrite == nil { + stopBeforeWrite = func() {} + } bodyText := string(body) finalResponse, ok := extractCodexFinalResponse(bodyText) @@ -4068,6 +4205,7 @@ func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c if msg == "" { msg = "Upstream compact response failed" } + stopBeforeWrite() return nil, s.writeOpenAINonStreamingProtocolError(resp, c, msg) } usage = s.parseSSEUsageFromBody(bodyText) @@ -4086,6 +4224,7 @@ func (s *OpenAIGatewayService) handlePassthroughSSEToJSON(resp *http.Response, c contentType = "text/event-stream" } } + stopBeforeWrite() c.Data(resp.StatusCode, contentType, body) return &openaiNonStreamingResultPassthrough{ diff --git a/backend/internal/service/openai_oauth_passthrough_test.go b/backend/internal/service/openai_oauth_passthrough_test.go index b371808066d..ef422307677 100644 --- a/backend/internal/service/openai_oauth_passthrough_test.go +++ b/backend/internal/service/openai_oauth_passthrough_test.go @@ -3,6 +3,8 @@ package service import ( "bytes" "context" + "encoding/json" + "errors" "fmt" "io" "net/http" @@ -32,6 +34,7 @@ type httpUpstreamRecorder struct { resp *http.Response responses []*http.Response err error + delay time.Duration } func (u *httpUpstreamRecorder) Do(req *http.Request, proxyURL string, accountID int64, accountConcurrency int) (*http.Response, error) { @@ -47,6 +50,9 @@ func (u *httpUpstreamRecorder) Do(req *http.Request, proxyURL string, accountID if u.err != nil { return nil, u.err } + if u.delay > 0 { + time.Sleep(u.delay) + } if len(u.responses) > 0 { resp := u.responses[0] u.responses = u.responses[1:] @@ -456,6 +462,182 @@ func TestOpenAIGatewayService_OAuthPassthrough_CompactUsesJSONAndKeepsNonStreami require.Contains(t, rec.Body.String(), `"id":"cmp_123"`) } +func TestOpenAIGatewayService_OAuthPassthrough_CompactNonstreamKeepaliveWritesLeadingNewline(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("Content-Type", "application/json") + + originalBody := []byte(`{"model":"gpt-5.1-codex","stream":true,"store":true,"instructions":"local-test-instructions","input":[{"type":"text","text":"compact me"}]}`) + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-compact-keepalive"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"cmp_keepalive","usage":{"input_tokens":11,"output_tokens":22}}`)), + } + upstream := &httpUpstreamRecorder{resp: resp, delay: 1200 * time.Millisecond} + + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + ForceCodexCLI: false, + OpenAICompactNonstreamKeepaliveInterval: 1, + }}, + httpUpstream: upstream, + } + account := newOpenAICompactPassthroughTestAccount() + + result, err := svc.Forward(context.Background(), c, account, originalBody) + require.NoError(t, err) + require.NotNil(t, result) + body := rec.Body.Bytes() + require.True(t, bytes.HasPrefix(body, []byte("\n")), "compact keepalive should write a leading blank line") + require.True(t, json.Valid(bytes.TrimSpace(body))) + require.Contains(t, string(body), `"id":"cmp_keepalive"`) +} + +func TestOpenAIGatewayService_OAuthPassthrough_CompactNonstreamKeepaliveDisabledHasNoLeadingWhitespace(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("Content-Type", "application/json") + + originalBody := []byte(`{"model":"gpt-5.1-codex","stream":true,"store":true,"instructions":"local-test-instructions","input":[{"type":"text","text":"compact me"}]}`) + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-compact-no-keepalive"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"cmp_no_keepalive","usage":{"input_tokens":11,"output_tokens":22}}`)), + } + upstream := &httpUpstreamRecorder{resp: resp} + + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ForceCodexCLI: false}}, + httpUpstream: upstream, + } + account := newOpenAICompactPassthroughTestAccount() + + result, err := svc.Forward(context.Background(), c, account, originalBody) + require.NoError(t, err) + require.NotNil(t, result) + body := rec.Body.Bytes() + require.False(t, bytes.HasPrefix(body, []byte("\n")), "disabled compact keepalive should not prepend whitespace") + require.True(t, json.Valid(bytes.TrimSpace(body))) + require.Contains(t, string(body), `"id":"cmp_no_keepalive"`) +} + +func TestOpenAIGatewayService_OAuthPassthrough_CompactNonstreamKeepalive400BeforeTickUsesExistingErrorHandling(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("Content-Type", "application/json") + + originalBody := []byte(`{"model":"gpt-5.1-codex","stream":true,"instructions":"local-test-instructions","input":[{"type":"text","text":"compact me"}]}`) + resp := &http.Response{ + StatusCode: http.StatusBadRequest, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-compact-400-before"}}, + Body: io.NopCloser(strings.NewReader(`{"error":{"type":"invalid_request_error","message":"bad compact request"}}`)), + } + upstream := &httpUpstreamRecorder{resp: resp} + + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + ForceCodexCLI: false, + OpenAICompactNonstreamKeepaliveInterval: 1, + }}, + httpUpstream: upstream, + } + account := newOpenAICompactPassthroughTestAccount() + + result, err := svc.Forward(context.Background(), c, account, originalBody) + require.Error(t, err) + require.Nil(t, result) + var failoverErr *UpstreamFailoverError + require.False(t, errors.As(err, &failoverErr)) + require.False(t, bytes.HasPrefix(rec.Body.Bytes(), []byte("\n"))) + require.Contains(t, rec.Body.String(), "bad compact request") +} + +func TestOpenAIGatewayService_OAuthPassthrough_CompactNonstreamKeepaliveCommittedErrorDoesNotFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + logSink, restore := captureStructuredLog(t) + defer restore() + + testCases := []struct { + name string + statusCode int + message string + }{ + { + name: "400_proxy_error_body", + statusCode: http.StatusBadRequest, + message: "bad compact request after keepalive", + }, + { + name: "429_suppress_failover", + statusCode: http.StatusTooManyRequests, + message: "rate limited after keepalive", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("Content-Type", "application/json") + + originalBody := []byte(`{"model":"gpt-5.1-codex","stream":true,"instructions":"local-test-instructions","input":[{"type":"text","text":"compact me"}]}`) + resp := &http.Response{ + StatusCode: tc.statusCode, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-compact-committed"}}, + Body: io.NopCloser(strings.NewReader(fmt.Sprintf(`{"error":{"type":"upstream_error","message":%q}}`, tc.message))), + } + upstream := &httpUpstreamRecorder{resp: resp, delay: 1200 * time.Millisecond} + + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + ForceCodexCLI: false, + OpenAICompactNonstreamKeepaliveInterval: 1, + }}, + httpUpstream: upstream, + } + account := newOpenAICompactPassthroughTestAccount() + + result, err := svc.Forward(context.Background(), c, account, originalBody) + require.Error(t, err) + require.Nil(t, result) + var failoverErr *UpstreamFailoverError + require.False(t, errors.As(err, &failoverErr), "committed compact keepalive must suppress passthrough failover") + require.True(t, bytes.HasPrefix(rec.Body.Bytes(), []byte("\n"))) + require.Contains(t, rec.Body.String(), tc.message) + require.True(t, logSink.ContainsFieldValue("compact_keepalive_committed", "true")) + }) + } +} + +func newOpenAICompactPassthroughTestAccount() *Account { + return &Account{ + ID: 123, + Name: "acc", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{"access_token": "oauth-token", "chatgpt_account_id": "chatgpt-acc"}, + Extra: map[string]any{"openai_passthrough": true}, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + } +} + func TestOpenAIGatewayService_OAuthPassthrough_UpstreamRequestIgnoresClientCancel(t *testing.T) { gin.SetMode(gin.TestMode) diff --git a/deploy/.env.example b/deploy/.env.example index e80663ef1f2..68f90b8e713 100644 --- a/deploy/.env.example +++ b/deploy/.env.example @@ -259,6 +259,8 @@ GATEWAY_OPENAI_HTTP2_ALLOW_PROXY_FALLBACK_TO_HTTP1=true GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=2 GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=60 GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=600 +# OpenAI /responses/compact 非流式空行 keepalive 间隔(秒)。0 表示禁用;非 0 时必须为 5-60。 +GATEWAY_OPENAI_COMPACT_NONSTREAM_KEEPALIVE_INTERVAL=0 # 上游连接池:每主机最大连接数(默认 1024;流式/HTTP1.1 场景可调大,如 2400/4096) GATEWAY_MAX_CONNS_PER_HOST=2048 # 上游连接池:最大空闲连接总数(默认 2560;账号/代理隔离 + 高并发场景可调大) diff --git a/deploy/config.example.yaml b/deploy/config.example.yaml index 2cb65d8feca..c6d58806b5a 100644 --- a/deploy/config.example.yaml +++ b/deploy/config.example.yaml @@ -370,6 +370,9 @@ gateway: # Stream keepalive interval (seconds), 0=disable # 流式 keepalive 间隔(秒),0=禁用 stream_keepalive_interval: 10 + # OpenAI /responses/compact non-stream blank-line keepalive interval (seconds), 0=disable; non-zero must be 5-60 + # OpenAI /responses/compact 非流式空行 keepalive 间隔(秒),0=禁用;非 0 时必须为 5-60 + openai_compact_nonstream_keepalive_interval: 0 # Image stream data interval timeout (seconds), 0=disable; independent from ordinary text streams # 图片流数据间隔超时(秒),0=禁用;独立于普通文本流式 image_stream_data_interval_timeout: 900 diff --git a/deploy/docker-compose.local.yml b/deploy/docker-compose.local.yml index b15be2402d0..8c74731c6c1 100644 --- a/deploy/docker-compose.local.yml +++ b/deploy/docker-compose.local.yml @@ -158,6 +158,7 @@ services: - GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD=${GATEWAY_OPENAI_HTTP2_FALLBACK_ERROR_THRESHOLD:-2} - GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_WINDOW_SECONDS:-60} - GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS=${GATEWAY_OPENAI_HTTP2_FALLBACK_TTL_SECONDS:-600} + - GATEWAY_OPENAI_COMPACT_NONSTREAM_KEEPALIVE_INTERVAL=${GATEWAY_OPENAI_COMPACT_NONSTREAM_KEEPALIVE_INTERVAL:-0} - GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT=${GATEWAY_IMAGE_STREAM_DATA_INTERVAL_TIMEOUT:-900} - GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL=${GATEWAY_IMAGE_STREAM_KEEPALIVE_INTERVAL:-10} - GATEWAY_IMAGE_CONCURRENCY_ENABLED=${GATEWAY_IMAGE_CONCURRENCY_ENABLED:-false} From a4a738871c6d88bcc1cbd212f69265b9a6c30b18 Mon Sep 17 00:00:00 2001 From: Gilbert <109980169+bud-primordium@users.noreply.github.com> Date: Thu, 4 Jun 2026 11:36:26 +0800 Subject: [PATCH 2/2] fix(gateway): extend compact keepalive to non-passthrough OAuth path The initial implementation only covered the passthrough branch, but production traffic showed compact requests also flow through the standard OAuth HTTP forward path. Add keepalive to both Forward and forwardOpenAIPassthrough, with structured logging for diagnostics. --- .../service/openai_gateway_service.go | 83 ++++++++++-- .../service/openai_oauth_passthrough_test.go | 120 +++++++++++++++++- 2 files changed, 191 insertions(+), 12 deletions(-) diff --git a/backend/internal/service/openai_gateway_service.go b/backend/internal/service/openai_gateway_service.go index d926c47211e..079db5edb9b 100644 --- a/backend/internal/service/openai_gateway_service.go +++ b/backend/internal/service/openai_gateway_service.go @@ -443,9 +443,21 @@ func NewOpenAIGatewayService( openAITokenProvider.SetAccountRuntimeBlocker(svc) } svc.logOpenAIWSModeBootstrap() + svc.logOpenAICompactNonstreamKeepaliveBootstrap() return svc } +func (s *OpenAIGatewayService) logOpenAICompactNonstreamKeepaliveBootstrap() { + interval := s.compactNonstreamKeepaliveInterval() + if interval <= 0 { + return + } + logger.L().With( + zap.String("component", "service.openai_gateway"), + zap.Int("interval_seconds", int(interval.Seconds())), + ).Info("OpenAI compact non-stream keepalive enabled") +} + // ResolveChannelMapping 解析渠道级模型映射(代理到 ChannelService) func (s *OpenAIGatewayService) ResolveChannelMapping(ctx context.Context, groupID int64, model string) ChannelMappingResult { if s.channelService == nil { @@ -2982,18 +2994,31 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco } // Send request + stopCompactKeepalive := func() {} + if !reqStream { + stopCompactKeepalive = s.startCompactNonstreamKeepalive(ctx, c) + } + upstreamStart := time.Now() resp, err := s.httpUpstream.Do(upstreamReq, proxyURL, account.ID, account.Concurrency) SetOpsLatencyMs(c, OpsUpstreamLatencyMsKey, time.Since(upstreamStart).Milliseconds()) if err != nil { + stopCompactKeepalive() // Transport-level failure (proxy/DNS/TCP/TLS — no HTTP response). Convert to // a failover so the handler switches to a healthy account, and temporarily // unschedule the account on durable faults (e.g. rejected proxy credentials). - return nil, s.handleOpenAIUpstreamTransportError(ctx, c, account, err, false) + transportErr := s.handleOpenAIUpstreamTransportError(ctx, c, account, err, false) + if openAICompactKeepaliveCommitted(c) { + logOpenAICompactKeepaliveCommitted(ctx, c, account, nil) + writeOpenAICommittedTransportError(c) + return nil, fmt.Errorf("upstream request failed after compact keepalive: %s", sanitizeUpstreamErrorMessage(err.Error())) + } + return nil, transportErr } // Handle error response if resp.StatusCode >= 400 { + stopCompactKeepalive() respBody := s.readUpstreamErrorBody(resp) _ = resp.Body.Close() resp.Body = io.NopCloser(bytes.NewReader(respBody)) @@ -3001,6 +3026,10 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco upstreamMsg := strings.TrimSpace(extractUpstreamErrorMessage(respBody)) upstreamMsg = sanitizeUpstreamErrorMessage(upstreamMsg) upstreamCode := extractUpstreamErrorCode(respBody) + if openAICompactKeepaliveCommitted(c) { + logOpenAICompactKeepaliveCommitted(ctx, c, account, resp) + return s.handleErrorResponse(ctx, resp, c, account, body, billingModel) + } if !httpInvalidEncryptedContentRetryTried && resp.StatusCode == http.StatusBadRequest && upstreamCode == "invalid_encrypted_content" { decoded, decodeErr := ensureReqBody() if decodeErr != nil { @@ -3063,6 +3092,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco imageCount := 0 var imageOutputSizes []string if reqStream { + stopCompactKeepalive() streamResult, err := s.handleStreamingResponse(ctx, resp, c, account, startTime, originalModel, upstreamModel) if err != nil { return nil, err @@ -3073,7 +3103,7 @@ func (s *OpenAIGatewayService) Forward(ctx context.Context, c *gin.Context, acco imageCount = streamResult.imageCount imageOutputSizes = streamResult.imageOutputSizes } else { - nonStreamResult, err := s.handleNonStreamingResponse(ctx, resp, c, account, originalModel, upstreamModel) + nonStreamResult, err := s.handleNonStreamingResponse(ctx, resp, c, account, originalModel, upstreamModel, stopCompactKeepalive) if err != nil { return nil, err } @@ -3686,6 +3716,16 @@ func (s *OpenAIGatewayService) startCompactNonstreamKeepalive(ctx context.Contex if ctx == nil { ctx = context.Background() } + path := "" + if c.Request != nil && c.Request.URL != nil { + path = strings.TrimSpace(c.Request.URL.Path) + } + log := logger.FromContext(ctx).With( + zap.String("component", "service.openai_gateway"), + zap.String("request_path", path), + zap.Int("interval_seconds", int(interval.Seconds())), + ) + log.Info("OpenAI compact non-stream keepalive started") headers := c.Writer.Header() headers.Set("Content-Type", "application/json") @@ -3701,6 +3741,7 @@ func (s *OpenAIGatewayService) startCompactNonstreamKeepalive(ctx context.Contex defer wg.Done() ticker := time.NewTicker(interval) defer ticker.Stop() + flushedLogged := false for { select { case <-stopCh: @@ -3708,8 +3749,15 @@ func (s *OpenAIGatewayService) startCompactNonstreamKeepalive(ctx context.Contex case <-ctx.Done(): return case <-ticker.C: - _, _ = c.Writer.Write([]byte("\n")) + if _, err := c.Writer.Write([]byte("\n")); err != nil { + log.Warn("OpenAI compact non-stream keepalive write failed", zap.Error(err)) + return + } flusher.Flush() + if !flushedLogged { + log.Info("OpenAI compact non-stream keepalive flushed") + flushedLogged = true + } } } }() @@ -5385,9 +5433,14 @@ func openAIUsageFromGJSON(value gjson.Result) (OpenAIUsage, bool) { }, true } -func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account, originalModel, mappedModel string) (*openaiNonStreamingResult, error) { - body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, openAITooLargeError) +func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, resp *http.Response, c *gin.Context, account *Account, originalModel, mappedModel string, stopBeforeWrite ...func()) (*openaiNonStreamingResult, error) { + stop := compactStopFunc(stopBeforeWrite...) + body, err := ReadUpstreamResponseBody(resp.Body, s.cfg, c, func(c *gin.Context) { + stop() + openAITooLargeError(c) + }) if err != nil { + stop() return nil, err } @@ -5395,7 +5448,7 @@ func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, r // Some OpenAI-compatible upstreams (including other sub2api instances) // may return SSE even when stream=false was requested. if isEventStreamResponse(resp.Header) { - return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel) + return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel, stop) } bodyLooksLikeSSE := bytes.Contains(body, []byte("data:")) || bytes.Contains(body, []byte("event:")) @@ -5405,14 +5458,15 @@ func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, r // positives on JSON responses that coincidentally contain "data:" or // "event:" in their text content. if account.Type == AccountTypeOAuth && bodyLooksLikeSSE { - return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel) + return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel, stop) } usageValue, usageOK := extractOpenAIUsageFromJSONBytes(body) if !usageOK { if bodyLooksLikeSSE { - return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel) + return s.handleSSEToJSON(resp, c, body, originalModel, mappedModel, stop) } + stop() return nil, fmt.Errorf("parse response: invalid json response") } usage := &usageValue @@ -5431,6 +5485,7 @@ func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, r } } + stop() c.Data(resp.StatusCode, contentType, body) return &openaiNonStreamingResult{ @@ -5442,12 +5497,20 @@ func (s *OpenAIGatewayService) handleNonStreamingResponse(ctx context.Context, r }, nil } +func compactStopFunc(stops ...func()) func() { + if len(stops) == 0 || stops[0] == nil { + return func() {} + } + return stops[0] +} + func isEventStreamResponse(header http.Header) bool { contentType := strings.ToLower(header.Get("Content-Type")) return strings.Contains(contentType, "text/event-stream") } -func (s *OpenAIGatewayService) handleSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel, mappedModel string) (*openaiNonStreamingResult, error) { +func (s *OpenAIGatewayService) handleSSEToJSON(resp *http.Response, c *gin.Context, body []byte, originalModel, mappedModel string, stopBeforeWrite ...func()) (*openaiNonStreamingResult, error) { + stop := compactStopFunc(stopBeforeWrite...) bodyText := string(body) finalResponse, ok := extractCodexFinalResponse(bodyText) @@ -5479,6 +5542,7 @@ func (s *OpenAIGatewayService) handleSSEToJSON(resp *http.Response, c *gin.Conte if msg == "" { msg = "Upstream compact response failed" } + stop() return nil, s.writeOpenAINonStreamingProtocolError(resp, c, msg) } usage = s.parseSSEUsageFromBody(bodyText) @@ -5497,6 +5561,7 @@ func (s *OpenAIGatewayService) handleSSEToJSON(resp *http.Response, c *gin.Conte contentType = "text/event-stream" } } + stop() c.Data(resp.StatusCode, contentType, body) return &openaiNonStreamingResult{ diff --git a/backend/internal/service/openai_oauth_passthrough_test.go b/backend/internal/service/openai_oauth_passthrough_test.go index ef422307677..0f01c07e478 100644 --- a/backend/internal/service/openai_oauth_passthrough_test.go +++ b/backend/internal/service/openai_oauth_passthrough_test.go @@ -47,12 +47,12 @@ func (u *httpUpstreamRecorder) Do(req *http.Request, proxyURL string, accountID req.Body = io.NopCloser(bytes.NewReader(b)) } u.requests = append(u.requests, req) - if u.err != nil { - return nil, u.err - } if u.delay > 0 { time.Sleep(u.delay) } + if u.err != nil { + return nil, u.err + } if len(u.responses) > 0 { resp := u.responses[0] u.responses = u.responses[1:] @@ -497,6 +497,55 @@ func TestOpenAIGatewayService_OAuthPassthrough_CompactNonstreamKeepaliveWritesLe require.Contains(t, string(body), `"id":"cmp_keepalive"`) } +func TestOpenAIGatewayService_OAuthCompactNonPassthroughKeepaliveWritesLeadingNewline(t *testing.T) { + gin.SetMode(gin.TestMode) + + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "Codex Desktop/0.135.0-alpha.1") + c.Request.Header.Set("Content-Type", "application/json") + + originalBody := []byte(`{"model":"gpt-5.5","stream":false,"instructions":"local-test-instructions","input":[{"type":"message","role":"user","content":"compact me"}]}`) + resp := &http.Response{ + StatusCode: http.StatusOK, + Header: http.Header{"Content-Type": []string{"application/json"}, "x-request-id": []string{"rid-compact-native-keepalive"}}, + Body: io.NopCloser(strings.NewReader(`{"id":"cmp_native_keepalive","usage":{"input_tokens":11,"output_tokens":22}}`)), + } + upstream := &httpUpstreamRecorder{resp: resp, delay: 1200 * time.Millisecond} + + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + OpenAICompactNonstreamKeepaliveInterval: 1, + }}, + httpUpstream: upstream, + } + account := &Account{ + ID: 124, + Name: "acc-native", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + } + + result, err := svc.Forward(context.Background(), c, account, originalBody) + require.NoError(t, err) + require.NotNil(t, result) + body := rec.Body.Bytes() + require.True(t, bytes.HasPrefix(body, []byte("\n")), "native compact keepalive should write a leading blank line") + require.True(t, json.Valid(bytes.TrimSpace(body))) + require.Contains(t, string(body), `"id":"cmp_native_keepalive"`) + require.NotNil(t, upstream.lastReq) + require.Equal(t, chatgptCodexURL+"/compact", upstream.lastReq.URL.String()) +} + func TestOpenAIGatewayService_OAuthPassthrough_CompactNonstreamKeepaliveDisabledHasNoLeadingWhitespace(t *testing.T) { gin.SetMode(gin.TestMode) @@ -623,6 +672,71 @@ func TestOpenAIGatewayService_OAuthPassthrough_CompactNonstreamKeepaliveCommitte } } +func TestOpenAIGatewayService_OAuthCompactNonstreamKeepaliveCommittedTransportErrorDoesNotFailover(t *testing.T) { + gin.SetMode(gin.TestMode) + + testCases := []struct { + name string + body []byte + account *Account + }{ + { + name: "passthrough", + body: []byte(`{"model":"gpt-5.1-codex","stream":true,"instructions":"local-test-instructions","input":[{"type":"text","text":"compact me"}]}`), + account: newOpenAICompactPassthroughTestAccount(), + }, + { + name: "non_passthrough", + body: []byte(`{"model":"gpt-5.5","stream":false,"instructions":"local-test-instructions","input":[{"type":"message","role":"user","content":"compact me"}]}`), + account: &Account{ + ID: 124, + Name: "acc-native", + Platform: PlatformOpenAI, + Type: AccountTypeOAuth, + Concurrency: 1, + Credentials: map[string]any{ + "access_token": "oauth-token", + "chatgpt_account_id": "chatgpt-acc", + }, + Status: StatusActive, + Schedulable: true, + RateMultiplier: f64p(1), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + rec := httptest.NewRecorder() + c, _ := gin.CreateTestContext(rec) + c.Request = httptest.NewRequest(http.MethodPost, "/v1/responses/compact", bytes.NewReader(nil)) + c.Request.Header.Set("User-Agent", "codex_cli_rs/0.1.0") + c.Request.Header.Set("Content-Type", "application/json") + + upstream := &httpUpstreamRecorder{ + err: errors.New("dial tcp: i/o timeout"), + delay: 1200 * time.Millisecond, + } + svc := &OpenAIGatewayService{ + cfg: &config.Config{Gateway: config.GatewayConfig{ + ForceCodexCLI: false, + OpenAICompactNonstreamKeepaliveInterval: 1, + }}, + httpUpstream: upstream, + } + + result, err := svc.Forward(context.Background(), c, tc.account, tc.body) + require.Error(t, err) + require.Nil(t, result) + var failoverErr *UpstreamFailoverError + require.False(t, errors.As(err, &failoverErr), "committed compact keepalive must suppress transport failover") + body := rec.Body.Bytes() + require.True(t, bytes.HasPrefix(body, []byte("\n"))) + require.Contains(t, string(body), "Upstream request failed") + }) + } +} + func newOpenAICompactPassthroughTestAccount() *Account { return &Account{ ID: 123,