package llm_api

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"strings"
)

// OpenaiStreamChatResponses POSTs a streaming request to baseURL + "/responses"
// and returns a reader that yields the concatenated text of every
// "response.output_text.delta" event as it arrives.
//
// The request is issued from a background goroutine, so transport errors,
// non-2xx statuses and malformed events are reported by the returned reader's
// Read rather than by this function; a stream that ends normally yields io.EOF.
// Only an empty msgs slice or a request-encoding failure is returned directly.
//
// A nil client falls back to http.DefaultClient. The caller must either read
// the stream to completion or Close the returned reader; otherwise the
// background goroutine stays blocked on the pipe and the HTTP response body is
// never released.
func OpenaiStreamChatResponses(
	ctx context.Context,
	client *http.Client,
	baseURL string,
	apiKey string,
	model string,
	reasoningEffort string,
	temperature *float64,
	msgs []OpenaiChatMessage,
) (io.ReadCloser, error) {
	if len(msgs) == 0 {
		return nil, errors.New("missing messages")
	}
	if client == nil {
		// Calling Do on a nil client would panic inside the goroutine and take
		// the whole process down.
		client = http.DefaultClient
	}

	endpoint := strings.TrimRight(baseURL, "/") + "/responses"
	payload, err := json.Marshal(OpenaiChatResponseReq{
		Model:       model,
		Input:       msgs,
		Temperature: temperature,
		Reasoning:   OpenaiResponseReasoning{Effort: reasoningEffort},
		Stream:      true,
	})
	if err != nil {
		return nil, err
	}

	pr, pw := io.Pipe()
	go func() {
		// Single close site: a nil error is a plain Close (the reader sees
		// io.EOF), anything else is handed to the reader verbatim.
		// CloseWithError is documented to always return nil.
		_ = pw.CloseWithError(streamOpenaiResponsesText(ctx, client, endpoint, apiKey, payload, pw))
	}()
	return pr, nil
}

// streamOpenaiResponsesText performs the HTTP exchange and copies the streamed
// text deltas into w. It returns nil when the stream ended normally.
func streamOpenaiResponsesText(
	ctx context.Context,
	client *http.Client,
	endpoint string,
	apiKey string,
	payload []byte,
	w io.Writer,
) error {
	// Upper bound on how much of an error response is copied into the error.
	const maxErrorBody = 64 * 1024

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(payload))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("Accept", "text/event-stream")
	req.Header.Set("Authorization", "Bearer "+apiKey)

	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer func() {
		if cerr := resp.Body.Close(); cerr != nil {
			log.Println(cerr)
		}
	}()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		// Best effort: a partially read body still makes a useful message.
		b, _ := io.ReadAll(io.LimitReader(resp.Body, maxErrorBody))
		return fmt.Errorf("HTTP %d: %s", resp.StatusCode, strings.TrimSpace(string(b)))
	}
	return forwardOpenaiResponseDeltas(resp.Body, w)
}

// forwardOpenaiResponseDeltas parses a server-sent-event stream from r and
// writes the Delta of every "response.output_text.delta" event to w. It stops
// at the "response.completed" event, at a "[DONE]" data sentinel, or at end of
// input, and returns the first read, decode or write error encountered.
func forwardOpenaiResponseDeltas(r io.Reader, w io.Writer) error {
	sc := bufio.NewScanner(r)
	// Individual events can be much longer than the scanner's default line limit.
	sc.Buffer(make([]byte, 0, 64*1024), 2*1024*1024)

	var dataLines []string

	// dispatch decodes the event accumulated in dataLines, if any. The bool
	// result reports that the stream is finished.
	dispatch := func() (bool, error) {
		if len(dataLines) == 0 {
			// A blank line with nothing buffered (repeated separators, or a
			// block holding only comments) is not an event and must not end
			// the stream.
			return false, nil
		}
		data := strings.Join(dataLines, "\n")
		dataLines = dataLines[:0]

		if data == "[DONE]" {
			return true, nil
		}

		var evt OpenaiResponseStreamEvent
		if err := json.Unmarshal([]byte(data), &evt); err != nil {
			return false, fmt.Errorf("failed to unmarshal event: %w, data=%q", err, data)
		}
		// NOTE(review): every other event type is ignored, so a failure the
		// server reports in-band ends the stream without an error — confirm
		// whether such events should be surfaced to the reader.
		switch evt.Type {
		case "response.output_text.delta":
			if _, err := io.WriteString(w, evt.Delta); err != nil {
				return false, err
			}
		case "response.completed":
			return true, nil
		}
		return false, nil
	}

	for sc.Scan() {
		line := sc.Text()
		if line == "" {
			done, err := dispatch()
			if err != nil {
				return err
			}
			if done {
				return nil
			}
			continue
		}
		if strings.HasPrefix(line, "data:") {
			dataLines = append(dataLines, strings.TrimSpace(strings.TrimPrefix(line, "data:")))
			continue
		}
		// Stop on the event line itself: the payload of the completed event is
		// not needed and is deliberately left unread.
		if strings.HasPrefix(line, "event:") &&
			strings.TrimSpace(strings.TrimPrefix(line, "event:")) == "response.completed" {
			break
		}
	}

	// A read failure takes precedence over whatever was buffered: the pending
	// event may be truncated, and the caller must not mistake a broken stream
	// for a complete one.
	if err := sc.Err(); err != nil {
		return err
	}

	// Input ended without a trailing blank line; process what was buffered.
	_, err := dispatch()
	return err
}