Skip to content

Commit

Permalink
Implement ratelimiting (matrix). Fixes #1238 (#1326)
Browse files Browse the repository at this point in the history
  • Loading branch information
42wim authored Dec 6, 2020
1 parent 8eba2d3 commit 2d3c26a
Show file tree
Hide file tree
Showing 2 changed files with 169 additions and 51 deletions.
32 changes: 32 additions & 0 deletions bridge/matrix/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,35 @@ func (b *Bmatrix) getAvatarURL(sender string) string {

return url
}

// handleRatelimit handles the ratelimit errors and return if we're ratelimited and the amount of time to sleep
func (b *Bmatrix) handleRatelimit(err error) (time.Duration, bool) {
httpErr := handleError(err)
if httpErr.Errcode != "M_LIMIT_EXCEEDED" {
return 0, false
}

b.Log.Debugf("ratelimited: %s", httpErr.Err)
b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before retrying", httpErr.RetryAfterMs/1000)

return time.Duration(httpErr.RetryAfterMs) * time.Millisecond, true
}

// retry function will check if we're ratelimited and retries again when backoff time expired
// returns original error if not 429 ratelimit
func (b *Bmatrix) retry(f func() error) error {
b.rateMutex.Lock()
defer b.rateMutex.Unlock()

for {
if err := f(); err != nil {
if backoff, ok := b.handleRatelimit(err); ok {
time.Sleep(backoff)
} else {
return err
}
} else {
return nil
}
}
}
188 changes: 137 additions & 51 deletions bridge/matrix/matrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type Bmatrix struct {
UserID string
NicknameMap map[string]NicknameCacheEntry
RoomMap map[string]string
rateMutex sync.RWMutex
sync.RWMutex
*bridge.Config
}
Expand Down Expand Up @@ -99,25 +100,18 @@ func (b *Bmatrix) Disconnect() error {
}

func (b *Bmatrix) JoinChannel(channel config.ChannelInfo) error {
retry:
resp, err := b.mc.JoinRoom(channel.Name, "", nil)
if err != nil {
httpErr := handleError(err)
if httpErr.Errcode == "M_LIMIT_EXCEEDED" {
b.Log.Infof("getting ratelimited by matrix, sleeping approx %d seconds before joining %s", httpErr.RetryAfterMs/1000, channel.Name)
time.Sleep((time.Duration(httpErr.RetryAfterMs) * time.Millisecond))

goto retry
return b.retry(func() error {
resp, err := b.mc.JoinRoom(channel.Name, "", nil)
if err != nil {
return err
}

return err
}
b.Lock()
b.RoomMap[resp.RoomID] = channel.Name
b.Unlock()

b.Lock()
b.RoomMap[resp.RoomID] = channel.Name
b.Unlock()

return nil
return nil
})
}

func (b *Bmatrix) Send(msg config.Message) (string, error) {
Expand All @@ -135,29 +129,56 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
Body: username.plain + msg.Text,
FormattedBody: username.formatted + msg.Text,
}
resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
if err != nil {
return "", err
}
return resp.EventID, err

msgID := ""

err := b.retry(func() error {
resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)
if err != nil {
return err
}

msgID = resp.EventID

return err
})

return msgID, err
}

// Delete message
if msg.Event == config.EventMsgDelete {
if msg.ID == "" {
return "", nil
}
resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{})
if err != nil {
return "", err
}
return resp.EventID, err

msgID := ""

err := b.retry(func() error {
resp, err := b.mc.RedactEvent(channel, msg.ID, &matrix.ReqRedact{})
if err != nil {
return err
}

msgID = resp.EventID

return err
})

return msgID, err
}

// Upload a file if it exists
if msg.Extra != nil {
for _, rmsg := range helper.HandleExtra(&msg, b.General) {
if _, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text); err != nil {
rmsg := rmsg

err := b.retry(func() error {
_, err := b.mc.SendText(channel, rmsg.Username+rmsg.Text)

return err
})
if err != nil {
b.Log.Errorf("sendText failed: %s", err)
}
}
Expand Down Expand Up @@ -187,7 +208,12 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
EventID: msg.ID,
Type: "m.replace",
}
_, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg)

err := b.retry(func() error {
_, err := b.mc.SendMessageEvent(channel, "m.room.message", rmsg)

return err
})
if err != nil {
return "", err
}
Expand All @@ -202,26 +228,58 @@ func (b *Bmatrix) Send(msg config.Message) (string, error) {
Body: username.plain + msg.Text,
FormattedBody: username.formatted + msg.Text,
}
resp, err := b.mc.SendMessageEvent(channel, "m.room.message", m)

