Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 40 additions & 14 deletions internal/extproc/translator/openai_gcpvertexai.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package translator

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
Expand All @@ -27,6 +28,36 @@ const (
gcpVertexAIBackendError = "GCPVertexAIBackendError"
)

const (
LineFeedSSEDelimiter = "\n\n"
CarriageReturnSSEDelimiter = "\r\r"
CarriageReturnLineFeedSSEDelimiter = "\r\n\r\n"
)

func sseSplitFunc(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
// Check for \r\n\r\n (longest delimiter first)
if i := bytes.Index(data, []byte(CarriageReturnLineFeedSSEDelimiter)); i >= 0 {
return i + len(CarriageReturnLineFeedSSEDelimiter), data[0:i], nil
}
// Check for \n\n
if i := bytes.Index(data, []byte(LineFeedSSEDelimiter)); i >= 0 {
return i + len(LineFeedSSEDelimiter), data[0:i], nil
}
// Check for \r\r
if i := bytes.Index(data, []byte(CarriageReturnSSEDelimiter)); i >= 0 {
return i + len(CarriageReturnSSEDelimiter), data[0:i], nil
}
// If at EOF, return remaining data
if atEOF {
return len(data), data, nil
}
// Request more data
return 0, nil, nil
}

// gcpVertexAIError represents the structure of GCP Vertex AI error responses.
type gcpVertexAIError struct {
Error gcpVertexAIErrorDetails `json:"error"`
Expand Down Expand Up @@ -195,7 +226,7 @@ func (o *openAIToGCPVertexAITranslatorV1ChatCompletion) handleStreamingResponse(
}
sseChunkBuf.WriteString("data: ")
sseChunkBuf.Write(chunkBytes)
sseChunkBuf.WriteString("\n\n")
sseChunkBuf.WriteString(LineFeedSSEDelimiter)

if span != nil {
span.RecordResponseChunk(openAIChunk)
Expand All @@ -218,27 +249,22 @@ func (o *openAIToGCPVertexAITranslatorV1ChatCompletion) parseGCPStreamingChunks(
var chunks []genai.GenerateContentResponse

// Read buffered body and new input, then split into individual chunks.
bodyBytes, err := io.ReadAll(io.MultiReader(bytes.NewReader(o.bufferedBody), body))
if err != nil {
return nil, err
}
lines := bytes.Split(bodyBytes, []byte("\n\n"))
bodyReader := io.MultiReader(bytes.NewReader(o.bufferedBody), body)
scanner := bufio.NewScanner(bodyReader)
scanner.Split(sseSplitFunc)

for idx, line := range lines {
for scanner.Scan() {
// Remove "data: " prefix from SSE format if present.
line = bytes.TrimPrefix(line, []byte("data: "))
line := bytes.TrimPrefix(scanner.Bytes(), []byte("data: "))

// Try to parse as JSON.
var chunk genai.GenerateContentResponse
if err = json.Unmarshal(line, &chunk); err == nil {
if err := json.Unmarshal(line, &chunk); err == nil {
chunks = append(chunks, chunk)
} else if idx == len(lines)-1 {
// If we reach the last line, and it can't be parsed, keep it in the buffer
// for the next call to handle incomplete JSON chunks.
o.bufferedBody = nil
} else {
o.bufferedBody = line
}
// If this is not the last line and json unmarshal fails, we assume it's an invalid chunk and ignore it.
// TODO: Log this as a warning or error once logger is available in this context.
}

return chunks, nil
Expand Down
190 changes: 184 additions & 6 deletions internal/extproc/translator/openai_gcpvertexai_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package translator

import (
"bufio"
"bytes"
"encoding/json"
"slices"
Expand Down Expand Up @@ -1809,7 +1810,7 @@ func TestOpenAIToGCPVertexAITranslatorV1ChatCompletion_parseGCPStreamingChunks(t
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "multiple complete chunks",
Expand Down Expand Up @@ -1843,7 +1844,7 @@ data: {"candidates":[{"content":{"parts":[{"text":" world"}]}}]}
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "incomplete chunk at end",
Expand Down Expand Up @@ -1898,7 +1899,54 @@ data: {"candidates":[{"content":{"parts":[{"text":"new"}]}}]}
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "invalid JSON chunk in bufferedBody - ignored",
bufferedBody: []byte(`data: {"candidates":[{"content":{"parts":[{"text":"Hello"}]}}]}

data: invalid-json

data: {"candidates":[{"content":{"parts":[{"text":"world"}]}}]}

`),
input: `data: {"candidates":[{"content":{"parts":[{"text":"Foo"}]}}]}`,
wantChunks: []genai.GenerateContentResponse{
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Hello"},
},
},
},
},
},
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "world"},
},
},
},
},
},
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Foo"},
},
},
},
},
},
},
wantBuffered: nil,
},
{
name: "invalid JSON chunk in middle - ignored",
Expand Down Expand Up @@ -1934,14 +1982,14 @@ data: {"candidates":[{"content":{"parts":[{"text":"world"}]}}]}
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "empty input",
bufferedBody: nil,
input: "",
wantChunks: nil,
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "chunk without data prefix",
Expand All @@ -1962,7 +2010,67 @@ data: {"candidates":[{"content":{"parts":[{"text":"world"}]}}]}
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "CRLF CRLF delimiter",
bufferedBody: nil,
input: `data: {"candidates":[{"content":{"parts":[{"text":"Hello"}]}}]}` + "\r\n\r\n" + `data: {"candidates":[{"content":{"parts":[{"text":"World"}]}}]}`,
wantChunks: []genai.GenerateContentResponse{
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Hello"},
},
},
},
},
},
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "World"},
},
},
},
},
},
},
wantBuffered: nil,
},
{
name: "CR CR delimiter",
bufferedBody: nil,
input: `data: {"candidates":[{"content":{"parts":[{"text":"Test"}]}}]}` + "\r\r" + `data: {"candidates":[{"content":{"parts":[{"text":"Message"}]}}]}`,
wantChunks: []genai.GenerateContentResponse{
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Test"},
},
},
},
},
},
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Message"},
},
},
},
},
},
},
wantBuffered: nil,
},
}

