Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(common): added postgress async pool wrapper #1631

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions waku/common/postgres.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import
./postgres/common,
./postgres/connection,
./postgres/asyncpool

export
common,
connection,
asyncpool
176 changes: 176 additions & 0 deletions waku/common/postgres/asyncpool.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
# Simple async pool driver for postgress.
# Inspired by: https://github.com/treeform/pg/
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
std/sequtils,
stew/results,
chronicles,
chronos
import
./common,
./connection

logScope:
topics = "postgres asyncpool"


## Database connection pool options

type PgAsyncPoolOptions* = object
minConnections: int
maxConnections: int

func init*(T: type PgAsyncPoolOptions, minConnections: Positive = 1, maxConnections: Positive = 5): T =
if minConnections > maxConnections:
raise newException(Defect, "maxConnections must be greater or equal to minConnections")

PgAsyncPoolOptions(
minConnections: minConnections,
maxConnections: maxConnections
)

func minConnections*(options: PgAsyncPoolOptions): int =
options.minConnections

func maxConnections*(options: PgAsyncPoolOptions): int =
options.maxConnections


## Database connection pool

type PgAsyncPoolState {.pure.} = enum
Live,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe use Closed as the default state (first element of the enum) to avoid bugs

Closing,
Closed

type
## Database connection pool
PgAsyncPool* = ref object
connOptions: PgConnOptions
poolOptions: PgAsyncPoolOptions

state: PgAsyncPoolState
conns: seq[DbConn]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I would have created a new type to hold the DbConn & bool instead of two seq

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's another possibility, yes 🙂 Some parts come from treeforms's implementation. This is one of them.

busy: seq[bool]

func isClosing*(pool: PgAsyncPool): bool =
pool.state == PgAsyncPoolState.Closing

func isLive*(pool: PgAsyncPool): bool =
pool.state == PgAsyncPoolState.Live

func isBusy*(pool: PgAsyncPool): bool =
pool.busy.allIt(it)



proc close*(pool: var PgAsyncPool): Future[PgResult[void]] {.async.} =
## Gracefully wait and close all openned connections
if pool.state == PgAsyncPoolState.Closing:
while true:
await sleepAsync(0.milliseconds) # Do not block the async runtime
return ok()

pool.state = PgAsyncPoolState.Closing

# wait for the connections to be released and close them, without
# blocking the async runtime
while pool.busy.anyIt(it):
await sleepAsync(0.milliseconds)

for i in 0..<pool.conns.len:
if pool.busy[i]:
continue

pool.busy[i] = false
pool.conns[i].close()


proc forceClose(pool: var PgAsyncPool) =
## Close all the connections in the pool.
for i in 0..<pool.conns.len:
pool.busy[i] = false
pool.conns[i].close()

pool.state = PgAsyncPoolState.Closed

proc newConnPool*(connOptions: PgConnOptions, poolOptions: PgAsyncPoolOptions): Result[PgAsyncPool, string] =
## Create a new connection pool.
var pool = PgAsyncPool(
connOptions: connOptions,
poolOptions: poolOptions,
state: PgAsyncPoolState.Live,
conns: newSeq[DbConn](poolOptions.minConnections),
busy: newSeq[bool](poolOptions.minConnections),
)

for i in 0..<poolOptions.minConnections:
let connRes = open(connOptions)

# Teardown the opened connections if we failed to open all of them
if connRes.isErr():
pool.forceClose()
return err(connRes.error)

pool.conns[i] = connRes.get()
pool.busy[i] = false

ok(pool)


proc getConn*(pool: var PgAsyncPool): Future[PgResult[DbConn]] {.async.} =
## Wait for a free connection or create if max connections limits have not been reached.
if not pool.isLive():
return err("pool is not live")

# stablish new connections if we are under the limit
if pool.isBusy() and pool.conns.len < pool.poolOptions.maxConnections:
let connRes = open(pool.connOptions)
if connRes.isOk():
let conn = connRes.get()
pool.conns.add(conn)
pool.busy.add(true)

return ok(conn)
else:
warn "failed to stablish a new connection", msg = connRes.error

# wait for a free connection without blocking the async runtime
while pool.isBusy():
await sleepAsync(0.milliseconds)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

5 ms better, I think

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, you can probably do a poll-free version by using an Queue that would you check in relaseConn (or maybe AsyncQueue)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncQueue? 👀 👀 👀 👀 👀 👀 👀 👀


for index in 0..<pool.conns.len:
if pool.busy[index]:
continue

pool.busy[index] = true
return ok(pool.conns[index])

