Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit b20abc4

Browse files
committedMar 5, 2025·
fix: stream handle
1 parent 74d6449 commit b20abc4

File tree

2 files changed

+46
-11
lines changed

2 files changed

+46
-11
lines changed
 

‎stream_reader.go‎

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,12 @@ import (
1111
)
1212

1313
var (
14-
headerData = []byte("data: ")
15-
errorPrefix = []byte(`data: {"error":`)
14+
dataField = []byte("data")
15+
errorPrefix = []byte(`{"error":`)
16+
)
17+
18+
const (
19+
splitParts = 2
1620
)
1721

1822
type streamable interface {
@@ -55,13 +59,14 @@ func (stream *streamReader[T]) RecvRaw() ([]byte, error) {
5559
//nolint:gocognit
5660
func (stream *streamReader[T]) processLines() ([]byte, error) {
5761
var (
58-
emptyMessagesCount uint
59-
hasErrorPrefix bool
62+
emptyMessagesCount uint
63+
dataFieldNotFound bool
64+
valueHasErrorPrefix bool
6065
)
6166

6267
for {
6368
rawLine, readErr := stream.reader.ReadBytes('\n')
64-
if readErr != nil || hasErrorPrefix {
69+
if readErr != nil || valueHasErrorPrefix {
6570
respErr := stream.unmarshalError()
6671
if respErr != nil {
6772
return nil, fmt.Errorf("error, %w", respErr.Error)
@@ -70,12 +75,28 @@ func (stream *streamReader[T]) processLines() ([]byte, error) {
7075
}
7176

7277
noSpaceLine := bytes.TrimSpace(rawLine)
73-
if bytes.HasPrefix(noSpaceLine, errorPrefix) {
74-
hasErrorPrefix = true
78+
79+
var value []byte
80+
81+
split := bytes.SplitN(noSpaceLine, []byte(":"), splitParts)
82+
83+
if len(split) != splitParts || !bytes.Equal(split[0], dataField) {
84+
dataFieldNotFound = true
85+
} else {
86+
value = split[1]
87+
88+
if bytes.HasPrefix(value, []byte(" ")) {
89+
value = value[1:]
90+
}
91+
92+
if bytes.HasPrefix(value, errorPrefix) {
93+
valueHasErrorPrefix = true
94+
}
7595
}
76-
if !bytes.HasPrefix(noSpaceLine, headerData) || hasErrorPrefix {
77-
if hasErrorPrefix {
78-
noSpaceLine = bytes.TrimPrefix(noSpaceLine, headerData)
96+
97+
if dataFieldNotFound || valueHasErrorPrefix {
98+
if valueHasErrorPrefix {
99+
noSpaceLine = value
79100
}
80101
writeErr := stream.errAccumulator.Write(noSpaceLine)
81102
if writeErr != nil {
@@ -85,11 +106,12 @@ func (stream *streamReader[T]) processLines() ([]byte, error) {
85106
if emptyMessagesCount > stream.emptyMessagesLimit {
86107
return nil, ErrTooManyEmptyStreamMessages
87108
}
109+
dataFieldNotFound = false
88110

89111
continue
90112
}
91113

92-
noPrefixLine := bytes.TrimPrefix(noSpaceLine, headerData)
114+
noPrefixLine := value
93115
if string(noPrefixLine) == "[DONE]" {
94116
stream.isFinished = true
95117
return nil, io.EOF

‎stream_reader_test.go‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,16 @@ func TestStreamReaderRecvRaw(t *testing.T) {
7676
t.Fatalf("Did not return raw line: %v", string(rawLine))
7777
}
7878
}
79+
80+
func TestStreamReaderRecvRawWithNoSpaceInFieldAndValue(t *testing.T) {
81+
stream := &streamReader[ChatCompletionStreamResponse]{
82+
reader: bufio.NewReader(bytes.NewReader([]byte("data:{\"key\": \"value\"}\n"))),
83+
}
84+
rawLine, err := stream.RecvRaw()
85+
if err != nil {
86+
t.Fatalf("Did not return raw line: %v", err)
87+
}
88+
if !bytes.Equal(rawLine, []byte("{\"key\": \"value\"}")) {
89+
t.Fatalf("Did not return raw line: %v", string(rawLine))
90+
}
91+
}

0 commit comments

Comments
 (0)
Please sign in to comment.