Expand Down Expand Up @@ -1995,6 +2103,76 @@ data: {"candidates":[{"content":{"parts":[{"text":"world"}]}}]}
}
}

// TestSSESplitFunc exercises sseSplitFunc end-to-end through bufio.Scanner,
// covering all three SSE event delimiters plus incomplete and empty input.
func TestSSESplitFunc(t *testing.T) {
	// NOTE: the previous table carried an `atEOF` field that was never read —
	// the scanner decides EOF itself from the reader — which made the
	// "at EOF" / "not at EOF" case pairs misleading duplicates. The dead
	// field is removed and the duplicate cases merged.
	tests := []struct {
		name       string
		data       string
		wantTokens []string
	}{
		{
			name:       "CRLF CRLF delimiter",
			data:       "event: chunk\r\ndata: {\"key\": \"value\"}\r\n\r\nevent: done\r\ndata: [DONE]\r\n\r\n",
			wantTokens: []string{"event: chunk\r\ndata: {\"key\": \"value\"}", "event: done\r\ndata: [DONE]"},
		},
		{
			name:       "LF LF delimiter",
			data:       "event: chunk\ndata: {\"key\": \"value\"}\n\nevent: done\ndata: [DONE]\n\n",
			wantTokens: []string{"event: chunk\ndata: {\"key\": \"value\"}", "event: done\ndata: [DONE]"},
		},
		{
			name:       "CR CR delimiter",
			data:       "event: chunk\rdata: {\"key\": \"value\"}\r\revent: done\rdata: [DONE]\r\r",
			wantTokens: []string{"event: chunk\rdata: {\"key\": \"value\"}", "event: done\rdata: [DONE]"},
		},
		{
			// Unterminated data is flushed as one final token once the
			// scanner reaches EOF.
			name:       "incomplete data",
			data:       "incomplete chunk data",
			wantTokens: []string{"incomplete chunk data"},
		},
		{
			name:       "empty data",
			data:       "",
			wantTokens: nil,
		},
	}

	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			scanner := bufio.NewScanner(strings.NewReader(tc.data))
			scanner.Split(sseSplitFunc)

			var gotTokens []string
			for scanner.Scan() {
				gotTokens = append(gotTokens, string(scanner.Bytes()))
			}

			if diff := cmp.Diff(tc.wantTokens, gotTokens); diff != "" {
				t.Errorf("tokens mismatch (-want +got):\n%s", diff)
			}

			require.NoError(t, scanner.Err())
		})
	}
}

func TestOpenAIToGCPVertexAITranslatorV1ChatCompletion_ResponseError(t *testing.T) {
tests := []struct {
name string
Expand Down
Loading