Skip to content

Commit

Permalink
Merge branch 'master' into feat/c-bindings-multiinstance
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Dec 15, 2023
2 parents b3c5469 + 097123a commit 047b28f
Show file tree
Hide file tree
Showing 17 changed files with 505 additions and 207 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
url = https://github.com/waku-org/waku-rln-contract.git
[submodule "waku/v2/protocol/waku-proto"]
path = waku/v2/protocol/waku-proto
url = git@github.com:waku-org/waku-proto.git
url = https://github.com/waku-org/waku-proto
9 changes: 0 additions & 9 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,15 +264,6 @@ var (
Destination: &options.Relay.ContentTopics,
EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"},
})
BridgeTopics = altsrc.NewGenericFlag(&cli.GenericFlag{
Name: "bridge-topics",
Usage: "Bridge two pubsub topics, from_topic:to_topic. Argument may be repeated.",
EnvVars: []string{"WAKUNODE2_BRIDGE_TOPIC"},
Value: &cliutils.BridgeTopicSlice{
Values: &options.Relay.BridgeTopics,
},
Hidden: true,
})
ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "protected-topic",
Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.",
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ func main() {
Topics,
ContentTopics,
PubSubTopics,
BridgeTopics,
ProtectedTopics,
RelayPeerExchange,
MinRelayPeersToPublish,
Expand Down
1 change: 0 additions & 1 deletion cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type DiscV5Options struct {
type RelayOptions struct {
Enable bool
Topics cli.StringSlice
BridgeTopics []cliutils.BridgeTopic
ProtectedTopics []cliutils.ProtectedTopic
PubSubTopics cli.StringSlice
ContentTopics cli.StringSlice
Expand Down
74 changes: 0 additions & 74 deletions cmd/waku/relay.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
package main

import (
"bytes"
"context"
"sync"
"time"

"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
wprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/rendezvous"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

var fwdMetaTag = []byte{102, 119, 100} //"fwd"

func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode, pubSubTopicMap map[string][]string) error {
for nodeTopic, cTopics := range pubSubTopicMap {
nodeTopic := nodeTopic
Expand Down Expand Up @@ -85,72 +78,5 @@ func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.W
}
}

err := bridgeTopics(ctx, wg, wakuNode)
if err != nil {
return err
}

return nil
}

func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode) error {
// Bridge topics
bridgedTopics := make(map[string]map[string]struct{})
bridgedTopicsSet := make(map[string]struct{})
for _, topics := range options.Relay.BridgeTopics {
_, ok := bridgedTopics[topics.FromTopic]
if !ok {
bridgedTopics[topics.FromTopic] = make(map[string]struct{})
}

bridgedTopics[topics.FromTopic][topics.ToTopic] = struct{}{}
bridgedTopicsSet[topics.FromTopic] = struct{}{}
bridgedTopicsSet[topics.ToTopic] = struct{}{}
}

// Make sure all topics are subscribed
for _, topic := range maps.Keys(bridgedTopicsSet) {
if !wakuNode.Relay().IsSubscribed(topic) {
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(topic), relay.WithoutConsumer())
if err != nil {
return err
}
}
}

for fromTopic, toTopics := range bridgedTopics {
subscriptions, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(fromTopic))
if err != nil {
return err
}

topics := maps.Keys(toTopics)
for _, subscription := range subscriptions {
wg.Add(1)
go func(subscription *relay.Subscription, topics []string) {
defer wg.Done()
for env := range subscription.Ch {
for _, topic := range topics {
// HACK: message has been already fwded
metaLen := len(env.Message().Meta)
fwdTagLen := len(fwdMetaTag)
if metaLen >= fwdTagLen && bytes.Equal(env.Message().Meta[metaLen-fwdTagLen:], fwdMetaTag) {
continue
}

// HACK: We append magic numbers here, just so the pubsub message ID will change
env.Message().Meta = append(env.Message().Meta, fwdMetaTag...)
_, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic))
if err != nil {
utils.Logger().Warn("could not bridge message", logging.HexBytes("hash", env.Hash()),
zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic),
zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
}
}
}
}(subscription, topics)
}
}

