Skip to content

Commit

Permalink
chore: Optimize postgres - use of rowCallback approach (#2171)
Browse files Browse the repository at this point in the history
* db_postgres, postgres_driver: better performance by using callback.
  There were a bunch of milliseconds being lost due to multiple-row
  processing. This commit aims to have the minimum possible row
  process time.
* pgasyncpool: clarifying logic around pool conn management.
* db_postgres: removing duplicate code and more searchable proc names.
  • Loading branch information
Ivansete-status authored Oct 31, 2023
1 parent bcf8e96 commit 2b4ca4d
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 143 deletions.
47 changes: 33 additions & 14 deletions waku/common/databases/db_postgres/dbconn.nim
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import

include db_postgres

type DataProc* = proc(result: ptr PGresult) {.closure, gcsafe.}

## Connection management

proc check*(db: DbConn): Result[void, string] =
Expand Down Expand Up @@ -43,11 +45,11 @@ proc open*(connString: string):

ok(conn)

proc rows*(db: DbConn,
query: SqlQuery,
args: seq[string]):
Future[Result[seq[Row], string]] {.async.} =
## Runs the SQL getting results.
proc sendQuery(db: DbConn,
query: SqlQuery,
args: seq[string]):
Future[Result[void, string]] {.async.} =
## This proc can be used directly for queries that don't retrieve values back.

if db.status != CONNECTION_OK:
let checkRes = db.check()
Expand All @@ -71,7 +73,13 @@ proc rows*(db: DbConn,

return err("failed pqsendQuery: unknown reason")

var ret = newSeq[Row](0)
return ok()

proc waitQueryToFinish(db: DbConn,
rowCallback: DataProc = nil):
Future[Result[void, string]] {.async.} =
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
## For other queries, like "INSERT", 'rowCallback' should be nil.

while true:

Expand All @@ -84,22 +92,33 @@ proc rows*(db: DbConn,
return err("failed pqconsumeInput: unknown reason")

if db.pqisBusy() == 1:
await sleepAsync(0.milliseconds) # Do not block the async runtime
await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
continue

var pqResult = db.pqgetResult()
let pqResult = db.pqgetResult()
if pqResult == nil:
# Check if its a real error or just end of results
let checkRes = db.check()
if checkRes.isErr():
return err("error in rows: " & checkRes.error)

return ok(ret) # reached the end of the results
return ok() # reached the end of the results

var cols = pqResult.pqnfields()
var row = cols.newRow()
for i in 0'i32 .. pqResult.pqNtuples() - 1:
pqResult.setRow(row, i, cols) # puts the value in the row
ret.add(row)
if not rowCallback.isNil():
rowCallback(pqResult)

pqclear(pqResult)

proc dbConnQuery*(db: DbConn,
query: SqlQuery,
args: seq[string],
rowCallback: DataProc):
Future[Result[void, string]] {.async, gcsafe.} =

(await db.sendQuery(query, args)).isOkOr:
return err("error in dbConnQuery calling sendQuery: " & $error)

(await db.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)

return ok()
79 changes: 33 additions & 46 deletions waku/common/databases/db_postgres/pgasyncpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,11 @@ else:
import
std/[sequtils,nre, strformat],
stew/results,
chronicles,
chronos
import
./dbconn,
../common

logScope:
topics = "postgres asyncpool"

type PgAsyncPoolState {.pure.} = enum
Closed,
Live,
Expand Down Expand Up @@ -107,6 +103,16 @@ proc close*(pool: PgAsyncPool):

return ok()

proc getFirstFreeConnIndex(pool: PgAsyncPool):
DatabaseResult[int] =
for index in 0..<pool.conns.len:
if pool.conns[index].busy:
continue

## Pick up the first free connection and set it busy
pool.conns[index].busy = true
return ok(index)

proc getConnIndex(pool: PgAsyncPool):
Future[DatabaseResult[int]] {.async.} =
## Waits for a free connection or create if max connections limits have not been reached.
Expand All @@ -115,8 +121,20 @@ proc getConnIndex(pool: PgAsyncPool):
if not pool.isLive():
return err("pool is not live")

# stablish new connections if we are under the limit
if pool.isBusy() and pool.conns.len < pool.maxConnections:
if not pool.isBusy():
return pool.getFirstFreeConnIndex()

## Pool is busy then

if pool.conns.len == pool.maxConnections:
## Can't create more connections. Wait for a free connection without blocking the async runtime.
while pool.isBusy():
await sleepAsync(0.milliseconds)

return pool.getFirstFreeConnIndex()

elif pool.conns.len < pool.maxConnections:
## stablish a new connection
let connRes = dbconn.open(pool.connString)
if connRes.isOk():
let conn = connRes.get()
Expand All @@ -125,17 +143,6 @@ proc getConnIndex(pool: PgAsyncPool):
else:
return err("failed to stablish a new connection: " & connRes.error)

# wait for a free connection without blocking the async runtime
while pool.isBusy():
await sleepAsync(0.milliseconds)

for index in 0..<pool.conns.len:
if pool.conns[index].busy:
continue

pool.conns[index].busy = true
return ok(index)

proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
## Forces closing the connection pool.
## This proc is intended to be called when the connection with the database
Expand All @@ -156,12 +163,13 @@ proc releaseConn(pool: PgAsyncPool, conn: DbConn) =
if pool.conns[i].dbConn == conn:
pool.conns[i].busy = false

proc query*(pool: PgAsyncPool,
query: string,
args: seq[string] = newSeq[string](0)):
Future[DatabaseResult[seq[Row]]] {.async.} =
## Runs the SQL query getting results.
## Retrieves info from the database.
proc pgQuery*(pool: PgAsyncPool,
query: string,
args: seq[string] = newSeq[string](0),
rowCallback: DataProc = nil):
Future[DatabaseResult[void]] {.async.} =
## rowCallback != nil when it is expected to retrieve info from the database.
## rowCallback == nil for queries that change the database state.

let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
Expand All @@ -170,29 +178,8 @@ proc query*(pool: PgAsyncPool,
let conn = pool.conns[connIndexRes.value].dbConn
defer: pool.releaseConn(conn)

let rowsRes = await conn.rows(sql(query), args)
if rowsRes.isErr():
return err("error in asyncpool query: " & rowsRes.error)

return ok(rowsRes.get())

proc exec*(pool: PgAsyncPool,
query: string,
args: seq[string] = newSeq[string](0)):
Future[DatabaseResult[void]] {.async.} =
## Runs the SQL query without results.
## Alters the database state.

let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
return err("connRes is err in exec: " & connIndexRes.error)

let conn = pool.conns[connIndexRes.value].dbConn
defer: pool.releaseConn(conn)

let rowsRes = await conn.rows(sql(query), args)
if rowsRes.isErr():
return err("rowsRes is err in exec: " & rowsRes.error)
(await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr:
return err("error in asyncpool query: " & $error)

return ok()

Expand Down
Loading

0 comments on commit 2b4ca4d

Please sign in to comment.