Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Local networked P2P! #222

Merged
merged 90 commits into from
Jul 17, 2018
Merged

Local networked P2P! #222

merged 90 commits into from
Jul 17, 2018

Conversation

prestonvanloon
Copy link
Member

@prestonvanloon prestonvanloon commented Jun 30, 2018

Depends on #221 (vendor libraries) for a smaller diff.

Preliminary p2p services

The p2p functionality here is rather simple, but is a means to test (locally) networked sharding nodes.

Key features:

  • Peer discovery via Multicast DNS (i.e. locally running nodes will be discovered and connected to without DHT or manually specified)
  • Messages are defined via protobuf
  • Message topics are mapped to the protobuf enum
  • CollationBody{Request|Response} are ready to go
  • Send "works" by broadcasting your message to all peers. Technically your intended receiver will receive the message, but so will everyone else for the time being.
  • Usage should be straightforward as outlined in the p2p design. Subscribe to the feeds you want, send/broadcast as you please.
  • You will receive your own messages (at least until these are filtered out), but that shouldn't be an issue for most use cases. Just be aware that Send == Broadcast and you receive your own messages, don't get caught in an infinite loop!

}

// topicPeerLister has a method to return connected peers on a given topic.
// This is implemented by floodsub.PubSub
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

punctuation

func (d *discovery) HandlePeerFound(pi ps.PeerInfo) {
d.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, ps.PermanentAddrTTL)
if err := d.host.Connect(d.ctx, pi); err != nil {
log.Warn(fmt.Sprintf("Failed to connect to peer: %v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.Warnf

log.Warn(fmt.Sprintf("Failed to connect to peer: %v", err))
}

log.Debug(fmt.Sprintf("Peers now: %s", d.host.Peerstore().Peers()))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

log.Debugf

@@ -0,0 +1,39 @@
syntax = "proto3";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put this stuff inside of shared/p2p/proto?

}, nil
}

// Start the main routine for an p2p server.
func (s *Server) Start() {
log.Info("Starting shardp2p server")
if err := startDiscovery(s.ctx, s.host, s.gsub); err != nil {
log.Error(fmt.Sprintf("Could not start p2p discovery! %v", err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Errorf

@@ -76,18 +78,36 @@ func (p *Proposer) Stop() error {
return nil
}

// TODO: Move this somewhere else
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's put this in the utils package.

@@ -79,18 +78,18 @@ func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, read
}

period := new(big.Int).Div(blockNumber.Number(), big.NewInt(s.config.PeriodLength))
period = period.Sub(period, big.NewInt(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a comment explaining why we're doing this

s.p2p.Broadcast(req)
log.Debug("Sent request for collation body via a shardp2p broadcast")
} else {
log.Warn("Syncer generated nil CollationBodyRequest")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already captured by the log right before it that says "no collation found for shardid x, period y"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see that message?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

syncer/handlers.go log.Debugf("No collation exists for shard")

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, but from this context when we ask the syncer to generate a collation request and it gives us nothing back, something feels wrong. Without debug logging the simulator appears to do nothing.

We'll need to iterate on this to make it better.


s.p2p.Broadcast(tx)
log.Debug("Transaction broadcasted")
// s.p2p.Broadcast(messages.TransactionBroadcast{Transaction: tx})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we just delete the comments beneath it?

res, err := RespondCollationBody(req, collationFetcher)
if err != nil {
log.Errorf("Could not construct response: %v", err)
continue
}

if res == nil {
// TODO: Send that we don't have it?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree we should send a msg for this

@rauljordan
Copy link
Contributor

Also, in order to try this out, we need to start a proposer first and then a simulator, as only one of these actors can access the local leveldb instance. Additionally, the simulator lags behind the proposer in terms of requesting collation data, as they are never synchronized on the same period for these requests. Just a few things to keep in mind @prestonvanloon.

@prestonvanloon
Copy link
Member Author

I addressed some concerns. I’ll move the protobuf definition to a better place, tomorrow.

// startDiscovery protocols. Currently, this supports discovery via multicast
// DNS peer discovery.
//
// TODO: add other discovery protocols such as DHT, etc.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's open up an issue for add other discovery protocols

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

func (d *discovery) HandlePeerFound(pi ps.PeerInfo) {
d.host.Peerstore().AddAddrs(pi.ID, pi.Addrs, ps.PermanentAddrTTL)
if err := d.host.Connect(d.ctx, pi); err != nil {
log.Warnf("Failed to connect to peer: %v", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't error be more appropriate?
Warning is like "you should probably take a look at this"
Error is like "something failed but I'm not quitting"

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's really error level. Imagine that they discover a peer but then the peer goes offline or refuses the connection.

In geth, these are debug level logs so warning might even be too verbose.

Copy link
Member

@nisdas nisdas left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything looks great aside from a few comments. Great Job !

func expectPeers(t *testing.T, h *bhost.BasicHost, n int) {
if len(h.Peerstore().Peers()) != n {
t.Errorf(
"Expected 2 peer for host %v, but has %d peers. They are: %v.",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of Expected 2 peers , should be Expected %d peers , with the argument being n

var portRange int32 = 100

// buildOptions for the libp2p host.
// TODO: Expand on these options and provide the option configuration via flags.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's open up an issue for this

Copy link
Member

@terencechain terencechain left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@rauljordan rauljordan merged commit f2f8850 into prysmaticlabs:master Jul 17, 2018
prestonvanloon added a commit that referenced this pull request Jul 22, 2018
prestonvanloon added a commit that referenced this pull request Jul 22, 2018
prestonvanloon added a commit that referenced this pull request Jul 22, 2018
Former-commit-id: d22d05529bb0050b8a03053a28d876e3e458bbff [formerly 284a046]
Former-commit-id: 83cd9c8
@prestonvanloon prestonvanloon deleted the p2p branch July 24, 2018 12:04
Redidacove pushed a commit to Redidacove/prysm that referenced this pull request Aug 13, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants