Skip to content
This repository was archived by the owner on Mar 25, 2020. It is now read-only.

ES 1.7 Support #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 2 additions & 24 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,2 @@
## 1.0.4
- Update to Elasticsearch 1.7

## 1.0.3
- Add HTTP proxy support

## 1.0.2
- Upgrade Manticore HTTP Client

## 1.0.1
- Allow client certificates

## 0.2.9
- Add 'path' parameter for ES HTTP hosts behind a proxy on a subpath

## 0.2.8 (June 12, 2015)
- Add option to enable and disable SSL certificate verification during handshake (#160)
- Doc improvements for clarifying round robin behavior using hosts config

## 0.2.7 (May 28, 2015)
- Bump es-ruby version to 1.0.10

## 0.2.6 (May 28, 2015)
- Disable timeouts when using http protocol which would cause bulk requests to fail (#103)
## 1.1.0
- Initial Release. Only supports Node/Transport in ES 1.7
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# Logstash Plugin

This is a plugin for [Logstash](https://github.com/elasticsearch/logstash).
This is a plugin for [Logstash](https://github.com/elastic/logstash).

It is fully free and fully open source. The license is Apache 2.0, meaning you are pretty much free to use it however you want in whatever way.

## Documentation

Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elasticsearch.org/guide/en/logstash/current/).
Logstash provides infrastructure to automatically generate documentation for this plugin. We use the asciidoc format to write documentation so any comments in the source code will be first converted into asciidoc and then into html. All plugin documentation are placed under one [central location](http://www.elastic.co/guide/en/logstash/current/).

- For formatting code or config example, you can use the asciidoc `[source,ruby]` directive
- For more asciidoc formatting tips, see the excellent reference here https://github.com/elasticsearch/docs#asciidoc-guide
- For more asciidoc formatting tips, see the excellent reference here https://github.com/elastic/docs#asciidoc-guide

## Need Help?

Expand Down Expand Up @@ -95,4 +95,4 @@ Programming is not a required skill. Whatever you've seen about open source and

It is more important to the community that you are able to contribute.

For more information about contributing, see the [CONTRIBUTING](https://github.com/elasticsearch/logstash/blob/master/CONTRIBUTING.md) file.
For more information about contributing, see the [CONTRIBUTING](https://github.com/elastic/logstash/blob/master/CONTRIBUTING.md) file.
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,15 @@
require "socket" # for Socket.gethostname
require "thread" # for safe queueing
require "uri" # for escaping user input
require 'logstash-output-elasticsearch_jars.rb'

# This output lets you store logs in Elasticsearch and is the most recommended
# output for Logstash. If you plan on using the Kibana web interface, you'll
# need to use this output.
require 'logstash-output-elasticsearch_java_jars.rb'
require "logstash/outputs/elasticsearch_java/protocol"

# This output lets you store logs in Elasticsearch using the native 'node' and 'transport'
# protocols. It is highly recommended to use the regular 'logstash-outpu-elasticsearch' output
# which uses HTTP instead. This output is, in-fact, sometimes slower, and never faster than that one.
# Additionally, upgrading your Elasticsearch cluster may require you to simultaneously update this
# plugin for any protocol level changes. The HTTP client has far fewer of these issues and is
# generally just easier to work with.
#
# *VERSION NOTE*: Your Elasticsearch cluster must be running Elasticsearch 1.0.0 or later.
#
Expand Down Expand Up @@ -61,14 +65,14 @@
# - Events from the retry queue are submitted again either when the queue reaches its max size or when
# the max interval time is reached, which is set in :retry_max_interval.
# - Events which are not retryable or have reached their max retry count are logged to stderr.
class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
class LogStash::Outputs::ElasticSearchJava < LogStash::Outputs::Base
attr_reader :client

include Stud::Buffer
RETRYABLE_CODES = [429, 503]
RETRYABLE_CODES = [409, 429, 503]
SUCCESS_CODES = [200, 201]

config_name "elasticsearch"
config_name "elasticsearch_java"

# The index to write events to. This can be dynamic using the `%{foo}` syntax.
# The default value will partition your indices by day so you can more easily
Expand Down Expand Up @@ -168,10 +172,9 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# The port for Elasticsearch transport to use.
#
# If you do not set this, the following defaults are used:
# * `protocol => http` - port 9200
# * `protocol => transport` - port 9300-9305
# * `protocol => node` - port 9300-9305
config :port, :validate => :string
config :port, :validate => :string, :default => "9300-9305"

# The name/address of the host to bind to for Elasticsearch clustering
config :bind_host, :validate => :string
Expand Down Expand Up @@ -238,14 +241,11 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# in situations where you cannot permit connections outbound from the
# Elasticsearch cluster to this Logstash server.
#
# The 'http' protocol will use the Elasticsearch REST/HTTP interface to talk
# to elasticsearch.
#
# All protocols will use bulk requests when talking to Elasticsearch.
#
# The default `protocol` setting under java/jruby is "node". The default
# `protocol` on non-java rubies is "http"
config :protocol, :validate => [ "node", "transport", "http" ]
config :protocol, :validate => [ "node", "transport"], :default => "transport"

# The Elasticsearch action to perform. Valid actions are: `index`, `delete`.
#
Expand All @@ -257,25 +257,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# - index: indexes a document (an event from Logstash).
# - delete: deletes a document by id
# - create: indexes a document, fails if a document by that id already exists in the index.
# - update: updates a document by id
# following action is not supported by HTTP protocol
# - create_unless_exists: creates a document, fails if no id is provided
#
# For more details on actions, check out the http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/docs-bulk.html[Elasticsearch bulk API documentation]
config :action, :validate => :string, :default => "index"

# Username and password (only valid when protocol is HTTP; this setting works with HTTP or HTTPS auth)
config :user, :validate => :string
config :password, :validate => :password

# HTTP Path at which the Elasticsearch server lives. Use this if you must run ES behind a proxy that remaps
# the root path for the Elasticsearch HTTP API lives. This option is ignored for non-HTTP transports.
config :path, :validate => :string, :default => "/"

# SSL Configurations (only valid when protocol is HTTP)
#
# Enable SSL
config :ssl, :validate => :boolean, :default => false

# Validate the server's certificate
# Disabling this severely compromises security
# For more information read https://www.cs.utexas.edu/~shmat/shmat_ccs12.pdf
Expand Down Expand Up @@ -311,11 +299,13 @@ class LogStash::Outputs::ElasticSearch < LogStash::Outputs::Base
# Set max interval between bulk retries
config :retry_max_interval, :validate => :number, :default => 5

# Set the address of a forward HTTP proxy. Must be used with the 'http' protocol
# Can be either a string, such as 'http://localhost:123' or a hash in the form
# {host: 'proxy.org' port: 80 scheme: 'http'}
# Note, this is NOT a SOCKS proxy, but a plain HTTP proxy
config :proxy
# Enable doc_as_upsert for update mode
# create a new document with source if document_id doesn't exists
config :doc_as_upsert, :validate => :boolean, :default => false

# Set upsert content for update mode
# create a new document with this parameter as json string if document_id doesn't exists
config :upsert, :validate => :string, :default => ""

public
def register
Expand All @@ -330,76 +320,47 @@ def register

client_settings = {}

client_settings["cluster.name"] = @cluster if @cluster
client_settings["network.host"] = @bind_host if @bind_host
client_settings["transport.tcp.port"] = @bind_port if @bind_port
client_settings["client.transport.sniff"] = @sniffing

if @protocol.nil?
@protocol = LogStash::Environment.jruby? ? "node" : "http"
end

if @protocol == "http"
if @action == "create_unless_exists"
raise(LogStash::ConfigurationError, "action => 'create_unless_exists' is not supported under the HTTP protocol");
end

client_settings[:path] = "/#{@path}/".gsub(/\/+/, "/") # Normalize slashes
@logger.debug? && @logger.debug("Normalizing http path", :path => @path, :normalized => client_settings[:path])
end

if ["node", "transport"].include?(@protocol)
# Node or TransportClient; requires JRuby
raise(LogStash::PluginLoadingError, "This configuration requires JRuby. If you are not using JRuby, you must set 'protocol' to 'http'. For example: output { elasticsearch { protocol => \"http\" } }") unless LogStash::Environment.jruby?

client_settings["cluster.name"] = @cluster if @cluster
client_settings["network.host"] = @bind_host if @bind_host
client_settings["transport.tcp.port"] = @bind_port if @bind_port
client_settings["client.transport.sniff"] = @sniffing

if @node_name
client_settings["node.name"] = @node_name
else
client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
end

@@plugins.each do |plugin|
name = plugin.name.split('-')[-1]
client_settings.merge!(LogStash::Outputs::ElasticSearch.const_get(name.capitalize).create_client_config(self))
end
if @node_name
client_settings["node.name"] = @node_name
else
client_settings["node.name"] = "logstash-#{Socket.gethostname}-#{$$}-#{object_id}"
end

require "logstash/outputs/elasticsearch/protocol"

if @port.nil?
@port = case @protocol
when "http"; "9200"
when "transport", "node"; "9300-9305"
end
@@plugins.each do |plugin|
name = plugin.name.split('-')[-1]
client_settings.merge!(LogStash::Outputs::ElasticSearchJava.const_get(name.capitalize).create_client_config(self))
end

if @host.nil? && @protocol != "node" # node can use zen discovery
@logger.info("No 'host' set in elasticsearch output. Defaulting to localhost")
@host = ["localhost"]
end

client_settings.merge! setup_ssl()
client_settings.merge! setup_proxy()

common_options = {
:protocol => @protocol,
:client_settings => client_settings
}

common_options.merge! setup_basic_auth()
# Update API setup
update_options = {
:upsert => @upsert,
:doc_as_upsert => @doc_as_upsert
}
common_options.merge! update_options if @action == 'update'

client_class = case @protocol
when "transport"
LogStash::Outputs::Elasticsearch::Protocols::TransportClient
when "node"
LogStash::Outputs::Elasticsearch::Protocols::NodeClient
when /http/
LogStash::Outputs::Elasticsearch::Protocols::HTTPClient
end

if @embedded
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewvc Lets remove @embedded functionality. There is no reason to start a ES server on a LS server anymore. This is a source of security issues, high heap usage in a LS node, and was only used as a POC tool.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

raise(LogStash::ConfigurationError, "The 'embedded => true' setting is only valid for the elasticsearch output under JRuby. You are running #{RUBY_DESCRIPTION}") unless LogStash::Environment.jruby?
@logger.warn("The 'embedded => true' setting is enabled. This is not recommended for production use!!!")
# LogStash::Environment.load_elasticsearch_jars!

Expand All @@ -417,7 +378,7 @@ def register
if protocol == "node" || @host.nil? # if @protocol is "node" or @host is not set
options = { :host => @host, :port => @port }.merge(common_options)
@client = [client_class.new(options)]
else # if @protocol in ["transport","http"]
else # if @protocol in ["transport"]
@client = @host.map do |host|
(_host,_port) = host.split ":"
options = { :host => _host, :port => _port || @port }.merge(common_options)
Expand Down Expand Up @@ -470,7 +431,7 @@ def register
public
def get_template
if @template.nil?
@template = ::File.expand_path('elasticsearch/elasticsearch-template.json', ::File.dirname(__FILE__))
@template = ::File.expand_path('elasticsearch_java/elasticsearch-template.json', ::File.dirname(__FILE__))
if !File.exists?(@template)
raise "You must specify 'template => ...' in your elasticsearch output (I looked for '#{@template}')"
end
Expand Down Expand Up @@ -503,11 +464,16 @@ def receive(event)
event["type"] || "logs"
end

index = event.sprintf(@index)
params = {
:_id => @document_id ? event.sprintf(@document_id) : nil,
:_index => event.sprintf(@index),
:_type => type,
:_routing => @routing ? event.sprintf(@routing) : nil
}

params[:_upsert] = LogStash::Json.load(event.sprintf(@upsert)) if @action == 'update' && @upsert != ""

document_id = @document_id ? event.sprintf(@document_id) : nil
routing = @routing ? event.sprintf(@routing) : nil
buffer_receive([event.sprintf(@action), { :_id => document_id, :_index => index, :_type => type, :_routing => routing }, event])
buffer_receive([event.sprintf(@action), params, event])
end # def receive

public
Expand Down Expand Up @@ -602,75 +568,8 @@ def shift_client
@logger.debug? and @logger.debug("Switched current elasticsearch client to ##{@client_idx} at #{@host[@client_idx]}")
end

private
def setup_proxy
return {} unless @proxy

if @protocol != "http"
raise(LogStash::ConfigurationError, "Proxy is not supported for '#{@protocol}'. Change the protocol to 'http' if you need HTTP proxy.")
end

# Symbolize keys
proxy = if @proxy.is_a?(Hash)
Hash[@proxy.map {|k,v| [k.to_sym, v]}]
elsif @proxy.is_a?(String)
@proxy
else
raise LogStash::ConfigurationError, "Expected 'proxy' to be a string or hash, not '#{@proxy}''!"
end

return {:proxy => proxy}
end

private
def setup_ssl
return {} unless @ssl
if @protocol != "http"
raise(LogStash::ConfigurationError, "SSL is not supported for '#{@protocol}'. Change the protocol to 'http' if you need SSL.")
end
@protocol = "https"
if @cacert && @truststore
raise(LogStash::ConfigurationError, "Use either \"cacert\" or \"truststore\" when configuring the CA certificate") if @truststore
end
ssl_options = {}
if @cacert then
@truststore, ssl_options[:truststore_password] = generate_jks @cacert
elsif @truststore
ssl_options[:truststore_password] = @truststore_password.value if @truststore_password
end
ssl_options[:truststore] = @truststore if @truststore
if @keystore
ssl_options[:keystore] = @keystore
ssl_options[:keystore_password] = @keystore_password.value if @keystore_password
end
if @ssl_certificate_verification == false
@logger.warn [
"** WARNING ** Detected UNSAFE options in elasticsearch output configuration!",
"** WARNING ** You have enabled encryption but DISABLED certificate verification.",
"** WARNING ** To make sure your data is secure change :ssl_certificate_verification to true"
].join("\n")
ssl_options[:verify] = false
end
{ ssl: ssl_options }
end

private
def setup_basic_auth
return {} unless @user && @password

if @protocol =~ /http/
{
:user => ::URI.escape(@user, "@:"),
:password => ::URI.escape(@password.value, "@:")
}
else
raise(LogStash::ConfigurationError, "User and password parameters are not supported for '#{@protocol}'. Change the protocol to 'http' if you need them.")
end
end

private
def generate_jks cert_path

require 'securerandom'
require 'tempfile'
require 'java'
Expand Down Expand Up @@ -728,11 +627,11 @@ def retry_push(actions)
}
end

@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch-/ }
@@plugins = Gem::Specification.find_all{|spec| spec.name =~ /logstash-output-elasticsearch_java-/ }

@@plugins.each do |plugin|
name = plugin.name.split('-')[-1]
require "logstash/outputs/elasticsearch/#{name}"
require "logstash/outputs/elasticsearch_java/#{name}"
end

end # class LogStash::Outputs::Elasticsearch
Loading