Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FCM v1 error handling #73

Merged
merged 6 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type SectionFCMv1 struct {
Enabled bool
ProjectID string
TokenSource oauth2.TokenSource
Endpoint string
}

// DefaultLoadConfig loads default /etc/gunfish.toml
Expand Down
2 changes: 2 additions & 0 deletions const.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,3 +78,5 @@ var (
OutputHookStdout bool
OutputHookStderr bool
)

var RetryBackoff = true
37 changes: 21 additions & 16 deletions fcmv1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ func (c *Client) Send(p Payload) ([]Result, error) {

if body.Error == nil && body.Name != "" {
return []Result{
Result{
{
StatusCode: res.StatusCode,
Token: p.Message.Token,
},
}, nil
} else if body.Error != nil {
return []Result{
Result{
{
StatusCode: res.StatusCode,
Token: p.Message.Token,
Error: body.Error,
Expand All @@ -70,22 +70,28 @@ func (c *Client) NewRequest(p Payload) (*http.Request, error) {
if err != nil {
return nil, err
}
token, err := c.tokenSource.Token()
if err != nil {
return nil, err
var bearer string
if ts := c.tokenSource; ts != nil {
token, err := c.tokenSource.Token()
if err != nil {
return nil, err
}
bearer = token.AccessToken
} else {
bearer = p.Message.Token
}
req, err := http.NewRequest("POST", c.endpoint.String(), bytes.NewReader(data))
if err != nil {
return nil, err
}
req.Header.Set("Authorization", "Bearer "+token.AccessToken)
req.Header.Set("Authorization", "Bearer "+bearer)
req.Header.Set("Content-Type", "application/json")

return req, nil
}

// NewClient establishes a http connection with fcm v1
func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint *url.URL, timeout time.Duration) (*Client, error) {
func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint string, timeout time.Duration) (*Client, error) {
client := &http.Client{
Timeout: timeout,
}
Expand All @@ -94,16 +100,15 @@ func NewClient(tokenSource oauth2.TokenSource, projectID string, endpoint *url.U
tokenSource: tokenSource,
}

if endpoint != nil {
c.endpoint = endpoint
} else {
ep, err := url.Parse(DefaultFCMEndpoint)
if err != nil {
return nil, err
}
ep.Path = path.Join(ep.Path, projectID, "messages:send")
c.endpoint = ep
if endpoint == "" {
endpoint = DefaultFCMEndpoint
}
ep, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
ep.Path = path.Join(ep.Path, projectID, "messages:send")
c.endpoint = ep

return c, nil
}
3 changes: 3 additions & 0 deletions fcmv1/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ const (
InvalidArgument = "INVALID_ARGUMENT"
Unregistered = "UNREGISTERED"
NotFound = "NOT_FOUND"
Internal = "INTERNAL"
Unavailable = "UNAVAILABLE"
QuotaExceeded = "QUOTA_EXCEEDED"
)

type Error struct {
Expand Down
64 changes: 64 additions & 0 deletions mock/fcmv1_server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package mock

import (
"encoding/json"
"fmt"
"log"
"math/rand"
"net/http"
"strings"
"time"

"github.com/kayac/Gunfish/fcmv1"
)

func FCMv1MockServer(projectID string, verbose bool) *http.ServeMux {
mux := http.NewServeMux()
p := fmt.Sprintf("/v1/projects/%s/messages:send", projectID)
log.Println("fcmv1 mock server path:", p)
mux.HandleFunc(p, func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
defer func() {
if verbose {
log.Printf("reqtime:%f proto:%s method:%s path:%s host:%s", reqtime(start), r.Proto, r.Method, r.URL.Path, r.RemoteAddr)
}
}()

// sets the response time from FCM server
time.Sleep(time.Millisecond*200 + time.Millisecond*(time.Duration(rand.Int63n(200)-100)))
token := r.Header.Get("Authorization")
token = strings.TrimPrefix(token, "Bearer ")

w.Header().Set("Content-Type", ApplicationJSON)
switch token {
case fcmv1.InvalidArgument:
createFCMv1ErrorResponse(w, http.StatusBadRequest, fcmv1.InvalidArgument)
case fcmv1.Unregistered:
createFCMv1ErrorResponse(w, http.StatusNotFound, fcmv1.Unregistered)
case fcmv1.Unavailable:
createFCMv1ErrorResponse(w, http.StatusServiceUnavailable, fcmv1.Unavailable)
case fcmv1.Internal:
createFCMv1ErrorResponse(w, http.StatusInternalServerError, fcmv1.Internal)
case fcmv1.QuotaExceeded:
createFCMv1ErrorResponse(w, http.StatusTooManyRequests, fcmv1.QuotaExceeded)
default:
enc := json.NewEncoder(w)
enc.Encode(fcmv1.ResponseBody{
Name: "ok",
})
}
})

return mux
}

func createFCMv1ErrorResponse(w http.ResponseWriter, code int, status string) error {
w.WriteHeader(code)
enc := json.NewEncoder(w)
return enc.Encode(fcmv1.ResponseBody{
Error: &fcmv1.FCMError{
Status: status,
Message: "mock error:" + status,
},
})
}
80 changes: 36 additions & 44 deletions supervisor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"errors"
"fmt"
"io"
"net/http"
"math"
"os"
"os/exec"
"sync"
Expand Down Expand Up @@ -33,15 +33,13 @@ type Supervisor struct {

// Worker sends notification to apns.
type Worker struct {
ac *apns.Client
fcv1 *fcmv1.Client
queue chan Request
respq chan SenderResponse
wgrp *sync.WaitGroup
sn int
id int
errorHandler func(Request, *http.Response, error)
successHandler func(Request, *http.Response)
ac *apns.Client
fcv1 *fcmv1.Client
queue chan Request
respq chan SenderResponse
wgrp *sync.WaitGroup
sn int
id int
}

// SenderResponse is responses to worker from sender.
Expand Down Expand Up @@ -113,17 +111,22 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) {
for cnt := 0; cnt < RetryOnceCount; cnt++ {
select {
case req := <-s.retryq:
reqs := &[]Request{req}
select {
case s.queue <- reqs:
LogWithFields(logrus.Fields{"type": "retry", "resend_cnt": req.Tries}).
Debugf("Enqueue to retry to send notification.")
default:
LogWithFields(logrus.Fields{"type": "retry"}).
Infof("Could not retry to enqueue because the supervisor queue is full.")
var delay time.Duration
if RetryBackoff {
delay = time.Duration(math.Pow(float64(req.Tries), 2)) * 100 * time.Millisecond
}
time.AfterFunc(delay, func() {
reqs := &[]Request{req}
select {
case s.queue <- reqs:
LogWithFields(logrus.Fields{"delay": delay, "type": "retry", "resend_cnt": req.Tries}).
Debugf("Enqueue to retry to send notification.")
default:
LogWithFields(logrus.Fields{"delay": delay, "type": "retry"}).
Infof("Could not retry to enqueue because the supervisor queue is full.")
}
})
default:
break
}
}
case <-s.exit:
Expand Down Expand Up @@ -172,7 +175,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) {
return Supervisor{}, errors.New("FCM legacy is not supported")
}
if conf.FCMv1.Enabled {
fcv1, err = fcmv1.NewClient(conf.FCMv1.TokenSource, conf.FCMv1.ProjectID, nil, fcmv1.ClientTimeout)
fcv1, err = fcmv1.NewClient(conf.FCMv1.TokenSource, conf.FCMv1.ProjectID, conf.FCMv1.Endpoint, fcmv1.ClientTimeout)
if err != nil {
LogWithFields(logrus.Fields{
"type": "supervisor",
Expand All @@ -192,7 +195,7 @@ func StartSupervisor(conf *config.Config) (Supervisor, error) {

s.workers = append(s.workers, &worker)
s.wgrp.Add(1)
go s.spawnWorker(worker, conf)
go s.spawnWorker(worker)
LogWithFields(logrus.Fields{
"type": "worker",
"worker_id": i,
Expand Down Expand Up @@ -245,7 +248,7 @@ func (s *Supervisor) Shutdown() {
}).Infoln("Stoped supervisor.")
}

func (s *Supervisor) spawnWorker(w Worker, conf *config.Config) {
func (s *Supervisor) spawnWorker(w Worker) {
atomic.AddInt64(&(srvStats.Workers), 1)
defer func() {
atomic.AddInt64(&(srvStats.Workers), -1)
Expand Down Expand Up @@ -367,23 +370,7 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com
if resp.Err != nil {
req := resp.Req
LogWithFields(logf).Warnf("response is nil. reason: %s", resp.Err.Error())
if req.Tries < SendRetryCount {
req.Tries++
atomic.AddInt64(&(srvStats.RetryCount), 1)
logf["resend_cnt"] = req.Tries

select {
case retryq <- req:
LogWithFields(logf).
Debugf("Retry to enqueue into retryq because of http connection error with FCM.")
default:
LogWithFields(logf).
Warnf("Supervisor retry queue is full.")
}
} else {
LogWithFields(logf).
Warnf("Retry count is over than %d. Could not deliver notification.", SendRetryCount)
}
retry(retryq, req, resp.Err, logf)
return
}

Expand All @@ -395,12 +382,17 @@ func handleFCMResponse(resp SenderResponse, retryq chan<- Request, cmdq chan Com
LogWithFields(logf).Info("Succeeded to send a notification")
continue
}
// handle error response each registration_id
atomic.AddInt64(&(srvStats.ErrCount), 1)
switch err.Error() {
case fcmv1.Internal, fcmv1.Unavailable:
LogWithFields(logf).Warn("retrying:", err)
retry(retryq, resp.Req, err, logf)
case fcmv1.QuotaExceeded:
LogWithFields(logf).Warn("retrying after 1 min:", err)
time.AfterFunc(time.Minute, func() { retry(retryq, resp.Req, err, logf) })
case fcmv1.Unregistered, fcmv1.InvalidArgument, fcmv1.NotFound:
LogWithFields(logf).Errorf("calling error hook: %s", err)
atomic.AddInt64(&(srvStats.ErrCount), 1)
onResponse(result, errorResponseHandler.HookCmd(), cmdq)
LogWithFields(logf).Errorf("%s", err)
default:
LogWithFields(logf).Errorf("Unknown error message: %s", err)
}
Expand Down Expand Up @@ -438,7 +430,7 @@ func spawnSender(wq <-chan Request, respq chan<- SenderResponse, wgrp *sync.Wait
no := req.Notification.(apns.Notification)
start := time.Now()
results, err := ac.Send(no)
respTime := time.Now().Sub(start).Seconds()
respTime := time.Since(start).Seconds()
rs := make([]Result, 0, len(results))
for _, v := range results {
rs = append(rs, v)
Expand All @@ -459,7 +451,7 @@ func spawnSender(wq <-chan Request, respq chan<- SenderResponse, wgrp *sync.Wait
p := req.Notification.(fcmv1.Payload)
start := time.Now()
results, err := fcv1.Send(p)
respTime := time.Now().Sub(start).Seconds()
respTime := time.Since(start).Seconds()
rs := make([]Result, 0, len(results))
for _, v := range results {
rs = append(rs, v)
Expand Down
1 change: 1 addition & 0 deletions supervisor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func (tr TestResponseHandler) HookCmd() string {
func init() {
logrus.SetLevel(logrus.WarnLevel)
conf.Apns.Host = gunfish.MockServer
gunfish.RetryBackoff = false // for testing
}

func TestEnqueuRequestToSupervisor(t *testing.T) {
Expand Down
6 changes: 6 additions & 0 deletions test/gunfish_test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,9 @@ cert_file = "{{ env `PROJECT_ROOT` `.` }}/test/server.crt"
key_file = "{{ env `PROJECT_ROOT` `.` }}/test/server.key"
request_per_sec = 2000
sender_num = 50

[fcm_v1]
# google_application_credentials = "{{ env `PROJECT_ROOT` `.` }}/credentials.json"
enabled = true
endpoint = "http://localhost:8888/v1/projects"
projectid = "test"
29 changes: 29 additions & 0 deletions test/tools/fcmv1mock/fcmv1mock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"flag"
"fmt"
"log"
"net/http"

"github.com/kayac/Gunfish/mock"
)

func main() {
var (
port int
projectID string
verbose bool
)

flag.IntVar(&port, "port", 8888, "fcmv1 mock server port")
flag.StringVar(&projectID, "project-id", "test", "fcmv1 mock project id")
flag.BoolVar(&verbose, "verbose", false, "verbose flag")
flag.Parse()

mux := mock.FCMv1MockServer(projectID, verbose)
log.Println("start fcmv1mock server port:", port, "project_id:", projectID)
if err := http.ListenAndServe(fmt.Sprintf(":%d", port), mux); err != nil {
log.Fatal(err)
}
}
Loading