Skip to content

Commit

Permalink
fix throttling middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
roblaszczak committed Nov 21, 2018
1 parent bb62a6d commit 5006275
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 9 deletions.
9 changes: 5 additions & 4 deletions _examples/simple-app/subscribing-app/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func main() {

retryMiddleware := middleware.Retry{}
retryMiddleware.MaxRetries = 1
retryMiddleware.WaitTime = time.Second
retryMiddleware.WaitTime = time.Millisecond * 10

poisonQueue, err := middleware.NewPoisonQueue(pub, "poison_queue")
if err != nil {
Expand All @@ -42,7 +42,7 @@ func main() {

h.AddMiddleware(
// limiting processed messages to 10 per second
middleware.NewThrottle(10/time.Second).Middleware,
middleware.NewThrottle(100, time.Second).Middleware,

// some, simple metrics
newMetricsMiddleware().Middleware,
Expand All @@ -61,8 +61,8 @@ func main() {
middleware.CorrelationID,

// simulating error or panic from handler
middleware.RandomFail(0.1),
middleware.RandomPanic(0.1),
middleware.RandomFail(0.01),
middleware.RandomPanic(0.01),
)

// close router when SIGTERM is sent
Expand Down Expand Up @@ -97,6 +97,7 @@ func createSubscriber(consumerGroup string, logger watermill.LoggerAdapter) mess
Brokers: brokers,
ConsumerGroup: consumerGroup,
ConsumersCount: 8,
AutoOffsetReset: "earliest",
},
marshaler,
logger,
Expand Down
6 changes: 3 additions & 3 deletions message/router/middleware/throttle.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ type Throttle struct {
}

// NewThrottle creates new Throttle instance.
// Example rate: 10/time.Second
func NewThrottle(rate time.Duration) *Throttle {
return &Throttle{time.Tick(rate)}
// Example duration and count: NewThrottle(10, time.Second) for 10 messages per second
func NewThrottle(count int64, duration time.Duration) *Throttle {
return &Throttle{time.Tick(duration/time.Duration(count))}
}

func (t Throttle) Middleware(h message.HandlerFunc) message.HandlerFunc {
Expand Down
3 changes: 2 additions & 1 deletion message/router/middleware/throttle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
)

func TestThrottle_Middleware(t *testing.T) {
throttle := middleware.NewThrottle(testTimeout / perSecond)
throttle := middleware.NewThrottle(perSecond, testTimeout)

ctx, _ := context.WithTimeout(context.Background(), testTimeout)

Expand Down Expand Up @@ -73,4 +73,5 @@ CounterLoop:
)

assert.True(t, producedMessagesCounter <= int(perSecond*testTimeout.Seconds()))
assert.True(t, producedMessagesCounter > 0)
}
2 changes: 1 addition & 1 deletion message/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func TestRouter_functional_nack(t *testing.T) {
pubSub, err := createPubSub()
require.NoError(t, err)
defer pubSub.Close()

r, err := message.NewRouter(
message.RouterConfig{},
watermill.NewStdLogger(true, true),
Expand Down

0 comments on commit 5006275

Please sign in to comment.