From a575f156ab60bda2b09a41ab967b21c8c01639af Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Wed, 9 Sep 2015 12:43:15 -0500 Subject: [PATCH] Remove HTTP support, rename to _java --- CHANGELOG.md | 26 +- README.md | 8 +- ...ogstash-output-elasticsearch_java_jars.rb} | 0 ...elasticsearch.rb => elasticsearch_java.rb} | 205 ++++----------- .../elasticsearch-template.json | 0 .../protocol.rb | 98 +------ logstash-output-elasticsearch.gemspec | 16 +- spec/es_spec_helper.rb | 70 ++--- .../outputs/elasticsearch/node_spec.rb | 2 +- spec/integration/outputs/index_spec.rb | 12 +- spec/integration/outputs/retry_spec.rb | 248 +++++++++--------- spec/integration/outputs/routing_spec.rb | 56 +--- spec/integration/outputs/secure_spec.rb | 16 +- spec/integration/outputs/templates_spec.rb | 8 +- .../outputs/transport_create_spec.rb | 4 +- spec/integration/outputs/update_spec.rb | 87 ++++++ .../outputs/elasticsearch/protocol_spec.rb | 36 +-- spec/unit/outputs/elasticsearch_proxy_spec.rb | 59 ----- spec/unit/outputs/elasticsearch_spec.rb | 94 +------ spec/unit/outputs/elasticsearch_ssl_spec.rb | 82 ------ 20 files changed, 359 insertions(+), 768 deletions(-) rename lib/{logstash-output-elasticsearch_jars.rb => logstash-output-elasticsearch_java_jars.rb} (100%) rename lib/logstash/outputs/{elasticsearch.rb => elasticsearch_java.rb} (79%) rename lib/logstash/outputs/{elasticsearch => elasticsearch_java}/elasticsearch-template.json (100%) rename lib/logstash/outputs/{elasticsearch => elasticsearch_java}/protocol.rb (70%) create mode 100644 spec/integration/outputs/update_spec.rb delete mode 100644 spec/unit/outputs/elasticsearch_proxy_spec.rb delete mode 100644 spec/unit/outputs/elasticsearch_ssl_spec.rb diff --git a/CHANGELOG.md b/CHANGELOG.md index 257b139..a92e9da 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,24 +1,2 @@ -## 1.0.4 - - Update to Elasticsearch 1.7 - -## 1.0.3 - - Add HTTP proxy support - -## 1.0.2 - - Upgrade Manticore HTTP Client - -## 1.0.1 - - Allow client 
certificates - -## 0.2.9 - - Add 'path' parameter for ES HTTP hosts behind a proxy on a subpath - -## 0.2.8 (June 12, 2015) - - Add option to enable and disable SSL certificate verification during handshake (#160) - - Doc improvements for clarifying round robin behavior using hosts config - -## 0.2.7 (May 28, 2015) - - Bump es-ruby version to 1.0.10 - -## 0.2.6 (May 28, 2015) - - Disable timeouts when using http protocol which would cause bulk requests to fail (#103) +## 1.1.0 + - Initial Release. Only supports Node/Transport in ES 1.7 \ No newline at end of file diff --git a/README.md b/README.md index 37bd4a8..d1e030a 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,15 @@ # Logstash Plugin -This is a plugin for [Logstash](https://github.com/elasticsearch/logstash). +This is a plugin for [Logstash](https://github.com/elastic/logstash). It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way. ## Documentation -Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elasticsearch.org/guide/en/logstash/current/). +Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elastic.co/guide/en/logstash/current/). 
- For formatting code or config example, you can use the asciidoc `[source,ruby]` directive -- For more asciidoc formatting tips, see the excellent reference here https://github.com/elasticsearch/docs#asciidoc-guide +- For more asciidoc formatting tips, see the excellent reference here https://github.com/elastic/docs#asciidoc-guide ## Need Help? @@ -95,4 +95,4 @@ Programming is not a required skill. Whatever you've seen about open source and It is more important to the community that you are able to contribute. -For more information about contributing, see the [CONTRIBUTING](https://github.com/elasticsearch/logstash/blob/master/CONTRIBUTING.md) file. +For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file. diff --git a/lib/logstash-output-elasticsearch_jars.rb b/lib/logstash-output-elasticsearch_java_jars.rb similarity index 100% rename from lib/logstash-output-elasticsearch_jars.rb rename to lib/logstash-output-elasticsearch_java_jars.rb diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch_java.rb similarity index 79% rename from lib/logstash/outputs/elasticsearch.rb rename to lib/logstash/outputs/elasticsearch_java.rb index a6ff16f..49142d2 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch_java.rb @@ -8,11 +8,15 @@ require "socket" # for Socket.gethostname require "thread" # for safe queueing require "uri" # for escaping user input -require 'logstash-output-elasticsearch_jars.rb' - -# This output lets you store logs in Elasticsearch and is the most recommended -# output for Logstash. If you plan on using the Kibana web interface, you'll -# need to use this output. +require 'logstash-output-elasticsearch_java_jars.rb' +require "logstash/outputs/elasticsearch_java/protocol" + +# This output lets you store logs in Elasticsearch using the native 'node' and 'transport' +# protocols. 
It is highly recommended to use the regular 'logstash-output-elasticsearch' output +# which uses HTTP instead. This output is, in fact, sometimes slower, and never faster than that one. +# Additionally, upgrading your Elasticsearch cluster may require you to simultaneously update this +# plugin for any protocol level changes. The HTTP client has far fewer of these issues and is +# generally just easier to work with. # # *VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch 1.0.0 or later. # @@ -61,14 +65,14 @@ # - Events from the retry queue are submitted again either when the queue reaches its max size or when # the max interval time is reached, which is set in :retry_max_interval. # - Events which are not retryable or have reached their max retry count are logged to stderr. -class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base +class LogStash::Outputs::ElasticSearchJava < LogStash::Outputs::Base attr_reader :client include Stud::Buffer - RETRYABLE_CODES = [429, 503] + RETRYABLE_CODES = [409, 429, 503] SUCCESS_CODES = [200, 201] - config_name "elasticsearch" + config_name "elasticsearch_java" # The index to write events to. This can be dynamic using the `%{foo}` syntax. # The default value will partition your indices by day so you can more easily @@ -168,10 +172,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # The port for Elasticsearch transport to use. 
# # If you do not set this, the following defaults are used: - # * `protocol => http` - port 9200 # * `protocol => transport` - port 9300-9305 # * `protocol => node` - port 9300-9305 - config :port, :validate => :string + config :port, :validate => :string, :default => "9300-9305" # The name/address of the host to bind to for Elasticsearch clustering config :bind_host, :validate => :string @@ -238,14 +241,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # in situations where you cannot permit connections outbound from the # Elasticsearch cluster to this Logstash server. # - # The 'http' protocol will use the Elasticsearch REST/HTTP interface to talk - # to elasticsearch. - # # All protocols will use bulk requests when talking to Elasticsearch. # # The default `protocol` setting under java/jruby is "node". The default # `protocol` on non-java rubies is "http" - config :protocol, :validate => [ "node", "transport", "http" ] + config :protocol, :validate => [ "node", "transport"], :default => "transport" # The Elasticsearch action to perform. Valid actions are: `index`, `delete`. # @@ -257,25 +257,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # - index: indexes a document (an event from Logstash). # - delete: deletes a document by id # - create: indexes a document, fails if a document by that id already exists in the index. 
+ # - update: updates a document by id # following action is not supported by HTTP protocol # - create_unless_exists: creates a document, fails if no id is provided # # For more details on actions, check out the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation] config :action, :validate => :string, :default => "index" - # Username and password (only valid when protocol is HTTP; this setting works with HTTP or HTTPS auth) - config :user, :validate => :string - config :password, :validate => :password - - # HTTP Path at which the Elasticsearch server lives. Use this if you must run ES behind a proxy that remaps - # the root path for the Elasticsearch HTTP API lives. This option is ignored for non-HTTP transports. - config :path, :validate => :string, :default => "/" - - # SSL Configurations (only valid when protocol is HTTP) - # - # Enable SSL - config :ssl, :validate => :boolean, :default => false - # Validate the server's certificate # Disabling this severely compromises security # For more information read https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf @@ -311,11 +299,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # Set max interval between bulk retries config :retry_max_interval, :validate => :number, :default => 5 - # Set the address of a forward HTTP proxy. 
Must be used with the 'http' protocol - # Can be either a string, such as 'http://localhost:123' or a hash in the form - # {host: 'proxy.org' port: 80 scheme: 'http'} - # Note, this is NOT a SOCKS proxy, but a plain HTTP proxy - config :proxy + # Enable doc_as_upsert for update mode + # create a new document with source if document_id doesn't exist + config :doc_as_upsert, :validate => :boolean, :default => false + + # Set upsert content for update mode + # create a new document with this parameter as json string if document_id doesn't exist + config :upsert, :validate => :string, :default => "" public def register @@ -330,48 +320,20 @@ def register client_settings = {} + client_settings["cluster.name"] = @cluster if @cluster + client_settings["network.host"] = @bind_host if @bind_host + client_settings["transport.tcp.port"] = @bind_port if @bind_port + client_settings["client.transport.sniff"] = @sniffing - if @protocol.nil? - @protocol = LogStash::Environment.jruby? ? "node" : "http" - end - - if @protocol == "http" - if @action == "create_unless_exists" - raise(LogStash::ConfigurationError, "action => 'create_unless_exists' is not supported under the HTTP protocol"); - end - - client_settings[:path] = "/#{@path}/".gsub(/\/+/, "/") # Normalize slashes - @logger.debug? && @logger.debug("Normalizing http path", :path => @path, :normalized => client_settings[:path]) - end - - if ["node", "transport"].include?(@protocol) - # Node or TransportClient; requires JRuby - raise(LogStash::PluginLoadingError, "This configuration requires JRuby. If you are not using JRuby, you must set 'protocol' to 'http'. For example: output { elasticsearch { protocol => \"http\" } }") unless LogStash::Environment.jruby? 
- - client_settings["cluster.name"] = @cluster if @cluster - client_settings["network.host"] = @bind_host if @bind_host - client_settings["transport.tcp.port"] = @bind_port if @bind_port - client_settings["client.transport.sniff"] = @sniffing - - if @node_name - client_settings["node.name"] = @node_name - else - client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}" - end - - @@plugins.each do |plugin| - name = plugin.name.split('-')[-1] - client_settings.merge!(LogStash::Outputs::ElasticSearch.const_get(name.capitalize).create_client_config(self)) - end + if @node_name + client_settings["node.name"] = @node_name + else + client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}" end - require "logstash/outputs/elasticsearch/protocol" - - if @port.nil? - @port = case @protocol - when "http"; "9200" - when "transport", "node"; "9300-9305" - end + @@plugins.each do |plugin| + name = plugin.name.split('-')[-1] + client_settings.merge!(LogStash::Outputs::ElasticSearchJava.const_get(name.capitalize).create_client_config(self)) end if @host.nil? && @protocol != "node" # node can use zen discovery @@ -379,27 +341,26 @@ def register @host = ["localhost"] end - client_settings.merge! setup_ssl() - client_settings.merge! setup_proxy() - common_options = { :protocol => @protocol, :client_settings => client_settings } - common_options.merge! setup_basic_auth() + # Update API setup + update_options = { + :upsert => @upsert, + :doc_as_upsert => @doc_as_upsert + } + common_options.merge! update_options if @action == 'update' client_class = case @protocol when "transport" LogStash::Outputs::Elasticsearch::Protocols::TransportClient when "node" LogStash::Outputs::Elasticsearch::Protocols::NodeClient - when /http/ - LogStash::Outputs::Elasticsearch::Protocols::HTTPClient end if @embedded - raise(LogStash::ConfigurationError, "The 'embedded => true' setting is only valid for the elasticsearch output under JRuby. 
You are running #{RUBY_DESCRIPTION}") unless LogStash::Environment.jruby? @logger.warn("The 'embedded => true' setting is enabled. This is not recommended for production use!!!") # LogStash::Environment.load_elasticsearch_jars! @@ -417,7 +378,7 @@ def register if protocol == "node" || @host.nil? # if @protocol is "node" or @host is not set options = { :host => @host, :port => @port }.merge(common_options) @client = [client_class.new(options)] - else # if @protocol in ["transport","http"] + else # if @protocol in ["transport"] @client = @host.map do |host| (_host,_port) = host.split ":" options = { :host => _host, :port => _port || @port }.merge(common_options) @@ -470,7 +431,7 @@ def register public def get_template if @template.nil? - @template = ::File.expand_path('elasticsearch/elasticsearch-template.json', ::File.dirname(__FILE__)) + @template = ::File.expand_path('elasticsearch_java/elasticsearch-template.json', ::File.dirname(__FILE__)) if !File.exists?(@template) raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')" end @@ -503,11 +464,16 @@ def receive(event) event["type"] || "logs" end - index = event.sprintf(@index) + params = { + :_id => @document_id ? event.sprintf(@document_id) : nil, + :_index => event.sprintf(@index), + :_type => type, + :_routing => @routing ? event.sprintf(@routing) : nil + } + + params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != "" - document_id = @document_id ? event.sprintf(@document_id) : nil - routing = @routing ? event.sprintf(@routing) : nil - buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type, :_routing => routing }, event]) + buffer_receive([event.sprintf(@action), params, event]) end # def receive public @@ -602,75 +568,8 @@ def shift_client @logger.debug? 
and @logger.debug("Switched current elasticsearch client to ##{@client_idx} at #{@host[@client_idx]}") end - private - def setup_proxy - return {} unless @proxy - - if @protocol != "http" - raise(LogStash::ConfigurationError, "Proxy is not supported for '#{@protocol}'. Change the protocol to 'http' if you need HTTP proxy.") - end - - # Symbolize keys - proxy = if @proxy.is_a?(Hash) - Hash[@proxy.map {|k,v| [k.to_sym, v]}] - elsif @proxy.is_a?(String) - @proxy - else - raise LogStash::ConfigurationError, "Expected 'proxy' to be a string or hash, not '#{@proxy}''!" - end - - return {:proxy => proxy} - end - - private - def setup_ssl - return {} unless @ssl - if @protocol != "http" - raise(LogStash::ConfigurationError, "SSL is not supported for '#{@protocol}'. Change the protocol to 'http' if you need SSL.") - end - @protocol = "https" - if @cacert && @truststore - raise(LogStash::ConfigurationError, "Use either \"cacert\" or \"truststore\" when configuring the CA certificate") if @truststore - end - ssl_options = {} - if @cacert then - @truststore, ssl_options[:truststore_password] = generate_jks @cacert - elsif @truststore - ssl_options[:truststore_password] = @truststore_password.value if @truststore_password - end - ssl_options[:truststore] = @truststore if @truststore - if @keystore - ssl_options[:keystore] = @keystore - ssl_options[:keystore_password] = @keystore_password.value if @keystore_password - end - if @ssl_certificate_verification == false - @logger.warn [ - "** WARNING ** Detected UNSAFE options in elasticsearch output configuration!", - "** WARNING ** You have enabled encryption but DISABLED certificate verification.", - "** WARNING ** To make sure your data is secure change :ssl_certificate_verification to true" - ].join("\n") - ssl_options[:verify] = false - end - { ssl: ssl_options } - end - - private - def setup_basic_auth - return {} unless @user && @password - - if @protocol =~ /http/ - { - :user => ::URI.escape(@user, "@:"), - :password => 
::URI.escape(@password.value, "@:") - } - else - raise(LogStash::ConfigurationError, "User and password parameters are not supported for '#{@protocol}'. Change the protocol to 'http' if you need them.") - end - end - private def generate_jks cert_path - require 'securerandom' require 'tempfile' require 'java' @@ -728,11 +627,11 @@ def retry_push(actions) } end - @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ } + @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch_java-/ } @@plugins.each do |plugin| name = plugin.name.split('-')[-1] - require "logstash/outputs/elasticsearch/#{name}" + require "logstash/outputs/elasticsearch_java/#{name}" end end # class LogStash::Outputs::Elasticsearch diff --git a/lib/logstash/outputs/elasticsearch/elasticsearch-template.json b/lib/logstash/outputs/elasticsearch_java/elasticsearch-template.json similarity index 100% rename from lib/logstash/outputs/elasticsearch/elasticsearch-template.json rename to lib/logstash/outputs/elasticsearch_java/elasticsearch-template.json diff --git a/lib/logstash/outputs/elasticsearch/protocol.rb b/lib/logstash/outputs/elasticsearch_java/protocol.rb similarity index 70% rename from lib/logstash/outputs/elasticsearch/protocol.rb rename to lib/logstash/outputs/elasticsearch_java/protocol.rb index 4b2d6ba..c070563 100644 --- a/lib/logstash/outputs/elasticsearch/protocol.rb +++ b/lib/logstash/outputs/elasticsearch_java/protocol.rb @@ -1,4 +1,4 @@ -require "logstash/outputs/elasticsearch" +require "logstash/outputs/elasticsearch_java" require "cabin" require "base64" @@ -44,88 +44,6 @@ def bulk(actions) public(:initialize, :template_install) end - class HTTPClient < Base - private - - DEFAULT_OPTIONS = { - :port => 9200 - } - - def initialize(options={}) - super - require "elasticsearch" # gem 'elasticsearch-ruby' - # manticore http transport - require "elasticsearch/transport/transport/http/manticore" - @options = 
DEFAULT_OPTIONS.merge(options) - @client = client - end - - def build_client(options) - uri = "#{options[:protocol]}://#{options[:host]}:#{options[:port]}#{options[:client_settings][:path]}" - - client_options = { - :host => [uri], - :ssl => options[:client_settings][:ssl], - :transport_options => { # manticore settings so we - :socket_timeout => 0, # do not timeout socket reads - :request_timeout => 0, # and requests - :proxy => options[:client_settings][:proxy] - }, - :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore - } - - if options[:user] && options[:password] then - token = Base64.strict_encode64(options[:user] + ":" + options[:password]) - client_options[:headers] = { "Authorization" => "Basic #{token}" } - end - - Elasticsearch::Client.new client_options - end - - def self.normalize_bulk_response(bulk_response) - if bulk_response["errors"] - # The structure of the response from the REST Bulk API is follows: - # {"took"=>74, "errors"=>true, "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17", - # "_type"=>"logs", - # "_id"=>"AUxTS2C55Jrgi-hC6rQF", - # "_version"=>1, - # "status"=>400, - # "error"=>"MapperParsingException[failed to parse]..."}}]} - # where each `item` is a hash of {OPTYPE => Hash[]}. calling first, will retrieve - # this hash as a single array with two elements, where the value is the second element (i.first[1]) - # then the status of that item is retrieved. 
- {"errors" => true, "statuses" => bulk_response["items"].map { |i| i.first[1]['status'] }} - else - {"errors" => false} - end - end - - def bulk(actions) - bulk_response = @client.bulk(:body => actions.collect do |action, args, source| - if source - next [ { action => args }, source ] - else - next { action => args } - end - end.flatten) - - self.class.normalize_bulk_response(bulk_response) - end # def bulk - - def template_exists?(name) - @client.indices.get_template(:name => name) - return true - rescue Elasticsearch::Transport::Transport::Errors::NotFound - return false - end # def template_exists? - - def template_put(name, template) - @client.indices.put_template(:name => name, :body => template) - end # template_put - - public(:bulk) - end # class HTTPClient - class NodeClient < Base private @@ -254,9 +172,21 @@ def build_request(action, args, source) else raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.") end + when "update" + unless args[:_id].nil? 
+ request = org.elasticsearch.action.update.UpdateRequest.new(args[:_index], args[:_type], args[:_id]) + request.routing(args[:_routing]) if args[:_routing] + request.doc(source) + if @options[:doc_as_upsert] + request.docAsUpsert(true) + else + request.upsert(args[:_upsert]) if args[:_upsert] + end + else + raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.") + end else raise(LogStash::ConfigurationError, "action => '#{action_name}' is not currently supported.") - #when "update" end # case action request.type(args[:_type]) if args[:_type] diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index cf03b5d..717e46a 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,10 +1,9 @@ Gem::Specification.new do |s| - - s.name = 'logstash-output-elasticsearch' - s.version = '1.0.5' + s.name = 'logstash-output-elasticsearch_java' + s.version = '1.1.0' s.licenses = ['apache-2.0'] - s.summary = "Logstash Output to Elasticsearch" - s.description = "Output events to elasticsearch" + s.summary = "Logstash Output to Elasticsearch using Java node/transport client" + s.description = "Output events to elasticsearch using the java client" s.authors = ["Elastic"] s.email = 'info@elastic.co' s.homepage = "http://logstash.net/" @@ -28,13 +27,6 @@ Gem::Specification.new do |s| s.add_development_dependency 'ftw', '~> 0.0.42' s.add_development_dependency 'logstash-input-generator' - - - if RUBY_PLATFORM == 'java' - s.platform = RUBY_PLATFORM - s.add_runtime_dependency "manticore", '~> 0.4.2' - end - s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'longshoreman' end diff --git a/spec/es_spec_helper.rb b/spec/es_spec_helper.rb index f524b27..0a98e78 100644 --- a/spec/es_spec_helper.rb +++ b/spec/es_spec_helper.rb @@ -9,21 +9,21 @@ CONTAINER_IMAGE = "elasticsearch" CONTAINER_TAG = "1.6" -module ESHelper +DOCKER_INTEGRATION 
= ENV["DOCKER_INTEGRATION"] +module ESHelper def get_host - Longshoreman.new.get_host_ip + DOCKER_INTEGRATION ? Longshoreman.new.get_host_ip : "127.0.0.1" end def get_port(protocol) + unless DOCKER_INTEGRATION + return protocol.to_sym == :http ? 9200 : 9300 + end + container = Longshoreman::Container.new container.get(CONTAINER_NAME) - case protocol - when "http" - container.rport(9200) - when "transport", "node" - container.rport(9300) - end + container.rport(9300) end def get_client @@ -31,35 +31,41 @@ def get_client end end + RSpec.configure do |config| config.include ESHelper - # this :all hook gets run before every describe block that is tagged with :integration => true. - config.before(:all, :integration => true) do - # check if container exists already before creating new one. - begin - ls = Longshoreman::new - ls.container.get(CONTAINER_NAME) - rescue Docker::Error::NotFoundError - Longshoreman.new("#{CONTAINER_IMAGE}:#{CONTAINER_TAG}", CONTAINER_NAME) - # TODO(talevy): verify ES is running instead of static timeout - sleep 10 + + if DOCKER_INTEGRATION + # this :all hook gets run before every describe block that is tagged with :integration => true. + config.before(:all, :integration => true) do + + + # check if container exists already before creating new one. + begin + ls = Longshoreman::new + ls.container.get(CONTAINER_NAME) + rescue Docker::Error::NotFoundError + Longshoreman.new("#{CONTAINER_IMAGE}:#{CONTAINER_TAG}", CONTAINER_NAME) + # TODO(talevy): verify ES is running instead of static timeout + sleep 10 + end end - end - # we want to do a final cleanup after all :integration runs, - # but we don't want to clean up before the last block. - # This is a final blind check to see if the ES docker container is running and - # needs to be cleaned up. If no container can be found and/or docker is not - # running on the system, we do nothing. 
- config.after(:suite) do - # only cleanup docker container if system has docker and the container is running - begin - ls = Longshoreman::new - ls.container.get(CONTAINER_NAME) - ls.cleanup - rescue Docker::Error::NotFoundError, Excon::Errors::SocketError - # do nothing + # we want to do a final cleanup after all :integration runs, + # but we don't want to clean up before the last block. + # This is a final blind check to see if the ES docker container is running and + # needs to be cleaned up. If no container can be found and/or docker is not + # running on the system, we do nothing. + config.after(:suite) do + # only cleanup docker container if system has docker and the container is running + begin + ls = Longshoreman::new + ls.container.get(CONTAINER_NAME) + ls.cleanup + rescue Docker::Error::NotFoundError, Excon::Errors::SocketError + # do nothing + end end end end diff --git a/spec/integration/outputs/elasticsearch/node_spec.rb b/spec/integration/outputs/elasticsearch/node_spec.rb index a822174..a88170b 100644 --- a/spec/integration/outputs/elasticsearch/node_spec.rb +++ b/spec/integration/outputs/elasticsearch/node_spec.rb @@ -1,5 +1,5 @@ require_relative "../../../../spec/es_spec_helper" -require "logstash/outputs/elasticsearch/protocol" +require "logstash/outputs/elasticsearch_java/protocol" describe "elasticsearch node client", :integration => true do # Test ElasticSearch Node Client diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb index ea2cd94..7d23bfb 100644 --- a/spec/integration/outputs/index_spec.rb +++ b/spec/integration/outputs/index_spec.rb @@ -51,10 +51,10 @@ } } output { - elasticsearch { + elasticsearch_java { host => "#{get_host()}" - port => "#{get_port('http')}" - protocol => "http" + port => "#{get_port('transport')}" + protocol => "transport" index => "#{index}" flush_size => #{flush_size} } @@ -76,10 +76,10 @@ } } output { - elasticsearch { + elasticsearch_java { host => "#{get_host()}" - port 
=> "#{get_port('http')}" - protocol => "http" + port => "#{get_port('transport')}" + protocol => "transport" index => "#{index}" flush_size => #{flush_size} } diff --git a/spec/integration/outputs/retry_spec.rb b/spec/integration/outputs/retry_spec.rb index 0526ed0..aea7329 100644 --- a/spec/integration/outputs/retry_spec.rb +++ b/spec/integration/outputs/retry_spec.rb @@ -1,4 +1,5 @@ -require "logstash/outputs/elasticsearch" +require "elasticsearch" +require "logstash/outputs/elasticsearch_java" require_relative "../../../spec/es_spec_helper" describe "failures in bulk class expected behavior", :integration => true do @@ -11,146 +12,137 @@ let(:max_retries) { 3 } def mock_actions_with_response(*resp) - LogStash::Outputs::Elasticsearch::Protocols::HTTPClient - .any_instance.stub(:bulk).and_return(*resp) - LogStash::Outputs::Elasticsearch::Protocols::NodeClient - .any_instance.stub(:bulk).and_return(*resp) + allow_any_instance_of(LogStash::Outputs::Elasticsearch::Protocols::NodeClient).to receive(:bulk).and_return(*resp) end - ["transport", "http"].each do |protocol| - context "with protocol => #{protocol}" do - subject! do - settings = { - "manage_template" => true, - "index" => "logstash-2014.11.17", - "template_overwrite" => true, - "protocol" => protocol, - "host" => get_host(), - "port" => get_port(protocol), - "retry_max_items" => 10, - "retry_max_interval" => 1, - "max_retries" => max_retries - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - before :each do - # Delete all templates first. - require "elasticsearch" + subject! do + settings = { + "manage_template" => true, + "index" => "logstash-2014.11.17", + "template_overwrite" => true, + "protocol" => 'transport', + "host" => get_host(), + "port" => get_port('transport'), + "retry_max_items" => 10, + "retry_max_interval" => 1, + "max_retries" => max_retries + } + next LogStash::Outputs::ElasticSearchJava.new(settings) + end - # Clean ES of data before we start. 
- @es = get_client - @es.indices.delete_template(:name => "*") - @es.indices.delete(:index => "*") - @es.indices.refresh - end + before :each do + # Clean ES of data before we start. + @es = get_client + @es.indices.delete_template(:name => "*") + @es.indices.delete(:index => "*") + @es.indices.refresh + end - it "should return no errors if all bulk actions are successful" do - mock_actions_with_response({"errors" => false}) - expect(subject).to receive(:submit).with([action1, action2]).once.and_call_original - subject.register - subject.receive(event1) - subject.receive(event2) - subject.buffer_flush(:final => true) - sleep(2) - end + it "should return no errors if all bulk actions are successful" do + mock_actions_with_response({"errors" => false}) + expect(subject).to receive(:submit).with([action1, action2]).once.and_call_original + subject.register + subject.receive(event1) + subject.receive(event2) + subject.buffer_flush(:final => true) + sleep(2) + end - it "should raise exception and be retried by stud::buffer" do - call_count = 0 - expect(subject).to receive(:submit).with([action1]).exactly(3).times do - if (call_count += 1) <= 2 - raise "error first two times" - else - {"errors" => false} - end - end - subject.register - subject.receive(event1) - subject.teardown + it "should raise exception and be retried by stud::buffer" do + call_count = 0 + expect(subject).to receive(:submit).with([action1]).exactly(3).times do + if (call_count += 1) <= 2 + raise "error first two times" + else + {"errors" => false} end + end + subject.register + subject.receive(event1) + subject.teardown + end - it "should retry actions with response status of 503" do - mock_actions_with_response({"errors" => true, "statuses" => [200, 200, 503, 503]}, - {"errors" => true, "statuses" => [200, 503]}, - {"errors" => false}) - expect(subject).to receive(:submit).with([action1, action1, action1, action2]).ordered.once.and_call_original - expect(subject).to receive(:submit).with([action1, 
action2]).ordered.once.and_call_original - expect(subject).to receive(:submit).with([action2]).ordered.once.and_call_original - - subject.register - subject.receive(event1) - subject.receive(event1) - subject.receive(event1) - subject.receive(event2) - subject.buffer_flush(:final => true) - sleep(3) - end + it "should retry actions with response status of 503" do + mock_actions_with_response({"errors" => true, "statuses" => [200, 200, 503, 503]}, + {"errors" => true, "statuses" => [200, 503]}, + {"errors" => false}) + expect(subject).to receive(:submit).with([action1, action1, action1, action2]).ordered.once.and_call_original + expect(subject).to receive(:submit).with([action1, action2]).ordered.once.and_call_original + expect(subject).to receive(:submit).with([action2]).ordered.once.and_call_original + + subject.register + subject.receive(event1) + subject.receive(event1) + subject.receive(event1) + subject.receive(event2) + subject.buffer_flush(:final => true) + sleep(3) + end - it "should retry actions with response status of 429" do - mock_actions_with_response({"errors" => true, "statuses" => [429]}, - {"errors" => false}) - expect(subject).to receive(:submit).with([action1]).twice.and_call_original - subject.register - subject.receive(event1) - subject.buffer_flush(:final => true) - sleep(3) - end + it "should retry actions with response status of 429" do + mock_actions_with_response({"errors" => true, "statuses" => [429]}, + {"errors" => false}) + expect(subject).to receive(:submit).with([action1]).twice.and_call_original + subject.register + subject.receive(event1) + subject.buffer_flush(:final => true) + sleep(3) + end - it "should retry an event until max_retries reached" do - mock_actions_with_response({"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}) 
- expect(subject).to receive(:submit).with([action1]).exactly(max_retries).times.and_call_original - subject.register - subject.receive(event1) - subject.buffer_flush(:final => true) - sleep(3) - end + it "should retry an event until max_retries reached" do + mock_actions_with_response({"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}) + expect(subject).to receive(:submit).with([action1]).exactly(max_retries).times.and_call_original + subject.register + subject.receive(event1) + subject.buffer_flush(:final => true) + sleep(3) + end - it "non-retryable errors like mapping errors (400) should be dropped and not be retried (unfortunetly)" do - subject.register - subject.receive(invalid_event) - expect(subject).not_to receive(:retry_push) - subject.teardown - - @es.indices.refresh - sleep(5) - Stud::try(10.times) do - r = @es.search - insist { r["hits"]["total"] } == 0 - end - end + it "non-retryable errors like mapping errors (400) should be dropped and not be retried (unfortunetly)" do + subject.register + subject.receive(invalid_event) + expect(subject).not_to receive(:retry_push) + subject.teardown + + @es.indices.refresh + sleep(5) + Stud::try(10.times) do + r = @es.search + insist { r["hits"]["total"] } == 0 + end + end - it "successful requests should not be appended to retry queue" do - subject.register - subject.receive(event1) - expect(subject).not_to receive(:retry_push) - subject.teardown - - @es.indices.refresh - sleep(5) - Stud::try(10.times) do - r = @es.search - insist { r["hits"]["total"] } == 1 - end - end + it "successful requests should not be appended to retry queue" do + subject.register + subject.receive(event1) + expect(subject).not_to receive(:retry_push) + subject.teardown + + @es.indices.refresh + sleep(5) + Stud::try(10.times) do + r = 
@es.search + insist { r["hits"]["total"] } == 1 + end + end - it "should only index proper events" do - subject.register - subject.receive(invalid_event) - subject.receive(event1) - subject.teardown - - @es.indices.refresh - sleep(5) - Stud::try(10.times) do - r = @es.search - insist { r["hits"]["total"] } == 1 - end - end + it "should only index proper events" do + subject.register + subject.receive(invalid_event) + subject.receive(event1) + subject.teardown + @es.indices.refresh + sleep(5) + + Stud::try(10.times) do + r = @es.search + insist { r["hits"]["total"] } == 1 end end end diff --git a/spec/integration/outputs/routing_spec.rb b/spec/integration/outputs/routing_spec.rb index a44131d..ab0ad13 100644 --- a/spec/integration/outputs/routing_spec.rb +++ b/spec/integration/outputs/routing_spec.rb @@ -32,60 +32,6 @@ end end -describe "(http protocol) index events with static routing", :integration => true do - it_behaves_like 'a routing indexer' do - let(:routing) { "test" } - let(:config) { - <<-CONFIG - input { - generator { - message => "hello world" - count => #{event_count} - type => "#{type}" - } - } - output { - elasticsearch { - host => "#{get_host()}" - port => "#{get_port('http')}" - protocol => "http" - index => "#{index}" - flush_size => #{flush_size} - routing => "#{routing}" - } - } - CONFIG - } - end -end - -describe "(http_protocol) index events with fieldref in routing value", :integration => true do - it_behaves_like 'a routing indexer' do - let(:routing) { "test" } - let(:config) { - <<-CONFIG - input { - generator { - message => "#{routing}" - count => #{event_count} - type => "#{type}" - } - } - output { - elasticsearch { - host => "#{get_host()}" - port => "#{get_port('http')}" - protocol => "http" - index => "#{index}" - flush_size => #{flush_size} - routing => "%{message}" - } - } - CONFIG - } - end -end - describe "(transport protocol) index events with fieldref in routing value", :integration => true do it_behaves_like 'a routing 
indexer' do let(:routing) { "test" } @@ -99,7 +45,7 @@ } } output { - elasticsearch { + elasticsearch_java { host => "#{get_host()}" port => "#{get_port('transport')}" protocol => "transport" diff --git a/spec/integration/outputs/secure_spec.rb b/spec/integration/outputs/secure_spec.rb index 224f2d2..eabe536 100644 --- a/spec/integration/outputs/secure_spec.rb +++ b/spec/integration/outputs/secure_spec.rb @@ -2,7 +2,7 @@ describe "send messages to ElasticSearch using HTTPS", :elasticsearch_secure => true do subject do - require "logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" settings = { "protocol" => "http", "node_name" => "logstash", @@ -16,7 +16,7 @@ #"truststore" => "/tmp/ca/truststore.jks", #"truststore_password" => "testeteste" } - next LogStash::Outputs::ElasticSearch.new(settings) + next LogStash::Outputs::ElasticSearchJava.new(settings) end before :each do @@ -33,7 +33,7 @@ describe "connect using HTTP Authentication", :elasticsearch_secure => true do subject do - require "logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" settings = { "protocol" => "http", "cluster" => "elasticsearch", @@ -41,7 +41,7 @@ "user" => "user", "password" => "changeme", } - next LogStash::Outputs::ElasticSearch.new(settings) + next LogStash::Outputs::ElasticSearchJava.new(settings) end before :each do @@ -58,7 +58,7 @@ describe "send messages to ElasticSearch using HTTPS", :elasticsearch_secure => true do subject do - require "logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" settings = { "protocol" => "http", "node_name" => "logstash", @@ -72,7 +72,7 @@ #"truststore" => "/tmp/ca/truststore.jks", #"truststore_password" => "testeteste" } - next LogStash::Outputs::ElasticSearch.new(settings) + next LogStash::Outputs::ElasticSearchJava.new(settings) end before :each do @@ -89,7 +89,7 @@ describe "connect using HTTP Authentication", :elasticsearch_secure => true do subject do - require 
"logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" settings = { "protocol" => "http", "cluster" => "elasticsearch", @@ -97,7 +97,7 @@ "user" => "user", "password" => "changeme", } - next LogStash::Outputs::ElasticSearch.new(settings) + next LogStash::Outputs::ElasticSearchJava.new(settings) end before :each do diff --git a/spec/integration/outputs/templates_spec.rb b/spec/integration/outputs/templates_spec.rb index f4f070b..17182a7 100644 --- a/spec/integration/outputs/templates_spec.rb +++ b/spec/integration/outputs/templates_spec.rb @@ -1,19 +1,19 @@ require_relative "../../../spec/es_spec_helper" describe "index template expected behavior", :integration => true do - ["transport", "http"].each do |protocol| + ["transport"].each do |protocol| context "with protocol => #{protocol}" do subject! do - require "logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" settings = { "manage_template" => true, "template_overwrite" => true, "protocol" => protocol, "host" => "#{get_host()}", - "port" => "#{get_port(protocol)}" + "port" => "#{get_port('transport')}" } - next LogStash::Outputs::ElasticSearch.new(settings) + next LogStash::Outputs::ElasticSearchJava.new(settings) end before :each do diff --git a/spec/integration/outputs/transport_create_spec.rb b/spec/integration/outputs/transport_create_spec.rb index f617db2..cfa6b4f 100644 --- a/spec/integration/outputs/transport_create_spec.rb +++ b/spec/integration/outputs/transport_create_spec.rb @@ -1,7 +1,7 @@ require_relative "../../../spec/es_spec_helper" describe "transport client create actions", :integration => true do - require "logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" require "elasticsearch" def get_es_output(action, id = nil) @@ -15,7 +15,7 @@ def get_es_output(action, id = nil) "action" => action } settings['document_id'] = id unless id.nil? 
- LogStash::Outputs::ElasticSearch.new(settings) + LogStash::Outputs::ElasticSearchJava.new(settings) end before :each do diff --git a/spec/integration/outputs/update_spec.rb b/spec/integration/outputs/update_spec.rb new file mode 100644 index 0000000..a58c15e --- /dev/null +++ b/spec/integration/outputs/update_spec.rb @@ -0,0 +1,87 @@ +require_relative "../../../spec/es_spec_helper" + +describe "all protocols update actions", :integration => true do + require "logstash/outputs/elasticsearch_java" + require "elasticsearch" + + def get_es_output( protocol, id = nil, upsert = nil, doc_as_upsert=nil) + settings = { + "manage_template" => true, + "index" => "logstash-update", + "template_overwrite" => true, + "protocol" => protocol, + "host" => get_host(), + "port" => get_port(protocol), + "action" => "update" + } + settings['upsert'] = upsert unless upsert.nil? + settings['document_id'] = id unless id.nil? + settings['doc_as_upsert'] = doc_as_upsert unless doc_as_upsert.nil? + LogStash::Outputs::ElasticSearchJava.new(settings) + end + + before :each do + @es = get_client + # Delete all templates first. + # Clean ES of data before we start. + @es.indices.delete_template(:name => "*") + # This can fail if there are no indexes, ignore failure. 
+ @es.indices.delete(:index => "*") rescue nil + @es.index( + :index => 'logstash-update', + :type => 'logs', + :id => "123", + :body => { :message => 'Test' } + ) + @es.indices.refresh + end + + ["node", "transport"].each do |protocol| + context "update only with #{protocol} protocol" do + it "should failed without a document_id" do + event = LogStash::Event.new("somevalue" => 100, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0}) + action = ["update", {:_id=>nil, :_index=>"logstash-2014.11.17", :_type=>"logs"}, event] + subject = get_es_output(protocol) + subject.register + expect { subject.flush([action]) }.to raise_error + end + + it "should not create new document" do + subject = get_es_output(protocol, "456") + subject.register + subject.receive(LogStash::Event.new("message" => "sample message here")) + subject.buffer_flush(:final => true) + expect {@es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound) + end + + it "should update existing document" do + subject = get_es_output(protocol, "123") + subject.register + subject.receive(LogStash::Event.new("message" => "updated message here")) + subject.buffer_flush(:final => true) + r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "123", :refresh => true) + insist { r["_source"]["message"] } == 'updated message here' + end + end + + context "upsert with #{protocol} protocol" do + it "should create new documents with upsert content" do + subject = get_es_output(protocol, "456", '{"message": "upsert message"}') + subject.register + subject.receive(LogStash::Event.new("message" => "sample message here")) + subject.buffer_flush(:final => true) + r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true) + insist { r["_source"]["message"] } == 'upsert message' + end + + it "should create new documents with event/doc as upsert" do + subject = 
get_es_output(protocol, "456", nil, true) + subject.register + subject.receive(LogStash::Event.new("message" => "sample message here")) + subject.buffer_flush(:final => true) + r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true) + insist { r["_source"]["message"] } == 'sample message here' + end + end + end +end diff --git a/spec/unit/outputs/elasticsearch/protocol_spec.rb b/spec/unit/outputs/elasticsearch/protocol_spec.rb index 893eb64..08cf19c 100644 --- a/spec/unit/outputs/elasticsearch/protocol_spec.rb +++ b/spec/unit/outputs/elasticsearch/protocol_spec.rb @@ -1,15 +1,17 @@ require "logstash/devutils/rspec/spec_helper" -require "logstash/outputs/elasticsearch/protocol" +require "logstash/outputs/elasticsearch_java/protocol" require "java" describe LogStash::Outputs::Elasticsearch::Protocols::NodeClient do context "successful" do it "should map correctly" do index_response = org.elasticsearch.action.index.IndexResponse.new("my_index", "my_type", "my_id", 123, true) + update_response = org.elasticsearch.action.update.UpdateResponse.new("my_index", "my_type", "my_id", 123, false) delete_response = org.elasticsearch.action.delete.DeleteResponse.new("my_index", "my_type", "my_id", 123, true) bulk_item_response_index = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "index", index_response) + bulk_item_response_update = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "update", update_response) bulk_item_response_delete = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "delete", delete_response) - bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_delete], 0) + bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_update, bulk_item_response_delete], 0) ret = LogStash::Outputs::Elasticsearch::Protocols::NodeClient.normalize_bulk_response(bulk_response) insist { ret } == {"errors" => false} end @@ 
-19,33 +21,11 @@ it "should map correctly" do failure = org.elasticsearch.action.bulk.BulkItemResponse::Failure.new("my_index", "my_type", "my_id", "error message", org.elasticsearch.rest.RestStatus::BAD_REQUEST) bulk_item_response_index = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "index", failure) + bulk_item_response_update = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "update", failure) bulk_item_response_delete = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "delete", failure) - bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_delete], 0) + bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_update, bulk_item_response_delete], 0) actual = LogStash::Outputs::Elasticsearch::Protocols::NodeClient.normalize_bulk_response(bulk_response) - insist { actual } == {"errors" => true, "statuses" => [400, 400]} + insist { actual } == {"errors" => true, "statuses" => [400, 400, 400]} end end -end - -describe LogStash::Outputs::Elasticsearch::Protocols::HTTPClient do - context "successful" do - it "should map correctly" do - bulk_response = {"took"=>74, "errors"=>false, "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17", - "_type"=>"logs", "_id"=>"AUxTS2C55Jrgi-hC6rQF", - "_version"=>1, "status"=>201}}]} - actual = LogStash::Outputs::Elasticsearch::Protocols::HTTPClient.normalize_bulk_response(bulk_response) - insist { actual } == {"errors"=> false} - end - end - - context "contains failures" do - it "should map correctly" do - bulk_response = {"took"=>71, "errors"=>true, - "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17", - "_type"=>"logs", "_id"=>"AUxTQ_OI5Jrgi-hC6rQB", "status"=>400, - "error"=>"MapperParsingException[failed to parse]..."}}]} - actual = LogStash::Outputs::Elasticsearch::Protocols::HTTPClient.normalize_bulk_response(bulk_response) - insist { actual } == {"errors"=> true, "statuses"=> [400]} - end - end 
-end +end \ No newline at end of file diff --git a/spec/unit/outputs/elasticsearch_proxy_spec.rb b/spec/unit/outputs/elasticsearch_proxy_spec.rb deleted file mode 100644 index 8ebb582..0000000 --- a/spec/unit/outputs/elasticsearch_proxy_spec.rb +++ /dev/null @@ -1,59 +0,0 @@ -require_relative "../../../spec/es_spec_helper" -require 'stud/temporary' -require 'elasticsearch' -require "logstash/outputs/elasticsearch" - -describe "Proxy option" do - let(:settings) { - { - "protocol" => "http", - "host" => "node01", - "proxy" => proxy - } - } - subject { - LogStash::Outputs::ElasticSearch.new(settings) - } - - before do - allow(::Elasticsearch::Client).to receive(:new).with(any_args) - end - - describe "valid configs" do - before do - subject.register - end - - context "when specified as a string" do - let(:proxy) { "http://127.0.0.1:1234" } - - it "should set the proxy to the exact value" do - expect(::Elasticsearch::Client).to have_received(:new) do |options| - expect(options[:transport_options][:proxy]).to eql(proxy) - end - end - end - - context "when specified as a hash" do - let(:proxy) { {"host" => "127.0.0.1", "protocol" => "http"} } - - it "should pass through the proxy values as symbols" do - expected = {:host => proxy["host"], :protocol => proxy["protocol"]} - expect(::Elasticsearch::Client).to have_received(:new) do |options| - expect(options[:transport_options][:proxy]).to eql(expected) - end - end - end - end - - describe "invalid configs" do - let(:proxy) { ["bad", "stuff"] } - - it "should have raised an exception" do - expect { - subject.register - }.to raise_error(LogStash::ConfigurationError) - end - end - -end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 99785dc..4ea9f25 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -1,101 +1,23 @@ require_relative "../../../spec/es_spec_helper" -describe "outputs/elasticsearch" do +describe 
"outputs/elasticsearch_java" do context "registration" do it "should register" do - output = LogStash::Plugin.lookup("output", "elasticsearch").new("embedded" => "false", "protocol" => "transport", "manage_template" => "false") + output = LogStash::Plugin.lookup("output", "elasticsearch_java").new("embedded" => "false", "protocol" => "transport", "manage_template" => "false") # register will try to load jars and raise if it cannot find jars expect {output.register}.to_not raise_error end - - it "should fail to register when protocol => http, action => create_unless_exists" do - output = LogStash::Plugin.lookup("output", "elasticsearch").new("protocol" => "http", "action" => "create_unless_exists") - expect {output.register}.to raise_error - end - end - - describe "Authentication option" do - ["node", "transport"].each do |protocol| - context "with protocol => #{protocol}" do - subject do - require "logstash/outputs/elasticsearch" - settings = { - "protocol" => protocol, - "node_name" => "logstash", - "cluster" => "elasticsearch", - "host" => "node01", - "user" => "test", - "password" => "test" - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - - it "should fail in register" do - expect {subject.register}.to raise_error - end - end - end - end - - describe "http client create" do - require "logstash/outputs/elasticsearch" - require "elasticsearch" - - let(:options) { - { - "protocol" => "http", - "index" => "my-index", - "host" => "localhost", - "path" => "some-path" - } - } - - let(:eso) {LogStash::Outputs::ElasticSearch.new(options)} - - let(:manticore_host) { - eso.client.first.send(:client).transport.options[:host].first - } - - around(:each) do |block| - thread = eso.register - block.call() - thread.kill() - end - - describe "with path" do - it "should properly create a URI with the path" do - expect(eso.path).to eql(options["path"]) - end - - - it "should properly set the path on the HTTP client adding slashes" do - expect(manticore_host).to 
include("/" + options["path"] + "/") - end - - context "with extra slashes" do - let(:path) { "/slashed-path/ "} - let(:eso) { - LogStash::Outputs::ElasticSearch.new(options.merge("path" => "/some-path/")) - } - - it "should properly set the path on the HTTP client without adding slashes" do - expect(manticore_host).to include(options["path"]) - end - end - - - end end describe "transport protocol" do context "host not configured" do subject do - require "logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" settings = { "protocol" => "transport", "node_name" => "mynode" } - next LogStash::Outputs::ElasticSearch.new(settings) + next LogStash::Outputs::ElasticSearchJava.new(settings) end it "should set host to localhost" do @@ -114,13 +36,13 @@ context "sniffing => true" do subject do - require "logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" settings = { "host" => "node01", "protocol" => "transport", "sniffing" => true } - next LogStash::Outputs::ElasticSearch.new(settings) + next LogStash::Outputs::ElasticSearchJava.new(settings) end it "should set the sniffing property to true" do @@ -135,13 +57,13 @@ context "sniffing => false" do subject do - require "logstash/outputs/elasticsearch" + require "logstash/outputs/elasticsearch_java" settings = { "host" => "node01", "protocol" => "transport", "sniffing" => false } - next LogStash::Outputs::ElasticSearch.new(settings) + next LogStash::Outputs::ElasticSearchJava.new(settings) end it "should set the sniffing property to true" do diff --git a/spec/unit/outputs/elasticsearch_ssl_spec.rb b/spec/unit/outputs/elasticsearch_ssl_spec.rb deleted file mode 100644 index cb000ff..0000000 --- a/spec/unit/outputs/elasticsearch_ssl_spec.rb +++ /dev/null @@ -1,82 +0,0 @@ -require_relative "../../../spec/es_spec_helper" -require 'stud/temporary' - -describe "SSL option" do - ["node", "transport"].each do |protocol| - context "with protocol => #{protocol}" do - subject do - 
require "logstash/outputs/elasticsearch" - settings = { - "protocol" => protocol, - "node_name" => "logstash", - "cluster" => "elasticsearch", - "host" => "node01", - "ssl" => true - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - - it "should fail in register" do - expect {subject.register}.to raise_error - end - end - end - - context "when using http protocol" do - protocol = "http" - context "when using ssl without cert verification" do - subject do - require "logstash/outputs/elasticsearch" - settings = { - "protocol" => protocol, - "host" => "node01", - "ssl" => true, - "ssl_certificate_verification" => false - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - - it "should pass the flag to the ES client" do - expect(::Elasticsearch::Client).to receive(:new) do |args| - expect(args[:ssl]).to eq(:verify => false) - end - subject.register - end - - it "print a warning" do - expect(subject.logger).to receive(:warn) - subject.register - end - end - - context "when using ssl with client certificates" do - - let(:keystore_path) { Stud::Temporary.file.path } - - after :each do - File.delete(keystore_path) - end - - subject do - require "logstash/outputs/elasticsearch" - settings = { - "protocol" => protocol, - "host" => "node01", - "ssl" => true, - "keystore" => keystore_path, - "keystore_password" => "test" - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - - - it "should pass the keystore parameters to the ES client" do - expect(::Elasticsearch::Client).to receive(:new) do |args| - expect(args[:ssl]).to include(:keystore => keystore_path, :keystore_password => "test") - end - subject.register - end - - end - end -end