Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Optimize postgres - prepared statements in select #2182

Merged
merged 8 commits into from
Nov 7, 2023
2 changes: 2 additions & 0 deletions tests/waku_archive/test_driver_postgres.nim
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,5 @@ suite "Postgres driver":
putRes = await driver.put(DefaultPubsubTopic,
msg2, computeDigest(msg2, DefaultPubsubTopic), msg2.timestamp)
require not putRes.isOk()

(await driver.close()).expect("driver to close")
97 changes: 79 additions & 18 deletions waku/common/databases/db_postgres/dbconn.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ else:
{.push raises: [ValueError,DbError].}

import
std/[times, strutils, strformat],
stew/results,
chronos

Expand Down Expand Up @@ -52,9 +53,8 @@ proc sendQuery(db: DbConn,
## This proc can be used directly for queries that don't retrieve values back.

if db.status != CONNECTION_OK:
let checkRes = db.check()
if checkRes.isErr():
return err("failed to connect to database: " & checkRes.error)
db.check().isOkOr:
return err("failed to connect to database: " & $error)

return err("unknown reason")

Expand All @@ -67,40 +67,85 @@ proc sendQuery(db: DbConn,

let success = db.pqsendQuery(cstring(wellFormedQuery))
if success != 1:
let checkRes = db.check()
if checkRes.isErr():
return err("failed pqsendQuery: " & checkRes.error)
db.check().isOkOr:
return err("failed pqsendQuery: " & $error)

return err("failed pqsendQuery: unknown reason")

return ok()

proc sendQueryPrepared(
db: DbConn,
stmtName: string,
paramValues: openArray[string],
paramLengths: openArray[int32],
paramFormats: openArray[int32]):
Result[void, string] =
## This proc can be used directly for queries that don't retrieve values back.

if paramValues.len != paramLengths.len or paramValues.len != paramFormats.len or
paramLengths.len != paramFormats.len:
let lengthsErrMsg = $paramValues.len & " " & $paramLengths.len & " " & $paramFormats.len
return err("lengths discrepancies in sendQueryPrepared: " & $lengthsErrMsg)
SionoiS marked this conversation as resolved.
Show resolved Hide resolved

if db.status != CONNECTION_OK:
db.check().isOkOr:
return err("failed to connect to database: " & $error)

return err("unknown reason")

var cstrArrayParams = allocCStringArray(paramValues)
defer: deallocCStringArray(cstrArrayParams)

let nParams = cast[int32](paramValues.len)

const ResultFormat = 0 ## 0 for text format, 1 for binary format.

let success = db.pqsendQueryPrepared(stmtName,
nParams,
cstrArrayParams,
unsafeAddr paramLengths[0],
unsafeAddr paramFormats[0],
ResultFormat)
if success != 1:
db.check().isOkOr:
return err("failed pqsendQueryPrepared: " & $error)

return err("failed pqsendQueryPrepared: unknown reason")

return ok()

proc waitQueryToFinish(db: DbConn,
rowCallback: DataProc = nil):
Future[Result[void, string]] {.async.} =
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
## For other queries, like "INSERT", 'rowCallback' should be nil.

while true:

while db.pqisBusy() == 1:
## TODO: Enhance performance in concurrent queries.
## The connection keeps busy for quite a long time when performing intense concurrect queries.
## For example, a given query can last 11 milliseconds within from the database point of view
## but, on the other hand, the connection remains in "db.pqisBusy() == 1" for 100ms more.
## I think this is because `nwaku` is single-threaded and it has to handle many connections (20)
## simultaneously. Therefore, there is an underlying resource sharing (cpu) that makes this
## to happen. Notice that the _Postgres_ database spawns one process per each connection.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I appreciate this great, explanatory comment here!!!

let success = db.pqconsumeInput()

if success != 1:
let checkRes = db.check()
if checkRes.isErr():
return err("failed pqconsumeInput: " & checkRes.error)
db.check().isOkOr:
return err("failed pqconsumeInput: " & $error)

return err("failed pqconsumeInput: unknown reason")

if db.pqisBusy() == 1:
await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
continue
await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime

## Now retrieve the result
while true:
let pqResult = db.pqgetResult()

if pqResult == nil:
# Check if its a real error or just end of results
let checkRes = db.check()
if checkRes.isErr():
return err("error in rows: " & checkRes.error)
db.check().isOkOr:
return err("error in query: " & $error)

return ok() # reached the end of the results

Expand All @@ -122,3 +167,19 @@ proc dbConnQuery*(db: DbConn,
return err("error in dbConnQuery calling waitQueryToFinish: " & $error)

return ok()

proc dbConnQueryPrepared*(db: DbConn,
stmtName: string,
paramValues: seq[string],
paramLengths: seq[int32],
paramFormats: seq[int32],
rowCallback: DataProc):
Future[Result[void, string]] {.async, gcsafe.} =

db.sendQueryPrepared(stmtName, paramValues , paramLengths, paramFormats).isOkOr:
return err("error in dbConnQueryPrepared calling sendQuery: " & $error)

(await db.waitQueryToFinish(rowCallback)).isOkOr:
return err("error in dbConnQueryPrepared calling waitQueryToFinish: " & $error)

return ok()
106 changes: 53 additions & 53 deletions waku/common/databases/db_postgres/pgasyncpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ else:
{.push raises: [].}

import
std/[sequtils,nre, strformat],
std/[sequtils,nre,strformat,sets],
stew/results,
chronos
import
Expand All @@ -21,9 +21,9 @@ type PgAsyncPoolState {.pure.} = enum
type
PgDbConn = object
dbConn: DbConn
busy: bool
open: bool
insertStmt: SqlPrepared
busy: bool
preparedStmts: HashSet[string] ## [stmtName's]

type
# Database connection pool
Expand Down Expand Up @@ -90,9 +90,10 @@ proc close*(pool: PgAsyncPool):
if pool.conns[i].busy:
continue

pool.conns[i].dbConn.close()
pool.conns[i].busy = false
pool.conns[i].open = false
if pool.conns[i].open:
pool.conns[i].dbConn.close()
pool.conns[i].busy = false
pool.conns[i].open = false

for i in 0..<pool.conns.len:
if pool.conns[i].open:
Expand Down Expand Up @@ -135,13 +136,14 @@ proc getConnIndex(pool: PgAsyncPool):

elif pool.conns.len < pool.maxConnections:
## stablish a new connection
let connRes = dbconn.open(pool.connString)
if connRes.isOk():
let conn = connRes.get()
pool.conns.add(PgDbConn(dbConn: conn, busy: true, open: true))
return ok(pool.conns.len - 1)
else:
return err("failed to stablish a new connection: " & connRes.error)
let conn = dbconn.open(pool.connString).valueOr:
return err("failed to stablish a new connection: " & $error)

pool.conns.add(PgDbConn(dbConn: conn,
open: true,
busy: true,
preparedStmts: initHashSet[string]()))
return ok(pool.conns.len - 1)

proc resetConnPool*(pool: PgAsyncPool): Future[DatabaseResult[void]] {.async.} =
## Forces closing the connection pool.
Expand All @@ -168,14 +170,11 @@ proc pgQuery*(pool: PgAsyncPool,
args: seq[string] = newSeq[string](0),
rowCallback: DataProc = nil):
Future[DatabaseResult[void]] {.async.} =
## rowCallback != nil when it is expected to retrieve info from the database.
## rowCallback == nil for queries that change the database state.

let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
return err("connRes.isErr in query: " & connIndexRes.error)
let connIndex = (await pool.getConnIndex()).valueOr:
return err("connRes.isErr in query: " & $error)

let conn = pool.conns[connIndexRes.value].dbConn
let conn = pool.conns[connIndex].dbConn
defer: pool.releaseConn(conn)

(await conn.dbConnQuery(sql(query), args, rowCallback)).isOkOr:
Expand All @@ -184,44 +183,45 @@ proc pgQuery*(pool: PgAsyncPool,
return ok()

proc runStmt*(pool: PgAsyncPool,
baseStmt: string,
args: seq[string]):
stmtName: string,
stmtDefinition: string,
paramValues: seq[string],
paramLengths: seq[int32],
paramFormats: seq[int32],
rowCallback: DataProc = nil):

Future[DatabaseResult[void]] {.async.} =
# Runs a stored statement, for performance purposes.
# In the current implementation, this is aimed
# to run the 'insertRow' stored statement aimed to add a new Waku message.
## Runs a stored statement, for performance purposes.
## The stored statements are connection specific and is a technique of caching a very common
## queries within the same connection.
##
## rowCallback != nil when it is expected to retrieve info from the database.
## rowCallback == nil for queries that change the database state.

let connIndexRes = await pool.getConnIndex()
if connIndexRes.isErr():
return err(connIndexRes.error())
let connIndex = (await pool.getConnIndex()).valueOr:
return err("Error in runStmt: " & $error)

let conn = pool.conns[connIndexRes.value].dbConn
let conn = pool.conns[connIndex].dbConn
defer: pool.releaseConn(conn)

var preparedStmt = pool.conns[connIndexRes.value].insertStmt
if cast[string](preparedStmt) == "":
# The connection doesn't have insertStmt set yet. Let's create it.
# Each session/connection should have its own prepared statements.
const numParams = 7
try:
pool.conns[connIndexRes.value].insertStmt =
conn.prepare("insertRow", sql(baseStmt),
numParams)
except DbError:
return err("failed prepare in runStmt: " & getCurrentExceptionMsg())

preparedStmt = pool.conns[connIndexRes.value].insertStmt

try:
let res = conn.tryExec(preparedStmt, args)
if not res:
let connCheckRes = conn.check()
if connCheckRes.isErr():
return err("failed to insert into database: " & connCheckRes.error)

return err("failed to insert into database: unkown reason")

except DbError:
return err("failed to insert into database: " & getCurrentExceptionMsg())
if not pool.conns[connIndex].preparedStmts.contains(stmtName):
# The connection doesn't have that statement yet. Let's create it.
# Each session/connection has its own prepared statements.
let res = catch:
let len = paramValues.len
discard conn.prepare(stmtName, sql(stmtDefinition), len)

if res.isErr():
return err("failed prepare in runStmt: " & res.error.msg)

pool.conns[connIndex].preparedStmts.incl(stmtName)

(await conn.dbConnQueryPrepared(stmtName,
paramValues,
paramLengths,
paramFormats,
rowCallback)
).isOkOr:
return err("error in runStmt: " & $error)

return ok()
Loading
Loading