diff --git a/waku/common/databases/db_postgres/dbconn.nim b/waku/common/databases/db_postgres/dbconn.nim
index 16e9602f66..1c72ea7ca2 100644
--- a/waku/common/databases/db_postgres/dbconn.nim
+++ b/waku/common/databases/db_postgres/dbconn.nim
@@ -1,4 +1,4 @@
-import std/[times, strutils], results, chronos
+import std/[times, strutils, asyncnet, os, sequtils], results, chronos
 
 include db_connector/db_postgres
 
@@ -33,7 +33,17 @@ proc open*(connString: string): Result[DbConn, string] =
 
     return err("unknown reason")
 
-  ok(conn)
+  ## registering the socket fd in chronos for better wait for data
+  let asyncFd = cast[asyncengine.AsyncFD](pqsocket(conn))
+  asyncengine.register(asyncFd)
+
+  return ok(conn)
+
+proc closeDbConn*(db: DbConn) {.raises: [OSError].} =
+  let fd = db.pqsocket()
+  if fd != -1:
+    asyncengine.unregister(cast[asyncengine.AsyncFD](fd))
+  db.close()
 
 proc sendQuery(
     db: DbConn, query: SqlQuery, args: seq[string]
@@ -112,23 +122,17 @@ proc waitQueryToFinish(
   ## The 'rowCallback' param is != nil when the underlying query wants to retrieve results (SELECT.)
   ## For other queries, like "INSERT", 'rowCallback' should be nil.
 
-  while db.pqisBusy() == 1:
-    ## TODO: Enhance performance in concurrent queries.
-    ## The connection keeps busy for quite a long time when performing intense concurrect queries.
-    ## For example, a given query can last 11 milliseconds within from the database point of view
-    ## but, on the other hand, the connection remains in "db.pqisBusy() == 1" for 100ms more.
-    ## I think this is because `nwaku` is single-threaded and it has to handle many connections (20)
-    ## simultaneously. Therefore, there is an underlying resource sharing (cpu) that makes this
-    ## to happen. Notice that the _Postgres_ database spawns one process per each connection.
-    let success = db.pqconsumeInput()
-
-    if success != 1:
-      db.check().isOkOr:
-        return err("failed pqconsumeInput: " & $error)
+  var dataAvailable = false
+  proc onDataAvailable(udata: pointer) {.gcsafe, raises: [].} =
+    dataAvailable = true
+
+  let asyncFd = cast[asyncengine.AsyncFD](pqsocket(db))
 
-      return err("failed pqconsumeInput: unknown reason")
+  asyncengine.addReader2(asyncFd, onDataAvailable).isOkOr:
+    return err("failed to add event reader in waitQueryToFinish: " & $error)
 
-    await sleepAsync(timer.milliseconds(0)) # Do not block the async runtime
+  while not dataAvailable:
+    await sleepAsync(timer.milliseconds(1))
 
   ## Now retrieve the result
   while true:
diff --git a/waku/common/databases/db_postgres/pgasyncpool.nim b/waku/common/databases/db_postgres/pgasyncpool.nim
index fb8bb3fff5..66e66bd2ff 100644
--- a/waku/common/databases/db_postgres/pgasyncpool.nim
+++ b/waku/common/databases/db_postgres/pgasyncpool.nim
@@ -10,7 +10,7 @@ type PgAsyncPoolState {.pure.} = enum
   Live
   Closing
 
-type PgDbConn = object
+type PgDbConn = ref object
   dbConn: DbConn
   open: bool
   busy: bool
@@ -76,14 +76,11 @@ proc close*(pool: PgAsyncPool): Future[Result[void, string]] {.async.} =
    if pool.conns[i].busy:
      continue
 
-    if pool.conns[i].open:
-      pool.conns[i].dbConn.close()
-      pool.conns[i].busy = false
-      pool.conns[i].open = false
-
   for i in 0 ..< pool.conns.len:
     if pool.conns[i].open:
-      pool.conns[i].dbConn.close()
+      pool.conns[i].dbConn.closeDbConn()
+      pool.conns[i].busy = false
+      pool.conns[i].open = false
 
   pool.conns.setLen(0)
   pool.state = PgAsyncPoolState.Closed
diff --git a/waku/waku_store/protocol.nim b/waku/waku_store/protocol.nim
index 2f47cc6c8c..4e94d4c489 100644
--- a/waku/waku_store/protocol.nim
+++ b/waku/waku_store/protocol.nim
@@ -4,7 +4,7 @@
 {.push raises: [].}
 
 import
-  std/options,
+  std/[options, times],
   results,
   chronicles,
   chronos,
@@ -36,9 +36,11 @@ type WakuStore* = ref object of LPProtocol
 
 ## Protocol
 
+type StoreResp = tuple[resp: seq[byte], requestId: string]
+
 proc handleQueryRequest(
     self: WakuStore, requestor: PeerId, raw_request: seq[byte]
-): Future[seq[byte]] {.async.} =
+): Future[StoreResp] {.async.} =
   var res = StoreQueryResponse()
 
   let req = StoreQueryRequest.decode(raw_request).valueOr:
@@ -48,7 +50,7 @@ proc handleQueryRequest(
     res.statusCode = uint32(ErrorCode.BAD_REQUEST)
     res.statusDesc = "decoding rpc failed: " & $error
 
-    return res.encode().buffer
+    return (res.encode().buffer, "not_parsed_requestId")
 
   let requestId = req.requestId
 
@@ -65,7 +67,7 @@ proc handleQueryRequest(
     res.statusCode = uint32(error.kind)
     res.statusDesc = $error
 
-    return res.encode().buffer
+    return (res.encode().buffer, "not_parsed_requestId")
 
   res.requestId = requestId
   res.statusCode = 200
@@ -74,7 +76,7 @@ proc handleQueryRequest(
   info "sending store query response",
     peerId = requestor, requestId = requestId, messages = res.messages.len
 
-  return res.encode().buffer
+  return (res.encode().buffer, requestId)
 
 proc initProtocolHandler(self: WakuStore) =
   let rejectReposnseBuffer = StoreQueryResponse(
@@ -87,7 +89,8 @@ proc initProtocolHandler(self: WakuStore) =
   ).encode().buffer
 
   proc handler(conn: Connection, proto: string) {.async, gcsafe, closure.} =
-    var resBuf: seq[byte]
+    var successfulQuery = false ## only consider the correct queries in metrics
+    var resBuf: StoreResp
     self.requestRateLimiter.checkUsageLimit(WakuStoreCodec, conn):
       let readRes = catch:
         await conn.readLp(DefaultMaxRpcSize.int)
@@ -100,21 +103,34 @@ proc initProtocolHandler(self: WakuStore) =
         amount = reqBuf.len().int64, labelValues = [WakuStoreCodec, "in"]
       )
 
+      let queryStartTime = getTime().toUnixFloat()
+
       resBuf = await self.handleQueryRequest(conn.peerId, reqBuf)
+
+      let queryDuration = getTime().toUnixFloat() - queryStartTime
+      waku_store_time_seconds.inc(amount = queryDuration, labelValues = ["query-db"])
+      successfulQuery = true
     do:
      debug "store query request rejected due rate limit exceeded",
        peerId = conn.peerId, limit = $self.requestRateLimiter.setting
-      resBuf = rejectReposnseBuffer
+      resBuf = (rejectReposnseBuffer, "rejected")
+
+    let writeRespStartTime = getTime().toUnixFloat()
 
     let writeRes = catch:
-      await conn.writeLp(resBuf)
+      await conn.writeLp(resBuf.resp)
 
     if writeRes.isErr():
       error "Connection write error", error = writeRes.error.msg
       return
 
+    debug "after sending response", requestId = resBuf.requestId
+    if successfulQuery:
+      let writeDuration = getTime().toUnixFloat() - writeRespStartTime
+      waku_store_time_seconds.inc(amount = writeDuration, labelValues = ["send-resp"])
+
     waku_service_network_bytes.inc(
-      amount = resBuf.len().int64, labelValues = [WakuStoreCodec, "out"]
+      amount = resBuf.resp.len().int64, labelValues = [WakuStoreCodec, "out"]
     )
 
   self.handler = handler
diff --git a/waku/waku_store/protocol_metrics.nim b/waku/waku_store/protocol_metrics.nim
index d413c0a678..851670cdba 100644
--- a/waku/waku_store/protocol_metrics.nim
+++ b/waku/waku_store/protocol_metrics.nim
@@ -5,6 +5,11 @@ import metrics
 declarePublicGauge waku_store_errors, "number of store protocol errors", ["type"]
 declarePublicGauge waku_store_queries, "number of store queries received"
 
+## f.e., we have the "query" phase, where the node performs the query to the database,
+## and the "libp2p" phase, where the node writes the store response to the libp2p stream.
+declarePublicGauge waku_store_time_seconds,
+  "Time in seconds spent by each store phase", labels = ["phase"]
+
 # Error types (metric label values)
 const
   dialFailure* = "dial_failure"