@@ -127,6 +127,7 @@ func (r *Client) StreamPrediction(ctx context.Context, prediction *Prediction) (
127127
128128func (r * Client ) streamPrediction (ctx context.Context , prediction * Prediction , lastEvent * SSEEvent , sseChan chan SSEEvent , errChan chan error ) {
129129 g , ctx := errgroup .WithContext (ctx )
130+ done := make (chan struct {})
130131
131132 url := prediction .URLs ["stream" ]
132133 if url == "" {
@@ -161,8 +162,6 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
161162 return
162163 }
163164
164- done := make (chan struct {})
165-
166165 reader := bufio .NewReader (resp .Body )
167166 var buf bytes.Buffer
168167 lineChan := make (chan []byte )
@@ -211,12 +210,22 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
211210
212211 event := SSEEvent {Type : SSETypeDefault }
213212 if err := event .decode (b ); err != nil {
214- errChan <- err
213+ select {
214+ case errChan <- err :
215+ default :
216+ }
215217 close (done )
216218 return
217219 }
218220
219- sseChan <- event
221+ select {
222+ case sseChan <- event :
223+ case <- done :
224+ return
225+ case <- ctx .Done ():
226+ return
227+ }
228+
220229 if event .Type == SSETypeDone {
221230 close (done )
222231 return
@@ -239,15 +248,11 @@ func (r *Client) streamPrediction(ctx context.Context, prediction *Prediction, l
239248 return
240249 }
241250
242- if errors .Is (err , context .Canceled ) {
243- // Context was canceled, simply return
244- return
245- }
246-
247- select {
248- case errChan <- err :
249- default :
250- // errChan is full or closed
251+ if ! errors .Is (err , context .Canceled ) {
252+ select {
253+ case errChan <- err :
254+ default :
255+ }
251256 }
252257 }
253258 }()
0 commit comments