return nil
}
56 changes: 0 additions & 56 deletions waku/cliutils/bridge.go

This file was deleted.

7 changes: 1 addition & 6 deletions waku/v2/discv5/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,14 @@ func (d *DiscoveryV5) evaluateNode() func(node *enode.Node) bool {
if node == nil {
return false
}
d.log.Debug("found a peer", logging.ENode("enr", node))

// node filtering based on ENR; we do not filter based on ENR in the first waku discv5 beta stage
if !isWakuNode(node) {
d.log.Debug("peer is not waku node", logging.ENode("enr", node))
return false
}
d.log.Debug("peer is a waku node", logging.ENode("enr", node))
_, err := wenr.EnodeToPeerInfo(node)

_, err := wenr.EnodeToPeerInfo(node)
if err != nil {
d.metrics.RecordError(peerInfoFailure)
d.log.Error("obtaining peer info from enode", logging.ENode("enr", node), zap.Error(err))
Expand Down Expand Up @@ -411,20 +409,17 @@ func (d *DiscoveryV5) DefaultPredicate() Predicate {
}

if nodeRS == nil {
d.log.Debug("node has no shards registered", logging.ENode("node", n))
// Node has no shards registered.
return false
}

if nodeRS.ClusterID != localRS.ClusterID {
d.log.Debug("cluster id mismatch from local clusterid", logging.ENode("node", n), zap.Error(err))
return false
}

// Contains any
for _, idx := range localRS.ShardIDs {
if nodeRS.Contains(localRS.ClusterID, idx) {
d.log.Debug("shards match for discovered node", logging.ENode("node", n))
return true
}
}
Expand Down
1 change: 0 additions & 1 deletion waku/v2/peermanager/peer_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,6 @@ func (pm *PeerManager) AddDiscoveredPeer(p service.PeerData, connectNow bool) {
enr, err := pm.host.Peerstore().(wps.WakuPeerstore).ENR(p.AddrInfo.ID)
// Verifying if the enr record is more recent (DiscV5 and peer exchange can return peers already seen)
if err == nil && enr.Record().Seq() > p.ENR.Seq() {
pm.logger.Debug("found discovered peer already in peerStore", logging.HostID("peer", p.AddrInfo.ID))
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/envelope_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestEnvelope(t *testing.T) {

require.Equal(
t,
[]uint8{70, 218, 246, 174, 188, 127, 199, 220, 111, 30, 61, 218, 238, 60, 83, 3, 179, 98, 85, 35, 7, 107, 188, 138, 32, 70, 170, 126, 55, 21, 71, 70},
[]byte{0x91, 0x0, 0xe4, 0xa5, 0xcf, 0xf7, 0x19, 0x27, 0x49, 0x81, 0x66, 0xb3, 0xdf, 0xc7, 0xa6, 0x31, 0xf0, 0x87, 0xc7, 0x29, 0xb4, 0x28, 0x83, 0xb9, 0x5c, 0x31, 0x25, 0x33, 0x3, 0xc9, 0x7, 0x95},
hash,
)
}
7 changes: 4 additions & 3 deletions waku/v2/protocol/lightpush/waku_lightpush.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ func NewWakuLightPush(relay *relay.WakuRelay, pm *peermanager.PeerManager, reg p
wakuLP.pm = pm
wakuLP.metrics = newMetrics(reg)

if pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
}

return wakuLP
}

Expand All @@ -73,9 +77,6 @@ func (wakuLP *WakuLightPush) Start(ctx context.Context) error {
wakuLP.h.SetStreamHandlerMatch(LightPushID_v20beta1, protocol.PrefixTextMatch(string(LightPushID_v20beta1)), wakuLP.onRequest(ctx))
wakuLP.log.Info("Light Push protocol started")

if wakuLP.pm != nil {
wakuLP.pm.RegisterWakuProtocol(LightPushID_v20beta1, LightPushENRField)
}
return nil
}

Expand Down
105 changes: 105 additions & 0 deletions waku/v2/protocol/lightpush/waku_lightpush_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package lightpush
import (
"context"
"crypto/rand"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"go.uber.org/zap"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -39,6 +41,22 @@ func makeWakuRelay(t *testing.T, pusubTopic string) (*relay.WakuRelay, *relay.Su
return relay, sub[0], host
}

func waitForMsg(t *testing.T, wg *sync.WaitGroup, ch chan *protocol.Envelope) {
wg.Add(1)
log := utils.Logger()
go func() {
defer wg.Done()
select {
case env := <-ch:
msg := env.Message()
log.Info("Received ", zap.String("msg", msg.String()))
case <-time.After(2 * time.Second):
require.Fail(t, "Message timeout")
}
}()
wg.Wait()
}

// Node1: Relay
// Node2: Relay+Lightpush
// Client that will lightpush a message
Expand Down Expand Up @@ -226,3 +244,90 @@ func TestWakuLightPushAutoSharding(t *testing.T) {
wg.Wait()

}

func TestWakuLightPushCornerCases(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

testTopic := "/waku/2/go/lightpush/test"
testContentTopic := "/test/10/my-lp-app/proto"

// Prepare peer manager instance to include in test
pm := peermanager.NewPeerManager(10, 10, utils.Logger())

node1, sub1, host1 := makeWakuRelay(t, testTopic)
defer node1.Stop()
defer sub1.Unsubscribe()

node2, sub2, host2 := makeWakuRelay(t, testTopic)
defer node2.Stop()
defer sub2.Unsubscribe()

lightPushNode2 := NewWakuLightPush(node2, pm, prometheus.DefaultRegisterer, utils.Logger())
lightPushNode2.SetHost(host2)
err := lightPushNode2.Start(ctx)
require.NoError(t, err)
defer lightPushNode2.Stop()

port, err := tests.FindFreePort(t, "", 5)
require.NoError(t, err)

clientHost, err := tests.MakeHost(context.Background(), port, rand.Reader)
require.NoError(t, err)
client := NewWakuLightPush(nil, nil, prometheus.DefaultRegisterer, utils.Logger())
client.SetHost(clientHost)

host2.Peerstore().AddAddr(host1.ID(), tests.GetHostAddress(host1), peerstore.PermanentAddrTTL)
err = host2.Peerstore().AddProtocols(host1.ID(), relay.WakuRelayID_v200)
require.NoError(t, err)

err = host2.Connect(ctx, host2.Peerstore().PeerInfo(host1.ID()))
require.NoError(t, err)

clientHost.Peerstore().AddAddr(host2.ID(), tests.GetHostAddress(host2), peerstore.PermanentAddrTTL)
err = clientHost.Peerstore().AddProtocols(host2.ID(), LightPushID_v20beta1)
require.NoError(t, err)

msg2 := tests.CreateWakuMessage(testContentTopic, utils.GetUnixEpoch())

// Wait for the mesh connection to happen between node1 and node2
time.Sleep(2 * time.Second)

var wg sync.WaitGroup

var lpOptions []Option
lpOptions = append(lpOptions, WithPubSubTopic(testTopic))
lpOptions = append(lpOptions, WithPeer(host2.ID()))

// Check that msg publish has passed for nominal case
_, err = client.Publish(ctx, msg2, lpOptions...)
require.NoError(t, err)

// Wait for the nominal case message at node1
waitForMsg(t, &wg, sub1.Ch)

// Test error case with nil message
_, err = client.Publish(ctx, nil, lpOptions...)
require.Error(t, err)

// Create new "dummy" host - not related to any node
host3, err := tests.MakeHost(context.Background(), 12345, rand.Reader)
require.NoError(t, err)

var lpOptions2 []Option

// Test error case with empty options
_, err = client.Publish(ctx, msg2, lpOptions2...)
require.Error(t, err)

// Test error case with unrelated host
_, err = client.Publish(ctx, msg2, WithPubSubTopic(testTopic), WithPeer(host3.ID()))
require.Error(t, err)

// Test corner case with default pubSub topic
_, err = client.Publish(ctx, msg2, WithDefaultPubsubTopic(), WithPeer(host2.ID()))
require.NoError(t, err)

// Test situation when cancel func is nil
lightPushNode2.cancel = nil
}
Loading

0 comments on commit 047b28f

Please sign in to comment.