-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Introduce harvester_limit to limit number of harvesters #2417
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,6 +5,8 @@ import ( | |
"sync" | ||
"time" | ||
|
||
"sync/atomic" | ||
|
||
cfg "github.com/elastic/beats/filebeat/config" | ||
"github.com/elastic/beats/filebeat/harvester" | ||
"github.com/elastic/beats/filebeat/input" | ||
|
@@ -14,14 +16,15 @@ import ( | |
) | ||
|
||
type Prospector struct { | ||
cfg *common.Config // Raw config | ||
config prospectorConfig | ||
prospectorer Prospectorer | ||
spoolerChan chan *input.Event | ||
harvesterChan chan *input.Event | ||
done chan struct{} | ||
states *file.States | ||
wg sync.WaitGroup | ||
cfg *common.Config // Raw config | ||
config prospectorConfig | ||
prospectorer Prospectorer | ||
spoolerChan chan *input.Event | ||
harvesterChan chan *input.Event | ||
done chan struct{} | ||
states *file.States | ||
wg sync.WaitGroup | ||
harvesterCounter uint64 | ||
} | ||
|
||
type Prospectorer interface { | ||
|
@@ -155,6 +158,13 @@ func (p *Prospector) createHarvester(state file.State) (*harvester.Harvester, er | |
} | ||
|
||
func (p *Prospector) startHarvester(state file.State, offset int64) error { | ||
|
||
if p.config.HarvesterLimit > 0 && atomic.LoadUint64(&p.harvesterCounter) >= p.config.HarvesterLimit { | ||
return fmt.Errorf("Harvester limit reached.") | ||
} | ||
|
||
atomic.AddUint64(&p.harvesterCounter, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if createHarvester fails counter is off. Either move counter right before p.wg.Add(1) or decrement counter on fail (+ explain why it must be decremented and can not be moved past this place). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think there is a race here if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. right. Unfortunately There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tsg Just checked the code again and startHarvester cannot be called concurrently. The atomic operation is needed because of the decrementing of the counter inside the go routine. |
||
|
||
state.Offset = offset | ||
// Create harvester with state | ||
h, err := p.createHarvester(state) | ||
|
@@ -164,7 +174,10 @@ func (p *Prospector) startHarvester(state file.State, offset int64) error { | |
|
||
p.wg.Add(1) | ||
go func() { | ||
defer p.wg.Done() | ||
defer func() { | ||
p.wg.Done() | ||
atomic.AddUint64(&p.harvesterCounter, ^uint64(0)) | ||
}() | ||
// Starts harvester and picks the right type. In case type is not set, set it to defeault (log) | ||
h.Harvest() | ||
}() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should add a comment about atomic ops being safe, due to startHarvester not being executed concurrently and atomic ops only required for harvesters shutting down.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment added in separate commit. Can be squashed.