From 1cb53c32ff42e09ee9c0102ba825aab4f2f86366 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Wed, 9 Aug 2023 21:05:54 +0900 Subject: [PATCH 1/7] Skip relaying msgs to suscribers when full --- msg/box.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/msg/box.go b/msg/box.go index f3e096e..8478051 100644 --- a/msg/box.go +++ b/msg/box.go @@ -79,6 +79,10 @@ func NewBox(logger *zerolog.Logger, topicID string, topic *pubsub.Topic) (*Box, return case msgCapsule := <-box.subCh: for _, subscriberCh := range box.subscribers { + if len(subscriberCh) == externalChanBufferSize { + // when subscriberCh is full, just skip this time + continue + } subscriberCh <- msgCapsule } msgCapsule = nil // explicitly free From ff2d793965d05f5a64fc2e5d701f8bcc85b5f2a7 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 10 Aug 2023 09:12:22 +0900 Subject: [PATCH 2/7] Revert "Skip relaying msgs to suscribers when full" This reverts commit 1cb53c32ff42e09ee9c0102ba825aab4f2f86366. --- msg/box.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/msg/box.go b/msg/box.go index 8478051..f3e096e 100644 --- a/msg/box.go +++ b/msg/box.go @@ -79,10 +79,6 @@ func NewBox(logger *zerolog.Logger, topicID string, topic *pubsub.Topic) (*Box, return case msgCapsule := <-box.subCh: for _, subscriberCh := range box.subscribers { - if len(subscriberCh) == externalChanBufferSize { - // when subscriberCh is full, just skip this time - continue - } subscriberCh <- msgCapsule } msgCapsule = nil // explicitly free From 8ffccfc2e13f5805e52056cdad57e5e425b6af97 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 10 Aug 2023 11:21:53 +0900 Subject: [PATCH 3/7] Add go.uber.org/ratelimit dep to go.mod --- go.mod | 1 + go.sum | 2 ++ 2 files changed, 3 insertions(+) diff --git a/go.mod b/go.mod index b2e8fe9..a01735b 100644 --- a/go.mod +++ b/go.mod @@ -11,6 +11,7 @@ require ( github.com/rs/zerolog v1.29.1 github.com/spf13/cobra v1.7.0 github.com/stretchr/testify v1.8.4 + go.uber.org/ratelimit v0.3.0 google.golang.org/grpc v1.55.0 google.golang.org/protobuf v1.30.0 ) diff --git a/go.sum b/go.sum index 913a120..db399be 100644 --- a/go.sum +++ b/go.sum @@ -682,6 +682,8 @@ go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKY go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/ratelimit v0.3.0 h1:IdZd9wqvFXnvLvSEBo0KPcGfkoBGNkpTHlrE3Rcjkjw= +go.uber.org/ratelimit v0.3.0/go.mod h1:So5LG7CV1zWpY1sHe+DXTJqQvOx+FFPFaAs2SnoyBaI= go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= From dfc7794c8835642ed8345093ae7d4b55677c8e28 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 10 Aug 2023 11:22:15 +0900 Subject: [PATCH 4/7] Apply grpc rate limit to unary calls - add GetEnvInt() func to util pkg --- cli/agent/agent.go | 4 +++- msg/box.go | 11 ++--------- util/grpc.go | 32 ++++++++++++++++++++++++++++++++ util/util.go | 17 ++++++++++++++++- 4 files changed, 53 insertions(+), 11 deletions(-) create mode 100644 util/grpc.go diff --git a/cli/agent/agent.go b/cli/agent/agent.go index cfbd83a..09b4fdb 100644 --- a/cli/agent/agent.go +++ b/cli/agent/agent.go @@ -77,7 +77,9 @@ var Cmd = &cobra.Command{ }() logger.Info().Msg("listening os signal: SIGINT, SIGTERM") - grpcServer = grpc.NewServer() + grpcServer = grpc.NewServer( + grpc.UnaryInterceptor(util.UnaryServerInterceptor()), + ) logger.Info().Msg("initalized gRPC server") lakeService, err = lake.NewService( ctx, diff --git a/msg/box.go b/msg/box.go index f3e096e..9fc64b4 100644 --- a/msg/box.go +++ b/msg/box.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "strconv" "sync" "time" @@ -256,21 +255,15 @@ func (box *Box) LeaveSub(subscriberID string) error { } func init() { - tmp, err := getEnvInt("INTERNAL_CHAN_BUFFER_SIZE", DefaultInternalChanBufferSize) + tmp, err := util.GetEnvInt("INTERNAL_CHAN_BUFFER_SIZE", DefaultInternalChanBufferSize) if err != nil { panic(err) } internalChanBufferSize = tmp - tmp, err = getEnvInt("EXTERNAL_CHAN_BUFFER_SIZE", DefaultExternalChanBufferSize) + tmp, err = util.GetEnvInt("EXTERNAL_CHAN_BUFFER_SIZE", DefaultExternalChanBufferSize) if err != nil { panic(err) } externalChanBufferSize = tmp } - -func getEnvInt(key string, fallback int) (int, error) { - tmpStr := strconv.Itoa(fallback) - tmpStr = util.GetEnv(key, tmpStr) - return strconv.Atoi(tmpStr) -} diff --git a/util/grpc.go b/util/grpc.go new file mode 100644 index 0000000..77a5fc6 --- /dev/null +++ b/util/grpc.go @@ -0,0 +1,32 @@ +package util + +import ( + "context" + + "go.uber.org/ratelimit" + "google.golang.org/grpc" +) + +const ( + DefaultUnaryServerInterceptorRateLimit = 100 +) + +var ( + unaryServerInterceptorRateLimit int +) + +func UnaryServerInterceptor() grpc.UnaryServerInterceptor { + rl := ratelimit.New(unaryServerInterceptorRateLimit) + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { + rl.Take() + return handler(ctx, req) + } +} + +func init() { + tmp, err := getEnvInt("UNARY_SERVER_INTERCEPTOR_RATE_LIMIT", DefaultUnaryServerInterceptorRateLimit) + if err != nil { + panic(err) + } + unaryServerInterceptorRateLimit = tmp +} diff --git a/util/util.go b/util/util.go index 9521e63..98846e9 100644 --- a/util/util.go +++ b/util/util.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "encoding/base64" "os" + "strconv" ) func CheckStrLen(target string, min, max int) bool { @@ -20,7 +21,7 @@ func GenerateRandomBase64String(size int) string { return base64.RawStdEncoding.EncodeToString(bytes) } -func GetEnv(key, fallback string) string { +func getEnv(key, fallback string) string { value, ok := os.LookupEnv(key) if !ok { return fallback @@ -28,6 +29,20 @@ func GetEnv(key, fallback string) string { return value } +func GetEnv(key, fallback string) string { + return getEnv(key, fallback) +} + +func getEnvInt(key string, fallback int) (int, error) { + tmpStr := strconv.Itoa(fallback) + tmpStr = getEnv(key, tmpStr) + return strconv.Atoi(tmpStr) +} + +func GetEnvInt(key string, fallback int) (int, error) { + return getEnvInt(key, fallback) +} + func GetLogLevel() string { return GetEnv("LOG_LEVEL", "info") } From 8d153492549d1f05643cdcca191bd76f78719e41 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 10 Aug 2023 19:19:29 +0900 Subject: [PATCH 5/7] Make client catch ctx.Done() --- cli/client/client.go | 99 +++++++++++++++++++++++--------------------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/cli/client/client.go b/cli/client/client.go index 07209ff..02f9679 100644 --- a/cli/client/client.go +++ b/cli/client/client.go @@ -172,63 +172,68 @@ var Cmd = &cobra.Command{ reader := bufio.NewReader(os.Stdin) ok := true for { - printInput(false) - input, err := reader.ReadString('\n') - if err == io.EOF { + select { + case <-ctx.Done(): return - } - if err != nil { - fmt.Println(err) - continue - } - input = strings.TrimSuffix(input, "\n") - if input == "" { - continue - } - if !ok { - cancel() - return - } - go func() { - msg := Msg{ - Data: []byte(input), - Metadata: map[string][]byte{ - "nickname": []byte(nickname), - }, - } - data, err := json.Marshal(msg) - if err != nil { - fmt.Println(err) + default: + printInput(false) + input, err := reader.ReadString('\n') + if err == io.EOF { return } - sigDataBytes, err := privKey.Sign(data) if err != nil { fmt.Println(err) + continue + } + input = strings.TrimSuffix(input, "\n") + if input == "" { + continue + } + if !ok { + cancel() return } + go func() { + msg := Msg{ + Data: []byte(input), + Metadata: map[string][]byte{ + "nickname": []byte(nickname), + }, + } + data, err := json.Marshal(msg) + if err != nil { + fmt.Println(err) + return + } + sigDataBytes, err := privKey.Sign(data) + if err != nil { + fmt.Println(err) + return + } - pubRes, err := cli.Publish(ctx, &pb.PublishReq{ - TopicId: topicID, - MsgCapsule: &pb.MsgCapsule{ - Data: data, - Signature: &pb.Signature{ - PubKey: pubKeyBytes, - Data: sigDataBytes, + pubRes, err := cli.Publish(ctx, &pb.PublishReq{ + TopicId: topicID, + MsgCapsule: &pb.MsgCapsule{ + Data: data, + Signature: &pb.Signature{ + PubKey: pubKeyBytes, + Data: sigDataBytes, + }, }, - }, - }) - if err != nil { - fmt.Println(err) - ok = false - return - } + }) + if err != nil { + fmt.Println(err) + ok = false + return + } - // check publish res - if !pubRes.GetOk() { - fmt.Println("failed to send message") - return - } - }() + // check publish res + if !pubRes.GetOk() { + fmt.Println("failed to send message") + return + } + }() + } } }() From fdab77ff310300a4208fc269994c5dec9b022d58 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 10 Aug 2023 19:39:27 +0900 Subject: [PATCH 6/7] Make client quiet --- cli/client/client.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cli/client/client.go b/cli/client/client.go index 02f9679..07651a9 100644 --- a/cli/client/client.go +++ b/cli/client/client.go @@ -170,7 +170,6 @@ var Cmd = &cobra.Command{ go func() { defer wg.Done() reader := bufio.NewReader(os.Stdin) - ok := true for { select { case <-ctx.Done(): @@ -189,10 +188,6 @@ var Cmd = &cobra.Command{ if input == "" { continue } - if !ok { - cancel() - return - } go func() { msg := Msg{ Data: []byte(input), @@ -223,7 +218,6 @@ var Cmd = &cobra.Command{ }) if err != nil { fmt.Println(err) - ok = false return } From 815cdfdb3d947fac08863e03727a0bd197fd13a1 Mon Sep 17 00:00:00 2001 From: h0n9 Date: Thu, 10 Aug 2023 19:41:54 +0900 Subject: [PATCH 7/7] Default unaryServerInterceptorRateLimit to 10000 --- util/grpc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/util/grpc.go b/util/grpc.go index 77a5fc6..aba1e20 100644 --- a/util/grpc.go +++ b/util/grpc.go @@ -8,7 +8,7 @@ import ( ) const ( - DefaultUnaryServerInterceptorRateLimit = 100 + DefaultUnaryServerInterceptorRateLimit = 10000 ) var (