Skip to content

Commit

Permalink
Adaptations to internal collector to send data directly to monitoring…
Browse files Browse the repository at this point in the history
… cluster Close 11573 Added check on HTTP server before asking for monitoring data in unit test Fixes #11541

Fixes #11641
  • Loading branch information
andsel committed Mar 12, 2020
1 parent 8c11a95 commit d901616
Show file tree
Hide file tree
Showing 20 changed files with 883 additions and 89 deletions.
40 changes: 21 additions & 19 deletions config/logstash.yml
Original file line number Diff line number Diff line change
Expand Up @@ -225,26 +225,28 @@ pipeline.ordered: auto
# Default is false
# pipeline.separate_logs: false
#
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Monitoring
# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html
#xpack.monitoring.enabled: false
#xpack.monitoring.elasticsearch.username: logstash_system
#xpack.monitoring.elasticsearch.password: password
#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# ------------ Monitoring Settings (not applicable for OSS build)--------------
#
# https://www.elastic.co/guide/en/logstash/current/monitoring-internal-collection.html
# Enable monitoring via internal collector to an Elasticsearch monitoring cluster
#monitoring.enabled: false
#monitoring.cluster_uuid: elasticsearch_cluster_uuid
#monitoring.elasticsearch.username: logstash_system
#monitoring.elasticsearch.password: password
#monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"]
# an alternative to hosts + username/password settings is to use cloud_id/cloud_auth
#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password
#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file
#xpack.monitoring.elasticsearch.ssl.truststore.password: password
#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#xpack.monitoring.elasticsearch.ssl.keystore.password: password
#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate
#xpack.monitoring.elasticsearch.sniffing: false
#xpack.monitoring.collection.interval: 10s
#xpack.monitoring.collection.pipeline.details.enabled: true
#monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx
#monitoring.elasticsearch.cloud_auth: logstash_system:password
#monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ]
#monitoring.elasticsearch.ssl.truststore.path: path/to/file
#monitoring.elasticsearch.ssl.truststore.password: password
#monitoring.elasticsearch.ssl.keystore.path: /path/to/file
#monitoring.elasticsearch.ssl.keystore.password: password
#monitoring.elasticsearch.ssl.verification_mode: certificate
#monitoring.elasticsearch.sniffing: false
#monitoring.collection.interval: 10s
#monitoring.collection.pipeline.details.enabled: true
# ------------ X-Pack Settings (not applicable for OSS build)--------------
#
# X-Pack Management
# https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html
Expand Down
10 changes: 10 additions & 0 deletions docker/data/logstash/env2yaml/env2yaml.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ func normalizeSetting(setting string) (string, error) {
"modules",
"path.logs",
"path.plugins",
"monitoring.enabled",
"monitoring.collection.interval",
"monitoring.elasticsearch.hosts",
"monitoring.elasticsearch.username",
"monitoring.elasticsearch.password",
"monitoring.elasticsearch.ssl.certificate_authority",
"monitoring.elasticsearch.ssl.truststore.path",
"monitoring.elasticsearch.ssl.truststore.password",
"monitoring.elasticsearch.ssl.keystore.path",
"monitoring.elasticsearch.ssl.keystore.password",
"xpack.monitoring.enabled",
"xpack.monitoring.collection.interval",
"xpack.monitoring.elasticsearch.hosts",
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/api/commands/stats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ def report(stats, extended_stats=nil, opts={})
# @param vertex [Hash{String=>Object}]
# @return [Hash{String=>Object}]
def decorate_vertex(vertex)
plugin_id = vertex["id"]&.to_s
plugin_id = vertex[:id]&.to_s
return vertex unless plugin_id && LogStash::PluginMetadata.exists?(plugin_id)

plugin_metadata = LogStash::PluginMetadata.for_plugin(plugin_id)
Expand Down
2 changes: 1 addition & 1 deletion logstash-core/lib/logstash/instrument/metric_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def key_paths(path)

# This method take an array of keys and recursively search the metric store structure
# and return a filtered hash of the structure. This method also take into consideration
# getting two different branchs.
# getting two different branches.
#
#
# If one part of the `key_paths` contains a filter key with the following format.
Expand Down
52 changes: 29 additions & 23 deletions x-pack/lib/helpers/elasticsearch_options.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,38 +26,44 @@ def es_options_from_settings_or_modules(feature, settings)
# Populate the Elasticsearch options from LogStashSettings file, based on the feature that is being used.
# @return Hash
def es_options_from_settings(feature, settings)
prefix = if feature == "monitoring" &&
LogStash::MonitoringExtension.use_direct_shipping?(settings)
""
else
"xpack."
end
opts = {}

if cloud_id = settings.get("xpack.#{feature}.elasticsearch.cloud_id")
if cloud_id = settings.get("#{prefix}#{feature}.elasticsearch.cloud_id")
opts['cloud_id'] = cloud_id
check_cloud_id_configuration!(feature, settings)
check_cloud_id_configuration!(feature, settings, prefix)
else
opts['hosts'] = settings.get("xpack.#{feature}.elasticsearch.hosts")
opts['hosts'] = settings.get("#{prefix}#{feature}.elasticsearch.hosts")
end
if cloud_auth = settings.get("xpack.#{feature}.elasticsearch.cloud_auth")
if cloud_auth = settings.get("#{prefix}#{feature}.elasticsearch.cloud_auth")
opts['cloud_auth'] = cloud_auth
check_cloud_auth_configuration!(feature, settings)
check_cloud_auth_configuration!(feature, settings, prefix)
else
opts['user'] = settings.get("xpack.#{feature}.elasticsearch.username")
opts['password'] = settings.get("xpack.#{feature}.elasticsearch.password")
opts['user'] = settings.get("#{prefix}#{feature}.elasticsearch.username")
opts['password'] = settings.get("#{prefix}#{feature}.elasticsearch.password")
end
opts['sniffing'] = settings.get("xpack.#{feature}.elasticsearch.sniffing")
opts['ssl_certificate_verification'] = settings.get("xpack.#{feature}.elasticsearch.ssl.verification_mode") == 'certificate'
opts['sniffing'] = settings.get("#{prefix}#{feature}.elasticsearch.sniffing")
opts['ssl_certificate_verification'] = settings.get("#{prefix}#{feature}.elasticsearch.ssl.verification_mode") == 'certificate'

if cacert = settings.get("xpack.#{feature}.elasticsearch.ssl.certificate_authority")
if cacert = settings.get("#{prefix}#{feature}.elasticsearch.ssl.certificate_authority")
opts['cacert'] = cacert
opts['ssl'] = true
end

if truststore = settings.get("xpack.#{feature}.elasticsearch.ssl.truststore.path")
if truststore = settings.get("#{prefix}#{feature}.elasticsearch.ssl.truststore.path")
opts['truststore'] = truststore
opts['truststore_password'] = settings.get("xpack.#{feature}.elasticsearch.ssl.truststore.password")
opts['truststore_password'] = settings.get("#{prefix}#{feature}.elasticsearch.ssl.truststore.password")
opts['ssl'] = true
end

if keystore = settings.get("xpack.#{feature}.elasticsearch.ssl.keystore.path")
if keystore = settings.get("#{prefix}#{feature}.elasticsearch.ssl.keystore.path")
opts['keystore'] = keystore
opts['keystore_password']= settings.get("xpack.#{feature}.elasticsearch.ssl.keystore.password")
opts['keystore_password']= settings.get("#{prefix}#{feature}.elasticsearch.ssl.keystore.password")
opts['ssl'] = true
end
opts
Expand Down Expand Up @@ -135,19 +141,19 @@ def extract_module_settings(settings)

private

def check_cloud_id_configuration!(feature, settings)
return if !settings.set?("xpack.#{feature}.elasticsearch.hosts")
def check_cloud_id_configuration!(feature, settings, prefix)
return if !settings.set?("#{prefix}#{feature}.elasticsearch.hosts")

raise ArgumentError.new("Both \"xpack.#{feature}.elasticsearch.cloud_id\" and " +
"\"xpack.#{feature}.elasticsearch.hosts\" specified, please only use one of those.")
raise ArgumentError.new("Both \"#{prefix}#{feature}.elasticsearch.cloud_id\" and " +
"\"#{prefix}#{feature}.elasticsearch.hosts\" specified, please only use one of those.")
end

def check_cloud_auth_configuration!(feature, settings)
return if !settings.set?("xpack.#{feature}.elasticsearch.username") &&
!settings.set?("xpack.#{feature}.elasticsearch.password")
def check_cloud_auth_configuration!(feature, settings, prefix)
return if !settings.set?("#{prefix}#{feature}.elasticsearch.username") &&
!settings.set?("#{prefix}#{feature}.elasticsearch.password")

raise ArgumentError.new("Both \"xpack.#{feature}.elasticsearch.cloud_auth\" and " +
"\"xpack.#{feature}.elasticsearch.username\"/\"xpack.#{feature}.elasticsearch.password\" " +
raise ArgumentError.new("Both \"#{prefix}#{feature}.elasticsearch.cloud_auth\" and " +
"\"#{prefix}#{feature}.elasticsearch.username\"/\"#{prefix}#{feature}.elasticsearch.password\" " +
"specified, please only use one of those.")
end

Expand Down
47 changes: 44 additions & 3 deletions x-pack/lib/monitoring/inputs/metrics.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class Metrics < LogStash::Inputs::Base
def register
@global_stats = fetch_global_stats
@agent = nil
@cluster_uuids = nil
@settings = LogStash::SETTINGS.clone
@last_updated_pipeline_hashes = []
@agent = execution_context.agent if execution_context
Expand Down Expand Up @@ -105,15 +106,28 @@ def stop
end

def update(snapshot)
if LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS)
@cluster_uuids ||= extract_cluster_uuids(snapshot.metric_store)
end
update_stats(snapshot)
update_states
end

def update_stats(snapshot)
@logger.debug("Metrics input: received a new snapshot", :created_at => snapshot.created_at, :snapshot => snapshot) if @logger.debug?
if @cluster_uuids.nil? || @cluster_uuids.empty?
fire_stats_event(snapshot, nil)
else
@cluster_uuids.each do |cluster_uuid|
fire_stats_event(snapshot, cluster_uuid)
end
end
end

private
def fire_stats_event(snapshot, cluster_uuid)
begin
event = StatsEventFactory.new(@global_stats, snapshot).make(agent, @extended_performance_collection)
event = StatsEventFactory.new(@global_stats, snapshot, cluster_uuid).make(agent, @extended_performance_collection, @collection_interval)
rescue => e
if @logger.debug?
@logger.error("Failed to create monitoring event", :message => e.message, :error => e.class.name, :backtrace => e.backtrace)
Expand All @@ -132,6 +146,7 @@ def update_stats(snapshot)
emit_event(event)
end

public
def update_states
return unless @agent

Expand All @@ -153,12 +168,19 @@ def update_states
def update_pipeline_state(pipeline)
return if pipeline.system?
if @config_collection
emit_event(state_event_for(pipeline))
events = state_event_for(pipeline)
events.each { |event| emit_event(event) }
end
end

def state_event_for(pipeline)
StateEventFactory.new(pipeline).make()
if @cluster_uuids.nil? || @cluster_uuids.empty?
[StateEventFactory.new(pipeline, nil, @collection_interval).make()]
else
@cluster_uuids.map do |cluster_uuid|
StateEventFactory.new(pipeline, cluster_uuid, @collection_interval).make()
end
end
end

def emit_event(event)
Expand Down Expand Up @@ -187,5 +209,24 @@ def fetch_global_stats
}
}
end

