Skip to content

Commit

Permalink
retry with exponential back off
Browse files Browse the repository at this point in the history
When Unavaiable, Internal, Quota Exceeded ocurred, do retry.
  • Loading branch information
fujiwara committed Apr 5, 2024
1 parent 9527836 commit f065429
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 35 deletions.
18 changes: 12 additions & 6 deletions fcmv1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func (c *Client) Send(p Payload) ([]Result, error) {

if body.Error == nil && body.Name != "" {
return []Result{
Result{
{
StatusCode: res.StatusCode,
Token: p.Message.Token,
},
}, nil
} else if body.Error != nil {
return []Result{
Result{
{
StatusCode: res.StatusCode,
Token: p.Message.Token,
Error: body.Error,
Expand All @@ -70,15 +70,21 @@ func (c *Client) NewRequest(p Payload) (*http.Request, error) {
if err != nil {
return nil, err
}
token, err := c.tokenSource.Token()
if err != nil {
return nil, err
var bearer string
if ts := c.tokenSource; ts != nil {
token, err := c.tokenSource.Token()
if err != nil {
return nil, err
}
bearer = token.AccessToken
} else {
bearer = p.Message.Token
}
req, err := http.NewRequest("POST", c.endpoint.String(), bytes.NewReader(data))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+token.AccessToken)
req.Header.Set("Authorization", "Bearer "+bearer)
req.Header.Set("Content-Type", "application/json")

return req, nil
Expand Down
3 changes: 3 additions & 0 deletions fcmv1/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ const (
InvalidArgument = "INVALID_ARGUMENT"
Unregistered = "UNREGISTERED"
NotFound = "NOT_FOUND"
Internal = "INTERNAL"
Unavailable = "UNAVAILABLE"
QuotaExceeded = "QUOTA_EXCEEDED"
)

type Error struct {
Expand Down
51 changes: 22 additions & 29 deletions supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"math"
"os"
"os/exec"
"sync"
Expand Down Expand Up @@ -110,15 +111,18 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) {
for cnt := 0; cnt < RetryOnceCount; cnt++ {
select {
case req := <-s.retryq:
reqs := &[]Request{req}
select {
case s.queue <- reqs:
LogWithFields(logrus.Fields{"type": "retry", "resend_cnt": req.Tries}).
Debugf("Enqueue to retry to send notification.")
default:
LogWithFields(logrus.Fields{"type": "retry"}).
Infof("Could not retry to enqueue because the supervisor queue is full.")
}
delay := time.Duration(math.Pow(float64(req.Tries), 2)) * 100 * time.Millisecond
time.AfterFunc(delay, func() {
reqs := &[]Request{req}
select {
case s.queue <- reqs:
LogWithFields(logrus.Fields{"delay": delay, "type": "retry", "resend_cnt": req.Tries}).
Debugf("Enqueue to retry to send notification.")
default:
LogWithFields(logrus.Fields{"delay": delay, "type": "retry"}).
Infof("Could not retry to enqueue because the supervisor queue is full.")
}
})
default:
}
}
Expand Down Expand Up @@ -363,23 +367,7 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com
if resp.Err != nil {
req := resp.Req
LogWithFields(logf).Warnf("response is nil. reason: %s", resp.Err.Error())
if req.Tries < SendRetryCount {
req.Tries++
atomic.AddInt64(&(srvStats.RetryCount), 1)
logf["resend_cnt"] = req.Tries

select {
case retryq <- req:
LogWithFields(logf).
Debugf("Retry to enqueue into retryq because of http connection error with FCM.")
default:
LogWithFields(logf).
Warnf("Supervisor retry queue is full.")
}
} else {
LogWithFields(logf).
Warnf("Retry count is over than %d. Could not deliver notification.", SendRetryCount)
}
retry(retryq, req, resp.Err, logf)
return
}

Expand All @@ -391,12 +379,17 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com
LogWithFields(logf).Info("Succeeded to send a notification")
continue
}
// handle error response each registration_id
atomic.AddInt64(&(srvStats.ErrCount), 1)
switch err.Error() {
case fcmv1.Internal, fcmv1.Unavailable:
LogWithFields(logf).Warn("retrying:", err)
retry(retryq, resp.Req, err, logf)
case fcmv1.QuotaExceeded:
LogWithFields(logf).Warn("retrying after 1 min:", err)
time.AfterFunc(time.Minute, func() { retry(retryq, resp.Req, err, logf) })
case fcmv1.Unregistered, fcmv1.InvalidArgument, fcmv1.NotFound:
LogWithFields(logf).Errorf("calling error hook: %s", err)
atomic.AddInt64(&(srvStats.ErrCount), 1)
onResponse(result, errorResponseHandler.HookCmd(), cmdq)
LogWithFields(logf).Errorf("%s", err)
default:
LogWithFields(logf).Errorf("Unknown error message: %s", err)
}
Expand Down

0 comments on commit f065429

Please sign in to comment.