Skip to content

Commit

Permalink
Merge branch 'master' into protobuf
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos authored Nov 3, 2023
2 parents 51b6546 + d51c207 commit 26a51da
Show file tree
Hide file tree
Showing 18 changed files with 564 additions and 167 deletions.
9 changes: 9 additions & 0 deletions cmd/waku/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,15 @@ var (
Destination: &options.Relay.ContentTopics,
EnvVars: []string{"WAKUNODE2_CONTENT_TOPICS"},
})
BridgeTopics = altsrc.NewGenericFlag(&cli.GenericFlag{
Name: "bridge-topics",
Usage: "Bridge two pubsub topics, from_topic:to_topic. Argument may be repeated.",
EnvVars: []string{"WAKUNODE2_BRIDGE_TOPIC"},
Value: &cliutils.BridgeTopicSlice{
Values: &options.Relay.BridgeTopics,
},
Hidden: true,
})
ProtectedTopics = cliutils.NewGenericFlagMultiValue(&cli.GenericFlag{
Name: "protected-topic",
Usage: "Topics and its public key to be used for message validation, topic:pubkey. Argument may be repeated.",
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func main() {
Topics,
ContentTopics,
PubSubTopics,
BridgeTopics,
ProtectedTopics,
RelayPeerExchange,
MinRelayPeersToPublish,
Expand Down
65 changes: 2 additions & 63 deletions cmd/waku/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,69 +383,8 @@ func Execute(options NodeOptions) error {
var wg sync.WaitGroup

if options.Relay.Enable {
for nodeTopic, cTopics := range pubSubTopicMap {
nodeTopic := nodeTopic
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer())
if err != nil {
return err
}

if len(options.Rendezvous.Nodes) != 0 {
// Register the node in rendezvous point
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)

wg.Add(1)
go func(nodeTopic string) {
t := time.NewTicker(rendezvous.RegisterDefaultTTL)
defer t.Stop()
defer wg.Done()

for {
select {
case <-ctx.Done():
return
case <-t.C:
// Register in rendezvous points periodically
wakuNode.Rendezvous().RegisterWithNamespace(ctx, nodeTopic, iter.RendezvousPoints())
}
}
}(nodeTopic)

wg.Add(1)
go func(nodeTopic string) {
defer wg.Done()
desiredOutDegree := wakuNode.Relay().Params().D
t := time.NewTicker(7 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
peerCnt := len(wakuNode.Relay().PubSub().ListPeers(nodeTopic))
peersToFind := desiredOutDegree - peerCnt
if peersToFind <= 0 {
continue
}

rp := <-iter.Next(ctx)
if rp == nil {
continue
}
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
wakuNode.Rendezvous().DiscoverWithNamespace(ctx, nodeTopic, rp, peersToFind)
cancel()
}
}
}(nodeTopic)

}
}

for _, protectedTopic := range options.Relay.ProtectedTopics {
if err := wakuNode.Relay().AddSignedTopicValidator(protectedTopic.Topic, protectedTopic.PublicKey); err != nil {
return nonRecoverErrorMsg("could not add signed topic validator: %w", err)
}
if err = handleRelayTopics(ctx, &wg, wakuNode, pubSubTopicMap); err != nil {
return err
}
}

Expand Down
1 change: 1 addition & 0 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type DiscV5Options struct {
type RelayOptions struct {
Enable bool
Topics cli.StringSlice
BridgeTopics []cliutils.BridgeTopic
ProtectedTopics []cliutils.ProtectedTopic
PubSubTopics cli.StringSlice
ContentTopics cli.StringSlice
Expand Down
156 changes: 156 additions & 0 deletions cmd/waku/relay.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
package main

import (
"bytes"
"context"
"sync"
"time"

"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
wprotocol "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"github.com/waku-org/go-waku/waku/v2/rendezvous"
"github.com/waku-org/go-waku/waku/v2/utils"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)

var fwdMetaTag = []byte{102, 119, 100} //"fwd"

func handleRelayTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode, pubSubTopicMap map[string][]string) error {
for nodeTopic, cTopics := range pubSubTopicMap {
nodeTopic := nodeTopic
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(nodeTopic, cTopics...), relay.WithoutConsumer())
if err != nil {
return err
}

if len(options.Rendezvous.Nodes) != 0 {
// Register the node in rendezvous point
iter := rendezvous.NewRendezvousPointIterator(options.Rendezvous.Nodes)

wg.Add(1)
go func(nodeTopic string) {
t := time.NewTicker(rendezvous.RegisterDefaultTTL)
defer t.Stop()
defer wg.Done()

for {
select {
case <-ctx.Done():
return
case <-t.C:
// Register in rendezvous points periodically
wakuNode.Rendezvous().RegisterWithNamespace(ctx, nodeTopic, iter.RendezvousPoints())
}
}
}(nodeTopic)

wg.Add(1)
go func(nodeTopic string) {
defer wg.Done()
desiredOutDegree := wakuNode.Relay().Params().D
t := time.NewTicker(7 * time.Second)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
peerCnt := len(wakuNode.Relay().PubSub().ListPeers(nodeTopic))
peersToFind := desiredOutDegree - peerCnt
if peersToFind <= 0 {
continue
}

rp := <-iter.Next(ctx)
if rp == nil {
continue
}
ctx, cancel := context.WithTimeout(ctx, 7*time.Second)
wakuNode.Rendezvous().DiscoverWithNamespace(ctx, nodeTopic, rp, peersToFind)
cancel()
}
}
}(nodeTopic)

}
}

// Protected topics
for _, protectedTopic := range options.Relay.ProtectedTopics {
if err := wakuNode.Relay().AddSignedTopicValidator(protectedTopic.Topic, protectedTopic.PublicKey); err != nil {
return nonRecoverErrorMsg("could not add signed topic validator: %w", err)
}
}

err := bridgeTopics(ctx, wg, wakuNode)
if err != nil {
return err
}

return nil
}