def extract_cluster_uuids(stats)
result = stats.extract_metrics([:stats, :pipelines, :main, :config], :cluster_uuids)
if result && !result[:cluster_uuids].empty?
cluster_uuids = result[:cluster_uuids]
@logger.info("Found cluster_uuids from elasticsearch output plugins", :cluster_uuids => cluster_uuids)
if LogStash::SETTINGS.set?("monitoring.cluster_uuid")
@logger.warn("Found monitoring.cluster_uuid setting configured in logstash.yml while using the ones discovered from elasticsearch output plugins, ignoring setting monitoring.cluster_uuid")
end
cluster_uuids
else
if LogStash::SETTINGS.set?("monitoring.cluster_uuid")
[LogStash::SETTINGS.get("monitoring.cluster_uuid")]
else
@logger.warn("Can't find any cluster_uuid from elasticsearch output plugins nor monitoring.cluster_uuid in logstash.yml is defined")
[""]
end
end
end
end
end; end
28 changes: 21 additions & 7 deletions x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,30 @@
module LogStash; module Inputs; class Metrics;
class StateEventFactory
require "logstash/config/lir_serializer"
def initialize(pipeline)

def initialize(pipeline, cluster_uuid, collection_interval = 10)
raise ArgumentError, "No pipeline passed in!" unless pipeline.is_a?(LogStash::Pipeline) || pipeline.is_a?(LogStash::JavaPipeline)
@event = LogStash::Event.new

