Skip to content

Commit

Permalink
fix(filter): waku filter rpc codec support optional fields
Browse files Browse the repository at this point in the history
  • Loading branch information
Lorenzo Delgado authored Nov 18, 2022
1 parent 1d72ee3 commit af4fb5f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 40 deletions.
2 changes: 2 additions & 0 deletions waku/common/protobuf.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ proc write3*(proto: var ProtoBuffer, field: int, value: auto) =
when value is Option:
if value.isSome():
proto.write(field, value.get())
elif value is bool:
proto.write(field, zint(value))
else:
proto.write(field, value)

Expand Down
8 changes: 4 additions & 4 deletions waku/v2/protocol/waku_filter/client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ proc initProtocolHandler(wf: WakuFilterClient) =
let rpc = decodeReqRes.get()
trace "filter message received"

if rpc.push == MessagePush():
if rpc.push.isNone():
waku_filter_errors.inc(labelValues = [emptyMessagePushFailure])
# TODO: Manage the empty push message error. Perform any action?
return
Expand All @@ -108,7 +108,7 @@ proc initProtocolHandler(wf: WakuFilterClient) =
let
peerId = conn.peerId
requestId = rpc.requestId
push = rpc.push
push = rpc.push.get()

info "received filter message push", peerId=conn.peerId, requestId=requestId
wf.handleMessagePush(peerId, requestId, push)
Expand Down Expand Up @@ -149,11 +149,11 @@ proc sendFilterRequestRpc(wf: WakuFilterClient,

let rpc = FilterRpc(
requestId: requestId,
request: FilterRequest(
request: some(FilterRequest(
subscribe: subscribe,
pubSubTopic: pubsubTopic,
contentFilters: contentFilters
)
))
)

let sendRes = await wf.sendFilterRpc(rpc, peer)
Expand Down
10 changes: 5 additions & 5 deletions waku/v2/protocol/waku_filter/protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ type
proc handleFilterRequest(wf: WakuFilter, peerId: PeerId, rpc: FilterRPC) =
let
requestId = rpc.requestId
subscribe = rpc.request.subscribe
pubsubTopic = rpc.request.pubsubTopic
contentTopics = rpc.request.contentFilters.mapIt(it.contentTopic)
subscribe = rpc.request.get().subscribe
pubsubTopic = rpc.request.get().pubsubTopic
contentTopics = rpc.request.get().contentFilters.mapIt(it.contentTopic)

if subscribe:
info "added filter subscritpiton", peerId=peerId, pubsubTopic=pubsubTopic, contentTopics=contentTopics
Expand All @@ -101,7 +101,7 @@ proc initProtocolHandler(wf: WakuFilter) =

## Filter request
# Subscription/unsubscription request
if rpc.request == FilterRequest():
if rpc.request.isNone():
waku_filter_errors.inc(labelValues = [emptyFilterRequestFailure])
# TODO: Manage the empty filter request message error. Perform any action?
return
Expand Down Expand Up @@ -185,7 +185,7 @@ proc handleMessage*(wf: WakuFilter, pubsubTopic: PubsubTopic, msg: WakuMessage)

let rpc = FilterRPC(
requestId: sub.requestId,
push: MessagePush(messages: @[msg])
push: some(MessagePush(messages: @[msg]))
)

let res = await wf.sendFilterRpc(rpc, sub.peer)
Expand Down
19 changes: 14 additions & 5 deletions waku/v2/protocol/waku_filter/rpc.nim
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
import ../waku_message
when (NimMajor, NimMinor) < (1, 4):
{.push raises: [Defect].}
else:
{.push raises: [].}

import
std/options
import
../waku_message


type
ContentFilter* = object
contentTopic*: ContentTopic
contentTopic*: string

FilterRequest* = object
contentFilters*: seq[ContentFilter]
pubSubTopic*: string
pubsubTopic*: string
subscribe*: bool

MessagePush* = object
messages*: seq[WakuMessage]

FilterRPC* = object
requestId*: string
request*: FilterRequest
push*: MessagePush
request*: Option[FilterRequest]
push*: Option[MessagePush]
75 changes: 49 additions & 26 deletions waku/v2/protocol/waku_filter/rpc_codec.nim
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ when (NimMajor, NimMinor) < (1, 4):
else:
{.push raises: [].}

import
std/options
import
../../../common/protobuf,
../waku_message,
Expand All @@ -24,17 +26,21 @@ proc encode*(filter: ContentFilter): ProtoBuffer =

proc decode*(T: type ContentFilter, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = ContentFilter()

var contentTopic: ContentTopic
discard ?pb.getField(1, contentTopic)
var contentTopic: string
if not ?pb.getField(1, contentTopic):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.contentTopic = contentTopic

ok(ContentFilter(contentTopic: contentTopic))
ok(rpc)


proc encode*(rpc: FilterRequest): ProtoBuffer =
var pb = initProtoBuffer()

pb.write3(1, uint64(rpc.subscribe))
pb.write3(1, rpc.subscribe)
pb.write3(2, rpc.pubSubTopic)

for filter in rpc.contentFilters:
Expand All @@ -46,20 +52,27 @@ proc encode*(rpc: FilterRequest): ProtoBuffer =

proc decode*(T: type FilterRequest, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var rpc = FilterRequest(contentFilters: @[], pubSubTopic: "")
var rpc = FilterRequest()

var subflag: uint64
if ?pb.getField(1, subflag):
if not ?pb.getField(1, subflag):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.subscribe = bool(subflag)

var pubSubTopic: PubsubTopic
discard ?pb.getField(2, pubSubTopic)
rpc.pubSubTopic = pubSubTopic
var pubsubTopic: string
if not ?pb.getField(2, pubsubTopic):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.pubsubTopic = pubsubTopic

var buffs: seq[seq[byte]]
discard ?pb.getRepeatedField(3, buffs)
for buf in buffs:
rpc.contentFilters.add(?ContentFilter.decode(buf))
if not ?pb.getRepeatedField(3, buffs):
return err(ProtoError.RequiredFieldMissing)
else:
for buf in buffs:
let filter = ?ContentFilter.decode(buf)
rpc.contentFilters.add(filter)

ok(rpc)

Expand All @@ -76,23 +89,25 @@ proc encode*(push: MessagePush): ProtoBuffer =

proc decode*(T: type MessagePush, buffer: seq[byte]): ProtoResult[T] =
let pb = initProtoBuffer(buffer)
var push = MessagePush()
var rpc = MessagePush()

var messages: seq[seq[byte]]
discard ?pb.getRepeatedField(1, messages)
if not ?pb.getRepeatedField(1, messages):
return err(ProtoError.RequiredFieldMissing)
else:
for buf in messages:
let msg = ?WakuMessage.decode(buf)
rpc.messages.add(msg)

for buf in messages:
push.messages.add(?WakuMessage.decode(buf))

ok(push)
ok(rpc)


proc encode*(rpc: FilterRPC): ProtoBuffer =
var pb = initProtoBuffer()

pb.write3(1, rpc.requestId)
pb.write3(2, rpc.request.encode())
pb.write3(3, rpc.push.encode())
pb.write3(2, rpc.request.map(encode))
pb.write3(3, rpc.push.map(encode))
pb.finish3()

pb
Expand All @@ -102,15 +117,23 @@ proc decode*(T: type FilterRPC, buffer: seq[byte]): ProtoResult[T] =
var rpc = FilterRPC()

var requestId: string
discard ?pb.getField(1, requestId)
rpc.requestId = requestId
if not ?pb.getField(1, requestId):
return err(ProtoError.RequiredFieldMissing)
else:
rpc.requestId = requestId

var requestBuffer: seq[byte]
discard ?pb.getField(2, requestBuffer)
rpc.request = ?FilterRequest.decode(requestBuffer)
if not ?pb.getField(2, requestBuffer):
rpc.request = none(FilterRequest)
else:
let request = ?FilterRequest.decode(requestBuffer)
rpc.request = some(request)

var pushBuffer: seq[byte]
discard ?pb.getField(3, pushBuffer)
rpc.push = ?MessagePush.decode(pushBuffer)
if not ?pb.getField(3, pushBuffer):
rpc.push = none(MessagePush)
else:
let push = ?MessagePush.decode(pushBuffer)
rpc.push = some(push)

ok(rpc)

0 comments on commit af4fb5f

Please sign in to comment.