Update broker setup
- remove setting queue_size
- remove unused setting bulk_queue_size
- introduce settings namespace `queue` with default `mem.events: 4096`
- add settings queue.mem.flush.min_events and queue.mem.flush.timeout
- change default output.logstash.pipelining to 5
- remove spooler settings from filebeat config object
- update the winlogbeat config validation check
urso committed Jul 12, 2017
1 parent 09e90c6 commit cd45c0a
Showing 24 changed files with 292 additions and 153 deletions.
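
In summary, the diffs below replace the flat `queue_size`/`bulk_queue_size` settings with a `queue` namespace and enable Logstash pipelining by default. A minimal sketch of the resulting configuration surface, using the defaults introduced by this commit (the `hosts` value is illustrative):

queue:
  mem:
    events: 4096            # max number of events the queue can buffer
    flush.min_events: 0     # 0 = events are available to the outputs immediately
    flush.timeout: 0s       # max wait before a batch is handed to the outputs

output.logstash:
  hosts: ["localhost:5044"] # illustrative
  pipelining: 5             # new default: up to 5 batches in flight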
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
@@ -19,6 +19,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
- The `scripts/import_dashboards` script is removed from packages. Use the `setup` command instead. {pull}4586[4586]
- Change format of the saved kibana dashboards to have a single JSON file for each dashboard. {pull}4413[4413]
- Rename `configtest` command to `test config`. {pull}4590[4590]
- Remove the `queue_size` and `bulk_queue_size` settings. {pull}4650[4650]

*Filebeat*

@@ -70,6 +71,8 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
- Add support for analyzers and multifields in fields.yml. {pull}4574[4574]
- Add support for JSON logging. {pull}4523[4523]
- Add `test output` command, to test Elasticsearch and Logstash output settings. {pull}4590[4590]
- Introduce configurable event queue settings: queue.mem.events, queue.mem.flush.min_events, and queue.mem.flush.timeout. {pull}4650[4650]
- Enable pipelining in Logstash output by default. {pull}4650[4650]

*Filebeat*

@@ -81,6 +84,7 @@ https://github.com/elastic/beats/compare/v6.0.0-alpha2...master[Check the HEAD d
- Add udp prospector type. {pull}4452[4452]
- Enabled Cgo, which means libc is dynamically linked. {pull}4546[4546]
- Add Beta module config reloading mechanism. {pull}4566[4566]
- Remove spooler and publisher components and settings. {pull}4644[4644]

*Heartbeat*

29 changes: 21 additions & 8 deletions auditbeat/auditbeat.reference.yml
@@ -86,12 +86,25 @@ auditbeat.modules:
# sub-dictionary. Default is false.
#fields_under_root: false

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# The internal queue size for bulk events in the processing pipeline.
# Do not modify this value.
#bulk_queue_size: 0
# Internal queue configuration for buffering events to be published.
#queue:
# Queue type by name (default 'mem').
# The memory queue presents all available events (up to the output's
# bulk_max_size) to the output the moment the output is ready to process
# another batch of events.
#mem:
# Max number of events the queue can buffer.
#events: 4096

# Hints at the minimum number of events stored in the queue before a
# batch of events is forwarded to the outputs.
# A value of 0 (the default) makes events immediately available
# to be sent to the outputs.
#flush.min_events: 0

# Maximum duration after which events are made available to the outputs,
# even if the number of events stored in the queue is < flush.min_events.
#flush.timeout: 0s

# Sets the maximum number of CPUs that can be executing simultaneously. The
# default is the number of logical CPUs available in the system.
@@ -284,9 +297,9 @@ output.elasticsearch:
# Optionally load-balance the events between the Logstash hosts
#loadbalance: true

# Number of batches to be send asynchronously to logstash while processing
# Number of batches to be sent asynchronously to logstash while processing
# new batches.
#pipelining: 0
#pipelining: 5

# Optional index name. The default index name is set to the name of the beat
# in all lowercase.
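
The two flush settings above trade latency against batch size: flush.min_events sets a lower bound on the batch handed to the outputs, while flush.timeout caps how long events may sit in the queue. A sketch of a throughput-oriented tuning (the values are illustrative, not recommendations):

queue.mem:
  events: 8192            # buffer up to 8192 events
  flush.min_events: 512   # wait for at least 512 events per batch...
  flush.timeout: 1s       # ...but never delay a batch by more than one second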
11 changes: 0 additions & 11 deletions filebeat/_meta/common.reference.p2.yml
@@ -240,21 +240,10 @@ filebeat.prospectors:

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
#filebeat.spool_size: 2048

# Enable async publisher pipeline in filebeat (Experimental!)
#filebeat.publish_async: false

# Defines how often the spooler is flushed. After idle_timeout the spooler is
# Flush even though spool_size is not reached.
#filebeat.idle_timeout: 5s

# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#filebeat.registry_file: ${path.data}/registry

#
# These config files must have the full filebeat config part inside, but only
# the prospector part is processed. All global options like spool_size are ignored.
# The config_dir MUST point to a different directory than the one the main filebeat config file is in.
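
For existing Filebeat configurations, the removed spooler knobs have rough counterparts in the new queue namespace. An approximate mapping, inferred from the semantics described in this commit (an assumption, not an official migration guide):

# Before (removed in this commit):
#filebeat.spool_size: 2048    # flush once 2048 events are spooled
#filebeat.idle_timeout: 5s    # flush after 5s even if spool_size is not reached

# After (rough equivalent):
queue.mem:
  flush.min_events: 2048
  flush.timeout: 5s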
5 changes: 0 additions & 5 deletions filebeat/config/config.go
@@ -20,9 +20,6 @@ const (

type Config struct {
Prospectors []*common.Config `config:"prospectors"`
SpoolSize uint64 `config:"spool_size" validate:"min=1"`
PublishAsync bool `config:"publish_async"`
IdleTimeout time.Duration `config:"idle_timeout" validate:"nonzero,min=0s"`
RegistryFile string `config:"registry_file"`
ConfigDir string `config:"config_dir"`
ShutdownTimeout time.Duration `config:"shutdown_timeout"`
@@ -34,8 +31,6 @@ type Config struct {
var (
DefaultConfig = Config{
RegistryFile: "registry",
SpoolSize: 2048,
IdleTimeout: 5 * time.Second,
ShutdownTimeout: 0,
}
)
2 changes: 0 additions & 2 deletions filebeat/config/config_test.go
@@ -22,8 +22,6 @@ func TestReadConfig2(t *testing.T) {
// Reads second config file
err = cfgfile.Read(config, absPath+"/config2.yml")
assert.Nil(t, err)

assert.Equal(t, uint64(0), config.SpoolSize)
}

func TestGetConfigFiles_File(t *testing.T) {
40 changes: 21 additions & 19 deletions filebeat/filebeat.reference.yml
@@ -409,21 +409,10 @@ filebeat.prospectors:

#========================= Filebeat global options ============================

# Event count spool threshold - forces network flush if exceeded
#filebeat.spool_size: 2048

# Enable async publisher pipeline in filebeat (Experimental!)
#filebeat.publish_async: false

# Defines how often the spooler is flushed. After idle_timeout the spooler is
# Flush even though spool_size is not reached.
#filebeat.idle_timeout: 5s

# Name of the registry file. If a relative path is used, it is considered relative to the
# data path.
#filebeat.registry_file: ${path.data}/registry

#
# These config files must have the full filebeat config part inside, but only
# the prospector part is processed. All global options like spool_size are ignored.
# The config_dir MUST point to a different directory than the one the main filebeat config file is in.
@@ -469,12 +458,25 @@ filebeat.prospectors:
# sub-dictionary. Default is false.
#fields_under_root: false

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# The internal queue size for bulk events in the processing pipeline.
# Do not modify this value.
#bulk_queue_size: 0
# Internal queue configuration for buffering events to be published.
#queue:
# Queue type by name (default 'mem').
# The memory queue presents all available events (up to the output's
# bulk_max_size) to the output the moment the output is ready to process
# another batch of events.
#mem:
# Max number of events the queue can buffer.
#events: 4096

# Hints at the minimum number of events stored in the queue before a
# batch of events is forwarded to the outputs.
# A value of 0 (the default) makes events immediately available
# to be sent to the outputs.
#flush.min_events: 0

# Maximum duration after which events are made available to the outputs,
# even if the number of events stored in the queue is < flush.min_events.
#flush.timeout: 0s

# Sets the maximum number of CPUs that can be executing simultaneously. The
# default is the number of logical CPUs available in the system.
@@ -667,9 +669,9 @@ output.elasticsearch:
# Optionally load-balance the events between the Logstash hosts
#loadbalance: true

# Number of batches to be send asynchronously to logstash while processing
# Number of batches to be sent asynchronously to logstash while processing
# new batches.
#pipelining: 0
#pipelining: 5

# Optional index name. The default index name is set to the name of the beat
# in all lowercase.
3 changes: 0 additions & 3 deletions filebeat/tests/system/config/filebeat.yml.j2
@@ -89,13 +89,10 @@ filebeat.prospectors:
{{prospector_raw}}
{% endif %}

filebeat.spool_size:
filebeat.shutdown_timeout: {{ shutdown_timeout|default(0) }}
filebeat.idle_timeout: 0.1s
{% if not skip_registry_config %}
filebeat.registry_file: {{ beat.working_dir + '/' }}{{ registryFile|default("registry")}}
{%endif%}
filebeat.publish_async: {{publish_async}}

{% if reload or reload_path -%}
filebeat.config.{{ reload_type|default("prospectors") }}:
1 change: 0 additions & 1 deletion filebeat/tests/system/test_publisher.py
@@ -23,7 +23,6 @@ def test_registrar_file_content(self):

self.render_config_template(
path=os.path.abspath(self.working_dir) + "/log/*",
publish_async=True
)
os.mkdir(self.working_dir + "/log/")

29 changes: 21 additions & 8 deletions heartbeat/heartbeat.reference.yml
@@ -236,12 +236,25 @@ heartbeat.scheduler:
# sub-dictionary. Default is false.
#fields_under_root: false

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# The internal queue size for bulk events in the processing pipeline.
# Do not modify this value.
#bulk_queue_size: 0
# Internal queue configuration for buffering events to be published.
#queue:
# Queue type by name (default 'mem').
# The memory queue presents all available events (up to the output's
# bulk_max_size) to the output the moment the output is ready to process
# another batch of events.
#mem:
# Max number of events the queue can buffer.
#events: 4096

# Hints at the minimum number of events stored in the queue before a
# batch of events is forwarded to the outputs.
# A value of 0 (the default) makes events immediately available
# to be sent to the outputs.
#flush.min_events: 0

# Maximum duration after which events are made available to the outputs,
# even if the number of events stored in the queue is < flush.min_events.
#flush.timeout: 0s

# Sets the maximum number of CPUs that can be executing simultaneously. The
# default is the number of logical CPUs available in the system.
@@ -434,9 +447,9 @@ output.elasticsearch:
# Optionally load-balance the events between the Logstash hosts
#loadbalance: true

# Number of batches to be send asynchronously to logstash while processing
# Number of batches to be sent asynchronously to logstash while processing
# new batches.
#pipelining: 0
#pipelining: 5

# Optional index name. The default index name is set to the name of the beat
# in all lowercase.
29 changes: 21 additions & 8 deletions libbeat/_meta/config.reference.yml
@@ -22,12 +22,25 @@
# sub-dictionary. Default is false.
#fields_under_root: false

# Internal queue size for single events in processing pipeline
#queue_size: 1000

# The internal queue size for bulk events in the processing pipeline.
# Do not modify this value.
#bulk_queue_size: 0
# Internal queue configuration for buffering events to be published.
#queue:
# Queue type by name (default 'mem').
# The memory queue presents all available events (up to the output's
# bulk_max_size) to the output the moment the output is ready to process
# another batch of events.
#mem:
# Max number of events the queue can buffer.
#events: 4096

# Hints at the minimum number of events stored in the queue before a
# batch of events is forwarded to the outputs.
# A value of 0 (the default) makes events immediately available
# to be sent to the outputs.
#flush.min_events: 0

# Maximum duration after which events are made available to the outputs,
# even if the number of events stored in the queue is < flush.min_events.
#flush.timeout: 0s

# Sets the maximum number of CPUs that can be executing simultaneously. The
# default is the number of logical CPUs available in the system.
@@ -220,9 +233,9 @@ output.elasticsearch:
# Optionally load-balance the events between the Logstash hosts
#loadbalance: true

# Number of batches to be send asynchronously to logstash while processing
# Number of batches to be sent asynchronously to logstash while processing
# new batches.
#pipelining: 0
#pipelining: 5

# Optional index name. The default index name is set to the name of the beat
# in all lowercase.
5 changes: 4 additions & 1 deletion libbeat/monitoring/report/elasticsearch/elasticsearch.go
@@ -105,7 +105,10 @@ func makeReporter(beat common.BeatInfo, cfg *common.Config) (report.Reporter, er
}

brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
return membroker.NewBroker(e, 20, false), nil
return membroker.NewBroker(membroker.Settings{
Eventer: e,
Events: 20,
}), nil
}
pipeline, err := pipeline.New(brokerFactory, out, pipeline.Settings{
WaitClose: 0,
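
membroker.NewBroker now takes a Settings struct instead of positional arguments, leaving room for future options without breaking callers. A minimal sketch of a broker factory built on the new signature (only the Eventer and Events fields are visible in this commit; the buffer size and the Eventer comment are assumptions):

// Broker factory matching the signature expected by pipeline.New; mirrors
// the monitoring reporter above, but with an illustrative buffer size.
brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
	return membroker.NewBroker(membroker.Settings{
		Eventer: e,  // receives queue notifications (assumed from the name)
		Events:  64, // maximum number of buffered events (illustrative)
	}), nil
}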
1 change: 1 addition & 0 deletions libbeat/outputs/logstash/config.go
@@ -30,6 +30,7 @@ type Backoff struct {
var defaultConfig = Config{
Port: 5044,
LoadBalance: false,
Pipelining: 5,
BulkMaxSize: 2048,
CompressionLevel: 3,
Timeout: 30 * time.Second,
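
With Pipelining: 5 the Logstash output keeps up to five batches of at most BulkMaxSize events in flight before blocking on ACKs; the previous default of 0 sent one batch at a time. The old synchronous behavior can still be requested explicitly — a sketch (the host is illustrative):

output.logstash:
  hosts: ["localhost:5044"]
  bulk_max_size: 2048   # events per batch (unchanged default)
  pipelining: 0         # opt out: one batch in flight at a time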
33 changes: 20 additions & 13 deletions libbeat/publisher/bc/publisher/pipeline.go
@@ -2,31 +2,22 @@ package publisher

import (
"errors"
"fmt"

"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/outputs"
"github.com/elastic/beats/libbeat/processors"
"github.com/elastic/beats/libbeat/publisher/broker"
"github.com/elastic/beats/libbeat/publisher/broker/membroker"
"github.com/elastic/beats/libbeat/publisher/pipeline"
)

const defaultBrokerSize = 8 * 1024

func createPipeline(
beatInfo common.BeatInfo,
shipper ShipperConfig,
processors *processors.Processors,
outcfg common.ConfigNamespace,
) (*pipeline.Pipeline, error) {
queueSize := defaultBrokerSize
if qs := shipper.QueueSize; qs != nil {
if sz := *qs; sz > 0 {
queueSize = sz
}
}

var out outputs.Group
if !(*publishDisabled) {
var err error
@@ -63,11 +54,27 @@
},
}

brokerFactory := func(e broker.Eventer) (broker.Broker, error) {
return membroker.NewBroker(e, queueSize, false), nil
brokerType := "mem"
if b := shipper.Queue.Name(); b != "" {
brokerType = b
}

p, err := pipeline.New(brokerFactory, out, settings)
brokerFactory := broker.FindFactory(brokerType)
if brokerFactory == nil {
return nil, fmt.Errorf("'%v' is not a valid queue type", brokerType)
}

brokerConfig := shipper.Queue.Config()
if brokerConfig == nil {
brokerConfig = common.NewConfig()
}

p, err := pipeline.New(
func(eventer broker.Eventer) (broker.Broker, error) {
return brokerFactory(eventer, brokerConfig)
},
out, settings,
)
if err != nil {
return nil, err
}
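
The rewritten createPipeline resolves the queue through the `queue` settings namespace: the single key under `queue` selects the broker factory via broker.FindFactory, and the body of that key is handed to the factory; an empty namespace falls back to 'mem' with default settings. In configuration terms (the values shown are the documented defaults):

queue:
  mem:                   # Name() -> "mem": selects the membroker factory
    events: 4096         # Config() -> this sub-config, passed to the factory
    flush.min_events: 0
    flush.timeout: 0s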
… (diffs for the remaining changed files omitted)
