Skip to content

Commit

Permalink
Save lastViewedAt and load on start up (42wim#313)
Browse files Browse the repository at this point in the history
This ensures we only replay backlog for what we haven't seen.
  • Loading branch information
hloeung committed Jan 10, 2021
1 parent c8bcb31 commit 657ae57
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 0 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
matterircd
matterircd.toml
matterircd-lastsaved.db
6 changes: 6 additions & 0 deletions matterircd.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ SuffixContext = false
#in your mattermost "word that trigger mentions" notifications.
ShowMentions = false

# Path to file to store last viewed information. This is useful for replying only
# the messages missed.
LastViewedSavePath = "matterircd-lastsaved.db"
# Interval for how often to save last viewed information.
LastViewedSaveInterval = 5m

#############################
##### SLACK EXAMPLE #########
#############################
Expand Down
132 changes: 132 additions & 0 deletions mm-go-irckit/userbridge.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
package irckit

import (
"errors"
"fmt"
"math/rand"
"net"
"os"
"strings"
"sync"
"time"

"encoding/gob"

"github.com/42wim/matterircd/bridge"
"github.com/42wim/matterircd/bridge/mattermost"
"github.com/42wim/matterircd/bridge/slack"
Expand All @@ -25,6 +29,7 @@ type UserBridge struct {
inprogress bool //nolint:structcheck
lastViewedAt map[string]int64 //nolint:structcheck
lastViewedAtMutex sync.RWMutex //nolint:structcheck
lastViewedAtSaved int64 //nolint:structcheck
msgCounter map[string]int //nolint:structcheck
msgLast map[string][2]string //nolint:structcheck
msgLastMutex sync.RWMutex //nolint:structcheck
Expand All @@ -49,6 +54,18 @@ func NewUserBridge(c net.Conn, srv Server, cfg *viper.Viper) *User {
u.msgCounter = make(map[string]int)
u.updateCounter = make(map[string]time.Time)

statePath := u.v.GetString("mattermost.LastViewedSavePath")
if statePath != "" {
lastViewedAt, err := loadLastViewedState(statePath)
if err == nil {
logger.Info("Loaded lastViewedAt from ", lastViewedAt["__LastViewedStateSavedTime__"])
u.lastViewedAt = lastViewedAt
} else {
logger.Warning("Unable to load saved lastViewedAt, using empty values: ", err)
}
u.lastViewedAtSaved = model.GetMillis()
}

// used for login
u.createService("mattermost", "loginservice")
u.createService("slack", "loginservice")
Expand Down Expand Up @@ -153,6 +170,21 @@ func (u *User) handleDirectMessageEvent(event *bridge.DirectMessageEvent) {
u.lastViewedAtMutex.Lock()
defer u.lastViewedAtMutex.Unlock()
u.lastViewedAt[event.ChannelID] = model.GetMillis()
statePath := u.v.GetString(u.br.Protocol() + ".lastviewedsavepath")
if statePath == "" {
return
}
// We only want to save or dump out saved lastViewedAt on new
// messages after X time (default 5mins).
saveInterval := int64(300000)
val, err := time.ParseDuration(u.v.GetString(u.br.Protocol() + ".lastviewedsaveinterval"))
if err == nil {
saveInterval = val.Milliseconds()
}
if u.lastViewedAtSaved < (model.GetMillis() - saveInterval) {
saveLastViewedState(statePath, u.lastViewedAt)
u.lastViewedAtSaved = model.GetMillis()
}
}

func (u *User) handleChannelAddEvent(event *bridge.ChannelAddEvent) {
Expand Down Expand Up @@ -287,6 +319,21 @@ func (u *User) handleChannelMessageEvent(event *bridge.ChannelMessageEvent) {
u.lastViewedAtMutex.Lock()
defer u.lastViewedAtMutex.Unlock()
u.lastViewedAt[event.ChannelID] = model.GetMillis()
statePath := u.v.GetString(u.br.Protocol() + ".lastviewedsavepath")
if statePath == "" {
return
}
// We only want to save or dump out saved lastViewedAt on new
// messages after X time (default 5mins).
saveInterval := int64(300000)
val, err := time.ParseDuration(u.v.GetString(u.br.Protocol() + ".lastviewedsaveinterval"))
if err == nil {
saveInterval = val.Milliseconds()
}
if u.lastViewedAtSaved < (model.GetMillis() - saveInterval) {
saveLastViewedState(statePath, u.lastViewedAt)
u.lastViewedAtSaved = model.GetMillis()
}
}

func (u *User) handleFileEvent(event *bridge.FileEvent) {
Expand Down Expand Up @@ -628,6 +675,23 @@ func (u *User) addUserToChannelWorker(channels <-chan *bridge.ChannelInfo, throt
u.lastViewedAt[brchannel.ID] = model.GetMillis()
u.lastViewedAtMutex.Unlock()
}
u.lastViewedAtMutex.Lock()
defer u.lastViewedAtMutex.Unlock()
statePath := u.v.GetString(u.br.Protocol() + ".lastviewedsavepath")
if statePath == "" {
return
}
// We only want to save or dump out saved lastViewedAt on new
// messages after X time (default 5mins).
saveInterval := int64(300000)
val, err := time.ParseDuration(u.v.GetString(u.br.Protocol() + ".lastviewedsaveinterval"))
if err == nil {
saveInterval = val.Milliseconds()
}
if u.lastViewedAtSaved < (model.GetMillis() - saveInterval) {
saveLastViewedState(statePath, u.lastViewedAt)
u.lastViewedAtSaved = model.GetMillis()
}
}

func (u *User) MsgUser(toUser *User, msg string) {
Expand Down Expand Up @@ -878,3 +942,71 @@ func (u *User) updateLastViewed(channelID string) {
u.br.UpdateLastViewed(channelID)
}()
}

var LastViewedStateFormat = int64(1)

func saveLastViewedState(statePath string, lastViewedAt map[string]int64) error {
f, err := os.Create(statePath)
if err != nil {
logger.Warning("Unable to save lastViewedAt: ", err)
return err
}
defer f.Close()

currentTime := model.GetMillis()

lastViewedAt["__LastViewedStateFormat__"] = LastViewedStateFormat
if _, ok := lastViewedAt["__LastViewedStateCreateTime__"]; !ok {
lastViewedAt["__LastViewedStateCreateTime__"] = currentTime
}
lastViewedAt["__LastViewedStateSavedTime__"] = currentTime
// Simple checksum
lastViewedAt["__LastViewedStateChecksum__"] = lastViewedAt["__LastViewedStateCreateTime__"] ^ currentTime

err = gob.NewEncoder(f).Encode(lastViewedAt)
if err != nil {
logger.Warning("Unable to save lastViewedAt: ", err)
} else {
logger.Debug("Saving lastViewedAt")
}
return err
}

func loadLastViewedState(statePath string) (map[string]int64, error) {
f, err := os.Open(statePath)
if err != nil {
logger.Debug("Unable to load lastViewedAt: ", err)
return nil, err
}
defer f.Close()

var lastViewedAt map[string]int64
err = gob.NewDecoder(f).Decode(&lastViewedAt)
if err != nil {
logger.Warning("Unable to load lastViewedAt: ", err)
return nil, err
}

if lastViewedAt["__LastViewedStateFormat__"] != LastViewedStateFormat {
logger.Warning("State format version mismatch: ", lastViewedAt["__LastViewedStateFormat__"], " vs. ", LastViewedStateFormat)
return nil, errors.New("version mismatch")
}
checksum := lastViewedAt["__LastViewedStateChecksum__"]
createtime := lastViewedAt["__LastViewedStateCreateTime__"]
savedtime := lastViewedAt["__LastViewedStateSavedTime__"]
if createtime^savedtime != checksum {
logger.Warning("Checksum mismatch: (saved checksum, state file creation, last saved time)", checksum, createtime, savedtime)
return nil, errors.New("checksum mismatch")
}

// Check if stale, last saved for town-square older than 30 days. 86400 * 30
// TODO: Make this configurable.
currentTime := model.GetMillis()
townSquare, ok := lastViewedAt["town-square"]
if !ok || townSquare < currentTime-86400*30*1000 {
logger.Warning("File stale? Saved lastViewedAt for ~town-square too old: ", lastViewedAt["town-square"], currentTime)
return nil, errors.New("stale lastViewedAt state file")
}

return lastViewedAt, nil
}

0 comments on commit 657ae57

Please sign in to comment.