Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the MaxConsumeCount param configurable #4

Merged
merged 3 commits into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ import (
"go.dataddo.com/pgq/internal/pg"
)

const maxConsumedCount = 3

type fatalError struct {
Err error
}
Expand Down Expand Up @@ -94,6 +92,9 @@ type consumerConfig struct {
// If not set, it will look for message in the whole table.
// You may set this value when using partitioned table to search just in partitions you are interested in
HistoryLimit time.Duration
// MaxConsumeCount is the maximal number of times a message can be consumed before it is ignored.
// This is a safety mechanism to prevent failure infinite loops when a message causes unhandled panic, OOM etc.
MaxConsumeCount uint

Logger *slog.Logger
}
Expand All @@ -107,6 +108,7 @@ var defaultConsumerConfig = consumerConfig{
MaxParallelMessages: 1,
InvalidMessageCallback: func(context.Context, InvalidMessage, error) {},
Metrics: noop.Meter{},
MaxConsumeCount: 3,
Logger: noopLogger,
}

Expand Down Expand Up @@ -190,6 +192,16 @@ func WithHistoryLimit(d time.Duration) ConsumerOption {
}
}

// WithMaxConsumeCount sets the maximal number of times a message can be consumed before it is ignored.
// Unhandled SIGKILL or uncaught panic, OOM error etc. could lead to consumer failure infinite loop.
// Setting this value to greater than 0 will prevent happening this loop.
// Setting this to value 0 disableS this safe mechanism.
func WithMaxConsumeCount(max uint) ConsumerOption {
return func(c *consumerConfig) {
c.MaxConsumeCount = max
}
}

// WithLogger sets logger. Default is no logging.
func WithLogger(logger *slog.Logger) ConsumerOption {
return func(c *consumerConfig) {
Expand Down Expand Up @@ -349,8 +361,10 @@ func (c *Consumer) generateQuery() string {
sb.WriteString(` created_at < CURRENT_TIMESTAMP AND`)
}
sb.WriteString(` (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP)`)
sb.WriteString(` AND consumed_count < `)
sb.WriteString(strconv.Itoa(maxConsumedCount))
if c.cfg.MaxConsumeCount > 0 {
sb.WriteString(` AND consumed_count < `)
sb.WriteString(strconv.FormatUint(uint64(c.cfg.MaxConsumeCount), 10))
}
sb.WriteString(` AND processed_at IS NULL`)
sb.WriteString(` ORDER BY consumed_count ASC, created_at ASC`)
sb.WriteString(` LIMIT $2`)
Expand Down
12 changes: 12 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ func TestConsumer_generateQuery(t *testing.T) {
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
},
{
name: "scn interval 12 hours abd max consumed count limit disabled",

args: args{
queueName: "testing_queue",
opts: []ConsumerOption{
WithHistoryLimit(12 * time.Hour),
WithMaxConsumeCount(0),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
},
{
name: "with metadata condition",
args: args{queueName: "testing_queue"},
Expand Down