Skip to content

Commit

Permalink
Merge pull request #35 from jeffreydwalter/master
Browse files Browse the repository at this point in the history
Refactor client to make it handle events correctly (according to the spec.)
  • Loading branch information
purehyperbole authored Sep 18, 2018
2 parents 110da76 + 98b5b7d commit cbf29df
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 41 deletions.
113 changes: 76 additions & 37 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
package sse

import (
"bufio"
"bytes"
"encoding/base64"
"errors"
"log"
"net/http"
"time"

backoff "gopkg.in/cenkalti/backoff.v1"
)
Expand All @@ -19,13 +19,14 @@ var (
headerID = []byte("id:")
headerData = []byte("data:")
headerEvent = []byte("event:")
headerError = []byte("error:")
headerRetry = []byte("retry:")
)

// Client handles an incoming server stream
type Client struct {
URL string
Connection *http.Client
Retry time.Time
Headers map[string]string
EncodingBase64 bool
EventID string
Expand All @@ -51,17 +52,26 @@ func (c *Client) Subscribe(stream string, handler func(msg *Event)) error {
}
defer resp.Body.Close()

reader := bufio.NewReader(resp.Body)
reader := NewEventStreamReader(resp.Body)

for {
// Read each new line and process the type of event
line, err := reader.ReadBytes('\n')
event, err := reader.ReadEvent()
if err != nil {
return err
}
msg := c.processEvent(line)
if msg != nil {
handler(msg)

if len(event) > 0 {
msg := c.processEvent(event)
if msg != nil {
if len(msg.ID) > 0 {
c.EventID = string(msg.ID)
} else {
msg.ID = []byte(c.EventID)
}

handler(msg)
}
}
}
}
Expand All @@ -78,31 +88,42 @@ func (c *Client) SubscribeChan(stream string, ch chan *Event) error {
}

if resp.StatusCode != 200 {
resp.Body.Close()
close(ch)
return errors.New("could not connect to stream")
}

reader := bufio.NewReader(resp.Body)
reader := NewEventStreamReader(resp.Body)

c.subscribed[ch] = make(chan bool)

go func() {
defer resp.Body.Close()
for {
// Read each new line and process the type of event
line, err := reader.ReadBytes('\n')
event, err := reader.ReadEvent()
if err != nil {
resp.Body.Close()
close(ch)
return
}
msg := c.processEvent(line)
if msg != nil {
select {
case <-c.subscribed[ch]:
resp.Body.Close()
return
default:
ch <- msg

if len(event) > 0 {
msg := c.processEvent(event)
if msg != nil {
if len(msg.ID) > 0 {
c.EventID = string(msg.ID)
} else {
msg.ID = []byte(c.EventID)
}

select {
case <-c.subscribed[ch]:
resp.Body.Close()
return
default:
ch <- msg
}
}
}
}
Expand All @@ -124,7 +145,7 @@ func (c *Client) SubscribeChanRaw(ch chan *Event) error {
return c.SubscribeChan("", ch)
}

// Unsubscribe : unsubscribes a channel
// Unsubscribe unsubscribes a channel
func (c *Client) Unsubscribe(ch chan *Event) {
c.subscribed[ch] <- true
close(c.subscribed[ch])
Expand Down Expand Up @@ -163,31 +184,49 @@ func (c *Client) request(stream string) (*http.Response, error) {
func (c *Client) processEvent(msg []byte) *Event {
var e Event

switch h := msg; {
case bytes.Contains(h, headerID):
e.ID = trimHeader(len(headerID), msg)
case bytes.Contains(h, headerData):
e.Data = trimHeader(len(headerData), msg)
case bytes.Contains(h, headerEvent):
e.Event = trimHeader(len(headerEvent), msg)
case bytes.Contains(h, headerError):
e.Error = trimHeader(len(headerError), msg)
default:
return nil
// Normalize the crlf to lf to make it easier to split the lines.
bytes.Replace(msg, []byte("\n\r"), []byte("\n"), -1)
// Split the line by "\n" or "\r", per the spec.
for _, line := range bytes.FieldsFunc(msg, func(r rune) bool { return r == '\n' || r == '\r' }) {
switch {
case bytes.HasPrefix(line, headerID):
e.ID = trimHeader(len(headerID), line)
case bytes.HasPrefix(line, headerData):
// The spec allows for multiple data fields per event, concatenated them with "\n".
e.Data = append(append(trimHeader(len(headerData), line), e.Data[:]...), byte('\n'))
// The spec says that a line that simply contains the string "data" should be treated as a data field with an empty body.
case bytes.Equal(line, bytes.TrimSuffix(headerData, []byte(":"))):
e.Data = append(e.Data, byte('\n'))
case bytes.HasPrefix(line, headerEvent):
e.Event = trimHeader(len(headerEvent), line)
case bytes.HasPrefix(line, headerRetry):
e.Retry = trimHeader(len(headerRetry), line)
default:
return nil
}
}

if len(e.Data) > 0 && c.EncodingBase64 {
buf := make([]byte, base64.StdEncoding.DecodedLen(len(e.Data)))
// Trim the last "\n" per the spec.
e.Data = bytes.TrimSuffix(e.Data, []byte("\n"))

_, err := base64.StdEncoding.Decode(buf, e.Data)
if err != nil {
log.Println(err)
}
if len(e.Data) > 0 {
if c.EncodingBase64 {
buf := make([]byte, base64.StdEncoding.DecodedLen(len(e.Data)))

e.Data = buf
_, err := base64.StdEncoding.Decode(buf, e.Data)
if err != nil {
// TODO: We shouldn't be printing stuff from this library.
// Change this to return an error.
log.Println(err)
}

e.Data = buf
}
return &e
}

return &e
// If we made it here, then the event had a problem, so just return an empty event.
return new(Event)
}

func trimHeader(size int, data []byte) []byte {
Expand Down
60 changes: 59 additions & 1 deletion event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,68 @@

package sse

import (
"bufio"
"bytes"
"io"
)

// Event holds all of the event source fields
type Event struct {
ID []byte
Data []byte
Event []byte
Error []byte
Retry []byte
}

// EventStreamReader scans an io.Reader looking for EventStream messages.
type EventStreamReader struct {
scanner *bufio.Scanner
buffer []byte
idx int
}

// NewEventStreamReader creates an instance of EventStreamReader.
func NewEventStreamReader(eventStream io.Reader) *EventStreamReader {
scanner := bufio.NewScanner(eventStream)
split := func(data []byte, atEOF bool) (int, []byte, error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

// We have a full event payload to parse.
if i := bytes.Index(data, []byte("\r\n\r\n")); i >= 0 {
return i + 1, data[0:i], nil
}
if i := bytes.Index(data, []byte("\r\r")); i >= 0 {
return i + 1, data[0:i], nil
}
if i := bytes.Index(data, []byte("\n\n")); i >= 0 {
return i + 1, data[0:i], nil
}
// If we're at EOF, we have all of the data.
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
// Set the split function for the scanning operation.
scanner.Split(split)

return &EventStreamReader{
scanner: scanner,
}
}

// ReadEvent scans the EventStream for events.
func (self *EventStreamReader) ReadEvent() ([]byte, error) {
if self.scanner.Scan() {
event := self.scanner.Bytes()
return event, nil
}
if err := self.scanner.Err(); err != nil {
return nil, err
}
return nil, io.EOF
}
6 changes: 3 additions & 3 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@ func (s *Server) HTTPHandler(w http.ResponseWriter, r *http.Request) {
if len(ev.Event) > 0 {
fmt.Fprintf(w, "event: %s\n", ev.Event)
}
if len(ev.Error) > 0 {
fmt.Fprintf(w, "error: %s\n", ev.Error)
}
if len(ev.Data) > 0 {
fmt.Fprintf(w, "data: %s\n", ev.Data)
}
if len(ev.Retry) > 0 {
fmt.Fprintf(w, "retry: %s\n", ev.Retry)
}
fmt.Fprint(w, "\n")
flusher.Flush()
}
Expand Down

0 comments on commit cbf29df

Please sign in to comment.