Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: dial, drop and retrieve connected peers #5994

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ coverage.html
Session.vim
.undodir/*
/.idea/
/.vscode/
/cmd/*/.ethereum/
*.iml

Expand Down
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "third_party/nwaku"]
path = third_party/nwaku
url = https://github.com/waku-org/nwaku
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,7 @@
"cSpell.words": [
"unmarshalling"
],
"gopls":{
"buildFlags": ["-tags=use_nwaku,gowaku_skip_migrations,gowaku_no_rln"]
}
}
49 changes: 47 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.PHONY: statusgo statusd-prune all test clean help
.PHONY: statusgo-android statusgo-ios
.PHONY: build-libwaku test-libwaku clean-libwaku rebuild-libwaku

# Clear any GOROOT set outside of the Nix shell
export GOROOT=
Expand Down Expand Up @@ -61,6 +62,10 @@ GIT_AUTHOR ?= $(shell git config user.email || echo $$USER)
ENABLE_METRICS ?= true
BUILD_TAGS ?= gowaku_no_rln

ifeq ($(USE_NWAKU), true)
BUILD_TAGS += use_nwaku
endif

BUILD_FLAGS ?= -ldflags="-X github.com/status-im/status-go/params.Version=$(RELEASE_TAG:v%=%) \
-X github.com/status-im/status-go/params.GitCommit=$(GIT_COMMIT) \
-X github.com/status-im/status-go/params.IpfsGatewayURL=$(IPFS_GATEWAY_URL) \
Expand Down Expand Up @@ -234,8 +239,19 @@ statusgo-library: ##@cross-compile Build status-go as static library for current
@echo "Static library built:"
@ls -la build/bin/libstatus.*

statusgo-shared-library: generate
statusgo-shared-library: ##@cross-compile Build status-go as shared library for current platform

LIBWAKU := third_party/nwaku/build/libwaku.$(GOBIN_SHARED_LIB_EXT)
$(LIBWAKU):
@echo "Building libwaku"
$(MAKE) -C third_party/nwaku update || { echo "nwaku make update failed"; exit 1; }
$(MAKE) -C ./third_party/nwaku libwaku

build-libwaku: $(LIBWAKU)

