Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 59 additions & 13 deletions internal/translator/openai_gcpvertexai.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,28 @@ const (
gcpVertexAIBackendError = "GCPVertexAIBackendError"
)

const (
	LineFeedSSEDelimiter               = "\n\n"
	CarriageReturnSSEDelimiter         = "\r\r"
	CarriageReturnLineFeedSSEDelimiter = "\r\n\r\n"
)

// detectSSEDelimiter inspects data for a server-sent-events message delimiter
// and reports which one the stream is using. Candidates are tried in order of
// preference — CRLF-CRLF first, then LF-LF, then CR-CR — and the first one
// found anywhere in data wins. It returns nil when no delimiter is present
// (e.g. the data is an incomplete chunk with no message boundary yet).
func detectSSEDelimiter(data []byte) []byte {
	candidates := []string{
		CarriageReturnLineFeedSSEDelimiter,
		LineFeedSSEDelimiter,
		CarriageReturnSSEDelimiter,
	}
	for _, delim := range candidates {
		if d := []byte(delim); bytes.Contains(data, d) {
			return d
		}
	}
	return nil
}

// gcpVertexAIError represents the structure of GCP Vertex AI error responses.
type gcpVertexAIError struct {
Error gcpVertexAIErrorDetails `json:"error"`
Expand All @@ -50,7 +72,8 @@ func NewChatCompletionOpenAIToGCPVertexAITranslator(modelNameOverride internalap
type openAIToGCPVertexAITranslatorV1ChatCompletion struct {
responseMode geminiResponseMode
modelNameOverride internalapi.ModelNameOverride
stream bool // Track if this is a streaming request.
stream bool // Track if this is a streaming request.
streamDelimiter []byte
bufferedBody []byte // Buffer for incomplete JSON chunks.
requestModel internalapi.RequestModel
toolCallIndex int64
Expand Down Expand Up @@ -204,28 +227,51 @@ func (o *openAIToGCPVertexAITranslatorV1ChatCompletion) handleStreamingResponse(
func (o *openAIToGCPVertexAITranslatorV1ChatCompletion) parseGCPStreamingChunks(body io.Reader) ([]genai.GenerateContentResponse, error) {
var chunks []genai.GenerateContentResponse

// Read buffered body and new input, then split into individual chunks.
bodyBytes, err := io.ReadAll(io.MultiReader(bytes.NewReader(o.bufferedBody), body))
// Read all data from buffered body and new input into memory.
bodyReader := io.MultiReader(bytes.NewReader(o.bufferedBody), body)
allData, err := io.ReadAll(bodyReader)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to read streaming body: %w", err)
}

// If no data, return early.
if len(allData) == 0 {
return chunks, nil
}

// Detect which SSE delimiter is being used and store it for future streaming chunks.
if o.streamDelimiter == nil {
o.streamDelimiter = detectSSEDelimiter(allData)
}

// Split by the detected delimiter.
var parts [][]byte
if o.streamDelimiter != nil {
parts = bytes.Split(allData, o.streamDelimiter)
} else {
parts = [][]byte{allData}
}
lines := bytes.Split(bodyBytes, []byte("\n\n"))

for idx, line := range lines {
	// Process each delimited part; a part that fails to parse is buffered
	// so the next call can retry it once more data arrives.
for _, part := range parts {
part = bytes.TrimSpace(part)
if len(part) == 0 {
continue
}

// Remove "data: " prefix from SSE format if present.
line = bytes.TrimPrefix(line, []byte("data: "))
line := bytes.TrimPrefix(part, []byte("data: "))

// Try to parse as JSON.
var chunk genai.GenerateContentResponse
if err = json.Unmarshal(line, &chunk); err == nil {
if err := json.Unmarshal(line, &chunk); err == nil {
chunks = append(chunks, chunk)
} else if idx == len(lines)-1 {
// If we reach the last line, and it can't be parsed, keep it in the buffer
// for the next call to handle incomplete JSON chunks.
o.bufferedBody = nil
} else {
// Failed to parse, buffer it for the next call.
o.bufferedBody = line
}
// If this is not the last line and json unmarshal fails, we assume it's an invalid chunk and ignore it.
// TODO: Log this as a warning or error once logger is available in this context.
// Ignore parse errors for individual chunks to maintain stream continuity.
}

return chunks, nil
Expand Down
119 changes: 113 additions & 6 deletions internal/translator/openai_gcpvertexai_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1691,7 +1691,7 @@ func TestOpenAIToGCPVertexAITranslatorV1ChatCompletion_parseGCPStreamingChunks(t
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "multiple complete chunks",
Expand Down Expand Up @@ -1725,7 +1725,7 @@ data: {"candidates":[{"content":{"parts":[{"text":" world"}]}}]}
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "incomplete chunk at end",
Expand Down Expand Up @@ -1780,7 +1780,54 @@ data: {"candidates":[{"content":{"parts":[{"text":"new"}]}}]}
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "invalid JSON chunk in bufferedBody - ignored",
bufferedBody: []byte(`data: {"candidates":[{"content":{"parts":[{"text":"Hello"}]}}]}

data: invalid-json

data: {"candidates":[{"content":{"parts":[{"text":"world"}]}}]}

`),
input: `data: {"candidates":[{"content":{"parts":[{"text":"Foo"}]}}]}`,
wantChunks: []genai.GenerateContentResponse{
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Hello"},
},
},
},
},
},
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "world"},
},
},
},
},
},
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Foo"},
},
},
},
},
},
},
wantBuffered: nil,
},
{
name: "invalid JSON chunk in middle - ignored",
Expand Down Expand Up @@ -1816,14 +1863,14 @@ data: {"candidates":[{"content":{"parts":[{"text":"world"}]}}]}
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "empty input",
bufferedBody: nil,
input: "",
wantChunks: nil,
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "chunk without data prefix",
Expand All @@ -1844,7 +1891,67 @@ data: {"candidates":[{"content":{"parts":[{"text":"world"}]}}]}
},
},
},
wantBuffered: []byte(""),
wantBuffered: nil,
},
{
name: "CRLF CRLF delimiter",
bufferedBody: nil,
input: `data: {"candidates":[{"content":{"parts":[{"text":"Hello"}]}}]}` + "\r\n\r\n" + `data: {"candidates":[{"content":{"parts":[{"text":"World"}]}}]}`,
wantChunks: []genai.GenerateContentResponse{
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Hello"},
},
},
},
},
},
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "World"},
},
},
},
},
},
},
wantBuffered: nil,
},
{
name: "CR CR delimiter",
bufferedBody: nil,
input: `data: {"candidates":[{"content":{"parts":[{"text":"Test"}]}}]}` + "\r\r" + `data: {"candidates":[{"content":{"parts":[{"text":"Message"}]}}]}`,
wantChunks: []genai.GenerateContentResponse{
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Test"},
},
},
},
},
},
{
Candidates: []*genai.Candidate{
{
Content: &genai.Content{
Parts: []*genai.Part{
{Text: "Message"},
},
},
},
},
},
},
wantBuffered: nil,
},
}

Expand Down