-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Allow caller to optionally validate messages #45
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't quite like the jumping around the event loop in order to perform validation.
It seems to me that it is much simpler to implement with synchronous validation, unless there is a good reason.
floodsub.go
Outdated
p.maybePublishMessage(p.host.ID(), msg.Message) | ||
subs := p.getSubscriptions(msg) // call before goroutine! | ||
go func() { | ||
if p.validate(subs, msg) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't at the very least be some debugging output when we fail validation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know - is that log-worthy? To me it seems that a validator returning false is not very exceptional - and if it is, the caller can still log from inside the validator.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is, it may indicate a bug or an attack and these things are nice to always nice to know.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess it depends on what the validator is used for. Anyway, added logging here
floodsub.go
Outdated
} | ||
|
||
// Subscribe returns a new Subscription for the given topic | ||
func (p *PubSub) Subscribe(topic string) (*Subscription, error) { | ||
func (p *PubSub) Subscribe(topic string, opts ...func(*Subscription) error) (*Subscription, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we typedef the func?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will do, good point
floodsub.go
Outdated
p.maybePublishMessage(rpc.from, pmsg) | ||
subs := p.getSubscriptions(&Message{pmsg}) // call before goroutine! | ||
go func() { | ||
if p.validate(subs, &Message{pmsg}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same here re debugging output on validation error; also validation logic in separate goroutine.
@@ -319,6 +341,17 @@ func msgID(pmsg *pb.Message) string { | |||
return string(pmsg.GetFrom()) + string(pmsg.GetSeqno()) | |||
} | |||
|
|||
// validate is called in a goroutine and calls the validate functions of all subs with msg as parameter. | |||
func (p *PubSub) validate(subs []*Subscription, msg *Message) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this validates for all subscriptions -- but there may be a case that the message is valid for some topic and invalid for some other topic. Shouldn't we publish it for the valid topic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I talked to why about this and we weren't sure what to do. For now we just wanted to drop it. Maybe the right thing is to remove the topics it fails for and one drop the message if all topics have been removed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The reasoning was "we can always make things more complicated" :)
floodsub.go
Outdated
@@ -171,7 +175,19 @@ func (p *PubSub) processLoop(ctx context.Context) { | |||
continue | |||
} | |||
case msg := <-p.publish: | |||
p.maybePublishMessage(p.host.ID(), msg.Message) | |||
subs := p.getSubscriptions(msg) // call before goroutine! | |||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we making this contraption with validation in a separate goroutine and then re-entry to the event loop with a separate message?
It seems to me that validators can be nonblocking and that we can simply validate synchronously and proceed to maybePublishMessage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
whyrusleeping said validators may run for a while so they shouldn't block the event loop.
@vyzo Doing validation out of the event loop is unfortunately necessary. Validation of messages that take even a few milliseconds will block the entire loop, which could to a significant bottleneck under load. Though It might be nice to find a way to rate limit this in some way to prevent unbounded concurrency |
We can throttle it using a buffered Block or Drop? |
@keks yeah, i'm thinking maybe thats out of scope for this PR |
Alrighty. I'll open an issue where we can discuss the trade-offs. |
@whyrusleeping what kind of validators do we have in mind? |
Another issue that we should be aware of with the goroutine validation approach is that it can potentially reorder messages. It's consistent with our very weak semantics for pubsub, but still an issue to be aware of. |
Another issue that should be noted is the potential for Denail of Service attacks due to unbounded concurrency in the validation. A goroutine is created for each message, which allows a malicious peer to flood the network with potentially deleterious effects. I think it might be in scope for the PR to address message throttling (#46) |
@vyzo i'm playing with the idea of using pubsub for a blockchain as a way to propagate blocks and transactions. Before a node re-broadcasts a block it receives via pubsub, it should make sure that its a valid block. Same with transactions. Validating a transaction is generally pretty cheap, but validating a block is a bit more expensive. I might be okay with having things received on a given topic block eachother (maybe with some bounded concurrency). On reordering messages, I'm okay with that. However, the DoS vector is definitely worth thinking about. |
Message reordering can already happen, since messages can be dropped, so the application layer already can't depend on ordered delivery. I want to address this in #42. re DoS: I'll work on bounding the concurrency tomorrow. Doing it in a separate PR is just more bureaucracy, (albeit not much more). |
- make validators time out after 100ms - add context param to validator functions - add type Validator func(context.Context, *Message) bool - drop message if more than 10 messages are already being validated
I made some further changes, I hope we can merge this now:
|
floodsub.go
Outdated
const ( | ||
ID = protocol.ID("/floodsub/1.0.0") | ||
maxConcurrency = 10 | ||
validateTimeoutMillis = 100 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you make this actual milliseconds and call it validateTimeout?
floodsub.go
Outdated
// validate is called in a goroutine and calls the validate functions of all subs with msg as parameter. | ||
func (p *PubSub) validate(subs []*Subscription, msg *Message) bool { | ||
for _, sub := range subs { | ||
ctx, cancel := context.WithTimeout(p.ctx, validateTimeoutMillis*time.Millisecond) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
and here the millisecond should be part of the timeout constant.
floodsub.go
Outdated
ctx, cancel := context.WithTimeout(p.ctx, validateTimeoutMillis*time.Millisecond) | ||
defer cancel() | ||
|
||
if sub.validate != nil && !sub.validate(ctx, msg) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so the validation timeout is not enforced at all, but simply passed into the validator procedure as the context?
@vyzo good feedback, thanks! |
Oh, and I increased the timeout to 150ms because otherwise the test that checks whether a message gets dropped when the when all validators are taken would fail because the first message would already have timeouted when it sent the last one. |
Probably a good idea. |
|
@vyzo Further comments? Would like to see this merged :) |
floodsub.go
Outdated
const ID = protocol.ID("/floodsub/1.0.0") | ||
const ( | ||
ID = protocol.ID("/floodsub/1.0.0") | ||
maxConcurrency = 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this configurable too?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I started making this configurable per Subscription, but it turns out I'll end up with a slice of channels I need to block on and that is only possible using reflect, which is not an option for high-performance code.
Also I'm not sure if we should make everything configurable up front. I think its better to use sane defaults and not allow too much configuration. I believe this approach is very important for idiomatic Go.
So I pledge for not configurable, for now. If we need this to be configurable, we'll be able to quickly make it so. Until then, I don't see the need (and think it just makes things more complex where they don't need to be)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it doesn't have to be configurable per subscription -- I just don't like hardcoding a low value.
But up to you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Where else would you make it configurable? As a parameter to NewFloodSub? Then we had a global value for all subscriptions, and we would need to hard code a value in go-ipfs. I don't think that is a good option either.
I agree that 10 might not be the best value, so I sat down and did some simulations. They are quite rough, but I think we can still use them to inform our decision. You can find the code and "results" at https://gist.github.com/keks/a3bd10097c9c1d20ef5a2de3b5f794e3. Based on that, I'd suggest a value of 128. That seems quite high, but I believe we might drop too many packets otherwise.
The simulation assumes message validation takes as long as its timeout, so it's a worst-case analysis.
What do you think @whyrusleeping @vyzo?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It can be either a parameter in NewFloodSub
or just make it a variable.
Either way, if we make the validation timeout configurable, we should make the max queue size configurable as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, maybe we make this an option on NewFloodsub
. When adding 'options' to things, i really like this approach: https://dave.cheney.net/2014/10/17/functional-options-for-friendly-apis
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not convinced a global setting is helpful here, but I'm okay going with it. And sure, functional options are nice! That's how I implemented other options in this PR as well.
} | ||
}(pmsg) | ||
default: | ||
log.Warning("could not acquire validator; dropping message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe in the default here, we can have another select, just like this one, but instead of a default there, we have a timeout. That way we avoid starting a timer for every message, and we don't immediately drop any overflow
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't that block the main loop? And what would be the difference to just using the timeout instead of the default?
floodsub.go
Outdated
result <- sub.validate == nil || sub.validate(ctx, msg) | ||
}(sub) | ||
|
||
select { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this select should probably go in a separate loop, after this one. That way we do all the validation in parallel
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yep, good catch!
floodsub.go
Outdated
|
||
result := make(chan bool) | ||
go func(sub *Subscription) { | ||
result <- sub.validate == nil || sub.validate(ctx, msg) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe should return an error from the validator? Could be useful to log it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My thinking was that the validator is user-supplied and they can do all the logging they want themselves. No need to make the API complex.
} | ||
}(msg) | ||
default: | ||
log.Warning("could not acquire validator; dropping message") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps we should log some more information about the message? The topic and the message id come to mind.
type Validator func(context.Context, *Message) bool | ||
|
||
// WithValidator is an option that can be supplied to Subscribe. The argument is a function that returns whether or not a given message should be propagated further. | ||
func WithValidator(validate Validator) func(*Subscription) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we use SubOpt
in the return type for this prototype?
} | ||
|
||
// WithValidatorTimeout is an option that can be supplied to Subscribe. The argument is a duration after which long-running validators are canceled. | ||
func WithValidatorTimeout(timeout time.Duration) func(*Subscription) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here too
I don't quite like the fact that we are always entering the validation logic, regardless of whether there is a validator appropriate for the message. I think at the very least we should increase the default throttle to something like a 100 or even 1000. |
const ID = protocol.ID("/floodsub/1.0.0") | ||
const ( | ||
ID = protocol.ID("/floodsub/1.0.0") | ||
defaultMaxConcurrency = 10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's make this a 100 at least (or even 1000) -- it's a global throttle on all messages not just per topic.
More on the subject of the global throttle: it is possible to DoS all topics by simply overflowing a topic that has a slow validator. This is an ugly Dos vector that can be addressed by only entering the validation logic only if there is an applicable validator. |
Actually, that's not quite sufficient -- i think we need a separate throttle per subscription so that no topic can DoS other validated topics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The validation throttle is global, and further it is applied to all topics regardless of whether they have a validator attached.
This means that a single slow validator will throttle all topics, which makes for a trivial Dos vector or bug waiting to happen.
Closing in favour of #55, so that I can work on it. |
Allows callers of
PubSub.Subscribe
to pass a validator functionfunc(Message)bool
that returns whether the passed message should be processed or not.