Skip to content

Commit

Permalink
add notify pkg (#12)
Browse files Browse the repository at this point in the history
* add notify pkg
  • Loading branch information
veezhang authored Apr 11, 2022
1 parent 17f5905 commit e6d5489
Show file tree
Hide file tree
Showing 16 changed files with 1,175 additions and 3 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ lint: $(GOBIN)/golangci-lint
$(GOBIN)/golangci-lint run

test:
go test -race -coverprofile=coverage.txt -covermode=atomic ./...
# TODO: add -race arguments
go test -coverprofile=coverage.txt -covermode=atomic ./...

tools: $(GOBIN)/goimports \
$(GOBIN)/impi \
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/pkg/errors v0.9.1
github.com/prashantv/gostub v1.1.0
github.com/stretchr/testify v1.7.1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
gopkg.in/gomail.v2 v2.0.0-20160411212932-81ebce5c23df
)

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ golang.org/x/net v0.0.0-20211029224645-99673261e6eb/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
1 change: 0 additions & 1 deletion httpclient/bytes_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func TestBytesClient(t *testing.T) {
w.WriteHeader(statusCode)
w.Write(respBody)
}))

defer testServer.Close()

checkHookFunc := func(resp *resty.Response, err error) {
Expand Down
1 change: 0 additions & 1 deletion httpclient/object_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func TestObjectClient(t *testing.T) {
w.WriteHeader(statusCode)
w.Write(respBody)
}))

defer testServer.Close()

checkHookFunc := func(resp *resty.Response, err error) {
Expand Down
108 changes: 108 additions & 0 deletions notify/dingtalk.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package notify

import (
"context"
"fmt"

"github.com/vesoft-inc/go-pkg/httpclient"

"github.com/pkg/errors"
)

// docs: https://open.dingtalk.com/document/group/custom-robot-access

const (
dingTalkRobotSendAddr = "https://oapi.dingtalk.com/robot/send"

DingDingMsgText DingDingMsgType = "text"
DingDingMsgMarkdown DingDingMsgType = "markdown"
)

var _ StringNotifier = (*dingTalkNotifier)(nil)

type (
DingDingMsgType string

DingTalkConfig struct {
AccessToken string
MsgType DingDingMsgType
AtMobiles []string
IsAtAll bool
Title string
}

dingTalkNotifier struct {
client httpclient.ObjectClient
config DingTalkConfig
}

dingTalkMessage struct {
MsgType string `json:"msgtype"`
At dingTalkAtInfo `json:"at"`
Text *dingTalkMessageText `json:"text,omitempty"`
Markdown *dingTalkMessageMarkdown `json:"markdown,omitempty"`
}

dingTalkAtInfo struct {
AtMobiles []string `json:"atMobiles"`
IsAtAll bool `json:"isAtAll"`
}

dingTalkMessageText struct {
Content string `json:"content"`
}

dingTalkMessageMarkdown struct {
Title string `json:"title"`
Text string `json:"text"`
}
)

// NewWithDingTalks creates Notifier for many ding talk robots.
func NewWithDingTalks(configs ...DingTalkConfig) Notifier {
stringNotifiers := make([]StringNotifier, len(configs))
for i := range configs {
stringNotifiers[i] = newDingTalkNotifier(configs[i])
}
return NewWithStringNotifiers(stringNotifiers...)
}

func newDingTalkNotifier(config DingTalkConfig) StringNotifier { // nolint:gocritic
return &dingTalkNotifier{
client: httpclient.NewObjectClient(dingTalkRobotSendAddr, httpclient.WithQueryParam("access_token", config.AccessToken)),
config: config,
}
}

func (n *dingTalkNotifier) Notify(_ context.Context, message string) error {
messageBody := &dingTalkMessage{
MsgType: string(n.config.MsgType),
At: dingTalkAtInfo{
AtMobiles: n.config.AtMobiles,
IsAtAll: n.config.IsAtAll,
},
}
if n.config.MsgType == DingDingMsgMarkdown {
messageBody.Markdown = &dingTalkMessageMarkdown{
Title: n.config.Title,
Text: message,
}
} else {
messageBody.Text = &dingTalkMessageText{
Content: fmt.Sprintf("%s\n%s", n.config.Title, message),
}
}

var responseObj struct {
ErrCode int `json:"errcode"`
ErrMsg string `json:"errmsg"`
}

if err := n.client.Post("", messageBody, &responseObj); err != nil {
return err
}
if responseObj.ErrCode != 0 {
return errors.Errorf("%d:%s", responseObj.ErrCode, responseObj.ErrMsg)
}
return nil
}
96 changes: 96 additions & 0 deletions notify/dingtalk_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package notify

import (
"context"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"

"github.com/vesoft-inc/go-pkg/httpclient"

"github.com/go-resty/resty/v2"
"github.com/stretchr/testify/assert"
)

func TestNewWithDingTalks(t *testing.T) {
var (
ast = assert.New(t)
notifier Notifier
)

notifier = NewWithDingTalks(DingTalkConfig{})
ast.IsType(NotifierFunc(nil), notifier)

notifier = NewWithDingTalks(DingTalkConfig{}, DingTalkConfig{})
ast.IsType(&defaultNotify{}, notifier)
ast.Len(notifier.(*defaultNotify).notifiers, 2)
}

func Test_newDingTalkNotifier(t *testing.T) {
ast := assert.New(t)

n := newDingTalkNotifier(DingTalkConfig{})
ast.IsType(&dingTalkNotifier{}, n)
ast.NotNil(n.(*dingTalkNotifier).client)
}

func TestDingTalkNotify(t *testing.T) {
var (
notifier Notifier
err error
httpResponse struct {
status int
body []byte
}
)

ast := assert.New(t)

testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method == resty.MethodPost {
body, err := ioutil.ReadAll(r.Body) //nolint:govet
ast.NoError(err)
var requestBody dingTalkMessage
err = json.Unmarshal(body, &requestBody)
ast.NoError(err)

if requestBody.MsgType != string(DingDingMsgText) &&
requestBody.MsgType != string(DingDingMsgMarkdown) {
ast.Fail("unsupported MsgType")
}
w.WriteHeader(httpResponse.status)
_, _ = w.Write(httpResponse.body)
return
}
}))

