Skip to content

Commit

Permalink
Moved all internal collector pipeline settings in monitoring. path to…
Browse files Browse the repository at this point in the history
… be used by direct shipping, deprecating the old xpack.monitoring
  • Loading branch information
andsel committed Feb 11, 2020
1 parent e596ae8 commit 5084eb1
Show file tree
Hide file tree
Showing 9 changed files with 114 additions and 59 deletions.
18 changes: 17 additions & 1 deletion config/logstash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,23 @@
# Default is false
# pipeline.separate_logs: false
#
# Enables internal collector to shipping directly to Elasticsearch
#monitoring.enabled: false
#monitoring.elasticsearch.username: logstash_system
#monitoring.elasticsearch.password: password
#monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#monitoring.elasticsearch.cloud_auth: logstash_system:password
#monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#monitoring.elasticsearch.ssl.truststore.path: path/to/file
#monitoring.elasticsearch.ssl.truststore.password: password
#monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#monitoring.elasticsearch.ssl.keystore.password: password
#monitoring.elasticsearch.ssl.verification_mode: certificate
#monitoring.elasticsearch.sniffing: false
#monitoring.collection.interval: 10s
#monitoring.collection.pipeline.details.enabled: true
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
Expand All @@ -236,7 +253,6 @@
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#xpack.monitoring.collection.write_direct.enabled: true
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
Expand Down
10 changes: 10 additions & 0 deletions docker/data/logstash/env2yaml/env2yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ func normalizeSetting(setting string) (string, error) {
"modules",
"path.logs",
"path.plugins",
"monitoring.enabled",
"monitoring.collection.interval",
"monitoring.elasticsearch.hosts",
"monitoring.elasticsearch.username",
"monitoring.elasticsearch.password",
"monitoring.elasticsearch.ssl.certificate_authority",
"monitoring.elasticsearch.ssl.truststore.path",
"monitoring.elasticsearch.ssl.truststore.password",
"monitoring.elasticsearch.ssl.keystore.path",
"monitoring.elasticsearch.ssl.keystore.password",
"xpack.monitoring.enabled",
"xpack.monitoring.collection.interval",
"xpack.monitoring.elasticsearch.hosts",
Expand Down
52 changes: 29 additions & 23 deletions x-pack/lib/helpers/elasticsearch_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,44 @@ def es_options_from_settings_or_modules(feature, settings)
# Populate the Elasticsearch options from LogStashSettings file, based on the feature that is being used.
# @return Hash
def es_options_from_settings(feature, settings)
prefix = if feature == "monitoring" &&
LogStash::MonitoringExtension.use_direct_shipping?(settings)
""
else
"xpack."
end
opts = {}

if cloud_id = settings.get("xpack.#{feature}.elasticsearch.cloud_id")
if cloud_id = settings.get("#{prefix}#{feature}.elasticsearch.cloud_id")
opts['cloud_id'] = cloud_id
check_cloud_id_configuration!(feature, settings)
check_cloud_id_configuration!(feature, settings, prefix)
else
opts['hosts'] = settings.get("xpack.#{feature}.elasticsearch.hosts")
opts['hosts'] = settings.get("#{prefix}#{feature}.elasticsearch.hosts")
end
if cloud_auth = settings.get("xpack.#{feature}.elasticsearch.cloud_auth")
if cloud_auth = settings.get("#{prefix}#{feature}.elasticsearch.cloud_auth")
opts['cloud_auth'] = cloud_auth
check_cloud_auth_configuration!(feature, settings)
check_cloud_auth_configuration!(feature, settings, prefix)
else
opts['user'] = settings.get("xpack.#{feature}.elasticsearch.username")
opts['password'] = settings.get("xpack.#{feature}.elasticsearch.password")
opts['user'] = settings.get("#{prefix}#{feature}.elasticsearch.username")
opts['password'] = settings.get("#{prefix}#{feature}.elasticsearch.password")
end
opts['sniffing'] = settings.get("xpack.#{feature}.elasticsearch.sniffing")
opts['ssl_certificate_verification'] = settings.get("xpack.#{feature}.elasticsearch.ssl.verification_mode") == 'certificate'
opts['sniffing'] = settings.get("#{prefix}#{feature}.elasticsearch.sniffing")
opts['ssl_certificate_verification'] = settings.get("#{prefix}#{feature}.elasticsearch.ssl.verification_mode") == 'certificate'

if cacert = settings.get("xpack.#{feature}.elasticsearch.ssl.certificate_authority")
if cacert = settings.get("#{prefix}#{feature}.elasticsearch.ssl.certificate_authority")
opts['cacert'] = cacert
opts['ssl'] = true
end

if truststore = settings.get("xpack.#{feature}.elasticsearch.ssl.truststore.path")
if truststore = settings.get("#{prefix}#{feature}.elasticsearch.ssl.truststore.path")
opts['truststore'] = truststore
opts['truststore_password'] = settings.get("xpack.#{feature}.elasticsearch.ssl.truststore.password")
opts['truststore_password'] = settings.get("#{prefix}#{feature}.elasticsearch.ssl.truststore.password")
opts['ssl'] = true
end

if keystore = settings.get("xpack.#{feature}.elasticsearch.ssl.keystore.path")
if keystore = settings.get("#{prefix}#{feature}.elasticsearch.ssl.keystore.path")
opts['keystore'] = keystore
opts['keystore_password']= settings.get("xpack.#{feature}.elasticsearch.ssl.keystore.password")
opts['keystore_password']= settings.get("#{prefix}#{feature}.elasticsearch.ssl.keystore.password")
opts['ssl'] = true
end
opts
Expand Down Expand Up @@ -135,19 +141,19 @@ def extract_module_settings(settings)

private

def check_cloud_id_configuration!(feature, settings)
return if !settings.set?("xpack.#{feature}.elasticsearch.hosts")
def check_cloud_id_configuration!(feature, settings, prefix)
return if !settings.set?("#{prefix}#{feature}.elasticsearch.hosts")

raise ArgumentError.new("Both \"xpack.#{feature}.elasticsearch.cloud_id\" and " +
"\"xpack.#{feature}.elasticsearch.hosts\" specified, please only use one of those.")
raise ArgumentError.new("Both \"#{prefix}#{feature}.elasticsearch.cloud_id\" and " +
"\"#{prefix}#{feature}.elasticsearch.hosts\" specified, please only use one of those.")
end

def check_cloud_auth_configuration!(feature, settings)
return if !settings.set?("xpack.#{feature}.elasticsearch.username") &&
!settings.set?("xpack.#{feature}.elasticsearch.password")
def check_cloud_auth_configuration!(feature, settings, prefix)
return if !settings.set?("#{prefix}#{feature}.elasticsearch.username") &&
!settings.set?("#{prefix}#{feature}.elasticsearch.password")

raise ArgumentError.new("Both \"xpack.#{feature}.elasticsearch.cloud_auth\" and " +
"\"xpack.#{feature}.elasticsearch.username\"/\"xpack.#{feature}.elasticsearch.password\" " +
raise ArgumentError.new("Both \"#{prefix}#{feature}.elasticsearch.cloud_auth\" and " +
"\"#{prefix}#{feature}.elasticsearch.username\"/\"#{prefix}#{feature}.elasticsearch.password\" " +
"specified, please only use one of those.")
end

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ def initialize(pipeline)

pipeline_doc = {"pipeline" => pipeline_data(pipeline)}

if (LogStash::SETTINGS.get_value("xpack.monitoring.collection.write_direct.enabled"))
if (LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS))
event_body = {
"type" => "logstash_state",
"logstash_state" => pipeline_doc
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def make(agent, extended_performance_collection=true)
"queue" => format_queue_stats(agent, @metric_store),
}

if (LogStash::SETTINGS.get_value("xpack.monitoring.collection.write_direct.enabled"))
if (LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS))
event_body = {
"type" => "logstash_stats",
"logstash_stats" => metrics_doc
Expand Down
75 changes: 49 additions & 26 deletions x-pack/lib/monitoring/monitoring.rb
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def get_binding
end

def monitoring_endpoint
if LogStash::SETTINGS.get_value("xpack.monitoring.collection.write_direct.enabled")
if LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS)
"/_bulk/monitoring"
else
"/_monitoring/bulk"
Expand Down Expand Up @@ -126,6 +126,7 @@ def after_agent(runner)
# To help keep passivity, assume that if "xpack.monitoring.elasticsearch.hosts" has been set that monitoring should be enabled.
# return true if xpack.monitoring.enabled=true (explicitly) or xpack.monitoring.elasticsearch.hosts is configured
def monitoring_enabled?(settings)
return settings.get_value("monitoring.enabled") if settings.set?("monitoring.enabled")
return settings.get_value("xpack.monitoring.enabled") if settings.set?("xpack.monitoring.enabled")

