Skip to content

Commit

Permalink
Merge pull request #255 from keep-network/configurable-validation-queue
Browse files Browse the repository at this point in the history
Configurable size of validate queue
  • Loading branch information
vyzo authored Jan 27, 2020
2 parents 97846b5 + ae00326 commit 25c434f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 2 deletions.
2 changes: 1 addition & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ type PubSubRouter interface {

type Message struct {
*pb.Message
ReceivedFrom peer.ID
ReceivedFrom peer.ID
ValidatorData interface{}
}

Expand Down
15 changes: 14 additions & 1 deletion validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
)

const (
defaultValidateQueueSize = 32
defaultValidateConcurrency = 1024
defaultValidateThrottle = 8192
)
Expand Down Expand Up @@ -82,7 +83,7 @@ type rmValReq struct {
func newValidation() *validation {
return &validation{
topicVals: make(map[string]*topicVal),
validateQ: make(chan *validateReq, 32),
validateQ: make(chan *validateReq, defaultValidateQueueSize),
validateThrottle: make(chan struct{}, defaultValidateThrottle),
validateWorkers: runtime.NumCPU(),
}
Expand Down Expand Up @@ -342,6 +343,18 @@ func (val *topicVal) validateMsg(ctx context.Context, src peer.ID, msg *Message)

/// Options

// WithValidateQueueSize sets the buffer of validate queue. Defaults to 32.
// When queue is full, validation is throttled and new messages are dropped.
func WithValidateQueueSize(n int) Option {
return func(ps *PubSub) error {
if n > 0 {
ps.val.validateQ = make(chan *validateReq, n)
return nil
}
return fmt.Errorf("validate queue size must be > 0")
}
}

// WithValidateThrottle sets the upper bound on the number of active validation
// goroutines across all topics. The default is 8192.
func WithValidateThrottle(n int) Option {
Expand Down

0 comments on commit 25c434f

Please sign in to comment.