From 5006275f588003aee04b9c45971db2fd6761d008 Mon Sep 17 00:00:00 2001 From: Robert Laszczak Date: Wed, 21 Nov 2018 00:35:33 +0100 Subject: [PATCH] fix throttling middleware --- _examples/simple-app/subscribing-app/main.go | 9 +++++---- message/router/middleware/throttle.go | 6 +++--- message/router/middleware/throttle_test.go | 3 ++- message/router_test.go | 2 +- 4 files changed, 11 insertions(+), 9 deletions(-) diff --git a/_examples/simple-app/subscribing-app/main.go b/_examples/simple-app/subscribing-app/main.go index 8320a2a58..6b8271075 100644 --- a/_examples/simple-app/subscribing-app/main.go +++ b/_examples/simple-app/subscribing-app/main.go @@ -33,7 +33,7 @@ func main() { retryMiddleware := middleware.Retry{} retryMiddleware.MaxRetries = 1 - retryMiddleware.WaitTime = time.Second + retryMiddleware.WaitTime = time.Millisecond * 10 poisonQueue, err := middleware.NewPoisonQueue(pub, "poison_queue") if err != nil { @@ -42,7 +42,7 @@ func main() { h.AddMiddleware( // limiting processed messages to 10 per second - middleware.NewThrottle(10/time.Second).Middleware, + middleware.NewThrottle(100, time.Second).Middleware, // some, simple metrics newMetricsMiddleware().Middleware, @@ -61,8 +61,8 @@ func main() { middleware.CorrelationID, // simulating error or panic from handler - middleware.RandomFail(0.1), - middleware.RandomPanic(0.1), + middleware.RandomFail(0.01), + middleware.RandomPanic(0.01), ) // close router when SIGTERM is sent @@ -97,6 +97,7 @@ func createSubscriber(consumerGroup string, logger watermill.LoggerAdapter) mess Brokers: brokers, ConsumerGroup: consumerGroup, ConsumersCount: 8, + AutoOffsetReset: "earliest", }, marshaler, logger, diff --git a/message/router/middleware/throttle.go b/message/router/middleware/throttle.go index 9777057c7..4925d2bc9 100644 --- a/message/router/middleware/throttle.go +++ b/message/router/middleware/throttle.go @@ -11,9 +11,9 @@ type Throttle struct { } // NewThrottle creates new Throttle instance. -// Example rate: 10/time.Second -func NewThrottle(rate time.Duration) *Throttle { - return &Throttle{time.Tick(rate)} +// Example duration and count: NewThrottle(10, time.Second) for 10 messages per second +func NewThrottle(count int64, duration time.Duration) *Throttle { + return &Throttle{time.Tick(duration/time.Duration(count))} } func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc { diff --git a/message/router/middleware/throttle_test.go b/message/router/middleware/throttle_test.go index aeaa0b6f0..750cc52d8 100644 --- a/message/router/middleware/throttle_test.go +++ b/message/router/middleware/throttle_test.go @@ -19,7 +19,7 @@ const ( ) func TestThrottle_Middleware(t *testing.T) { - throttle := middleware.NewThrottle(testTimeout / perSecond) + throttle := middleware.NewThrottle(perSecond, testTimeout) ctx, _ := context.WithTimeout(context.Background(), testTimeout) @@ -73,4 +73,5 @@ CounterLoop: ) assert.True(t, producedMessagesCounter <= int(perSecond*testTimeout.Seconds())) + assert.True(t, producedMessagesCounter > 0) } diff --git a/message/router_test.go b/message/router_test.go index c5674cb68..4d94c86a7 100644 --- a/message/router_test.go +++ b/message/router_test.go @@ -113,7 +113,7 @@ func TestRouter_functional_nack(t *testing.T) { pubSub, err := createPubSub() require.NoError(t, err) defer pubSub.Close() - + r, err := message.NewRouter( message.RouterConfig{}, watermill.NewStdLogger(true, true),