@event.set("[@metadata]", {
"document_type" => "logstash_state",
"timestamp" => Time.now
})
pipeline_doc = {"pipeline" => pipeline_data(pipeline)}

if (LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS))
event_body = {
"type" => "logstash_state",
"logstash_state" => pipeline_doc,
"cluster_uuid" => cluster_uuid,
"interval_ms" => collection_interval * 1000,
"timestamp" => DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.%L%z')
}
else
event_body = pipeline_doc
end

@event.set("[pipeline]", pipeline_data(pipeline))
@event = LogStash::Event.new(
{"@metadata" => {
"document_type" => "logstash_state",
"timestamp" => Time.now
}}.merge(event_body)
)

@event.remove("@timestamp")
@event.remove("@version")
Expand Down
28 changes: 22 additions & 6 deletions x-pack/lib/monitoring/inputs/metrics/stats_event_factory.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,15 @@ class StatsEventFactory
include ::LogStash::Util::Loggable
require 'logstash/config/pipelines_info'

def initialize(global_stats, snapshot)
def initialize(global_stats, snapshot, cluster_uuid)
@global_stats = global_stats
@snapshot = snapshot
@metric_store = @snapshot.metric_store
@cluster_uuid = cluster_uuid
end

def make(agent, extended_performance_collection=true)
LogStash::Event.new(
def make(agent, extended_performance_collection=true, collection_interval=10)
metrics_doc = {
"timestamp" => @snapshot.created_at,
"logstash" => fetch_node_stats(agent, @metric_store),
"events" => format_global_event_count(@metric_store),
Expand All @@ -24,10 +25,25 @@ def make(agent, extended_performance_collection=true)
"jvm" => format_jvm_stats(@metric_store),
"os" => format_os_stats(@metric_store),
"queue" => format_queue_stats(agent, @metric_store),
"@metadata" => {
}

if (LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS))
event_body = {
"type" => "logstash_stats",
"logstash_stats" => metrics_doc,
"cluster_uuid" => @cluster_uuid,
"interval_ms" => collection_interval * 1000,
"timestamp" => DateTime.now.strftime('%Y-%m-%dT%H:%M:%S.%L%z')
}
else
event_body = metrics_doc
end

LogStash::Event.new(
{"@metadata" => {
"document_type" => "logstash_stats",
"timestamp" => Time.now
}
}}.merge(event_body)
)
end

Expand All @@ -48,7 +64,7 @@ def format_jvm_stats(stats)
result["mem"] = {
"heap_used_in_bytes" => heap_stats[:used_in_bytes],
"heap_used_percent" => heap_stats[:used_percent],
"heap_max_in_bytes" => heap_stats[:max_in_bytes],
"heap_max_in_bytes" => heap_stats[:max_in_bytes],
}

result["gc"] = {
Expand Down
Loading

0 comments on commit d901616

Please sign in to comment.