Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(store): waku store rpc codec support optional fields #1393

Merged
merged 1 commit into from
Nov 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions tests/v2/test_waku_store_rpc_codec.nim
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{.used.}

import
std/[options, times],
std/options,
testutils/unittests,
chronos
import
Expand Down Expand Up @@ -50,7 +50,7 @@ procSuite "Waku Store - RPC codec":
## Given
let
index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.FORWARD)
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.FORWARD))

## When
let pb = pagingInfo.encode()
Expand All @@ -61,7 +61,7 @@ procSuite "Waku Store - RPC codec":
decodedPagingInfo.isOk()

check:
# the fields of decodedPagingInfo must be the same as the original pagingInfo
# The fields of decodedPagingInfo must be the same as the original pagingInfo
decodedPagingInfo.value == pagingInfo
decodedPagingInfo.value.direction == pagingInfo.direction

Expand All @@ -85,8 +85,13 @@ procSuite "Waku Store - RPC codec":
## Given
let
index = PagingIndexRPC.compute(fakeWakuMessage(), receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD)
query = HistoryQueryRPC(contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)], pagingInfo: pagingInfo, startTime: Timestamp(10), endTime: Timestamp(11))
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD))
query = HistoryQueryRPC(
contentFilters: @[HistoryContentFilterRPC(contentTopic: DefaultContentTopic), HistoryContentFilterRPC(contentTopic: DefaultContentTopic)],
pagingInfo: some(pagingInfo),
startTime: some(Timestamp(10)),
endTime: some(Timestamp(11))
)

