Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: enhance libpq management #3015

Merged
merged 4 commits into from
Sep 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 21 additions & 17 deletions waku/common/databases/db_postgres/dbconn.nim
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import std/[times, strutils], results, chronos
import std/[times, strutils, asyncnet, os, sequtils], results, chronos

include db_connector/db_postgres

Expand Down Expand Up @@ -33,7 +33,17 @@ proc open*(connString: string): Result[DbConn, string] =

return err("unknown reason")

ok(conn)
## register the socket fd in chronos so we can wait for incoming data more efficiently
let asyncFd = cast[asyncengine.AsyncFD](pqsocket(conn))
asyncengine.register(asyncFd)

return ok(conn)

proc closeDbConn*(db: DbConn) {.raises: [OSError].} =
  ## Close a libpq database connection. The connection's socket fd was
  ## registered with the chronos async engine when the connection was opened,
  ## so it must be unregistered here before the underlying handle is closed.
  let fd = db.pqsocket()
  if fd != -1:
    ## pqsocket returns -1 when the connection holds no valid socket
    ## (e.g. already closed); in that case there is nothing to unregister.
    asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
  db.close()

proc sendQuery(
db: DbConn, query: SqlQuery, args: seq[string]
Expand Down Expand Up @@ -112,23 +122,17 @@ proc waitQueryToFinish(
## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
## For other queries, like "INSERT", 'rowCallback' should be nil.

while db.pqisBusy() == 1:
## TODO: Enhance performance in concurrent queries.
## The connection stays busy for quite a long time when performing intense concurrent queries.
## For example, a given query can last 11 milliseconds within from the database point of view
## but, on the other hand, the connection remains in "db.pqisBusy() == 1" for 100ms more.
## I think this is because `nwaku` is single-threaded and it has to handle many connections (20)
## simultaneously. Therefore, there is underlying resource sharing (cpu) that makes this
## happen. Notice that the _Postgres_ database spawns one process per connection.
let success = db.pqconsumeInput()

if success != 1:
db.check().isOkOr:
return err("failed pqconsumeInput: " & $error)
var dataAvailable = false
proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
dataAvailable = true

let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db))

return err("failed pqconsumeInput: unknown reason")
asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr:
return err("failed to add event reader in waitQueryToFinish: " & $error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIU chronos will trigger onDataAvailable directly on a socket read instead of doing db.check() which is slower right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AFAIU chronos will trigger onDataAvailable directly on a socket read instead of doing db.check() which is slower right?

Yes, correct :) In fact, the functions pqGetResult and pqConsumeInput (libpq-async) don't behave well under high-load conditions.


await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
while not dataAvailable:
await sleepAsync(timer.milliseconds(1))

## Now retrieve the result
while true:
Expand Down
11 changes: 4 additions & 7 deletions waku/common/databases/db_postgres/pgasyncpool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type PgAsyncPoolState {.pure.} = enum
Live
Closing

type PgDbConn = object
type PgDbConn = ref object
dbConn: DbConn
open: bool
busy: bool
Expand Down Expand Up @@ -76,14 +76,11 @@ proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} =
if pool.conns[i].busy:
continue

if pool.conns[i].open:
pool.conns[i].dbConn.close()
pool.conns[i].busy = false
pool.conns[i].open = false

for i in 0 ..< pool.conns.len:
if pool.conns[i].open:
pool.conns[i].dbConn.close()
pool.conns[i].dbConn.closeDbConn()
pool.conns[i].busy = false
pool.conns[i].open = false

pool.conns.setLen(0)
pool.state = PgAsyncPoolState.Closed
Expand Down
34 changes: 25 additions & 9 deletions waku/waku_store/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{.push raises: [].}

import
std/options,
std/[options, times],
results,
chronicles,
chronos,
Expand Down Expand Up @@ -36,9 +36,11 @@ type WakuStore* = ref object of LPProtocol

## Protocol

type StoreResp = tuple[resp: seq[byte], requestId: string]

proc handleQueryRequest(
self: WakuStore, requestor: PeerId, raw_request: seq[byte]
): Future[seq[byte]] {.async.} =
): Future[StoreResp] {.async.} =
var res = StoreQueryResponse()

let req = StoreQueryRequest.decode(raw_request).valueOr:
Expand All @@ -48,7 +50,7 @@ proc handleQueryRequest(
res.statusCode = uint32(ErrorCode.BAD_REQUEST)
res.statusDesc = "decoding rpc failed: " & $error

return res.encode().buffer
return (res.encode().buffer, "not_parsed_requestId")

let requestId = req.requestId

Expand All @@ -65,7 +67,7 @@ proc handleQueryRequest(
res.statusCode = uint32(error.kind)
res.statusDesc = $error

return res.encode().buffer
return (res.encode().buffer, "not_parsed_requestId")

res.requestId = requestId
res.statusCode = 200
Expand All @@ -74,7 +76,7 @@ proc handleQueryRequest(
info "sending store query response",
peerId = requestor, requestId = requestId, messages = res.messages.len

return res.encode().buffer
return (res.encode().buffer, requestId)

proc initProtocolHandler(self: WakuStore) =
let rejectReposnseBuffer = StoreQueryResponse(
Expand All @@ -87,7 +89,8 @@ proc initProtocolHandler(self: WakuStore) =
).encode().buffer

proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
var resBuf: seq[byte]
var successfulQuery = false ## only consider the correct queries in metrics
var resBuf: StoreResp
self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
let readRes = catch:
await conn.readLp(DefaultMaxRpcSize.int)
Expand All @@ -100,21 +103,34 @@ proc initProtocolHandler(self: WakuStore) =
amount = reqBuf.len().int64, labelValues = [WakuStoreCodec, "in"]
)

let queryStartTime = getTime().toUnixFloat()

resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)

let queryDuration = getTime().toUnixFloat() - queryStartTime
waku_store_time_seconds.inc(amount = queryDuration, labelValues = ["query-db"])
successfulQuery = true
do:
debug "store query request rejected due rate limit exceeded",
peerId = conn.peerId, limit = $self.requestRateLimiter.setting
resBuf = rejectReposnseBuffer
resBuf = (rejectReposnseBuffer, "rejected")

let writeRespStartTime = getTime().toUnixFloat()

let writeRes = catch:
await conn.writeLp(resBuf)
await conn.writeLp(resBuf.resp)

if writeRes.isErr():
error "Connection write error", error = writeRes.error.msg
return

debug "after sending response", requestId = resBuf.requestId
if successfulQuery:
let writeDuration = getTime().toUnixFloat() - writeRespStartTime
waku_store_time_seconds.inc(amount = writeDuration, labelValues = ["send-resp"])

waku_service_network_bytes.inc(
amount = resBuf.len().int64, labelValues = [WakuStoreCodec, "out"]
amount = resBuf.resp.len().int64, labelValues = [WakuStoreCodec, "out"]
)

self.handler = handler
Expand Down
5 changes: 5 additions & 0 deletions waku/waku_store/protocol_metrics.nim
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import metrics
declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
declarePublicGauge waku_store_queries, "number of store queries received"

## f.e., we have the "query" phase, where the node performs the query to the database,
## and the "libp2p" phase, where the node writes the store response to the libp2p stream.
declarePublicGauge waku_store_time_seconds,
"Time in seconds spent by each store phase", labels = ["phase"]

# Error types (metric label values)
const
dialFailure* = "dial_failure"
Expand Down
Loading