Skip to content

Commit

Permalink
feat: support set max retries and retries interval (close #1252) (#1289)
Browse files Browse the repository at this point in the history
* feat: support set max retries and retries interval

* fix: httpresponse using `res` before checking for errors

* fix: `HttpServerPost` now be unexported

* refactor: pretty `httpDefault`
  • Loading branch information
qianjunakasumi authored Dec 24, 2021
1 parent 024ec34 commit bfc29a8
Showing 1 changed file with 100 additions and 85 deletions.
185 changes: 100 additions & 85 deletions server/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"encoding/json"
"fmt"
"io"
"math/rand"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -40,14 +39,18 @@ type HTTPServer struct {
Enabled bool `yaml:"enabled"`
MaxQueueSize int `yaml:"max-queue-size"`
} `yaml:"long-polling"`
Post []struct {
URL string `yaml:"url"`
Secret string `yaml:"secret"`
}
Post []httpServerPost `yaml:"post"`

MiddleWares `yaml:"middlewares"`
}

type httpServerPost struct {
URL string `yaml:"url"`
Secret string `yaml:"secret"`
MaxRetries *uint64 `yaml:"max-retries"`
RetriesInterval *uint64 `yaml:"retries-interval"`
}

type httpServer struct {
HTTP *http.Server
api *api.Caller
Expand All @@ -56,12 +59,14 @@ type httpServer struct {

// HTTPClient 反向HTTP上报客户端
type HTTPClient struct {
bot *coolq.CQBot
secret string
addr string
filter string
apiPort int
timeout int32
bot *coolq.CQBot
secret string
addr string
filter string
apiPort int
timeout int32
MaxRetries uint64
RetriesInterval uint64
}

type httpCtx struct {
Expand All @@ -70,66 +75,72 @@ type httpCtx struct {
postForm url.Values
}

const httpDefault = ` # HTTP 通信设置
- http:
# 服务端监听地址
host: 127.0.0.1
# 服务端监听端口
port: 5700
# 反向HTTP超时时间, 单位秒
# 最小值为5,小于5将会忽略本项设置
timeout: 5
# 长轮询拓展
long-polling:
# 是否开启
enabled: false
# 消息队列大小,0 表示不限制队列大小,谨慎使用
max-queue-size: 2000
const httpDefault = `
- http: # HTTP 通信设置
host: 127.0.0.1 # 服务端监听地址
port: 5700 # 服务端监听端口
timeout: 5 # 反向 HTTP 超时时间, 单位秒,<5 时将被忽略
long-polling: # 长轮询拓展
enabled: false # 是否开启
max-queue-size: 2000 # 消息队列大小,0 表示不限制队列大小,谨慎使用
middlewares:
<<: *default # 引用默认中间件
# 反向HTTP POST地址列表
post:
#- url: '' # 地址
# secret: '' # 密钥
post: # 反向HTTP POST地址列表
#- url: '' # 地址
# secret: '' # 密钥
# max-retries: 3 # 最大重试,0 时禁用
# retries-interval: 1500 # 重试时间,单位毫秒,0 时立即
#- url: http://127.0.0.1:5701/ # 地址
# secret: '' # 密钥
# secret: '' # 密钥
# max-retries: 10 # 最大重试,0 时禁用
# retries-interval: 1000 # 重试时间,单位毫秒,0 时立即
`

func init() {
config.AddServer(&config.Server{
Brief: "HTTP通信",
Default: httpDefault,
ParseEnv: func() (string, *yaml.Node) {
if os.Getenv("GCQ_HTTP_PORT") != "" {
// type convert tools
toInt64 := func(str string) int64 {
i, _ := strconv.ParseInt(str, 10, 64)
return i
}
accessTokenEnv := os.Getenv("GCQ_ACCESS_TOKEN")
node := &yaml.Node{}
httpConf := &HTTPServer{
Host: "0.0.0.0",
Port: 5700,
MiddleWares: MiddleWares{
AccessToken: accessTokenEnv,
},
}
param.SetExcludeDefault(&httpConf.Disabled, param.EnsureBool(os.Getenv("GCQ_HTTP_DISABLE"), false), false)
param.SetExcludeDefault(&httpConf.Host, os.Getenv("GCQ_HTTP_HOST"), "")
param.SetExcludeDefault(&httpConf.Port, int(toInt64(os.Getenv("GCQ_HTTP_PORT"))), 0)
if os.Getenv("GCQ_HTTP_POST_URL") != "" {
httpConf.Post = append(httpConf.Post, struct {
URL string `yaml:"url"`
Secret string `yaml:"secret"`
}{os.Getenv("GCQ_HTTP_POST_URL"), os.Getenv("GCQ_HTTP_POST_SECRET")})
}
_ = node.Encode(httpConf)
return "http", node
}
return "", nil
func nilParseUint(s string, base int, bitSize int) *uint64 {
pu, err := strconv.ParseUint(s, base, bitSize)
if err != nil {
return nil
}
return &pu
}

func readEnvConfig() (string, *yaml.Node) {

if s, ok := os.LookupEnv("GCQ_HTTP_PORT"); !ok || s == "" {
return "", nil
}

// type convert tools
toInt64 := func(str string) int64 {
i, _ := strconv.ParseInt(str, 10, 64)
return i
}
accessTokenEnv := os.Getenv("GCQ_ACCESS_TOKEN")
node := &yaml.Node{}
httpConf := &HTTPServer{
Host: "0.0.0.0",
Port: 5700,
MiddleWares: MiddleWares{
AccessToken: accessTokenEnv,
},
})
}
param.SetExcludeDefault(&httpConf.Disabled, param.EnsureBool(os.Getenv("GCQ_HTTP_DISABLE"), false), false)
param.SetExcludeDefault(&httpConf.Host, os.Getenv("GCQ_HTTP_HOST"), "")
param.SetExcludeDefault(&httpConf.Port, int(toInt64(os.Getenv("GCQ_HTTP_PORT"))), 0)
if os.Getenv("GCQ_HTTP_POST_URL") != "" {
httpConf.Post = append(httpConf.Post, httpServerPost{
os.Getenv("GCQ_HTTP_POST_URL"),
os.Getenv("GCQ_HTTP_POST_SECRET"),
nilParseUint(os.Getenv("GCQ_HTTP_POST_MAXRETRIES"), 10, 64),
nilParseUint(os.Getenv("GCQ_HTTP_POST_RETRIESINTERVAL"), 10, 64),
})
}
_ = node.Encode(httpConf)
return "http", node
}

func init() {
config.AddServer(&config.Server{Brief: "HTTP通信", Default: httpDefault, ParseEnv: readEnvConfig})
}

func (h *httpCtx) Get(s string) gjson.Result {
Expand Down Expand Up @@ -242,6 +253,13 @@ func checkAuth(req *http.Request, token string) int {
}
}

func puint64Operator(p *uint64, def uint64) uint64 {
if p == nil {
return def
}
return *p
}

// runHTTP 启动HTTP服务器与HTTP上报客户端
func runHTTP(bot *coolq.CQBot, node yaml.Node) {
var conf HTTPServer
Expand Down Expand Up @@ -285,12 +303,14 @@ client:
for _, c := range conf.Post {
if c.URL != "" {
go HTTPClient{
bot: bot,
secret: c.Secret,
addr: c.URL,
apiPort: conf.Port,
filter: conf.Filter,
timeout: conf.Timeout,
bot: bot,
secret: c.Secret,
addr: c.URL,
apiPort: conf.Port,
filter: conf.Filter,
timeout: conf.Timeout,
MaxRetries: puint64Operator(c.MaxRetries, 3),
RetriesInterval: puint64Operator(c.RetriesInterval, 1500),
}.Run()
}
}
Expand Down Expand Up @@ -330,10 +350,7 @@ func (c *HTTPClient) onBotPushEvent(e *coolq.Event) {
}

var res *http.Response
var err error
const maxAttemptTimes = 5

for i := 0; i <= maxAttemptTimes; i++ {
for i := uint64(0); i <= c.MaxRetries; i++ {
// see https://stackoverflow.com/questions/31337891/net-http-http-contentlength-222-with-body-length-0
// we should create a new request for every single post trial
req, err := http.NewRequest("POST", c.addr, bytes.NewReader(e.JSONBytes()))
Expand All @@ -344,24 +361,22 @@ func (c *HTTPClient) onBotPushEvent(e *coolq.Event) {
req.Header = header

res, err = client.Do(req)
if err == nil {
if res != nil {
//goland:noinspection GoDeferInLoop
defer res.Body.Close()
}
if err == nil {
break
}
if i != maxAttemptTimes {
if i < c.MaxRetries {
log.Warnf("上报 Event 数据到 %v 失败: %v 将进行第 %d 次重试", c.addr, err, i+1)
} else {
log.Warnf("上报 Event 数据 %s 到 %v 失败: %v 停止上报:已达重试上线", e.JSONBytes(), c.addr, err)
return
}
const maxWait = int64(time.Second * 3)
const minWait = int64(time.Millisecond * 500)
wait := rand.Int63n(maxWait-minWait) + minWait
time.Sleep(time.Duration(wait))
time.Sleep(time.Millisecond * time.Duration(c.RetriesInterval))
}

if err != nil {
log.Warnf("上报Event数据 %s 到 %v 失败: %v", e.JSONBytes(), c.addr, err)
return
}
log.Debugf("上报Event数据 %s 到 %v", e.JSONBytes(), c.addr)

r, err := io.ReadAll(res.Body)
Expand Down

0 comments on commit bfc29a8

Please sign in to comment.