client1 := httpclient.NewObjectClientRaw(httpclient.NewClient(testServer.URL))
client2 := httpclient.NewObjectClientRaw(httpclient.NewClient(testServer.URL))

n1 := newDingTalkNotifier(DingTalkConfig{MsgType: DingDingMsgText})
n1.(*dingTalkNotifier).client = client1
n2 := newDingTalkNotifier(DingTalkConfig{MsgType: DingDingMsgMarkdown})
n2.(*dingTalkNotifier).client = client2

notifier = NewWithStringNotifiers(n1, n2)

httpResponse.status = 200
httpResponse.body = []byte(`{"errcode": 0,"errmsg": "msg"}`)
err = notifier.Notify(context.TODO(), "Message")
ast.NoError(err)

httpResponse.status = 200
httpResponse.body = []byte(`{"errcode": 100001,"errmsg": "msg"}`)
err = notifier.Notify(context.TODO(), "Message")
if ast.ErrorIs(err, ErrNotifyNotification) {
ast.Contains(err.Error(), "100001:msg")
}

httpResponse.status = 500
err = notifier.Notify(context.TODO(), "Message")
if ast.ErrorIs(err, ErrNotifyNotification) {
ast.Contains(err.Error(), "500 Internal Server Error")
}
}
141 changes: 141 additions & 0 deletions notify/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package notify

import (
"context"
"crypto"
"encoding/hex"
"fmt"
"sync"
"time"
)

const (
DefaultDuplicateFilterInterval = time.Hour
DefaultDuplicateFilterMaxRecords = 100
)

var (
_ Filter = (*duplicateFilter)(nil)
_ Filter = FilterFunc(nil)
_ Notifier = (*defaultFilterNotifier)(nil)
)

type (
// Filter is to filter whether a notify should to be sent.
Filter interface {
// IfNotify's arguments is the arguments other than the first one in Notifier.Notify.
IfNotify(interface{}) bool
}

// FilterFunc is an adapter to allow the use of ordinary functions as Filter.
FilterFunc func(interface{}) bool

defaultFilterNotifier struct {
filter Filter
notifier Notifier
}

DuplicateFilterParams struct {
DupInterval time.Duration // duplicate data notification interval
MaxRecords int // the max of records
Name string
}

duplicateFilter struct {
params DuplicateFilterParams
mu sync.RWMutex // it's for safe access lastTimeMap
lastTimeMap map[string]time.Time // record the time of last notify, the key is the hash of the data
}
)

// NewWithFilter creates Notifier for notifiers with a Filter.
func NewWithFilter(f Filter, notifiers ...Notifier) Notifier {
return &defaultFilterNotifier{
filter: f,
notifier: combineNotifiers(notifiers...),
}
}

// NewWithDuplicateFilter creates Notifier for notifiers with a Filter.
// Within dupInterval, the same message will only be notify once.
func NewWithDuplicateFilter(params DuplicateFilterParams, notifiers ...Notifier) Notifier {
return NewWithFilter(newDuplicateFilter(params), notifiers...)
}

func newDuplicateFilter(params DuplicateFilterParams) Filter {
if params.DupInterval <= 0 {
params.DupInterval = DefaultDuplicateFilterInterval
}
if params.MaxRecords <= 0 {
params.MaxRecords = DefaultDuplicateFilterMaxRecords
}
return &duplicateFilter{
params: params,
lastTimeMap: map[string]time.Time{},
}
}

func (n *defaultFilterNotifier) Notify(ctx context.Context, data interface{}) error {
if n.filter != nil && !n.filter.IfNotify(data) {
return nil
}
return n.notifier.Notify(ctx, data)
}

func (n *duplicateFilter) IfNotify(data interface{}) bool {
hashFn := func(values ...interface{}) string {
h := crypto.MD5.New()
for _, v := range values {
_, _ = fmt.Fprint(h, v)
}

return hex.EncodeToString(h.Sum(nil))
}

hash := hashFn(data)

n.mu.RLock()
lastTime, ok := n.lastTimeMap[hash]
n.mu.RUnlock()

if ok && n.isInCoolPeriod(lastTime) {
return false
}

n.mu.Lock()
n.lastTimeMap[hash] = time.Now()
n.mu.Unlock()

n.cleanLastTimeMapIfNecessary()

return true
}

func (n *duplicateFilter) cleanLastTimeMapIfNecessary() {
var count int
n.mu.RLock()
count = len(n.lastTimeMap)
n.mu.RUnlock()

if count < n.params.MaxRecords {
return
}

newMap := make(map[string]time.Time)
n.mu.Lock()
for hash, lastTime := range n.lastTimeMap {
if n.isInCoolPeriod(lastTime) {
newMap[hash] = lastTime
}
}
n.lastTimeMap = newMap
n.mu.Unlock()
}

func (n *duplicateFilter) isInCoolPeriod(lastTime time.Time) bool {
return time.Since(lastTime) <= n.params.DupInterval
}

func (f FilterFunc) IfNotify(data interface{}) bool {
return f != nil && f(data)
}
Loading

0 comments on commit e6d5489

Please sign in to comment.