Skip to content

Commit

Permalink
Support saving last viewed at state for multiple users (mattermost) (#…
Browse files Browse the repository at this point in the history
…425)

Persistent last viewed at state was added in PR #313. It is done by
dumping out the state of the current user to a file on
disk. Unfortunately, this is only per user, the current user, even
though matterircd supports multiple users.

This fixes that by switching to using bbolt for storing and saving the
last viewed at state.
  • Loading branch information
hloeung authored Nov 11, 2021
1 parent 592d814 commit 45d6e01
Show file tree
Hide file tree
Showing 45 changed files with 6,515 additions and 89 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.9.0
github.com/stretchr/testify v1.7.0
go.etcd.io/bbolt v1.3.6
golang.org/x/net v0.0.0-20210913180222-943fd674d43e // indirect
golang.org/x/sys v0.0.0-20210910150752-751e447fb3d0 // indirect
golang.org/x/text v0.3.7 // indirect
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1005,6 +1005,7 @@ github.com/ziutek/mymysql v1.5.4/go.mod h1:LMSpPZ6DbqWFxNCHW77HeMg9I646SAhApZ/wK
gitlab.com/nyarla/go-crypt v0.0.0-20160106005555-d9a5dc2b789b/go.mod h1:T3BPAOm2cqquPa0MKWeNkmOM5RQsRhkrwMWonFMN7fE=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ=
go.etcd.io/bbolt v1.3.6 h1:/ecaJf0sk1l4l6V4awd65v2C3ILy7MSj+s/x1ADCIMU=
go.etcd.io/bbolt v1.3.6/go.mod h1:qXsaaIqmgQH0T+OPdb99Bf+PKfBBQVAdyD6TY9G8XM4=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
Expand Down
26 changes: 25 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"net"
"os"
"strings"
"time"

bolt "go.etcd.io/bbolt"

"github.com/42wim/matterircd/config"
irckit "github.com/42wim/matterircd/mm-go-irckit"
Expand All @@ -22,6 +25,8 @@ var (
githash string
logger *logrus.Entry
v *viper.Viper

LastViewedSaveDB *bolt.DB
)

func main() {
Expand Down Expand Up @@ -98,6 +103,25 @@ func main() {
}()
}

mmLastViewedFile := "matterircd-lastsaved.db"
if statePath := v.GetString("mattermost.LastViewedSaveFile"); statePath != "" {
mmLastViewedFile = statePath
}
db, err := bolt.Open(mmLastViewedFile, 0600, &bolt.Options{Timeout: 1 * time.Second})
// XXX: backwards compatibility with older DB format (pre bbolt).
// TODO: Remove in future releases.
if err != nil && err.Error() == "invalid database" {
logger.Warning("Found old last viewed at state file, renaming to .migrated")
os.Rename(mmLastViewedFile, mmLastViewedFile+".migrated")
// Recall to create DB now that it no longer exists.
db, err = bolt.Open(mmLastViewedFile, 0600, &bolt.Options{Timeout: 1 * time.Second})
}
if err != nil {
logger.Fatal(err)
}
defer db.Close()
LastViewedSaveDB = db

// backwards compatible

