From 61d1634d3ac7893baa16b3c1b6b8f91b395b5139 Mon Sep 17 00:00:00 2001 From: andsel Date: Mon, 27 Jan 2020 12:48:31 +0100 Subject: [PATCH] Adaptations to internal collector to send data directly to monitoring cluster Close 11573 --- config/logstash.yml | 40 ++-- docker/data/logstash/env2yaml/env2yaml.go | 10 + .../lib/logstash/api/commands/stats.rb | 2 +- .../lib/logstash/instrument/metric_store.rb | 2 +- x-pack/lib/helpers/elasticsearch_options.rb | 52 ++--- x-pack/lib/monitoring/inputs/metrics.rb | 47 ++++- .../inputs/metrics/state_event_factory.rb | 28 ++- .../inputs/metrics/stats_event_factory.rb | 28 ++- x-pack/lib/monitoring/monitoring.rb | 101 +++++++--- .../outputs/elasticsearch_monitoring.rb | 2 +- x-pack/lib/template.cfg.erb | 4 +- .../monitoring/direct_shipping_spec.rb | 37 ++++ .../es_documents_structure_validation_spec.rb | 123 ++++++++++++ x-pack/qa/integration/support/helpers.rb | 5 +- .../metrics/state_event_factory_spec.rb | 79 ++++++++ .../metrics/stats_event_factory_spec.rb | 72 +++++++ x-pack/spec/monitoring/inputs/metrics_spec.rb | 2 +- .../monitoring/pipeline_register_hook_spec.rb | 49 +++++ .../monitoring_document_new_schema.json | 186 ++++++++++++++++++ .../schemas/states_document_new_schema.json | 94 +++++++++ 20 files changed, 874 insertions(+), 89 deletions(-) create mode 100644 x-pack/qa/integration/monitoring/direct_shipping_spec.rb create mode 100644 x-pack/qa/integration/monitoring/es_documents_structure_validation_spec.rb create mode 100644 x-pack/spec/monitoring/inputs/metrics/state_event_factory_spec.rb create mode 100644 x-pack/spec/monitoring/inputs/metrics/stats_event_factory_spec.rb create mode 100644 x-pack/spec/monitoring/pipeline_register_hook_spec.rb create mode 100644 x-pack/spec/monitoring/schemas/monitoring_document_new_schema.json create mode 100644 x-pack/spec/monitoring/schemas/states_document_new_schema.json diff --git a/config/logstash.yml b/config/logstash.yml index 3ec8577a029..4a713d32d85 100644 --- a/config/logstash.yml +++ b/config/logstash.yml @@ -225,26 +225,28 @@ pipeline.ordered: auto # Default is false # pipeline.separate_logs: false # -# ------------ X-Pack Settings (not applicable for OSS build)-------------- -# -# X-Pack Monitoring -# https://www.elastic.co/guide/en/logstash/current/monitoring-logstash.html -#xpack.monitoring.enabled: false -#xpack.monitoring.elasticsearch.username: logstash_system -#xpack.monitoring.elasticsearch.password: password -#xpack.monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"] +# ------------ Monitoring Settings (not applicable for OSS build)-------------- +# +# https://www.elastic.co/guide/en/logstash/current/monitoring-internal-collection.html +# Enable monitoring via internal collector to an Elasticsearch monitoring cluster +#monitoring.enabled: false +#monitoring.cluster_uuid: elasticsearch_cluster_uuid +#monitoring.elasticsearch.username: logstash_system +#monitoring.elasticsearch.password: password +#monitoring.elasticsearch.hosts: ["https://es1:9200", "https://es2:9200"] # an alternative to hosts + username/password settings is to use cloud_id/cloud_auth -#xpack.monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx -#xpack.monitoring.elasticsearch.cloud_auth: logstash_system:password -#xpack.monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ] -#xpack.monitoring.elasticsearch.ssl.truststore.path: path/to/file -#xpack.monitoring.elasticsearch.ssl.truststore.password: password -#xpack.monitoring.elasticsearch.ssl.keystore.path: /path/to/file -#xpack.monitoring.elasticsearch.ssl.keystore.password: password -#xpack.monitoring.elasticsearch.ssl.verification_mode: certificate -#xpack.monitoring.elasticsearch.sniffing: false -#xpack.monitoring.collection.interval: 10s -#xpack.monitoring.collection.pipeline.details.enabled: true +#monitoring.elasticsearch.cloud_id: monitoring_cluster_id:xxxxxxxxxx +#monitoring.elasticsearch.cloud_auth: logstash_system:password +#monitoring.elasticsearch.ssl.certificate_authority: [ "/path/to/ca.crt" ] +#monitoring.elasticsearch.ssl.truststore.path: path/to/file +#monitoring.elasticsearch.ssl.truststore.password: password +#monitoring.elasticsearch.ssl.keystore.path: /path/to/file +#monitoring.elasticsearch.ssl.keystore.password: password +#monitoring.elasticsearch.ssl.verification_mode: certificate +#monitoring.elasticsearch.sniffing: false +#monitoring.collection.interval: 10s +#monitoring.collection.pipeline.details.enabled: true +# ------------ X-Pack Settings (not applicable for OSS build)-------------- # # X-Pack Management # https://www.elastic.co/guide/en/logstash/current/logstash-centralized-pipeline-management.html diff --git a/docker/data/logstash/env2yaml/env2yaml.go b/docker/data/logstash/env2yaml/env2yaml.go index e63390e8cf5..129ad09c054 100644 --- a/docker/data/logstash/env2yaml/env2yaml.go +++ b/docker/data/logstash/env2yaml/env2yaml.go @@ -84,6 +84,16 @@ func normalizeSetting(setting string) (string, error) { "modules", "path.logs", "path.plugins", + "monitoring.enabled", + "monitoring.collection.interval", + "monitoring.elasticsearch.hosts", + "monitoring.elasticsearch.username", + "monitoring.elasticsearch.password", + "monitoring.elasticsearch.ssl.certificate_authority", + "monitoring.elasticsearch.ssl.truststore.path", + "monitoring.elasticsearch.ssl.truststore.password", + "monitoring.elasticsearch.ssl.keystore.path", + "monitoring.elasticsearch.ssl.keystore.password", "xpack.monitoring.enabled", "xpack.monitoring.collection.interval", "xpack.monitoring.elasticsearch.hosts", diff --git a/logstash-core/lib/logstash/api/commands/stats.rb b/logstash-core/lib/logstash/api/commands/stats.rb index a473d5ed034..7a3ba62cf4c 100644 --- a/logstash-core/lib/logstash/api/commands/stats.rb +++ b/logstash-core/lib/logstash/api/commands/stats.rb @@ -169,7 +169,7 @@ def report(stats, extended_stats=nil, opts={}) # @param vertex [Hash{String=>Object}] # @return [Hash{String=>Object}] def decorate_vertex(vertex) - plugin_id = vertex["id"]&.to_s + plugin_id = vertex[:id]&.to_s return vertex unless plugin_id && LogStash::PluginMetadata.exists?(plugin_id) plugin_metadata = LogStash::PluginMetadata.for_plugin(plugin_id) diff --git a/logstash-core/lib/logstash/instrument/metric_store.rb b/logstash-core/lib/logstash/instrument/metric_store.rb index 864f2c01ab7..dbccd04846e 100644 --- a/logstash-core/lib/logstash/instrument/metric_store.rb +++ b/logstash-core/lib/logstash/instrument/metric_store.rb @@ -206,7 +206,7 @@ def key_paths(path) # This method take an array of keys and recursively search the metric store structure # and return a filtered hash of the structure. This method also take into consideration - # getting two different branchs. + # getting two different branches. # # # If one part of the `key_paths` contains a filter key with the following format. diff --git a/x-pack/lib/helpers/elasticsearch_options.rb b/x-pack/lib/helpers/elasticsearch_options.rb index 83fb37959e3..b24b575e52a 100644 --- a/x-pack/lib/helpers/elasticsearch_options.rb +++ b/x-pack/lib/helpers/elasticsearch_options.rb @@ -26,38 +26,44 @@ def es_options_from_settings_or_modules(feature, settings) # Populate the Elasticsearch options from LogStashSettings file, based on the feature that is being used. # @return Hash def es_options_from_settings(feature, settings) + prefix = if feature == "monitoring" && + LogStash::MonitoringExtension.use_direct_shipping?(settings) + "" + else + "xpack." + end opts = {} - if cloud_id = settings.get("xpack.#{feature}.elasticsearch.cloud_id") + if cloud_id = settings.get("#{prefix}#{feature}.elasticsearch.cloud_id") opts['cloud_id'] = cloud_id - check_cloud_id_configuration!(feature, settings) + check_cloud_id_configuration!(feature, settings, prefix) else - opts['hosts'] = settings.get("xpack.#{feature}.elasticsearch.hosts") + opts['hosts'] = settings.get("#{prefix}#{feature}.elasticsearch.hosts") end - if cloud_auth = settings.get("xpack.#{feature}.elasticsearch.cloud_auth") + if cloud_auth = settings.get("#{prefix}#{feature}.elasticsearch.cloud_auth") opts['cloud_auth'] = cloud_auth - check_cloud_auth_configuration!(feature, settings) + check_cloud_auth_configuration!(feature, settings, prefix) else - opts['user'] = settings.get("xpack.#{feature}.elasticsearch.username") - opts['password'] = settings.get("xpack.#{feature}.elasticsearch.password") + opts['user'] = settings.get("#{prefix}#{feature}.elasticsearch.username") + opts['password'] = settings.get("#{prefix}#{feature}.elasticsearch.password") end - opts['sniffing'] = settings.get("xpack.#{feature}.elasticsearch.sniffing") - opts['ssl_certificate_verification'] = settings.get("xpack.#{feature}.elasticsearch.ssl.verification_mode") == 'certificate' + opts['sniffing'] = settings.get("#{prefix}#{feature}.elasticsearch.sniffing") + opts['ssl_certificate_verification'] = settings.get("#{prefix}#{feature}.elasticsearch.ssl.verification_mode") == 'certificate' - if cacert = settings.get("xpack.#{feature}.elasticsearch.ssl.certificate_authority") + if cacert = settings.get("#{prefix}#{feature}.elasticsearch.ssl.certificate_authority") opts['cacert'] = cacert opts['ssl'] = true end - if truststore = settings.get("xpack.#{feature}.elasticsearch.ssl.truststore.path") + if truststore = settings.get("#{prefix}#{feature}.elasticsearch.ssl.truststore.path") opts['truststore'] = truststore - opts['truststore_password'] = settings.get("xpack.#{feature}.elasticsearch.ssl.truststore.password") + opts['truststore_password'] = settings.get("#{prefix}#{feature}.elasticsearch.ssl.truststore.password") opts['ssl'] = true end - if keystore = settings.get("xpack.#{feature}.elasticsearch.ssl.keystore.path") + if keystore = settings.get("#{prefix}#{feature}.elasticsearch.ssl.keystore.path") opts['keystore'] = keystore - opts['keystore_password']= settings.get("xpack.#{feature}.elasticsearch.ssl.keystore.password") + opts['keystore_password']= settings.get("#{prefix}#{feature}.elasticsearch.ssl.keystore.password") opts['ssl'] = true end opts @@ -135,19 +141,19 @@ def extract_module_settings(settings) private - def check_cloud_id_configuration!(feature, settings) - return if !settings.set?("xpack.#{feature}.elasticsearch.hosts") + def check_cloud_id_configuration!(feature, settings, prefix) + return if !settings.set?("#{prefix}#{feature}.elasticsearch.hosts") - raise ArgumentError.new("Both \"xpack.#{feature}.elasticsearch.cloud_id\" and " + - "\"xpack.#{feature}.elasticsearch.hosts\" specified, please only use one of those.") + raise ArgumentError.new("Both \"#{prefix}#{feature}.elasticsearch.cloud_id\" and " + + "\"#{prefix}#{feature}.elasticsearch.hosts\" specified, please only use one of those.") end - def check_cloud_auth_configuration!(feature, settings) - return if !settings.set?("xpack.#{feature}.elasticsearch.username") && - !settings.set?("xpack.#{feature}.elasticsearch.password") + def check_cloud_auth_configuration!(feature, settings, prefix) + return if !settings.set?("#{prefix}#{feature}.elasticsearch.username") && + !settings.set?("#{prefix}#{feature}.elasticsearch.password") - raise ArgumentError.new("Both \"xpack.#{feature}.elasticsearch.cloud_auth\" and " + - "\"xpack.#{feature}.elasticsearch.username\"/\"xpack.#{feature}.elasticsearch.password\" " + + raise ArgumentError.new("Both \"#{prefix}#{feature}.elasticsearch.cloud_auth\" and " + + "\"#{prefix}#{feature}.elasticsearch.username\"/\"#{prefix}#{feature}.elasticsearch.password\" " + "specified, please only use one of those.") end diff --git a/x-pack/lib/monitoring/inputs/metrics.rb b/x-pack/lib/monitoring/inputs/metrics.rb index 34c3a09528e..cf5c0b19bbb 100644 --- a/x-pack/lib/monitoring/inputs/metrics.rb +++ b/x-pack/lib/monitoring/inputs/metrics.rb @@ -44,6 +44,7 @@ class Metrics < LogStash::Inputs::Base def register @global_stats = fetch_global_stats @agent = nil + @cluster_uuids = nil @settings = LogStash::SETTINGS.clone @last_updated_pipeline_hashes = [] @agent = execution_context.agent if execution_context @@ -105,15 +106,28 @@ def stop end def update(snapshot) + if LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS) + @cluster_uuids ||= extract_cluster_uuids(snapshot.metric_store) + end update_stats(snapshot) update_states end def update_stats(snapshot) @logger.debug("Metrics input: received a new snapshot", :created_at => snapshot.created_at, :snapshot => snapshot) if @logger.debug? + if @cluster_uuids.nil? || @cluster_uuids.empty? + fire_stats_event(snapshot, nil) + else + @cluster_uuids.each do |cluster_uuid| + fire_stats_event(snapshot, cluster_uuid) + end + end + end + private + def fire_stats_event(snapshot, cluster_uuid) begin - event = StatsEventFactory.new(@global_stats, snapshot).make(agent, @extended_performance_collection) + event = StatsEventFactory.new(@global_stats, snapshot, cluster_uuid).make(agent, @extended_performance_collection, @collection_interval) rescue => e if @logger.debug? @logger.error("Failed to create monitoring event", :message => e.message, :error => e.class.name, :backtrace => e.backtrace) @@ -132,6 +146,7 @@ def update_stats(snapshot) emit_event(event) end + public def update_states return unless @agent @@ -153,12 +168,19 @@ def update_states def update_pipeline_state(pipeline) return if pipeline.system? if @config_collection - emit_event(state_event_for(pipeline)) + events = state_event_for(pipeline) + events.each { |event| emit_event(event) } end end def state_event_for(pipeline) - StateEventFactory.new(pipeline).make() + if @cluster_uuids.nil? || @cluster_uuids.empty? + [StateEventFactory.new(pipeline, nil, @collection_interval).make()] + else + @cluster_uuids.map do |cluster_uuid| + StateEventFactory.new(pipeline, cluster_uuid, @collection_interval).make() + end + end end def emit_event(event) @@ -187,5 +209,24 @@ def fetch_global_stats } } end + + def extract_cluster_uuids(stats) + result = stats.extract_metrics([:stats, :pipelines, :main, :config], :cluster_uuids) + if result && !result[:cluster_uuids].empty? + cluster_uuids = result[:cluster_uuids] + @logger.info("Found cluster_uuids from elasticsearch output plugins", :cluster_uuids => cluster_uuids) + if LogStash::SETTINGS.set?("monitoring.cluster_uuid") + @logger.warn("Found monitoring.cluster_uuid setting configured in logstash.yml while using the ones discovered from elasticsearch output plugins, ignoring setting monitoring.cluster_uuid") + end + cluster_uuids + else + if LogStash::SETTINGS.set?("monitoring.cluster_uuid") + [LogStash::SETTINGS.get("monitoring.cluster_uuid")] + else + @logger.warn("Can't find any cluster_uuid from elasticsearch output plugins nor monitoring.cluster_uuid in logstash.yml is defined") + [""] + end + end + end end end; end diff --git a/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb b/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb index 175557b7779..c00c29d8dc0 100644 --- a/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb +++ b/x-pack/lib/monitoring/inputs/metrics/state_event_factory.rb @@ -5,16 +5,30 @@ module LogStash; module Inputs; class Metrics; class StateEventFactory require "logstash/config/lir_serializer" - def initialize(pipeline) + + def initialize(pipeline, cluster_uuid, collection_interval = 10) raise ArgumentError, "No pipeline passed in!" unless pipeline.is_a?(LogStash::Pipeline) || pipeline.is_a?(LogStash::JavaPipeline) - @event = LogStash::Event.new - @event.set("[@metadata]", { - "document_type" => "logstash_state", - "timestamp" => Time.now - }) + pipeline_doc = {"pipeline" => pipeline_data(pipeline)} + + if (LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS)) + event_body = { + "type" => "logstash_state", + "logstash_state" => pipeline_doc, + "cluster_uuid" => cluster_uuid, + "interval_ms" => collection_interval * 1000, + "timestamp" => DateTime.now.strftime('%Y-%m-%dT%k:%M:%S.%L%z') + } + else + event_body = pipeline_doc + end - @event.set("[pipeline]", pipeline_data(pipeline)) + @event = LogStash::Event.new( + {"@metadata" => { + "document_type" => "logstash_state", + "timestamp" => Time.now + }}.merge(event_body) + ) @event.remove("@timestamp") @event.remove("@version") diff --git a/x-pack/lib/monitoring/inputs/metrics/stats_event_factory.rb b/x-pack/lib/monitoring/inputs/metrics/stats_event_factory.rb index a8d9a24b338..b40341c15c0 100644 --- a/x-pack/lib/monitoring/inputs/metrics/stats_event_factory.rb +++ b/x-pack/lib/monitoring/inputs/metrics/stats_event_factory.rb @@ -7,14 +7,15 @@ class StatsEventFactory include ::LogStash::Util::Loggable require 'logstash/config/pipelines_info' - def initialize(global_stats, snapshot) + def initialize(global_stats, snapshot, cluster_uuid) @global_stats = global_stats @snapshot = snapshot @metric_store = @snapshot.metric_store + @cluster_uuid = cluster_uuid end - def make(agent, extended_performance_collection=true) - LogStash::Event.new( + def make(agent, extended_performance_collection=true, collection_interval=10) + metrics_doc = { "timestamp" => @snapshot.created_at, "logstash" => fetch_node_stats(agent, @metric_store), "events" => format_global_event_count(@metric_store), @@ -24,10 +25,25 @@ def make(agent, extended_performance_collection=true) "jvm" => format_jvm_stats(@metric_store), "os" => format_os_stats(@metric_store), "queue" => format_queue_stats(agent, @metric_store), - "@metadata" => { + } + + if (LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS)) + event_body = { + "type" => "logstash_stats", + "logstash_stats" => metrics_doc, + "cluster_uuid" => @cluster_uuid, + "interval_ms" => collection_interval * 1000, + "timestamp" => DateTime.now.strftime('%Y-%m-%dT%k:%M:%S.%L%z') + } + else + event_body = metrics_doc + end + + LogStash::Event.new( + {"@metadata" => { "document_type" => "logstash_stats", "timestamp" => Time.now - } + }}.merge(event_body) ) end @@ -48,7 +64,7 @@ def format_jvm_stats(stats) result["mem"] = { "heap_used_in_bytes" => heap_stats[:used_in_bytes], "heap_used_percent" => heap_stats[:used_percent], - "heap_max_in_bytes" => heap_stats[:max_in_bytes], + "heap_max_in_bytes" => heap_stats[:max_in_bytes], } result["gc"] = { diff --git a/x-pack/lib/monitoring/monitoring.rb b/x-pack/lib/monitoring/monitoring.rb index 250874afd0c..4852561090b 100644 --- a/x-pack/lib/monitoring/monitoring.rb +++ b/x-pack/lib/monitoring/monitoring.rb @@ -88,6 +88,22 @@ def config_collection? def get_binding binding end + + def monitoring_endpoint + if LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS) + "/_bulk/" + else + "/_monitoring/bulk?system_id=logstash&system_api_version=#{system_api_version}&interval=1s" + end + end + + def monitoring_index + if LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS) + ".monitoring-logstash-#{system_api_version}-" + Time.now.utc.to_date.strftime("%Y.%m.%d") + else + "" #let the ES xpack's reporter to create it + end + end end class PipelineRegisterHook @@ -124,6 +140,7 @@ def after_agent(runner) # To help keep passivity, assume that if "xpack.monitoring.elasticsearch.hosts" has been set that monitoring should be enabled. # return true if xpack.monitoring.enabled=true (explicitly) or xpack.monitoring.elasticsearch.hosts is configured def monitoring_enabled?(settings) + return settings.get_value("monitoring.enabled") if settings.set?("monitoring.enabled") return settings.get_value("xpack.monitoring.enabled") if settings.set?("xpack.monitoring.enabled") if settings.set?("xpack.monitoring.elasticsearch.hosts") || settings.set?("xpack.monitoring.elasticsearch.cloud_id") @@ -157,22 +174,51 @@ def setup_metrics_pipeline end def generate_pipeline_config(settings) - collection_interval = settings.get("xpack.monitoring.collection.interval") - collection_timeout_interval = settings.get("xpack.monitoring.collection.timeout_interval") - extended_performance_collection = settings.get("xpack.monitoring.collection.pipeline.details.enabled") - config_collection = settings.get("xpack.monitoring.collection.config.enabled") + if settings.set?("xpack.monitoring.enabled") && settings.set?("monitoring.enabled") + raise ArgumentError.new("\"xpack.monitoring.enabled\" is configured while also \"monitoring.enabled\"") + end + + if any_set?(settings, /^xpack.monitoring/) && any_set?(settings, /^monitoring./) + raise ArgumentError.new("\"xpack.monitoring.*\" settings can't be configured while using \"monitoring.*\"") + end + + if MonitoringExtension.use_direct_shipping?(settings) + opt = retrieve_collection_settings(settings) + else + opt = retrieve_collection_settings(settings, "xpack.") + deprecation_logger.deprecated("xpack.monitoring.* settings are deprecated use the new monitoring.*. Please see https://www.elastic.co/guide/en/logstash/current/monitoring-internal-collection.html") + end es_settings = es_options_from_settings_or_modules('monitoring', settings) data = TemplateData.new(LogStash::SETTINGS.get("node.uuid"), API_VERSION, es_settings, - collection_interval, collection_timeout_interval, - extended_performance_collection, config_collection) + opt[:collection_interval], opt[:collection_timeout_interval], + opt[:extended_performance_collection], opt[:config_collection]) template_path = ::File.join(::File.dirname(__FILE__), "..", "template.cfg.erb") template = ::File.read(template_path) ERB.new(template, 3).result(data.get_binding) end + + private + def retrieve_collection_settings(settings, prefix="") + opt = {} + opt[:collection_interval] = settings.get("#{prefix}monitoring.collection.interval") + opt[:collection_timeout_interval] = settings.get("#{prefix}monitoring.collection.timeout_interval") + opt[:extended_performance_collection] = settings.get("#{prefix}monitoring.collection.pipeline.details.enabled") + opt[:config_collection] = settings.get("#{prefix}monitoring.collection.config.enabled") + opt + end + + def any_set?(settings, regexp) + !settings.get_subset(regexp).to_hash.keys.select { |k| settings.set?(k)}.empty? + end end + def self.use_direct_shipping?(settings) + settings.get("monitoring.enabled") + end + + public def initialize # nothing to do here end @@ -184,24 +230,10 @@ def register_hooks(hooks) def additionals_settings(settings) logger.trace("registering additionals_settings") - - settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.enabled", false)) - settings.register(LogStash::Setting::ArrayCoercible.new("xpack.monitoring.elasticsearch.hosts", String, [ "http://localhost:9200" ] )) - settings.register(LogStash::Setting::TimeValue.new("xpack.monitoring.collection.interval", "10s")) - settings.register(LogStash::Setting::TimeValue.new("xpack.monitoring.collection.timeout_interval", "10m")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.username", "logstash_system")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.password")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.cloud_id")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.cloud_auth")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.certificate_authority")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.truststore.path")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.truststore.password")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.keystore.path")) - settings.register(LogStash::Setting::NullableString.new("xpack.monitoring.elasticsearch.ssl.keystore.password")) - settings.register(LogStash::Setting::String.new("xpack.monitoring.elasticsearch.ssl.verification_mode", "certificate", true, ["none", "certificate"])) - settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.elasticsearch.sniffing", false)) - settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.collection.pipeline.details.enabled", true)) - settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.collection.config.enabled", true)) + # Deprecated settings from 7.7 + register_monitoring_settings(settings, "xpack.") + # Direct shipping settings + register_monitoring_settings(settings) settings.register(LogStash::Setting::String.new("node.uuid", "")) rescue => e @@ -209,5 +241,26 @@ def additionals_settings(settings) logger.error e.backtrace.to_s raise e end + + private + def register_monitoring_settings(settings, prefix = "") + settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.enabled", false)) + settings.register(LogStash::Setting::ArrayCoercible.new("#{prefix}monitoring.elasticsearch.hosts", String, [ "http://localhost:9200" ] )) + settings.register(LogStash::Setting::TimeValue.new("#{prefix}monitoring.collection.interval", "10s")) + settings.register(LogStash::Setting::TimeValue.new("#{prefix}monitoring.collection.timeout_interval", "10m")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.username", "logstash_system")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.password")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.cloud_id")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.cloud_auth")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.certificate_authority")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.truststore.path")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.truststore.password")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.keystore.path")) + settings.register(LogStash::Setting::NullableString.new("#{prefix}monitoring.elasticsearch.ssl.keystore.password")) + settings.register(LogStash::Setting::String.new("#{prefix}monitoring.elasticsearch.ssl.verification_mode", "certificate", true, ["none", "certificate"])) + settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.elasticsearch.sniffing", false)) + settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.collection.pipeline.details.enabled", true)) + settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.collection.config.enabled", true)) + end end end diff --git a/x-pack/lib/monitoring/outputs/elasticsearch_monitoring.rb b/x-pack/lib/monitoring/outputs/elasticsearch_monitoring.rb index a9b8e60844b..84dadf8f5f6 100644 --- a/x-pack/lib/monitoring/outputs/elasticsearch_monitoring.rb +++ b/x-pack/lib/monitoring/outputs/elasticsearch_monitoring.rb @@ -10,7 +10,7 @@ class ElasticSearchMonitoring < LogStash::Outputs::ElasticSearch config :document_type, :validate => :string def use_event_type?(client) - true + !LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS) end end end; end diff --git a/x-pack/lib/template.cfg.erb b/x-pack/lib/template.cfg.erb index 211bfbe5e4c..5bebfa0aaf7 100644 --- a/x-pack/lib/template.cfg.erb +++ b/x-pack/lib/template.cfg.erb @@ -20,10 +20,10 @@ output { <% else %> hosts => <%= es_hosts %> <% end %> - bulk_path => "/_monitoring/bulk?system_id=logstash&system_api_version=<%= system_api_version %>&interval=1s" + bulk_path => "<%= monitoring_endpoint %>" manage_template => false document_type => "%{[@metadata][document_type]}" - index => "" + index => "<%= monitoring_index %>" sniffing => <%= sniffing %> <% if auth? && !cloud_auth? %> user => "<%= user %>" diff --git a/x-pack/qa/integration/monitoring/direct_shipping_spec.rb b/x-pack/qa/integration/monitoring/direct_shipping_spec.rb new file mode 100644 index 00000000000..054c13acb8d --- /dev/null +++ b/x-pack/qa/integration/monitoring/direct_shipping_spec.rb @@ -0,0 +1,37 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require_relative "../spec_helper" + +describe "Direct shipping" do + + before :all do + @elasticsearch_service = elasticsearch + + cleanup_elasticsearch + + config = "input { generator { count => 100 } tcp { port => 6000 } } output { null {} }" + + @logstash_service = logstash_with_empty_default("bin/logstash -e '#{config}' -w 1", { + :settings => { + "monitoring.enabled" => true, + "monitoring.elasticsearch.hosts" => ["http://localhost:9200", "http://localhost:9200"], + "monitoring.collection.interval" => "1s", + "monitoring.elasticsearch.username" => "elastic", + "monitoring.elasticsearch.password" => elastic_password + }, # will be saved in the logstash.yml + :belzebuth => { + :wait_condition => /Pipelines running/, + :timeout => 5 * 60 # Fail safe, this mean something went wrong if we hit this before the wait_condition + } + }) + end + + include_examples "record monitoring data to es" + + after :all do + @logstash_service.stop unless @logstash_service.nil? + @elasticsearch_service.stop unless @elasticsearch_service.nil? + end +end diff --git a/x-pack/qa/integration/monitoring/es_documents_structure_validation_spec.rb b/x-pack/qa/integration/monitoring/es_documents_structure_validation_spec.rb new file mode 100644 index 00000000000..5f2d7312e11 --- /dev/null +++ b/x-pack/qa/integration/monitoring/es_documents_structure_validation_spec.rb @@ -0,0 +1,123 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require_relative "../spec_helper" + +describe "Monitoring internal collector documents" do + + before :all do + @elasticsearch_service = elasticsearch + + cleanup_elasticsearch + + # this is tcp input useful to keep main pipeline alive + @config = "input { tcp { port => 6000 } } output { null {} }" + end + + let(:max_retry) { 10 } + let(:schemas_path) { File.join(File.dirname(__FILE__), "..", "..", "..", "spec", "monitoring", "schemas") } + let(:retryable_errors) do + [NoMethodError, + RSpec::Expectations::ExpectationNotMetError, + Elasticsearch::Transport::Transport::Errors::ServiceUnavailable, + Elasticsearch::Transport::Transport::Errors::NotFound] + end + + describe "metrics" do + it "should be equal to with direct shipping" do + @logstash_service = start_monitoring_logstash(@config) + direct_shipping_document = retrieve_monitoring_document_from_es("logstash_stats") + + @logstash_service.stop unless @logstash_service.nil? + + cleanup_elasticsearch + + @logstash_service = start_monitoring_logstash(@config, "xpack") + es_reporter_shaped_document = retrieve_monitoring_document_from_es("logstash_stats") + + @logstash_service.stop unless @logstash_service.nil? + + verify_same_structure(es_reporter_shaped_document, direct_shipping_document) + end + end + + describe "state" do + it "should be equal to with direct shipping" do + @logstash_service = start_monitoring_logstash(@config) + direct_shipping_document = retrieve_monitoring_document_from_es("logstash_state") + @logstash_service.stop unless @logstash_service.nil? + + cleanup_elasticsearch + + @logstash_service = start_monitoring_logstash(@config, "xpack") + es_reporter_shaped_document = retrieve_monitoring_document_from_es("logstash_state") + + @logstash_service.stop unless @logstash_service.nil? + + verify_same_structure(es_reporter_shaped_document, direct_shipping_document) + end + end + + after :all do + @elasticsearch_service.stop unless @elasticsearch_service.nil? + end +end + +def retrieve_monitoring_document_from_es(document_type) + monitoring_document = nil + + Stud.try(max_retry.times, retryable_errors) do + elasticsearch_client.indices.refresh + api_response = elasticsearch_client.search :index => MONITORING_INDEXES, :body => {:query => {:term => {"type" => document_type}}} + expect(api_response["hits"]["total"]["value"]).to be > 0 + api_response["hits"]["hits"].each do |full_document| + monitoring_document = full_document["_source"] + end + end + monitoring_document +end + +def start_monitoring_logstash(config, prefix = "") + if !prefix.nil? && !prefix.empty? + mon_prefix = prefix + "." + else + mon_prefix = "" + end + logstash_with_empty_default("bin/logstash -e '#{config}' -w 1", { + :settings => { + "#{mon_prefix}monitoring.enabled" => true, + "#{mon_prefix}monitoring.elasticsearch.hosts" => ["http://localhost:9200", "http://localhost:9200"], + "#{mon_prefix}monitoring.collection.interval" => "1s", + "#{mon_prefix}monitoring.elasticsearch.username" => "elastic", + "#{mon_prefix}monitoring.elasticsearch.password" => elastic_password + }, # will be saved in the logstash.yml + :belzebuth => { + :wait_condition => /Pipelines running/, + :timeout => 5 * 60 # Fail safe, this mean something went wrong if we hit this before the wait_condition + } + }) +end + +def verify_same_structure(original, other, ignored_keys = /^source_node/) + orig_keys = filter_ignored_and_make_set(flatten_keys(original), ignored_keys) + other_keys = filter_ignored_and_make_set(flatten_keys(other), ignored_keys) + expect(other_keys - orig_keys).to eq([]) + expect(orig_keys - other_keys).to eq([]) +end + +def filter_ignored_and_make_set(keys_list, ignored_keys) + keys_list.sort.uniq.select { |k| !(k =~ ignored_keys) } +end + +def flatten_keys(hash, prefix = "") + flattened_keys = [] + hash.each do |k, v| + if v.is_a? Hash + flattened_keys += flatten_keys(v, k.to_s) + else + flattened_keys << (prefix + (prefix.empty? ? "" : ".") + k.to_s) + end + end + flattened_keys +end \ No newline at end of file diff --git a/x-pack/qa/integration/support/helpers.rb b/x-pack/qa/integration/support/helpers.rb index bf8d30ad334..e28e8e9b6a7 100644 --- a/x-pack/qa/integration/support/helpers.rb +++ b/x-pack/qa/integration/support/helpers.rb @@ -114,6 +114,10 @@ def logstash_command_append(cmd, argument, value) end def logstash(cmd, options = {}) + logstash_with_empty_default(cmd, options, {"xpack.monitoring.enabled" => true}) +end + +def logstash_with_empty_default(cmd, options = {}, default_settings = {}) temporary_settings = Stud::Temporary.directory temporary_data = Stud::Temporary.directory @@ -121,7 +125,6 @@ def logstash(cmd, options = {}) cmd = logstash_command_append(cmd, "--path.data", temporary_data) logstash_yaml = File.join(temporary_settings, "logstash.yml") - default_settings = {"xpack.monitoring.enabled" => true} IO.write(logstash_yaml, YAML::dump(default_settings.merge(options.fetch(:settings, {})))) FileUtils.cp(File.join(get_logstash_path, "config", "log4j2.properties"), File.join(temporary_settings, "log4j2.properties") ) diff --git a/x-pack/spec/monitoring/inputs/metrics/state_event_factory_spec.rb b/x-pack/spec/monitoring/inputs/metrics/state_event_factory_spec.rb new file mode 100644 index 00000000000..ad8498a68e7 --- /dev/null +++ b/x-pack/spec/monitoring/inputs/metrics/state_event_factory_spec.rb @@ -0,0 +1,79 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require "logstash/agent" +require "logstash/runner" +require "spec_helper" +require "monitoring/inputs/metrics/state_event_factory" +require 'json' + +describe LogStash::Inputs::Metrics::StateEventFactory do + + let(:schemas_path) { File.join(File.dirname(__FILE__), "..", "..", "..", "..", "spec", "monitoring", "schemas") } + + let(:config) { + config_part = org.logstash.common.SourceWithMetadata.new("local", "...", 0, 0, "input { dummyblockinginput { } } output { null { } }") + LogStash::Config::PipelineConfig.new("DummySource", "fake_main", [config_part], LogStash::SETTINGS) + } + + let(:pipeline_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone.merge({ + "pipeline.id" => "main", + "config.string" => config.config_parts.first.text, + }) } + + let(:agent) { LogStash::Agent.new(pipeline_settings) } + let(:metric) { agent.metric } + let(:collector) { metric.collector } + let(:agent_task) { start_agent(agent) } + + before :each do + agent + agent_task + + wait(60).for { agent.get_pipeline(:main) }.to_not be_nil + + # collector.snapshot_metric is timing dependant and if fired too fast will miss some metrics. + # after some tests a correct metric_store.size is 72 but when it is incomplete it is lower. + # I guess this 72 is dependant on the metrics we collect and there is probably a better + # way to make sure no metrics are missing without forcing a hard sleep but this is what is + # easily observable, feel free to refactor with a better "timing" test here. + wait(60).for { collector.snapshot_metric.metric_store.size }.to be >= 72 + end + + after :each do + agent.shutdown + agent_task.wait + LogStash::SETTINGS.set_value("monitoring.enabled", false) + end + + let(:pipeline) { LogStash::Pipeline.new(config) } + + subject(:sut) { described_class.new(pipeline, "funky_cluster_uuid") } + + context "with write direct flag enabled" do + let(:schema_file) { File.join(schemas_path, "states_document_new_schema.json") } + + it "should create a valid new event shape" do + LogStash::SETTINGS.set_value("monitoring.enabled", true) + + state_evt = sut.make + json = JSON.parse(state_evt.to_json) + expect(json['type']).to eq('logstash_state') + expect(json['logstash_state']).to be_truthy + expect(json['logstash_state']['pipeline']).to be_truthy + expect(JSON::Validator.fully_validate(schema_file, state_evt.to_json)).to be_empty + end + end + + context "with write direct flag disabled" do + let(:schema_file) { File.join(schemas_path, "states_document_schema.json") } + + it "should create a valid old event shape" do + LogStash::SETTINGS.set_value("monitoring.enabled", false) + + state_evt = sut.make + expect(JSON::Validator.fully_validate(schema_file, state_evt.to_json)).to be_empty + end + end +end \ No newline at end of file diff --git a/x-pack/spec/monitoring/inputs/metrics/stats_event_factory_spec.rb b/x-pack/spec/monitoring/inputs/metrics/stats_event_factory_spec.rb new file mode 100644 index 00000000000..8de9c32f1ae --- /dev/null +++ b/x-pack/spec/monitoring/inputs/metrics/stats_event_factory_spec.rb @@ -0,0 +1,72 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require "monitoring/inputs/metrics/stats_event_factory" +require 'json' + +describe LogStash::Inputs::Metrics::StatsEventFactory do + let(:schemas_path) { File.join(File.dirname(__FILE__), "..", "..", "..", "..", "spec", "monitoring", "schemas") } + let(:queue) { Concurrent::Array.new } + + let(:config) { "input { dummyblockinginput { } } output { null { } }" } + + let(:pipeline_settings) { LogStash::Runner::SYSTEM_SETTINGS.clone.merge({ + "pipeline.id" => "main", + "config.string" => config, + }) } + + let(:agent) { LogStash::Agent.new(pipeline_settings) } + let(:metric) { agent.metric } + let(:collector) { metric.collector } + let(:agent_task) { start_agent(agent) } + + before :each do + agent + agent_task + + wait(60).for { agent.get_pipeline(:main) }.to_not be_nil + + # collector.snapshot_metric is timing dependant and if fired too fast will miss some metrics. + # after some tests a correct metric_store.size is 72 but when it is incomplete it is lower. + # I guess this 72 is dependant on the metrics we collect and there is probably a better + # way to make sure no metrics are missing without forcing a hard sleep but this is what is + # easily observable, feel free to refactor with a better "timing" test here. + wait(60).for { collector.snapshot_metric.metric_store.size }.to be >= 72 + end + + after :each do + agent.shutdown + agent_task.wait + LogStash::SETTINGS.set_value("monitoring.enabled", false) + end + + context "new model" do + let(:schema_file) { File.join(schemas_path, "monitoring_document_new_schema.json") } + + it "should be valid" do + global_stats = {"uuid" => "00001" } + sut = described_class.new(global_stats, collector.snapshot_metric, "funky_cluster_uuid") + LogStash::SETTINGS.set_value("monitoring.enabled", true) + + monitoring_evt = sut.make(agent, true) + json = JSON.parse(monitoring_evt.to_json) + expect(json['type']).to eq('logstash_stats') + expect(JSON::Validator.fully_validate(schema_file, monitoring_evt.to_json)).to be_empty + end + end + + context "old model" do + let(:schema_file) { File.join(schemas_path, "monitoring_document_schema.json") } + + it "should be valid" do + global_stats = {"uuid" => "00001" } + sut = described_class.new(global_stats, collector.snapshot_metric, nil) + LogStash::SETTINGS.set_value("monitoring.enabled", false) + + monitoring_evt = sut.make(agent, true) + json = JSON.parse(monitoring_evt.to_json) + expect(JSON::Validator.fully_validate(schema_file, monitoring_evt.to_json)).to be_empty + end + end +end \ No newline at end of file diff --git a/x-pack/spec/monitoring/inputs/metrics_spec.rb b/x-pack/spec/monitoring/inputs/metrics_spec.rb index 873ca282f29..3d8b99cb4a1 100644 --- a/x-pack/spec/monitoring/inputs/metrics_spec.rb +++ b/x-pack/spec/monitoring/inputs/metrics_spec.rb @@ -175,7 +175,7 @@ def state_events describe "normal pipelines" do before(:each) do allow(pipeline).to receive(:system?).and_return(false) - allow(metrics_input).to receive(:state_event_for).with(pipeline).and_return(state_event) + allow(metrics_input).to receive(:state_event_for).with(pipeline).and_return([state_event]) allow(metrics_input).to receive(:emit_event) metrics_input.update_pipeline_state(pipeline) diff --git a/x-pack/spec/monitoring/pipeline_register_hook_spec.rb b/x-pack/spec/monitoring/pipeline_register_hook_spec.rb new file mode 100644 index 00000000000..ac630bd4f2d --- /dev/null +++ b/x-pack/spec/monitoring/pipeline_register_hook_spec.rb @@ -0,0 +1,49 @@ +# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +# or more contributor license agreements. Licensed under the Elastic License; +# you may not use this file except in compliance with the Elastic License. + +require 'monitoring/monitoring' + +describe LogStash::MonitoringExtension::PipelineRegisterHook do + + subject { described_class.new } + + before(:all) { + @extension = LogStash::MonitoringExtension.new + # used to register monitoring xpack's settings + @sys_settings = LogStash::Runner::SYSTEM_SETTINGS.clone + @extension.additionals_settings(@sys_settings) + } + + context 'validate monitoring settings' do + it "work without any monitoring settings" do + settings = @sys_settings.clone + settings.set_value("xpack.monitoring.enabled", true) + settings.set_value("xpack.monitoring.elasticsearch.hosts", "http://localhost:9200") + settings.set_value("xpack.monitoring.elasticsearch.username", "elastic") + settings.set_value("xpack.monitoring.elasticsearch.password", "changeme") + expect(subject.generate_pipeline_config(settings)).to be_truthy + end + + it "monitoring.enabled should conflict with xpack.monitoring.enabled" do + settings = @sys_settings.clone + settings.set_value("xpack.monitoring.enabled", true) + settings.set_value("monitoring.enabled", true) + + expect { + subject.generate_pipeline_config(settings) + }.to raise_error(ArgumentError) + end + + it "monitoring.* should conflict with any xpack.monitoring.*" do + settings = @sys_settings.clone + settings.set_value("xpack.monitoring.collection.interval", "10s") + settings.set_value("monitoring.enabled", true) + + expect { + subject.generate_pipeline_config(settings) + }.to raise_error(ArgumentError) + end + end + +end diff --git a/x-pack/spec/monitoring/schemas/monitoring_document_new_schema.json b/x-pack/spec/monitoring/schemas/monitoring_document_new_schema.json new file mode 100644 index 00000000000..d8fd6855bbb --- /dev/null +++ b/x-pack/spec/monitoring/schemas/monitoring_document_new_schema.json @@ -0,0 +1,186 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "definitions": { + "reloads": { + "type": "object", + "required": ["failures", "successes"], + "properties": { + "failures": { "type": "number" }, + "successes": { "type": "number" } + } + }, + "events": { + "type": "object", + "required": ["filtered", "in", "duration_in_millis", "out"], + "properties": { + "filtered": { "type": "number" }, + "in": { "type": "number" }, + "duration_in_millis": { "type": "number" }, + "out": { "type": "number" } + } + } + }, + "type": {"type": "string" }, + "cluster_uuid": {"type": "string" }, + "interval_ms": {"type": "integer" }, + "timestamp": {"type": "string" }, + "logstash_stats": { + "type": "object", + "required": ["jvm", "logstash", "process", "os", "events", "queue", "reloads", "pipelines"], + "properties": { + "jvm": { + "type": "object", + "required": ["mem", "uptime_in_millis", "gc"], + "properties": { + "mem": { + "type": "object", + "require": ["heap_used_in_bytes", "heap_max_in_bytes", "heap_used_percent"], + "properties": { + "heap_used_percent": { "type": "number" }, + "heap_max_in_bytes": { "type": "number" }, + "heap_used_in_bytes": { "type": "number" } + } + }, + "uptime_in_millis": { "type": "number" }, + "gc": { + "type": "object", + "required": ["collectors"], + "properties": { + "collectors": { + "type": "object", + "required": ["young", "old"], + "properties": { + "young": { + "type": "object", + "required": ["collection_count", "collection_time_in_millis"], + "properties": { + "collection_count": {"type": "number"}, + "collection_time_in_millis": {"type": "number"} + } + }, + "old": { + "type": "object", + "required": ["collection_count", "collection_time_in_millis"], + "properties": { + "collection_count": {"type": "number"}, + "collection_time_in_millis": {"type": "number"} + } + } + } + } + } + } + } + }, + "logstash": { + "type": "object", + "required": ["http_address", "uuid", "ephemeral_id"], + "properties": { + "http_address": { "type": "string" }, + "uuid": { "type": "string" }, + "ephemeral_id": { "type": "string" } + } + }, + "process": { + "type": "object", + "required": ["open_file_descriptors", "max_file_descriptors", "cpu"], + "properties": { + "open_file_descriptors": { "type": "number" }, + "max_file_descriptors": { "type": "number" }, + "cpu": { + "type": "object", + "required": ["percent"], + "properties": { "percent": { "type": "number"} } + } + } + }, + "os": { + "type": "object", + "required": ["cpu"], + "properties": { + "cpu": { + "type": "object", + "required": ["load_average"], + "properties": { + "load_average": { + "type": "object", + "required": ["1m"], + "properties": { + "1m": {"type": "number"}, + "5m": {"type": "number"}, + "15m": {"type": "number"} + } + } + } + } + } + }, + "events": {"$ref": "#/definitions/events"}, + "queue": { + "type": "object", + "required": ["events_count"], + "properties": { + "events_count": {"type": "number"} + } + }, + "reloads": {"$ref": "#/definitions/reloads"}, + "timestamp": {"type": "string"}, + "pipelines": { + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "type": "object", + "required": ["id", "hash", "ephemeral_id", "events", "queue", "reloads", "vertices"], + "properties": { + "id": {"type": "string"}, + "hash": {"type": "string"}, + "ephemeral_id": {"type": "string"}, + "reloads": {"$ref": "#/definitions/reloads"}, + "queue": { + "type": "object", + "required": ["events_count", "type", "queue_size_in_bytes", "max_queue_size_in_bytes"], + "properties": { + "type": {"type": "string"}, + "events_count": {"type": "number"}, + "queue_size_in_bytes": {"type": "number"}, + "max_queue_size_in_bytes": {"type": "number"} + } + }, + "events": {"$ref": "#/definitions/events"}, + "vertices": { + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "type": "object", + "patternProperties": { + "long_counters": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "value": {"type": "number"} + } + } + }, + "double_gauges": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "value": {"type": "string"} + } + } + } + } + } + } + } + } + } + } + } +} diff --git a/x-pack/spec/monitoring/schemas/states_document_new_schema.json b/x-pack/spec/monitoring/schemas/states_document_new_schema.json new file mode 100644 index 00000000000..eb9e90f4f10 --- /dev/null +++ b/x-pack/spec/monitoring/schemas/states_document_new_schema.json @@ -0,0 +1,94 @@ +{ + "$schema": "http://json-schema.org/draft-04/schema#", + "definitions": { + "edge": { + "type": "object", + "properties": { + "from": {"type": "string"}, + "id": {"type": "string"}, + "to": {"type": "string"}, + "type": {"type": "string"} + } + }, + "vertex": { + "type": "object", + "properties": { + "id": {"type": "string"}, + "explicit_id": {"type": "boolean"}, + "type": {"type": "string"}, + "meta": { + "anyOf": [ + {"type": "null"}, + { + "type": "object", + "properties": { + "source": { + "type": "object", + "properties": { + "protocol": {"type": "string"}, + "id": {"type": "string"}, + "line": {"type": "number"}, + "column": {"type": "number"}, + "text": {"type": "string"} + } + } + } + } + ] + } + } + } + }, + "properties": { + "type": {"type": "string" }, + "cluster_uuid": {"type": "string" }, + "interval_ms": {"type": "integer" }, + "timestamp": {"type": "string" }, + "logstash_state": { + "type": "object", + "properties": { + "pipeline": { + "type": "object", + "properties": { + "batch_size": {"type": "integer"}, + "workers": {"type": "integer"}, + "hash": {"type": "string"}, + "ephemeral_id": {"type": "string"}, + "type": {"type": "string"}, + "name": {"type": "string"}, + "representation": { + "type": "object", + "properties": { + "hash": {"type": "string"}, + "version": {"type": "string"}, + "graph": { + "type": "object", + "properties": { + "edges": { + "type": "array", + "items": {"$ref": "#/definitions/edge"} + }, + "vertices": { + "type": "array", + "items": {"$ref": "#/definitions/vertex"} + } + } + }, + "plugins": { + "type": "array", + "items": { + "type": "object", + "properties": { + "name": {"type": "string"}, + "version": {"type": "string"} + } + } + } + } + } + } + } + } + } + } +}