statusgo-shared-library: generate ##@cross-compile Build status-go as shared library for current platform
ifeq ($(USE_NWAKU),true)
$(MAKE) $(LIBWAKU)
endif
## cmd/library/README.md explains the magic incantation behind this
mkdir -p build/bin/statusgo-lib
go run cmd/library/*.go > build/bin/statusgo-lib/main.go
Expand Down Expand Up @@ -356,9 +372,38 @@ lint-fix:
-w {} \;
$(MAKE) vendor

mock: ##@other Regenerate mocks
mockgen -package=fake -destination=transactions/fake/mock.go -source=transactions/fake/txservice.go
mockgen -package=status -destination=services/status/account_mock.go -source=services/status/service.go
mockgen -package=peer -destination=services/peer/discoverer_mock.go -source=services/peer/service.go
mockgen -package=mock_transactor -destination=transactions/mock_transactor/transactor.go -source=transactions/transactor.go
mockgen -package=mock_pathprocessor -destination=services/wallet/router/pathprocessor/mock_pathprocessor/processor.go -source=services/wallet/router/pathprocessor/processor.go
mockgen -package=mock_bridge -destination=services/wallet/bridge/mock_bridge/bridge.go -source=services/wallet/bridge/bridge.go
mockgen -package=mock_client -destination=rpc/chain/mock/client/client.go -source=rpc/chain/client.go
mockgen -package=mock_token -destination=services/wallet/token/mock/token/tokenmanager.go -source=services/wallet/token/token.go
mockgen -package=mock_thirdparty -destination=services/wallet/thirdparty/mock/types.go -source=services/wallet/thirdparty/types.go
mockgen -package=mock_balance_persistence -destination=services/wallet/token/mock/balance_persistence/balance_persistence.go -source=services/wallet/token/balance_persistence.go
mockgen -package=mock_network -destination=rpc/network/mock/network.go -source=rpc/network/network.go
mockgen -package=mock_rpcclient -destination=rpc/mock/client/client.go -source=rpc/client.go
mockgen -package=mock_collectibles -destination=services/wallet/collectibles/mock/collection_data_db.go -source=services/wallet/collectibles/collection_data_db.go
mockgen -package=mock_collectibles -destination=services/wallet/collectibles/mock/collectible_data_db.go -source=services/wallet/collectibles/collectible_data_db.go
mockgen -package=mock_thirdparty -destination=services/wallet/thirdparty/mock/collectible_types.go -source=services/wallet/thirdparty/collectible_types.go
mockgen -package=mock_paraswap -destination=services/wallet/thirdparty/paraswap/mock/types.go -source=services/wallet/thirdparty/paraswap/types.go
mockgen -package=mock_onramp -destination=services/wallet/onramp/mock/types.go -source=services/wallet/onramp/types.go


docker-test: ##@tests Run tests in a docker container with golang.
docker run --privileged --rm -it -v "$(PWD):$(DOCKER_TEST_WORKDIR)" -w "$(DOCKER_TEST_WORKDIR)" $(DOCKER_TEST_IMAGE) go test ${ARGS}

test-libwaku: | $(LIBWAKU)
go test -tags '$(BUILD_TAGS) use_nwaku' -run TestBasicWakuV2 ./wakuv2/... -count 1 -v -json | jq -r '.Output'

clean-libwaku:
@echo "Removing libwaku"
rm $(LIBWAKU)

rebuild-libwaku: | clean-libwaku $(LIBWAKU)

test: test-unit ##@tests Run basic, short tests during development

test-unit: generate
Expand Down
10 changes: 5 additions & 5 deletions cmd/ping-community/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ import (
"github.com/status-im/status-go/multiaccounts"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/multiaccounts/settings"
"github.com/status-im/status-go/wakuv2"

"github.com/status-im/status-go/logutils"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/protocol"
"github.com/status-im/status-go/protocol/common"
"github.com/status-im/status-go/protocol/common/shard"
"github.com/status-im/status-go/protocol/identity/alias"
"github.com/status-im/status-go/protocol/protobuf"
wakuextn "github.com/status-im/status-go/services/wakuext"
Expand All @@ -48,8 +48,8 @@ var (
seedPhrase = flag.String("seed-phrase", "", "Seed phrase")
version = flag.Bool("version", false, "Print version and dump configuration")
communityID = flag.String("community-id", "", "The id of the community")
shardCluster = flag.Int("shard-cluster", shard.MainStatusShardCluster, "The shard cluster in which the of the community is published")
shardIndex = flag.Int("shard-index", shard.DefaultShardIndex, "The shard index in which the community is published")
shardCluster = flag.Int("shard-cluster", wakuv2.MainStatusShardCluster, "The shard cluster in which the of the community is published")
shardIndex = flag.Int("shard-index", wakuv2.DefaultShardIndex, "The shard index in which the community is published")
chatID = flag.String("chat-id", "", "The id of the chat")

dataDir = flag.String("dir", getDefaultDataDir(), "Directory used by node to store data")
Expand Down Expand Up @@ -148,9 +148,9 @@ func main() {

messenger := wakuextservice.Messenger()

var s *shard.Shard = nil
var s *wakuv2.Shard = nil
if shardCluster != nil && shardIndex != nil {
s = &shard.Shard{
s = &wakuv2.Shard{
Cluster: uint16(*shardCluster),
Index: uint16(*shardIndex),
}
Expand Down
62 changes: 53 additions & 9 deletions eth-node/bridge/geth/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"

"github.com/waku-org/go-waku/waku/v2/api/history"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/p2p/enode"
gocommon "github.com/status-im/status-go/common"
Expand Down Expand Up @@ -274,10 +276,6 @@ func (w *GethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}

func (w *GethWakuWrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
return nil, 0, errors.New("not implemented")
}

func (w *GethWakuWrapper) ConnectionChanged(_ connection.State) {}

func (w *GethWakuWrapper) ClearEnvelopesCache() {
Expand Down Expand Up @@ -314,13 +312,59 @@ func (w *wakuFilterWrapper) ID() string {
func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) {
}

func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) {
func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) PeerID() peer.ID {
panic("not implemented")
func (w *GethWakuWrapper) GetActiveStorenode() peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeAvailableOneShot() <-chan struct{} {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeChanged() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeNotWorking() <-chan struct{} {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) OnStorenodeAvailable() <-chan peer.ID {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
return false
}

func (w *GethWakuWrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
panic("not available in WakuV1")
}

func (w *GethWakuWrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
return errors.New("not available in WakuV1")
}

func (w *GethWakuWrapper) IsStorenodeAvailable(peerID peer.ID) bool {
panic("not available in WakuV1")

}

func (w *GethWakuWrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
panic("not available in WakuV1")

}

func (w *GethWakuWrapper) PingPeer(context.Context, peer.ID) (time.Duration, error) {
return 0, errors.New("not available in WakuV1")
func (w *GethWakuWrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
panic("not available in WakuV1")
}
111 changes: 70 additions & 41 deletions eth-node/bridge/geth/wakuv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/multiformats/go-multiaddr"
"google.golang.org/protobuf/proto"

"github.com/waku-org/go-waku/waku/v2/api/history"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/store"

Expand Down Expand Up @@ -176,39 +177,6 @@ func (w *gethWakuV2Wrapper) createFilterWrapper(id string, keyAsym *ecdsa.Privat
}, id), nil
}

func (w *gethWakuV2Wrapper) RequestStoreMessages(ctx context.Context, peerID peer.ID, r types.MessagesRequest, processEnvelopes bool) (types.StoreRequestCursor, int, error) {
options := []store.RequestOption{
store.WithPaging(false, uint64(r.Limit)),
}

var cursor []byte
if r.StoreCursor != nil {
cursor = r.StoreCursor
}

contentTopics := []string{}
for _, topic := range r.ContentTopics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic).ContentTopic())
}

query := store.FilterCriteria{
TimeStart: proto.Int64(int64(r.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(r.To) * int64(time.Second)),
ContentFilter: protocol.NewContentFilter(w.waku.GetPubsubTopic(r.PubsubTopic), contentTopics...),
}

pbCursor, envelopesCount, err := w.waku.Query(ctx, peerID, query, cursor, options, processEnvelopes)
if err != nil {
return nil, 0, err
}

if pbCursor != nil {
return pbCursor, envelopesCount, nil
}

return nil, envelopesCount, nil
}

func (w *gethWakuV2Wrapper) StartDiscV5() error {
return w.waku.StartDiscV5()
}
Expand Down Expand Up @@ -259,7 +227,7 @@ func (w *gethWakuV2Wrapper) DialPeerByID(peerID peer.ID) error {
}

func (w *gethWakuV2Wrapper) ListenAddresses() ([]multiaddr.Multiaddr, error) {
return w.waku.ListenAddresses(), nil
return w.waku.ListenAddresses()
}

func (w *gethWakuV2Wrapper) RelayPeersByTopic(topic string) (*types.PeerList, error) {
Expand Down Expand Up @@ -289,7 +257,7 @@ func (w *gethWakuV2Wrapper) SubscribeToConnStatusChanges() (*types.ConnStatusSub
func (w *gethWakuV2Wrapper) SetCriteriaForMissingMessageVerification(peerID peer.ID, pubsubTopic string, contentTopics []types.TopicType) error {
var cTopics []string
for _, ct := range contentTopics {
cTopics = append(cTopics, wakucommon.TopicType(ct).ContentTopic())
cTopics = append(cTopics, wakucommon.BytesToTopic(ct.Bytes()).ContentTopic())
}
pubsubTopic = w.waku.GetPubsubTopic(pubsubTopic)
w.waku.SetTopicsToVerifyForMissingMessages(peerID, pubsubTopic, cTopics)
Expand Down Expand Up @@ -338,14 +306,75 @@ func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) {
w.waku.ConfirmMessageDelivered(hashes)
}

func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) {
w.waku.SetStorePeerID(peerID)
}

func (w *gethWakuV2Wrapper) PeerID() peer.ID {
return w.waku.PeerID()
}

func (w *gethWakuV2Wrapper) PingPeer(ctx context.Context, peerID peer.ID) (time.Duration, error) {
return w.waku.PingPeer(ctx, peerID)
func (w *gethWakuV2Wrapper) GetActiveStorenode() peer.ID {
return w.waku.StorenodeCycle.GetActiveStorenode()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailableOneShot() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeAvailableOneshotEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeChanged() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeChangedEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeNotWorking() <-chan struct{} {
return w.waku.StorenodeCycle.StorenodeNotWorkingEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) OnStorenodeAvailable() <-chan peer.ID {
return w.waku.StorenodeCycle.StorenodeAvailableEmitter.Subscribe()
}

func (w *gethWakuV2Wrapper) WaitForAvailableStoreNode(timeout time.Duration) bool {
return w.waku.StorenodeCycle.WaitForAvailableStoreNode(context.TODO(), timeout)
}

func (w *gethWakuV2Wrapper) SetStorenodeConfigProvider(c history.StorenodeConfigProvider) {
w.waku.StorenodeCycle.SetStorenodeConfigProvider(c)
}

func (w *gethWakuV2Wrapper) ProcessMailserverBatch(
ctx context.Context,
batch types.MailserverBatch,
storenodeID peer.ID,
pageLimit uint64,
shouldProcessNextPage func(int) (bool, uint64),
processEnvelopes bool,
) error {
pubsubTopic := w.waku.GetPubsubTopic(batch.PubsubTopic)
contentTopics := []string{}
for _, topic := range batch.Topics {
contentTopics = append(contentTopics, wakucommon.BytesToTopic(topic.Bytes()).ContentTopic())
}

criteria := store.FilterCriteria{
TimeStart: proto.Int64(int64(batch.From) * int64(time.Second)),
TimeEnd: proto.Int64(int64(batch.To) * int64(time.Second)),
ContentFilter: protocol.NewContentFilter(pubsubTopic, contentTopics...),
}

return w.waku.HistoryRetriever.Query(ctx, criteria, storenodeID, pageLimit, shouldProcessNextPage, processEnvelopes)
}

func (w *gethWakuV2Wrapper) IsStorenodeAvailable(peerID peer.ID) bool {
return w.waku.StorenodeCycle.IsStorenodeAvailable(peerID)
}

func (w *gethWakuV2Wrapper) PerformStorenodeTask(fn func() error, opts ...history.StorenodeTaskOption) error {
return w.waku.StorenodeCycle.PerformStorenodeTask(fn, opts...)
}

func (w *gethWakuV2Wrapper) DisconnectActiveStorenode(ctx context.Context, backoff time.Duration, shouldCycle bool) {
w.waku.StorenodeCycle.Lock()
defer w.waku.StorenodeCycle.Unlock()

w.waku.StorenodeCycle.DisconnectActiveStorenode(backoff)
if shouldCycle {
w.waku.StorenodeCycle.Cycle(ctx)
}
}
Loading
Loading