Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

out_forward plugin can set targets dynamically via service discovery plugin. #2541

Merged
merged 25 commits into from
Oct 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions example/out_forward_sd.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<source>
@type dummy
tag test
</source>

<match test>
@type forward

<service_discovery>
@type file
path "#{Dir.pwd}/example/sd.yaml"
</service_discovery>

<buffer>
flush_interval 1
</buffer>
</match>
8 changes: 8 additions & 0 deletions example/sd.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
- 'host': 127.0.0.1
'port': 24224
'weight': 1
'name': server1
- 'host': 127.0.0.1
'port': 24225
'weight': 1
'name': server2
11 changes: 10 additions & 1 deletion lib/fluent/plugin.rb
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ module Plugin
PARSER_REGISTRY = Registry.new(:parser, 'fluent/plugin/parser_', dir_search_prefix: 'parser_')
FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_', dir_search_prefix: 'formatter_')
STORAGE_REGISTRY = Registry.new(:storage, 'fluent/plugin/storage_', dir_search_prefix: 'storage_')
SD_REGISTRY = Registry.new(:sd, 'fluent/plugin/sd_', dir_search_prefix: 'sd_')

REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY]
REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY, SD_REGISTRY]

def self.register_input(type, klass)
register_impl('input', INPUT_REGISTRY, type, klass)
Expand All @@ -54,6 +55,10 @@ def self.register_buffer(type, klass)
register_impl('buffer', BUFFER_REGISTRY, type, klass)
end

def self.register_sd(type, klass)
register_impl('sd', SD_REGISTRY, type, klass)
end

def self.register_parser(type, klass_or_proc)
if klass_or_proc.is_a?(Regexp)
# This usage is not recommended for new API
Expand Down Expand Up @@ -112,6 +117,10 @@ def self.new_buffer(type, parent: nil)
new_impl('buffer', BUFFER_REGISTRY, type, parent)
end

def self.new_sd(type, parent: nil)
new_impl('sd', SD_REGISTRY, type, parent)
end

def self.new_parser(type, parent: nil)
if type[0] == '/' && type[-1] == '/'
# This usage is not recommended for new API... create RegexpParser directly
Expand Down
101 changes: 74 additions & 27 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
require 'fluent/config/error'
require 'fluent/clock'
require 'base64'
require 'forwardable'

require 'fluent/compat/socket_util'
require 'fluent/plugin/out_forward/handshake_protocol'
Expand All @@ -32,7 +33,7 @@ module Fluent::Plugin
class ForwardOutput < Output
Fluent::Plugin.register_output('forward', self)

helpers :socket, :server, :timer, :thread, :compat_parameters
helpers :socket, :server, :timer, :thread, :compat_parameters, :service_discovery

LISTEN_PORT = 24224

Expand Down Expand Up @@ -224,23 +225,39 @@ def configure(conf)
socket_cache: socket_cache,
)

@servers.each do |server|
failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
name = server.name || "#{server.host}:#{server.port}"
configs = []

log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id
if @heartbeat_type == :none
@nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler)
else
node = Node.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler)
# rewrite for using server as sd_static
conf.elements(name: 'server').each do |s|
s.name = 'service'
end

unless conf.elements(name: 'service').empty?
configs << { type: :static, conf: conf }
end

conf.elements(name: 'service_discovery').each_with_index do |c, i|
configs << { type: @service_discovery[i][:@type], conf: c }
end

service_disovery_create_manager(
:out_forward_service_discovery_watcher,
configurations: configs,
load_balancer: LoadBalancer.new(log),
custom_build_method: method(:build_node),
)

discovery_manager.services.each do |server|
# it's only for test
@nodes << server
unless @heartbeat_type == :none
begin
node.validate_host_resolution!
server.validate_host_resolution!
rescue => e
raise unless @ignore_network_errors_at_startup
log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
node.disable!
server.disable!
end
@nodes << node
end
end

Expand All @@ -252,8 +269,8 @@ def configure(conf)
end
end

if @nodes.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one <server> is required"
if discovery_manager.services.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
end

if !@keepalive && @keepalive_timeout
Expand All @@ -274,12 +291,9 @@ def prefer_delayed_commit
def start
super

@load_balancer = LoadBalancer.new(log)
@load_balancer.rebuild_weight_array(@nodes)