if settings.set?("xpack.monitoring.elasticsearch.hosts") || settings.set?("xpack.monitoring.elasticsearch.cloud_id")
Expand Down Expand Up @@ -159,22 +160,38 @@ def setup_metrics_pipeline
end

def generate_pipeline_config(settings)
collection_interval = settings.get("xpack.monitoring.collection.interval")
collection_timeout_interval = settings.get("xpack.monitoring.collection.timeout_interval")
extended_performance_collection = settings.get("xpack.monitoring.collection.pipeline.details.enabled")
config_collection = settings.get("xpack.monitoring.collection.config.enabled")
if MonitoringExtension.use_direct_shipping?(settings)
opt = retrieve_collection_settings(settings)
else
opt = retrieve_collection_settings(settings, "xpack.")
end
es_settings = es_options_from_settings_or_modules('monitoring', settings)
data = TemplateData.new(LogStash::SETTINGS.get("node.uuid"), API_VERSION,
es_settings,
collection_interval, collection_timeout_interval,
extended_performance_collection, config_collection)
opt[:collection_interval], opt[:collection_timeout_interval],
opt[:extended_performance_collection], opt[:config_collection])

template_path = ::File.join(::File.dirname(__FILE__), "..", "template.cfg.erb")
template = ::File.read(template_path)
ERB.new(template, 3).result(data.get_binding)
end

