diff --git a/eth.nimble b/eth.nimble
index 296b3cd7..6215f14c 100644
--- a/eth.nimble
+++ b/eth.nimble
@@ -52,6 +52,7 @@ proc runP2pTests() =
     "test_shh",
     "test_shh_config",
     "test_shh_connect",
+    "test_waku_bridge",
     "test_protocol_handlers",
   ]:
    runTest("tests/p2p/" & filename)
diff --git a/eth/p2p/rlpx_protocols/waku_protocol.nim b/eth/p2p/rlpx_protocols/waku_protocol.nim
new file mode 100644
index 00000000..99c88c56
--- /dev/null
+++ b/eth/p2p/rlpx_protocols/waku_protocol.nim
@@ -0,0 +1,468 @@
+#
+# Waku
+# (c) Copyright 2018-2019
+# Status Research & Development GmbH
+#
+# Licensed under either of
+#  Apache License, version 2.0, (LICENSE-APACHEv2)
+#  MIT license (LICENSE-MIT)
+#
+
+## Waku
+## *******
+##
+## Waku is a fork of Whisper.
+##
+## Waku is a gossip protocol that synchronizes a set of messages across nodes
+## with attention given to sender and recipient anonymity. Messages are
+## categorized by a topic and stay alive in the network based on a time-to-live
+## measured in seconds. Spam prevention is based on proof-of-work, where large
+## or long-lived messages must spend more work.
+##
+## Example usage
+## ----------
+## First an `EthereumNode` needs to be created, either with all capabilities
+## set or with only the Waku capability set.
+## The latter can be done like this:
+##
+## .. code-block::nim
+##   var node = newEthereumNode(keypair, address, netId, nil,
+##     addAllCapabilities = false)
+##   node.addCapability Waku
+##
+## Now calls such as ``postMessage`` and ``subscribeFilter`` can be done.
+## However, they only make real sense after ``connectToNetwork`` has been
+## started, as otherwise there will be no peers to send messages to or to
+## receive messages from.
+
+import
+  options, tables, times, chronos, chronicles,
+  eth/[keys, async_utils, p2p], whisper/whisper_types
+
+import eth/p2p/rlpx_protocols/whisper_protocol
+
+export
+  whisper_types
+
+logScope:
+  topics = "waku"
+
+const
+  defaultQueueCapacity = 256
+  wakuVersion* = 0 ## Waku version.
+  wakuVersionStr* = $wakuVersion ## Waku version as string.
+  defaultMinPow* = 0.2'f64 ## The default minimum PoW requirement for this node.
+  defaultMaxMsgSize* = 1024'u32 * 1024'u32 ## The current default and max
+  ## message size. This can never be larger than the maximum RLPx message size.
+  messageInterval* = chronos.milliseconds(300) ## Interval at which messages are
+  ## sent to peers, in ms.
+  pruneInterval* = chronos.milliseconds(1000)  ## Interval at which message
+  ## queue is pruned, in ms.
+
+type
+  WakuConfig* = object
+    powRequirement*: float64
+    bloom*: Bloom
+    isLightNode*: bool
+    maxMsgSize*: uint32
+
+  WakuPeer = ref object
+    initialized: bool # set when the handshake has completed successfully
+    powRequirement*: float64
+    bloom*: Bloom
+    isLightNode*: bool
+    trusted*: bool
+    received: HashSet[Message]
+
+  WakuNetwork = ref object
+    queue*: ref Queue
+    filters*: Filters
+    config*: WakuConfig
+
+
+proc allowed*(msg: Message, config: WakuConfig): bool =
+  # Check max msg size, already happens in RLPx but there is a specific waku
+  # max msg size which should always be < RLPx max msg size
+  if msg.size > config.maxMsgSize:
+    warn "Message size too large", size = msg.size
+    return false
+
+  if msg.pow < config.powRequirement:
+    warn "Message PoW too low", pow = msg.pow, minPow = config.powRequirement
+    return false
+
+  if not bloomFilterMatch(config.bloom, msg.bloom):
+    warn "Message does not match node bloom filter"
+    return false
+
+  return true
+
+proc run(peer: Peer) {.gcsafe, async.}
+proc run(node: EthereumNode, network: WakuNetwork) {.gcsafe, async.}
+
+proc initProtocolState*(network: WakuNetwork, node: EthereumNode) {.gcsafe.} =
+  new(network.queue)
+  network.queue[] = initQueue(defaultQueueCapacity)
+  network.filters = initTable[string, Filter]()
+  network.config.bloom = fullBloom()
+  network.config.powRequirement = defaultMinPow
+  network.config.isLightNode = false
+  network.config.maxMsgSize = defaultMaxMsgSize
+  asyncCheck node.run(network)
+
+p2pProtocol Waku(version = wakuVersion,
+                 rlpxName = "waku",
+                 peerState = WakuPeer,
+                 networkState = WakuNetwork):
+
+  onPeerConnected do (peer: Peer):
+    trace "onPeerConnected Waku"
+    let
+      wakuNet = peer.networkState
+      wakuPeer = peer.state
+
+    let m = await peer.status(wakuVersion,
+                              cast[uint](wakuNet.config.powRequirement),
+                              @(wakuNet.config.bloom),
+                              wakuNet.config.isLightNode,
+                              timeout = chronos.milliseconds(500))
+
+    if m.protocolVersion == wakuVersion:
+      debug "Waku peer", peer, wakuVersion
+    else:
+      raise newException(UselessPeerError, "Incompatible Waku version")
+
+    wakuPeer.powRequirement = cast[float64](m.powConverted)
+
+    if m.bloom.len > 0:
+      if m.bloom.len != bloomSize:
+        raise newException(UselessPeerError, "Bloomfilter size mismatch")
+      else:
+        wakuPeer.bloom.bytesCopy(m.bloom)
+    else:
+      # If no bloom filter is sent we allow all
+      wakuPeer.bloom = fullBloom()
+
+    wakuPeer.isLightNode = m.isLightNode
+    if wakuPeer.isLightNode and wakuNet.config.isLightNode:
+      # No sense in connecting two light nodes so we disconnect
+      raise newException(UselessPeerError, "Two light nodes connected")
+
+    wakuPeer.received.init()
+    wakuPeer.trusted = false
+    wakuPeer.initialized = true
+
+    if not wakuNet.config.isLightNode:
+      traceAsyncErrors peer.run()
+
+    debug "Waku peer initialized", peer
+
+  handshake:
+    proc status(peer: Peer,
+                protocolVersion: uint,
+                powConverted: uint,
+                bloom: Bytes,
+                isLightNode: bool)
+
+  proc messages(peer: Peer, envelopes: openarray[Envelope]) =
+    if not peer.state.initialized:
+      warn "Handshake not completed yet, discarding messages"
+      return
+
+    for envelope in envelopes:
+      # check if expired or in future, or ttl not 0
+      if not envelope.valid():
+        warn "Expired or future timed envelope", peer
+        # disconnect from peers sending bad envelopes
+        # await peer.disconnect(SubprotocolReason)
+        continue
+
+      let msg = initMessage(envelope)
+      if not msg.allowed(peer.networkState.config):
+        # disconnect from peers sending bad envelopes
+        # await peer.disconnect(SubprotocolReason)
+        continue
+
+      # This peer sent this message, thus it should not receive it again.
+      # If this peer has the message in the `received` set already, this means
+      # it was either already received here from this peer or sent to this peer.
+      # Either way it will be in our queue already (and the peer should know
+      # this) and this peer is sending duplicates.
+      # Note: geth does not check if a peer has sent a message to them before
+      # broadcasting this message. This too is seen here as a duplicate message
+      # (see above comment). If we want to separate these cases (e.g. when peer
+      # rating), then we have to add a "peer.state.send" HashSet.
+      if peer.state.received.containsOrIncl(msg):
+        debug "Peer sending duplicate messages", peer, hash = msg.hash
+        # await peer.disconnect(SubprotocolReason)
+        continue
+
+      # This can still be a duplicate message, but from another peer than
+      # the peer who sent the message.
+      if peer.networkState.queue[].add(msg):
+        # notify filters of this message
+        peer.networkState.filters.notify(msg)
+
+  proc powRequirement(peer: Peer, value: uint) =
+    if not peer.state.initialized:
+      warn "Handshake not completed yet, discarding powRequirement"
+      return
+
+    peer.state.powRequirement = cast[float64](value)
+
+  proc bloomFilterExchange(peer: Peer, bloom: Bytes) =
+    if not peer.state.initialized:
+      warn "Handshake not completed yet, discarding bloomFilterExchange"
+      return
+
+    if bloom.len == bloomSize:
+      peer.state.bloom.bytesCopy(bloom)
+
+  nextID 126
+
+  proc p2pRequest(peer: Peer, envelope: Envelope) =
+    # TODO: here we would have to allow to insert some specific implementation
+    # such as e.g. Waku Mail Server
+    discard
+
+  proc p2pMessage(peer: Peer, envelope: Envelope) =
+    if peer.state.trusted:
+      # when trusted we can bypass any checks on envelope
+      let msg = Message(env: envelope, isP2P: true)
+      peer.networkState.filters.notify(msg)
+
+  # Following message IDs are not part of EIP-627, but are added and used by
+  # the Status application, we ignore them for now.
+  nextID 11
+  proc batchAcknowledged(peer: Peer) = discard
+  proc messageResponse(peer: Peer) = discard
+
+  nextID 123
+  requestResponse:
+    proc p2pSyncRequest(peer: Peer) = discard
+    proc p2pSyncResponse(peer: Peer) = discard
+
+  proc p2pRequestComplete(peer: Peer) = discard
+
+# 'Runner' calls ---------------------------------------------------------------
+
+proc processQueue(peer: Peer) =
+  # Send the peer all valid envelopes in the queue that were not previously
+  # sent to it.
+  var
+    envelopes: seq[Envelope] = @[]
+    wakuPeer = peer.state(Waku)
+    wakuNet = peer.networkState(Waku)
+
+  for message in wakuNet.queue.items:
+    if wakuPeer.received.contains(message):
+      # debug "message was already sent to peer"
+      continue
+
+    if message.pow < wakuPeer.powRequirement:
+      debug "Message PoW too low for peer", pow = message.pow,
+                                            powReq = wakuPeer.powRequirement
+      continue
+
+    if not bloomFilterMatch(wakuPeer.bloom, message.bloom):
+      debug "Message does not match peer bloom filter"
+      continue
+
+    trace "Adding envelope"
+    envelopes.add(message.env)
+    wakuPeer.received.incl(message)
+
+  trace "Sending envelopes", amount=envelopes.len
+  # Ignore failure of sending messages, this could occur when the connection
+  # gets dropped
+  traceAsyncErrors peer.messages(envelopes)
+
+proc run(peer: Peer) {.async.} =
+  while peer.connectionState notin {Disconnecting, Disconnected}:
+    peer.processQueue()
+    await sleepAsync(messageInterval)
+
+proc pruneReceived(node: EthereumNode) {.raises: [].} =
+  if node.peerPool != nil: # XXX: a bit dirty to need to check for this here ...
+    var wakuNet = node.protocolState(Waku)
+
+    for peer in node.protocolPeers(Waku):
+      if not peer.initialized:
+        continue
+
+      # NOTE: Perhaps alter the queue prune call to keep track of a HashSet
+      # of pruned messages (as these should be smaller), and diff this with
+      # the received sets.
+      peer.received = intersection(peer.received, wakuNet.queue.itemHashes)
+
+proc run(node: EthereumNode, network: WakuNetwork) {.async.} =
+  while true:
+    # prune message queue every second
+    # TTL unit is in seconds, so this should be sufficient?
+    network.queue[].prune()
+    # pruning the received sets is not necessary for correct workings,
+    # it simply keeps the sets from growing indefinitely
+    node.pruneReceived()
+    await sleepAsync(pruneInterval)
+
+# Private EthereumNode calls ---------------------------------------------------
+
+proc sendP2PMessage(node: EthereumNode, peerId: NodeId, env: Envelope): bool =
+  for peer in node.peers(Waku):
+    if peer.remote.id == peerId:
+      asyncCheck peer.p2pMessage(env)
+      return true
+
+proc queueMessage(node: EthereumNode, msg: Message): bool =
+
+  var wakuNet = node.protocolState(Waku)
+  # We have to do the same checks here as in the messages proc not to leak
+  # any information that the message originates from this node.
+  if not msg.allowed(wakuNet.config):
+    return false
+
+  trace "Adding message to queue"
+  if wakuNet.queue[].add(msg):
+    # Also notify our own filters of the message we are sending,
+    # e.g. msg from local Dapp to Dapp
+    wakuNet.filters.notify(msg)
+
+  return true
+
+# Public EthereumNode calls ----------------------------------------------------
+
+proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
+                  symKey = none[SymKey](), src = none[PrivateKey](),
+                  ttl: uint32, topic: Topic, payload: Bytes,
+                  padding = none[Bytes](), powTime = 1'f,
+                  powTarget = defaultMinPow,
+                  targetPeer = none[NodeId]()): bool =
+  ## Post a message on the message queue which will be processed at the
+  ## next `messageInterval`.
+  ##
+  ## NOTE: This call allows a post without encryption. If encryption is
+  ## mandatory it should be enforced a layer up.
+  let payload = encode(Payload(payload: payload, src: src, dst: pubKey,
+                               symKey: symKey, padding: padding))
+  if payload.isSome():
+    var env = Envelope(expiry: epochTime().uint32 + ttl,
+                       ttl: ttl, topic: topic, data: payload.get(), nonce: 0)
+
+    # Allow lightnode to post only direct p2p messages
+    if targetPeer.isSome():
+      return node.sendP2PMessage(targetPeer.get(), env)
+    elif not node.protocolState(Waku).config.isLightNode:
+      # a non-direct p2p message can not have a ttl of 0
+      if env.ttl == 0:
+        return false
+      var msg = initMessage(env, powCalc = false)
+      # XXX: make this non blocking or not?
+      # In its current blocking state, it could be noticed by a peer that no
+      # messages are sent for a while, and thus that mining PoW is done, and
+      # that the next message contains a message originating from this peer
+      # zah: It would be hard to execute this in a background thread at the
+      # moment. We'll need a way to send custom "tasks" to the async message
+      # loop (e.g. AD2 support for AsyncChannels).
+      if not msg.sealEnvelope(powTime, powTarget):
+        return false
+
+      # need to check expiry after mining PoW
+      if not msg.env.valid():
+        return false
+
+      return node.queueMessage(msg)
+    else:
+      warn "Light node not allowed to post messages"
+      return false
+  else:
+    error "Encoding of payload failed"
+    return false
+
+proc subscribeFilter*(node: EthereumNode, filter: Filter,
+                      handler: FilterMsgHandler = nil): string =
+  ## Initiate a filter for incoming/outgoing messages. Messages can be
+  ## retrieved with the `getFilterMessages` call or with a provided
+  ## `FilterMsgHandler`.
+  ##
+  ## NOTE: This call allows for a filter without decryption. If encryption is
+  ## mandatory it should be enforced a layer up.
+  return node.protocolState(Waku).filters.subscribeFilter(filter, handler)
+
+proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
+  ## Remove a previously subscribed filter.
+  var filter: Filter
+  return node.protocolState(Waku).filters.take(filterId, filter)
+
+proc getFilterMessages*(node: EthereumNode, filterId: string): seq[ReceivedMessage] =
+  ## Get all the messages currently in the filter queue. This will reset the
+  ## filter message queue.
+  return node.protocolState(Waku).filters.getFilterMessages(filterId)
+
+proc filtersToBloom*(node: EthereumNode): Bloom =
+  ## Returns the bloom filter of all topics of all subscribed filters.
+  return node.protocolState(Waku).filters.toBloom()
+
+proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
+  ## Sets the PoW requirement for this node, will also send
+  ## this new PoW requirement to all connected peers.
+  ##
+  ## Failures when sending messages to peers will not be reported.
+  # NOTE: do we need a tolerance of old PoW for some time?
+  node.protocolState(Waku).config.powRequirement = powReq
+  var futures: seq[Future[void]] = @[]
+  for peer in node.peers(Waku):
+    futures.add(peer.powRequirement(cast[uint](powReq)))
+
+  # Exceptions from sendMsg will not be raised
+  await allFutures(futures)
+
+proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
+  ## Sets the bloom filter for this node, will also send
+  ## this new bloom filter to all connected peers.
+  ##
+  ## Failures when sending messages to peers will not be reported.
+  # NOTE: do we need a tolerance of old bloom filter for some time?
+  node.protocolState(Waku).config.bloom = bloom
+  var futures: seq[Future[void]] = @[]
+  for peer in node.peers(Waku):
+    futures.add(peer.bloomFilterExchange(@bloom))
+
+  # Exceptions from sendMsg will not be raised
+  await allFutures(futures)
+
+proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
+  ## Set the maximum allowed message size.
+  ## Cannot be set higher than ``defaultMaxMsgSize``.
+  if size > defaultMaxMsgSize:
+    warn "size > defaultMaxMsgSize"
+    return false
+  node.protocolState(Waku).config.maxMsgSize = size
+  return true
+
+proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
+  ## Set a connected peer as trusted.
+  for peer in node.peers(Waku):
+    if peer.remote.id == peerId:
+      peer.state(Waku).trusted = true
+      return true
+
+proc setLightNode*(node: EthereumNode, isLightNode: bool) =
+  ## Set this node as a Waku light node.
+  ##
+  ## NOTE: Should be run before connection is made with peers as this
+  ## setting is only communicated at peer handshake.
+  node.protocolState(Waku).config.isLightNode = isLightNode
+
+proc configureWaku*(node: EthereumNode, config: WakuConfig) =
+  ## Apply a Waku configuration.
+  ##
+  ## NOTE: Should be run before connection is made with peers as some
+  ## of the settings are only communicated at peer handshake.
+  node.protocolState(Waku).config = config
+
+proc resetMessageQueue*(node: EthereumNode) =
+  ## Full reset of the message queue.
+  ##
+  ## NOTE: Not something that should be run in normal circumstances.
+  node.protocolState(Waku).queue[] = initQueue(defaultQueueCapacity)
+
+proc shareMessageQueue*(node: EthereumNode) =
+  node.protocolState(Waku).queue = node.protocolState(Whisper).queue
diff --git a/eth/p2p/rlpx_protocols/whisper_protocol.nim b/eth/p2p/rlpx_protocols/whisper_protocol.nim
index c23d4e55..d92919d8 100644
--- a/eth/p2p/rlpx_protocols/whisper_protocol.nim
+++ b/eth/p2p/rlpx_protocols/whisper_protocol.nim
@@ -70,7 +70,7 @@ type
     received: HashSet[Message]
 
   WhisperNetwork = ref object
-    queue*: Queue
+    queue*: ref Queue
    filters*: Filters
    config*: WhisperConfig
@@ -95,7 +95,8 @@ proc run(peer: Peer) {.gcsafe, async.}
 proc run(node: EthereumNode, network: WhisperNetwork) {.gcsafe, async.}
 
 proc initProtocolState*(network: WhisperNetwork, node: EthereumNode) {.gcsafe.} =
-  network.queue = initQueue(defaultQueueCapacity)
+  new(network.queue)
+  network.queue[] = initQueue(defaultQueueCapacity)
   network.filters = initTable[string, Filter]()
   network.config.bloom = fullBloom()
   network.config.powRequirement = defaultMinPow
@@ -192,7 +193,7 @@ p2pProtocol Whisper(version = whisperVersion,
 
       # This can still be a duplicate message, but from another peer than
      # the peer who send the message.
-      if peer.networkState.queue.add(msg):
+      if peer.networkState.queue[].add(msg):
        # notify filters of this message
        peer.networkState.filters.notify(msg)
@@ -291,7 +292,7 @@ proc run(node: EthereumNode, network: WhisperNetwork) {.async.} =
   while true:
     # prune message queue every second
    # TTL unit is in seconds, so this should be sufficient?
-    network.queue.prune()
+    network.queue[].prune()
    # pruning the received sets is not necessary for correct workings
    # but simply from keeping the sets growing indefinitely
    node.pruneReceived()
@@ -314,7 +315,7 @@ proc queueMessage(node: EthereumNode, msg: Message): bool =
     return false
 
   trace "Adding message to queue"
-  if whisperNet.queue.add(msg):
+  if whisperNet.queue[].add(msg):
    # Also notify our own filters of the message we are sending,
    # e.g. msg from local Dapp to Dapp
    whisperNet.filters.notify(msg)
@@ -456,4 +457,4 @@ proc resetMessageQueue*(node: EthereumNode) =
   ## Full reset of the message queue.
   ##
   ## NOTE: Not something that should be run in normal circumstances.
-  node.protocolState(Whisper).queue = initQueue(defaultQueueCapacity)
+  node.protocolState(Whisper).queue[] = initQueue(defaultQueueCapacity)
diff --git a/tests/p2p/test_waku_bridge.nim b/tests/p2p/test_waku_bridge.nim
new file mode 100644
index 00000000..ed548ae2
--- /dev/null
+++ b/tests/p2p/test_waku_bridge.nim
@@ -0,0 +1,96 @@
+#
+# Ethereum P2P
+# (c) Copyright 2018
+# Status Research & Development GmbH
+#
+# Licensed under either of
+#  Apache License, version 2.0, (LICENSE-APACHEv2)
+#  MIT license (LICENSE-MIT)
+
+import
+  sequtils, unittest, tables, chronos, eth/p2p, eth/p2p/peer_pool,
+  ./p2p_test_helper
+
+import eth/p2p/rlpx_protocols/waku_protocol as waku
+import eth/p2p/rlpx_protocols/whisper_protocol as whisper
+
+let safeTTL = 5'u32
+let waitInterval = waku.messageInterval + 150.milliseconds
+
+suite "Waku - Whisper bridge tests":
+  # The Waku-Whisper node has both capabilities: it listens to Whisper and Waku
+  # and relays traffic between the two.
+  var
+    nodeWakuWhisper = setupTestNode(Whisper, Waku) # This will be the bridge
+    nodeWhisper = setupTestNode(Whisper)
+    nodeWaku = setupTestNode(Waku)
+
+  nodeWakuWhisper.startListening()
+  let bridgeNode = newNode(initENode(nodeWakuWhisper.keys.pubKey,
+                                     nodeWakuWhisper.address))
+  nodeWakuWhisper.shareMessageQueue()
+
+  waitFor nodeWhisper.peerPool.connectToNode(bridgeNode)
+  waitFor nodeWaku.peerPool.connectToNode(bridgeNode)
+
+  asyncTest "WakuWhisper and Whisper peers connected":
+    check:
+      nodeWhisper.peerPool.connectedNodes.len() == 1
+      nodeWaku.peerPool.connectedNodes.len() == 1
+
+  asyncTest "Whisper - Waku communication via bridge":
+    # topic whisper node subscribes to, waku node posts to
+    let topic1 = [byte 0x12, 0, 0, 0]
+    # topic waku node subscribes to, whisper node posts to
+    let topic2 = [byte 0x34, 0, 0, 0]
+    var payloads = [repeat(byte 0, 10), repeat(byte 1, 10)]
+    var futures = [newFuture[int](), newFuture[int]()]
+
+    proc handler1(msg: whisper.ReceivedMessage) =
+      check msg.decoded.payload == payloads[0]
+      futures[0].complete(1)
+    proc handler2(msg: waku.ReceivedMessage) =
+      check msg.decoded.payload == payloads[1]
+      futures[1].complete(1)
+
+    var filter1 = whisper.subscribeFilter(nodeWhisper,
+      whisper.newFilter(topics = @[topic1]), handler1)
+    var filter2 = waku.subscribeFilter(nodeWaku,
+      waku.newFilter(topics = @[topic2]), handler2)
+
+    check:
+      # Message should also end up in the Whisper node's queue via the bridge
+      waku.postMessage(nodeWaku, ttl = safeTTL + 1, topic = topic1,
+                       payload = payloads[0]) == true
+      # Message should also end up in the Waku node's queue via the bridge
+      whisper.postMessage(nodeWhisper, ttl = safeTTL, topic = topic2,
+                          payload = payloads[1]) == true
+      nodeWhisper.protocolState(Whisper).queue.items.len == 1
+      nodeWaku.protocolState(Waku).queue.items.len == 1
+
+      # waitInterval*2 as messages have to pass the bridge also (2 hops)
+      await allFutures(futures).withTimeout(waitInterval*2)
+
+      # Relay can receive Whisper & Waku messages
+      nodeWakuWhisper.protocolState(Whisper).queue.items.len == 2
+      nodeWakuWhisper.protocolState(Waku).queue.items.len == 2
+
+      # Whisper node can receive Waku messages (via bridge)
+      nodeWhisper.protocolState(Whisper).queue.items.len == 2
+      # Waku node can receive Whisper messages (via bridge)
+      nodeWaku.protocolState(Waku).queue.items.len == 2
+
+      whisper.unsubscribeFilter(nodeWhisper, filter1) == true
+      waku.unsubscribeFilter(nodeWaku, filter2) == true
+
+    # XXX: This reads a bit weird, but eh
+    waku.resetMessageQueue(nodeWaku)
+    whisper.resetMessageQueue(nodeWhisper)
+    # shared queue so Waku and Whisper should be set to 0
+    waku.resetMessageQueue(nodeWakuWhisper)
+
+    check:
+      nodeWhisper.protocolState(Whisper).queue.items.len == 0
+      nodeWaku.protocolState(Waku).queue.items.len == 0
+      nodeWakuWhisper.protocolState(Whisper).queue.items.len == 0
+      nodeWakuWhisper.protocolState(Waku).queue.items.len == 0
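
Reviewer note: outside the test suite, a bridge node would be wired up with the same public calls the test relies on (`addCapability`, `shareMessageQueue`, `connectToNetwork`). The snippet below is a minimal sketch only; `keypair`, `address`, `netId` and `bootnodes` are placeholders to be prepared by the application, and the exact `newEthereumNode`/`connectToNetwork` signatures should be checked against `eth/p2p`.

.. code-block::nim
  # Sketch: set up a node that speaks both protocols and bridges them.
  var bridge = newEthereumNode(keypair, address, netId, nil,
                               addAllCapabilities = false)
  bridge.addCapability Whisper   # Whisper on one side
  bridge.addCapability Waku      # Waku on the other
  # Point the Waku protocol state at the Whisper queue, so a message queued
  # by either protocol is broadcast to peers of both.
  bridge.shareMessageQueue()
  waitFor bridge.connectToNetwork(bootnodes)

Because both protocol states then share one queue, `resetMessageQueue` on either capability clears the bridge's queue for both, which is what the last part of the test above checks.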