unless @heartbeat_type == :none
if @heartbeat_type == :udp
@usock = socket_create_udp(@nodes.first.host, @nodes.first.port, nonblock: true)
@usock = socket_create_udp(discovery_manager.services.first.host, discovery_manager.services.first.port, nonblock: true)
server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length, &method(:on_udp_heatbeat_response_recv))
end
timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
Expand All @@ -297,7 +311,7 @@ def start
end

if @verify_connection_at_startup
@nodes.each do |node|
discovery_manager.services.each do |node|
begin
node.verify_connection
rescue StandardError => e
Expand Down Expand Up @@ -333,7 +347,7 @@ def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag

@load_balancer.select_healthy_node { |node| node.send_data(tag, chunk) }
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
end

def try_write(chunk)
Expand All @@ -343,7 +357,7 @@ def try_write(chunk)
return
end
tag = chunk.metadata.tag
@load_balancer.select_healthy_node { |n| n.send_data(tag, chunk) }
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
end

def create_transfer_socket(host, port, hostname, &block)
Expand Down Expand Up @@ -387,6 +401,23 @@ def create_transfer_socket(host, port, hostname, &block)
end
end

def statistics
stats = super
services = discovery_manager.services
healty_nodes_count = 0
registed_nodes_count = services.size
services.each do |s|
if s.available?
healty_nodes_count += 1
end
end

stats.merge(
'healty_nodes_count' => healty_nodes_count,
'registered_nodes_count' => registed_nodes_count,
)
end

# MessagePack FixArray length is 3
FORWARD_HEADER = [0x93].pack('C').freeze
def forward_header
Expand All @@ -395,9 +426,21 @@ def forward_header

private

def build_node(server)
name = server.name || "#{server.host}:#{server.port}"
log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id

failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f)
if @heartbeat_type == :none
NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler)
else
Node.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler)
end
end

def on_heartbeat_timer
need_rebuild = false
@nodes.each do |n|
discovery_manager.services.each do |n|
begin
log.trace "sending heartbeat", host: n.host, port: n.port, heartbeat_type: @heartbeat_type
n.usock = @usock if @usock
Expand All @@ -412,19 +455,19 @@ def on_heartbeat_timer
end

if need_rebuild
@load_balancer.rebuild_weight_array(@nodes)
discovery_manager.rebalance
end
end

def on_udp_heatbeat_response_recv(data, sock)
sockaddr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host)
if node = @nodes.find { |n| n.sockaddr == sockaddr }
if node = discovery_manager.services.find { |n| n.sockaddr == sockaddr }
# log.trace "heartbeat arrived", name: node.name, host: node.host, port: node.port
if node.heartbeat
@load_balancer.rebuild_weight_array(@nodes)
discovery_manager.rebalance
end
else
log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}")
log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}. It may service out")
end
end

Expand Down Expand Up @@ -463,12 +506,16 @@ def ack_reader
end

class Node
extend Forwardable
def_delegators :@server, :discovery_id, :host, :port, :name, :weight, :standby, :username, :password, :shared_key

# @param connection_manager [Fluent::Plugin::ForwardOutput::ConnectionManager]
# @param ack_handler [Fluent::Plugin::ForwardOutput::AckHandler]
def initialize(sender, server, failure:, connection_manager:, ack_handler:)
@sender = sender
@log = sender.log
@compress = sender.compress
@server = server

@name = server.name
@host = server.host
Expand Down Expand Up @@ -508,7 +555,7 @@ def initialize(sender, server, failure:, connection_manager:, ack_handler:)

attr_accessor :usock

attr_reader :name, :host, :port, :weight, :standby, :state
attr_reader :state
attr_reader :sockaddr # used by on_udp_heatbeat_response_recv
attr_reader :failure # for test

Expand Down
7 changes: 5 additions & 2 deletions lib/fluent/plugin/out_forward/load_balancer.rb
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ def select_healthy_node
wlen = @weight_array.size
wlen.times do
node = @mutex.synchronize do
r = @rr
@rr = (@rr + 1) % @weight_array.size
r = @rr % @weight_array.size
@rr = (r + 1) % @weight_array.size
@weight_array[r]
end
next unless node.available?
Expand Down Expand Up @@ -106,6 +106,9 @@ def rebuild_weight_array(nodes)
@weight_array = weight_array
end
end

alias select_service select_healthy_node
alias rebalance rebuild_weight_array
end
end
end
Loading