Make the MaxConsumeCount param configurable (#4)
kedlas authored Oct 23, 2023
1 parent 819cea5 commit 447b6ae
Showing 2 changed files with 30 additions and 4 deletions.
22 changes: 18 additions & 4 deletions consumer.go
@@ -29,8 +29,6 @@ import (
"go.dataddo.com/pgq/internal/pg"
)

const maxConsumedCount = 3

type fatalError struct {
Err error
}
@@ -94,6 +92,9 @@ type consumerConfig struct {
// If not set, it will look for message in the whole table.
// You may set this value when using partitioned table to search just in partitions you are interested in
HistoryLimit time.Duration
// MaxConsumeCount is the maximum number of times a message can be consumed before it is ignored.
// This is a safety mechanism that prevents infinite failure loops when a message causes an unhandled panic, an OOM kill, etc.
MaxConsumeCount uint

Logger *slog.Logger
}
@@ -107,6 +108,7 @@ var defaultConsumerConfig = consumerConfig{
MaxParallelMessages: 1,
InvalidMessageCallback: func(context.Context, InvalidMessage, error) {},
Metrics: noop.Meter{},
MaxConsumeCount: 3,
Logger: noopLogger,
}

@@ -190,6 +192,16 @@ func WithHistoryLimit(d time.Duration) ConsumerOption {
}
}

// WithMaxConsumeCount sets the maximum number of times a message can be consumed before it is ignored.
// An unhandled SIGKILL, an uncaught panic, an OOM kill, etc. could otherwise send the consumer into an infinite failure loop on the same message.
// Setting this to a value greater than 0 prevents such a loop.
// Setting it to 0 disables this safety mechanism.
func WithMaxConsumeCount(max uint) ConsumerOption {
return func(c *consumerConfig) {
c.MaxConsumeCount = max
}
}
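For context, a minimal usage sketch (not part of this commit) showing how the new option combines with an existing one. Only ConsumerOption, WithHistoryLimit, and WithMaxConsumeCount come from this diff; the pgq package name is inferred from the go.dataddo.com/pgq import path, and the chosen values are illustrative:

package example

import (
	"time"

	"go.dataddo.com/pgq"
)

// consumerOptions builds the option list to pass when constructing a consumer.
func consumerOptions() []pgq.ConsumerOption {
	return []pgq.ConsumerOption{
		// Only look at messages created within the last 12 hours.
		pgq.WithHistoryLimit(12 * time.Hour),
		// Skip any message that has already been picked up 5 times,
		// so a handler that keeps crashing cannot loop on the same row forever.
		pgq.WithMaxConsumeCount(5),
		// pgq.WithMaxConsumeCount(0) would drop the consumed_count predicate
		// from the generated query and disable the safeguard entirely.
	}
}

With the defaults from this commit the behaviour is unchanged: MaxConsumeCount falls back to 3, so the generated query keeps the consumed_count < 3 condition exercised by the existing test below.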

// WithLogger sets logger. Default is no logging.
func WithLogger(logger *slog.Logger) ConsumerOption {
return func(c *consumerConfig) {
@@ -349,8 +361,10 @@ func (c *Consumer) generateQuery() string {
sb.WriteString(` created_at < CURRENT_TIMESTAMP AND`)
}
sb.WriteString(` (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP)`)
sb.WriteString(` AND consumed_count < `)
sb.WriteString(strconv.Itoa(maxConsumedCount))
if c.cfg.MaxConsumeCount > 0 {
sb.WriteString(` AND consumed_count < `)
sb.WriteString(strconv.FormatUint(uint64(c.cfg.MaxConsumeCount), 10))
}
sb.WriteString(` AND processed_at IS NULL`)
sb.WriteString(` ORDER BY consumed_count ASC, created_at ASC`)
sb.WriteString(` LIMIT $2`)
12 changes: 12 additions & 0 deletions consumer_test.go
@@ -36,6 +36,18 @@ func TestConsumer_generateQuery(t *testing.T) {
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
},
{
name: "scn interval 12 hours abd max consumed count limit disabled",

args: args{
queueName: "testing_queue",
opts: []ConsumerOption{
WithHistoryLimit(12 * time.Hour),
WithMaxConsumeCount(0),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
},
{
name: "with metadata condition",
args: args{queueName: "testing_queue"},
