Skip to content

Commit

Permalink
Make library more closely (still not perfect) follow the EventSource …
Browse files Browse the repository at this point in the history
…spec.
  • Loading branch information
jwalter1-quest committed Sep 17, 2018
1 parent 110da76 commit 998bb67
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 27 deletions.
61 changes: 34 additions & 27 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package sse

import (
"bufio"
"bytes"
"encoding/base64"
"errors"
Expand Down Expand Up @@ -51,17 +50,20 @@ func (c *Client) Subscribe(stream string, handler func(msg *Event)) error {
}
defer resp.Body.Close()

reader := bufio.NewReader(resp.Body)
reader := NewEventReader(resp.Body)

for {
// Read each new line and process the type of event
line, err := reader.ReadBytes('\n')
event, err := reader.ReadEvent()
if err != nil {
return err
}
msg := c.processEvent(line)
if msg != nil {
handler(msg)

if len(event) != 0 {
msg := c.processEvent(event)
if msg != nil {
handler(msg)
}
}
}
}
Expand All @@ -82,27 +84,30 @@ func (c *Client) SubscribeChan(stream string, ch chan *Event) error {
return errors.New("could not connect to stream")
}

reader := bufio.NewReader(resp.Body)
reader := NewEventReader(resp.Body)

c.subscribed[ch] = make(chan bool)

go func() {
for {
// Read each new line and process the type of event
line, err := reader.ReadBytes('\n')
event, err := reader.ReadEvent()
if err != nil {
resp.Body.Close()
close(ch)
return
}
msg := c.processEvent(line)
if msg != nil {
select {
case <-c.subscribed[ch]:
resp.Body.Close()
return
default:
ch <- msg

if len(event) != 0 {
msg := c.processEvent(event)
if msg != nil {
select {
case <-c.subscribed[ch]:
resp.Body.Close()
return
default:
ch <- msg
}
}
}
}
Expand Down Expand Up @@ -163,17 +168,19 @@ func (c *Client) request(stream string) (*http.Response, error) {
func (c *Client) processEvent(msg []byte) *Event {
var e Event

switch h := msg; {
case bytes.Contains(h, headerID):
e.ID = trimHeader(len(headerID), msg)
case bytes.Contains(h, headerData):
e.Data = trimHeader(len(headerData), msg)
case bytes.Contains(h, headerEvent):
e.Event = trimHeader(len(headerEvent), msg)
case bytes.Contains(h, headerError):
e.Error = trimHeader(len(headerError), msg)
default:
return nil
for _, line := range bytes.Split(msg, []byte("\n")) {
switch {
case bytes.Contains(line, headerID):
e.ID = bytes.TrimSpace(append(append(trimHeader(len(headerID), line), e.ID[:]...), byte('\n')))
case bytes.Contains(line, headerData):
e.Data = bytes.TrimSpace(append(append(trimHeader(len(headerData), line), e.Data[:]...), byte('\n')))
case bytes.Contains(line, headerEvent):
e.Event = bytes.TrimSpace(append(append(trimHeader(len(headerEvent), line), e.Event[:]...), byte('\n')))
case bytes.Contains(line, headerError):
e.Error = bytes.TrimSpace(append(append(trimHeader(len(headerError), line), e.Error[:]...), byte('\n')))
default:
return nil
}
}

if len(e.Data) > 0 && c.EncodingBase64 {
Expand Down
55 changes: 55 additions & 0 deletions event.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,65 @@

package sse

import (
"bufio"
"bytes"
"io"
)

// Event holds all of the event source fields
type Event struct {
ID []byte
Data []byte
Event []byte
Error []byte
}

type EventReader struct {
scanner *bufio.Scanner
buffer []byte
idx int
}

func NewEventReader(source io.Reader) *EventReader {
scanner := bufio.NewScanner(source)
split := func(data []byte, atEOF bool) (int, []byte, error) {
if atEOF && len(data) == 0 {
return 0, nil, nil
}

// We have a full data
if i := bytes.Index(data, []byte("\r\n\r\n")); i >= 0 {
return i + 1, data[0:i], nil
}
if i := bytes.Index(data, []byte("\r\r")); i >= 0 {
return i + 1, data[0:i], nil
}
if i := bytes.Index(data, []byte("\n\n")); i >= 0 {
return i + 1, data[0:i], nil
}
// If we're at EOF, we have a final data
if atEOF {
return len(data), data, nil
}
// Request more data.
return 0, nil, nil
}
// Set the split function for the scanning operation.
scanner.Split(split)

return &EventReader{
scanner: scanner,
}
}

func (self *EventReader) ReadEvent() ([]byte, error) {
if self.scanner.Scan() {
event := self.scanner.Bytes()
return event, nil
}
if err := self.scanner.Err(); err != nil {
return nil, err
}
return nil, io.EOF
}

0 comments on commit 998bb67

Please sign in to comment.