diff --git a/CHANGELOG.md b/CHANGELOG.md index 257b139..23c39e3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,24 +1,2 @@ -## 1.0.4 - - Update to Elasticsearch 1.7 - -## 1.0.3 - - Add HTTP proxy support - -## 1.0.2 - - Upgrade Manticore HTTP Client - -## 1.0.1 - - Allow client certificates - -## 0.2.9 - - Add 'path' parameter for ES HTTP hosts behind a proxy on a subpath - -## 0.2.8 (June 12, 2015) - - Add option to enable and disable SSL certificate verification during handshake (#160) - - Doc improvements for clarifying round robin behavior using hosts config - -## 0.2.7 (May 28, 2015) - - Bump es-ruby version to 1.0.10 - -## 0.2.6 (May 28, 2015) - - Disable timeouts when using http protocol which would cause bulk requests to fail (#103) +## 2.0.0 + - Initial release. Supports only the node and transport protocols against Elasticsearch 2.x \ No newline at end of file diff --git a/README.md b/README.md index 37bd4a8..d1e030a 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,15 @@ # Logstash Plugin -This is a plugin for [Logstash](https://github.com/elasticsearch/logstash). +This is a plugin for [Logstash](https://github.com/elastic/logstash). It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way. ## Documentation -Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elasticsearch.org/guide/en/logstash/current/). +Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation, so any comments in the source code will first be converted into asciidoc and then into html. All plugin documentation is placed under one [central location](http://www.elastic.co/guide/en/logstash/current/). - For formatting code or config example, you can use the asciidoc `[source,ruby]` directive -- For more asciidoc formatting tips, see the excellent reference here https://github.com/elasticsearch/docs#asciidoc-guide +- For more asciidoc formatting tips, see the excellent reference here https://github.com/elastic/docs#asciidoc-guide ## Need Help? @@ -95,4 +95,4 @@ Programming is not a required skill. Whatever you've seen about open source and It is more important to the community that you are able to contribute. -For more information about contributing, see the [CONTRIBUTING](https://github.com/elasticsearch/logstash/blob/master/CONTRIBUTING.md) file. +For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file.
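A minimal pipeline sketch for the renamed output, using only options introduced later in this diff (`hosts`, `port`, `protocol`, `network_host`, `index`); the addresses are illustrative, and `network_host` is marked required by the new config validation:

    output {
      elasticsearch_java {
        network_host => "10.0.0.5"         # illustrative address; must be reachable by all ES cluster nodes
        hosts => ["127.0.0.1"]
        port => "9300-9305"
        protocol => "transport"
        index => "logstash-%{+YYYY.MM.dd}" # illustrative; dynamic %{field} references also work
      }
    }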
diff --git a/lib/logstash-output-elasticsearch_jars.rb b/lib/logstash-output-elasticsearch_java_jars.rb similarity index 100% rename from lib/logstash-output-elasticsearch_jars.rb rename to lib/logstash-output-elasticsearch_java_jars.rb diff --git a/lib/logstash/outputs/elasticsearch/protocol.rb b/lib/logstash/outputs/elasticsearch/protocol.rb deleted file mode 100644 index 4b2d6ba..0000000 --- a/lib/logstash/outputs/elasticsearch/protocol.rb +++ /dev/null @@ -1,307 +0,0 @@ -require "logstash/outputs/elasticsearch" -require "cabin" -require "base64" - -module LogStash::Outputs::Elasticsearch - module Protocols - class Base - private - def initialize(options={}) - # host(s), port, cluster - @logger = Cabin::Channel.get - end - - def client - return @client if @client - @client = build_client(@options) - return @client - end # def client - - - def template_install(name, template, force=false) - if template_exists?(name) && !force - @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name) - return - end - template_put(name, template) - end - - # Do a bulk request with the given actions. - # - # 'actions' is expected to be an array of bulk requests as string json - # values. - # - # Each 'action' becomes a single line in the bulk api call. For more - # details on the format of each. - def bulk(actions) - raise NotImplemented, "You must implement this yourself" - # bulk([ - # '{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }', - # '{ "field1" : "value1" }' - #]) - end - - public(:initialize, :template_install) - end - - class HTTPClient < Base - private - - DEFAULT_OPTIONS = { - :port => 9200 - } - - def initialize(options={}) - super - require "elasticsearch" # gem 'elasticsearch-ruby' - # manticore http transport - require "elasticsearch/transport/transport/http/manticore" - @options = DEFAULT_OPTIONS.merge(options) - @client = client - end - - def build_client(options) - uri = "#{options[:protocol]}://#{options[:host]}:#{options[:port]}#{options[:client_settings][:path]}" - - client_options = { - :host => [uri], - :ssl => options[:client_settings][:ssl], - :transport_options => { # manticore settings so we - :socket_timeout => 0, # do not timeout socket reads - :request_timeout => 0, # and requests - :proxy => options[:client_settings][:proxy] - }, - :transport_class => ::Elasticsearch::Transport::Transport::HTTP::Manticore - } - - if options[:user] && options[:password] then - token = Base64.strict_encode64(options[:user] + ":" + options[:password]) - client_options[:headers] = { "Authorization" => "Basic #{token}" } - end - - Elasticsearch::Client.new client_options - end - - def self.normalize_bulk_response(bulk_response) - if bulk_response["errors"] - # The structure of the response from the REST Bulk API is follows: - # {"took"=>74, "errors"=>true, "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17", - # "_type"=>"logs", - # "_id"=>"AUxTS2C55Jrgi-hC6rQF", - # "_version"=>1, - # "status"=>400, - # "error"=>"MapperParsingException[failed to parse]..."}}]} - # where each `item` is a hash of {OPTYPE => Hash[]}. calling first, will retrieve - # this hash as a single array with two elements, where the value is the second element (i.first[1]) - # then the status of that item is retrieved. 
- {"errors" => true, "statuses" => bulk_response["items"].map { |i| i.first[1]['status'] }} - else - {"errors" => false} - end - end - - def bulk(actions) - bulk_response = @client.bulk(:body => actions.collect do |action, args, source| - if source - next [ { action => args }, source ] - else - next { action => args } - end - end.flatten) - - self.class.normalize_bulk_response(bulk_response) - end # def bulk - - def template_exists?(name) - @client.indices.get_template(:name => name) - return true - rescue Elasticsearch::Transport::Transport::Errors::NotFound - return false - end # def template_exists? - - def template_put(name, template) - @client.indices.put_template(:name => name, :body => template) - end # template_put - - public(:bulk) - end # class HTTPClient - - class NodeClient < Base - private - - DEFAULT_OPTIONS = { - :port => 9300, - } - - def initialize(options={}) - super - require "java" - @options = DEFAULT_OPTIONS.merge(options) - setup(@options) - @client = client - end # def initialize - - def settings - return @settings - end - - def setup(options={}) - @settings = org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder - if options[:host] - @settings.put("discovery.zen.ping.multicast.enabled", false) - @settings.put("discovery.zen.ping.unicast.hosts", NodeClient.hosts(options)) - end - - @settings.put("node.client", true) - @settings.put("http.enabled", false) - - if options[:client_settings] - options[:client_settings].each do |key, value| - @settings.put(key, value) - end - end - - return @settings - end - - def self.hosts(options) - # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ - result = Array.new - if options[:host].class == Array - options[:host].each do |host| - if host.to_s =~ /^.+:.+$/ - # For host in format: host:port, ignore options[:port] - result << host - else - if options[:port].to_s =~ /^\d+-\d+$/ - # port ranges are 'host[port1-port2]' - result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" } - else - result << "#{host}:#{options[:port]}" - end - end - end - else - if options[:host].to_s =~ /^.+:.+$/ - # For host in format: host:port, ignore options[:port] - result << options[:host] - else - if options[:port].to_s =~ /^\d+-\d+$/ - # port ranges are 'host[port1-port2]' according to - # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ - # However, it seems to only query the first port. - # So generate our own list of unicast hosts to scan. 
- range = Range.new(*options[:port].split("-")) - result << range.collect { |p| "#{options[:host]}:#{p}" } - else - result << "#{options[:host]}:#{options[:port]}" - end - end - end - result.flatten.join(",") - end # def self.hosts - - def build_client(options) - nodebuilder = org.elasticsearch.node.NodeBuilder.nodeBuilder - return nodebuilder.settings(@settings).node.client - end # def build_client - - def self.normalize_bulk_response(bulk_response) - # TODO(talevy): parse item response objects to retrieve correct 200 (OK) or 201(created) status codes - if bulk_response.has_failures() - {"errors" => true, - "statuses" => bulk_response.map { |i| (i.is_failed && i.get_failure.get_status.get_status) || 200 }} - else - {"errors" => false} - end - end - - def bulk(actions) - # Actions an array of [ action, action_metadata, source ] - prep = @client.prepareBulk - actions.each do |action, args, source| - prep.add(build_request(action, args, source)) - end - response = prep.execute.actionGet() - - self.class.normalize_bulk_response(response) - end # def bulk - - def build_request(action, args, source) - case action - when "index" - request = org.elasticsearch.action.index.IndexRequest.new(args[:_index]) - request.id(args[:_id]) if args[:_id] - request.routing(args[:_routing]) if args[:_routing] - request.source(source) - when "delete" - request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index]) - request.id(args[:_id]) - request.routing(args[:_routing]) if args[:_routing] - when "create" - request = org.elasticsearch.action.index.IndexRequest.new(args[:_index]) - request.id(args[:_id]) if args[:_id] - request.routing(args[:_routing]) if args[:_routing] - request.source(source) - request.opType("create") - when "create_unless_exists" - unless args[:_id].nil? - request = org.elasticsearch.action.index.IndexRequest.new(args[:_index]) - request.id(args[:_id]) - request.routing(args[:_routing]) if args[:_routing] - request.source(source) - request.opType("create") - else - raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.") - end - else - raise(LogStash::ConfigurationError, "action => '#{action_name}' is not currently supported.") - #when "update" - end # case action - - request.type(args[:_type]) if args[:_type] - return request - end # def build_request - - def template_exists?(name) - request = org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequestBuilder.new(@client.admin.indices, name) - response = request.get - return !response.getIndexTemplates.isEmpty - end # def template_exists? - - def template_put(name, template) - request = org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequestBuilder.new(@client.admin.indices, name) - request.setSource(LogStash::Json.dump(template)) - - # execute the request and get the response, if it fails, we'll get an exception. 
- request.get - end # template_put - - public(:initialize, :bulk) - end # class NodeClient - - class TransportClient < NodeClient - private - def build_client(options) - client = org.elasticsearch.client.transport.TransportClient.new(settings.build) - - if options[:host] - client.addTransportAddress( - org.elasticsearch.common.transport.InetSocketTransportAddress.new( - options[:host], options[:port].to_i - ) - ) - end - - return client - end # def build_client - end # class TransportClient - end # module Protocols - - module Requests - class GetIndexTemplates; end - class Bulk; end - class Index; end - class Delete; end - end -end diff --git a/lib/logstash/outputs/elasticsearch.rb b/lib/logstash/outputs/elasticsearch_java.rb similarity index 63% rename from lib/logstash/outputs/elasticsearch.rb rename to lib/logstash/outputs/elasticsearch_java.rb index a6ff16f..d70b6dd 100644 --- a/lib/logstash/outputs/elasticsearch.rb +++ b/lib/logstash/outputs/elasticsearch_java.rb @@ -8,11 +8,14 @@ require "socket" # for Socket.gethostname require "thread" # for safe queueing require "uri" # for escaping user input -require 'logstash-output-elasticsearch_jars.rb' - -# This output lets you store logs in Elasticsearch and is the most recommended -# output for Logstash. If you plan on using the Kibana web interface, you'll -# need to use this output. +require "logstash/outputs/elasticsearch_java/protocol" + +# This output lets you store logs in Elasticsearch using the native 'node' and 'transport' +# protocols. It is highly recommended that you use the regular 'logstash-output-elasticsearch' output, +# which uses HTTP instead. This output is, in fact, sometimes slower than the HTTP output, and never faster. +# Additionally, upgrading your Elasticsearch cluster may require you to simultaneously update this +# plugin for any protocol-level changes. The HTTP client may be easier to work with due to wider +# familiarity with HTTP. # # *VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch 1.0.0 or later. # @@ -61,14 +64,14 @@ # - Events from the retry queue are submitted again either when the queue reaches its max size or when # the max interval time is reached, which is set in :retry_max_interval. # - Events which are not retryable or have reached their max retry count are logged to stderr. -class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base +class LogStash::Outputs::ElasticSearchJava < LogStash::Outputs::Base attr_reader :client include Stud::Buffer - RETRYABLE_CODES = [429, 503] + RETRYABLE_CODES = [409, 429, 503] SUCCESS_CODES = [200, 201] - config_name "elasticsearch" + config_name "elasticsearch_java" # The index to write events to. This can be dynamic using the `%{foo}` syntax. # The default value will partition your indices by day so you can more easily @@ -128,6 +131,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # The name of your cluster if you set it on the Elasticsearch side. Useful # for discovery when using `node` or `transport` protocols. # By default, it looks for a cluster named 'elasticsearch'.
+ # Equivalent to the Elasticsearch option 'cluster.name' config :cluster, :validate => :string # For the `node` protocol, if you do not specify `host`, it will attempt to use @@ -162,37 +166,23 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # `["127.0.0.1:9200","127.0.0.2:9200"]` # It is important to exclude http://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html[dedicated master nodes] from the `host` list # to prevent LS from sending bulk requests to the master nodes. So this parameter should only reference either data or client nodes. - - config :host, :validate => :array + config :hosts, :validate => :array, :default => ["127.0.0.1"] # The port for Elasticsearch transport to use. # # If you do not set this, the following defaults are used: - # * `protocol => http` - port 9200 # * `protocol => transport` - port 9300-9305 # * `protocol => node` - port 9300-9305 - config :port, :validate => :string + config :port, :validate => :string, :default => "9300-9305" - # The name/address of the host to bind to for Elasticsearch clustering - config :bind_host, :validate => :string + # The name/address of the host to bind to for Elasticsearch clustering. Equivalent to the + # Elasticsearch 'network.host' option. + # This MUST be set for either protocol to work (node or transport)! The internal Elasticsearch node + # will bind to this IP. This IP MUST be reachable by all nodes in the Elasticsearch cluster. + config :network_host, :validate => :string, :required => true - # This is only valid for the 'node' protocol. - # - # The port for the node to listen on. - config :bind_port, :validate => :number - - # Run the Elasticsearch server embedded in this process. - # This option is useful if you want to run a single Logstash process that - # handles log processing and indexing; it saves you from needing to run - # a separate Elasticsearch process. An example use case is - # proof-of-concept testing. - # WARNING: This is not recommended for production use! - config :embedded, :validate => :boolean, :default => false - - # If you are running the embedded Elasticsearch server, you can set the http - # port it listens on here; it is not common to need this setting changed from - # default. - config :embedded_http_port, :validate => :string, :default => "9200-9300" + # This sets the local port to bind to. Equivalent to the Elasticsearch option 'transport.tcp.port' + config :transport_tcp_port, :validate => :number # This setting no longer does anything. It exists to keep config validation # from failing. It will be removed in future versions. @@ -208,7 +198,7 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # events before flushing that out to Elasticsearch. This setting # controls how many events will be buffered before sending a batch # of events. - config :flush_size, :validate => :number, :default => 5000 + config :flush_size, :validate => :number, :default => 500 # The amount of time since last flush before a flush is forced. # @@ -238,14 +228,8 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # in situations where you cannot permit connections outbound from the # Elasticsearch cluster to this Logstash server. # - # The 'http' protocol will use the Elasticsearch REST/HTTP interface to talk - # to elasticsearch. - # # All protocols will use bulk requests when talking to Elasticsearch. - # - # The default `protocol` setting under java/jruby is "node".
The default - `protocol` on non-java rubies is "http" - config :protocol, :validate => [ "node", "transport", "http" ] + config :protocol, :validate => [ "node", "transport" ], :default => "transport" # The Elasticsearch action to perform. Valid actions are: `index`, `delete`. # @@ -257,25 +241,13 @@ # - index: indexes a document (an event from Logstash). # - delete: deletes a document by id # - create: indexes a document, fails if a document by that id already exists in the index. + # - update: updates a document by id # following action is not supported by HTTP protocol # - create_unless_exists: creates a document, fails if no id is provided # # For more details on actions, check out the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation] config :action, :validate => :string, :default => "index" - # Username and password (only valid when protocol is HTTP; this setting works with HTTP or HTTPS auth) - config :user, :validate => :string - config :password, :validate => :password - - # HTTP Path at which the Elasticsearch server lives. Use this if you must run ES behind a proxy that remaps - # the root path for the Elasticsearch HTTP API lives. This option is ignored for non-HTTP transports. - config :path, :validate => :string, :default => "/" - - # SSL Configurations (only valid when protocol is HTTP) - # - # Enable SSL - config :ssl, :validate => :boolean, :default => false - # Validate the server's certificate # Disabling this severely compromises security # For more information read https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf @@ -298,8 +270,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base # Set the truststore password config :keystore_password, :validate => :password - # Enable cluster sniffing (transport only) + # Enable cluster sniffing (transport only). # Asks host for the list of all cluster nodes and adds them to the hosts list + # Equivalent to the Elasticsearch option 'client.transport.sniff' config :sniffing, :validate => :boolean, :default => false # Set max retry for each event @@ -311,11 +284,13 @@ # Set max interval between bulk retries config :retry_max_interval, :validate => :number, :default => 5 - # Set the address of a forward HTTP proxy. Must be used with the 'http' protocol - # Can be either a string, such as 'http://localhost:123' or a hash in the form - # {host: 'proxy.org' port: 80 scheme: 'http'} - # Note, this is NOT a SOCKS proxy, but a plain HTTP proxy - config :proxy + # Enable doc_as_upsert for update mode. + # Creates a new document with the event source if document_id doesn't exist + config :doc_as_upsert, :validate => :boolean, :default => false + + # Set upsert content for update mode. + # Creates a new document from this parameter (a JSON string) if document_id doesn't exist + config :upsert, :validate => :string, :default => "" public def register @@ -328,122 +303,70 @@ def register @retry_queue_not_full = ConditionVariable.new @retry_queue = Queue.new - client_settings = {} - - - if @protocol.nil? - @protocol = LogStash::Environment.jruby? ? "node" : "http" - end - - if @protocol == "http" - if @action == "create_unless_exists" - raise(LogStash::ConfigurationError, "action => 'create_unless_exists' is not supported under the HTTP protocol"); - end - client_settings[:path] = "/#{@path}/".gsub(/\/+/, "/") # Normalize slashes - @logger.debug?
&& @logger.debug("Normalizing http path", :path => @path, :normalized => client_settings[:path]) + if @protocol =='node' && !@network_host + raise LogStash::ConfigurationError, "network_host MUST be set if the 'node' protocol is in use! If this is set incorrectly Logstash will hang attempting to connect!" end - if ["node", "transport"].include?(@protocol) - # Node or TransportClient; requires JRuby - raise(LogStash::PluginLoadingError, "This configuration requires JRuby. If you are not using JRuby, you must set 'protocol' to 'http'. For example: output { elasticsearch { protocol => \"http\" } }") unless LogStash::Environment.jruby? - - client_settings["cluster.name"] = @cluster if @cluster - client_settings["network.host"] = @bind_host if @bind_host - client_settings["transport.tcp.port"] = @bind_port if @bind_port - client_settings["client.transport.sniff"] = @sniffing - - if @node_name - client_settings["node.name"] = @node_name - else - client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}" - end + client_settings = {} + client_settings["cluster.name"] = @cluster if @cluster + client_settings["network.host"] = @network_host if @network_host + client_settings["transport.tcp.port"] = @transport_tcp_port if @transport_tcp_port + client_settings["client.transport.sniff"] = @sniffing - @@plugins.each do |plugin| - name = plugin.name.split('-')[-1] - client_settings.merge!(LogStash::Outputs::ElasticSearch.const_get(name.capitalize).create_client_config(self)) - end + if @node_name + client_settings["node.name"] = @node_name + else + client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}" end - require "logstash/outputs/elasticsearch/protocol" - - if @port.nil? - @port = case @protocol - when "http"; "9200" - when "transport", "node"; "9300-9305" - end + @@plugins.each do |plugin| + name = plugin.name.split('-')[-1] + client_settings.merge!(LogStash::Outputs::ElasticSearchJava.const_get(name.capitalize).create_client_config(self)) end - if @host.nil? && @protocol != "node" # node can use zen discovery - @logger.info("No 'host' set in elasticsearch output. Defaulting to localhost") - @host = ["localhost"] + if (@hosts.nil? || @hosts.empty?) && @protocol != "node" # node can use zen discovery + @logger.info("No 'hosts' set in elasticsearch output. Defaulting to localhost") + @hosts = ["localhost"] end - client_settings.merge! setup_ssl() - client_settings.merge! setup_proxy() - common_options = { :protocol => @protocol, - :client_settings => client_settings + :client_settings => client_settings, + :hosts => @hosts, + :port => @port } - common_options.merge! setup_basic_auth() + # Update API setup + update_options = { + :upsert => @upsert, + :doc_as_upsert => @doc_as_upsert + } + common_options.merge! update_options if @action == 'update' client_class = case @protocol when "transport" - LogStash::Outputs::Elasticsearch::Protocols::TransportClient + LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::TransportClient when "node" - LogStash::Outputs::Elasticsearch::Protocols::NodeClient - when /http/ - LogStash::Outputs::Elasticsearch::Protocols::HTTPClient + LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient end - if @embedded - raise(LogStash::ConfigurationError, "The 'embedded => true' setting is only valid for the elasticsearch output under JRuby. You are running #{RUBY_DESCRIPTION}") unless LogStash::Environment.jruby? - @logger.warn("The 'embedded => true' setting is enabled. 
This is not recommended for production use!!!") -# LogStash::Environment.load_elasticsearch_jars! - - # Default @host with embedded to localhost. This should help avoid - # newbies tripping on ubuntu and other distros that have a default - # firewall that blocks multicast. - @host ||= ["localhost"] - - # Start Elasticsearch local. - start_local_elasticsearch - end - - @client = Array.new - - if protocol == "node" || @host.nil? # if @protocol is "node" or @host is not set - options = { :host => @host, :port => @port }.merge(common_options) - @client = [client_class.new(options)] - else # if @protocol in ["transport","http"] - @client = @host.map do |host| - (_host,_port) = host.split ":" - options = { :host => _host, :port => _port || @port }.merge(common_options) - @logger.info "Create client to elasticsearch server on #{_host}:#{_port}" - client_class.new(options) - end # @host.map - end + @client = client_class.new(common_options) if @manage_template - for client in @client - begin - @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s) - client.template_install(@template_name, get_template, @template_overwrite) - break - rescue => e - @logger.error("Failed to install template: #{e.message}") - end - end # for @client loop - end # if @manage_templates + begin + @logger.info("Automatic template management enabled", :manage_template => @manage_template.to_s) + client.template_install(@template_name, get_template, @template_overwrite) + rescue => e + @logger.error("Failed to install template", + :message => e.message, + :error_class => e.class.name, + ) + end + end @logger.info("New Elasticsearch output", :cluster => @cluster, - :host => @host, :port => @port, :embedded => @embedded, - :protocol => @protocol) - - @client_idx = 0 - @current_client = @client[@client_idx] + :hosts => @host, :port => @port, :protocol => @protocol) buffer_initialize( :max_items => @flush_size, @@ -470,7 +393,7 @@ def register public def get_template if @template.nil? - @template = ::File.expand_path('elasticsearch/elasticsearch-template.json', ::File.dirname(__FILE__)) + @template = ::File.expand_path('elasticsearch_java/elasticsearch-template.json', ::File.dirname(__FILE__)) if !File.exists?(@template) raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')" end @@ -503,22 +426,26 @@ def receive(event) event["type"] || "logs" end - index = event.sprintf(@index) + params = { + :_id => @document_id ? event.sprintf(@document_id) : nil, + :_index => event.sprintf(@index), + :_type => type, + :_routing => @routing ? event.sprintf(@routing) : nil + } + + params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != "" - document_id = @document_id ? event.sprintf(@document_id) : nil - routing = @routing ? event.sprintf(@routing) : nil - buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type, :_routing => routing }, event]) + buffer_receive([event.sprintf(@action), params, event]) end # def receive public - # synchronize the @current_client.bulk call to avoid concurrency/thread safety issues with the - # # client libraries which might not be thread safe. the submit method can be called from both the - # # Stud::Buffer flush thread and from our own retry thread. + # The submit method can be called from both the + # Stud::Buffer flush thread and from our own retry thread. 
def submit(actions) es_actions = actions.map { |a, doc, event| [a, doc, event.to_hash] } @submit_mutex.lock begin - bulk_response = @current_client.bulk(es_actions) + bulk_response = @client.bulk(es_actions) ensure @submit_mutex.unlock end @@ -546,11 +473,6 @@ def flush(actions, teardown = false) rescue => e @logger.error "Got error to send bulk of actions: #{e.message}" raise e - ensure - unless @protocol == "node" - @logger.debug? and @logger.debug "Shifting current elasticsearch client" - shift_client - end end end # def flush @@ -580,117 +502,6 @@ def teardown retry_flush end - protected - def start_local_elasticsearch - @logger.info("Starting embedded Elasticsearch local node.") - builder = org.elasticsearch.node.NodeBuilder.nodeBuilder - # Disable 'local only' - LOGSTASH-277 - #builder.local(true) - builder.settings.put("cluster.name", @cluster) if @cluster - builder.settings.put("node.name", @node_name) if @node_name - builder.settings.put("network.host", @bind_host) if @bind_host - builder.settings.put("http.port", @embedded_http_port) - - @embedded_elasticsearch = builder.node - @embedded_elasticsearch.start - end # def start_local_elasticsearch - - protected - def shift_client - @client_idx = (@client_idx+1) % @client.length - @current_client = @client[@client_idx] - @logger.debug? and @logger.debug("Switched current elasticsearch client to ##{@client_idx} at #{@host[@client_idx]}") - end - - private - def setup_proxy - return {} unless @proxy - - if @protocol != "http" - raise(LogStash::ConfigurationError, "Proxy is not supported for '#{@protocol}'. Change the protocol to 'http' if you need HTTP proxy.") - end - - # Symbolize keys - proxy = if @proxy.is_a?(Hash) - Hash[@proxy.map {|k,v| [k.to_sym, v]}] - elsif @proxy.is_a?(String) - @proxy - else - raise LogStash::ConfigurationError, "Expected 'proxy' to be a string or hash, not '#{@proxy}''!" - end - - return {:proxy => proxy} - end - - private - def setup_ssl - return {} unless @ssl - if @protocol != "http" - raise(LogStash::ConfigurationError, "SSL is not supported for '#{@protocol}'. Change the protocol to 'http' if you need SSL.") - end - @protocol = "https" - if @cacert && @truststore - raise(LogStash::ConfigurationError, "Use either \"cacert\" or \"truststore\" when configuring the CA certificate") if @truststore - end - ssl_options = {} - if @cacert then - @truststore, ssl_options[:truststore_password] = generate_jks @cacert - elsif @truststore - ssl_options[:truststore_password] = @truststore_password.value if @truststore_password - end - ssl_options[:truststore] = @truststore if @truststore - if @keystore - ssl_options[:keystore] = @keystore - ssl_options[:keystore_password] = @keystore_password.value if @keystore_password - end - if @ssl_certificate_verification == false - @logger.warn [ - "** WARNING ** Detected UNSAFE options in elasticsearch output configuration!", - "** WARNING ** You have enabled encryption but DISABLED certificate verification.", - "** WARNING ** To make sure your data is secure change :ssl_certificate_verification to true" - ].join("\n") - ssl_options[:verify] = false - end - { ssl: ssl_options } - end - - private - def setup_basic_auth - return {} unless @user && @password - - if @protocol =~ /http/ - { - :user => ::URI.escape(@user, "@:"), - :password => ::URI.escape(@password.value, "@:") - } - else - raise(LogStash::ConfigurationError, "User and password parameters are not supported for '#{@protocol}'. 
Change the protocol to 'http' if you need them.") - end - end - - private - def generate_jks cert_path - - require 'securerandom' - require 'tempfile' - require 'java' - import java.io.FileInputStream - import java.io.FileOutputStream - import java.security.KeyStore - import java.security.cert.CertificateFactory - - jks = java.io.File.createTempFile("cert", ".jks") - - ks = KeyStore.getInstance "JKS" - ks.load nil, nil - cf = CertificateFactory.getInstance "X.509" - cert = cf.generateCertificate FileInputStream.new(cert_path) - ks.setCertificateEntry "cacert", cert - pwd = SecureRandom.urlsafe_base64(9) - ks.store FileOutputStream.new(jks), pwd.to_java.toCharArray - [jks.path, pwd] - end - private # in charge of submitting any actions in @retry_queue that need to be # retried @@ -728,11 +539,11 @@ def retry_push(actions) } end - @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ } + @@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch_java-/ } @@plugins.each do |plugin| name = plugin.name.split('-')[-1] - require "logstash/outputs/elasticsearch/#{name}" + require "logstash/outputs/elasticsearch_java/#{name}" end -end # class LogStash::Outputs::Elasticsearch +end # class LogStash::Outputs::ElasticSearchJava diff --git a/lib/logstash/outputs/elasticsearch/elasticsearch-template.json b/lib/logstash/outputs/elasticsearch_java/elasticsearch-template.json similarity index 100% rename from lib/logstash/outputs/elasticsearch/elasticsearch-template.json rename to lib/logstash/outputs/elasticsearch_java/elasticsearch-template.json diff --git a/lib/logstash/outputs/elasticsearch_java/protocol.rb b/lib/logstash/outputs/elasticsearch_java/protocol.rb new file mode 100644 index 0000000..f4cc379 --- /dev/null +++ b/lib/logstash/outputs/elasticsearch_java/protocol.rb @@ -0,0 +1,264 @@ +require "cabin" +require "base64" +require 'logstash-output-elasticsearch_java_jars.rb' +require 'logstash/outputs/elasticsearch_java' + +module LogStash + module Outputs + module ElasticSearchJavaPlugins + module Protocols + class Base + private + def initialize(options={}) + # host(s), port, cluster + @logger = Cabin::Channel.get + end + + def client + return @client if @client + @client = build_client(@options) + return @client + end + + def template_install(name, template, force=false) + if template_exists?(name) && !force + @logger.debug("Found existing Elasticsearch template. Skipping template management", :name => name) + return + end + template_put(name, template) + end + + # Do a bulk request with the given actions. + # + # 'actions' is expected to be an array of bulk requests as string json + # values. + # + # Each 'action' becomes a single line in the bulk api call. For more + # details on the format of each. 
+ def bulk(actions) + raise NotImplementedError, "You must implement this yourself" + # bulk([ + # '{ "index" : { "_index" : "test", "_type" : "type1", "_id" : "1" } }', + # '{ "field1" : "value1" }' + #]) + end + + public(:initialize, :template_install) + end + + class NodeClient < Base + CLIENT_MUTEX = Mutex.new + + def self.get_client(settings) + CLIENT_MUTEX.synchronize { + if @client + @client + else + nodebuilder = org.elasticsearch.node.NodeBuilder.nodeBuilder + @client = nodebuilder.settings(settings.build).node().client() + end + } + end + + def self.clear_client() + CLIENT_MUTEX.synchronize { + @client = nil + } + end + + private + + DEFAULT_OPTIONS = { + :port => 9300, + } + + def initialize(options={}) + super + require "java" + @options = DEFAULT_OPTIONS.merge(options) + setup(@options) + end # def initialize + + def settings + return @settings + end + + def client + self.class.get_client(settings) + end + + def setup(options={}) + @settings = org.elasticsearch.common.settings.Settings.settingsBuilder() + if options[:hosts] + @settings.put("discovery.zen.ping.multicast.enabled", false) + @settings.put("discovery.zen.ping.unicast.hosts", NodeClient.hosts(options)) + end + + @settings.put("node.client", true) + @settings.put("http.enabled", false) + @settings.put("path.home", Dir.pwd) + + if options[:client_settings] + options[:client_settings].each do |key, value| + @settings.put(key, value) + end + end + + return @settings + end + + def self.hosts(options) + # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ + result = Array.new + if options[:hosts].class == Array + options[:hosts].each do |host| + if host.to_s =~ /^.+:.+$/ + # For host in format: host:port, ignore options[:port] + result << host + else + if options[:port].to_s =~ /^\d+-\d+$/ + # port ranges are 'host[port1-port2]' + result << Range.new(*options[:port].split("-")).collect { |p| "#{host}:#{p}" } + else + result << "#{host}:#{options[:port]}" + end + end + end + else + if options[:hosts].to_s =~ /^.+:.+$/ + # For host in format: host:port, ignore options[:port] + result << options[:hosts] + else + if options[:port].to_s =~ /^\d+-\d+$/ + # port ranges are 'host[port1-port2]' according to + # http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ + # However, it seems to only query the first port. + # So generate our own list of unicast hosts to scan.
+ range = Range.new(*options[:port].split("-")) + result << range.collect { |p| "#{options[:hosts]}:#{p}" } + else + result << "#{options[:hosts]}:#{options[:port]}" + end + end + result.flatten.join(",") + end + + def self.normalize_bulk_response(bulk_response) + # TODO(talevy): parse item response objects to retrieve correct 200 (OK) or 201(created) status codes + if bulk_response.has_failures() + {"errors" => true, + "statuses" => bulk_response.map { |i| (i.is_failed && i.get_failure.get_status.get_status) || 200 }} + else + {"errors" => false} + end + end + + def bulk(actions) + # Actions is an array of [ action, action_metadata, source ] + prep = client.prepareBulk + actions.each do |action, args, source| + prep.add(build_request(action, args, source)) + end + response = prep.execute.actionGet() + + self.class.normalize_bulk_response(response) + end # def bulk + + def build_request(action, args, source) + case action + when "index" + request = org.elasticsearch.action.index.IndexRequest.new(args[:_index]) + request.id(args[:_id]) if args[:_id] + request.routing(args[:_routing]) if args[:_routing] + request.source(source) + when "delete" + request = org.elasticsearch.action.delete.DeleteRequest.new(args[:_index]) + request.id(args[:_id]) + request.routing(args[:_routing]) if args[:_routing] + when "create" + request = org.elasticsearch.action.index.IndexRequest.new(args[:_index]) + request.id(args[:_id]) if args[:_id] + request.routing(args[:_routing]) if args[:_routing] + request.source(source) + request.opType("create") + when "create_unless_exists" + unless args[:_id].nil? + request = org.elasticsearch.action.index.IndexRequest.new(args[:_index]) + request.id(args[:_id]) + request.routing(args[:_routing]) if args[:_routing] + request.source(source) + request.opType("create") + else + raise(LogStash::ConfigurationError, "Specifying action => 'create_unless_exists' without a document '_id' is not supported.") + end + when "update" + unless args[:_id].nil? + request = org.elasticsearch.action.update.UpdateRequest.new(args[:_index], args[:_type], args[:_id]) + request.routing(args[:_routing]) if args[:_routing] + request.doc(source) + if @options[:doc_as_upsert] + request.docAsUpsert(true) + else + request.upsert(args[:_upsert]) if args[:_upsert] + end + else + raise(LogStash::ConfigurationError, "Specifying action => 'update' without a document '_id' is not supported.") + end + else + raise(LogStash::ConfigurationError, "action => '#{action}' is not currently supported.") + end # case action + + request.type(args[:_type]) if args[:_type] + return request + end # def build_request + + def template_exists?(name) + return !client.admin.indices. + prepareGetTemplates(name). + execute(). + actionGet(). + getIndexTemplates(). + isEmpty + end # def template_exists? + + def template_put(name, template) + response = client.admin.indices. + preparePutTemplate(name). + setSource(LogStash::Json.dump(template)). + execute(). + actionGet() + + raise "Could not index template!" unless response.isAcknowledged + end # template_put + + public(:initialize, :bulk) + end # class NodeClient + + class TransportClient < NodeClient + private + def build_client(options) + client = org.elasticsearch.client.transport.TransportClient. + builder(). + settings(settings.build).
+ build() + + options[:hosts].each do |host| + # Split 'host' or 'host:port'; fall back to options[:port] when no port is given + matches = host.match /^([^:]+)(?::(\d+))?$/ + + inet_addr = java.net.InetAddress.getByName(matches[1]) + port = (matches[2] || options[:port]).to_i + client.addTransportAddress( + org.elasticsearch.common.transport.InetSocketTransportAddress.new( + inet_addr, port + ) + ) + end + + return client + end + end + end + end + end +end \ No newline at end of file diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch_java.gemspec similarity index 75% rename from logstash-output-elasticsearch.gemspec rename to logstash-output-elasticsearch_java.gemspec index cf03b5d..704a59b 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch_java.gemspec @@ -1,10 +1,9 @@ Gem::Specification.new do |s| - - s.name = 'logstash-output-elasticsearch' - s.version = '1.0.5' + s.name = 'logstash-output-elasticsearch_java' + s.version = '2.0.0.beta5' s.licenses = ['apache-2.0'] - s.summary = "Logstash Output to Elasticsearch" - s.description = "Output events to elasticsearch" + s.summary = "Logstash Output to Elasticsearch using Java node/transport client" + s.description = "Output events to Elasticsearch using the Java client" s.authors = ["Elastic"] s.email = 'info@elastic.co' s.homepage = "http://logstash.net/" @@ -28,13 +27,6 @@ Gem::Specification.new do |s| s.add_development_dependency 'ftw', '~> 0.0.42' s.add_development_dependency 'logstash-input-generator' - - - if RUBY_PLATFORM == 'java' - s.platform = RUBY_PLATFORM - s.add_runtime_dependency "manticore", '~> 0.4.2' - end - s.add_development_dependency 'logstash-devutils' s.add_development_dependency 'longshoreman' end diff --git a/spec/es_spec_helper.rb b/spec/es_spec_helper.rb index f524b27..1bbd083 100644 --- a/spec/es_spec_helper.rb +++ b/spec/es_spec_helper.rb @@ -4,62 +4,78 @@ require "logstash/json" require "stud/try" require "longshoreman" +require "logstash/outputs/elasticsearch_java" +require "logstash/outputs/elasticsearch_java/protocol" CONTAINER_NAME = "logstash-output-elasticsearch-#{rand(999).to_s}" CONTAINER_IMAGE = "elasticsearch" CONTAINER_TAG = "1.6" +DOCKER_INTEGRATION = ENV["DOCKER_INTEGRATION"] + module ESHelper + def get_local_host + "127.0.0.1" + end def get_host - Longshoreman.new.get_host_ip + DOCKER_INTEGRATION ? Longshoreman.new.get_host_ip : "127.0.0.1" end def get_port(protocol) + unless DOCKER_INTEGRATION + return protocol.to_sym == :http ? 9200 : 9300 + end + container = Longshoreman::Container.new container.get(CONTAINER_NAME) - case protocol - when "http" - container.rport(9200) - when "transport", "node" - container.rport(9300) - end + container.rport(9300) end def get_client - Elasticsearch::Client.new(:host => "#{get_host}:#{get_port('http')}") + Elasticsearch::Client.new(:hosts => "#{get_host}:#{get_port('http')}") end end + RSpec.configure do |config| config.include ESHelper - # this :all hook gets run before every describe block that is tagged with :integration => true. - config.before(:all, :integration => true) do - # check if container exists already before creating new one. - begin - ls = Longshoreman::new - ls.container.get(CONTAINER_NAME) - rescue Docker::Error::NotFoundError - Longshoreman.new("#{CONTAINER_IMAGE}:#{CONTAINER_TAG}", CONTAINER_NAME) - # TODO(talevy): verify ES is running instead of static timeout - sleep 10 + + if DOCKER_INTEGRATION + # this :all hook gets run before every describe block that is tagged with :integration => true.
+ config.before(:all, :integration => true) do + + + # check if container exists already before creating new one. + begin + ls = Longshoreman::new + ls.container.get(CONTAINER_NAME) + rescue Docker::Error::NotFoundError + Longshoreman.new("#{CONTAINER_IMAGE}:#{CONTAINER_TAG}", CONTAINER_NAME) + # TODO(talevy): verify ES is running instead of static timeout + sleep 10 + end + end + + # we want to do a final cleanup after all :integration runs, + # but we don't want to clean up before the last block. + # This is a final blind check to see if the ES docker container is running and + # needs to be cleaned up. If no container can be found and/or docker is not + # running on the system, we do nothing. + config.after(:suite) do + # only cleanup docker container if system has docker and the container is running + begin + ls = Longshoreman::new + ls.container.get(CONTAINER_NAME) + ls.cleanup + rescue Docker::Error::NotFoundError, Excon::Errors::SocketError + # do nothing + end end - end - # we want to do a final cleanup after all :integration runs, - # but we don't want to clean up before the last block. - # This is a final blind check to see if the ES docker container is running and - # needs to be cleaned up. If no container can be found and/or docker is not - # running on the system, we do nothing. - config.after(:suite) do - # only cleanup docker container if system has docker and the container is running - begin - ls = Longshoreman::new - ls.container.get(CONTAINER_NAME) - ls.cleanup - rescue Docker::Error::NotFoundError, Excon::Errors::SocketError - # do nothing + config.after(:each) do + LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient.clear_client() end end end diff --git a/spec/integration/outputs/elasticsearch/node_spec.rb b/spec/integration/outputs/elasticsearch/node_spec.rb index a822174..2efd081 100644 --- a/spec/integration/outputs/elasticsearch/node_spec.rb +++ b/spec/integration/outputs/elasticsearch/node_spec.rb @@ -1,36 +1,36 @@ require_relative "../../../../spec/es_spec_helper" -require "logstash/outputs/elasticsearch/protocol" +require "logstash/outputs/elasticsearch_java/protocol" describe "elasticsearch node client", :integration => true do # Test ElasticSearch Node Client # Reference: http://www.elasticsearch.org/guide/reference/modules/discovery/zen/ - subject { LogStash::Outputs::Elasticsearch::Protocols::NodeClient } + subject { LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient } it "should support hosts in both string and array" do # Because we defined *hosts* method in NodeClient as private, # we use *obj.send :method,[args...]* to call method *hosts* # Node client should support host in string - # Case 1: default :host in string - insist { subject.send :hosts, :host => "host",:port => 9300 } == "host:9300" + # Case 1: default :hosts in string + insist { subject.send :hosts, :hosts => "host",:port => 9300 } == "host:9300" # Case 2: :port =~ /^\d+_\d+$/ - insist { subject.send :hosts, :host => "host",:port => "9300-9302"} == "host:9300,host:9301,host:9302" - # Case 3: :host =~ /^.+:.+$/ - insist { subject.send :hosts, :host => "host:9303",:port => 9300 } == "host:9303" - # Case 4: :host =~ /^.+:.+$/ and :port =~ /^\d+_\d+$/ - insist { subject.send :hosts, :host => "host:9303",:port => "9300-9302"} == "host:9303" + insist { subject.send :hosts, :hosts => "host",:port => "9300-9302"} == "host:9300,host:9301,host:9302" + # Case 3: :hosts =~ /^.+:.+$/ + insist { subject.send :hosts, :hosts => "host:9303",:port => 9300 } == "host:9303" + #
Case 4: :hosts =~ /^.+:.+$/ and :port =~ /^\d+_\d+$/ + insist { subject.send :hosts, :hosts => "host:9303",:port => "9300-9302"} == "host:9303" # Node client should support host in array - # Case 5: :host in array with single item - insist { subject.send :hosts, :host => ["host"],:port => 9300 } == ("host:9300") - # Case 6: :host in array with more than one items - insist { subject.send :hosts, :host => ["host1","host2"],:port => 9300 } == "host1:9300,host2:9300" - # Case 7: :host in array with more than one items and :port =~ /^\d+_\d+$/ - insist { subject.send :hosts, :host => ["host1","host2"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9300,host2:9301,host2:9302" - # Case 8: :host in array with more than one items and some :host =~ /^.+:.+$/ - insist { subject.send :hosts, :host => ["host1","host2:9303"],:port => 9300 } == "host1:9300,host2:9303" - # Case 9: :host in array with more than one items, :port =~ /^\d+_\d+$/ and some :host =~ /^.+:.+$/ - insist { subject.send :hosts, :host => ["host1","host2:9303"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9303" + # Case 5: :hosts in array with single item + insist { subject.send :hosts, :hosts => ["host"],:port => 9300 } == ("host:9300") + # Case 6: :hosts in array with more than one item + insist { subject.send :hosts, :hosts => ["host1","host2"],:port => 9300 } == "host1:9300,host2:9300" + # Case 7: :hosts in array with more than one item and :port =~ /^\d+_\d+$/ + insist { subject.send :hosts, :hosts => ["host1","host2"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9300,host2:9301,host2:9302" + # Case 8: :hosts in array with more than one item and some :hosts =~ /^.+:.+$/ + insist { subject.send :hosts, :hosts => ["host1","host2:9303"],:port => 9300 } == "host1:9300,host2:9303" + # Case 9: :hosts in array with more than one item, :port =~ /^\d+_\d+$/ and some :hosts =~ /^.+:.+$/ + insist { subject.send :hosts, :hosts => ["host1","host2:9303"],:port => "9300-9302" } == "host1:9300,host1:9301,host1:9302,host2:9303" end end diff --git a/spec/integration/outputs/index_spec.rb b/spec/integration/outputs/index_spec.rb index ea2cd94..9b33faf 100644 --- a/spec/integration/outputs/index_spec.rb +++ b/spec/integration/outputs/index_spec.rb @@ -51,10 +51,10 @@ } } output { - elasticsearch { - host => "#{get_host()}" - port => "#{get_port('http')}" - protocol => "http" + elasticsearch_java { + hosts => "#{get_host()}" + port => "#{get_port('transport')}" + protocol => "transport" index => "#{index}" flush_size => #{flush_size} } @@ -76,10 +76,10 @@ } } output { - elasticsearch { - host => "#{get_host()}" - port => "#{get_port('http')}" - protocol => "http" + elasticsearch_java { + hosts => "#{get_host()}" + port => "#{get_port('transport')}" + protocol => "transport" index => "#{index}" flush_size => #{flush_size} } diff --git a/spec/integration/outputs/retry_spec.rb b/spec/integration/outputs/retry_spec.rb index 0526ed0..a7e9d91 100644 --- a/spec/integration/outputs/retry_spec.rb +++ b/spec/integration/outputs/retry_spec.rb @@ -1,4 +1,5 @@ -require "logstash/outputs/elasticsearch" +require "elasticsearch" +require "logstash/outputs/elasticsearch_java" require_relative "../../../spec/es_spec_helper" describe "failures in bulk class expected behavior", :integration => true do @@ -11,146 +12,137 @@ let(:max_retries) { 3 } def mock_actions_with_response(*resp) - LogStash::Outputs::Elasticsearch::Protocols::HTTPClient - .any_instance.stub(:bulk).and_return(*resp) -
LogStash::Outputs::Elasticsearch::Protocols::NodeClient - .any_instance.stub(:bulk).and_return(*resp) + allow_any_instance_of(LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient).to receive(:bulk).and_return(*resp) end - ["transport", "http"].each do |protocol| - context "with protocol => #{protocol}" do - subject! do - settings = { - "manage_template" => true, - "index" => "logstash-2014.11.17", - "template_overwrite" => true, - "protocol" => protocol, - "host" => get_host(), - "port" => get_port(protocol), - "retry_max_items" => 10, - "retry_max_interval" => 1, - "max_retries" => max_retries - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - before :each do - # Delete all templates first. - require "elasticsearch" + subject! do + settings = { + "manage_template" => true, + "index" => "logstash-2014.11.17", + "template_overwrite" => true, + "protocol" => 'transport', + "hosts" => get_host(), + "port" => get_port('transport'), + "retry_max_items" => 10, + "retry_max_interval" => 1, + "max_retries" => max_retries + } + next LogStash::Outputs::ElasticSearchJava.new(settings) + end - # Clean ES of data before we start. - @es = get_client - @es.indices.delete_template(:name => "*") - @es.indices.delete(:index => "*") - @es.indices.refresh - end + before :each do + # Clean ES of data before we start. + @es = get_client + @es.indices.delete_template(:name => "*") + @es.indices.delete(:index => "*") + @es.indices.refresh + end - it "should return no errors if all bulk actions are successful" do - mock_actions_with_response({"errors" => false}) - expect(subject).to receive(:submit).with([action1, action2]).once.and_call_original - subject.register - subject.receive(event1) - subject.receive(event2) - subject.buffer_flush(:final => true) - sleep(2) - end + it "should return no errors if all bulk actions are successful" do + mock_actions_with_response({"errors" => false}) + expect(subject).to receive(:submit).with([action1, action2]).once.and_call_original + subject.register + subject.receive(event1) + subject.receive(event2) + subject.buffer_flush(:final => true) + sleep(2) + end - it "should raise exception and be retried by stud::buffer" do - call_count = 0 - expect(subject).to receive(:submit).with([action1]).exactly(3).times do - if (call_count += 1) <= 2 - raise "error first two times" - else - {"errors" => false} - end - end - subject.register - subject.receive(event1) - subject.teardown + it "should raise exception and be retried by stud::buffer" do + call_count = 0 + expect(subject).to receive(:submit).with([action1]).exactly(3).times do + if (call_count += 1) <= 2 + raise "error first two times" + else + {"errors" => false} end + end + subject.register + subject.receive(event1) + subject.teardown + end - it "should retry actions with response status of 503" do - mock_actions_with_response({"errors" => true, "statuses" => [200, 200, 503, 503]}, - {"errors" => true, "statuses" => [200, 503]}, - {"errors" => false}) - expect(subject).to receive(:submit).with([action1, action1, action1, action2]).ordered.once.and_call_original - expect(subject).to receive(:submit).with([action1, action2]).ordered.once.and_call_original - expect(subject).to receive(:submit).with([action2]).ordered.once.and_call_original - - subject.register - subject.receive(event1) - subject.receive(event1) - subject.receive(event1) - subject.receive(event2) - subject.buffer_flush(:final => true) - sleep(3) - end + it "should retry actions with response status of 503" do + 
mock_actions_with_response({"errors" => true, "statuses" => [200, 200, 503, 503]}, + {"errors" => true, "statuses" => [200, 503]}, + {"errors" => false}) + expect(subject).to receive(:submit).with([action1, action1, action1, action2]).ordered.once.and_call_original + expect(subject).to receive(:submit).with([action1, action2]).ordered.once.and_call_original + expect(subject).to receive(:submit).with([action2]).ordered.once.and_call_original + + subject.register + subject.receive(event1) + subject.receive(event1) + subject.receive(event1) + subject.receive(event2) + subject.buffer_flush(:final => true) + sleep(3) + end - it "should retry actions with response status of 429" do - mock_actions_with_response({"errors" => true, "statuses" => [429]}, - {"errors" => false}) - expect(subject).to receive(:submit).with([action1]).twice.and_call_original - subject.register - subject.receive(event1) - subject.buffer_flush(:final => true) - sleep(3) - end + it "should retry actions with response status of 429" do + mock_actions_with_response({"errors" => true, "statuses" => [429]}, + {"errors" => false}) + expect(subject).to receive(:submit).with([action1]).twice.and_call_original + subject.register + subject.receive(event1) + subject.buffer_flush(:final => true) + sleep(3) + end - it "should retry an event until max_retries reached" do - mock_actions_with_response({"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}, - {"errors" => true, "statuses" => [429]}) - expect(subject).to receive(:submit).with([action1]).exactly(max_retries).times.and_call_original - subject.register - subject.receive(event1) - subject.buffer_flush(:final => true) - sleep(3) - end + it "should retry an event until max_retries reached" do + mock_actions_with_response({"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}, + {"errors" => true, "statuses" => [429]}) + expect(subject).to receive(:submit).with([action1]).exactly(max_retries).times.and_call_original + subject.register + subject.receive(event1) + subject.buffer_flush(:final => true) + sleep(3) + end - it "non-retryable errors like mapping errors (400) should be dropped and not be retried (unfortunetly)" do - subject.register - subject.receive(invalid_event) - expect(subject).not_to receive(:retry_push) - subject.teardown - - @es.indices.refresh - sleep(5) - Stud::try(10.times) do - r = @es.search - insist { r["hits"]["total"] } == 0 - end - end + it "non-retryable errors like mapping errors (400) should be dropped and not be retried (unfortunately)" do + subject.register + subject.receive(invalid_event) + expect(subject).not_to receive(:retry_push) + subject.teardown + + @es.indices.refresh + sleep(5) + Stud::try(10.times) do + r = @es.search + insist { r["hits"]["total"] } == 0 + end + end - it "successful requests should not be appended to retry queue" do - subject.register - subject.receive(event1) - expect(subject).not_to receive(:retry_push) - subject.teardown - - @es.indices.refresh - sleep(5) - Stud::try(10.times) do - r = @es.search - insist { r["hits"]["total"] } == 1 - end - end + it "successful requests should not be appended to retry queue" do + subject.register + subject.receive(event1) + expect(subject).not_to receive(:retry_push) +
+    subject.teardown
+
+    @es.indices.refresh
+    sleep(5)
+    Stud::try(10.times) do
+      r = @es.search
+      insist { r["hits"]["total"] } == 1
+    end
+  end

-      it "should only index proper events" do
-        subject.register
-        subject.receive(invalid_event)
-        subject.receive(event1)
-        subject.teardown
-
-        @es.indices.refresh
-        sleep(5)
-        Stud::try(10.times) do
-          r = @es.search
-          insist { r["hits"]["total"] } == 1
-        end
-      end
+  it "should only index proper events" do
+    subject.register
+    subject.receive(invalid_event)
+    subject.receive(event1)
+    subject.teardown
+    @es.indices.refresh
+    sleep(5)
+
+    Stud::try(10.times) do
+      r = @es.search
+      insist { r["hits"]["total"] } == 1
     end
   end
 end
diff --git a/spec/integration/outputs/routing_spec.rb b/spec/integration/outputs/routing_spec.rb
index a44131d..5ec02ba 100644
--- a/spec/integration/outputs/routing_spec.rb
+++ b/spec/integration/outputs/routing_spec.rb
@@ -32,60 +32,6 @@
   end
 end

-describe "(http protocol) index events with static routing", :integration => true do
-  it_behaves_like 'a routing indexer' do
-    let(:routing) { "test" }
-    let(:config) {
-      <<-CONFIG
-        input {
-          generator {
-            message => "hello world"
-            count => #{event_count}
-            type => "#{type}"
-          }
-        }
-        output {
-          elasticsearch {
-            host => "#{get_host()}"
-            port => "#{get_port('http')}"
-            protocol => "http"
-            index => "#{index}"
-            flush_size => #{flush_size}
-            routing => "#{routing}"
-          }
-        }
-      CONFIG
-    }
-  end
-end
-
-describe "(http_protocol) index events with fieldref in routing value", :integration => true do
-  it_behaves_like 'a routing indexer' do
-    let(:routing) { "test" }
-    let(:config) {
-      <<-CONFIG
-        input {
-          generator {
-            message => "#{routing}"
-            count => #{event_count}
-            type => "#{type}"
-          }
-        }
-        output {
-          elasticsearch {
-            host => "#{get_host()}"
-            port => "#{get_port('http')}"
-            protocol => "http"
-            index => "#{index}"
-            flush_size => #{flush_size}
-            routing => "%{message}"
-          }
-        }
-      CONFIG
-    }
-  end
-end
-
 describe "(transport protocol) index events with fieldref in routing value", :integration => true do
   it_behaves_like 'a routing indexer' do
     let(:routing) { "test" }
@@ -99,8 +45,8 @@
       }
     }
     output {
-      elasticsearch {
-        host => "#{get_host()}"
+      elasticsearch_java {
+        hosts => "#{get_host()}"
         port => "#{get_port('transport')}"
         protocol => "transport"
         index => "#{index}"
diff --git a/spec/integration/outputs/secure_spec.rb b/spec/integration/outputs/secure_spec.rb
index 224f2d2..798146c 100644
--- a/spec/integration/outputs/secure_spec.rb
+++ b/spec/integration/outputs/secure_spec.rb
@@ -2,12 +2,12 @@
 describe "send messages to ElasticSearch using HTTPS", :elasticsearch_secure => true do
   subject do
-    require "logstash/outputs/elasticsearch"
+    require "logstash/outputs/elasticsearch_java"
     settings = {
       "protocol" => "http",
       "node_name" => "logstash",
       "cluster" => "elasticsearch",
-      "host" => "node01",
+      "hosts" => "node01",
       "user" => "user",
       "password" => "changeme",
       "ssl" => true,
@@ -16,7 +16,7 @@
       #"truststore" => "/tmp/ca/truststore.jks",
       #"truststore_password" => "testeteste"
     }
-    next LogStash::Outputs::ElasticSearch.new(settings)
+    next LogStash::Outputs::ElasticSearchJava.new(settings)
   end

   before :each do
@@ -33,15 +33,15 @@

 describe "connect using HTTP Authentication", :elasticsearch_secure => true do
   subject do
-    require "logstash/outputs/elasticsearch"
+    require "logstash/outputs/elasticsearch_java"
     settings = {
       "protocol" => "http",
       "cluster" => "elasticsearch",
-      "host" => "node01",
+      "hosts" => "node01",
       "user" => "user",
       "password" => "changeme",
     }
-    next LogStash::Outputs::ElasticSearch.new(settings)
+    next LogStash::Outputs::ElasticSearchJava.new(settings)
   end

   before :each do
@@ -58,12 +58,12 @@

 describe "send messages to ElasticSearch using HTTPS", :elasticsearch_secure => true do
   subject do
-    require "logstash/outputs/elasticsearch"
+    require "logstash/outputs/elasticsearch_java"
     settings = {
       "protocol" => "http",
       "node_name" => "logstash",
       "cluster" => "elasticsearch",
-      "host" => "node01",
+      "hosts" => "node01",
       "user" => "user",
       "password" => "changeme",
       "ssl" => true,
@@ -72,7 +72,7 @@
       #"truststore" => "/tmp/ca/truststore.jks",
       #"truststore_password" => "testeteste"
     }
-    next LogStash::Outputs::ElasticSearch.new(settings)
+    next LogStash::Outputs::ElasticSearchJava.new(settings)
   end

   before :each do
@@ -89,15 +89,15 @@

 describe "connect using HTTP Authentication", :elasticsearch_secure => true do
   subject do
-    require "logstash/outputs/elasticsearch"
+    require "logstash/outputs/elasticsearch_java"
     settings = {
       "protocol" => "http",
       "cluster" => "elasticsearch",
-      "host" => "node01",
+      "hosts" => "node01",
       "user" => "user",
       "password" => "changeme",
     }
-    next LogStash::Outputs::ElasticSearch.new(settings)
+    next LogStash::Outputs::ElasticSearchJava.new(settings)
   end

   before :each do
diff --git a/spec/integration/outputs/templates_spec.rb b/spec/integration/outputs/templates_spec.rb
index f4f070b..79f6a82 100644
--- a/spec/integration/outputs/templates_spec.rb
+++ b/spec/integration/outputs/templates_spec.rb
@@ -1,19 +1,19 @@
 require_relative "../../../spec/es_spec_helper"

 describe "index template expected behavior", :integration => true do
-  ["transport", "http"].each do |protocol|
+  ["transport"].each do |protocol|
     context "with protocol => #{protocol}" do
       subject! do
-        require "logstash/outputs/elasticsearch"
+        require "logstash/outputs/elasticsearch_java"
         settings = {
           "manage_template" => true,
           "template_overwrite" => true,
           "protocol" => protocol,
-          "host" => "#{get_host()}",
-          "port" => "#{get_port(protocol)}"
+          "hosts" => "#{get_host()}",
+          "port" => "#{get_port('transport')}"
         }
-        next LogStash::Outputs::ElasticSearch.new(settings)
+        next LogStash::Outputs::ElasticSearchJava.new(settings)
       end

       before :each do
diff --git a/spec/integration/outputs/transport_create_spec.rb b/spec/integration/outputs/transport_create_spec.rb
index f617db2..2a8a51d 100644
--- a/spec/integration/outputs/transport_create_spec.rb
+++ b/spec/integration/outputs/transport_create_spec.rb
@@ -1,7 +1,7 @@
 require_relative "../../../spec/es_spec_helper"

 describe "transport client create actions", :integration => true do
-  require "logstash/outputs/elasticsearch"
+  require "logstash/outputs/elasticsearch_java"
   require "elasticsearch"

   def get_es_output(action, id = nil)
@@ -10,12 +10,12 @@ def get_es_output(action, id = nil)
       "index" => "logstash-create",
       "template_overwrite" => true,
       "protocol" => "transport",
-      "host" => get_host(),
+      "hosts" => get_host(),
       "port" => get_port('transport'),
       "action" => action
     }
     settings['document_id'] = id unless id.nil?
-    LogStash::Outputs::ElasticSearch.new(settings)
+    LogStash::Outputs::ElasticSearchJava.new(settings)
   end

   before :each do
diff --git a/spec/integration/outputs/update_spec.rb b/spec/integration/outputs/update_spec.rb
new file mode 100644
index 0000000..16bfea5
--- /dev/null
+++ b/spec/integration/outputs/update_spec.rb
@@ -0,0 +1,89 @@
+require_relative "../../../spec/es_spec_helper"
+
+describe "all protocols update actions", :integration => true do
+  require "logstash/outputs/elasticsearch_java"
+  require "elasticsearch"
+
+  def get_es_output( protocol, id = nil, upsert = nil, doc_as_upsert=nil)
+    settings = {
+      "manage_template" => true,
+      "index" => "logstash-update",
+      "template_overwrite" => true,
+      "protocol" => protocol,
+      "hosts" => get_host(),
+      "port" => get_port(protocol),
+      "network_host" => get_local_host,
+      "action" => "update"
+    }
+    settings['upsert'] = upsert unless upsert.nil?
+    settings['document_id'] = id unless id.nil?
+    settings['doc_as_upsert'] = doc_as_upsert unless doc_as_upsert.nil?
+
+    LogStash::Outputs::ElasticSearchJava.new(settings)
+  end
+
+  before :each do
+    @es = get_client
+    # Delete all templates first.
+    # Clean ES of data before we start.
+    @es.indices.delete_template(:name => "*")
+    # This can fail if there are no indexes, ignore failure.
+    @es.indices.delete(:index => "*") rescue nil
+    @es.index(
+      :index => 'logstash-update',
+      :type => 'logs',
+      :id => "123",
+      :body => { :message => 'Test' }
+    )
+    @es.indices.refresh
+  end
+
+  ["node", "transport"].each do |protocol|
+    context "update only with #{protocol} protocol" do
+      it "should fail without a document_id" do
+        event = LogStash::Event.new("somevalue" => 100, "@timestamp" => "2014-11-17T20:37:17.223Z", "@metadata" => {"retry_count" => 0})
+        action = ["update", {:_id=>nil, :_index=>"logstash-2014.11.17", :_type=>"logs"}, event]
+        subject = get_es_output(protocol)
+        subject.register
+        expect { subject.flush([action]) }.to raise_error
+      end
+
+      it "should not create new document" do
+        subject = get_es_output(protocol, "456")
+        subject.register
+        subject.receive(LogStash::Event.new("message" => "sample message here"))
+        subject.buffer_flush(:final => true)
+        expect {@es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true)}.to raise_error(Elasticsearch::Transport::Transport::Errors::NotFound)
+      end
+
+      it "should update existing document" do
+        subject = get_es_output(protocol, "123")
+        subject.register
+        subject.receive(LogStash::Event.new("message" => "updated message here"))
+        subject.buffer_flush(:final => true)
+        r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "123", :refresh => true)
+        insist { r["_source"]["message"] } == 'updated message here'
+      end
+    end
+
+    context "upsert with #{protocol} protocol" do
+      it "should create new documents with upsert content" do
+        subject = get_es_output(protocol, "456", '{"message": "upsert message"}')
+        subject.register
+        subject.receive(LogStash::Event.new("message" => "sample message here"))
+        subject.buffer_flush(:final => true)
+        r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true)
+        insist { r["_source"]["message"] } == 'upsert message'
+      end
+
+      it "should create new documents with event/doc as upsert" do
+        subject = get_es_output(protocol, "456", nil, true)
+        subject.register
+        subject.receive(LogStash::Event.new("message" => "sample message here"))
+        subject.buffer_flush(:final => true)
+        r = @es.get(:index => 'logstash-update', :type => 'logs', :id => "456", :refresh => true)
+        insist { r["_source"]["message"] } == 'sample message here'
+      end
+    end
+  end
+end
diff --git a/spec/unit/outputs/elasticsearch/protocol_spec.rb b/spec/unit/outputs/elasticsearch/protocol_spec.rb
index 893eb64..c913e80 100644
--- a/spec/unit/outputs/elasticsearch/protocol_spec.rb
+++ b/spec/unit/outputs/elasticsearch/protocol_spec.rb
@@ -1,51 +1,31 @@
 require "logstash/devutils/rspec/spec_helper"
-require "logstash/outputs/elasticsearch/protocol"
+require "logstash/outputs/elasticsearch_java/protocol"
 require "java"

-describe LogStash::Outputs::Elasticsearch::Protocols::NodeClient do
+describe LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient do
   context "successful" do
     it "should map correctly" do
       index_response = org.elasticsearch.action.index.IndexResponse.new("my_index", "my_type", "my_id", 123, true)
+      update_response = org.elasticsearch.action.update.UpdateResponse.new("my_index", "my_type", "my_id", 123, false)
       delete_response = org.elasticsearch.action.delete.DeleteResponse.new("my_index", "my_type", "my_id", 123, true)
       bulk_item_response_index = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "index", index_response)
+      bulk_item_response_update = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "update", update_response)
       bulk_item_response_delete = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "delete", delete_response)
-      bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_delete], 0)
-      ret = LogStash::Outputs::Elasticsearch::Protocols::NodeClient.normalize_bulk_response(bulk_response)
+      bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_update, bulk_item_response_delete], 0)
+      ret = LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient.normalize_bulk_response(bulk_response)
       insist { ret } == {"errors" => false}
     end
   end

   context "contains failures" do
     it "should map correctly" do
-      failure = org.elasticsearch.action.bulk.BulkItemResponse::Failure.new("my_index", "my_type", "my_id", "error message", org.elasticsearch.rest.RestStatus::BAD_REQUEST)
+      failure = org.elasticsearch.action.bulk.BulkItemResponse::Failure.new("my_index", "my_type", "my_id", java.lang.IllegalArgumentException.new("bad_request"))
       bulk_item_response_index = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "index", failure)
+      bulk_item_response_update = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "update", failure)
       bulk_item_response_delete = org.elasticsearch.action.bulk.BulkItemResponse.new(32, "delete", failure)
-      bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_delete], 0)
-      actual = LogStash::Outputs::Elasticsearch::Protocols::NodeClient.normalize_bulk_response(bulk_response)
-      insist { actual } == {"errors" => true, "statuses" => [400, 400]}
+      bulk_response = org.elasticsearch.action.bulk.BulkResponse.new([bulk_item_response_index, bulk_item_response_update, bulk_item_response_delete], 0)
+      actual = LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient.normalize_bulk_response(bulk_response)
+      insist { actual } == {"errors" => true, "statuses" => [400, 400, 400]}
     end
   end
-end
-
-describe LogStash::Outputs::Elasticsearch::Protocols::HTTPClient do
-  context "successful" do
-    it "should map correctly" do
-      bulk_response = {"took"=>74, "errors"=>false, "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17",
-                                                                          "_type"=>"logs",
"_id"=>"AUxTS2C55Jrgi-hC6rQF", - "_version"=>1, "status"=>201}}]} - actual = LogStash::Outputs::Elasticsearch::Protocols::HTTPClient.normalize_bulk_response(bulk_response) - insist { actual } == {"errors"=> false} - end - end - - context "contains failures" do - it "should map correctly" do - bulk_response = {"took"=>71, "errors"=>true, - "items"=>[{"create"=>{"_index"=>"logstash-2014.11.17", - "_type"=>"logs", "_id"=>"AUxTQ_OI5Jrgi-hC6rQB", "status"=>400, - "error"=>"MapperParsingException[failed to parse]..."}}]} - actual = LogStash::Outputs::Elasticsearch::Protocols::HTTPClient.normalize_bulk_response(bulk_response) - insist { actual } == {"errors"=> true, "statuses"=> [400]} - end - end -end +end \ No newline at end of file diff --git a/spec/unit/outputs/elasticsearch_proxy_spec.rb b/spec/unit/outputs/elasticsearch_proxy_spec.rb deleted file mode 100644 index 8ebb582..0000000 --- a/spec/unit/outputs/elasticsearch_proxy_spec.rb +++ /dev/null @@ -1,59 +0,0 @@ -require_relative "../../../spec/es_spec_helper" -require 'stud/temporary' -require 'elasticsearch' -require "logstash/outputs/elasticsearch" - -describe "Proxy option" do - let(:settings) { - { - "protocol" => "http", - "host" => "node01", - "proxy" => proxy - } - } - subject { - LogStash::Outputs::ElasticSearch.new(settings) - } - - before do - allow(::Elasticsearch::Client).to receive(:new).with(any_args) - end - - describe "valid configs" do - before do - subject.register - end - - context "when specified as a string" do - let(:proxy) { "http://127.0.0.1:1234" } - - it "should set the proxy to the exact value" do - expect(::Elasticsearch::Client).to have_received(:new) do |options| - expect(options[:transport_options][:proxy]).to eql(proxy) - end - end - end - - context "when specified as a hash" do - let(:proxy) { {"host" => "127.0.0.1", "protocol" => "http"} } - - it "should pass through the proxy values as symbols" do - expected = {:host => proxy["host"], :protocol => proxy["protocol"]} - expect(::Elasticsearch::Client).to have_received(:new) do |options| - expect(options[:transport_options][:proxy]).to eql(expected) - end - end - end - end - - describe "invalid configs" do - let(:proxy) { ["bad", "stuff"] } - - it "should have raised an exception" do - expect { - subject.register - }.to raise_error(LogStash::ConfigurationError) - end - end - -end diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index 99785dc..0c5d074 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -1,106 +1,28 @@ require_relative "../../../spec/es_spec_helper" -describe "outputs/elasticsearch" do +describe "outputs/elasticsearch_java" do context "registration" do it "should register" do - output = LogStash::Plugin.lookup("output", "elasticsearch").new("embedded" => "false", "protocol" => "transport", "manage_template" => "false") + output = LogStash::Plugin.lookup("output", "elasticsearch_java").new("protocol" => "transport", "manage_template" => "false") # register will try to load jars and raise if it cannot find jars expect {output.register}.to_not raise_error end - - it "should fail to register when protocol => http, action => create_unless_exists" do - output = LogStash::Plugin.lookup("output", "elasticsearch").new("protocol" => "http", "action" => "create_unless_exists") - expect {output.register}.to raise_error - end - end - - describe "Authentication option" do - ["node", "transport"].each do |protocol| - context "with protocol => #{protocol}" do - 
-        subject do
-          require "logstash/outputs/elasticsearch"
-          settings = {
-            "protocol" => protocol,
-            "node_name" => "logstash",
-            "cluster" => "elasticsearch",
-            "host" => "node01",
-            "user" => "test",
-            "password" => "test"
-          }
-          next LogStash::Outputs::ElasticSearch.new(settings)
-        end
-
-        it "should fail in register" do
-          expect {subject.register}.to raise_error
-        end
-      end
-    end
-  end
-
-  describe "http client create" do
-    require "logstash/outputs/elasticsearch"
-    require "elasticsearch"
-
-    let(:options) {
-      {
-        "protocol" => "http",
-        "index" => "my-index",
-        "host" => "localhost",
-        "path" => "some-path"
-      }
-    }
-
-    let(:eso) {LogStash::Outputs::ElasticSearch.new(options)}
-
-    let(:manticore_host) {
-      eso.client.first.send(:client).transport.options[:host].first
-    }
-
-    around(:each) do |block|
-      thread = eso.register
-      block.call()
-      thread.kill()
-    end
-
-    describe "with path" do
-      it "should properly create a URI with the path" do
-        expect(eso.path).to eql(options["path"])
-      end
-
-
-      it "should properly set the path on the HTTP client adding slashes" do
-        expect(manticore_host).to include("/" + options["path"] + "/")
-      end
-
-      context "with extra slashes" do
-        let(:path) { "/slashed-path/ "}
-        let(:eso) {
-          LogStash::Outputs::ElasticSearch.new(options.merge("path" => "/some-path/"))
-        }
-
-        it "should properly set the path on the HTTP client without adding slashes" do
-          expect(manticore_host).to include(options["path"])
-        end
-      end
-
-
-    end
   end

   describe "transport protocol" do
     context "host not configured" do
       subject do
-        require "logstash/outputs/elasticsearch"
+        require "logstash/outputs/elasticsearch_java"
         settings = {
           "protocol" => "transport",
           "node_name" => "mynode"
         }
-        next LogStash::Outputs::ElasticSearch.new(settings)
+        next LogStash::Outputs::ElasticSearchJava.new(settings)
       end

       it "should set host to localhost" do
-        expect(LogStash::Outputs::Elasticsearch::Protocols::TransportClient).to receive(:new).with({
-          :host => "localhost",
+        expect(LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::TransportClient).to receive(:new).with({
+          :hosts => ["127.0.0.1"],
           :port => "9300-9305",
           :protocol => "transport",
           :client_settings => {
@@ -114,40 +36,40 @@

     context "sniffing => true" do
       subject do
-        require "logstash/outputs/elasticsearch"
+        require "logstash/outputs/elasticsearch_java"
         settings = {
-          "host" => "node01",
+          "hosts" => "node01",
           "protocol" => "transport",
           "sniffing" => true
         }
-        next LogStash::Outputs::ElasticSearch.new(settings)
+        next LogStash::Outputs::ElasticSearchJava.new(settings)
       end

       it "should set the sniffing property to true" do
-        expect_any_instance_of(LogStash::Outputs::Elasticsearch::Protocols::TransportClient).to receive(:client).and_return(nil)
+        allow_any_instance_of(LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient).to receive(:client).and_return(nil)
         subject.register
-        client = subject.instance_eval("@current_client")
+        client = subject.instance_eval("@client")
         settings = client.instance_eval("@settings")
-        expect(settings.build.getAsMap["client.transport.sniff"]).to eq("true")
+        expect(settings.build.get("client.transport.sniff")).to eq("true")
       end
     end

     context "sniffing => false" do
       subject do
-        require "logstash/outputs/elasticsearch"
+        require "logstash/outputs/elasticsearch_java"
         settings = {
-          "host" => "node01",
+          "hosts" => "node01",
           "protocol" => "transport",
           "sniffing" => false
         }
-        next LogStash::Outputs::ElasticSearch.new(settings)
+        next LogStash::Outputs::ElasticSearchJava.new(settings)
       end
property to true" do - expect_any_instance_of(LogStash::Outputs::Elasticsearch::Protocols::TransportClient).to receive(:client).and_return(nil) + allow_any_instance_of(LogStash::Outputs::ElasticSearchJavaPlugins::Protocols::NodeClient).to receive(:client).and_return(nil) subject.register - client = subject.instance_eval("@current_client") + client = subject.instance_eval("@client") settings = client.instance_eval("@settings") expect(settings.build.getAsMap["client.transport.sniff"]).to eq("false") diff --git a/spec/unit/outputs/elasticsearch_ssl_spec.rb b/spec/unit/outputs/elasticsearch_ssl_spec.rb deleted file mode 100644 index cb000ff..0000000 --- a/spec/unit/outputs/elasticsearch_ssl_spec.rb +++ /dev/null @@ -1,82 +0,0 @@ -require_relative "../../../spec/es_spec_helper" -require 'stud/temporary' - -describe "SSL option" do - ["node", "transport"].each do |protocol| - context "with protocol => #{protocol}" do - subject do - require "logstash/outputs/elasticsearch" - settings = { - "protocol" => protocol, - "node_name" => "logstash", - "cluster" => "elasticsearch", - "host" => "node01", - "ssl" => true - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - - it "should fail in register" do - expect {subject.register}.to raise_error - end - end - end - - context "when using http protocol" do - protocol = "http" - context "when using ssl without cert verification" do - subject do - require "logstash/outputs/elasticsearch" - settings = { - "protocol" => protocol, - "host" => "node01", - "ssl" => true, - "ssl_certificate_verification" => false - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - - it "should pass the flag to the ES client" do - expect(::Elasticsearch::Client).to receive(:new) do |args| - expect(args[:ssl]).to eq(:verify => false) - end - subject.register - end - - it "print a warning" do - expect(subject.logger).to receive(:warn) - subject.register - end - end - - context "when using ssl with client certificates" do - - let(:keystore_path) { Stud::Temporary.file.path } - - after :each do - File.delete(keystore_path) - end - - subject do - require "logstash/outputs/elasticsearch" - settings = { - "protocol" => protocol, - "host" => "node01", - "ssl" => true, - "keystore" => keystore_path, - "keystore_password" => "test" - } - next LogStash::Outputs::ElasticSearch.new(settings) - end - - - it "should pass the keystore parameters to the ES client" do - expect(::Elasticsearch::Client).to receive(:new) do |args| - expect(args[:ssl]).to include(:keystore => keystore_path, :keystore_password => "test") - end - subject.register - end - - end - end -end diff --git a/vendor/jar-dependencies/runtime-jars/HdrHistogram-2.1.6.jar b/vendor/jar-dependencies/runtime-jars/HdrHistogram-2.1.6.jar new file mode 100644 index 0000000..04eb380 Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/HdrHistogram-2.1.6.jar differ diff --git a/vendor/jar-dependencies/runtime-jars/apache-log4j-extras-1.2.17.jar b/vendor/jar-dependencies/runtime-jars/apache-log4j-extras-1.2.17.jar new file mode 100644 index 0000000..9212a96 Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/apache-log4j-extras-1.2.17.jar differ diff --git a/vendor/jar-dependencies/runtime-jars/commons-cli-1.3.1.jar b/vendor/jar-dependencies/runtime-jars/commons-cli-1.3.1.jar new file mode 100644 index 0000000..c3e7a1f Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/commons-cli-1.3.1.jar differ diff --git a/vendor/jar-dependencies/runtime-jars/compiler-0.8.13.jar 
new file mode 100644
index 0000000..4e68e95
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/compiler-0.8.13.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/compress-lzf-1.0.2.jar b/vendor/jar-dependencies/runtime-jars/compress-lzf-1.0.2.jar
new file mode 100644
index 0000000..43f6ab8
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/compress-lzf-1.0.2.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/elasticsearch-1.7.0.jar b/vendor/jar-dependencies/runtime-jars/elasticsearch-1.7.0.jar
deleted file mode 100644
index 631e7ee..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/elasticsearch-1.7.0.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/elasticsearch-2.0.0-beta1-SNAPSHOT.jar b/vendor/jar-dependencies/runtime-jars/elasticsearch-2.0.0-beta1-SNAPSHOT.jar
new file mode 100644
index 0000000..af428c5
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/elasticsearch-2.0.0-beta1-SNAPSHOT.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/groovy-all-2.4.4-indy.jar b/vendor/jar-dependencies/runtime-jars/groovy-all-2.4.4-indy.jar
new file mode 100644
index 0000000..3154c33
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/groovy-all-2.4.4-indy.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/guava-18.0.jar b/vendor/jar-dependencies/runtime-jars/guava-18.0.jar
new file mode 100644
index 0000000..8f89e49
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/guava-18.0.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/hppc-0.7.1.jar b/vendor/jar-dependencies/runtime-jars/hppc-0.7.1.jar
new file mode 100644
index 0000000..ed318de
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/hppc-0.7.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/jackson-core-2.5.3.jar b/vendor/jar-dependencies/runtime-jars/jackson-core-2.5.3.jar
new file mode 100644
index 0000000..f3ae1ae
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/jackson-core-2.5.3.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/jackson-dataformat-cbor-2.5.3.jar b/vendor/jar-dependencies/runtime-jars/jackson-dataformat-cbor-2.5.3.jar
new file mode 100644
index 0000000..e8ecc04
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/jackson-dataformat-cbor-2.5.3.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/jackson-dataformat-smile-2.5.3.jar b/vendor/jar-dependencies/runtime-jars/jackson-dataformat-smile-2.5.3.jar
new file mode 100644
index 0000000..ef97fd9
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/jackson-dataformat-smile-2.5.3.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/jackson-dataformat-yaml-2.5.3.jar b/vendor/jar-dependencies/runtime-jars/jackson-dataformat-yaml-2.5.3.jar
new file mode 100644
index 0000000..7e7d53d
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/jackson-dataformat-yaml-2.5.3.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/jna-4.1.0.jar b/vendor/jar-dependencies/runtime-jars/jna-4.1.0.jar
new file mode 100644
index 0000000..b1a3922
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/jna-4.1.0.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/joda-convert-1.2.jar b/vendor/jar-dependencies/runtime-jars/joda-convert-1.2.jar
new file mode 100644
index 0000000..1ab3925
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/joda-convert-1.2.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/joda-time-2.8.jar b/vendor/jar-dependencies/runtime-jars/joda-time-2.8.jar
new file mode 100644
index 0000000..6f1a31e
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/joda-time-2.8.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/jsr166e-1.1.0.jar b/vendor/jar-dependencies/runtime-jars/jsr166e-1.1.0.jar
new file mode 100644
index 0000000..87b6231
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/jsr166e-1.1.0.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/jts-1.13.jar b/vendor/jar-dependencies/runtime-jars/jts-1.13.jar
new file mode 100644
index 0000000..bbaa20b
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/jts-1.13.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/log4j-1.2.17.jar b/vendor/jar-dependencies/runtime-jars/log4j-1.2.17.jar
new file mode 100644
index 0000000..1d425cf
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/log4j-1.2.17.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-analyzers-common-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-analyzers-common-5.2.1.jar
similarity index 56%
rename from vendor/jar-dependencies/runtime-jars/lucene-analyzers-common-4.10.4.jar
rename to vendor/jar-dependencies/runtime-jars/lucene-analyzers-common-5.2.1.jar
index 701caeb..aaa26a1 100644
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-analyzers-common-4.10.4.jar and b/vendor/jar-dependencies/runtime-jars/lucene-analyzers-common-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-backward-codecs-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-backward-codecs-5.2.1.jar
new file mode 100644
index 0000000..bbdfff8
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-backward-codecs-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-core-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-core-4.10.4.jar
deleted file mode 100644
index 823664c..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-core-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-core-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-core-5.2.1.jar
new file mode 100644
index 0000000..18b887f
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-core-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-expressions-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-expressions-5.2.1.jar
new file mode 100644
index 0000000..3692d10
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-expressions-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-grouping-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-grouping-4.10.4.jar
deleted file mode 100644
index 0819b41..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-grouping-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-grouping-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-grouping-5.2.1.jar
new file mode 100644
index 0000000..6d7b46d
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-grouping-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-highlighter-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-highlighter-4.10.4.jar
deleted file mode 100644
index b58e7db..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-highlighter-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-highlighter-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-highlighter-5.2.1.jar
new file mode 100644
index 0000000..e81ecbd
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-highlighter-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-join-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-join-4.10.4.jar
deleted file mode 100644
index 3d3e4d3..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-join-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-join-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-join-5.2.1.jar
new file mode 100644
index 0000000..5702751
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-join-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-memory-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-memory-4.10.4.jar
deleted file mode 100644
index 362a68b..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-memory-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-memory-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-memory-5.2.1.jar
new file mode 100644
index 0000000..7c4eb8e
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-memory-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-misc-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-misc-4.10.4.jar
deleted file mode 100644
index ee73cf7..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-misc-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-misc-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-misc-5.2.1.jar
new file mode 100644
index 0000000..ce6eeba
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-misc-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-queries-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-queries-4.10.4.jar
deleted file mode 100644
index 0920f77..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-queries-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-queries-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-queries-5.2.1.jar
new file mode 100644
index 0000000..01742fa
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-queries-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-queryparser-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-queryparser-5.2.1.jar
similarity index 62%
rename from vendor/jar-dependencies/runtime-jars/lucene-queryparser-4.10.4.jar
rename to vendor/jar-dependencies/runtime-jars/lucene-queryparser-5.2.1.jar
index f9ac0e4..ee75993 100644
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-queryparser-4.10.4.jar and b/vendor/jar-dependencies/runtime-jars/lucene-queryparser-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-sandbox-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-sandbox-4.10.4.jar
deleted file mode 100644
index b43a287..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-sandbox-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-sandbox-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-sandbox-5.2.1.jar
new file mode 100644
index 0000000..4209385
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-sandbox-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-spatial-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-spatial-4.10.4.jar
deleted file mode 100644
index 09cff6b..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-spatial-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-spatial-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-spatial-5.2.1.jar
new file mode 100644
index 0000000..51b3623
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-spatial-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-suggest-4.10.4.jar b/vendor/jar-dependencies/runtime-jars/lucene-suggest-4.10.4.jar
deleted file mode 100644
index 6eb7a8b..0000000
Binary files a/vendor/jar-dependencies/runtime-jars/lucene-suggest-4.10.4.jar and /dev/null differ
diff --git a/vendor/jar-dependencies/runtime-jars/lucene-suggest-5.2.1.jar b/vendor/jar-dependencies/runtime-jars/lucene-suggest-5.2.1.jar
new file mode 100644
index 0000000..e30c5d4
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/lucene-suggest-5.2.1.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/netty-3.10.3.Final.jar b/vendor/jar-dependencies/runtime-jars/netty-3.10.3.Final.jar
new file mode 100644
index 0000000..f8bc63c
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/netty-3.10.3.Final.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/snakeyaml-1.12.jar b/vendor/jar-dependencies/runtime-jars/snakeyaml-1.12.jar
new file mode 100644
index 0000000..fd314d3
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/snakeyaml-1.12.jar differ
diff --git a/vendor/jar-dependencies/runtime-jars/t-digest-3.0.jar b/vendor/jar-dependencies/runtime-jars/t-digest-3.0.jar
new file mode 100644
index 0000000..cfb29bc
Binary files /dev/null and b/vendor/jar-dependencies/runtime-jars/t-digest-3.0.jar differ
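
The spec changes above capture the migration in miniature: the output block is now elasticsearch_java, the old host setting becomes hosts, and only the node/transport protocols are exercised. A minimal sketch of a migrated pipeline config, assembled from the test fixtures in this diff (the node01 host, port range, and index name are illustrative values taken from the specs, not defaults):

    output {
      elasticsearch_java {
        hosts => "node01"          # renamed from the old `host` setting
        port => "9300-9305"        # transport port range used in the specs
        protocol => "transport"    # node/transport are the only protocols tested here
        index => "logstash-2014.11.17"
      }
    }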