if v.GetString("bind") != "" {
Expand Down Expand Up @@ -171,7 +195,7 @@ func start(socket net.Listener) {

logger.Infof("New connection: %s", conn.RemoteAddr())

user := irckit.NewUserBridge(conn, newsrv, v)
user := irckit.NewUserBridge(conn, newsrv, v, LastViewedSaveDB)
err = newsrv.Connect(user)
if err != nil {
logger.Errorf("Failed to join: %v", err)
Expand Down
4 changes: 0 additions & 4 deletions matterircd.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,6 @@ ShowMentions = false
# Path to file to store last viewed information. This is useful for replying only
# the messages missed.
LastViewedSaveFile = "matterircd-lastsaved.db"
# Interval for how often to save last viewed information.
LastViewedSaveInterval = "5m"
# Consider saved last view information stale if last saved older than this time
LastViewedStaleDuration = "30d"

#############################
##### SLACK EXAMPLE #########
Expand Down
149 changes: 65 additions & 84 deletions mm-go-irckit/userbridge.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package irckit

import (
"encoding/binary"
"encoding/gob"
"errors"
"fmt"
Expand All @@ -12,6 +13,8 @@ import (
"sync"
"time"

bolt "go.etcd.io/bbolt"

"github.com/42wim/matterircd/bridge"
"github.com/42wim/matterircd/bridge/mattermost"
mattermost6 "github.com/42wim/matterircd/bridge/mattermost6"
Expand All @@ -37,8 +40,8 @@ type UserBridge struct {
lastViewedAtMutex sync.RWMutex //nolint:structcheck
lastViewedAt map[string]int64 //nolint:structcheck

lastViewedAtSaved int64 //nolint:structcheck
msgCounter map[string]int //nolint:structcheck
lastViewedAtDB *bolt.DB //nolint:structcheck
msgCounter map[string]int //nolint:structcheck

msgLastMutex sync.RWMutex //nolint:structcheck
msgLast map[string][2]string //nolint:structcheck
Expand All @@ -50,7 +53,7 @@ type UserBridge struct {
updateCounter map[string]time.Time //nolint:structcheck
}

func NewUserBridge(c net.Conn, srv Server, cfg *viper.Viper) *User {
func NewUserBridge(c net.Conn, srv Server, cfg *viper.Viper, db *bolt.DB) *User {
u := NewUser(&conn{
Conn: c,
Encoder: irc.NewEncoder(c),
Expand All @@ -59,7 +62,7 @@ func NewUserBridge(c net.Conn, srv Server, cfg *viper.Viper) *User {

u.Srv = srv
u.v = cfg
u.lastViewedAt = u.loadLastViewedAt()
u.lastViewedAtDB = db
u.msgLast = make(map[string][2]string)
u.msgMap = make(map[string]map[string]int)
u.msgCounter = make(map[string]int)
Expand Down Expand Up @@ -100,9 +103,6 @@ func (u *User) handleEventChan() {
case *bridge.ReactionAddEvent, *bridge.ReactionRemoveEvent:
u.handleReactionEvent(e)
case *bridge.LogoutEvent:
if statePath := u.v.GetString(u.br.Protocol() + ".lastviewedsavefile"); statePath != "" {
saveLastViewedAtStateFile(statePath, u.lastViewedAt)
}
return
}
}
Expand Down Expand Up @@ -606,21 +606,44 @@ func (u *User) addUserToChannelWorker(channels <-chan *bridge.ChannelInfo, throt
if since == 0 {
continue
}

logSince := "server"
channame := brchannel.Name
if !brchannel.DM {
channame = fmt.Sprintf("#%s", brchannel.Name)
}

// We used to stored last viewed at if present.
u.lastViewedAtMutex.RLock()
if lastViewedAt, ok := u.lastViewedAt[brchannel.ID]; ok {
// But only use the stored last viewed if it's later than what the server knows.
if lastViewedAt > since {
since = lastViewedAt + 1
var lastViewedAt int64
key := brchannel.ID
u.lastViewedAtDB.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(u.User))
if v := b.Get([]byte(key)); v != nil {
lastViewedAt = int64(binary.LittleEndian.Uint64(v))
}
return nil
})

// XXX: backwards compatibility with older DB format (pre bbolt).
// TODO: Remove in future releases.
if lastViewedAt == 0 {
u.lastViewedAtMutex.RLock()
lastViewedAt = u.lastViewedAt[brchannel.ID]
u.lastViewedAtMutex.RUnlock()
}
u.lastViewedAtMutex.RUnlock()

// But only use the stored last viewed if it's later than what the server knows.
if lastViewedAt > since {
since = lastViewedAt + 1
logSince = "stored"
}

// post everything to the channel you haven't seen yet
postlist := u.br.GetPostsSince(brchannel.ID, since)
if postlist == nil {
// if the channel is not from the primary team id, we can't get posts
if brchannel.TeamID == u.br.GetMe().TeamID {
logger.Errorf("something wrong with getPostsSince for channel %s (%s)", brchannel.ID, brchannel.Name)
logger.Errorf("something wrong with getPostsSince for %s for channel %s (%s)", u.Nick, channame, brchannel.ID)
}
continue
}
Expand Down Expand Up @@ -677,14 +700,12 @@ func (u *User) addUserToChannelWorker(channels <-chan *bridge.ChannelInfo, throt

if showReplayHdr {
date := ts.Format("2006-01-02 15:04:05")
channame := brchannel.Name
if brchannel.DM {
spoof(nick, fmt.Sprintf("\x02Replaying since %s\x0f", date))
spoof(nick, fmt.Sprintf("\x02Replaying msgs since %s\x0f", date))
} else {
spoof("matterircd", fmt.Sprintf("\x02Replaying since %s\x0f", date))
channame = fmt.Sprintf("#%s", brchannel.Name)
spoof("matterircd", fmt.Sprintf("\x02Replaying msgs since %s\x0f", date))
}
logger.Infof("Replaying logs for %s (%s) since %s", brchannel.ID, channame, date)
logger.Infof("Replaying msgs for %s for %s (%s) since %s (%s)", u.Nick, channame, brchannel.ID, date, logSince)
showReplayHdr = false
}

Expand Down Expand Up @@ -967,7 +988,6 @@ func (u *User) loginTo(protocol string) error {
u.br, _, err = mattermost.New(u.v, u.Credentials, u.eventChan, u.addUsersToChannels)
}
}

if err != nil {
return err
}
Expand All @@ -982,6 +1002,18 @@ func (u *User) loginTo(protocol string) error {
u.User = info.User
u.MentionKeys = info.MentionKeys

// XXX: backwards compatibility with older DB format (pre bbolt).
// TODO: Remove in future releases.
u.lastViewedAt = u.loadLastViewedAt()

err = u.lastViewedAtDB.Update(func(tx *bolt.Tx) error {
_, err2 := tx.CreateBucketIfNotExists([]byte(u.User))
return err2
})
if err != nil {
return err
}

return nil
}

Expand All @@ -990,9 +1022,6 @@ func (u *User) logoutFrom(protocol string) error {
logger.Debug("logging out from", protocol)

u.Srv.Logout(u)
if statePath := u.v.GetString(u.br.Protocol() + ".lastviewedsavefile"); statePath != "" {
saveLastViewedAtStateFile(statePath, u.lastViewedAt)
}
return nil
}

Expand Down Expand Up @@ -1110,18 +1139,11 @@ func (u *User) updateLastViewed(channelID string) {
}()
}

// TODO: This has been replaced by bbolt. Remove in future releases.
func (u *User) loadLastViewedAt() map[string]int64 {
statePath := u.v.GetString("mattermost.lastviewedsavefile")
if statePath == "" {
return make(map[string]int64)
}

statePath := u.v.GetString("mattermost.lastviewedsavefile") + ".migrated"
if _, err := os.Stat(statePath); os.IsNotExist(err) {
logger.Debug("No saved lastViewedAt, using empty values")
lastViewedAt := make(map[string]int64)
// We also want to dump out/create the lastViewedAt state file.
saveLastViewedAtStateFile(statePath, lastViewedAt)
return lastViewedAt
return make(map[string]int64)
}

staleDuration := u.v.GetString("mattermost.lastviewedstaleduration")
Expand All @@ -1131,70 +1153,29 @@ func (u *User) loadLastViewedAt() map[string]int64 {
return make(map[string]int64)
}

logger.Warning("Found old last viewed at state file, loading and removing")
logger.Info("Loaded lastViewedAt from ", time.Unix(lastViewedAt["__LastViewedStateSavedTime__"]/1000, 0))
u.lastViewedAtSaved = model.GetMillis()
os.Remove(statePath)

return lastViewedAt
}

const defaultSaveInterval = int64((5 * time.Minute) / time.Millisecond)

func (u *User) saveLastViewedAt(channelID string) {
u.lastViewedAtMutex.Lock()
defer u.lastViewedAtMutex.Unlock()
if channelID != "" {
u.lastViewedAt[channelID] = model.GetMillis()
}
currentTime := make([]byte, 8)
binary.LittleEndian.PutUint64(currentTime, uint64(model.GetMillis()))

statePath := u.v.GetString(u.br.Protocol() + ".lastviewedsavefile")
if statePath == "" {
return
}

// We only want to save or dump out saved lastViewedAt on new
// messages after X time.
var saveInterval int64
val, err := time.ParseDuration(u.v.GetString(u.br.Protocol() + ".lastviewedsaveinterval"))
err := u.lastViewedAtDB.Update(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(u.User))
err := b.Put([]byte(channelID), currentTime)
return err
})
if err != nil {
saveInterval = defaultSaveInterval
} else {
saveInterval = val.Milliseconds()
}
if u.lastViewedAtSaved < (model.GetMillis() - saveInterval) {
saveLastViewedAtStateFile(statePath, u.lastViewedAt)
u.lastViewedAtSaved = model.GetMillis()
logger.Fatal(err)
}
}

const lastViewedStateFormat = int64(1)

func saveLastViewedAtStateFile(statePath string, lastViewedAt map[string]int64) error {
f, err := os.Create(statePath)
if err != nil {
logger.Debug("Unable to save lastViewedAt: ", err)
return err
}
defer f.Close()

currentTime := model.GetMillis()

lastViewedAt["__LastViewedStateFormat__"] = lastViewedStateFormat
if _, ok := lastViewedAt["__LastViewedStateCreateTime__"]; !ok {
lastViewedAt["__LastViewedStateCreateTime__"] = currentTime
}
lastViewedAt["__LastViewedStateSavedTime__"] = currentTime
// Simple checksum
lastViewedAt["__LastViewedStateChecksum__"] = lastViewedAt["__LastViewedStateCreateTime__"] ^ currentTime

logger.Debug("Saving lastViewedAt")

if err := gob.NewEncoder(f).Encode(lastViewedAt); err != nil {
return fmt.Errorf("gob encoding failed: %s", err)
}

return nil
}

const defaultStaleDuration = int64((30 * 24 * time.Hour) / time.Millisecond)

func loadLastViewedAtStateFile(statePath string, staleDuration string) (map[string]int64, error) {
Expand Down
7 changes: 7 additions & 0 deletions vendor/go.etcd.io/bbolt/.gitignore

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions vendor/go.etcd.io/bbolt/.travis.yml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions vendor/go.etcd.io/bbolt/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 45d6e01

Please sign in to comment.