Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial commit #1

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 133 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,133 @@
Treetracker queue client for Python
# Treetracker queue Python client for PostgreSQL

This Python project provides asynchronous pub-sub (publish-subscribe) functionality using PostgreSQL as a message broker, leveraging `aiopg` for asynchronous PostgreSQL connections and `psycopg2` for handling PostgreSQL notifications.

## Features

- **Subscribe**: Listen to PostgreSQL channels for incoming notifications.
- **Publish**: Send messages to a PostgreSQL queue and notify subscribers.

## Requirements

Make sure you have the following installed:

- **Python 3.7+**
- **PostgreSQL 9.0+**
- **aiopg**: Asynchronous PostgreSQL driver for Python.
- **psycopg2**: PostgreSQL database adapter for Python.

Install the required Python packages using `pip`:

```bash
pip install aiopg psycopg2
```

## Database Setup

Before using the client, you will need to set up the necessary PostgreSQL table for testing locally otherwise skip this part:

```sql
CREATE SCHEMA queue; -- creates a schema called queue
CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; -- helps us generate uuids
CREATE TABLE queue.message (
id uuid PRIMARY KEY DEFAULT uuid_generate_v4(),
channel text,
data json,
created_at timestamptz,
updated_at timestamptz
); -- creates a table with columns id, channel, data, created_at & updated_at
ALTER TABLE queue.message ALTER COLUMN created_at SET DEFAULT now();
ALTER TABLE queue.message ALTER COLUMN updated_at SET DEFAULT now();
-- above two lines make created_at and updated_at columns to be autopopulated

CREATE OR REPLACE FUNCTION queue.new_message_notify() RETURNS TRIGGER AS $$
DECLARE
BEGIN
PERFORM pg_notify(cast(NEW.channel as text), row_to_json(new)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER new_insert_trigger BEFORE INSERT ON queue.message
FOR EACH ROW EXECUTE PROCEDURE queue.new_message_notify();
```

## Usage

### 1. Subscribing to a Channel

To subscribe to a PostgreSQL channel, you can use the `subscribe` method of the `Client` class. It continuously listens for notifications on the given channel.

```python
import asyncio
import aiopg
from client import Client

dsn = 'dbname=test user=postgres password=yourpassword host=localhost'

async def run_subscriber():
async with aiopg.connect(dsn) as conn:
await Client.subscribe(conn, 'my_channel')

# Run the subscriber
asyncio.run(run_subscriber())
```

### 2. Publishing to a Channel

To publish a message to a PostgreSQL channel, use the `publish` method. It inserts a message into the `queue.message` table and notifies listeners.

```python
import asyncio
from client import Client

dsn = 'dbname=test user=postgres password=yourpassword host=localhost'

async def run_publisher():
await Client.publish(dsn, 'my_channel', 'Hello, PostgreSQL!')

# Run the publisher
asyncio.run(run_publisher())
```

### Example Output

- When you publish a message, you’ll see:

```
Postgres message dispatch success: (1, 'my_channel', 'Hello, PostgreSQL!')
```

- When you subscribe to a channel, you’ll receive:

```
Receive <- Hello, PostgreSQL!
```

## Methods

### `subscribe(conn, channel: str)`

Listens for notifications on a PostgreSQL channel.

- **Parameters**:
- `conn`: The `aiopg.Connection` object.
- `channel`: The PostgreSQL channel name.

- **Returns**: Prints out the messages received from the channel.

### `publish(dsn: str, channel: str, data: str)`

Publishes a message to a PostgreSQL channel by inserting data into the `queue.message` table.

- **Parameters**:
- `dsn`: The PostgreSQL connection string.
- `channel`: The PostgreSQL channel to notify.
- `data`: The message to be published.

- **Returns**: Inserts a row into the `queue.message` table and notifies the channel subscribers.

## Error Handling

- The code catches and handles common PostgreSQL errors during subscription and publishing.
- It also ensures that listeners receive a 'finish' message, signaling the end of notifications.
61 changes: 61 additions & 0 deletions client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import aiopg
import psycopg2
import asyncio

class Client:


async def subscribe(conn, channel: str):
"""
Listen for notifications on a PostgreSQL channel.

Args:
conn (aiopg.Connection): The aiopg connection.
channel (str): The PostgreSQL channel to listen to.
"""
async with conn.cursor() as cur:
await cur.execute(f"LISTEN {channel}")

# Continuously listen for notifications
while True:
try:
# Wait and get the next notification message
msg = await conn.notifies.get()
except psycopg2.Error as ex:
print("ERROR: ", ex)
return

if msg.payload == "finish":
return
else:
print(f"Receive <- {msg.payload}")


async def publish(dsn, channel: str, data: str):
"""
Publish a message to the PostgreSQL queue by inserting data into the table.

Args:
dsn (str): The PostgreSQL database connection string.
channel (str): The channel where the message is to be dispatched.
data (str): The data to be inserted into the queue.
"""
# SQL query to insert data into the queue.message table
query = "INSERT INTO queue.message(channel, data) VALUES (%s, %s) RETURNING *"
values = (channel, data)

async with aiopg.connect(dsn) as conn:
async with conn.cursor() as cur:
try:
# Execute the SQL query with the provided values
await cur.execute(query, values)

# Fetch and print the result from the database
result = await cur.fetchone()
print(f"Postgres message dispatch success: {result}")
except Exception as error:
print(f"Error Occurred -> {error}")

finally:
# This is often useful to let listeners know that no more messages will be sent.
await cur.execute(f"NOTIFY {channel}, 'finish'")
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
psycopg[binary]
asyncio
aiopg