private
def retrieve_collection_settings(settings, prefix="")
opt = {}
opt[:collection_interval] = settings.get("#{prefix}monitoring.collection.interval")
opt[:collection_timeout_interval] = settings.get("#{prefix}monitoring.collection.timeout_interval")
opt[:extended_performance_collection] = settings.get("#{prefix}monitoring.collection.pipeline.details.enabled")
opt[:config_collection] = settings.get("#{prefix}monitoring.collection.config.enabled")
opt
end
end

def self.use_direct_shipping?(settings)
settings.get("monitoring.enabled")
end

public
def initialize
# nothing to do here
end
Expand All @@ -186,31 +203,37 @@ def register_hooks(hooks)

def additionals_settings(settings)
logger.trace("registering additionals_settings")

settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.enabled", false))
settings.register(LogStash::Setting::ArrayCoercible.new("xpack.monitoring.elasticsearch.hosts", String, [ "http://localhost:9200" ] ))
settings.register(LogStash::Setting::TimeValue.new("xpack.monitoring.collection.interval", "10s"))
settings.register(LogStash::Setting::TimeValue.new("xpack.monitoring.collection.timeout_interval", "10m"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.username", "logstash_system"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.password"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.cloud_id"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.cloud_auth"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.certificate_authority"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.truststore.path"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.truststore.password"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.keystore.path"))
settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.keystore.password"))
settings.register(LogStash::Setting::String.new("xpack.monitoring.elasticsearch.ssl.verification_mode", "certificate", true, ["none", "certificate"]))
settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.elasticsearch.sniffing", false))
settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.collection.pipeline.details.enabled", true))
settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.collection.config.enabled", true))
# Deprecated settings from 7.7
register_monitoring_settings(settings, "xpack.")
# Direct shipping settings
register_monitoring_settings(settings)