## When
let pb = query.encode()
Expand Down Expand Up @@ -121,8 +126,8 @@ procSuite "Waku Store - RPC codec":
let
message = fakeWakuMessage()
index = PagingIndexRPC.compute(message, receivedTime=ts(), pubsubTopic=DefaultPubsubTopic)
pagingInfo = PagingInfoRPC(pageSize: 1, cursor: index, direction: PagingDirectionRPC.BACKWARD)
res = HistoryResponseRPC(messages: @[message], pagingInfo:pagingInfo, error: HistoryResponseErrorRPC.INVALID_CURSOR)
pagingInfo = PagingInfoRPC(pageSize: some(1'u64), cursor: some(index), direction: some(PagingDirectionRPC.BACKWARD))
res = HistoryResponseRPC(messages: @[message], pagingInfo: some(pagingInfo), error: HistoryResponseErrorRPC.INVALID_CURSOR)

## When
let pb = res.encode()
Expand Down Expand Up @@ -150,4 +155,4 @@ procSuite "Waku Store - RPC codec":

check:
# check the correctness of init and encode for an empty HistoryResponseRPC
decodedEmptyRes.value == emptyRes
decodedEmptyRes.value == emptyRes
6 changes: 5 additions & 1 deletion waku/common/protobuf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ else:
{.push raises: [].}

import
std/options,
libp2p/protobuf/minprotobuf,
libp2p/varint

Expand All @@ -15,7 +16,10 @@ export


proc write3*(proto: var ProtoBuffer, field: int, value: auto) =
if default(type(value)) != value:
when value is Option:
if value.isSome():
proto.write(field, value.get())
else:
proto.write(field, value)

proc finish3*(proto: var ProtoBuffer) =
Expand Down
18 changes: 12 additions & 6 deletions waku/v2/node/jsonrpc/jsonrpc_utils.nim
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,20 @@ proc `%`*(value: WakuMessage): JsonNode =
## we need to convert between these and the types for the Nim API

proc toPagingInfo*(pagingOptions: StorePagingOptions): PagingInfoRPC =
PagingInfoRPC(pageSize: pagingOptions.pageSize,
cursor: if pagingOptions.cursor.isSome: pagingOptions.cursor.get else: PagingIndexRPC(),
direction: if pagingOptions.forward: PagingDirectionRPC.FORWARD else: PagingDirectionRPC.BACKWARD)
PagingInfoRPC(
pageSize: some(pagingOptions.pageSize),
cursor: pagingOptions.cursor,
direction: if pagingOptions.forward: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
)

proc toPagingOptions*(pagingInfo: PagingInfoRPC): StorePagingOptions =
StorePagingOptions(pageSize: pagingInfo.pageSize,
cursor: some(pagingInfo.cursor),
forward: if pagingInfo.direction == PagingDirectionRPC.FORWARD: true else: false)
StorePagingOptions(
pageSize: pagingInfo.pageSize.get(0'u64),
cursor: pagingInfo.cursor,
forward: if pagingInfo.direction.isNone(): true
else: pagingInfo.direction.get() == PagingDirectionRPC.FORWARD
)

proc toJsonRPCStoreResponse*(response: HistoryResponse): StoreResponse =
StoreResponse(
Expand Down
10 changes: 5 additions & 5 deletions waku/v2/protocol/waku_store/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer
let connection = connOpt.get()


let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: req.toRPC())
let reqRpc = HistoryRPC(requestId: generateRequestId(w.rng), query: some(req.toRPC()))
await connection.writeLP(reqRpc.encode().buffer)


Expand All @@ -69,11 +69,11 @@ proc sendHistoryQueryRPC(w: WakuStoreClient, req: HistoryQuery, peer: RemotePeer
# Disabled ,for now, since the default response is a possible case (no messages, pagesize = 0, error = NONE(0))
# TODO: Rework the RPC protocol to differentiate the default value from an empty value (e.g., status = 200 (OK))
# and rework the protobuf parsing to return Option[T] when empty values are received
# if respRpc.response == default(HistoryResponseRPC):
# waku_store_errors.inc(labelValues = [emptyRpcResponseFailure])
# return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure))
if respRpc.response.isNone():
waku_store_errors.inc(labelValues = [emptyRpcResponseFailure])
return err(HistoryError(kind: HistoryErrorKind.BAD_RESPONSE, cause: emptyRpcResponseFailure))

let resp = respRpc.response
let resp = respRpc.response.get()

return resp.toAPI()

Expand Down
11 changes: 6 additions & 5 deletions waku/v2/protocol/waku_store/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ proc initProtocolHandler*(ws: WakuStore) =

let reqRpc = decodeRes.value

if reqRpc.query == default(HistoryQueryRPC):
if reqRpc.query.isNone():
error "empty query rpc", peerId=conn.peerId, requestId=reqRpc.requestId
waku_store_errors.inc(labelValues = [emptyRpcQueryFailure])
# TODO: Return (BAD_REQUEST, cause: "empty query")
Expand All @@ -239,19 +239,20 @@ proc initProtocolHandler*(ws: WakuStore) =
error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error= $respErr

let resp = HistoryResponseRPC(error: respErr.toRPC())
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp)
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
await conn.writeLp(rpc.encode().buffer)
return


let query = reqRpc.query.toApi()
let query = reqRpc.query.get().toAPI()

let respRes = ws.findMessages(query)

if respRes.isErr():
error "history query failed", peerId=conn.peerId, requestId=reqRpc.requestId, error=respRes.error

let resp = respRes.toRPC()
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp)
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
await conn.writeLp(rpc.encode().buffer)
return

Expand All @@ -270,7 +271,7 @@ proc initProtocolHandler*(ws: WakuStore) =

info "sending history response", peerId=conn.peerId, requestId=reqRpc.requestId, messages=resp.messages.len

let rpc = HistoryRPC(requestId: reqRpc.requestId, response: resp)
let rpc = HistoryRPC(requestId: reqRpc.requestId, response: some(resp))
await conn.writeLp(rpc.encode().buffer)

ws.handler = handler
Expand Down
120 changes: 60 additions & 60 deletions waku/v2/protocol/waku_store/rpc.nim
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ type

PagingInfoRPC* = object
## This type holds the information needed for the pagination
pageSize*: uint64
cursor*: PagingIndexRPC
direction*: PagingDirectionRPC
pageSize*: Option[uint64]
cursor*: Option[PagingIndexRPC]
direction*: Option[PagingDirectionRPC]


type
Expand All @@ -60,10 +60,10 @@ type

HistoryQueryRPC* = object
contentFilters*: seq[HistoryContentFilterRPC]
pubsubTopic*: PubsubTopic
pagingInfo*: PagingInfoRPC # used for pagination
startTime*: Timestamp # used for time-window query
endTime*: Timestamp # used for time-window query
pubsubTopic*: Option[PubsubTopic]
pagingInfo*: Option[PagingInfoRPC]
startTime*: Option[int64]
endTime*: Option[int64]

HistoryResponseErrorRPC* {.pure.} = enum
## HistoryResponseErrorRPC contains error message to inform the querying node about
Expand All @@ -74,13 +74,13 @@ type

HistoryResponseRPC* = object
messages*: seq[WakuMessage]
pagingInfo*: PagingInfoRPC # used for pagination
pagingInfo*: Option[PagingInfoRPC]
error*: HistoryResponseErrorRPC

HistoryRPC* = object
requestId*: string
query*: HistoryQueryRPC
response*: HistoryResponseRPC
query*: Option[HistoryQueryRPC]
response*: Option[HistoryResponseRPC]


proc parse*(T: type HistoryResponseErrorRPC, kind: uint32): T =
Expand Down Expand Up @@ -112,55 +112,53 @@ proc toAPI*(rpc: PagingIndexRPC): HistoryCursor =


proc toRPC*(query: HistoryQuery): HistoryQueryRPC =
let
contentFilters = query.contentTopics.mapIt(HistoryContentFilterRPC(contentTopic: it))

pubsubTopic = query.pubsubTopic.get(default(string))

pageSize = query.pageSize

cursor = query.cursor.get(default(HistoryCursor)).toRPC()
var rpc = HistoryQueryRPC()

rpc.contentFilters = query.contentTopics.mapIt(HistoryContentFilterRPC(contentTopic: it))

rpc.pubsubTopic = query.pubsubTopic

rpc.pagingInfo = block:
if query.cursor.isNone() and
query.pageSize == default(type query.pageSize) and
query.ascending == default(type query.ascending):
none(PagingInfoRPC)
else:
let
pageSize = some(query.pageSize)
cursor = query.cursor.map(toRPC)
direction = if query.ascending: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
some(PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
direction: direction
))

rpc.startTime = query.startTime
rpc.endTime = query.endTime

rpc

direction = if query.ascending: PagingDirectionRPC.FORWARD
else: PagingDirectionRPC.BACKWARD

startTime = query.startTime.get(default(Timestamp))

endTime = query.endTime.get(default(Timestamp))

HistoryQueryRPC(
contentFilters: contentFilters,
pubsubTopic: pubsubTopic,
pagingInfo: PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
direction: direction
),
startTime: startTime,
endTime: endTime
)

