Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: consumer metadata filter #20

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 102 additions & 34 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@ import (
"io"
"log/slog"
"math"
"strconv"
"strings"
"sync"
"time"
"unicode"

"github.com/google/uuid"
"github.com/jackc/pgtype"
"github.com/jmoiron/sqlx"
"github.com/pkg/errors"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -27,6 +27,7 @@ import (
"golang.org/x/sync/semaphore"

"go.dataddo.com/pgq/internal/pg"
"go.dataddo.com/pgq/internal/query"
)

type fatalError struct {
Expand Down Expand Up @@ -99,6 +100,8 @@ type consumerConfig struct {
// MsgProcessingReserveDuration is the duration for which the message is reserved for handling result state.
MessageProcessingReserveDuration time.Duration

MetadataFilters []MetadataFilter

Logger *slog.Logger
}

Expand Down Expand Up @@ -129,7 +132,7 @@ type InvalidMessage struct {

// Consumer is the preconfigured subscriber of the write input messages
type Consumer struct {
db *sql.DB
db *sqlx.DB
queueName string
cfg consumerConfig
handler MessageHandler
Expand Down Expand Up @@ -220,8 +223,35 @@ func WithLogger(logger *slog.Logger) ConsumerOption {
}
}

// MetadataOperation is the SQL comparison operator placed between a
// metadata key's value and the filter value when the consume query is
// generated. Only the operators declared below are defined by this package.
type (
	MetadataOperation string

	// MetadataFilter restricts consumed messages to those whose metadata
	// JSON contains Key whose value compares to Value via Operation.
	// It is rendered as an `AND metadata->>key <op> value` condition in
	// the consume query; currently only direct key/value matching is
	// supported.
	MetadataFilter struct {
		Key       string
		Operation MetadataOperation
		Value     string
	}
)

// Supported metadata filter operations.
const (
	OpEqual    MetadataOperation = "="  // metadata value equals the filter value
	OpNotEqual MetadataOperation = "<>" // metadata value differs from the filter value
)

// WithMetadataFilter returns a ConsumerOption that registers an additional
// metadata filter on the consumer. Each registered filter becomes one
// `AND metadata->>key <op> value` condition in the generated consume query,
// so multiple filters are combined with AND. A nil filter is a no-op.
func WithMetadataFilter(filter *MetadataFilter) ConsumerOption {
	return func(c *consumerConfig) {
		// Bug fix: the previous implementation dereferenced *filter even
		// when filter == nil (after resetting the slice), which panicked
		// with a nil-pointer dereference and dropped any filters already
		// registered. Treat nil as "nothing to add" instead.
		if filter == nil {
			return
		}
		// append handles a nil MetadataFilters slice, so no explicit
		// allocation is needed.
		c.MetadataFilters = append(c.MetadataFilters, *filter)
	}
}

// NewConsumer creates Consumer with proper settings
func NewConsumer(db *sql.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error) {
func NewConsumer(db *sqlx.DB, queueName string, handler MessageHandler, opts ...ConsumerOption) (*Consumer, error) {
config := defaultConsumerConfig
for _, opt := range opts {
opt(&config)
Expand Down Expand Up @@ -277,7 +307,11 @@ func (c *Consumer) Run(ctx context.Context) error {
if err := c.verifyTable(ctx); err != nil {
return errors.Wrap(err, "verifying table")
}
query := c.generateQuery()
query, err := c.generateQuery()
if err != nil {
return errors.Wrap(err, "generating query")
}

var wg sync.WaitGroup
defer wg.Wait() // wait for handlers to finish
for {
Expand Down Expand Up @@ -318,34 +352,46 @@ func (c *Consumer) verifyTable(ctx context.Context) error {
return nil
}

func (c *Consumer) generateQuery() string {
var sb strings.Builder
sb.WriteString(`UPDATE `)
sb.WriteString(pg.QuoteIdentifier(c.queueName))
sb.WriteString(` SET locked_until = $1`)
sb.WriteString(`, started_at = CURRENT_TIMESTAMP`)
sb.WriteString(`, consumed_count = consumed_count+1`)
sb.WriteString(` WHERE id IN (`)
func (c *Consumer) generateQuery() (*query.Builder, error) {
qb := query.NewBuilder()

qb.WriteString(`UPDATE `)
qb.WriteString(pg.QuoteIdentifier(c.queueName))
qb.WriteString(` SET locked_until = :locked_until`)
qb.WriteString(`, started_at = CURRENT_TIMESTAMP`)
qb.WriteString(`, consumed_count = consumed_count+1`)
qb.WriteString(` WHERE id IN (`)
{
sb.WriteString(`SELECT id FROM `)
sb.WriteString(pg.QuoteIdentifier(c.queueName))
sb.WriteString(` WHERE`)
qb.WriteString(`SELECT id FROM `)
qb.WriteString(pg.QuoteIdentifier(c.queueName))
qb.WriteString(` WHERE`)
if c.cfg.HistoryLimit > 0 {
sb.WriteString(` created_at >= CURRENT_TIMESTAMP - $3::interval AND`)
sb.WriteString(` created_at < CURRENT_TIMESTAMP AND`)
qb.WriteString(` created_at >= CURRENT_TIMESTAMP - (:history_limit)::interval AND`)
qb.WriteString(` created_at < CURRENT_TIMESTAMP AND`)
}
sb.WriteString(` (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP)`)
qb.WriteString(` (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP)`)
if c.cfg.MaxConsumeCount > 0 {
sb.WriteString(` AND consumed_count < `)
sb.WriteString(strconv.FormatUint(uint64(c.cfg.MaxConsumeCount), 10))
qb.WriteString(` AND consumed_count < :max_consume_count`)
}
sb.WriteString(` AND processed_at IS NULL`)
sb.WriteString(` ORDER BY consumed_count ASC, created_at ASC`)
sb.WriteString(` LIMIT $2`)
sb.WriteString(` FOR UPDATE SKIP LOCKED`)

if c.cfg.MetadataFilters != nil && len(c.cfg.MetadataFilters) > 0 {
for i, filter := range c.cfg.MetadataFilters {
if len(filter.Operation) == 0 {
return nil, fatalError{Err: fmt.Errorf("metadata filter operation is empty")}
}

qb.WriteString(fmt.Sprintf(" AND metadata->>:metadata_key_%d %s :metadata_value_%d", i, filter.Operation, i))
}
}

qb.WriteString(` AND processed_at IS NULL`)
qb.WriteString(` ORDER BY consumed_count ASC, created_at ASC`)
qb.WriteString(` LIMIT :limit`)
qb.WriteString(` FOR UPDATE SKIP LOCKED`)
}
sb.WriteString(`) RETURNING id, payload, metadata, consumed_count, locked_until`)
return sb.String()
qb.WriteString(`) RETURNING id, payload, metadata, consumed_count, locked_until`)

return qb, nil
}

func (c *Consumer) handleMessage(ctx context.Context, msg *MessageIncoming) {
Expand Down Expand Up @@ -418,7 +464,7 @@ func prepareCtxTimeout() (func(td time.Duration) context.Context, context.Cancel
return fn, cancel
}

func (c *Consumer) consumeMessages(ctx context.Context, query string) ([]*MessageIncoming, error) {
func (c *Consumer) consumeMessages(ctx context.Context, query *query.Builder) ([]*MessageIncoming, error) {
for {
maxMsg, err := acquireMaxFromSemaphore(ctx, c.sem, int64(c.cfg.MaxParallelMessages))
if err != nil {
Expand Down Expand Up @@ -451,8 +497,8 @@ type pgMessage struct {
LockedUntil pgtype.Timestamptz
}

func (c *Consumer) tryConsumeMessages(ctx context.Context, query string, limit int64) (_ []*MessageIncoming, err error) {
tx, err := c.db.BeginTx(ctx, nil)
func (c *Consumer) tryConsumeMessages(ctx context.Context, query *query.Builder, limit int64) (_ []*MessageIncoming, err error) {
tx, err := c.db.BeginTxx(ctx, nil)
if err != nil {
// TODO not necessary fatal, network could wiggle.
return nil, fatalError{Err: errors.WithStack(err)}
Expand All @@ -472,14 +518,36 @@ func (c *Consumer) tryConsumeMessages(ctx context.Context, query string, limit i
}()

lockedUntil := time.Now().Add(c.cfg.LockDuration)
args := []any{lockedUntil, limit}
if c.cfg.HistoryLimit > 0 {
namedParams := map[string]interface{}{
"locked_until": lockedUntil,
"limit": limit,
}

if query.HasParam("history_limit") {
var scanInterval pgtype.Interval
// time.Duration doesn't ever fail
_ = scanInterval.Set(c.cfg.HistoryLimit)
args = append(args, scanInterval)

namedParams["history_limit"] = scanInterval
}

if query.HasParam("max_consume_count") {
namedParams["max_consume_count"] = c.cfg.MaxConsumeCount
}
rows, err := tx.QueryContext(ctx, query, args...)

if c.cfg.MetadataFilters != nil && len(c.cfg.MetadataFilters) > 0 {
for i, filter := range c.cfg.MetadataFilters {
namedParams[fmt.Sprintf("metadata_key_%d", i)] = filter.Key
namedParams[fmt.Sprintf("metadata_value_%d", i)] = filter.Value
}
}

queryString, err := query.Build(namedParams)
if err != nil {
return nil, errors.WithStack(err)
}

rows, err := sqlx.NamedQueryContext(ctx, tx, queryString, namedParams)
if err != nil {
return nil, errors.WithStack(err)
}
Expand All @@ -505,7 +573,7 @@ func (c *Consumer) tryConsumeMessages(ctx context.Context, query string, limit i
return msgs, nil
}

func (c *Consumer) parseRow(ctx context.Context, rows *sql.Rows) (*MessageIncoming, error) {
func (c *Consumer) parseRow(ctx context.Context, rows *sqlx.Rows) (*MessageIncoming, error) {
var pgMsg pgMessage
if err := rows.Scan(
&pgMsg.ID,
Expand Down
27 changes: 19 additions & 8 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func TestConsumer_generateQuery(t *testing.T) {
{
name: "simple",
args: args{queueName: "testing_queue"},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
want: "UPDATE \"testing_queue\" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "scanInterval 12 hours",
Expand All @@ -34,7 +34,17 @@ func TestConsumer_generateQuery(t *testing.T) {
WithHistoryLimit(12 * time.Hour),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
want: "UPDATE \"testing_queue\" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - (:history_limit)::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "consume messages with metadata filter",
args: args{
queueName: "testing_queue",
opts: []ConsumerOption{
WithMetadataFilter(&MetadataFilter{Key: "foo", Operation: OpEqual, Value: "bar"}),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND metadata->>:metadata_key_0 = :metadata_value_0 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "scn interval 12 hours abd max consumed count limit disabled",
Expand All @@ -46,12 +56,12 @@ func TestConsumer_generateQuery(t *testing.T) {
WithMaxConsumeCount(0),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
want: "UPDATE \"testing_queue\" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - (:history_limit)::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "with metadata condition",
args: args{queueName: "testing_queue"},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
want: "UPDATE \"testing_queue\" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "scanInterval 12 hours with metadata condition",
Expand All @@ -61,20 +71,21 @@ func TestConsumer_generateQuery(t *testing.T) {
WithHistoryLimit(12 * time.Hour),
},
},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - $3::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
want: "UPDATE \"testing_queue\" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE created_at >= CURRENT_TIMESTAMP - (:history_limit)::interval AND created_at < CURRENT_TIMESTAMP AND (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
{
name: "with negative metadata condition",
args: args{queueName: "testing_queue"},
want: "UPDATE \"testing_queue\" SET locked_until = $1, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < 3 AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT $2 FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
want: "UPDATE \"testing_queue\" SET locked_until = :locked_until, started_at = CURRENT_TIMESTAMP, consumed_count = consumed_count+1 WHERE id IN (SELECT id FROM \"testing_queue\" WHERE (locked_until IS NULL OR locked_until < CURRENT_TIMESTAMP) AND consumed_count < :max_consume_count AND processed_at IS NULL ORDER BY consumed_count ASC, created_at ASC LIMIT :limit FOR UPDATE SKIP LOCKED) RETURNING id, payload, metadata, consumed_count, locked_until",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c, err := NewConsumer(nil, tt.args.queueName, nil, tt.args.opts...)
require.NoError(t, err)
got := c.generateQuery()
require.Equal(t, tt.want, got)
got, err := c.generateQuery()
require.NoError(t, err)
require.Equal(t, tt.want, got.String())
})
}
}
Expand Down
4 changes: 2 additions & 2 deletions example_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package pgq_test

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"log"
"os"
"os/signal"

"github.com/jmoiron/sqlx"
"go.dataddo.com/pgq"
)

Expand Down Expand Up @@ -48,7 +48,7 @@ func (h *Handler) HandleMessage(ctx context.Context, msg *pgq.MessageIncoming) (
}

func ExampleConsumer() {
db, err := sql.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres")
db, err := sqlx.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres")
if err != nil {
log.Fatal("Error opening database:", err)
}
Expand Down
4 changes: 2 additions & 2 deletions example_publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package pgq_test

import (
"context"
"database/sql"
"encoding/json"
"log"
"time"

"github.com/jmoiron/sqlx"
"go.dataddo.com/pgq"
)

Expand All @@ -15,7 +15,7 @@ type PayloadStruct struct {
}

func ExamplePublisher() {
db, err := sql.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres")
db, err := sqlx.Open("postgres", "user=postgres password=postgres host=localhost port=5432 dbname=postgres")
if err != nil {
log.Fatal("Error opening database:", err)
}
Expand Down
4 changes: 2 additions & 2 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ package pgq_test

import (
"context"
"database/sql"
"log/slog"
"os"
"time"

"github.com/jmoiron/sqlx"
"go.dataddo.com/pgq"
"go.opentelemetry.io/otel/metric/noop"
)

var db *sql.DB
var db *sqlx.DB

func ExampleNewConsumer() {
slogger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.21
require (
github.com/google/uuid v1.5.0
github.com/jackc/pgtype v1.14.1
github.com/jmoiron/sqlx v1.3.5
github.com/lib/pq v1.10.2
github.com/pkg/errors v0.9.1
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/metric v1.21.0
Expand Down
Loading
Loading