Skip to content

Commit

Permalink
feat: verify proofs based on bandwidth usage
Browse files Browse the repository at this point in the history
  • Loading branch information
richard-ramos committed Jul 27, 2023
1 parent a422add commit c5f8540
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 11 deletions.
6 changes: 6 additions & 0 deletions cmd/waku/flags_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,5 +73,11 @@ func rlnFlags() []cli.Flag {
Value: &options.RLNRelay.MembershipContractAddress,
},
},
&cli.IntFlag{
Name: "rln-relay-bandwidth-threshold",
Value: 0,
Usage: "Message rate in bytes/sec after which verification of proofs should happen. Use 0 to disable bandwidth rate limits",
Destination: &options.RLNRelay.BandwidthThreshold,
},
}
}
2 changes: 2 additions & 0 deletions cmd/waku/node_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func checkForRLN(logger *zap.Logger, options Options, nodeOpts *[]node.WakuNodeO
ethPrivKey = options.RLNRelay.ETHPrivateKey
}

*nodeOpts = append(*nodeOpts, node.WithRLNBandwidthThreshold(options.RLNRelay.BandwidthThreshold))

*nodeOpts = append(*nodeOpts, node.WithDynamicRLNRelay(
options.RLNRelay.PubsubTopic,
options.RLNRelay.ContentTopic,
Expand Down
1 change: 1 addition & 0 deletions cmd/waku/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type RLNRelayOptions struct {
ETHPrivateKey *ecdsa.PrivateKey
ETHClientAddress string
MembershipContractAddress common.Address
BandwidthThreshold int
}

// FilterOptions are settings used to enable filter protocol. This is a protocol
Expand Down
8 changes: 7 additions & 1 deletion waku/v2/node/wakunode2_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/protocol/rln/group_manager/static"
r "github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
"golang.org/x/time/rate"
)

// RLNRelay is used to access any operation related to Waku RLN protocol
Expand Down Expand Up @@ -73,7 +74,12 @@ func (w *WakuNode) mountRlnRelay(ctx context.Context) error {
}
}

rlnRelay, err := rln.New(w.Relay(), groupManager, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, w.timesource, w.log)
var limiter *rate.Limiter
if w.opts.rlnRelayBandwidthThreshold != 0 {
limiter = rate.NewLimiter(rate.Limit(w.opts.rlnRelayBandwidthThreshold), w.opts.rlnRelayBandwidthThreshold)
}

rlnRelay, err := rln.New(w.Relay(), groupManager, w.opts.rlnRelayPubsubTopic, w.opts.rlnRelayContentTopic, w.opts.rlnSpamHandler, limiter, w.timesource, w.log)
if err != nil {
return err
}
Expand Down
1 change: 1 addition & 0 deletions waku/v2/node/wakuoptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ type WakuNodeParameters struct {
keystorePassword string
rlnMembershipContractAddress common.Address
rlnRegistrationHandler func(tx *types.Transaction)
rlnRelayBandwidthThreshold int

keepAliveInterval time.Duration

Expand Down
8 changes: 8 additions & 0 deletions waku/v2/node/wakuoptions_rln.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,14 @@ import (
r "github.com/waku-org/go-zerokit-rln/rln"
)

// WithRLNBandwidthThreshold sets the message rate in bytes/sec after which verification of proofs should happen
func WithRLNBandwidthThreshold(rateLimit int) WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.rlnRelayBandwidthThreshold = rateLimit
return nil
}
}