settings.register(LogStash::Setting::String.new("node.uuid", ""))
settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.collection.write_direct.enabled", false))
rescue => e
logger.error e.message
logger.error e.backtrace.to_s
raise e
end

private
def register_monitoring_settings(settings, prefix = "")
settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.enabled", false))
settings.register(LogStash::Setting::ArrayCoercible.new("#{prefix}monitoring.elasticsearch.hosts", String, [ "http://localhost:9200" ] ))
settings.register(LogStash::Setting::TimeValue.new("#{prefix}monitoring.collection.interval", "10s"))
settings.register(LogStash::Setting::TimeValue.new("#{prefix}monitoring.collection.timeout_interval", "10m"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.username", "logstash_system"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.password"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.cloud_id"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.cloud_auth"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.certificate_authority"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.truststore.path"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.truststore.password"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.keystore.path"))
settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.keystore.password"))
settings.register(LogStash::Setting::String.new("#{prefix}monitoring.elasticsearch.ssl.verification_mode", "certificate", true, ["none", "certificate"]))
settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.elasticsearch.sniffing", false))
settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.collection.pipeline.details.enabled", true))
settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.collection.config.enabled", true))
end
end
end
2 changes: 1 addition & 1 deletion x-pack/lib/monitoring/outputs/elasticsearch_monitoring.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ class ElasticSearchMonitoring < LogStash::Outputs::ElasticSearch
config :document_type, :validate => :string

def use_event_type?(client)
!LogStash::SETTINGS.get_value("xpack.monitoring.collection.write_direct.enabled")
!LogStash::SETTINGS.get_value("monitoring.enabled")
end
end
end; end
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

describe LogStash::Inputs::Metrics::StateEventFactory do
after :each do
LogStash::SETTINGS.set_value("xpack.monitoring.collection.write_direct.enabled", false)
LogStash::SETTINGS.set_value("monitoring.enabled", false)
end

let(:schemas_path) { File.join(File.dirname(__FILE__), "..", "..", "..", "..", "spec", "monitoring", "schemas") }
Expand All @@ -25,7 +25,7 @@
let(:schema_file) { File.join(schemas_path, "states_document_new_schema.json") }

it "should create a valid new event shape" do
LogStash::SETTINGS.set_value("xpack.monitoring.collection.write_direct.enabled", true)
LogStash::SETTINGS.set_value("monitoring.enabled", true)

state_evt = sut.make
json = JSON.parse(state_evt.to_json)
Expand All @@ -40,7 +40,7 @@
let(:schema_file) { File.join(schemas_path, "states_document_schema.json") }

it "should create a valid old event shape" do
LogStash::SETTINGS.set_value("xpack.monitoring.collection.write_direct.enabled", false)
LogStash::SETTINGS.set_value("monitoring.enabled", false)

state_evt = sut.make
expect(JSON::Validator.fully_validate(schema_file, state_evt.to_json)).to be_empty
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
after :each do
agent.shutdown
agent_task.wait
LogStash::SETTINGS.set_value("xpack.monitoring.collection.write_direct.enabled", false)
LogStash::SETTINGS.set_value("monitoring.enabled", false)
end

context "new model" do
Expand All @@ -47,7 +47,7 @@
it "should be valid" do
global_stats = {"uuid" => "00001" }
sut = described_class.new(global_stats, collector.snapshot_metric)
LogStash::SETTINGS.set_value("xpack.monitoring.collection.write_direct.enabled", true)
LogStash::SETTINGS.set_value("monitoring.enabled", true)

monitoring_evt = sut.make(agent, true)
json = JSON.parse(monitoring_evt.to_json)
Expand All @@ -62,7 +62,7 @@
it "should be valid" do
global_stats = {"uuid" => "00001" }
sut = described_class.new(global_stats, collector.snapshot_metric)
LogStash::SETTINGS.set_value("xpack.monitoring.collection.write_direct.enabled", false)
LogStash::SETTINGS.set_value("monitoring.enabled", false)

monitoring_evt = sut.make(agent, true)
json = JSON.parse(monitoring_evt.to_json)
Expand Down

0 comments on commit 5084eb1

Please sign in to comment.