proc toAPI*(rpc: HistoryQueryRPC): HistoryQuery =
let
pubsubTopic = if rpc.pubsubTopic == default(string): none(PubsubTopic)
else: some(rpc.pubsubTopic)
pubsubTopic = rpc.pubsubTopic

contentTopics = rpc.contentFilters.mapIt(it.contentTopic)

cursor = if rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.cursor == default(PagingIndexRPC): none(HistoryCursor)
else: some(rpc.pagingInfo.cursor.toAPI())
cursor = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().cursor.isNone(): none(HistoryCursor)
else: rpc.pagingInfo.get().cursor.map(toAPI)

startTime = if rpc.startTime == default(Timestamp): none(Timestamp)
else: some(rpc.startTime)
startTime = rpc.startTime

endTime = if rpc.endTime == default(Timestamp): none(Timestamp)
else: some(rpc.endTime)
endTime = rpc.endTime

pageSize = if rpc.pagingInfo == default(PagingInfoRPC): 0.uint64
else: rpc.pagingInfo.pageSize
pageSize = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().pageSize.isNone(): 0'u64
rymnc marked this conversation as resolved.
Show resolved Hide resolved
else: rpc.pagingInfo.get().pageSize.get()

ascending = if rpc.pagingInfo == default(PagingInfoRPC): true
else: rpc.pagingInfo.direction == PagingDirectionRPC.FORWARD
ascending = if rpc.pagingInfo.isNone() or rpc.pagingInfo.get().direction.isNone(): true
else: rpc.pagingInfo.get().direction.get() == PagingDirectionRPC.FORWARD

HistoryQuery(
pubsubTopic: pubsubTopic,
Expand All @@ -182,7 +180,7 @@ proc toRPC*(err: HistoryError): HistoryResponseErrorRPC =
of HistoryErrorKind.SERVICE_UNAVAILABLE:
HistoryResponseErrorRPC.SERVICE_UNAVAILABLE
else:
HistoryResponseErrorRPC.INVALID_CURSOR
HistoryResponseErrorRPC.INVALID_CURSOR

proc toAPI*(err: HistoryResponseErrorRPC): HistoryError =
# TODO: Better error mappings/move to error codes
Expand All @@ -208,18 +206,18 @@ proc toRPC*(res: HistoryResult): HistoryResponseRPC =

pagingInfo = block:
if resp.cursor.isNone():
default(PagingInfoRPC)
none(PagingInfoRPC)
else:
let
pageSize = resp.pageSize
cursor = resp.cursor.get(default(HistoryCursor)).toRPC()
direction = if resp.ascending: PagingDirectionRPC.FORWARD
else: PagingDirectionRPC.BACKWARD
PagingInfoRPC(
pageSize = some(resp.pageSize)
cursor = resp.cursor.map(toRPC)
direction = if resp.ascending: some(PagingDirectionRPC.FORWARD)
else: some(PagingDirectionRPC.BACKWARD)
some(PagingInfoRPC(
pageSize: pageSize,
cursor: cursor,
direction: direction
)
))

error = HistoryResponseErrorRPC.NONE

Expand All @@ -236,12 +234,14 @@ proc toAPI*(rpc: HistoryResponseRPC): HistoryResult =
let
messages = rpc.messages

pageSize = rpc.pagingInfo.pageSize
pageSize = if rpc.pagingInfo.isNone(): 0'u64
else: rpc.pagingInfo.get().pageSize.get(0'u64)

ascending = rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.direction == PagingDirectionRPC.FORWARD
ascending = if rpc.pagingInfo.isNone(): true
else: rpc.pagingInfo.get().direction.get(PagingDirectionRPC.FORWARD) == PagingDirectionRPC.FORWARD

cursor = if rpc.pagingInfo == default(PagingInfoRPC) or rpc.pagingInfo.cursor == default(PagingIndexRPC): none(HistoryCursor)
else: some(rpc.pagingInfo.cursor.toAPI())
cursor = if rpc.pagingInfo.isNone(): none(HistoryCursor)
else: rpc.pagingInfo.get().cursor.map(toAPI)

ok(HistoryResponse(
messages: messages,
Expand Down
Loading