func bridgeTopics(ctx context.Context, wg *sync.WaitGroup, wakuNode *node.WakuNode) error {
// Bridge topics
bridgedTopics := make(map[string]map[string]struct{})
bridgedTopicsSet := make(map[string]struct{})
for _, topics := range options.Relay.BridgeTopics {
_, ok := bridgedTopics[topics.FromTopic]
if !ok {
bridgedTopics[topics.FromTopic] = make(map[string]struct{})
}

bridgedTopics[topics.FromTopic][topics.ToTopic] = struct{}{}
bridgedTopicsSet[topics.FromTopic] = struct{}{}
bridgedTopicsSet[topics.ToTopic] = struct{}{}
}

// Make sure all topics are subscribed
for _, topic := range maps.Keys(bridgedTopicsSet) {
if !wakuNode.Relay().IsSubscribed(topic) {
_, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(topic), relay.WithoutConsumer())
if err != nil {
return err
}
}
}

for fromTopic, toTopics := range bridgedTopics {
subscriptions, err := wakuNode.Relay().Subscribe(ctx, wprotocol.NewContentFilter(fromTopic))
if err != nil {
return err
}

topics := maps.Keys(toTopics)
for _, subscription := range subscriptions {
wg.Add(1)
go func(subscription *relay.Subscription, topics []string) {
defer wg.Done()
for env := range subscription.Ch {
for _, topic := range topics {
// HACK: message has been already fwded
metaLen := len(env.Message().Meta)
fwdTagLen := len(fwdMetaTag)
if metaLen >= fwdTagLen && bytes.Equal(env.Message().Meta[metaLen-fwdTagLen:], fwdMetaTag) {
continue
}

// HACK: We append magic numbers here, just so the pubsub message ID will change
env.Message().Meta = append(env.Message().Meta, fwdMetaTag...)
_, err := wakuNode.Relay().Publish(ctx, env.Message(), relay.WithPubSubTopic(topic))
if err != nil {
utils.Logger().Warn("could not bridge message", logging.HexString("hash", env.Hash()),
zap.String("fromTopic", env.PubsubTopic()), zap.String("toTopic", topic),
zap.String("contentTopic", env.Message().ContentTopic), zap.Error(err))
}
}
}
}(subscription, topics)
}
}

return nil
}
56 changes: 56 additions & 0 deletions waku/cliutils/bridge.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cliutils

import (
"errors"
"fmt"
"strings"

"golang.org/x/exp/slices"
)

type BridgeTopic struct {
FromTopic string
ToTopic string
}

func (p BridgeTopic) String() string {
return fmt.Sprintf("%s:%s", p.FromTopic, p.ToTopic)
}

type BridgeTopicSlice struct {
Values *[]BridgeTopic
}

func (k *BridgeTopicSlice) Set(value string) error {
topicParts := strings.Split(value, ":")
if len(topicParts) != 2 {
return errors.New("expected from_topic:to_topic")
}

for i := range topicParts {
topicParts[i] = strings.TrimSpace(topicParts[i])
}

if slices.Contains(topicParts, "") {
return errors.New("topic can't be empty")
}

*k.Values = append(*k.Values, BridgeTopic{
FromTopic: topicParts[0],
ToTopic: topicParts[1],
})

return nil
}

func (k *BridgeTopicSlice) String() string {
if k.Values == nil {
return ""
}
var output []string
for _, v := range *k.Values {
output = append(output, v.String())
}

return strings.Join(output, ", ")
}
Loading

0 comments on commit 26a51da

Please sign in to comment.