proc releaseConn(pool: var PgAsyncPool, conn: DbConn) =
## Mark the connection as released.
for i in 0..<pool.conns.len:
if pool.conns[i] == conn:
pool.busy[i] = false


proc query*(pool: var PgAsyncPool, query: SqlQuery, args: seq[string]): Future[PgResult[seq[Row]]] {.async.} =
## Runs the SQL query getting results.
let conn = ? await pool.getConn()
defer: pool.releaseConn(conn)

return await rows(conn, query, args)

proc exec*(pool: var PgAsyncPool, query: SqlQuery, args: seq[string]): Future[PgResult[void]] {.async.} =
## Runs the SQL query without results.
let conn = ? await pool.getConn()
defer: pool.releaseConn(conn)

let res = await rows(conn, query, args)
if res.isErr():
return err(res.error)

return ok()
9 changes: 9 additions & 0 deletions waku/common/postgres/common.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}


import stew/results

type PgResult*[T] = Result[T, string]
106 changes: 106 additions & 0 deletions waku/common/postgres/connection.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
stew/results,
chronicles,
chronos

import ./common

include db_postgres


logScope:
topics = "postgres connection"


## Connection options

type PgConnOptions* = object
connection: string
user: string
password: string
database: string

func init*(T: type PgConnOptions, connection, user, password, database: string): T =
PgConnOptions(
connection: connection,
user: user,
password: password,
database: database
)

func connection*(options: PgConnOptions): string =
options.connection

func user*(options: PgConnOptions): string =
options.user

func password*(options: PgConnOptions): string =
options.password

func database*(options: PgConnOptions): string =
options.database


## Connection management

proc error(db: DbConn): string =
## Extract the error message from the database connection.
$pqErrorMessage(db)


proc open*(options: PgConnOptions): common.PgResult[DbConn] =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This proc is not actually async, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No... It is sync. Just a convenience wrapper that takes the Pg options and returns a result.

Copy link
Contributor

@Menduist Menduist Mar 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So personally I would just have a numConnections instead of min/max, don't want my program to randomly do blocking IO when I reach an arbitrary limit. Since the connections >min are not closed, you would probably end up with always maxConnections anyway
Though would also have to check what happens if for instance, the server restarts (or worst, just shuts down)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though would also have to check what happens if, for instance, the server restarts (or worst, just shuts down)

Absolutely! There are a thousand corner cases that I am not covering here. But for time's sake, I am trying to have a little MVP covering the "happy-path" cases, and we can iteratively improve the implementation.

## Open a new connection.
let conn = open(
options.connection,
options.user,
options.password,
options.database
)

if conn.status != CONNECTION_OK:
var reason = conn.error
if reason.len > 0:
reason = "unknown reason"

return err("failed to connect to database: " & reason)

ok(conn)


proc rows*(db: DbConn, query: SqlQuery, args: seq[string]): Future[common.PgResult[seq[Row]]] {.async.} =
## Runs the SQL getting results.
if db.status != CONNECTION_OK:
return err("connection is not ok: " & db.error)

let success = pqsendQuery(db, dbFormat(query, args))
if success != 1:
return err(db.error)

while true:
let success = pqconsumeInput(db)
if success != 1:
return err(db.error)

if pqisBusy(db) == 1:
await sleepAsync(0.milliseconds) # Do not block the async runtime
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's good for a PoC, but ideally we would register the FD in chronos and wait for data to be available

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fully agree 😁

If we can find a Sans-IO version of the Postgres protocol we can try that. For now... I think that, if we don't want to reimplement the Postgres protocol, we should stick to this approach.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to reimplement postgres

https://nim-lang.org/docs/postgres.html#pqsocket%2CPPGconn
This proc will give you the FD of the underlying postgres connection

You would you have to do something like:

while pqIsBusy(db) == 1:
  await waitForRead(AsyncSocket(pqsocket(db))
  pqconsumeInput(db)

Unfortunately, chronos doesn't has a waitForRead, so that would need to be added
Again, fine for a first version, but something to improve on later IMO

More info on the async usage of pg here: https://www.postgresql.org/docs/current/libpq-async.html
And an example in C : https://github.com/Menduist/sample_coroutine_api/blob/master/src/db/re_pgsql.c (callback based)

continue

var pqResult = pqgetResult(db)
if pqResult == nil and db.error.len > 0:
# Check if its a real error or just end of results
return err(db.error)

var rows: seq[Row]

var cols = pqnfields(pqResult)
var row = newRow(cols)
for i in 0'i32..pqNtuples(pqResult) - 1:
setRow(pqResult, row, i, cols)
rows.add(row)

pqclear(pqResult)