var (
resp *matrix.RespSendEvent
err error
)

err = b.retry(func() error {
resp, err = b.mc.SendMessageEvent(channel, "m.room.message", m)

return err
})
if err != nil {
return "", err
}

return resp.EventID, err
}

if b.GetBool("HTMLDisable") {
resp, err := b.mc.SendText(channel, username.plain+msg.Text)
var (
resp *matrix.RespSendEvent
err error
)

err = b.retry(func() error {
resp, err = b.mc.SendText(channel, username.plain+msg.Text)

return err
})
if err != nil {
return "", err
}

return resp.EventID, err
}

// Post normal message with HTML support (eg riot.im)
resp, err := b.mc.SendFormattedText(channel, username.plain+msg.Text, username.formatted+helper.ParseMarkdown(msg.Text))
var (
resp *matrix.RespSendEvent
err error
)

err = b.retry(func() error {
resp, err = b.mc.SendFormattedText(channel, username.plain+msg.Text,
username.formatted+helper.ParseMarkdown(msg.Text))

return err
})
if err != nil {
return "", err
}

return resp.EventID, err
}

Expand Down Expand Up @@ -420,13 +478,25 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf
sp := strings.Split(fi.Name, ".")
mtype := mime.TypeByExtension("." + sp[len(sp)-1])
// image and video uploads send no username, we have to do this ourself here #715
_, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment)
err := b.retry(func() error {
_, err := b.mc.SendFormattedText(channel, username.plain+fi.Comment, username.formatted+fi.Comment)

return err
})
if err != nil {
b.Log.Errorf("file comment failed: %#v", err)
}

b.Log.Debugf("uploading file: %s %s", fi.Name, mtype)
res, err := b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data)))

var res *matrix.RespMediaUpload

err = b.retry(func() error {
res, err = b.mc.UploadToContentRepo(content, mtype, int64(len(*fi.Data)))

return err
})

if err != nil {
b.Log.Errorf("file upload failed: %#v", err)
return
Expand All @@ -435,40 +505,56 @@ func (b *Bmatrix) handleUploadFile(msg *config.Message, channel string, fi *conf
switch {
case strings.Contains(mtype, "video"):
b.Log.Debugf("sendVideo %s", res.ContentURI)
_, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendVideo(channel, fi.Name, res.ContentURI)

return err
})
if err != nil {
b.Log.Errorf("sendVideo failed: %#v", err)
}
case strings.Contains(mtype, "image"):
b.Log.Debugf("sendImage %s", res.ContentURI)
_, err = b.mc.SendImage(channel, fi.Name, res.ContentURI)
err = b.retry(func() error {
_, err = b.mc.SendImage(channel, fi.Name, res.ContentURI)

return err
})
if err != nil {
b.Log.Errorf("sendImage failed: %#v", err)
}
case strings.Contains(mtype, "audio"):
b.Log.Debugf("sendAudio %s", res.ContentURI)
_, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{
MsgType: "m.audio",
Body: fi.Name,
URL: res.ContentURI,
Info: matrix.AudioInfo{
Mimetype: mtype,
Size: uint(len(*fi.Data)),
},
err = b.retry(func() error {
_, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.AudioMessage{
MsgType: "m.audio",
Body: fi.Name,
URL: res.ContentURI,
Info: matrix.AudioInfo{
Mimetype: mtype,
Size: uint(len(*fi.Data)),
},
})

return err
})
if err != nil {
b.Log.Errorf("sendAudio failed: %#v", err)
}
default:
b.Log.Debugf("sendFile %s", res.ContentURI)
_, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{
MsgType: "m.file",
Body: fi.Name,
URL: res.ContentURI,
Info: matrix.FileInfo{
Mimetype: mtype,
Size: uint(len(*fi.Data)),
},
err = b.retry(func() error {
_, err = b.mc.SendMessageEvent(channel, "m.room.message", matrix.FileMessage{
MsgType: "m.file",
Body: fi.Name,
URL: res.ContentURI,
Info: matrix.FileInfo{
Mimetype: mtype,
Size: uint(len(*fi.Data)),
},
})

return err
})
if err != nil {
b.Log.Errorf("sendFile failed: %#v", err)
Expand Down

0 comments on commit 2d3c26a

Please sign in to comment.