diff --git a/README.md b/README.md
index a12a9d6..287f63d 100644
--- a/README.md
+++ b/README.md
@@ -5,25 +5,39 @@
 
 pgsq logo
 
-PGQ is a [Go](http://golang.org) package that provides a queuing mechanism for your Go applications built on top of the postgres database.
-It enables developers to implement efficient and reliable, but simple message queues for their microservices architecture using the familiar postgres infrastructure.
+PGQ is a [Go](http://golang.org) package that provides a queuing mechanism for your Go applications.
+It is built on top of the postgres database and enables developers to implement efficient and reliable,
+but simple message queues for their services composed architecture using the familiar postgres infrastructure.
 
 ## Features
--- __Postgres__-backed: Leverages the power of PostgreSQL to store and manage queues.
--- __Reliable__: Guarantees message persistence and delivery, even if facing the failures.
+- __Postgres__-backed: Leverages the power of SQL to store and manage queues.
+- __Reliable__: Guarantees message persistence and delivery, even if facing the various failures.
 - __Transactional__: Supports transactional message handling, ensuring consistency.
 - __Simple usage__: Provides a clean and easy-to-use API for interacting with the queue.
--- __Efficient__: Optimized for high throughput and low-latency message processing.
+- __Efficient__: Optimized for medium to long jobs durations.
+
+## Why PGQ?
+- __Postgres__: The `postgres` just works and is feature rich, scalable and performant.
+- __SQL__: You are already familiar with SQL, right? No need to learn anything new.
+- __Stack__: You do not have to maintain/manage/administer/patch any additional message broker component.
+- __Simplicity__: The `pgq` is a simple and straightforward solution for your messaging needs.
+- __Usability__: The `pgq` can be used for many scenarios.
+
+Of course, you can implement your own implementation or use other messaging technologies,
+but why would you do that when you can use the `postgres` which is already there and is a proven technology?
 
 ## When to pick PGQ?
 
-Even though there are other great technologies and tools for complex messaging including the robust routing configuration, sometimes you do not need it, and you can be just fine with the simpler tooling.
+Even though there are other great technologies and tools for complex messaging including the robust routing configuration,
+sometimes you do not need it, and you can be just fine with the simpler tooling.
 
 Pick pgq if you:
--- need to distribute the traffic fairly among your app replicas and int time to protect each of them from the overload
--- need the out-of-a-box observability
--- already use `postgres` and you don't want to complicate your tech stack
--- are ok with basic routing
+- need to distribute the traffic fairly among your app replicas
+- need to protect your services from overload
+- want the out-of-a-box observability of your queues
+- want to use SQL for managing your queues
+- already use `postgres` and you want to keep your tech stack simple
+- don't want to manage another technology and learn its specifics
 
 No need to bring the new technology to your existing stack when you can be pretty satisfied with `postgres`.
 Write the consumers and publishers in various languages with the simple idea behind - __use postgres table as a queue__.
@@ -34,6 +48,12 @@ As the `pgq` queue table contains the records of already processed jobs too, you
 
 Pgq is intended to replace the specialized message brokers in environments where you already use postgres, and you want clean, simple and straightforward communication among your services.
 
+## Basic principles
+- Every `queue` is the single postgres `table`.
+- You maintain the table on your own. You can extend it as you need.
+- Publishers add new rows to the queue table.
+- Consumers update the pgq mandatory fields of the rows in the queue table.
+
 ## Installation
 To install PGQ, use the go get command:
 ```
@@ -44,7 +64,10 @@ go get go.dataddo.com/pgq@latest
 
 Prerequisites:
 In order to make the `pgq` functional, there must exist the `postgres table` with all the necessary `pgq` fields.
-You can create the table on your own, or you can use the query generator. You usually run the `create table` command just once during the application setup.
+You can create the table on your own with classic `CREATE TABLE ...`, or you can use the query generator to generate the query for you.
+The generated query creates the queue table alongside with indexes which improve the consumer queries performance.
+
+You usually run the setup commands just once during the queue setup.
 
 ```go
 package main
@@ -57,11 +80,12 @@ import (
 
 func main() {
 	queueName := "my_queue"
-	// create string contains the "CREATE TABLE my_queue ..."
-	// which you may use for table creation.
-	// You may also use the "GenerateDropTableQuery" for dropping the table
+	// create string contains the "CREATE TABLE queueName ..."
+	// which you may use for table and indexes creation.
 	create := schema.GenerateCreateTableQuery(queueName)
 	fmt.Println(create)
+
+	// You may also use the "GenerateDropTableQuery" for dropping all the pgq artifacts (down migration)
 }
 ```
 
@@ -186,7 +210,7 @@ func main() {
 type handler struct {}
 func (h *handler) HandleMessage(_ context.Context, msg pgq.Message) (processed bool, err error) {
 	fmt.Println("Message payload:", string(msg.Payload()))
-    return true, nil
+	return true, nil
 }
 ```
 
@@ -255,7 +279,7 @@ Get the number of messages currently being processed. Another good candidate for
 select count(*) from queue_name where processed_at is null and locked_until is null;
 ```
 
-_Tip: You can use the `pgq` table as a source for your monitoring system._
+_Tip: You can use the `pgq` table as a source for your monitoring system and enable alerting for suspicious values. It is usually good not to monitor only the peak size but also the empty queues, which may indicate some troubles on publishers side._
 The queue length and messages being processed example in a Grafana panel populated by data from Prometheus fetched from postgres queue table
 
 ### Processed messages
@@ -285,6 +309,18 @@ The pgq internally uses the classic `UPDATE` + `SELECT ... FOR UPDATE` postgres
 
 The Select statement is using the `SKIP LOCKED` clause which enables the consumer to fetch the messages in the queue in the order they were created, and doesn't get stuck on the locked rows.
 
+### Consumer loop
+
+The consumer loop diagram
+Consumers periodically ask the queue table for the new messages to be processed.
+
+- When there is no message to be processed, consumer idles for a `polling interval` duration and then tries again.
+- When the consumer finds the message to be processed, it locks it by updating the `locked_until` field with the `lock duration` timestamp.
+- If the consumer fails to update the `locked_until` field, it means that another consumer has already locked the message, and the current consumer tries to find the message again.
+- If the consumer successfully locks the message, it starts to process it.
+- When the consumer finishes the processing, it updates the `processed_at` field with the current timestamp.
+
+
 ## Optimizing performance
 
 When using the pgq in production environment you should focus on the following areas to improve the performance:
diff --git a/docs/consumer_loop.png b/docs/consumer_loop.png
new file mode 100644
index 0000000..98bc6ca
Binary files /dev/null and b/docs/consumer_loop.png differ