Skip to content

Commit

Permalink
Add since=all; make poll=1 default to since=all
Browse files Browse the repository at this point in the history
  • Loading branch information
Philipp Heckel committed Nov 8, 2021
1 parent 43c9a92 commit d453db8
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 14 deletions.
2 changes: 1 addition & 1 deletion server/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

type cache interface {
AddMessage(m *message) error
Messages(topic string, since time.Time) ([]*message, error)
Messages(topic string, since sinceTime) ([]*message, error)
MessageCount(topic string) (int, error)
Topics() (map[string]*topic, error)
Prune(keep time.Duration) error
Expand Down
4 changes: 2 additions & 2 deletions server/cache_mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func (s *memCache) AddMessage(m *message) error {
return nil
}

func (s *memCache) Messages(topic string, since time.Time) ([]*message, error) {
func (s *memCache) Messages(topic string, since sinceTime) ([]*message, error) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.messages[topic]; !ok {
Expand All @@ -38,7 +38,7 @@ func (s *memCache) Messages(topic string, since time.Time) ([]*message, error) {
messages := make([]*message, 0) // copy!
for _, m := range s.messages[topic] {
msgTime := time.Unix(m.Time, 0)
if msgTime == since || msgTime.After(since) {
if msgTime == since.Time() || msgTime.After(since.Time()) {
messages = append(messages, m)
}
}
Expand Down
4 changes: 2 additions & 2 deletions server/cache_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func (c *sqliteCache) AddMessage(m *message) error {
return err
}

func (c *sqliteCache) Messages(topic string, since time.Time) ([]*message, error) {
rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Unix())
func (c *sqliteCache) Messages(topic string, since sinceTime) ([]*message, error) {
rows, err := c.db.Query(selectMessagesSinceTimeQuery, topic, since.Time().Unix())
if err != nil {
return nil, err
}
Expand Down
45 changes: 37 additions & 8 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,25 @@ type indexPage struct {
CacheDuration string
}

type sinceTime time.Time

func (t sinceTime) IsAll() bool {
return t == sinceAllMessages
}

func (t sinceTime) IsNone() bool {
return t == sinceNoMessages
}

func (t sinceTime) Time() time.Time {
return time.Time(t)
}

var (
sinceAllMessages = sinceTime(time.Unix(0, 0))
sinceNoMessages = sinceTime(time.Unix(1, 0))
)

const (
messageLimit = 512
)
Expand Down Expand Up @@ -318,8 +337,8 @@ func (s *Server) handleSubscribe(w http.ResponseWriter, r *http.Request, v *visi
}
}

func (s *Server) sendOldMessages(t *topic, since time.Time, sub subscriber) error {
if since.IsZero() {
func (s *Server) sendOldMessages(t *topic, since sinceTime, sub subscriber) error {
if since.IsNone() {
return nil
}
messages, err := s.cache.Messages(t.id, since)
Expand All @@ -334,17 +353,27 @@ func (s *Server) sendOldMessages(t *topic, since time.Time, sub subscriber) erro
return nil
}

func parseSince(r *http.Request) (time.Time, error) {
// parseSince returns a timestamp identifying the time span from which cached messages should be received.
//
// Values in the "since=..." parameter can be either a unix timestamp or a duration (e.g. 12h), or
// "all" for all messages.
func parseSince(r *http.Request) (sinceTime, error) {
if !r.URL.Query().Has("since") {
return time.Time{}, nil
if r.URL.Query().Has("poll") {
return sinceAllMessages, nil
}
return sinceNoMessages, nil
}
if r.URL.Query().Get("since") == "all" {
return sinceAllMessages, nil
}
if since, err := strconv.ParseInt(r.URL.Query().Get("since"), 10, 64); err == nil {
return time.Unix(since, 0), nil
if s, err := strconv.ParseInt(r.URL.Query().Get("since"), 10, 64); err == nil {
return sinceTime(time.Unix(s, 0)), nil
}
if d, err := time.ParseDuration(r.URL.Query().Get("since")); err == nil {
return time.Now().Add(-1 * d), nil
return sinceTime(time.Now().Add(-1 * d)), nil
}
return time.Time{}, errHTTPBadRequest
return sinceNoMessages, errHTTPBadRequest
}

func (s *Server) handleOptions(w http.ResponseWriter, r *http.Request) error {
Expand Down
2 changes: 1 addition & 1 deletion server/static/js/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ const test = (topic) => {
};

const fetchCachedMessages = async (topic) => {
const topicJsonUrl = `/${topic}/json?poll=1&since=12h`; // Poll!
const topicJsonUrl = `/${topic}/json?poll=1`; // Poll!
for await (let line of makeTextFileLineIterator(topicJsonUrl)) {
const message = JSON.parse(line);
topics[topic]['messages'].push(message);
Expand Down

0 comments on commit d453db8

Please sign in to comment.