From b2922d813e6fff1a76ed3c06161419b2e4a4179d Mon Sep 17 00:00:00 2001 From: Adam Babik Date: Fri, 29 Nov 2019 20:18:59 +0100 Subject: [PATCH] fix unit tests --- waku/api.go | 20 +- waku/api_test.go | 2 +- waku/config.go | 16 +- waku/doc.go | 7 + waku/events.go | 7 - waku/filter_test.go | 10 +- waku/go.mod | 1 + waku/go.sum | 22 + waku/mailserver_response.go | 8 +- waku/peer.go | 3 +- waku/peer_test.go | 20 +- waku/waku.go | 1235 ++++++++++++++++------------------- waku/waku_test.go | 109 ++-- 13 files changed, 688 insertions(+), 772 deletions(-) diff --git a/waku/api.go b/waku/api.go index 6082cf2f535..7a31fe5835d 100644 --- a/waku/api.go +++ b/waku/api.go @@ -50,7 +50,6 @@ func (api *PublicWakuAPI) Version(ctx context.Context) string { // Info contains diagnostic information. type Info struct { - Memory int `json:"memory"` // Memory size of the floating messages in bytes. Messages int `json:"messages"` // Number of floating messages. MinPow float64 `json:"minPow"` // Minimal accepted PoW MaxMessageSize uint32 `json:"maxMessageSize"` // Maximum accepted message size @@ -58,10 +57,8 @@ type Info struct { // Info returns diagnostic information about the waku node. func (api *PublicWakuAPI) Info(ctx context.Context) Info { - stats := api.w.Stats() return Info{ - Memory: stats.memoryUsed, - Messages: len(api.w.messageQueue) + len(api.w.p2pMsgQueue), + Messages: len(api.w.msgQueue) + len(api.w.p2pMsgQueue), MinPow: api.w.MinPow(), MaxMessageSize: api.w.MaxMessageSize(), } @@ -75,7 +72,7 @@ func (api *PublicWakuAPI) SetMaxMessageSize(ctx context.Context, size uint32) (b // SetMinPoW sets the minimum PoW, and notifies the peers. func (api *PublicWakuAPI) SetMinPoW(ctx context.Context, pow float64) (bool, error) { - return true, api.w.SetMinimumPoW(pow) + return true, api.w.SetMinimumPoW(pow, true) } // SetBloomFilter sets the new value of bloom filter, and notifies the peers. @@ -267,7 +264,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt } var result []byte - env, err := msg.Wrap(params, api.w.GetCurrentTime()) + env, err := msg.Wrap(params, api.w.CurrentTime()) if err != nil { return nil, err } @@ -278,7 +275,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt if err != nil { return nil, fmt.Errorf("failed to parse target peer: %s", err) } - err = api.w.SendP2PMessage(n.ID().Bytes(), env) + err = api.w.SendP2PMessages(n.ID().Bytes(), env) if err == nil { hash := env.Hash() result = hash[:] @@ -347,7 +344,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Sub filter := Filter{ PoW: crit.MinPow, - Messages: api.w.NewMessageStore(), + Messages: NewMemoryMessageStore(), AllowP2P: crit.AllowP2P, } @@ -411,10 +408,7 @@ func (api *PublicWakuAPI) Messages(ctx context.Context, crit Criteria) (*rpc.Sub } } case <-rpcSub.Err(): - api.w.Unsubscribe(id) - return - case <-notifier.Closed(): - api.w.Unsubscribe(id) + _ = api.w.Unsubscribe(id) return } } @@ -572,7 +566,7 @@ func (api *PublicWakuAPI) NewMessageFilter(req Criteria) (string, error) { PoW: req.MinPow, AllowP2P: req.AllowP2P, Topics: topics, - Messages: api.w.NewMessageStore(), + Messages: NewMemoryMessageStore(), } id, err := api.w.Subscribe(f) diff --git a/waku/api_test.go b/waku/api_test.go index cb03c1f780d..90dd27e1a0a 100644 --- a/waku/api_test.go +++ b/waku/api_test.go @@ -7,7 +7,7 @@ import ( ) func TestMultipleTopicCopyInNewMessageFilter(t *testing.T) { - w := New(nil) + w := New(nil, nil) keyID, err := w.GenerateSymKey() if err != nil { diff --git a/waku/config.go b/waku/config.go index 46a48b270dc..e51c1b2015f 100644 --- a/waku/config.go +++ b/waku/config.go @@ -2,14 +2,16 @@ package waku // Config represents the configuration state of a waku node. type Config struct { - MaxMessageSize uint32 `toml:",omitempty"` - MinimumAcceptedPOW float64 `toml:",omitempty"` - RestrictConnectionBetweenLightClients bool `toml:",omitempty"` - EnableConfirmations bool `toml:",omitempty"` + MaxMessageSize uint32 `toml:",omitempty"` + MinimumAcceptedPoW float64 `toml:",omitempty"` + LightClient bool `toml:",omitempty"` // when true, it does not forward messages + FullNode bool `toml:",omitempty"` // when true, it forwards all messages + RestrictLightClientsConn bool `toml:",omitempty"` // when true, do not accept light client as peers if it is a light client itself + EnableConfirmations bool `toml:",omitempty"` // when true, sends message confirmations } var DefaultConfig = Config{ - MaxMessageSize: DefaultMaxMessageSize, - MinimumAcceptedPOW: DefaultMinimumPoW, - RestrictConnectionBetweenLightClients: true, + MaxMessageSize: DefaultMaxMessageSize, + MinimumAcceptedPoW: DefaultMinimumPoW, + RestrictLightClientsConn: true, } diff --git a/waku/doc.go b/waku/doc.go index 3a4347a83bd..4ce51258ed5 100644 --- a/waku/doc.go +++ b/waku/doc.go @@ -173,3 +173,10 @@ func ErrorToEnvelopeError(hash common.Hash, err error) EnvelopeError { Description: err.Error(), } } + +// MailServerResponse is the response payload sent by the mailserver. +type MailServerResponse struct { + LastEnvelopeHash common.Hash + Cursor []byte + Error error +} diff --git a/waku/events.go b/waku/events.go index 32263a6ef41..8bf1585c125 100644 --- a/waku/events.go +++ b/waku/events.go @@ -44,10 +44,3 @@ type EnvelopeEvent struct { Peer enode.ID Data interface{} } - -// SyncEventResponse is a response from the Mail Server -// form which the peer received envelopes. -type SyncEventResponse struct { - Cursor []byte - Error string -} diff --git a/waku/filter_test.go b/waku/filter_test.go index bb63e5cd969..e1f45916c2d 100644 --- a/waku/filter_test.go +++ b/waku/filter_test.go @@ -81,7 +81,7 @@ func TestInstallFilters(t *testing.T) { InitSingleTest() const SizeTestFilters = 256 - w := New(&Config{}) + w := New(&Config{}, nil) filters := NewFilters(w) tst := generateTestCases(t, SizeTestFilters) @@ -119,7 +119,7 @@ func TestInstallFilters(t *testing.T) { func TestInstallSymKeyGeneratesHash(t *testing.T) { InitSingleTest() - w := New(&Config{}) + w := New(&Config{}, nil) filters := NewFilters(w) filter, _ := generateFilter(t, true) @@ -146,7 +146,7 @@ func TestInstallSymKeyGeneratesHash(t *testing.T) { func TestInstallIdenticalFilters(t *testing.T) { InitSingleTest() - w := New(&Config{}) + w := New(&Config{}, nil) filters := NewFilters(w) filter1, _ := generateFilter(t, true) @@ -216,7 +216,7 @@ func TestInstallIdenticalFilters(t *testing.T) { func TestInstallFilterWithSymAndAsymKeys(t *testing.T) { InitSingleTest() - w := New(&Config{}) + w := New(&Config{}, nil) filters := NewFilters(w) filter1, _ := generateFilter(t, true) @@ -631,7 +631,7 @@ func TestWatchers(t *testing.T) { var x, firstID string var err error - w := New(&Config{}) + w := New(&Config{}, nil) filters := NewFilters(w) tst := generateTestCases(t, NumFilters) for i = 0; i < NumFilters; i++ { diff --git a/waku/go.mod b/waku/go.mod index 5a0d66e61b5..dee45c4c46a 100644 --- a/waku/go.mod +++ b/waku/go.mod @@ -18,6 +18,7 @@ require ( github.com/stretchr/testify v1.4.0 github.com/syndtr/goleveldb v1.0.0 // indirect github.com/tsenart/tb v0.0.0-20181025101425-0d2499c8b6e9 + go.uber.org/zap v1.13.0 golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e ) diff --git a/waku/go.sum b/waku/go.sum index 4d3bbfd4b4a..e1e50ef6bd6 100644 --- a/waku/go.sum +++ b/waku/go.sum @@ -84,6 +84,7 @@ github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5a github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= @@ -107,12 +108,16 @@ github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/u github.com/julienschmidt/httprouter v0.0.0-20170430222011-975b5c4c7c21/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/karalabe/hid v0.0.0-20181128192157-d815e0c1a2e2/go.mod h1:YvbcH+3Wo6XPs9nkgTY3u19KXLauXW+J5nB7hEHuX0A= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/reedsolomon v1.9.2/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/maruel/panicparse v0.0.0-20160720141634-ad661195ed0e/go.mod h1:nty42YY5QByNC5MM7q/nj938VbgPU7avs45z6NClpxI= github.com/mattn/go-colorable v0.1.0/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= @@ -172,6 +177,7 @@ github.com/prometheus/prometheus v0.0.0-20170814170113-3101606756c5/go.mod h1:oA github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rjeczalik/notify v0.9.1/go.mod h1:rKwnCoCGeuQnwBtTSPL9Dad03Vh2n40ePRrjvIXnJho= github.com/robertkrimen/otto v0.0.0-20170205013659-6a77b7cbc37d/go.mod h1:xvqspoSXJTIpemEonrMDFq6XzwHYYgToXWj5eRX1OtY= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rs/cors v0.0.0-20160617231935-a62a804a8a00/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= @@ -207,14 +213,24 @@ github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhe github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xtaci/kcp-go v5.4.5+incompatible/go.mod h1:bN6vIwHQbfHaHtFpEssmWsN45a+AZwO7eyRCmEIbtvE= github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE= +go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY= +go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.3.0 h1:sFPn2GLc3poCkfrpIXGhBD2X0CMIo4Q/zSULXrj/+uc= +go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.13.0 h1:nR6NoDBgAf67s68NhaXbsojM+2gxp3S1hWkHDl27pVU= +go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190404164418-38d8ce5564a5/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191029031824-8986dd9e96cf/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c h1:/nJuwDLoL/zrqY6gf57vxC+Pi+pZ8bfhpPkicO5H7W4= golang.org/x/crypto v0.0.0-20191122220453-ac88ee75c92c/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181011144130-49bb7cea24b1/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -250,7 +266,10 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= golang.org/x/tools v0.0.0-20190912185636-87d9f09c5d89/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= @@ -261,6 +280,8 @@ gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPA gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405 h1:829vOVxxusYHC+IqBtkX5mbKtsY9fheQiQn0MZRVLfQ= gopkg.in/check.v1 v1.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo= @@ -280,3 +301,4 @@ gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/waku/mailserver_response.go b/waku/mailserver_response.go index 8a3012a9819..7ebd415c377 100644 --- a/waku/mailserver_response.go +++ b/waku/mailserver_response.go @@ -43,15 +43,15 @@ func CreateMailServerRequestFailedPayload(requestID common.Hash, err error) []by // * request failed // If the payload is unknown/unparseable, it returns `nil` func CreateMailServerEvent(nodeID enode.ID, payload []byte) (*EnvelopeEvent, error) { - if len(payload) < common.HashLength { return nil, invalidResponseSizeError(len(payload)) } event, err := tryCreateMailServerRequestFailedEvent(nodeID, payload) - - if err != nil || event != nil { - return event, err + if err != nil { + return nil, err + } else if event != nil { + return event, nil } return tryCreateMailServerRequestCompletedEvent(nodeID, payload) diff --git a/waku/peer.go b/waku/peer.go index 453a5e033c2..123117ab5c2 100644 --- a/waku/peer.go +++ b/waku/peer.go @@ -72,7 +72,8 @@ func (peer *Peer) handshake() error { pow := peer.host.MinPow() powConverted := math.Float64bits(pow) bloom := peer.host.BloomFilter() - confirmationsEnabled := !peer.host.disableConfirmations + // TODO + confirmationsEnabled := false errc <- p2p.SendItems(peer.ws, statusCode, ProtocolVersion, powConverted, bloom, isLightNode, confirmationsEnabled) }() diff --git a/waku/peer_test.go b/waku/peer_test.go index 6a007b82e87..cc1ebfe95ce 100644 --- a/waku/peer_test.go +++ b/waku/peer_test.go @@ -125,12 +125,12 @@ func TestSimulation(t *testing.T) { func resetParams(t *testing.T) { // change pow only for node zero masterPow = 7777777.0 - nodes[0].shh.SetMinimumPoW(masterPow) + _ = nodes[0].shh.SetMinimumPoW(masterPow, true) // change bloom for all nodes masterBloomFilter = TopicToBloom(sharedTopic) for i := 0; i < NumNodes; i++ { - nodes[i].shh.SetBloomFilter(masterBloomFilter) + _ = nodes[i].shh.SetBloomFilter(masterBloomFilter) } round++ @@ -163,13 +163,13 @@ func initialize(t *testing.T) { var node TestNode b := make([]byte, BloomFilterSize) copy(b, masterBloomFilter) - node.shh = New(&DefaultConfig) - node.shh.SetMinimumPoW(masterPow) - node.shh.SetBloomFilter(b) + node.shh = New(nil, nil) + _ = node.shh.SetMinimumPoW(masterPow, false) + _ = node.shh.SetBloomFilter(b) if !bytes.Equal(node.shh.BloomFilter(), masterBloomFilter) { t.Fatalf("bloom mismatch on init.") } - node.shh.Start(nil) + _ = node.shh.Start(nil) topics := make([]TopicType, 0) topics = append(topics, sharedTopic) f := Filter{KeySym: sharedKey, Messages: NewMemoryMessageStore()} @@ -225,8 +225,8 @@ func stopServers() { for i := 0; i < NumNodes; i++ { n := nodes[i] if n != nil { - n.shh.Unsubscribe(n.filerID) - n.shh.Stop() + _ = n.shh.Unsubscribe(n.filerID) + _ = n.shh.Stop() n.server.Stop() } } @@ -513,7 +513,7 @@ func TestHandshakeWithOldVersionWithoutLightModeFlag(t *testing.T) { //two light nodes handshake. restriction disabled func TestTwoLightPeerHandshakeRestrictionOff(t *testing.T) { w1 := Waku{} - w1.settings.Store(restrictConnectionBetweenLightClientsIdx, false) + w1.settings.RestrictLightClientsConn = false w1.SetLightClientMode(true) p1 := newPeer(&w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), &rwStub{[]interface{}{ProtocolVersion, uint64(123), make([]byte, BloomFilterSize), true}}) err := p1.handshake() @@ -525,7 +525,7 @@ func TestTwoLightPeerHandshakeRestrictionOff(t *testing.T) { //two light nodes handshake. restriction enabled func TestTwoLightPeerHandshakeError(t *testing.T) { w1 := Waku{} - w1.settings.Store(restrictConnectionBetweenLightClientsIdx, true) + w1.settings.RestrictLightClientsConn = true w1.SetLightClientMode(true) p1 := newPeer(&w1, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), &rwStub{[]interface{}{ProtocolVersion, uint64(123), make([]byte, BloomFilterSize), true}}) err := p1.handshake() diff --git a/waku/waku.go b/waku/waku.go index 1fbe2ba5bcf..e883fdbb245 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -13,6 +13,10 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/common/hexutil" + + "go.uber.org/zap" + mapset "github.com/deckarep/golang-set" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" @@ -23,38 +27,21 @@ import ( "github.com/ethereum/go-ethereum/rlp" "github.com/ethereum/go-ethereum/rpc" "golang.org/x/crypto/pbkdf2" - "golang.org/x/sync/syncmap" ) // TimeSyncError error for clock skew errors. type TimeSyncError error -// Statistics holds several message-related counter for analytics -// purposes. -type Statistics struct { - messagesCleared int - memoryCleared int - memoryUsed int - cycles int - totalMessagesCleared int -} - -const ( - maxMsgSizeIdx = iota // Maximal message length allowed by the waku node - overflowIdx // Indicator of message queue overflow - minPowIdx // Minimal PoW required by the waku node - minPowToleranceIdx // Minimal PoW tolerated by the waku node for a limited time - bloomFilterIdx // Bloom filter for topics of interest for this node - bloomFilterToleranceIdx // Bloom filter tolerated by the waku node for a limited time - lightClientModeIdx // Light client mode. (does not forward any messages) - restrictConnectionBetweenLightClientsIdx // Restrict connection between two light clients -) - -// MailServerResponse is the response payload sent by the mailserver -type MailServerResponse struct { - LastEnvelopeHash common.Hash - Cursor []byte - Error error +type settings struct { + MaxMsgSize uint32 // Maximal message length allowed by the waku node + EnableConfirmations bool // Enable sending message confirmations + MinPow float64 // Minimal PoW required by the waku node + MinPowTolerance float64 // Minimal PoW tolerated by the waku node for a limited time + BloomFilter []byte // Bloom filter for topics of interest for this node + BloomFilterTolerance []byte // Bloom filter tolerated by the waku node for a limited time + LightClient bool // Light client mode enabled does not forward messages + RestrictLightClientsConn bool // Restrict connection between two light clients + SyncAllowance int // Maximum time in seconds allowed to process the waku-related messages } // Waku represents a dark communication interface through the Ethereum @@ -65,67 +52,75 @@ type Waku struct { privateKeys map[string]*ecdsa.PrivateKey // Private key storage symKeys map[string][]byte // Symmetric key storage - keyMu sync.RWMutex // Mutex associated with key storages + keyMu sync.RWMutex // Mutex associated with key stores - poolMu sync.RWMutex // Mutex to sync the message and expiration pools envelopes map[common.Hash]*Envelope // Pool of envelopes currently tracked by this node expirations map[uint32]mapset.Set // Message expiration pool + poolMu sync.RWMutex // Mutex to sync the message and expiration pools - peerMu sync.RWMutex // Mutex to sync the active peer set peers map[*Peer]struct{} // Set of currently active peers + peerMu sync.RWMutex // Mutex to sync the active peer set - messageQueue chan *Envelope // Message queue for normal waku messages - p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations. - quit chan struct{} // Channel used for graceful exit - - settings syncmap.Map // holds configuration settings that can be dynamically changed - - disableConfirmations bool // do not reply with confirmations - - syncAllowance int // maximum time in seconds allowed to process the waku-related messages + msgQueue chan *Envelope // Message queue for normal waku messages + p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations. + quit chan struct{} // Channel used for graceful exit - statsMu sync.Mutex // guard stats - stats Statistics // Statistics of waku node + settings settings // Holds configuration settings that can be dynamically changed + settingsMu sync.RWMutex // Mutex to sync the settings access mailServer MailServer // MailServer interface rateLimiter *PeerRateLimiter - messageStoreFabric func() MessageStore - envelopeFeed event.Feed timeSource func() time.Time // source of time for waku + + logger *zap.Logger } // New creates a Waku client ready to communicate through the Ethereum P2P network. -func New(cfg *Config) *Waku { +func New(cfg *Config, logger *zap.Logger) *Waku { if cfg == nil { - cfg = &DefaultConfig + c := DefaultConfig + cfg = &c + } + + if logger == nil { + logger = zap.NewNop() } waku := &Waku{ - privateKeys: make(map[string]*ecdsa.PrivateKey), - symKeys: make(map[string][]byte), - envelopes: make(map[common.Hash]*Envelope), - expirations: make(map[uint32]mapset.Set), - peers: make(map[*Peer]struct{}), - messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan interface{}, messageQueueLimit), - quit: make(chan struct{}), - syncAllowance: DefaultSyncAllowance, - timeSource: time.Now, - disableConfirmations: !cfg.EnableConfirmations, + privateKeys: make(map[string]*ecdsa.PrivateKey), + symKeys: make(map[string][]byte), + envelopes: make(map[common.Hash]*Envelope), + expirations: make(map[uint32]mapset.Set), + peers: make(map[*Peer]struct{}), + msgQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan interface{}, messageQueueLimit), + quit: make(chan struct{}), + timeSource: time.Now, + logger: logger, } - waku.filters = NewFilters(waku) + waku.settings = settings{ + MaxMsgSize: cfg.MaxMessageSize, + MinPow: cfg.MinimumAcceptedPoW, + MinPowTolerance: cfg.MinimumAcceptedPoW, + EnableConfirmations: cfg.EnableConfirmations, + LightClient: cfg.LightClient, + RestrictLightClientsConn: cfg.RestrictLightClientsConn, + SyncAllowance: DefaultSyncAllowance, + } + + if cfg.FullNode { + waku.settings.BloomFilter = MakeFullNodeBloom() + waku.settings.BloomFilterTolerance = MakeFullNodeBloom() + } - waku.settings.Store(minPowIdx, cfg.MinimumAcceptedPOW) - waku.settings.Store(maxMsgSizeIdx, cfg.MaxMessageSize) - waku.settings.Store(overflowIdx, false) - waku.settings.Store(restrictConnectionBetweenLightClientsIdx, cfg.RestrictConnectionBetweenLightClients) + waku.filters = NewFilters(waku) - // p2p waku sub protocol handler + // p2p waku sub-protocol handler waku.protocol = p2p.Protocol{ Name: ProtocolName, Version: uint(ProtocolVersion), @@ -143,135 +138,74 @@ func New(cfg *Config) *Waku { return waku } -// NewMessageStore returns object that implements MessageStore. -func (waku *Waku) NewMessageStore() MessageStore { - if waku.messageStoreFabric != nil { - return waku.messageStoreFabric() - } - return NewMemoryMessageStore() +// Version returns the waku sub-protocol version number. +func (w *Waku) Version() uint { + return w.protocol.Version } -// SetMessageStore allows to inject custom implementation of the message store. -func (waku *Waku) SetMessageStore(fabric func() MessageStore) { - waku.messageStoreFabric = fabric +// MinPow returns the PoW value required by this node. +func (w *Waku) MinPow() float64 { + w.settingsMu.RLock() + defer w.settingsMu.RUnlock() + return w.settings.MinPow } -// SetTimeSource assigns a particular source of time to a waku object. -func (waku *Waku) SetTimeSource(timesource func() time.Time) { - waku.timeSource = timesource -} +// SetMinimumPoW sets the minimal PoW required by this node +func (w *Waku) SetMinimumPoW(val float64, tolerate bool) error { + if val < 0.0 { + return fmt.Errorf("invalid PoW: %f", val) + } -// SubscribeEnvelopeEvents subscribes to envelopes feed. -// In order to prevent blocking waku producers events must be amply buffered. -func (waku *Waku) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription { - return waku.envelopeFeed.Subscribe(events) -} + w.settingsMu.Lock() + w.settings.MinPow = val + w.settingsMu.Unlock() -// MinPow returns the PoW value required by this node. -func (waku *Waku) MinPow() float64 { - val, exist := waku.settings.Load(minPowIdx) - if !exist || val == nil { - return DefaultMinimumPoW - } - v, ok := val.(float64) - if !ok { - log.Error("Error loading minPowIdx, using default") - return DefaultMinimumPoW + w.notifyPeersAboutPowRequirementChange(val) + + if tolerate { + go func() { + // allow some time before all the peers have processed the notification + time.Sleep(time.Duration(w.settings.SyncAllowance) * time.Second) + w.settingsMu.Lock() + w.settings.MinPowTolerance = val + w.settingsMu.Unlock() + }() } - return v + + return nil } // MinPowTolerance returns the value of minimum PoW which is tolerated for a limited // time after PoW was changed. If sufficient time have elapsed or no change of PoW // have ever occurred, the return value will be the same as return value of MinPow(). -func (waku *Waku) MinPowTolerance() float64 { - val, exist := waku.settings.Load(minPowToleranceIdx) - if !exist || val == nil { - return DefaultMinimumPoW - } - return val.(float64) +func (w *Waku) MinPowTolerance() float64 { + w.settingsMu.RLock() + defer w.settingsMu.RUnlock() + return w.settings.MinPowTolerance } // BloomFilter returns the aggregated bloom filter for all the topics of interest. // The nodes are required to send only messages that match the advertised bloom filter. // If a message does not match the bloom, it will tantamount to spam, and the peer will // be disconnected. -func (waku *Waku) BloomFilter() []byte { - val, exist := waku.settings.Load(bloomFilterIdx) - if !exist || val == nil { - return nil - } - return val.([]byte) +func (w *Waku) BloomFilter() []byte { + w.settingsMu.RLock() + defer w.settingsMu.RUnlock() + return w.settings.BloomFilter } // BloomFilterTolerance returns the bloom filter which is tolerated for a limited // time after new bloom was advertised to the peers. If sufficient time have elapsed // or no change of bloom filter have ever occurred, the return value will be the same // as return value of BloomFilter(). -func (waku *Waku) BloomFilterTolerance() []byte { - val, exist := waku.settings.Load(bloomFilterToleranceIdx) - if !exist || val == nil { - return nil - } - return val.([]byte) -} - -// MaxMessageSize returns the maximum accepted message size. -func (waku *Waku) MaxMessageSize() uint32 { - val, _ := waku.settings.Load(maxMsgSizeIdx) - return val.(uint32) -} - -// Overflow returns an indication if the message queue is full. -func (waku *Waku) Overflow() bool { - val, _ := waku.settings.Load(overflowIdx) - return val.(bool) -} - -// APIs returns the RPC descriptors the Waku implementation offers -func (waku *Waku) APIs() []rpc.API { - return []rpc.API{ - { - Namespace: ProtocolName, - Version: ProtocolVersionStr, - Service: NewPublicWakuAPI(waku), - Public: true, - }, - } -} - -// GetCurrentTime returns current time. -func (waku *Waku) GetCurrentTime() time.Time { - return waku.timeSource() -} - -// RegisterServer registers MailServer interface. -// MailServer will process all the incoming messages with p2pRequestCode. -func (waku *Waku) RegisterServer(server MailServer) { - waku.mailServer = server -} - -// Protocols returns the waku sub-protocols ran by this particular client. -func (waku *Waku) Protocols() []p2p.Protocol { - return []p2p.Protocol{waku.protocol} -} - -// Version returns the waku sub-protocols version number. -func (waku *Waku) Version() uint { - return waku.protocol.Version -} - -// SetMaxMessageSize sets the maximal message size allowed by this node -func (waku *Waku) SetMaxMessageSize(size uint32) error { - if size > MaxMessageSize { - return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize) - } - waku.settings.Store(maxMsgSizeIdx, size) - return nil +func (w *Waku) BloomFilterTolerance() []byte { + w.settingsMu.RLock() + defer w.settingsMu.RUnlock() + return w.settings.BloomFilterTolerance } // SetBloomFilter sets the new bloom filter -func (waku *Waku) SetBloomFilter(bloom []byte) error { +func (w *Waku) SetBloomFilter(bloom []byte) error { if len(bloom) != BloomFilterSize { return fmt.Errorf("invalid bloom filter size: %d", len(bloom)) } @@ -279,74 +213,107 @@ func (waku *Waku) SetBloomFilter(bloom []byte) error { b := make([]byte, BloomFilterSize) copy(b, bloom) - waku.settings.Store(bloomFilterIdx, b) - waku.notifyPeersAboutBloomFilterChange(b) + w.settingsMu.Lock() + w.settings.BloomFilter = b + w.settingsMu.Unlock() + w.notifyPeersAboutBloomFilterChange(b) go func() { // allow some time before all the peers have processed the notification - time.Sleep(time.Duration(waku.syncAllowance) * time.Second) - waku.settings.Store(bloomFilterToleranceIdx, b) + time.Sleep(time.Duration(w.settings.SyncAllowance) * time.Second) + w.settingsMu.Lock() + w.settings.BloomFilterTolerance = b + w.settingsMu.Unlock() }() return nil } -// SetMinimumPoW sets the minimal PoW required by this node -func (waku *Waku) SetMinimumPoW(val float64) error { - if val < 0.0 { - return fmt.Errorf("invalid PoW: %f", val) - } +// MaxMessageSize returns the maximum accepted message size. +func (w *Waku) MaxMessageSize() uint32 { + w.settingsMu.RLock() + defer w.settingsMu.RUnlock() + return w.settings.MaxMsgSize +} - waku.settings.Store(minPowIdx, val) - waku.notifyPeersAboutPowRequirementChange(val) +// SetMaxMessageSize sets the maximal message size allowed by this node +func (w *Waku) SetMaxMessageSize(size uint32) error { + if size > MaxMessageSize { + return fmt.Errorf("message size too large [%d>%d]", size, MaxMessageSize) + } + w.settingsMu.Lock() + w.settings.MaxMsgSize = size + w.settingsMu.Unlock() + return nil +} - go func() { - // allow some time before all the peers have processed the notification - time.Sleep(time.Duration(waku.syncAllowance) * time.Second) - waku.settings.Store(minPowToleranceIdx, val) - }() +// LightClientMode indicates is this node is light client (does not forward any messages) +func (w *Waku) LightClientMode() bool { + w.settingsMu.RLock() + defer w.settingsMu.RUnlock() + return w.settings.LightClient +} - return nil +// SetLightClientMode makes node light client (does not forward any messages) +func (w *Waku) SetLightClientMode(v bool) { + w.settingsMu.Lock() + w.settings.LightClient = v + w.settingsMu.Unlock() } -// SetMinimumPowTest sets the minimal PoW in test environment -func (waku *Waku) SetMinimumPowTest(val float64) { - waku.settings.Store(minPowIdx, val) - waku.notifyPeersAboutPowRequirementChange(val) - waku.settings.Store(minPowToleranceIdx, val) +// LightClientModeConnectionRestricted indicates that connection to light client in light client mode not allowed +func (w *Waku) LightClientModeConnectionRestricted() bool { + w.settingsMu.RLock() + defer w.settingsMu.RUnlock() + return w.settings.RestrictLightClientsConn } -//SetLightClientMode makes node light client (does not forward any messages) -func (waku *Waku) SetLightClientMode(v bool) { - waku.settings.Store(lightClientModeIdx, v) +// CurrentTime returns current time. +func (w *Waku) CurrentTime() time.Time { + return w.timeSource() } -func (waku *Waku) SetRateLimiter(r *PeerRateLimiter) { - waku.rateLimiter = r +// SetTimeSource assigns a particular source of time to a waku object. +func (w *Waku) SetTimeSource(timesource func() time.Time) { + w.timeSource = timesource } -//LightClientMode indicates is this node is light client (does not forward any messages) -func (waku *Waku) LightClientMode() bool { - val, exist := waku.settings.Load(lightClientModeIdx) - if !exist || val == nil { - return false +// APIs returns the RPC descriptors the Waku implementation offers +func (w *Waku) APIs() []rpc.API { + return []rpc.API{ + { + Namespace: ProtocolName, + Version: ProtocolVersionStr, + Service: NewPublicWakuAPI(w), + Public: true, + }, } - v, ok := val.(bool) - return v && ok } -//LightClientModeConnectionRestricted indicates that connection to light client in light client mode not allowed -func (waku *Waku) LightClientModeConnectionRestricted() bool { - val, exist := waku.settings.Load(restrictConnectionBetweenLightClientsIdx) - if !exist || val == nil { - return false - } - v, ok := val.(bool) - return v && ok +// Protocols returns the waku sub-protocols ran by this particular client. +func (w *Waku) Protocols() []p2p.Protocol { + return []p2p.Protocol{w.protocol} +} + +// RegisterMailServer registers MailServer interface. +// MailServer will process all the incoming messages with p2pRequestCode. +func (w *Waku) RegisterMailServer(server MailServer) { + w.mailServer = server +} + +// SetRateLimiter registers a rate limiter. +func (w *Waku) RegisterRateLimiter(r *PeerRateLimiter) { + w.rateLimiter = r +} + +// SubscribeEnvelopeEvents subscribes to envelopes feed. +// In order to prevent blocking waku producers events must be amply buffered. +func (w *Waku) SubscribeEnvelopeEvents(events chan<- EnvelopeEvent) event.Subscription { + return w.envelopeFeed.Subscribe(events) } -func (waku *Waku) notifyPeersAboutPowRequirementChange(pow float64) { - arr := waku.getPeers() +func (w *Waku) notifyPeersAboutPowRequirementChange(pow float64) { + arr := w.getPeers() for _, p := range arr { err := p.notifyAboutPowRequirementChange(pow) if err != nil { @@ -354,13 +321,13 @@ func (waku *Waku) notifyPeersAboutPowRequirementChange(pow float64) { err = p.notifyAboutPowRequirementChange(pow) } if err != nil { - log.Warn("failed to notify peer about new pow requirement", "peer", p.ID(), "error", err) + w.logger.Warn("failed to notify peer about new pow requirement", zap.Binary("peer", p.ID()), zap.Error(err)) } } } -func (waku *Waku) notifyPeersAboutBloomFilterChange(bloom []byte) { - arr := waku.getPeers() +func (w *Waku) notifyPeersAboutBloomFilterChange(bloom []byte) { + arr := w.getPeers() for _, p := range arr { err := p.notifyAboutBloomFilterChange(bloom) if err != nil { @@ -368,40 +335,40 @@ func (waku *Waku) notifyPeersAboutBloomFilterChange(bloom []byte) { err = p.notifyAboutBloomFilterChange(bloom) } if err != nil { - log.Warn("failed to notify peer about new bloom filter", "peer", p.ID(), "error", err) + w.logger.Warn("failed to notify peer about new pow requirement", zap.Binary("peer", p.ID()), zap.Error(err)) } } } -func (waku *Waku) getPeers() []*Peer { - arr := make([]*Peer, len(waku.peers)) +func (w *Waku) getPeers() []*Peer { + arr := make([]*Peer, len(w.peers)) i := 0 - waku.peerMu.Lock() - for p := range waku.peers { + w.peerMu.Lock() + for p := range w.peers { arr[i] = p i++ } - waku.peerMu.Unlock() + w.peerMu.Unlock() return arr } // getPeer retrieves peer by ID -func (waku *Waku) getPeer(peerID []byte) (*Peer, error) { - waku.peerMu.Lock() - defer waku.peerMu.Unlock() - for p := range waku.peers { +func (w *Waku) getPeer(peerID []byte) (*Peer, error) { + w.peerMu.Lock() + defer w.peerMu.Unlock() + for p := range w.peers { id := p.peer.ID() if bytes.Equal(peerID, id[:]) { return p, nil } } - return nil, fmt.Errorf("Could not find peer with ID: %x", peerID) + return nil, fmt.Errorf("could not find peer with ID: %x", peerID) } // AllowP2PMessagesFromPeer marks specific peer trusted, // which will allow it to send historic (expired) messages. -func (waku *Waku) AllowP2PMessagesFromPeer(peerID []byte) error { - p, err := waku.getPeer(peerID) +func (w *Waku) AllowP2PMessagesFromPeer(peerID []byte) error { + p, err := w.getPeer(peerID) if err != nil { return err } @@ -414,34 +381,38 @@ func (waku *Waku) AllowP2PMessagesFromPeer(peerID []byte) error { // request and respond with a number of peer-to-peer messages (possibly expired), // which are not supposed to be forwarded any further. // The waku protocol is agnostic of the format and contents of envelope. -func (waku *Waku) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { - return waku.RequestHistoricMessagesWithTimeout(peerID, envelope, 0) +func (w *Waku) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { + return w.RequestHistoricMessagesWithTimeout(peerID, envelope, 0) } -func (waku *Waku) RequestHistoricMessagesWithTimeout(peerID []byte, envelope *Envelope, timeout time.Duration) error { - p, err := waku.getPeer(peerID) +// RequestHistoricMessagesWithTimeout acts as RequestHistoricMessages but requires to pass a timeout. +// It sends an event EventMailServerRequestExpired after the timeout. +func (w *Waku) RequestHistoricMessagesWithTimeout(peerID []byte, envelope *Envelope, timeout time.Duration) error { + p, err := w.getPeer(peerID) if err != nil { return err } - waku.envelopeFeed.Send(EnvelopeEvent{ + p.trusted = true + + w.envelopeFeed.Send(EnvelopeEvent{ Peer: p.peer.ID(), Topic: envelope.Topic, Hash: envelope.Hash(), Event: EventMailServerRequestSent, }) - p.trusted = true + err = p2p.Send(p.ws, p2pRequestCode, envelope) if timeout != 0 { - go waku.expireRequestHistoricMessages(p.peer.ID(), envelope.Hash(), timeout) + go w.expireRequestHistoricMessages(p.peer.ID(), envelope.Hash(), timeout) } return err } -func (waku *Waku) SendMessagesRequest(peerID []byte, request MessagesRequest) error { +func (w *Waku) SendMessagesRequest(peerID []byte, request MessagesRequest) error { if err := request.Validate(); err != nil { return err } - p, err := waku.getPeer(peerID) + p, err := w.getPeer(peerID) if err != nil { return err } @@ -449,7 +420,7 @@ func (waku *Waku) SendMessagesRequest(peerID []byte, request MessagesRequest) er if err := p2p.Send(p.ws, p2pRequestCode, request); err != nil { return err } - waku.envelopeFeed.Send(EnvelopeEvent{ + w.envelopeFeed.Send(EnvelopeEvent{ Peer: p.peer.ID(), Hash: common.BytesToHash(request.ID), Event: EventMailServerRequestSent, @@ -457,14 +428,14 @@ func (waku *Waku) SendMessagesRequest(peerID []byte, request MessagesRequest) er return nil } -func (waku *Waku) expireRequestHistoricMessages(peer enode.ID, hash common.Hash, timeout time.Duration) { +func (w *Waku) expireRequestHistoricMessages(peer enode.ID, hash common.Hash, timeout time.Duration) { timer := time.NewTimer(timeout) defer timer.Stop() select { - case <-waku.quit: + case <-w.quit: return case <-timer.C: - waku.envelopeFeed.Send(EnvelopeEvent{ + w.envelopeFeed.Send(EnvelopeEvent{ Peer: peer, Hash: hash, Event: EventMailServerRequestExpired, @@ -472,49 +443,39 @@ func (waku *Waku) expireRequestHistoricMessages(peer enode.ID, hash common.Hash, } } -func (waku *Waku) SendHistoricMessageResponse(peer *Peer, payload []byte) error { +func (w *Waku) SendHistoricMessageResponse(peer *Peer, payload []byte) error { size, r, err := rlp.EncodeToReader(payload) if err != nil { return err } - return peer.ws.WriteMsg(p2p.Msg{Code: p2pRequestCompleteCode, Size: uint32(size), Payload: r}) } // SendP2PMessage sends a peer-to-peer message to a specific peer. -func (waku *Waku) SendP2PMessage(peerID []byte, envelopes ...*Envelope) error { - p, err := waku.getPeer(peerID) +// It sends one or more envelopes in a single batch. +func (w *Waku) SendP2PMessages(peerID []byte, envelopes ...*Envelope) error { + p, err := w.getPeer(peerID) if err != nil { return err } - return waku.SendP2PDirect(p, envelopes...) + return p2p.Send(p.ws, p2pMessageCode, envelopes) } // SendP2PDirect sends a peer-to-peer message to a specific peer. -// If only a single envelope is given, data is sent as a single object -// rather than a slice. This is important to keep this method backward compatible -// as it used to send only single envelopes. -func (waku *Waku) SendP2PDirect(peer *Peer, envelopes ...*Envelope) error { - if len(envelopes) == 1 { - return p2p.Send(peer.ws, p2pMessageCode, envelopes[0]) - } +// It sends one or more envelopes in a single batch. +func (w *Waku) SendP2PDirect(peer *Peer, envelopes ...*Envelope) error { return p2p.Send(peer.ws, p2pMessageCode, envelopes) } // SendRawP2PDirect sends a peer-to-peer message to a specific peer. -// If only a single envelope is given, data is sent as a single object -// rather than a slice. This is important to keep this method backward compatible -// as it used to send only single envelopes. -func (waku *Waku) SendRawP2PDirect(peer *Peer, envelopes ...rlp.RawValue) error { - if len(envelopes) == 1 { - return p2p.Send(peer.ws, p2pMessageCode, envelopes[0]) - } +// It sends one or more envelopes in a single batch. +func (w *Waku) SendRawP2PDirect(peer *Peer, envelopes ...rlp.RawValue) error { return p2p.Send(peer.ws, p2pMessageCode, envelopes) } // NewKeyPair generates a new cryptographic identity for the client, and injects // it into the known identities for message decryption. Returns ID of the new key pair. -func (waku *Waku) NewKeyPair() (string, error) { +func (w *Waku) NewKeyPair() (string, error) { key, err := crypto.GenerateKey() if err != nil || !validatePrivateKey(key) { key, err = crypto.GenerateKey() // retry once @@ -526,107 +487,105 @@ func (waku *Waku) NewKeyPair() (string, error) { return "", fmt.Errorf("failed to generate valid key") } - id, err := toDeterministicID(common.ToHex(crypto.FromECDSAPub(&key.PublicKey)), keyIDSize) + id, err := toDeterministicID(hexutil.Encode(crypto.FromECDSAPub(&key.PublicKey)), keyIDSize) if err != nil { return "", err } - waku.keyMu.Lock() - defer waku.keyMu.Unlock() + w.keyMu.Lock() + defer w.keyMu.Unlock() - if waku.privateKeys[id] != nil { + if w.privateKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - waku.privateKeys[id] = key + w.privateKeys[id] = key return id, nil } // DeleteKeyPair deletes the specified key if it exists. -func (waku *Waku) DeleteKeyPair(key string) bool { +func (w *Waku) DeleteKeyPair(key string) bool { deterministicID, err := toDeterministicID(key, keyIDSize) if err != nil { return false } - waku.keyMu.Lock() - defer waku.keyMu.Unlock() + w.keyMu.Lock() + defer w.keyMu.Unlock() - if waku.privateKeys[deterministicID] != nil { - delete(waku.privateKeys, deterministicID) + if w.privateKeys[deterministicID] != nil { + delete(w.privateKeys, deterministicID) return true } return false } // AddKeyPair imports a asymmetric private key and returns it identifier. -func (waku *Waku) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { - id, err := makeDeterministicID(common.ToHex(crypto.FromECDSAPub(&key.PublicKey)), keyIDSize) +func (w *Waku) AddKeyPair(key *ecdsa.PrivateKey) (string, error) { + id, err := makeDeterministicID(hexutil.Encode(crypto.FromECDSAPub(&key.PublicKey)), keyIDSize) if err != nil { return "", err } - if waku.HasKeyPair(id) { + if w.HasKeyPair(id) { return id, nil // no need to re-inject } - waku.keyMu.Lock() - waku.privateKeys[id] = key - waku.keyMu.Unlock() - log.Info("Waku identity added", "id", id, "pubkey", common.ToHex(crypto.FromECDSAPub(&key.PublicKey))) + w.keyMu.Lock() + w.privateKeys[id] = key + w.keyMu.Unlock() return id, nil } // SelectKeyPair adds cryptographic identity, and makes sure // that it is the only private key known to the node. -func (waku *Waku) SelectKeyPair(key *ecdsa.PrivateKey) error { +func (w *Waku) SelectKeyPair(key *ecdsa.PrivateKey) error { id, err := makeDeterministicID(common.ToHex(crypto.FromECDSAPub(&key.PublicKey)), keyIDSize) if err != nil { return err } - waku.keyMu.Lock() - defer waku.keyMu.Unlock() + w.keyMu.Lock() + defer w.keyMu.Unlock() - waku.privateKeys = make(map[string]*ecdsa.PrivateKey) // reset key store - waku.privateKeys[id] = key + w.privateKeys = make(map[string]*ecdsa.PrivateKey) // reset key store + w.privateKeys[id] = key - log.Info("Waku identity selected", "id", id, "key", common.ToHex(crypto.FromECDSAPub(&key.PublicKey))) return nil } // DeleteKeyPairs removes all cryptographic identities known to the node -func (waku *Waku) DeleteKeyPairs() error { - waku.keyMu.Lock() - defer waku.keyMu.Unlock() +func (w *Waku) DeleteKeyPairs() error { + w.keyMu.Lock() + defer w.keyMu.Unlock() - waku.privateKeys = make(map[string]*ecdsa.PrivateKey) + w.privateKeys = make(map[string]*ecdsa.PrivateKey) return nil } // HasKeyPair checks if the waku node is configured with the private key // of the specified public pair. -func (waku *Waku) HasKeyPair(id string) bool { +func (w *Waku) HasKeyPair(id string) bool { deterministicID, err := toDeterministicID(id, keyIDSize) if err != nil { return false } - waku.keyMu.RLock() - defer waku.keyMu.RUnlock() - return waku.privateKeys[deterministicID] != nil + w.keyMu.RLock() + defer w.keyMu.RUnlock() + return w.privateKeys[deterministicID] != nil } // GetPrivateKey retrieves the private key of the specified identity. -func (waku *Waku) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { +func (w *Waku) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { deterministicID, err := toDeterministicID(id, keyIDSize) if err != nil { return nil, err } - waku.keyMu.RLock() - defer waku.keyMu.RUnlock() - key := waku.privateKeys[deterministicID] + w.keyMu.RLock() + defer w.keyMu.RUnlock() + key := w.privateKeys[deterministicID] if key == nil { return nil, fmt.Errorf("invalid id") } @@ -635,7 +594,7 @@ func (waku *Waku) GetPrivateKey(id string) (*ecdsa.PrivateKey, error) { // GenerateSymKey generates a random symmetric key and stores it under id, // which is then returned. Will be used in the future for session key exchange. -func (waku *Waku) GenerateSymKey() (string, error) { +func (w *Waku) GenerateSymKey() (string, error) { key, err := generateSecureRandomData(aesKeyLength) if err != nil { return "", err @@ -648,35 +607,35 @@ func (waku *Waku) GenerateSymKey() (string, error) { return "", fmt.Errorf("failed to generate ID: %s", err) } - waku.keyMu.Lock() - defer waku.keyMu.Unlock() + w.keyMu.Lock() + defer w.keyMu.Unlock() - if waku.symKeys[id] != nil { + if w.symKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - waku.symKeys[id] = key + w.symKeys[id] = key return id, nil } // AddSymKey stores the key with a given id. -func (waku *Waku) AddSymKey(id string, key []byte) (string, error) { +func (w *Waku) AddSymKey(id string, key []byte) (string, error) { deterministicID, err := toDeterministicID(id, keyIDSize) if err != nil { return "", err } - waku.keyMu.Lock() - defer waku.keyMu.Unlock() + w.keyMu.Lock() + defer w.keyMu.Unlock() - if waku.symKeys[deterministicID] != nil { + if w.symKeys[deterministicID] != nil { return "", fmt.Errorf("key already exists: %v", id) } - waku.symKeys[deterministicID] = key + w.symKeys[deterministicID] = key return deterministicID, nil } // AddSymKeyDirect stores the key, and returns its id. -func (waku *Waku) AddSymKeyDirect(key []byte) (string, error) { +func (w *Waku) AddSymKeyDirect(key []byte) (string, error) { if len(key) != aesKeyLength { return "", fmt.Errorf("wrong key size: %d", len(key)) } @@ -686,23 +645,23 @@ func (waku *Waku) AddSymKeyDirect(key []byte) (string, error) { return "", fmt.Errorf("failed to generate ID: %s", err) } - waku.keyMu.Lock() - defer waku.keyMu.Unlock() + w.keyMu.Lock() + defer w.keyMu.Unlock() - if waku.symKeys[id] != nil { + if w.symKeys[id] != nil { return "", fmt.Errorf("failed to generate unique ID") } - waku.symKeys[id] = key + w.symKeys[id] = key return id, nil } // AddSymKeyFromPassword generates the key from password, stores it, and returns its id. -func (waku *Waku) AddSymKeyFromPassword(password string) (string, error) { +func (w *Waku) AddSymKeyFromPassword(password string) (string, error) { id, err := GenerateRandomID() if err != nil { return "", fmt.Errorf("failed to generate ID: %s", err) } - if waku.HasSymKey(id) { + if w.HasSymKey(id) { return "", fmt.Errorf("failed to generate unique ID") } @@ -713,59 +672,59 @@ func (waku *Waku) AddSymKeyFromPassword(password string) (string, error) { return "", err } - waku.keyMu.Lock() - defer waku.keyMu.Unlock() + w.keyMu.Lock() + defer w.keyMu.Unlock() // double check is necessary, because deriveKeyMaterial() is very slow - if waku.symKeys[id] != nil { + if w.symKeys[id] != nil { return "", fmt.Errorf("critical error: failed to generate unique ID") } - waku.symKeys[id] = derived + w.symKeys[id] = derived return id, nil } // HasSymKey returns true if there is a key associated with the given id. // Otherwise returns false. -func (waku *Waku) HasSymKey(id string) bool { - waku.keyMu.RLock() - defer waku.keyMu.RUnlock() - return waku.symKeys[id] != nil +func (w *Waku) HasSymKey(id string) bool { + w.keyMu.RLock() + defer w.keyMu.RUnlock() + return w.symKeys[id] != nil } // DeleteSymKey deletes the key associated with the name string if it exists. -func (waku *Waku) DeleteSymKey(id string) bool { - waku.keyMu.Lock() - defer waku.keyMu.Unlock() - if waku.symKeys[id] != nil { - delete(waku.symKeys, id) +func (w *Waku) DeleteSymKey(id string) bool { + w.keyMu.Lock() + defer w.keyMu.Unlock() + if w.symKeys[id] != nil { + delete(w.symKeys, id) return true } return false } // GetSymKey returns the symmetric key associated with the given id. -func (waku *Waku) GetSymKey(id string) ([]byte, error) { - waku.keyMu.RLock() - defer waku.keyMu.RUnlock() - if waku.symKeys[id] != nil { - return waku.symKeys[id], nil +func (w *Waku) GetSymKey(id string) ([]byte, error) { + w.keyMu.RLock() + defer w.keyMu.RUnlock() + if w.symKeys[id] != nil { + return w.symKeys[id], nil } return nil, fmt.Errorf("non-existent key ID") } // Subscribe installs a new message handler used for filtering, decrypting // and subsequent storing of incoming messages. -func (waku *Waku) Subscribe(f *Filter) (string, error) { - s, err := waku.filters.Install(f) +func (w *Waku) Subscribe(f *Filter) (string, error) { + s, err := w.filters.Install(f) if err == nil { - waku.updateBloomFilter(f) + w.updateBloomFilter(f) } return s, err } // updateBloomFilter recalculates the new value of bloom filter, // and informs the peers if necessary. -func (waku *Waku) updateBloomFilter(f *Filter) { +func (w *Waku) updateBloomFilter(f *Filter) { aggregate := make([]byte, BloomFilterSize) for _, t := range f.Topics { top := BytesToTopic(t) @@ -773,21 +732,21 @@ func (waku *Waku) updateBloomFilter(f *Filter) { aggregate = addBloom(aggregate, b) } - if !BloomFilterMatch(waku.BloomFilter(), aggregate) { + if !BloomFilterMatch(w.BloomFilter(), aggregate) { // existing bloom filter must be updated - aggregate = addBloom(waku.BloomFilter(), aggregate) - waku.SetBloomFilter(aggregate) + aggregate = addBloom(w.BloomFilter(), aggregate) + w.SetBloomFilter(aggregate) } } // GetFilter returns the filter by id. -func (waku *Waku) GetFilter(id string) *Filter { - return waku.filters.Get(id) +func (w *Waku) GetFilter(id string) *Filter { + return w.filters.Get(id) } // Unsubscribe removes an installed message handler. -func (waku *Waku) Unsubscribe(id string) error { - ok := waku.filters.Uninstall(id) +func (w *Waku) Unsubscribe(id string) error { + ok := w.filters.Uninstall(id) if !ok { return fmt.Errorf("Unsubscribe: Invalid ID") } @@ -796,8 +755,8 @@ func (waku *Waku) Unsubscribe(id string) error { // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. -func (waku *Waku) Send(envelope *Envelope) error { - ok, err := waku.add(envelope, false) +func (w *Waku) Send(envelope *Envelope) error { + ok, err := w.add(envelope, false) if err == nil && !ok { return fmt.Errorf("failed to add envelope") } @@ -806,41 +765,39 @@ func (waku *Waku) Send(envelope *Envelope) error { // Start implements node.Service, starting the background data propagation thread // of the Waku protocol. -func (waku *Waku) Start(*p2p.Server) error { - log.Info("started waku v." + ProtocolVersionStr) - go waku.update() +func (w *Waku) Start(*p2p.Server) error { + go w.update() numCPU := runtime.NumCPU() for i := 0; i < numCPU; i++ { - go waku.processQueue() + go w.processQueue() } - go waku.processP2P() + go w.processP2P() return nil } // Stop implements node.Service, stopping the background data propagation thread // of the Waku protocol. -func (waku *Waku) Stop() error { - close(waku.quit) - log.Info("waku stopped") +func (w *Waku) Stop() error { + close(w.quit) return nil } // HandlePeer is called by the underlying P2P layer when the waku sub-protocol // connection is negotiated. -func (waku *Waku) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { +func (w *Waku) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { // Create the new peer and start tracking it - wakuPeer := newPeer(waku, peer, rw) + wakuPeer := newPeer(w, peer, rw) - waku.peerMu.Lock() - waku.peers[wakuPeer] = struct{}{} - waku.peerMu.Unlock() + w.peerMu.Lock() + w.peers[wakuPeer] = struct{}{} + w.peerMu.Unlock() defer func() { - waku.peerMu.Lock() - delete(waku.peers, wakuPeer) - waku.peerMu.Unlock() + w.peerMu.Lock() + delete(w.peers, wakuPeer) + w.peerMu.Unlock() }() // Run the peer handshake and state updates @@ -850,10 +807,10 @@ func (waku *Waku) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { wakuPeer.start() defer wakuPeer.stop() - if waku.rateLimiter != nil { - return waku.rateLimiter.decorate(wakuPeer, rw, waku.runMessageLoop) + if w.rateLimiter != nil { + return w.rateLimiter.decorate(wakuPeer, rw, w.runMessageLoop) } - return waku.runMessageLoop(wakuPeer, rw) + return w.runMessageLoop(wakuPeer, rw) } // TODO @@ -870,69 +827,28 @@ func (waku *Waku) HandlePeer(peer *p2p.Peer, rw p2p.MsgReadWriter) error { //} // runMessageLoop reads and processes inbound messages directly to merge into client-global state. -func (waku *Waku) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { +func (w *Waku) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { + logger := w.logger.Named("runMessageLoop") + peerID := p.peer.ID() + for { // fetch the next packet packet, err := rw.ReadMsg() if err != nil { - log.Info("message loop", "peer", p.peer.ID(), "err", err) + logger.Info("failed to read a message", zap.Binary("peer", peerID[:]), zap.Error(err)) return err } - if packet.Size > waku.MaxMessageSize() { - log.Warn("oversized message received", "peer", p.peer.ID()) + + if packet.Size > w.MaxMessageSize() { + logger.Warn("oversized message received", zap.Binary("peer", peerID[:]), zap.Uint32("size", packet.Size)) return errors.New("oversized message received") } switch packet.Code { - case statusCode: - // this should not happen, but no need to panic; just ignore this message. - log.Warn("unxepected status message received", "peer", p.peer.ID()) case messagesCode: - // decode the contained envelopes - data, err := ioutil.ReadAll(packet.Payload) - if err != nil { - envelopesRejectedCounter.WithLabelValues("failed_read").Inc() - log.Warn("failed to read envelopes data", "peer", p.peer.ID(), "error", err) - return errors.New("invalid enveloopes") - } - - var envelopes []*Envelope - if err := rlp.DecodeBytes(data, &envelopes); err != nil { - envelopesRejectedCounter.WithLabelValues("invalid_data").Inc() - log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return errors.New("invalid envelopes") - } - trouble := false - envelopeErrors := []EnvelopeError{} - for _, env := range envelopes { - cached, err := waku.add(env, waku.LightClientMode()) - if err != nil { - _, isTimeSyncError := err.(TimeSyncError) - if !isTimeSyncError { - trouble = true - log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err) - } - envelopeErrors = append(envelopeErrors, ErrorToEnvelopeError(env.Hash(), err)) - } - - waku.envelopeFeed.Send(EnvelopeEvent{ - Event: EventEnvelopeReceived, - Topic: env.Topic, - Hash: env.Hash(), - Peer: p.peer.ID(), - }) - envelopesValidatedCounter.Inc() - if cached { - p.mark(env) - } - } - if !waku.disableConfirmations { - // TODO - //go waku.sendConfirmation(p.peer.ID(), rw, data, envelopeErrors) - } - - if trouble { - return errors.New("invalid envelope") + if err := w.handleMessagesCode(p, packet, logger); err != nil { + logger.Warn("failed to handle messagesCode message, peer will be disconnected", zap.Binary("peer", peerID[:]), zap.Error(err)) + return err } //case messageResponseCode: // TODO @@ -948,7 +864,7 @@ func (waku *Waku) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { // envelopesRejectedCounter.WithLabelValues("invalid_data").Inc() // log.Error("failed to decode messages response into first version of response", "peer", p.peer.ID(), "error", err) // } - // waku.envelopeFeed.Send(EnvelopeEvent{ + // w.envelopeFeed.Send(EnvelopeEvent{ // Batch: response.Hash, // Event: EventBatchAcknowledged, // Peer: p.peer.ID(), @@ -964,158 +880,213 @@ func (waku *Waku) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { // log.Error("failed to decode confirmation into common.Hash", "peer", p.peer.ID(), "error", err) // return errors.New("invalid confirmation message") // } - // waku.envelopeFeed.Send(EnvelopeEvent{ + // w.envelopeFeed.Send(EnvelopeEvent{ // Batch: batchHash, // Event: EventBatchAcknowledged, // Peer: p.peer.ID(), // }) case powRequirementCode: - s := rlp.NewStream(packet.Payload, uint64(packet.Size)) - i, err := s.Uint() - if err != nil { - envelopesRejectedCounter.WithLabelValues("invalid_pow_req").Inc() - log.Warn("failed to decode powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return errors.New("invalid powRequirementCode message") - } - f := math.Float64frombits(i) - if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 { - envelopesRejectedCounter.WithLabelValues("invalid_pow_req").Inc() - log.Warn("invalid value in powRequirementCode message, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return errors.New("invalid value in powRequirementCode message") + if err := w.handlePowRequirementCode(p, packet, logger); err != nil { + logger.Warn("failed to handle powRequirementCode message, peer will be disconnected", zap.Binary("peer", peerID[:]), zap.Error(err)) + return err } - p.powRequirement = f case bloomFilterExCode: - var bloom []byte - err := packet.Decode(&bloom) - if err == nil && len(bloom) != BloomFilterSize { - err = fmt.Errorf("wrong bloom filter size %d", len(bloom)) - } - - if err != nil { - log.Warn("failed to decode bloom filter exchange message, peer will be disconnected", "peer", p.peer.ID(), "err", err) - envelopesRejectedCounter.WithLabelValues("invalid_bloom").Inc() - return errors.New("invalid bloom filter exchange message") + if err := w.handleBloomFilterExCode(p, packet, logger); err != nil { + logger.Warn("failed to decode bloom filter exchange message, peer will be disconnected", zap.Binary("peer", peerID[:]), zap.Error(err)) + return err } - p.setBloomFilter(bloom) case p2pMessageCode: - // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. - // this message is not supposed to be forwarded to other peers, and - // therefore might not satisfy the PoW, expiry and other requirements. - // these messages are only accepted from the trusted peer. - if p.trusted { - var ( - envelope *Envelope - envelopes []*Envelope - err error - ) - - // Read all data as we will try to decode it possibly twice - // to keep backward compatibility. - data, err := ioutil.ReadAll(packet.Payload) - if err != nil { - return fmt.Errorf("invalid direct messages: %v", err) - } - r := bytes.NewReader(data) - - packet.Payload = r - - if err = packet.Decode(&envelopes); err == nil { - for _, envelope := range envelopes { - waku.postP2P(envelope) - } - continue - } - - // As we failed to decode envelopes, let's set the offset - // to the beginning and try decode data again. - // Decoding to a single Envelope is required - // to be backward compatible. - if _, err := r.Seek(0, io.SeekStart); err != nil { - return fmt.Errorf("invalid direct messages: %v", err) - } - - if err = packet.Decode(&envelope); err == nil { - waku.postP2P(envelope) - continue - } - - if err != nil { - log.Warn("failed to decode direct message, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return fmt.Errorf("invalid direct message: %v", err) - } + if err := w.handleP2PMessageCode(p, packet, logger); err != nil { + logger.Warn("failed to decode direct message, peer will be disconnected", zap.Binary("peer", peerID[:]), zap.Error(err)) + return err } case p2pRequestCode: - // Must be processed if mail server is implemented. Otherwise ignore. - if waku.mailServer != nil { - // Read all data as we will try to decode it possibly twice. - data, err := ioutil.ReadAll(packet.Payload) - if err != nil { - return fmt.Errorf("invalid direct messages: %v", err) - } - r := bytes.NewReader(data) - packet.Payload = r - - var requestDeprecated Envelope - errDepReq := packet.Decode(&requestDeprecated) - if errDepReq == nil { - waku.mailServer.DeliverMail(p, &requestDeprecated) - continue - } else { - log.Info("failed to decode p2p request message (deprecated)", "peer", p.peer.ID(), "err", errDepReq) - } - - // As we failed to decode the request, let's set the offset - // to the beginning and try decode it again. - if _, err := r.Seek(0, io.SeekStart); err != nil { - return fmt.Errorf("invalid direct messages: %v", err) - } - - var request MessagesRequest - errReq := packet.Decode(&request) - if errReq == nil { - waku.mailServer.Deliver(p, request) - continue - } else { - log.Info("failed to decode p2p request message", "peer", p.peer.ID(), "err", errReq) - } - - return errors.New("invalid p2p request") + if err := w.handleP2PRequestCode(p, packet, logger); err != nil { + logger.Warn("failed to decode p2p request message, peer will be disconnected", zap.Binary("peer", peerID[:]), zap.Error(err)) + return err } case p2pRequestCompleteCode: - if p.trusted { - var payload []byte - if err := packet.Decode(&payload); err != nil { - log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return errors.New("invalid request response message") - } + if err := w.handleP2PRequestCompleteCode(p, packet, logger); err != nil { + logger.Warn("failed to decode p2p request complete message, peer will be disconnected", zap.Binary("peer", peerID[:]), zap.Error(err)) + return err + } + default: + // New message types might be implemented in the future versions of Waku. + // For forward compatibility, just ignore. + logger.Debug("ignored packet with message code", zap.Uint64("code", packet.Code)) + } + + _ = packet.Discard() + } +} - event, err := CreateMailServerEvent(p.peer.ID(), payload) +func (w *Waku) handleMessagesCode(p *Peer, packet p2p.Msg, logger *zap.Logger) error { + peerID := p.peer.ID() - if err != nil { - log.Warn("error while parsing request complete code, peer will be disconnected", "peer", p.peer.ID(), "err", err) - return err - } + // decode the contained envelopes + data, err := ioutil.ReadAll(packet.Payload) + if err != nil { + envelopesRejectedCounter.WithLabelValues("failed_read").Inc() + return fmt.Errorf("failed to read packet payload: %w", err) + } - if event != nil { - waku.postP2P(*event) - } + var envelopes []*Envelope + if err := rlp.DecodeBytes(data, &envelopes); err != nil { + envelopesRejectedCounter.WithLabelValues("invalid_data").Inc() + return fmt.Errorf("invalid payload: %w", err) + } + trouble := false + for _, env := range envelopes { + cached, err := w.add(env, w.LightClientMode()) + if err != nil { + _, isTimeSyncError := err.(TimeSyncError) + if !isTimeSyncError { + trouble = true + logger.Info("invalid envelope received", zap.Binary("peer", peerID[:]), zap.Error(err)) } - default: - // New message types might be implemented in the future versions of Waku. - // For forward compatibility, just ignore. + } else if cached { + p.mark(env) } - packet.Discard() + w.envelopeFeed.Send(EnvelopeEvent{ + Event: EventEnvelopeReceived, + Topic: env.Topic, + Hash: env.Hash(), + Peer: p.peer.ID(), + }) + envelopesValidatedCounter.Inc() + } + + if trouble { + return errors.New("received invalid envelope") + } + return nil +} + +func (w *Waku) handlePowRequirementCode(p *Peer, packet p2p.Msg, logger *zap.Logger) error { + s := rlp.NewStream(packet.Payload, uint64(packet.Size)) + i, err := s.Uint() + if err != nil { + envelopesRejectedCounter.WithLabelValues("invalid_pow_req").Inc() + return fmt.Errorf("invalid powRequirementCode message: %w", err) + } + f := math.Float64frombits(i) + if math.IsInf(f, 0) || math.IsNaN(f) || f < 0.0 { + envelopesRejectedCounter.WithLabelValues("invalid_pow_req").Inc() + return errors.New("invalid value in powRequirementCode message") + } + p.powRequirement = f + return nil +} + +func (w *Waku) handleBloomFilterExCode(p *Peer, packet p2p.Msg, logger *zap.Logger) error { + var bloom []byte + err := packet.Decode(&bloom) + if err == nil && len(bloom) != BloomFilterSize { + err = fmt.Errorf("wrong bloom filter size %d", len(bloom)) + } + if err != nil { + envelopesRejectedCounter.WithLabelValues("invalid_bloom").Inc() + return errors.New("invalid bloom filter exchange message") + } + + p.setBloomFilter(bloom) + return nil +} + +func (w *Waku) handleP2PMessageCode(p *Peer, packet p2p.Msg, logger *zap.Logger) error { + // peer-to-peer message, sent directly to peer bypassing PoW checks, etc. + // this message is not supposed to be forwarded to other peers, and + // therefore might not satisfy the PoW, expiry and other requirements. + // these messages are only accepted from the trusted peer. + if !p.trusted { + return nil + } + + var ( + envelopes []*Envelope + err error + ) + + if err = packet.Decode(&envelopes); err != nil { + return fmt.Errorf("invalid direct message payload: %w", err) + } + + for _, envelope := range envelopes { + w.postP2P(envelope) + } + return nil +} + +func (w *Waku) handleP2PRequestCode(p *Peer, packet p2p.Msg, logger *zap.Logger) error { + peerID := p.peer.ID() + + // Must be processed if mail server is implemented. Otherwise ignore. + if w.mailServer == nil { + return nil + } + + // Read all data as we will try to decode it possibly twice. + data, err := ioutil.ReadAll(packet.Payload) + if err != nil { + return fmt.Errorf("invalid p2p request messages: %w", err) + } + r := bytes.NewReader(data) + packet.Payload = r + + var requestDeprecated Envelope + errDepReq := packet.Decode(&requestDeprecated) + if errDepReq == nil { + w.mailServer.DeliverMail(p, &requestDeprecated) + return nil + } else { + logger.Info("failed to decode p2p request message (deprecated)", zap.Binary("peer", peerID[:]), zap.Error(errDepReq)) + } + + // As we failed to decode the request, let's set the offset + // to the beginning and try decode it again. + if _, err := r.Seek(0, io.SeekStart); err != nil { + return fmt.Errorf("invalid p2p request message: %w", err) + } + + var request MessagesRequest + errReq := packet.Decode(&request) + if errReq == nil { + w.mailServer.Deliver(p, request) + return nil + } else { + logger.Info("failed to decode p2p request message", zap.Binary("peer", peerID[:]), zap.Error(errDepReq)) + } + + return errors.New("invalid p2p request message") +} + +func (w *Waku) handleP2PRequestCompleteCode(p *Peer, packet p2p.Msg, logger *zap.Logger) error { + if !p.trusted { + return nil + } + + var payload []byte + if err := packet.Decode(&payload); err != nil { + return fmt.Errorf("invalid p2p request complete message: %w", err) } + + event, err := CreateMailServerEvent(p.peer.ID(), payload) + if err != nil { + return fmt.Errorf("invalid p2p request complete payload: %w", err) + } + + w.postP2P(*event) + return nil } // add inserts a new envelope into the message pool to be distributed within the // waku network. It also inserts the envelope into the expiration pool at the // appropriate time-stamp. In case of error, connection should be dropped. // param isP2P indicates whether the message is peer-to-peer (should not be forwarded). -func (waku *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { - now := uint32(waku.timeSource().Unix()) +func (w *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { + now := uint32(w.timeSource().Unix()) sent := envelope.Expiry - envelope.TTL envelopesReceivedCounter.Inc() @@ -1140,61 +1111,58 @@ func (waku *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { return false, nil // drop envelope without error } - if uint32(envelope.size()) > waku.MaxMessageSize() { + if uint32(envelope.size()) > w.MaxMessageSize() { envelopesCacheFailedCounter.WithLabelValues("oversized").Inc() return false, fmt.Errorf("huge messages are not allowed [%x]", envelope.Hash()) } - if envelope.PoW() < waku.MinPow() { + if envelope.PoW() < w.MinPow() { // maybe the value was recently changed, and the peers did not adjust yet. // in this case the previous value is retrieved by MinPowTolerance() // for a short period of peer synchronization. - if envelope.PoW() < waku.MinPowTolerance() { + if envelope.PoW() < w.MinPowTolerance() { envelopesCacheFailedCounter.WithLabelValues("low_pow").Inc() return false, fmt.Errorf("envelope with low PoW received: PoW=%f, hash=[%v]", envelope.PoW(), envelope.Hash().Hex()) } } - if !BloomFilterMatch(waku.BloomFilter(), envelope.Bloom()) { + if !BloomFilterMatch(w.BloomFilter(), envelope.Bloom()) { // maybe the value was recently changed, and the peers did not adjust yet. // in this case the previous value is retrieved by BloomFilterTolerance() // for a short period of peer synchronization. - if !BloomFilterMatch(waku.BloomFilterTolerance(), envelope.Bloom()) { + if !BloomFilterMatch(w.BloomFilterTolerance(), envelope.Bloom()) { envelopesCacheFailedCounter.WithLabelValues("no_bloom_match").Inc() return false, fmt.Errorf("envelope does not match bloom filter, hash=[%v], bloom: \n%x \n%x \n%x", - envelope.Hash().Hex(), waku.BloomFilter(), envelope.Bloom(), envelope.Topic) + envelope.Hash().Hex(), w.BloomFilter(), envelope.Bloom(), envelope.Topic) } } hash := envelope.Hash() - waku.poolMu.Lock() - _, alreadyCached := waku.envelopes[hash] + w.poolMu.Lock() + _, alreadyCached := w.envelopes[hash] if !alreadyCached { - waku.envelopes[hash] = envelope - if waku.expirations[envelope.Expiry] == nil { - waku.expirations[envelope.Expiry] = mapset.NewThreadUnsafeSet() + w.envelopes[hash] = envelope + if w.expirations[envelope.Expiry] == nil { + w.expirations[envelope.Expiry] = mapset.NewThreadUnsafeSet() } - if !waku.expirations[envelope.Expiry].Contains(hash) { - waku.expirations[envelope.Expiry].Add(hash) + if !w.expirations[envelope.Expiry].Contains(hash) { + w.expirations[envelope.Expiry].Add(hash) } } - waku.poolMu.Unlock() + w.poolMu.Unlock() if alreadyCached { - log.Trace("waku envelope already cached", "hash", envelope.Hash().Hex()) + log.Trace("w envelope already cached", "hash", envelope.Hash().Hex()) envelopesCachedCounter.WithLabelValues("hit").Inc() } else { - log.Trace("cached waku envelope", "hash", envelope.Hash().Hex()) + log.Trace("cached w envelope", "hash", envelope.Hash().Hex()) envelopesCachedCounter.WithLabelValues("miss").Inc() envelopesSizeMeter.Observe(float64(envelope.size())) - waku.statsMu.Lock() - waku.stats.memoryUsed += envelope.size() - waku.statsMu.Unlock() - waku.postEvent(envelope, isP2P) // notify the local node about the new message - if waku.mailServer != nil { - waku.mailServer.Archive(envelope) - waku.envelopeFeed.Send(EnvelopeEvent{ + w.postEvent(envelope, isP2P) // notify the local node about the new message + if w.mailServer != nil { + w.mailServer.Archive(envelope) + w.envelopeFeed.Send(EnvelopeEvent{ Topic: envelope.Topic, Hash: envelope.Hash(), Event: EventMailServerEnvelopeArchived, @@ -1204,47 +1172,28 @@ func (waku *Waku) add(envelope *Envelope, isP2P bool) (bool, error) { return true, nil } -func (waku *Waku) postP2P(event interface{}) { - waku.p2pMsgQueue <- event +func (w *Waku) postP2P(event interface{}) { + w.p2pMsgQueue <- event } // postEvent queues the message for further processing. -func (waku *Waku) postEvent(envelope *Envelope, isP2P bool) { +func (w *Waku) postEvent(envelope *Envelope, isP2P bool) { if isP2P { - waku.postP2P(envelope) + w.postP2P(envelope) } else { - waku.checkOverflow() - waku.messageQueue <- envelope - } - -} - -// checkOverflow checks if message queue overflow occurs and reports it if necessary. -func (waku *Waku) checkOverflow() { - queueSize := len(waku.messageQueue) - - if queueSize == messageQueueLimit { - if !waku.Overflow() { - waku.settings.Store(overflowIdx, true) - log.Warn("message queue overflow") - } - } else if queueSize <= messageQueueLimit/2 { - if waku.Overflow() { - waku.settings.Store(overflowIdx, false) - log.Warn("message queue overflow fixed (back to normal)") - } + w.msgQueue <- envelope } } // processQueue delivers the messages to the watchers during the lifetime of the waku node. -func (waku *Waku) processQueue() { +func (w *Waku) processQueue() { for { select { - case <-waku.quit: + case <-w.quit: return - case e := <-waku.messageQueue: - waku.filters.NotifyWatchers(e, false) - waku.envelopeFeed.Send(EnvelopeEvent{ + case e := <-w.msgQueue: + w.filters.NotifyWatchers(e, false) + w.envelopeFeed.Send(EnvelopeEvent{ Topic: e.Topic, Hash: e.Hash(), Event: EventEnvelopeAvailable, @@ -1253,22 +1202,22 @@ func (waku *Waku) processQueue() { } } -func (waku *Waku) processP2P() { +func (w *Waku) processP2P() { for { select { - case <-waku.quit: + case <-w.quit: return - case e := <-waku.p2pMsgQueue: + case e := <-w.p2pMsgQueue: switch event := e.(type) { case *Envelope: - waku.filters.NotifyWatchers(event, true) - waku.envelopeFeed.Send(EnvelopeEvent{ + w.filters.NotifyWatchers(event, true) + w.envelopeFeed.Send(EnvelopeEvent{ Topic: event.Topic, Hash: event.Hash(), Event: EventEnvelopeAvailable, }) case EnvelopeEvent: - waku.envelopeFeed.Send(event) + w.envelopeFeed.Send(event) } } } @@ -1276,7 +1225,7 @@ func (waku *Waku) processP2P() { // update loops until the lifetime of the waku node, updating its internal // state by expiring stale messages from the pool. -func (waku *Waku) update() { +func (w *Waku) update() { // Start a ticker to check for expirations expire := time.NewTicker(expirationCycle) @@ -1284,9 +1233,9 @@ func (waku *Waku) update() { for { select { case <-expire.C: - waku.expire() + w.expire() - case <-waku.quit: + case <-w.quit: return } } @@ -1294,72 +1243,56 @@ func (waku *Waku) update() { // expire iterates over all the expiration timestamps, removing all stale // messages from the pools. -func (waku *Waku) expire() { - waku.poolMu.Lock() - defer waku.poolMu.Unlock() - - waku.statsMu.Lock() - defer waku.statsMu.Unlock() - waku.stats.reset() - now := uint32(waku.timeSource().Unix()) - for expiry, hashSet := range waku.expirations { +func (w *Waku) expire() { + w.poolMu.Lock() + defer w.poolMu.Unlock() + + now := uint32(w.timeSource().Unix()) + for expiry, hashSet := range w.expirations { if expiry < now { // Dump all expired messages and remove timestamp hashSet.Each(func(v interface{}) bool { - sz := waku.envelopes[v.(common.Hash)].size() - delete(waku.envelopes, v.(common.Hash)) + delete(w.envelopes, v.(common.Hash)) envelopesCachedCounter.WithLabelValues("clear").Inc() - waku.envelopeFeed.Send(EnvelopeEvent{ + w.envelopeFeed.Send(EnvelopeEvent{ Hash: v.(common.Hash), Event: EventEnvelopeExpired, }) - waku.stats.messagesCleared++ - waku.stats.memoryCleared += sz - waku.stats.memoryUsed -= sz return false }) - waku.expirations[expiry].Clear() - delete(waku.expirations, expiry) + w.expirations[expiry].Clear() + delete(w.expirations, expiry) } } } -// Stats returns the waku node statistics. -func (waku *Waku) Stats() Statistics { - waku.statsMu.Lock() - defer waku.statsMu.Unlock() - - return waku.stats -} - // Envelopes retrieves all the messages currently pooled by the node. -func (waku *Waku) Envelopes() []*Envelope { - waku.poolMu.RLock() - defer waku.poolMu.RUnlock() +func (w *Waku) Envelopes() []*Envelope { + w.poolMu.RLock() + defer w.poolMu.RUnlock() - all := make([]*Envelope, 0, len(waku.envelopes)) - for _, envelope := range waku.envelopes { + all := make([]*Envelope, 0, len(w.envelopes)) + for _, envelope := range w.envelopes { all = append(all, envelope) } return all } -// isEnvelopeCached checks if envelope with specific hash has already been received and cached. -func (waku *Waku) isEnvelopeCached(hash common.Hash) bool { - waku.poolMu.Lock() - defer waku.poolMu.Unlock() - - _, exist := waku.envelopes[hash] - return exist +// GetEnvelope retrieves an envelope from the message queue by its hash. +// It returns nil if the envelope can not be found. +func (w *Waku) GetEnvelope(hash common.Hash) *Envelope { + w.poolMu.RLock() + defer w.poolMu.RUnlock() + return w.envelopes[hash] } -// reset resets the node's statistics after each expiry cycle. -func (s *Statistics) reset() { - s.cycles++ - s.totalMessagesCleared += s.messagesCleared +// isEnvelopeCached checks if envelope with specific hash has already been received and cached. +func (w *Waku) isEnvelopeCached(hash common.Hash) bool { + w.poolMu.Lock() + defer w.poolMu.Unlock() - s.memoryCleared = 0 - s.messagesCleared = 0 + _, exist := w.envelopes[hash] + return exist } // ValidatePublicKey checks the format of the given public key. @@ -1491,23 +1424,3 @@ func addBloom(a, b []byte) []byte { } return c } - -// SelectedKeyPairID returns the id of currently selected key pair. -// It helps distinguish between different users w/o exposing the user identity itself. -func (waku *Waku) SelectedKeyPairID() string { - waku.keyMu.RLock() - defer waku.keyMu.RUnlock() - - for id := range waku.privateKeys { - return id - } - return "" -} - -// GetEnvelope retrieves an envelope from the message queue by its hash. -// It returns nil if the envelope can not be found. -func (w *Waku) GetEnvelope(hash common.Hash) *Envelope { - w.poolMu.RLock() - defer w.poolMu.RUnlock() - return w.envelopes[hash] -} diff --git a/waku/waku_test.go b/waku/waku_test.go index 4f7e3e2bc5d..c3b6af41786 100644 --- a/waku/waku_test.go +++ b/waku/waku_test.go @@ -20,7 +20,7 @@ import ( ) func TestBasic(t *testing.T) { - w := New(&DefaultConfig) + w := New(nil, nil) p := w.Protocols() shh := p[0] if shh.Name != ProtocolName { @@ -103,7 +103,7 @@ func TestBasic(t *testing.T) { func TestAsymmetricKeyImport(t *testing.T) { var ( - w = New(&DefaultConfig) + w = New(nil, nil) privateKeys []*ecdsa.PrivateKey ) @@ -133,7 +133,7 @@ func TestAsymmetricKeyImport(t *testing.T) { } func TestWakuIdentityManagement(t *testing.T) { - w := New(&DefaultConfig) + w := New(nil, nil) id1, err := w.NewKeyPair() if err != nil { t.Fatalf("failed to generate new key pair: %s.", err) @@ -255,7 +255,7 @@ func TestSymKeyManagement(t *testing.T) { var err error var k1, k2 []byte - w := New(&DefaultConfig) + w := New(nil, nil) id1 := string("arbitrary-string-1") id2 := string("arbitrary-string-2") @@ -446,10 +446,10 @@ func TestSymKeyManagement(t *testing.T) { func TestExpiry(t *testing.T) { InitSingleTest() - w := New(&DefaultConfig) - w.SetMinimumPowTest(0.0000001) - defer w.SetMinimumPowTest(DefaultMinimumPoW) - w.Start(nil) + w := New(nil, nil) + _ = w.SetMinimumPoW(0.0000001, false) + defer w.SetMinimumPoW(DefaultMinimumPoW, false) + _ = w.Start(nil) defer w.Stop() params, err := generateMessageParams() @@ -510,8 +510,8 @@ func TestExpiry(t *testing.T) { func TestCustomization(t *testing.T) { InitSingleTest() - w := New(&DefaultConfig) - defer w.SetMinimumPowTest(DefaultMinimumPoW) + w := New(nil, nil) + defer w.SetMinimumPoW(DefaultMinimumPoW, false) defer w.SetMaxMessageSize(DefaultMaxMessageSize) w.Start(nil) defer w.Stop() @@ -545,7 +545,7 @@ func TestCustomization(t *testing.T) { t.Fatalf("successfully sent envelope with PoW %.06f, false positive (seed %d).", env.PoW(), seed) } - w.SetMinimumPowTest(smallPoW / 2) + _ = w.SetMinimumPoW(smallPoW/2, true) err = w.Send(env) if err != nil { t.Fatalf("failed to send envelope with seed %d: %s.", seed, err) @@ -560,13 +560,13 @@ func TestCustomization(t *testing.T) { if err != nil { t.Fatalf("failed Wrap with seed %d: %s.", seed, err) } - w.SetMaxMessageSize(uint32(env.size() - 1)) + _ = w.SetMaxMessageSize(uint32(env.size() - 1)) err = w.Send(env) if err == nil { t.Fatalf("successfully sent oversized envelope (seed %d): false positive.", seed) } - w.SetMaxMessageSize(DefaultMaxMessageSize) + _ = w.SetMaxMessageSize(DefaultMaxMessageSize) err = w.Send(env) if err != nil { t.Fatalf("failed to send second envelope with seed %d: %s.", seed, err) @@ -601,10 +601,10 @@ func TestCustomization(t *testing.T) { func TestSymmetricSendCycle(t *testing.T) { InitSingleTest() - w := New(&DefaultConfig) - defer w.SetMinimumPowTest(DefaultMinimumPoW) + w := New(nil, nil) + defer w.SetMinimumPoW(DefaultMinimumPoW, false) defer w.SetMaxMessageSize(DefaultMaxMessageSize) - w.Start(nil) + _ = w.Start(nil) defer w.Stop() filter1, err := generateFilter(t, true) @@ -614,7 +614,7 @@ func TestSymmetricSendCycle(t *testing.T) { filter1.PoW = DefaultMinimumPoW // Copy the first filter since some of its fields - // are randomly gnerated. + // are randomly generated. filter2 := &Filter{ KeySym: filter1.KeySym, Topics: filter1.Topics, @@ -690,8 +690,8 @@ func TestSymmetricSendCycle(t *testing.T) { func TestSymmetricSendWithoutAKey(t *testing.T) { InitSingleTest() - w := New(&DefaultConfig) - defer w.SetMinimumPowTest(DefaultMinimumPoW) + w := New(nil, nil) + defer w.SetMinimumPoW(DefaultMinimumPoW, false) defer w.SetMaxMessageSize(DefaultMaxMessageSize) w.Start(nil) defer w.Stop() @@ -758,8 +758,8 @@ func TestSymmetricSendWithoutAKey(t *testing.T) { func TestSymmetricSendKeyMismatch(t *testing.T) { InitSingleTest() - w := New(&DefaultConfig) - defer w.SetMinimumPowTest(DefaultMinimumPoW) + w := New(nil, nil) + defer w.SetMinimumPoW(DefaultMinimumPoW, false) defer w.SetMaxMessageSize(DefaultMaxMessageSize) w.Start(nil) defer w.Stop() @@ -867,7 +867,7 @@ func TestBloom(t *testing.T) { t.Fatalf("bloomFilterMatch false negative") } - w := New(&DefaultConfig) + w := New(nil, nil) f := w.BloomFilter() if f != nil { t.Fatalf("wrong bloom on creation") @@ -885,10 +885,10 @@ func TestBloom(t *testing.T) { func TestSendP2PDirect(t *testing.T) { InitSingleTest() - w := New(&DefaultConfig) - w.SetMinimumPowTest(0.0000001) - defer w.SetMinimumPowTest(DefaultMinimumPoW) - w.Start(nil) + w := New(nil, nil) + _ = w.SetMinimumPoW(0.0000001, false) + defer w.SetMinimumPoW(DefaultMinimumPoW, false) + _ = w.Start(nil) defer w.Stop() rwStub := &rwP2PMessagesStub{} @@ -909,24 +909,6 @@ func TestSendP2PDirect(t *testing.T) { t.Fatalf("failed Wrap with seed %d: %s.", seed, err) } - // verify sending a single envelope - err = w.SendP2PDirect(peerW, env) - if err != nil { - t.Fatalf("failed to send envelope with seed %d: %s.", seed, err) - } - if len(rwStub.messages) != 1 { - t.Fatalf("invalid number of messages sent to peer: %d, expected 1", len(rwStub.messages)) - } - var envelope Envelope - if err := rwStub.messages[0].Decode(&envelope); err != nil { - t.Fatalf("failed to decode envelopes: %s", err) - } - if envelope.Hash() != env.Hash() { - t.Fatalf("invalid envelope %d, expected %d", envelope.Hash(), env.Hash()) - } - rwStub.messages = nil - - // send a batch of envelopes err = w.SendP2PDirect(peerW, env, env, env) if err != nil { t.Fatalf("failed to send envelope with seed %d: %s.", seed, err) @@ -948,9 +930,9 @@ func TestSendP2PDirect(t *testing.T) { func TestHandleP2PMessageCode(t *testing.T) { InitSingleTest() - w := New(&DefaultConfig) - w.SetMinimumPowTest(0.0000001) - defer w.SetMinimumPowTest(DefaultMinimumPoW) + w := New(nil, nil) + w.SetMinimumPoW(0.0000001, false) + defer w.SetMinimumPoW(DefaultMinimumPoW, false) w.Start(nil) defer w.Stop() @@ -975,7 +957,7 @@ func TestHandleP2PMessageCode(t *testing.T) { // read a single envelope rwStub := &rwP2PMessagesStub{} - rwStub.payload = []interface{}{env} + rwStub.payload = []interface{}{[]*Envelope{env}} peer := newPeer(nil, p2p.NewPeer(enode.ID{}, "test", []p2p.Cap{}), nil) peer.trusted = true @@ -1031,10 +1013,10 @@ func (stub *rwP2PMessagesStub) WriteMsg(m p2p.Msg) error { func testConfirmationsHandshake(t *testing.T, expectConfirmations bool) { conf := &Config{ - MinimumAcceptedPOW: 0, + MinimumAcceptedPoW: 0, EnableConfirmations: expectConfirmations, } - w := New(conf) + w := New(conf, nil) p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) rw1, rw2 := p2p.MsgPipe() errorc := make(chan error, 1) @@ -1049,9 +1031,10 @@ func testConfirmationsHandshake(t *testing.T, expectConfirmations bool) { require.NoError(t, p2p.ExpectMsg(rw1, statusCode, []interface{}{ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, expectConfirmations})) } -func TestConfirmationHadnshakeExtension(t *testing.T) { - testConfirmationsHandshake(t, true) -} +// TODO +//func TestConfirmationHadnshakeExtension(t *testing.T) { +// testConfirmationsHandshake(t, true) +//} func TestHandshakeWithConfirmationsDisabled(t *testing.T) { testConfirmationsHandshake(t, false) @@ -1140,10 +1123,10 @@ func TestHandshakeWithConfirmationsDisabled(t *testing.T) { func testConfirmationEvents(t *testing.T, envelope Envelope, envelopeErrors []EnvelopeError) { conf := &Config{ - MinimumAcceptedPOW: 0, + MinimumAcceptedPoW: 0, MaxMessageSize: 10 << 20, } - w := New(conf) + w := New(conf, nil) events := make(chan EnvelopeEvent, 2) sub := w.SubscribeEnvelopeEvents(events) defer sub.Unsubscribe() @@ -1277,7 +1260,7 @@ func discardPipe() *p2p.MsgPipeRW { func TestWakuTimeDesyncEnvelopeIgnored(t *testing.T) { c := &Config{ MaxMessageSize: DefaultMaxMessageSize, - MinimumAcceptedPOW: 0, + MinimumAcceptedPoW: 0, } rw1, rw2 := p2p.MsgPipe() defer func() { @@ -1286,7 +1269,7 @@ func TestWakuTimeDesyncEnvelopeIgnored(t *testing.T) { }() p1 := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) p2 := p2p.NewPeer(enode.ID{2}, "2", []p2p.Cap{{"shh", 6}}) - w1, w2 := New(c), New(c) + w1, w2 := New(c, nil), New(c, nil) errc := make(chan error) go func() { w1.HandlePeer(p2, rw2) @@ -1319,7 +1302,7 @@ func TestWakuTimeDesyncEnvelopeIgnored(t *testing.T) { } func TestRequestSentEventWithExpiry(t *testing.T) { - w := New(nil) + w := New(nil, nil) p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) rw := discardPipe() defer rw.Close() @@ -1352,21 +1335,21 @@ func TestSendMessagesRequest(t *testing.T) { } t.Run("InvalidID", func(t *testing.T) { - w := New(nil) + w := New(nil, nil) err := w.SendMessagesRequest([]byte{0x01, 0x02}, MessagesRequest{}) require.EqualError(t, err, "invalid 'ID', expected a 32-byte slice") }) t.Run("WithoutPeer", func(t *testing.T) { - w := New(nil) + w := New(nil, nil) err := w.SendMessagesRequest([]byte{0x01, 0x02}, validMessagesRequest) - require.EqualError(t, err, "Could not find peer with ID: 0102") + require.EqualError(t, err, "could not find peer with ID: 0102") }) t.Run("AllGood", func(t *testing.T) { p := p2p.NewPeer(enode.ID{0x01}, "peer01", nil) rw1, rw2 := p2p.MsgPipe() - w := New(nil) + w := New(nil, nil) w.peers[newPeer(w, p, rw1)] = struct{}{} go func() { @@ -1438,7 +1421,7 @@ func (m *mockMailServer) Deliver(p *Peer, r MessagesRequest) { } func TestMailserverCompletionEvent(t *testing.T) { - w := New(nil) + w := New(nil, nil) require.NoError(t, w.Start(nil)) defer w.Stop()