Skip to content

Commit

Permalink
refactor: making protocolIDs dynamic (not using a constant network)
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Oct 4, 2022
1 parent dc0b124 commit aaf719b
Show file tree
Hide file tree
Showing 16 changed files with 84 additions and 53 deletions.
4 changes: 2 additions & 2 deletions cmd/celestia/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package main
import (
"github.com/spf13/cobra"

cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

// NOTE: We should always ensure that the added Flags below are parsed somewhere, like in the PersistentPreRun func on
Expand Down
4 changes: 2 additions & 2 deletions cmd/celestia/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ package main
import (
"github.com/spf13/cobra"

cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

// NOTE: We should always ensure that the added Flags below are parsed somewhere, like in the PersistentPreRun func on
Expand Down
4 changes: 2 additions & 2 deletions cmd/celestia/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,13 @@ package main
import (
"github.com/spf13/cobra"

cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/rpc"
"github.com/celestiaorg/celestia-node/nodebuilder/state"
cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
)

// NOTE: We should always ensure that the added Flags below are parsed somewhere, like in the PersistentPreRun func on
Expand Down
8 changes: 4 additions & 4 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"testing"
"time"

"github.com/tendermint/tendermint/types"

"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
ds_sync "github.com/ipfs/go-datastore/sync"
Expand All @@ -16,9 +14,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
tmbytes "github.com/tendermint/tendermint/libs/bytes"
"github.com/tendermint/tendermint/types"

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/share"
)

Expand Down Expand Up @@ -136,8 +136,8 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {
// 15 headers from the past and 15 future headers
mockGet, sub, _ := createDASerSubcomponents(t, bServ, 15, 15)

// create fraud share and break one header
f := fraud.NewProofService(ps, net.Hosts()[0], mockGet.GetByHeight, ds, false)
// create fraud service and break one header
f := fraud.NewProofService(ps, net.Hosts()[0], p2p.Private, mockGet.GetByHeight, ds, false)
require.NoError(t, f.Start(ctx))
mockGet.headers[1] = header.CreateFraudExtHeader(t, mockGet.headers[1], bServ)
newCtx := context.Background()
Expand Down
6 changes: 2 additions & 4 deletions fraud/requester.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/celestiaorg/go-libp2p-messenger/serde"
Expand All @@ -19,14 +18,13 @@ const (
readDeadline = time.Minute
)

func requestProofs(
func (f *ProofService) requestProofs(
ctx context.Context,
host host.Host,
pid peer.ID,
proofTypes []string,
) ([]*pb.ProofResponse, error) {
msg := &pb.FraudMessageRequest{RequestedProofType: proofTypes}
stream, err := host.NewStream(ctx, pid, fraudProtocolID)
stream, err := f.host.NewStream(ctx, pid, f.protocolID)
if err != nil {
return nil, err
}
Expand Down
24 changes: 16 additions & 8 deletions fraud/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ import (
)

// fraudRequests is the amount of external requests that will be tried to get fraud proofs from other peers.
const fraudRequests = 5
const (
fraudRequests = 5
protocolPrefix = "/fraud/v0.0.1/"
)

var fraudProtocolID = protocol.ID(fmt.Sprintf("/fraud/v0.0.1/%s", p2p.DefaultNetwork))
func fraudProtocolID(network p2p.Network) protocol.ID {
return protocol.ID(fmt.Sprintf("%s%s", protocolPrefix, network))
}

// ProofService is responsible for validating and propagating Fraud Proofs.
// It implements the Service interface.
Expand All @@ -34,24 +39,27 @@ type ProofService struct {
storesLk sync.RWMutex
stores map[ProofType]datastore.Datastore

pubsub *pubsub.PubSub
host host.Host
getter headerFetcher
ds datastore.Datastore
pubsub *pubsub.PubSub
host host.Host
protocolID protocol.ID
getter headerFetcher
ds datastore.Datastore

syncerEnabled bool
}

func NewProofService(
p *pubsub.PubSub,
host host.Host,
network p2p.Network,
getter headerFetcher,
ds datastore.Datastore,
syncerEnabled bool,
) *ProofService {
return &ProofService{
pubsub: p,
host: host,
protocolID: fraudProtocolID(network),
getter: getter,
topics: make(map[ProofType]*pubsub.Topic),
stores: make(map[ProofType]datastore.Datastore),
Expand Down Expand Up @@ -80,7 +88,7 @@ func (f *ProofService) Start(context.Context) error {
if err := f.registerProofTopics(getRegisteredProofTypes()...); err != nil {
return err
}
f.host.SetStreamHandler(fraudProtocolID, f.handleFraudMessageRequest)
f.host.SetStreamHandler(f.protocolID, f.handleFraudMessageRequest)
if f.syncerEnabled {
go f.syncFraudProofs(f.ctx)
}
Expand All @@ -89,7 +97,7 @@ func (f *ProofService) Start(context.Context) error {

// Stop removes the stream handler and cancels the underlying ProofService
func (f *ProofService) Stop(context.Context) error {
f.host.RemoveStreamHandler(fraudProtocolID)
f.host.RemoveStreamHandler(f.protocolID)
f.cancel()
return nil
}
Expand Down
4 changes: 4 additions & 0 deletions fraud/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
)

func TestService_Subscribe(t *testing.T) {
Expand Down Expand Up @@ -128,6 +129,7 @@ func TestService_ReGossiping(t *testing.T) {
pserviceB := NewProofService(
psB,
net.Hosts()[1],
p2p.Private,
func(ctx context.Context, u uint64) (*header.ExtendedHeader, error) {
return &header.ExtendedHeader{}, nil
},
Expand All @@ -143,6 +145,7 @@ func TestService_ReGossiping(t *testing.T) {
pserviceC := NewProofService(
psC,
net.Hosts()[2],
p2p.Private,
func(ctx context.Context, u uint64) (*header.ExtendedHeader, error) {
return &header.ExtendedHeader{}, nil
},
Expand Down Expand Up @@ -260,6 +263,7 @@ func createServiceWithHost(
return NewProofService(
ps,
host,
p2p.Private,
store.GetByHeight,
sync.MutexWrap(datastore.NewMapDatastore()),
enabledSyncer,
Expand Down
2 changes: 1 addition & 1 deletion fraud/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (f *ProofService) syncFraudProofs(ctx context.Context) {
// valid peer found, so go send proof requests
go func(pid peer.ID) {
log.Debugw("requesting proofs from peer", "pid", pid)
respProofs, err := requestProofs(ctx, f.host, pid, proofTypes)
respProofs, err := f.requestProofs(ctx, pid, proofTypes)
if err != nil {
log.Errorw("error while requesting fraud proofs", "err", err, "peer", pid)
return
Expand Down
21 changes: 14 additions & 7 deletions header/p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,30 @@ import (

var log = logging.Logger("header/p2p")

// PubSubTopic hardcodes the name of the ExtendedHeader
// gossipsub topic.
const PubSubTopic = "header-sub"
const (
// PubSubTopic hardcodes the name of the ExtendedHeader
// gossipsub topic.
PubSubTopic = "header-sub"
protocolPrefix = "/header-ex/v0.0.2/"
)

var exchangeProtocolID = protocol.ID(fmt.Sprintf("/header-ex/v0.0.2/%s", p2p.DefaultNetwork))
func exchangeProtocolID(network p2p.Network) protocol.ID {
return protocol.ID(fmt.Sprintf("%s%s", protocolPrefix, network))
}

// Exchange enables sending outbound ExtendedHeaderRequests to the network as well as
// handling inbound ExtendedHeaderRequests from the network.
type Exchange struct {
host host.Host
host host.Host
protocolID protocol.ID

trustedPeers peer.IDSlice
}

func NewExchange(host host.Host, peers peer.IDSlice) *Exchange {
func NewExchange(host host.Host, network p2p.Network, peers peer.IDSlice) *Exchange {
return &Exchange{
host: host,
protocolID: exchangeProtocolID(network),
trustedPeers: peers,
}
}
Expand Down Expand Up @@ -127,7 +134,7 @@ func (ex *Exchange) performRequest(

//nolint:gosec // G404: Use of weak random number generator
index := rand.Intn(len(ex.trustedPeers))
stream, err := ex.host.NewStream(ctx, ex.trustedPeers[index], exchangeProtocolID)
stream, err := ex.host.NewStream(ctx, ex.trustedPeers[index], ex.protocolID)
if err != nil {
return nil, err
}
Expand Down
15 changes: 10 additions & 5 deletions header/p2p/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/celestiaorg/celestia-node/header"
p2p_pb "github.com/celestiaorg/celestia-node/header/p2p/pb"
header_pb "github.com/celestiaorg/celestia-node/header/pb"
network "github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)

Expand Down Expand Up @@ -64,15 +65,19 @@ func TestExchange_RequestByHash(t *testing.T) {
host, peer := net.Hosts()[0], net.Hosts()[1]
// create and start the ExchangeServer
store := createStore(t, 5)
serv := NewExchangeServer(host, store)
serv := NewExchangeServer(host, network.Private, store)
err = serv.Start(ctx)
require.NoError(t, err)
t.Cleanup(func() {
serv.Stop(context.Background()) //nolint:errcheck
})

// start a new stream via Peer to see if Host can handle inbound requests
stream, err := peer.NewStream(context.Background(), libhost.InfoFromHost(host).ID, exchangeProtocolID)
stream, err := peer.NewStream(
context.Background(),
libhost.InfoFromHost(host).ID,
exchangeProtocolID(network.Private),
)
require.NoError(t, err)
// create request for a header at a random height
reqHeight := store.headHeight - 2
Expand Down Expand Up @@ -102,18 +107,18 @@ func createMocknet(t *testing.T) (libhost.Host, libhost.Host) {
return net.Hosts()[0], net.Hosts()[1]
}

// createP2PExAndServer creates a Exchange with 5 headers already in its store.
// createP2PExAndServer creates an Exchange with 5 headers already in its store.
func createP2PExAndServer(t *testing.T, host, tpeer libhost.Host) (header.Exchange, *mockStore) {
store := createStore(t, 5)
serverSideEx := NewExchangeServer(tpeer, store)
serverSideEx := NewExchangeServer(tpeer, network.Private, store)
err := serverSideEx.Start(context.Background())
require.NoError(t, err)

t.Cleanup(func() {
serverSideEx.Stop(context.Background()) //nolint:errcheck
})

return NewExchange(host, []peer.ID{tpeer.ID()}), store
return NewExchange(host, network.Private, []peer.ID{tpeer.ID()}), store
}

type mockStore struct {
Expand Down
18 changes: 11 additions & 7 deletions header/p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,30 +5,34 @@ import (

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/protocol"

tmbytes "github.com/tendermint/tendermint/libs/bytes"

"github.com/celestiaorg/celestia-node/header"
p2p_pb "github.com/celestiaorg/celestia-node/header/p2p/pb"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/go-libp2p-messenger/serde"
)

// ExchangeServer represents the server-side component for
// responding to inbound header-related requests.
type ExchangeServer struct {
host host.Host
store header.Store
host host.Host
store header.Store
protocolID protocol.ID

ctx context.Context
cancel context.CancelFunc
}

// NewExchangeServer returns a new P2P server that handles inbound
// header-related requests.
func NewExchangeServer(host host.Host, store header.Store) *ExchangeServer {
func NewExchangeServer(host host.Host, network p2p.Network, store header.Store) *ExchangeServer {
return &ExchangeServer{
host: host,
store: store,
host: host,
protocolID: exchangeProtocolID(network),
store: store,
}
}

Expand All @@ -37,7 +41,7 @@ func (serv *ExchangeServer) Start(context.Context) error {
serv.ctx, serv.cancel = context.WithCancel(context.Background())
log.Info("server: listening for inbound header requests")

serv.host.SetStreamHandler(exchangeProtocolID, serv.requestHandler)
serv.host.SetStreamHandler(serv.protocolID, serv.requestHandler)

return nil
}
Expand All @@ -46,7 +50,7 @@ func (serv *ExchangeServer) Start(context.Context) error {
func (serv *ExchangeServer) Stop(context.Context) error {
log.Info("server: stopping server")
serv.cancel()
serv.host.RemoveStreamHandler(exchangeProtocolID)
serv.host.RemoveStreamHandler(serv.protocolID)
return nil
}

Expand Down
Loading

0 comments on commit aaf719b

Please sign in to comment.