Skip to content

Commit

Permalink
rename update to overwrite both in config and code
Browse files Browse the repository at this point in the history
  • Loading branch information
kvch committed Apr 10, 2018
1 parent f63fb10 commit 6a151b5
Show file tree
Hide file tree
Showing 9 changed files with 35 additions and 35 deletions.
6 changes: 3 additions & 3 deletions filebeat/_meta/common.reference.p2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -287,9 +287,9 @@ filebeat.inputs:
#filebeat.registry_file_permissions: 0600

# By default Ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled Filebeat updates pipelines everytime
# a new Elasticsearch connection is established.
#filebeat.update_pipelines: false
# already exists. If this option is enabled Filebeat overwrites pipelines
# everytime a new Elasticsearch connection is established.
#filebeat.overwrite_pipelines: false

# These config files must have the full filebeat config part inside, but only
# the input part is processed. All global options like spool_size are ignored.
Expand Down
18 changes: 9 additions & 9 deletions filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,17 +137,17 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {

func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
if !fb.moduleRegistry.Empty() {
updatePipelines := fb.config.UpdatePipelines
overwritePipelines := fb.config.OverwritePipelines
if b.InSetupCmd {
updatePipelines = true
overwritePipelines = true
}

b.UpdatePipelinesCallback = func(esConfig *common.Config) error {
b.OverwritePipelinesCallback = func(esConfig *common.Config) error {
esClient, err := elasticsearch.NewConnectedClient(esConfig)
if err != nil {
return err
}
return fb.moduleRegistry.LoadPipelines(esClient, updatePipelines)
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
}
return nil
Expand All @@ -161,15 +161,15 @@ func (fb *Filebeat) loadModulesPipelines(b *beat.Beat) error {
return nil
}

updatePipelines := fb.config.UpdatePipelines
overwritePipelines := fb.config.OverwritePipelines
if b.InSetupCmd {
updatePipelines = true
overwritePipelines = true
}

// register pipeline loading to happen every time a new ES connection is
// established
callback := func(esClient *elasticsearch.Client) error {
return fb.moduleRegistry.LoadPipelines(esClient, updatePipelines)
return fb.moduleRegistry.LoadPipelines(esClient, overwritePipelines)
}
elasticsearch.RegisterConnectCallback(callback)

Expand Down Expand Up @@ -342,11 +342,11 @@ func (fb *Filebeat) Run(b *beat.Beat) error {
logp.Warn(pipelinesWarning)
}

if config.UpdatePipelines {
if config.OverwritePipelines {
logp.Debug("modules", "Existing Ingest pipelines will be updated")
}

err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.UpdatePipelines)
err = crawler.Start(registrar, config.ConfigInput, config.ConfigModules, pipelineLoaderFactory, config.OverwritePipelines)
if err != nil {
crawler.Stop()
return err
Expand Down
4 changes: 2 additions & 2 deletions filebeat/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ type Config struct {
ConfigProspector *common.Config `config:"config.prospectors"`
ConfigModules *common.Config `config:"config.modules"`
Autodiscover *autodiscover.Config `config:"autodiscover"`
UpdatePipelines bool `config:"update_pipelines"`
OverwritePipelines bool `config:"overwrite_pipelines"`
}

var (
DefaultConfig = Config{
RegistryFile: "registry",
RegistryFilePermissions: 0600,
ShutdownTimeout: 0,
UpdatePipelines: false,
OverwritePipelines: false,
}
)

Expand Down
4 changes: 2 additions & 2 deletions filebeat/crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func New(out channel.Factory, inputConfigs []*common.Config, beatVersion string,

// Start starts the crawler with all inputs
func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,
configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, updatePipelines bool) error {
configModules *common.Config, pipelineLoaderFactory fileset.PipelineLoaderFactory, overwritePipelines bool) error {

logp.Info("Loading Inputs: %v", len(c.inputConfigs))

Expand All @@ -67,7 +67,7 @@ func (c *Crawler) Start(r *registrar.Registrar, configInputs *common.Config,
}()
}

c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, updatePipelines, c.beatDone)
c.ModulesFactory = fileset.NewFactory(c.out, r, c.beatVersion, pipelineLoaderFactory, overwritePipelines, c.beatDone)
if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(configModules)
if err := c.modulesReloader.Check(c.ModulesFactory); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -596,9 +596,9 @@ filebeat.inputs:
#filebeat.registry_file_permissions: 0600

# By default Ingest pipelines are not updated if a pipeline with the same ID
# already exists. If this option is enabled Filebeat updates pipelines everytime
# a new Elasticsearch connection is established.
#filebeat.update_pipelines: false
# already exists. If this option is enabled Filebeat overwrites pipelines
# everytime a new Elasticsearch connection is established.
#filebeat.overwrite_pipelines: false

# These config files must have the full filebeat config part inside, but only
# the input part is processed. All global options like spool_size are ignored.
Expand Down
14 changes: 7 additions & 7 deletions filebeat/fileset/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type Factory struct {
registrar *registrar.Registrar
beatVersion string
pipelineLoaderFactory PipelineLoaderFactory
updatePipelines bool
overwritePipelines bool
beatDone chan struct{}
}

Expand All @@ -28,19 +28,19 @@ type inputsRunner struct {
moduleRegistry *ModuleRegistry
inputs []*input.Runner
pipelineLoaderFactory PipelineLoaderFactory
updatePipelines bool
overwritePipelines bool
}

// NewFactory instantiates a new Factory
func NewFactory(outlet channel.Factory, registrar *registrar.Registrar, beatVersion string,
pipelineLoaderFactory PipelineLoaderFactory, updatePipelines bool, beatDone chan struct{}) *Factory {
pipelineLoaderFactory PipelineLoaderFactory, overwritePipelines bool, beatDone chan struct{}) *Factory {
return &Factory{
outlet: outlet,
registrar: registrar,
beatVersion: beatVersion,
beatDone: beatDone,
pipelineLoaderFactory: pipelineLoaderFactory,
updatePipelines: updatePipelines,
overwritePipelines: overwritePipelines,
}
}

Expand Down Expand Up @@ -79,7 +79,7 @@ func (f *Factory) Create(c *common.Config, meta *common.MapStrPointer) (cfgfile.
moduleRegistry: m,
inputs: inputs,
pipelineLoaderFactory: f.pipelineLoaderFactory,
updatePipelines: f.updatePipelines,
overwritePipelines: f.overwritePipelines,
}, nil
}

Expand All @@ -91,7 +91,7 @@ func (p *inputsRunner) Start() {
if err != nil {
logp.Err("Error loading pipeline: %s", err)
} else {
err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.updatePipelines)
err := p.moduleRegistry.LoadPipelines(pipelineLoader, p.overwritePipelines)
if err != nil {
// Log error and continue
logp.Err("Error loading pipeline: %s", err)
Expand All @@ -100,7 +100,7 @@ func (p *inputsRunner) Start() {

// Callback:
callback := func(esClient *elasticsearch.Client) error {
return p.moduleRegistry.LoadPipelines(esClient, p.updatePipelines)
return p.moduleRegistry.LoadPipelines(esClient, p.overwritePipelines)
}
elasticsearch.RegisterConnectCallback(callback)
}
Expand Down
8 changes: 4 additions & 4 deletions filebeat/fileset/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type PipelineLoader interface {
}

// LoadPipelines loads the pipelines for each configured fileset.
func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bool) error {
func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, overwrite bool) error {
for module, filesets := range reg.registry {
for name, fileset := range filesets {
// check that all the required Ingest Node plugins are available
Expand All @@ -37,7 +37,7 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bo
if err != nil {
return fmt.Errorf("Error getting pipeline for fileset %s/%s: %v", module, name, err)
}
err = loadPipeline(esClient, pipelineID, content, forceUpdate)
err = loadPipeline(esClient, pipelineID, content, overwrite)
if err != nil {
return fmt.Errorf("Error loading pipeline for fileset %s/%s: %v", module, name, err)
}
Expand All @@ -46,9 +46,9 @@ func (reg *ModuleRegistry) LoadPipelines(esClient PipelineLoader, forceUpdate bo
return nil
}

func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, forceUpdate bool) error {
func loadPipeline(esClient PipelineLoader, pipelineID string, content map[string]interface{}, overwrite bool) error {
path := "/_ingest/pipeline/" + pipelineID
if !forceUpdate {
if !overwrite {
status, _, _ := esClient.Request("GET", path, "", nil, nil)
if status == 200 {
logp.Debug("modules", "Pipeline %s already loaded", pipelineID)
Expand Down
6 changes: 3 additions & 3 deletions libbeat/beat/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type Beat struct {
SetupMLCallback SetupMLCallback // setup callback for ML job configs
InSetupCmd bool // this is set to true when the `setup` command is called

UpdatePipelinesCallback UpdatePipelinesCallback // ingest pipeline loader callback
OverwritePipelinesCallback OverwritePipelinesCallback // ingest pipeline loader callback
// XXX: remove Config from public interface.
// It's currently used by filebeat modules to setup the Ingest Node
// pipeline and ML jobs.
Expand All @@ -57,6 +57,6 @@ type BeatConfig struct {
// for the enabled modules.
type SetupMLCallback func(*Beat, *common.Config) error

// UpdatePipelinesCallback can be used by the Beat to register Ingest pipeline loader
// OverwritePipelinesCallback can be used by the Beat to register Ingest pipeline loader
// for the enabled modules.
type UpdatePipelinesCallback func(*common.Config) error
type OverwritePipelinesCallback func(*common.Config) error
4 changes: 2 additions & 2 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,9 +404,9 @@ func (b *Beat) Setup(bt beat.Creator, template, dashboards, machineLearning, pip
fmt.Println("Loaded machine learning job configurations")
}

if pipelines && b.UpdatePipelinesCallback != nil {
if pipelines && b.OverwritePipelinesCallback != nil {
esConfig := b.Config.Output.Config()
err = b.UpdatePipelinesCallback(esConfig)
err = b.OverwritePipelinesCallback(esConfig)
if err != nil {
return err
}
Expand Down

0 comments on commit 6a151b5

Please sign in to comment.