Split P2P Topics And Introduce Middleware (Adapters) #421
Conversation
Codecov Report
@@ Coverage Diff @@
## master #421 +/- ##
==========================================
- Coverage 76.58% 76.45% -0.13%
==========================================
Files 35 36 +1
Lines 2242 2251 +9
==========================================
+ Hits 1717 1721 +4
- Misses 368 372 +4
- Partials 157 158 +1
Continue to review full report at Codecov.
Gazelle needs a rerun, but the middleware idea is awesome! Curious to see whether a p2p incentive layer could be attached as an adapter of that sort.
This is ready for review. Please take a look.
beacon-chain/node/p2p_config.go
Outdated
return nil, err
}

// TODO: Define default adapters for logging, monitoring, etc.
Can we open an issue for this?
s, _ := p2p.NewServer()

// TODO: Figure out the topic. Is it a protobuf topic, string, or int?
We should create an issue for this too, so newcomers can pick it up.
Oh, this is a leftover TODO. It's a string at the moment.
Looking good. My main concern is that we've been passing around messages as an interface{} type. It would be nice to clean that up in this PR. Otherwise, the actual splitting of topics looks 👌
shared/p2p/service.go
Outdated
}

// Reverse adapter order
for i := len(adapters)/2 - 1; i >= 0; i-- {
Shouldn't this list be reversed outside of the loop, or can this list change over time?
Yes, good catch!
return
}

i := feed.Send(Message{Data: d})
Here's the definition for Message:
type Message struct {
// Peer represents the sender of the message.
Peer Peer
// Data can be any type of message found in sharding/p2p/proto package.
Data interface{}
}
Can the interface{} type for Data be replaced with proto.Message? Allowing any type here seems unnecessarily lax, especially considering that the value was cast down to proto.Message at the beginning of this function.
Sounds good to me
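A minimal sketch of what the narrowed struct could look like. The pbMessage interface here is a local stand-in for the real proto.Message (which lives in the protobuf library), and the ping type is hypothetical:

```go
package main

import "fmt"

// pbMessage is a local stand-in for proto.Message; the real interface
// is defined by the protobuf library.
type pbMessage interface {
	Reset()
	String() string
	ProtoMessage()
}

// Message mirrors the shared/p2p Message type, with Data narrowed from
// interface{} to the proto-style interface. The compiler now rejects
// Message{Data: 42}, which interface{} would have silently allowed.
type Message struct {
	Peer string
	Data pbMessage
}

// ping is a toy message type satisfying pbMessage.
type ping struct{}

func (p *ping) Reset()         {}
func (p *ping) String() string { return "ping" }
func (p *ping) ProtoMessage()  {}

func main() {
	m := Message{Peer: "peer1", Data: &ping{}}
	fmt.Println(m.Data.String()) // ping
}
```

The narrower field type moves the "is this actually a proto message?" check from a runtime cast to compile time.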
Everything looks great! Just had a few comments. Also, is it possible to get the number of connected peers? We could use it for sync.
// limiter or blacklisting condition.
func reqLogger(next p2p.Handler) p2p.Handler {
	return func(ctx context.Context, msg p2p.Message) {
		fmt.Printf("Received message from %s\n", msg.Peer)
msg.Peer should be formatted with %v.
s.RegisterTopic(topic, message, adapters...)

ch := make(chan p2p.Message)
Shouldn't this be buffered?
We should definitely avoid using arbitrary timeouts in tests. I poked around libp2p to see if there was a workaround, but nothing obvious popped up.
I'm also curious whether subscribeToTopic is necessary at all, now that we have RegisterTopic.
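For context on the buffering question above, a tiny sketch of the difference: on an unbuffered channel a send blocks until the subscriber receives, so a slow subscriber stalls the emitting goroutine, while a capacity of 1 lets a single message sit in flight.

```go
package main

import "fmt"

func main() {
	// With make(chan string) (unbuffered), this send would block until
	// another goroutine receives; with no receiver it deadlocks.
	// A capacity of 1 decouples sender and receiver for one message.
	ch := make(chan string, 1)
	ch <- "attestation" // does not block: the buffer has room
	fmt.Println(<-ch)   // attestation
}
```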
//
// The topics can originate from multiple sources. In other words, messages on
// TopicA may come from direct peer communication or a pub/sub channel.
func (s *Server) RegisterTopic(topic string, message interface{}, adapters ...Adapter) {
message interface{} can be proto.Message as well.
I tried this and it's failing tests. I wonder if reflect is assuming that TypeOf(message) is proto.Message rather than the full proto definition.
I didn't investigate thoroughly.
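One possible data point, sketched with a local animal interface standing in for proto.Message: reflect.TypeOf on an interface-typed parameter reports the dynamic (concrete) type, not the interface the parameter was declared with, so the test failure likely has another cause. The names animal, dog, and typeName are illustrative only.

```go
package main

import (
	"fmt"
	"reflect"
)

// animal stands in for an interface parameter such as proto.Message.
type animal interface{ Sound() string }

// dog is a concrete type satisfying the interface.
type dog struct{}

func (dog) Sound() string { return "woof" }

// typeName receives the value through an interface-typed parameter,
// the way RegisterTopic would with a proto.Message argument.
func typeName(m animal) string {
	// reflect.TypeOf sees the dynamic (concrete) type of the value,
	// not the interface type of the parameter.
	return reflect.TypeOf(m).String()
}

func main() {
	fmt.Println(typeName(dog{})) // main.dog
}
```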
shared/p2p/service.go
Outdated
}

func (s *Server) emit(feed *event.Feed, msg *floodsub.Message, msgType reflect.Type) {
	// TODO: reflect.Value.Interface() can panic so we should capture that
You shouldn't have to handle this case explicitly, since we're never passing in unexported fields for this value.
OK, I don't recall why the TODO was here, so I will remove it. Hopefully it doesn't cause issues.
shared/p2p/service.go
Outdated
h = adapter(h)
}

ctx, _ := context.WithTimeout(s.ctx, 10*time.Second)
cancel should be called at the end of this loop to avoid memory leaks.
Why? There is a timeout. I'm not sure we even need it; I'll just remove it.
https://stackoverflow.com/questions/44393995/what-happens-if-i-dont-cancel-a-context
Looks like the timeout exists so that the adapter doesn't run continuously for more than 10 seconds.
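A sketch of the resource argument: even when the deadline itself is the intended behavior, deferring cancel releases the context's internal timer as soon as the work returns, instead of holding it for the full 10 seconds. doWork here is a hypothetical wrapper, not code from this PR.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// doWork demonstrates the recommended pattern: create the context with
// a timeout, and defer cancel so its resources are freed as soon as
// the function returns rather than at the deadline.
func doWork() string {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel() // releases the timer promptly, avoiding the leak

	select {
	case <-ctx.Done():
		return "timed out: " + ctx.Err().Error()
	default:
		return "finished before the deadline"
	}
}

func main() {
	fmt.Println(doWork())
}
```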
log.WithFields(logrus.Fields{
	"topic": topic,
}).Debug("Subscribing to topic")
go s.subscribeToTopic(topic, msgType)
Should we remove this method, subscribeToTopic?
I am surprised this didn't upset the unused linter. It was only used in tests!
shared/p2p/service_test.go
Outdated
wait := make(chan struct{})
go (func() {
	defer close(wait)
	msg := <-ch
You can replace this line and the next with just <-ch
}

// Short timeout to allow libp2p to handle peer connection.
time.Sleep(time.Millisecond * 10)
The description for BasicHost.Connect is:
// Connect ensures there is a connection between this host and the peer with
// given peer.ID. If there is not an active connection, Connect will issue a
// h.Network.Dial, and block until a connection is open, or an error is returned.
// Connect will absorb the addresses in pi into its internal peerstore.
// It will also resolve any /dns4, /dns6, and /dnsaddr addresses.
which suggests that this timeout shouldn't be necessary. Do you know why it's required? This is a potential source of flakiness; we should definitely try to avoid an arbitrary timeout.
Not really. This pattern is all over the floodsub tests: https://github.com/libp2p/go-floodsub/blob/HEAD/floodsub_test.go
And it fails without it.
shared/p2p/service_test.go
Outdated
wait := make(chan struct{})
go (func() {
	defer close(wait)
	msg := <-ch
same here
shared/p2p/service.go
Outdated
adapters[i], adapters[opp] = adapters[opp], adapters[i]
}

go (func() {
No need for go (func() {...})(); write go func() {...}() instead.
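For illustration, the idiomatic form: the parentheses around the function literal compile but are pure noise. run is a made-up helper used only to demonstrate the pattern.

```go
package main

import "fmt"

// run launches a goroutine with the idiomatic `go func() { ... }()`
// form and waits for it via a channel.
func run() string {
	done := make(chan string)
	go func() {
		done <- "ran in goroutine"
	}()
	return <-done
}

func main() {
	fmt.Println(run()) // ran in goroutine
}
```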
We specify messages available for p2p communication common to beacon chain nodes and sharding clients.
Can you add a few sections here about the p2p middleware stuff for visibility?
What does this have to do with protos?
LGTM
lgtm
LGTM
Resolves #363
This is still a bit of a work in progress, as this feature is lacking some critical test paths.
I'm opening this PR for exposure and to track the work in progress.
New features:
The above middleware examples are not added in this PR but are likely candidates for new features.