Skip to content

Commit

Permalink
fix: event stream with carriage return
Browse files Browse the repository at this point in the history
  • Loading branch information
samuel-videau committed Jul 22, 2024
1 parent 8364226 commit 8763a01
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 7 deletions.
4 changes: 2 additions & 2 deletions api/client/event/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ load("@prysm//tools/go:def.bzl", "go_library", "go_test")

go_library(
name = "go_default_library",
srcs = ["event_stream.go"],
srcs = ["event_stream.go", "utils.go"],
importpath = "github.com/prysmaticlabs/prysm/v5/api/client/event",
visibility = ["//visibility:public"],
deps = [
Expand All @@ -15,7 +15,7 @@ go_library(

go_test(
name = "go_default_test",
srcs = ["event_stream_test.go"],
srcs = ["event_stream_test.go", "utils_test.go"],
embed = [":go_default_library"],
deps = [
"//testing/require:go_default_library",
Expand Down
4 changes: 3 additions & 1 deletion api/client/event/event_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
}()
// Create a new scanner to read lines from the response body
scanner := bufio.NewScanner(resp.Body)
// Set the split function for the scanning operation
scanner.Split(scanLinesWithCarriage)

var eventType, data string // Variables to store event type and data

Expand All @@ -113,7 +115,7 @@ func (h *EventStream) Subscribe(eventsChannel chan<- *Event) {
close(eventsChannel)
return
default:
line := scanner.Text() // TODO(13730): scanner does not handle /r and does not fully adhere to https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface
line := scanner.Text()
// Handle the event based on your specific format
if line == "" {
// Empty line indicates the end of an event
Expand Down
9 changes: 5 additions & 4 deletions api/client/event/event_stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ func TestEventStream(t *testing.T) {
mux.HandleFunc("/eth/v1/events", func(w http.ResponseWriter, r *http.Request) {
flusher, ok := w.(http.Flusher)
require.Equal(t, true, ok)
for i := 1; i <= 2; i++ {
_, err := fmt.Fprintf(w, "event: head\ndata: data%d\n\n", i)
for i := 1; i <= 3; i++ {
events := [3]string{"event: head\ndata: data%d\n\n", "event: head\rdata: data%d\r\r", "event: head\r\ndata: data%d\r\n\r\n"}
_, err := fmt.Fprintf(w, events[i-1], i)
require.NoError(t, err)
flusher.Flush() // Trigger flush to simulate streaming data
time.Sleep(100 * time.Millisecond) // Simulate delay between events
Expand All @@ -62,7 +63,7 @@ func TestEventStream(t *testing.T) {
// Collect events
var events []*Event

for len(events) != 2 {
for len(events) != 3 {
select {
case event := <-eventsChannel:
log.Info(event)
Expand All @@ -71,7 +72,7 @@ func TestEventStream(t *testing.T) {
}

// Assertions to verify the events content
expectedData := []string{"data1", "data2"}
expectedData := []string{"data1", "data2", "data3"}
for i, event := range events {
if string(event.Data) != expectedData[i] {
t.Errorf("Expected event data %q, got %q", expectedData[i], string(event.Data))
Expand Down
36 changes: 36 additions & 0 deletions api/client/event/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package event

import (
"bytes"
)

// adapted from ScanLines in scan.go to handle carriage return characters as separators
func scanLinesWithCarriage(data []byte, atEOF bool) (advance int, token []byte, err error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}
if i, j := bytes.IndexByte(data, '\n'), bytes.IndexByte(data, '\r'); i >= 0 || j >= 0 {
in := i
// Select the first index of \n or \r or the second index of \r if it is followed by \n
if i < 0 || (i > j && i != j+1 && j >= 0) {
in = j
}

// We have a full newline-terminated line.
return in + 1, dropCR(data[0:in]), nil
}
// If we're at EOF, we have a final, non-terminated line. Return it.
if atEOF {
return len(data), dropCR(data), nil
}
// Request more data.
return 0, nil, nil
}

// dropCR drops a terminal \r from the data.
func dropCR(data []byte) []byte {
if len(data) > 0 && data[len(data)-1] == '\r' {
return data[0 : len(data)-1]
}
return data
}
97 changes: 97 additions & 0 deletions api/client/event/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package event

import (
"bufio"
"bytes"
"testing"

"github.com/prysmaticlabs/prysm/v5/testing/require"
)

func TestScanLinesWithCarriage(t *testing.T) {
testCases := []struct {
name string
input string
expected []string
}{
{
name: "LF line endings",
input: "line1\nline2\nline3",
expected: []string{"line1", "line2", "line3"},
},
{
name: "CR line endings",
input: "line1\rline2\rline3",
expected: []string{"line1", "line2", "line3"},
},
{
name: "CRLF line endings",
input: "line1\r\nline2\r\nline3",
expected: []string{"line1", "line2", "line3"},
},
{
name: "Mixed line endings",
input: "line1\nline2\rline3\r\nline4",
expected: []string{"line1", "line2", "line3", "line4"},
},
{
name: "Empty lines",
input: "line1\n\nline2\r\rline3",
expected: []string{"line1", "", "line2", "", "line3"},
},
{
name: "Empty lines 2",
input: "line1\n\rline2\n\rline3",
expected: []string{"line1", "", "line2", "", "line3"},
},
{
name: "No line endings",
input: "single line without ending",
expected: []string{"single line without ending"},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
scanner := bufio.NewScanner(bytes.NewReader([]byte(tc.input)))
scanner.Split(scanLinesWithCarriage)

var lines []string
for scanner.Scan() {
lines = append(lines, scanner.Text())
}

require.NoError(t, scanner.Err())
require.Equal(t, len(tc.expected), len(lines), "Number of lines does not match")
for i, line := range lines {
require.Equal(t, tc.expected[i], line, "Line %d does not match", i)
}
})
}
}

// TestScanLinesWithCarriageEdgeCases tests edge cases and potential error scenarios
func TestScanLinesWithCarriageEdgeCases(t *testing.T) {
t.Run("Empty input", func(t *testing.T) {
scanner := bufio.NewScanner(bytes.NewReader([]byte("")))
scanner.Split(scanLinesWithCarriage)
require.Equal(t, scanner.Scan(), false)
require.NoError(t, scanner.Err())
})

t.Run("Very long line", func(t *testing.T) {
longLine := bytes.Repeat([]byte("a"), bufio.MaxScanTokenSize+1)
scanner := bufio.NewScanner(bytes.NewReader(longLine))
scanner.Split(scanLinesWithCarriage)
require.Equal(t, scanner.Scan(), false)
require.NotNil(t, scanner.Err())
})

t.Run("Line ending at max token size", func(t *testing.T) {
input := append(bytes.Repeat([]byte("a"), bufio.MaxScanTokenSize-1), '\n')
scanner := bufio.NewScanner(bytes.NewReader(input))
scanner.Split(scanLinesWithCarriage)
require.Equal(t, scanner.Scan(), true)
require.Equal(t, string(bytes.Repeat([]byte("a"), bufio.MaxScanTokenSize-1)), scanner.Text())
})
}

0 comments on commit 8763a01

Please sign in to comment.