Skip to content

Commit

Permalink
Get attempt for consumed msg (#6)
Browse files Browse the repository at this point in the history
It also introduces a breaking change by removing Message interface.
The context cancellation for message consumption has configurable time delta for safe return to the queue.
  • Loading branch information
prochac authored Jan 17, 2024
2 parents 25e98a5 + d6c7037 commit e63f922
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 129 deletions.
15 changes: 10 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ import (
"database/sql"
"encoding/json"
"fmt"
"go.dataddo.com/pgq"

_ "github.com/jackc/pgx/v4/stdlib"

"go.dataddo.com/pgq"
)

func main() {
Expand All @@ -123,14 +125,17 @@ func main() {
// publish the message to the queue
// provide the payload which is the JSON object
// and optional metadata which is the map[string]string
msg := pgq.NewMessage(nil, json.RawMessage(`{"foo":"bar"}`))
msg := &pgq.MessageOutgoing{
Payload: json.RawMessage(`{"foo":"bar"}`),
}
msgId, err := publisher.Publish(context.Background(), queueName, msg)
if err != nil {
panic(err.Error())
}

fmt.Println("Message published with ID:", msgId)
}

```

After the message is successfully published, you can see the new row with given `msgId` in the queue table.
Expand Down Expand Up @@ -208,9 +213,9 @@ func main() {

// we must specify the message handler, which implements simple interface
type handler struct {}
func (h *handler) HandleMessage(_ context.Context, msg pgq.Message) (processed bool, err error) {
fmt.Println("Message payload:", string(msg.Payload()))
return true, nil
func (h *handler) HandleMessage(_ context.Context, msg *pgq.MessageIncoming) (processed bool, err error) {
fmt.Println("Message payload:", string(msg.Payload))
return true, nil
}
```

Expand Down
78 changes: 48 additions & 30 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,14 @@ const (
// // | true | <nil> | processed, no error. |
// // | true | some error | processed, ended with error. Don't retry! |
type MessageHandler interface {
HandleMessage(context.Context, Message) (processed bool, err error)
HandleMessage(context.Context, *MessageIncoming) (processed bool, err error)
}

// MessageHandlerFunc is MessageHandler implementation by simple function.
type MessageHandlerFunc func(context.Context, Message) (processed bool, err error)
type MessageHandlerFunc func(context.Context, *MessageIncoming) (processed bool, err error)

// HandleMessage calls self. It also implements MessageHandler interface.
func (fn MessageHandlerFunc) HandleMessage(ctx context.Context, msg Message) (processed bool, err error) {
func (fn MessageHandlerFunc) HandleMessage(ctx context.Context, msg *MessageIncoming) (processed bool, err error) {
return fn(ctx, msg)
}

Expand All @@ -96,20 +96,24 @@ type consumerConfig struct {
// This is a safety mechanism to prevent failure infinite loops when a message causes unhandled panic, OOM etc.
MaxConsumeCount uint

// MsgProcessingReserveDuration is the duration for which the message is reserved for handling result state.
MessageProcessingReserveDuration time.Duration

Logger *slog.Logger
}

var noopLogger = slog.New(slog.NewTextHandler(io.Discard, &slog.HandlerOptions{Level: slog.Level(math.MaxInt)}))

var defaultConsumerConfig = consumerConfig{
LockDuration: time.Hour,
PollingInterval: 5 * time.Second,
AckTimeout: 1 * time.Second,
MaxParallelMessages: 1,
InvalidMessageCallback: func(context.Context, InvalidMessage, error) {},
Metrics: noop.Meter{},
MaxConsumeCount: 3,
Logger: noopLogger,
LockDuration: time.Hour,
PollingInterval: 5 * time.Second,
AckTimeout: 1 * time.Second,
MessageProcessingReserveDuration: 1 * time.Second,
MaxParallelMessages: 1,
InvalidMessageCallback: func(context.Context, InvalidMessage, error) {},
Metrics: noop.Meter{},
MaxConsumeCount: 3,
Logger: noopLogger,
}

// InvalidMessageCallback defines what should happen to messages which are identified as invalid.
Expand Down Expand Up @@ -175,6 +179,13 @@ func WithMetrics(m metric.Meter) ConsumerOption {
}
}

// WithMessageProcessingReserveDuration sets the duration for which the message is reserved for handling result state.
func WithMessageProcessingReserveDuration(d time.Duration) ConsumerOption {
return func(c *consumerConfig) {
c.MessageProcessingReserveDuration = d
}
}

// WithInvalidMessageCallback sets callback for invalid messages.
func WithInvalidMessageCallback(fn InvalidMessageCallback) ConsumerOption {
return func(c *consumerConfig) {
Expand Down Expand Up @@ -280,7 +291,7 @@ func (c *Consumer) Run(ctx context.Context) error {
}
wg.Add(len(msgs))
for _, msg := range msgs {
go func(msg *message) {
go func(msg *MessageIncoming) {
defer wg.Done()
defer c.sem.Release(1)
c.handleMessage(ctx, msg)
Expand Down Expand Up @@ -370,19 +381,19 @@ func (c *Consumer) generateQuery() string {
sb.WriteString(` LIMIT $2`)
sb.WriteString(` FOR UPDATE SKIP LOCKED`)
}
sb.WriteString(`) RETURNING id, payload, metadata`)
sb.WriteString(`) RETURNING id, payload, metadata, consumed_count, locked_until`)
return sb.String()
}

func (c *Consumer) handleMessage(ctx context.Context, msg *message) {
ctx, cancel := context.WithTimeout(ctx, c.cfg.LockDuration)
func (c *Consumer) handleMessage(ctx context.Context, msg *MessageIncoming) {
ctx, cancel := context.WithDeadline(ctx, msg.Deadline)
defer cancel()

ctxTimeout, cancel := prepareCtxTimeout()
defer cancel()
// TODO configurable Propagator
propagator := otel.GetTextMapPropagator()
carrier := propagation.MapCarrier(msg.metadata)
carrier := propagation.MapCarrier(msg.Metadata)
ctx = propagator.Extract(ctx, carrier)

ctx, span := otel.Tracer("pgq").Start(ctx, "HandleMessage")
Expand All @@ -405,7 +416,7 @@ func (c *Consumer) handleMessage(ctx context.Context, msg *message) {
"error", err.Error(),
"ackTimeout", c.cfg.AckTimeout,
"reason", reason,
"msg.metadata", msg.metadata,
"msg.metadata", msg.Metadata,
)
}
return
Expand All @@ -419,7 +430,7 @@ func (c *Consumer) handleMessage(ctx context.Context, msg *message) {
"error", err,
"ackTimeout", c.cfg.AckTimeout,
"reason", discardReason,
"msg.metadata", msg.metadata,
"msg.metadata", msg.Metadata,
)
}
return
Expand All @@ -429,7 +440,7 @@ func (c *Consumer) handleMessage(ctx context.Context, msg *message) {
c.cfg.Logger.ErrorContext(ctx, "pgq: ack failed",
"error", err,
"ackTimeout", c.cfg.AckTimeout,
"msg.metadata", msg.metadata,
"msg.metadata", msg.Metadata,
)
}
}
Expand All @@ -444,7 +455,7 @@ func prepareCtxTimeout() (func(td time.Duration) context.Context, context.Cancel
return fn, cancel
}

func (c *Consumer) consumeMessages(ctx context.Context, query string) ([]*message, error) {
func (c *Consumer) consumeMessages(ctx context.Context, query string) ([]*MessageIncoming, error) {
for {
maxMsg, err := acquireMaxFromSemaphore(ctx, c.sem, int64(c.cfg.MaxParallelMessages))
if err != nil {
Expand All @@ -470,12 +481,14 @@ func (c *Consumer) consumeMessages(ctx context.Context, query string) ([]*messag
}

type pgMessage struct {
ID pgtype.UUID
Payload pgtype.JSONB
Metadata pgtype.JSONB
ID pgtype.UUID
Payload pgtype.JSONB
Metadata pgtype.JSONB
Attempt pgtype.Int4
LockedUntil pgtype.Timestamptz
}

func (c *Consumer) tryConsumeMessages(ctx context.Context, query string, limit int64) (_ []*message, err error) {
func (c *Consumer) tryConsumeMessages(ctx context.Context, query string, limit int64) (_ []*MessageIncoming, err error) {
tx, err := c.db.BeginTx(ctx, nil)
if err != nil {
// TODO not necessary fatal, network could wiggle.
Expand Down Expand Up @@ -509,7 +522,7 @@ func (c *Consumer) tryConsumeMessages(ctx context.Context, query string, limit i
}
defer rows.Close()

var msgs []*message
var msgs []*MessageIncoming
for rows.Next() {
msg, err := c.parseRow(ctx, rows)
if err != nil {
Expand All @@ -529,12 +542,14 @@ func (c *Consumer) tryConsumeMessages(ctx context.Context, query string, limit i
return msgs, nil
}

func (c *Consumer) parseRow(ctx context.Context, rows *sql.Rows) (*message, error) {
func (c *Consumer) parseRow(ctx context.Context, rows *sql.Rows) (*MessageIncoming, error) {
var pgMsg pgMessage
if err := rows.Scan(
&pgMsg.ID,
&pgMsg.Payload,
&pgMsg.Metadata,
&pgMsg.Attempt,
&pgMsg.LockedUntil,
); err != nil {
if isErrorCode(err, undefinedTableErrCode, undefinedColumnErrCode) {
return nil, fatalError{Err: err}
Expand Down Expand Up @@ -584,23 +599,26 @@ func (c *Consumer) discardInvalidMsg(ctx context.Context, id pgtype.UUID, err er
}
}

func (c *Consumer) finishParsing(pgMsg pgMessage) (*message, error) {
msg := &message{
func (c *Consumer) finishParsing(pgMsg pgMessage) (*MessageIncoming, error) {
msg := &MessageIncoming{
id: uuid.UUID(pgMsg.ID.Bytes),
once: sync.Once{},
ackFn: c.ackMessage(c.db, pgMsg.ID),
nackFn: c.nackMessage(c.db, pgMsg.ID),
discardFn: c.discardMessage(c.db, pgMsg.ID),
}
var err error
msg.payload, err = parsePayload(pgMsg)
msg.Payload, err = parsePayload(pgMsg)
if err != nil {
return msg, errors.Wrap(err, "parsing payload")
}
msg.metadata, err = parseMetadata(pgMsg)
msg.Metadata, err = parseMetadata(pgMsg)
if err != nil {
return msg, errors.Wrap(err, "parsing metadata")
}
msg.Attempt = int(pgMsg.Attempt.Int)
msg.maxConsumedCount = c.cfg.MaxConsumeCount
msg.Deadline = pgMsg.LockedUntil.Time.Add(-c.cfg.AckTimeout).Add(-c.cfg.MessageProcessingReserveDuration)
return msg, nil
}

Expand Down
12 changes: 6 additions & 6 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestConsumer_generateQuery(t *testing.T) {
{
name: "simple",
args: args{queueName: "testing_queue"},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "scanInterval 12 hours",
Expand All @@ -34,7 +34,7 @@ func TestConsumer_generateQuery(t *testing.T) {
WithHistoryLimit(12 * time.Hour),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "scn interval 12 hours abd max consumed count limit disabled",
Expand All @@ -46,12 +46,12 @@ func TestConsumer_generateQuery(t *testing.T) {
WithMaxConsumeCount(0),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "with metadata condition",
args: args{queueName: "testing_queue"},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "scanInterval 12 hours with metadata condition",
Expand All @@ -61,12 +61,12 @@ func TestConsumer_generateQuery(t *testing.T) {
WithHistoryLimit(12 * time.Hour),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "with negative metadata condition",
args: args{queueName: "testing_queue"},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata",
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
}
for _, tt := range tests {
Expand Down
6 changes: 3 additions & 3 deletions example_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

type Handler struct{}

func (h *Handler) HandleMessage(ctx context.Context, msg pgq.Message) (res bool, err error) {
func (h *Handler) HandleMessage(ctx context.Context, msg *pgq.MessageIncoming) (res bool, err error) {
defer func() {
r := recover()
if r == nil {
Expand All @@ -30,15 +30,15 @@ func (h *Handler) HandleMessage(ctx context.Context, msg pgq.Message) (res bool,
err = fmt.Errorf("%v", r)
}
}()
if msg.Metadata()["heaviness"] == "heavy" {
if msg.Metadata["heaviness"] == "heavy" {
// nack the message, it will be retried
// Message won't contain error detail in the database.
return pgq.MessageNotProcessed, nil
}
var myPayload struct {
Foo string `json:"foo"`
}
if err := json.Unmarshal(msg.Payload(), &myPayload); err != nil {
if err := json.Unmarshal(msg.Payload, &myPayload); err != nil {
// discard the message, it will not be retried
// Message will contain error detail in the database.
return pgq.MessageProcessed, fmt.Errorf("invalid payload: %v", err)
Expand Down
18 changes: 9 additions & 9 deletions example_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,19 @@ func ExamplePublisher() {
p := pgq.NewPublisher(db)
ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
payload, _ := json.Marshal(PayloadStruct{Foo: "bar"})
messages := []pgq.Message{
pgq.NewMessage(
pgq.Metadata{
messages := []*pgq.MessageOutgoing{
{
Metadata: pgq.Metadata{
"version": "1.0",
},
json.RawMessage(payload),
),
pgq.NewMessage(
pgq.Metadata{
Payload: json.RawMessage(payload),
},
{
Metadata: pgq.Metadata{
"version": "1.0",
},
json.RawMessage(payload),
),
Payload: json.RawMessage(payload),
},
}
ids, err := p.Publish(ctx, queueName, messages...)
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,13 @@ func ExampleNewConsumer() {
pgq.WithLockDuration(10*time.Minute),
pgq.WithPollingInterval(500*time.Millisecond),
pgq.WithAckTimeout(5*time.Second),
pgq.WithMessageProcessingReserveDuration(5*time.Second),
pgq.WithMaxParallelMessages(42),
pgq.WithMetrics(noop.Meter{}),
pgq.WithHistoryLimit(24*time.Hour),
pgq.WithLogger(slogger),
pgq.WithInvalidMessageCallback(func(ctx context.Context, msg pgq.InvalidMessage, err error) {
// message payload and/or metadata are not JSON object.
// message Payload and/or Metadata are not JSON object.
// The message will be discarded.
slogger.Warn("invalid message",
"error", err,
Expand Down
14 changes: 7 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,19 @@ module go.dataddo.com/pgq
go 1.21

require (
github.com/google/uuid v1.3.1
github.com/jackc/pgtype v1.14.0
github.com/google/uuid v1.5.0
github.com/jackc/pgtype v1.14.1
github.com/pkg/errors v0.9.1
go.opentelemetry.io/otel v1.19.0
go.opentelemetry.io/otel/metric v1.19.0
golang.org/x/sync v0.3.0
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/metric v1.21.0
golang.org/x/sync v0.6.0
)

require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/jackc/pgio v1.0.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
)

// Test dependencies
Expand Down
Loading

0 comments on commit e63f922

Please sign in to comment.