// WithStaticRLNRelay enables the Waku V2 RLN protocol in offchain mode
// Requires the `gowaku_rln` build constrain (or the env variable RLN=true if building go-waku)
func WithStaticRLNRelay(pubsubTopic string, contentTopic string, memberIndex r.MembershipIndex, spamHandler rln.SpamHandler) WakuNodeOption {
Expand Down
2 changes: 1 addition & 1 deletion waku/v2/protocol/rln/rln_relay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (s *WakuRLNRelaySuite) TestOffchainMode() {
groupManager, err := static.NewStaticGroupManager(groupIDCommitments, idCredential, index, utils.Logger())
s.Require().NoError(err)

wakuRLNRelay, err := New(relay, groupManager, RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, timesource.NewDefaultClock(), utils.Logger())
wakuRLNRelay, err := New(relay, groupManager, RLNRELAY_PUBSUB_TOPIC, RLNRELAY_CONTENT_TOPIC, nil, nil, timesource.NewDefaultClock(), utils.Logger())
s.Require().NoError(err)

err = wakuRLNRelay.Start(context.TODO())
Expand Down
28 changes: 19 additions & 9 deletions waku/v2/protocol/rln/waku_rln_relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/waku-org/go-waku/waku/v2/timesource"
"github.com/waku-org/go-zerokit-rln/rln"
"go.uber.org/zap"
"golang.org/x/time/rate"
proto "google.golang.org/protobuf/proto"
)

Expand All @@ -34,6 +35,7 @@ type WakuRLNRelay struct {

groupManager GroupManager
rootTracker *group_manager.MerkleRootTracker
rateLimiter *rate.Limiter

// pubsubTopic is the topic for which rln relay is mounted
pubsubTopic string
Expand All @@ -55,6 +57,7 @@ func New(
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler,
rateLimiter *rate.Limiter,
timesource timesource.Timesource,
log *zap.Logger) (*WakuRLNRelay, error) {
rlnInstance, err := rln.NewRLN()
Expand All @@ -72,6 +75,7 @@ func New(
RLN: rlnInstance,
groupManager: groupManager,
rootTracker: rootTracker,
rateLimiter: rateLimiter,
pubsubTopic: pubsubTopic,
contentTopic: contentTopic,
relay: relay,
Expand Down Expand Up @@ -280,26 +284,32 @@ func (rlnRelay *WakuRLNRelay) addValidator(
pubsubTopic string,
contentTopic string,
spamHandler SpamHandler) error {
validator := func(ctx context.Context, peerID peer.ID, message *pubsub.Message) bool {
rlnRelay.log.Debug("rln-relay topic validator called")
validator := func(ctx context.Context, peerID peer.ID, message *pubsub.Message) pubsub.ValidationResult {
rlnRelay.log.Debug("topic validator called")

wakuMessage := &pb.WakuMessage{}
if err := proto.Unmarshal(message.Data, wakuMessage); err != nil {
rlnRelay.log.Debug("could not unmarshal message")
return true
return pubsub.ValidationReject
}

// check the contentTopic
if (wakuMessage.ContentTopic != "") && (contentTopic != "") && (wakuMessage.ContentTopic != contentTopic) {
rlnRelay.log.Debug("content topic did not match", zap.String("contentTopic", contentTopic))
return true
return pubsub.ValidationAccept
}

if rlnRelay.rateLimiter != nil && rlnRelay.rateLimiter.AllowN(time.Now(), len(message.Data)) {
return pubsub.ValidationAccept
}

rlnRelay.log.Debug("message bandwidth limit exceeded, running rate limit proof validation")

// validate the message
validationRes, err := rlnRelay.ValidateMessage(wakuMessage, nil)
if err != nil {
rlnRelay.log.Debug("validating message", zap.Error(err))
return false
return pubsub.ValidationReject
}

switch validationRes {
Expand All @@ -308,13 +318,13 @@ func (rlnRelay *WakuRLNRelay) addValidator(
zap.String("pubsubTopic", pubsubTopic),
zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))),
)
return true
return pubsub.ValidationAccept
case invalidMessage:
rlnRelay.log.Debug("message could not be verified",
zap.String("pubsubTopic", pubsubTopic),
zap.String("id", hex.EncodeToString(wakuMessage.Hash(pubsubTopic))),
)
return false
return pubsub.ValidationReject
case spamMessage:
rlnRelay.log.Debug("spam message found",
zap.String("pubsubTopic", pubsubTopic),
Expand All @@ -327,10 +337,10 @@ func (rlnRelay *WakuRLNRelay) addValidator(
}
}

return false
return pubsub.ValidationReject
default:
rlnRelay.log.Debug("unhandled validation result", zap.Int("validationResult", int(validationRes)))
return false
return pubsub.ValidationIgnore
}
}

Expand Down

0 comments on commit c5f8540

Please sign in to comment.