[Postgres] Unable to ack messages due to improper cast to xid8 #528

Open
braunsonm opened this issue Dec 21, 2024 · 0 comments

I'm working on a Watermill integration using Postgres with the following schema:

CREATE TABLE IF NOT EXISTS public.watermill_example
(
    "offset" bigint NOT NULL GENERATED BY DEFAULT AS IDENTITY ( INCREMENT 1 START 1 MINVALUE 1 MAXVALUE 9223372036854775807 CACHE 1 ),
    uuid character varying(36) COLLATE pg_catalog."default" NOT NULL,
    created_at timestamp without time zone NOT NULL DEFAULT now(),
    payload bytea,
    metadata bytea,
    transaction_id xid8 NOT NULL,
    CONSTRAINT watermill_example_pkey PRIMARY KEY ("offset", transaction_id)
);

CREATE TABLE IF NOT EXISTS public.watermill_offsets_example
(
    consumer_group character varying(255) COLLATE pg_catalog."default" NOT NULL,
    offset_acked bigint,
    last_processed_transaction_id xid8 NOT NULL,
    CONSTRAINT watermill_offsets_example_pkey PRIMARY KEY (consumer_group)
);

However, after a message is consumed for the first time, the logs fill with errors when the subscriber tries to ack it:

[watermill] 2024/12/21 00:27:19.044828 subscriber.go:435: 	level=DEBUG msg="Message acked by subscriber" consumer_group=api err="could not get args for acking the message: ERROR: column "last_processed_transaction_id" is of type xid8 but expression is of type integer (SQLSTATE 42804)" msg_uuid=0c5c5b21-e92f-4176-a1a9-bcf4b3358a9d subscriber_id=01JFK9CZVXSFRWT7V4Z1EMWWXQ topic=example
[watermill] 2024/12/21 00:27:19.045498 backoff_manager.go:58: 	level=ERROR msg="Error querying for message" consumer_group=api err="could not get args for acking the message: ERROR: column "last_processed_transaction_id" is of type xid8 but expression is of type integer (SQLSTATE 42804)" subscriber_id=01JFK9CZVXSFRWT7V4Z1EMWWXQ topic=examplewait_time=1s 

The code is fairly standard from what I could tell in the docs:

func setupPubSub(db *gorm.DB, logger watermill.LoggerAdapter) (*sql.Publisher, *sql.Subscriber) {
	stdLibSqlDB, err := db.DB()
	if err != nil {
		panic(err)
	}

	deadline := time.Minute * 30
	sub, err := sql.NewSubscriber(
		stdLibSqlDB,
		sql.SubscriberConfig{
			ConsumerGroup:  "api",
			SchemaAdapter:  sql.DefaultPostgreSQLSchema{},
			OffsetsAdapter: sql.DefaultPostgreSQLOffsetsAdapter{},
			ResendInterval: time.Minute * 5,
			AckDeadline:    &deadline,
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	pub, err := sql.NewPublisher(
		stdLibSqlDB,
		sql.PublisherConfig{
			SchemaAdapter: sql.DefaultPostgreSQLSchema{},
		},
		logger,
	)
	if err != nil {
		panic(err)
	}

	return pub, sub
}

func setupWatermillRouter(logger watermill.LoggerAdapter) (*message.Router, error) {
	router, err := message.NewRouter(message.RouterConfig{}, logger)
	if err != nil {
		// Return early: calling methods on a nil router below would panic.
		return nil, fmt.Errorf("error creating eventing router: %w", err)
	}

	// SignalsHandler will gracefully shutdown Router when SIGTERM is received.
	// You can also close the router by just calling `r.Close()`.
	router.AddPlugin(plugin.SignalsHandler)

	// Router level middleware are executed for every message sent to the router
	router.AddMiddleware(
		(&middleware.Deduplicator{
			Timeout: time.Minute * 30,
		}).Middleware,

		// CorrelationID will copy the correlation id from the incoming message's metadata to the produced messages
		middleware.CorrelationID,

		// The handler function is retried if it returns an error.
		// After MaxRetries, the message is Nacked and it's up to the PubSub to resend it.
		middleware.Retry{
			MaxRetries:          3,
			InitialInterval:     time.Millisecond * 100,
			RandomizationFactor: 2.5,
			Logger:              logger,
		}.Middleware,

		// Recoverer handles panics from handlers.
		// In this case, it passes them as errors to the Retry middleware.
		middleware.Recoverer,
	)

	return router, err
}

func main() {
	// ... snip some gorm DB connection

	logger := watermill.NewStdLogger(true, false)
	router, err := setupWatermillRouter(logger)
	if err != nil {
		log.Fatalln(err)
	}
	pub, sub := setupPubSub(db, logger)

	// Register handlers
	// ... snip
}

This error makes sense to me: the Go SQL driver tries to write transaction_id as an integer, and Postgres does not accept an integer expression for an xid8 column. The following workaround "solves" the issue, but at the cost of two casts:

func (a OffsetsAdapter) AckMessageQuery(topic string, row sql.Row, consumerGroup string) sql.Query {
	ackQuery := `INSERT INTO ` + a.MessagesOffsetsTable(topic) + `(offset_acked, last_processed_transaction_id, consumer_group)
	VALUES
		($1, $2::text::xid8, $3) -- THIS IS THE LINE THAT IS CHANGED
	ON CONFLICT
		(consumer_group)
	DO UPDATE SET
		offset_acked = excluded.offset_acked,
		last_processed_transaction_id = excluded.last_processed_transaction_id`

	return sql.Query{ackQuery, []any{row.Offset, row.ExtraData["transaction_id"], consumerGroup}}
}

So I guess my question is, how does this normally work? This is with Postgres 15.
