diff --git a/example/out_forward_sd.conf b/example/out_forward_sd.conf new file mode 100644 index 0000000000..1d5c695262 --- /dev/null +++ b/example/out_forward_sd.conf @@ -0,0 +1,17 @@ + + @type dummy + tag test + + + + @type forward + + + @type file + path "#{Dir.pwd}/example/sd.yaml" + + + + flush_interval 1 + + diff --git a/example/sd.yaml b/example/sd.yaml new file mode 100644 index 0000000000..6a148ba424 --- /dev/null +++ b/example/sd.yaml @@ -0,0 +1,8 @@ +- 'host': 127.0.0.1 + 'port': 24224 + 'weight': 1 + 'name': server1 +- 'host': 127.0.0.1 + 'port': 24225 + 'weight': 1 + 'name': server2 diff --git a/lib/fluent/plugin.rb b/lib/fluent/plugin.rb index b7d7a660f9..a94fdc3683 100644 --- a/lib/fluent/plugin.rb +++ b/lib/fluent/plugin.rb @@ -35,8 +35,9 @@ module Plugin PARSER_REGISTRY = Registry.new(:parser, 'fluent/plugin/parser_', dir_search_prefix: 'parser_') FORMATTER_REGISTRY = Registry.new(:formatter, 'fluent/plugin/formatter_', dir_search_prefix: 'formatter_') STORAGE_REGISTRY = Registry.new(:storage, 'fluent/plugin/storage_', dir_search_prefix: 'storage_') + SD_REGISTRY = Registry.new(:sd, 'fluent/plugin/sd_', dir_search_prefix: 'sd_') - REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY] + REGISTRIES = [INPUT_REGISTRY, OUTPUT_REGISTRY, FILTER_REGISTRY, BUFFER_REGISTRY, PARSER_REGISTRY, FORMATTER_REGISTRY, STORAGE_REGISTRY, SD_REGISTRY] def self.register_input(type, klass) register_impl('input', INPUT_REGISTRY, type, klass) @@ -54,6 +55,10 @@ def self.register_buffer(type, klass) register_impl('buffer', BUFFER_REGISTRY, type, klass) end + def self.register_sd(type, klass) + register_impl('sd', SD_REGISTRY, type, klass) + end + def self.register_parser(type, klass_or_proc) if klass_or_proc.is_a?(Regexp) # This usage is not recommended for new API @@ -112,6 +117,10 @@ def self.new_buffer(type, parent: nil) new_impl('buffer', BUFFER_REGISTRY, type, parent) end + def self.new_sd(type, parent: nil) + new_impl('sd', SD_REGISTRY, type, parent) + end + def self.new_parser(type, parent: nil) if type[0] == '/' && type[-1] == '/' # This usage is not recommended for new API... create RegexpParser directly diff --git a/lib/fluent/plugin/out_forward.rb b/lib/fluent/plugin/out_forward.rb index d496e6cc4f..f554c63784 100644 --- a/lib/fluent/plugin/out_forward.rb +++ b/lib/fluent/plugin/out_forward.rb @@ -18,6 +18,7 @@ require 'fluent/config/error' require 'fluent/clock' require 'base64' +require 'forwardable' require 'fluent/compat/socket_util' require 'fluent/plugin/out_forward/handshake_protocol' @@ -32,7 +33,7 @@ module Fluent::Plugin class ForwardOutput < Output Fluent::Plugin.register_output('forward', self) - helpers :socket, :server, :timer, :thread, :compat_parameters + helpers :socket, :server, :timer, :thread, :compat_parameters, :service_discovery LISTEN_PORT = 24224 @@ -224,23 +225,39 @@ def configure(conf) socket_cache: socket_cache, ) - @servers.each do |server| - failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f) - name = server.name || "#{server.host}:#{server.port}" + configs = [] - log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id - if @heartbeat_type == :none - @nodes << NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler) - else - node = Node.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler) + # rewrite for using server as sd_static + conf.elements(name: 'server').each do |s| + s.name = 'service' + end + + unless conf.elements(name: 'service').empty? + configs << { type: :static, conf: conf } + end + + conf.elements(name: 'service_discovery').each_with_index do |c, i| + configs << { type: @service_discovery[i][:@type], conf: c } + end + + service_disovery_create_manager( + :out_forward_service_discovery_watcher, + configurations: configs, + load_balancer: LoadBalancer.new(log), + custom_build_method: method(:build_node), + ) + + discovery_manager.services.each do |server| + # it's only for test + @nodes << server + unless @heartbeat_type == :none begin - node.validate_host_resolution! + server.validate_host_resolution! rescue => e raise unless @ignore_network_errors_at_startup log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e - node.disable! + server.disable! end - @nodes << node end end @@ -252,8 +269,8 @@ def configure(conf) end end - if @nodes.empty? - raise Fluent::ConfigError, "forward output plugin requires at least one is required" + if discovery_manager.services.empty? + raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add or " end if !@keepalive && @keepalive_timeout @@ -274,12 +291,9 @@ def prefer_delayed_commit def start super - @load_balancer = LoadBalancer.new(log) - @load_balancer.rebuild_weight_array(@nodes) - unless @heartbeat_type == :none if @heartbeat_type == :udp - @usock = socket_create_udp(@nodes.first.host, @nodes.first.port, nonblock: true) + @usock = socket_create_udp(discovery_manager.services.first.host, discovery_manager.services.first.port, nonblock: true) server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length, &method(:on_udp_heatbeat_response_recv)) end timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer)) @@ -297,7 +311,7 @@ def start end if @verify_connection_at_startup - @nodes.each do |node| + discovery_manager.services.each do |node| begin node.verify_connection rescue StandardError => e @@ -333,7 +347,7 @@ def write(chunk) return if chunk.empty? tag = chunk.metadata.tag - @load_balancer.select_healthy_node { |node| node.send_data(tag, chunk) } + discovery_manager.select_service { |node| node.send_data(tag, chunk) } end def try_write(chunk) @@ -343,7 +357,7 @@ def try_write(chunk) return end tag = chunk.metadata.tag - @load_balancer.select_healthy_node { |n| n.send_data(tag, chunk) } + discovery_manager.select_service { |node| node.send_data(tag, chunk) } end def create_transfer_socket(host, port, hostname, &block) @@ -387,6 +401,23 @@ def create_transfer_socket(host, port, hostname, &block) end end + def statistics + stats = super + services = discovery_manager.services + healty_nodes_count = 0 + registed_nodes_count = services.size + services.each do |s| + if s.available? + healty_nodes_count += 1 + end + end + + stats.merge( + 'healty_nodes_count' => healty_nodes_count, + 'registered_nodes_count' => registed_nodes_count, + ) + end + # MessagePack FixArray length is 3 FORWARD_HEADER = [0x93].pack('C').freeze def forward_header @@ -395,9 +426,21 @@ def forward_header private + def build_node(server) + name = server.name || "#{server.host}:#{server.port}" + log.info "adding forwarding server '#{name}'", host: server.host, port: server.port, weight: server.weight, plugin_id: plugin_id + + failure = FailureDetector.new(@heartbeat_interval, @hard_timeout, Time.now.to_i.to_f) + if @heartbeat_type == :none + NoneHeartbeatNode.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler) + else + Node.new(self, server, failure: failure, connection_manager: @connection_manager, ack_handler: @ack_handler) + end + end + def on_heartbeat_timer need_rebuild = false - @nodes.each do |n| + discovery_manager.services.each do |n| begin log.trace "sending heartbeat", host: n.host, port: n.port, heartbeat_type: @heartbeat_type n.usock = @usock if @usock @@ -412,19 +455,19 @@ def on_heartbeat_timer end if need_rebuild - @load_balancer.rebuild_weight_array(@nodes) + discovery_manager.rebalance end end def on_udp_heatbeat_response_recv(data, sock) sockaddr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host) - if node = @nodes.find { |n| n.sockaddr == sockaddr } + if node = discovery_manager.services.find { |n| n.sockaddr == sockaddr } # log.trace "heartbeat arrived", name: node.name, host: node.host, port: node.port if node.heartbeat - @load_balancer.rebuild_weight_array(@nodes) + discovery_manager.rebalance end else - log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}") + log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}. It may service out") end end @@ -463,12 +506,16 @@ def ack_reader end class Node + extend Forwardable + def_delegators :@server, :discovery_id, :host, :port, :name, :weight, :standby, :username, :password, :shared_key + # @param connection_manager [Fluent::Plugin::ForwardOutput::ConnectionManager] # @param ack_handler [Fluent::Plugin::ForwardOutput::AckHandler] def initialize(sender, server, failure:, connection_manager:, ack_handler:) @sender = sender @log = sender.log @compress = sender.compress + @server = server @name = server.name @host = server.host @@ -508,7 +555,7 @@ def initialize(sender, server, failure:, connection_manager:, ack_handler:) attr_accessor :usock - attr_reader :name, :host, :port, :weight, :standby, :state + attr_reader :state attr_reader :sockaddr # used by on_udp_heatbeat_response_recv attr_reader :failure # for test diff --git a/lib/fluent/plugin/out_forward/load_balancer.rb b/lib/fluent/plugin/out_forward/load_balancer.rb index 51b4c698c1..a41a37e85f 100644 --- a/lib/fluent/plugin/out_forward/load_balancer.rb +++ b/lib/fluent/plugin/out_forward/load_balancer.rb @@ -36,8 +36,8 @@ def select_healthy_node wlen = @weight_array.size wlen.times do node = @mutex.synchronize do - r = @rr - @rr = (@rr + 1) % @weight_array.size + r = @rr % @weight_array.size + @rr = (r + 1) % @weight_array.size @weight_array[r] end next unless node.available? @@ -106,6 +106,9 @@ def rebuild_weight_array(nodes) @weight_array = weight_array end end + + alias select_service select_healthy_node + alias rebalance rebuild_weight_array end end end diff --git a/lib/fluent/plugin/sd_file.rb b/lib/fluent/plugin/sd_file.rb new file mode 100644 index 0000000000..01d89bc301 --- /dev/null +++ b/lib/fluent/plugin/sd_file.rb @@ -0,0 +1,155 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'cool.io' + +require 'fluent/plugin_helper' +require 'fluent/plugin/service_discovery' + +module Fluent + module Plugin + class FileServiceDiscovery < ServiceDiscovery + include PluginHelper::Mixin + + Plugin.register_sd('file', self) + + DEFAULT_FILE_TYPE = :yaml + DEFAUT_WEIGHT = 60 + DEFAULT_SD_FILE_PATH = ENV['DEFAULT_SD_FILE_PATH'] || '/etc/fluent/sd.yaml' + + helpers :event_loop + + config_param :path, :string, default: DEFAULT_SD_FILE_PATH + config_param :conf_encoding, :string, default: 'utf-8' + + def initialize + super + + @file_type = nil + end + + def configure(conf) + super + + unless File.exist?(@path) + raise Fluent::ConfigError, "sd_file: path=#{@path} not found" + end + + @file_type = File.basename(@path).split('.', 2).last.to_sym + unless %i[yaml yml json].include?(@file_type) + @file_type = DEFAULT_FILE_TYPE + end + + @services = fetch_server_info + end + + def start(queue) + watcher = StatWatcher.new(@path, @log) do |_prev, _cur| + refresh_file(queue) + end + event_loop_attach(watcher) + + super() + end + + private + + def parser + @parser ||= + case @file_type + when :yaml, :yml + require 'yaml' + -> (v) { YAML.safe_load(v).map } + when :json + require 'json' + -> (v) { JSON.parse(v) } + end + end + + def refresh_file(queue) + s = + begin + fetch_server_info + rescue => e + @log.error("sd_file: #{e}") + end + + if s.nil? + # if any error occurs, skip this turn + return + end + + diff = [] + join = s - @services + # Need service_in first to guarantee that server exist at least one all time. + join.each do |j| + diff << ServiceDiscovery.service_in_msg(j) + end + + drain = @services - s + drain.each do |d| + diff << ServiceDiscovery.service_out_msg(d) + end + + @services = s + + diff.each do |a| + queue.push(a) + end + end + + def fetch_server_info + config_data = + begin + File.open(@path, "r:#{@conf_encoding}:utf-8", &:read) + rescue => e + raise Fluent::ConfigError, "sd_file: path=#{@path} couldn't open #{e}" + end + + parser.call(config_data).map do |s| + Service.new( + :file, + s.fetch('host'), + s.fetch('port'), + s['name'], + s.fetch('weight', DEFAUT_WEIGHT), + s['standby'], + s['username'], + s['password'], + s['shared_key'], + ) + end + rescue KeyError => e + raise Fluent::ConfigError, "#{e}. Service must have `host` and `port`" + end + + class StatWatcher < Coolio::StatWatcher + def initialize(path, log, &callback) + @path = path + @log = log + @callback = callback + super(@path) + end + + def on_change(prev_stat, cur_stat) + @callback.call(prev_stat, cur_stat) + rescue => e + @log.error(e) + end + end + end + end +end diff --git a/lib/fluent/plugin/sd_static.rb b/lib/fluent/plugin/sd_static.rb new file mode 100644 index 0000000000..0b9ccdd959 --- /dev/null +++ b/lib/fluent/plugin/sd_static.rb @@ -0,0 +1,58 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/service_discovery' + +module Fluent + module Plugin + class StaticServiceDiscovery < ServiceDiscovery + Plugin.register_sd('static', self) + + LISTEN_PORT = 24224 + + config_section :service, param_name: :service_configs do + desc 'The IP address or host name of the server.' + config_param :host, :string + desc 'The name of the server. Used for logging and certificate verification in TLS transport (when host is address).' + config_param :name, :string, default: nil + desc 'The port number of the host.' + config_param :port, :integer, default: LISTEN_PORT + desc 'The shared key per server.' + config_param :shared_key, :string, default: nil, secret: true + desc 'The username for authentication.' + config_param :username, :string, default: '' + desc 'The password for authentication.' + config_param :password, :string, default: '', secret: true + desc 'Marks a node as the standby node for an Active-Standby model between Fluentd nodes.' + config_param :standby, :bool, default: false + desc 'The load balancing weight.' + config_param :weight, :integer, default: 60 + end + + def configure(conf) + super + + @services = @service_configs.map do |s| + ServiceDiscovery::Service.new(:static, s.host, s.port, s.name, s.weight, s.standby, s.username, s.password, s.shared_key) + end + end + + def start(queue = nil) + super() + end + end + end +end diff --git a/lib/fluent/plugin/service_discovery.rb b/lib/fluent/plugin/service_discovery.rb new file mode 100644 index 0000000000..8a2ce6a114 --- /dev/null +++ b/lib/fluent/plugin/service_discovery.rb @@ -0,0 +1,80 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/base' + +require 'fluent/log' +require 'fluent/unique_id' +require 'fluent/plugin_id' + +module Fluent + module Plugin + class ServiceDiscovery < Base + include PluginId + include PluginLoggerMixin + include UniqueId::Mixin + + configured_in :service_discovery + + attr_reader :services + + Service = Struct.new(:plugin_name, :host, :port, :name, :weight, :standby, :username, :password, :shared_key) do + def discovery_id + @discovery_id ||= Base64.encode64(to_h.to_s) + end + end + + SERVICE_IN = :service_in + SERVICE_OUT = :service_out + DiscoveryMessage = Struct.new(:type, :service) + + class << self + def service_in_msg(service) + DiscoveryMessage.new(SERVICE_IN, service) + end + + def service_out_msg(service) + DiscoveryMessage.new(SERVICE_OUT, service) + end + end + + def initialize + @services = [] + + super + end + + def start(queue = nil) + super() + end + end + end +end diff --git a/lib/fluent/plugin_helper.rb b/lib/fluent/plugin_helper.rb index 0b143a520e..1611db582d 100644 --- a/lib/fluent/plugin_helper.rb +++ b/lib/fluent/plugin_helper.rb @@ -31,6 +31,7 @@ require 'fluent/plugin_helper/retry_state' require 'fluent/plugin_helper/record_accessor' require 'fluent/plugin_helper/compat_parameters' +require 'fluent/plugin_helper/service_discovery' module Fluent module PluginHelper diff --git a/lib/fluent/plugin_helper/service_discovery.rb b/lib/fluent/plugin_helper/service_discovery.rb new file mode 100644 index 0000000000..0fb671dbc0 --- /dev/null +++ b/lib/fluent/plugin_helper/service_discovery.rb @@ -0,0 +1,80 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin_helper/timer' +require 'fluent/plugin_helper/service_discovery/manager' + +module Fluent + module PluginHelper + module ServiceDiscovery + include Fluent::PluginHelper::Timer + + def self.included(mod) + mod.include ServiceDiscoveryParams + end + + def start + unless @discovery_manager + log.warn('There is no discovery_manager. skip start them') + super + return + end + + @discovery_manager.start + unless @discovery_manager.static_config? + timer_execute(@_plugin_helper_service_discovery_title, @_plugin_helper_service_discovery_iterval) do + @discovery_manager.run_once + end + end + + super + end + + private + + # @param title [Symbol] the thread name. this value should be unique. + # @param configurations [Hash] hash which must has discivery_service type and its configuration like `{ type: :static, conf: }` + # @param load_balancer [Object] object which has two methods #rebalance and #select_service + # @param custom_build_method [Proc] + def service_disovery_create_manager(title, configurations:, load_balancer: nil, custom_build_method: nil, interval: 3) + @_plugin_helper_service_discovery_title = title + @_plugin_helper_service_discovery_iterval = interval + + @discovery_manager = Fluent::PluginHelper::ServiceDiscovery::Manager.new( + log: log, + load_balancer: load_balancer, + custom_build_method: custom_build_method, + ) + + @discovery_manager.configure(configurations, parent: self) + + @discovery_manager + end + + def discovery_manager + @discovery_manager + end + + module ServiceDiscoveryParams + include Fluent::Configurable + + config_section :service_discovery do + config_param :@type, :string + end + end + end + end +end diff --git a/lib/fluent/plugin_helper/service_discovery/manager.rb b/lib/fluent/plugin_helper/service_discovery/manager.rb new file mode 100644 index 0000000000..02369ee50c --- /dev/null +++ b/lib/fluent/plugin_helper/service_discovery/manager.rb @@ -0,0 +1,132 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +require 'fluent/plugin/service_discovery' +require 'fluent/plugin_helper/service_discovery/round_robin_balancer' + +module Fluent + module PluginHelper + module ServiceDiscovery + class Manager + def initialize(log:, load_balancer: nil, custom_build_method: nil) + @log = log + @load_balancer = load_balancer || RoundRobinBalancer.new + @custom_build_method = custom_build_method + + @discoveries = [] + @services = {} + @queue = Queue.new + @static_config = true + end + + def configure(opts, parent: nil) + opts.each do |opt| + sd = Fluent::Plugin.new_sd(opt[:type], parent: parent) + sd.configure(opt[:conf]) + + sd.services.each do |s| + @services[s.discovery_id] = build_service(s) + end + @discoveries << sd + + if @static_config && opt[:type] != :static + @static_config = false + end + end + + rebalance + end + + def static_config? + @static_config + end + + def start + @discoveries.each do |d| + d.start(@queue) + end + end + + def run_once + # Don't care race in this loop intentionally + s = @queue.size + + if s == 0 + return + end + + s.times do + msg = @queue.pop + + unless msg.is_a?(Fluent::Plugin::ServiceDiscovery::DiscoveryMessage) + @log.warn("BUG: #{msg}") + next + end + + begin + handle_message(msg) + rescue => e + @log.error(e) + end + end + + rebalance + end + + def rebalance + @load_balancer.rebalance(services) + end + + def select_service(&block) + @load_balancer.select_service(&block) + end + + def services + @services.values + end + + private + + def handle_message(msg) + service = msg.service + + case msg.type + when Fluent::Plugin::ServiceDiscovery::SERVICE_IN + if (n = build_service(service)) + @log.info("Service in: name=#{service.name} #{service.host}:#{service.port}") + @services[service.discovery_id] = n + else + raise "failed to build service in name=#{service.name} #{service.host}:#{service.port}" + end + when Fluent::Plugin::ServiceDiscovery::SERVICE_OUT + s = @services.delete(service.discovery_id) + if s + @log.info("Service out: name=#{service.name} #{service.host}:#{service.port}") + else + @log.warn("Not found service: name=#{service.name} #{service.host}:#{service.port}") + end + else + @log.error("BUG: unknow message type: #{msg.type}") + end + end + + def build_service(n) + @custom_build_method ? @custom_build_method.call(n) : n + end + end + end + end +end diff --git a/lib/fluent/plugin_helper/service_discovery/round_robin_balancer.rb b/lib/fluent/plugin_helper/service_discovery/round_robin_balancer.rb new file mode 100644 index 0000000000..4a403881da --- /dev/null +++ b/lib/fluent/plugin_helper/service_discovery/round_robin_balancer.rb @@ -0,0 +1,43 @@ +# +# Fluentd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +module Fluent + module PluginHelper + module ServiceDiscovery + class RoundRobinBalancer + def initialize + @services = [] + @mutex = Mutex.new + end + + def rebalance(services) + @mutex.synchronize do + @services = services + end + end + + def select_service + s = @mutex.synchronize do + s = @services.shift + @services.push(s) + s + end + yield(s) + end + end + end + end +end diff --git a/test/plugin/data/sd_file/config b/test/plugin/data/sd_file/config new file mode 100644 index 0000000000..cff4fbec0d --- /dev/null +++ b/test/plugin/data/sd_file/config @@ -0,0 +1,11 @@ +- 'host': 127.0.0.1 + 'port': 24224 + 'weight': 1 + 'name': test1 + 'standby': false + 'username': user1 + 'password': pass1 + 'shared_key': key1 +- 'host': 127.0.0.1 + 'port': 24225 + 'weight': 1 diff --git a/test/plugin/data/sd_file/config.json b/test/plugin/data/sd_file/config.json new file mode 100644 index 0000000000..07bb6c9f8f --- /dev/null +++ b/test/plugin/data/sd_file/config.json @@ -0,0 +1,17 @@ +[ + { + "host": "127.0.0.1", + "port": 24224, + "weight": 1, + "name": "test1", + "standby": false, + "username": "user1", + "password": "pass1", + "shared_key": "key1" + }, + { + "host": "127.0.0.1", + "port": 24225, + "weight": 1 + } +] diff --git a/test/plugin/data/sd_file/config.yaml b/test/plugin/data/sd_file/config.yaml new file mode 100644 index 0000000000..cff4fbec0d --- /dev/null +++ b/test/plugin/data/sd_file/config.yaml @@ -0,0 +1,11 @@ +- 'host': 127.0.0.1 + 'port': 24224 + 'weight': 1 + 'name': test1 + 'standby': false + 'username': user1 + 'password': pass1 + 'shared_key': key1 +- 'host': 127.0.0.1 + 'port': 24225 + 'weight': 1 diff --git a/test/plugin/data/sd_file/config.yml b/test/plugin/data/sd_file/config.yml new file mode 100644 index 0000000000..cff4fbec0d --- /dev/null +++ b/test/plugin/data/sd_file/config.yml @@ -0,0 +1,11 @@ +- 'host': 127.0.0.1 + 'port': 24224 + 'weight': 1 + 'name': test1 + 'standby': false + 'username': user1 + 'password': pass1 + 'shared_key': key1 +- 'host': 127.0.0.1 + 'port': 24225 + 'weight': 1 diff --git a/test/plugin/data/sd_file/invalid_config.yml b/test/plugin/data/sd_file/invalid_config.yml new file mode 100644 index 0000000000..96d54e4d2f --- /dev/null +++ b/test/plugin/data/sd_file/invalid_config.yml @@ -0,0 +1,7 @@ +- 'host': 127.0.0.1 + 'weight': 1 + 'name': test1 + 'standby': false + 'username': user1 + 'password': pass1 + 'shared_key': key1 diff --git a/test/plugin/test_out_forward.rb b/test/plugin/test_out_forward.rb index 6980d8c63a..bd5d53abc7 100644 --- a/test/plugin/test_out_forward.rb +++ b/test/plugin/test_out_forward.rb @@ -39,7 +39,7 @@ def teardown def create_driver(conf=CONFIG) Fluent::Test::Driver::Output.new(Fluent::Plugin::ForwardOutput) { - attr_reader :sent_chunk_ids, :ack_handler + attr_reader :sent_chunk_ids, :ack_handler, :discovery_manager def initialize super @@ -260,6 +260,30 @@ def try_write(chunk) end end + test 'server is an abbreviation of static type of service_discovery' do + @d = d = create_driver(%[ + + host 127.0.0.1 + port 1234 + + + + @type static + + + host 127.0.0.1 + port 1235 + + + ]) + + assert_equal 2, d.instance.discovery_manager.services.size + assert_equal '127.0.0.1', d.instance.discovery_manager.services[0].host + assert_equal 1234, d.instance.discovery_manager.services[0].port + assert_equal '127.0.0.1', d.instance.discovery_manager.services[1].host + assert_equal 1235, d.instance.discovery_manager.services[1].port + end + test 'compress_default_value' do @d = d = create_driver assert_equal :text, d.instance.compress diff --git a/test/plugin/test_sd_file.rb b/test/plugin/test_sd_file.rb new file mode 100644 index 0000000000..b39871493c --- /dev/null +++ b/test/plugin/test_sd_file.rb @@ -0,0 +1,211 @@ +require_relative '../helper' +require 'fluent/plugin/sd_file' +require 'fileutils' +require 'json' + +class FileServiceDiscoveryTest < ::Test::Unit::TestCase + setup do + @dir = File.expand_path('data/sd_file', __dir__) + FileUtils.mkdir_p(File.join(@dir, 'tmp')) + end + + teardown do + FileUtils.rm_r(File.join(@dir, 'tmp')) + end + + sub_test_case 'configure' do + test 'load yml' do + sdf = Fluent::Plugin::FileServiceDiscovery.new + sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yml') })) + assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24224, 'test1', 1, false, 'user1', 'pass1', 'key1'), sdf.services[0] + assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24225, nil, 1), sdf.services[1] + end + + test 'load yaml' do + sdf = Fluent::Plugin::FileServiceDiscovery.new + sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yaml') })) + assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24224, 'test1', 1, false, 'user1', 'pass1', 'key1'), sdf.services[0] + assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24225, nil, 1), sdf.services[1] + end + + test 'load json' do + sdf = Fluent::Plugin::FileServiceDiscovery.new + sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.json') })) + assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24224, 'test1', 1, false, 'user1', 'pass1', 'key1'), sdf.services[0] + assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24225, nil, 1), sdf.services[1] + end + + test 'regard as yaml if ext is not givened' do + sdf = Fluent::Plugin::FileServiceDiscovery.new + sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config') })) + assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24224, 'test1', 1, false, 'user1', 'pass1', 'key1'), sdf.services[0] + assert_equal Fluent::Plugin::ServiceDiscovery::Service.new(:file, '127.0.0.1', 24225, nil, 1), sdf.services[1] + end + + test 'raise an error if config has error' do + sdf = Fluent::Plugin::FileServiceDiscovery.new + e = assert_raise Fluent::ConfigError do + sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'invalid_config.yaml') })) + end + assert_match(/path=/, e.message) + end + + test 'raise an error if config file does not exist' do + sdf = Fluent::Plugin::FileServiceDiscovery.new + e = assert_raise Fluent::ConfigError do + sdf.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'invalid_not_found.json') })) + end + assert_match(/not found/, e.message) + end + end + + sub_test_case '#start' do + module TestStatEventHelperWrapper + # easy to control statsevent + def event_loop_attach(watcher) + unless watcher.is_a?(Fluent::Plugin::FileServiceDiscovery::StatWatcher) + super + return + end + + @test_stat_event_helper_wrapper_watchers ||= [] + @test_stat_event_helper_wrapper_watchers << watcher + + @test_stat_event_helper_wrapper_context = Fiber.new do + loop do + @test_stat_event_helper_wrapper_watchers.each do |w| + w.on_change('old', 'new') + end + + if Fiber.yield == :finish + break + end + end + end + resume + end + + def resume + @test_stat_event_helper_wrapper_context.resume(:resume) + end + + def shutdown + super + + if @test_stat_event_helper_wrapper_context + @test_stat_event_helper_wrapper_context.resume(:finish) + end + end + end + + def create_tmp_config(path, body) + File.write(File.join(@dir, 'tmp', path), body) + end + + setup do + sdf = Fluent::Plugin::FileServiceDiscovery.new + @sd_file = sdf + end + + teardown do + if @sd_file + @sd_file.stop unless @sd_file.stopped? + @sd_file.before_shutdown unless @sd_file.before_shutdown? + @sd_file.shutdown unless @sd_file.shutdown? + @sd_file.after_shutdown unless @sd_file.after_shutdown? + @sd_file.close unless @sd_file.closed? + @sd_file.terminate unless @sd_file.terminated? + end + end + + test 'Skip if file is not updated' do + @sd_file.extend(TestStatEventHelperWrapper) + + create_tmp_config('config.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }])) + @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yml') })) + queue = [] + mock.proxy(@sd_file).refresh_file(queue).twice + + @sd_file.start(queue) + assert_empty queue + + @sd_file.resume + assert_empty queue + end + + test 'Skip if file is invalid contents' do + @sd_file.extend(TestStatEventHelperWrapper) + + create_tmp_config('config.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }])) + @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'config.yml') })) + + queue = [] + @sd_file.start(queue) + + mock.proxy(@sd_file).refresh_file(queue).once + create_tmp_config('test.json', 'invalid contents') + @sd_file.resume + + assert_empty queue + end + + test 'if service is updated, service_in and service_out event happen' do + @sd_file.extend(TestStatEventHelperWrapper) + + create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }])) + @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'tmp/test.json') })) + + queue = [] + @sd_file.start(queue) + create_tmp_config('test.json', JSON.generate([{ port: 1234, host: '127.0.0.1' }])) + @sd_file.resume + + assert_equal 2, queue.size + join = queue.shift + drain = queue.shift + assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_IN, join.type + assert_equal 1234, join.service.port + assert_equal '127.0.0.1', join.service.host + + assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_OUT, drain.type + assert_equal 1233, drain.service.port + assert_equal '127.0.0.1', drain.service.host + end + + test 'if service is deleted, service_out event happens' do + @sd_file.extend(TestStatEventHelperWrapper) + + create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }, { port: 1234, host: '127.0.0.2' }])) + @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'tmp/test.json') })) + + queue = [] + @sd_file.start(queue) + create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }])) + @sd_file.resume + + assert_equal 1, queue.size + drain = queue.shift + assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_OUT, drain.type + assert_equal 1234, drain.service.port + assert_equal '127.0.0.2', drain.service.host + end + + test 'if new service is added, service_in event happens' do + @sd_file.extend(TestStatEventHelperWrapper) + + create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }])) + @sd_file.configure(config_element('service_discovery', '', { 'path' => File.join(@dir, 'tmp/test.json') })) + + queue = [] + @sd_file.start(queue) + create_tmp_config('test.json', JSON.generate([{ port: 1233, host: '127.0.0.1' }, { port: 1234, host: '127.0.0.2' }])) + @sd_file.resume + + assert_equal 1, queue.size + join = queue.shift + assert_equal Fluent::Plugin::ServiceDiscovery::SERVICE_IN, join.type + assert_equal 1234, join.service.port + assert_equal '127.0.0.2', join.service.host + end + end +end diff --git a/test/plugin_helper/service_discovery/test_manager.rb b/test/plugin_helper/service_discovery/test_manager.rb new file mode 100644 index 0000000000..4bcb52f762 --- /dev/null +++ b/test/plugin_helper/service_discovery/test_manager.rb @@ -0,0 +1,93 @@ +require_relative '../../helper' +require 'fluent/plugin_helper/service_discovery/manager' + +class TestServiceDiscoveryManager < ::Test::Unit::TestCase + setup do + @sd_file_dir = File.expand_path('../../plugin/data/sd_file', __dir__) + end + + class TestSdPlugin < Fluent::Plugin::ServiceDiscovery + Fluent::Plugin.register_sd('test_sd', self) + + def initialize + super + end + + def service_in(host, port) + s = Fluent::Plugin::ServiceDiscovery::Service.new(:sd_test, host, port) + @queue << Fluent::Plugin::ServiceDiscovery.service_in_msg(s) + end + + def service_out(host, port) + s = Fluent::Plugin::ServiceDiscovery::Service.new(:sd_test, host, port) + @queue << Fluent::Plugin::ServiceDiscovery.service_out_msg(s) + end + + def start(queue) + @queue = queue + + super + end + end + + sub_test_case '#configure' do + test 'build sd plugins and services' do + sdm = Fluent::PluginHelper::ServiceDiscovery::Manager.new(log: $log) + sdm.configure( + [ + { type: :file, conf: config_element('service_discovery', '', { 'path' => File.join(@sd_file_dir, 'config.yml') }) }, + { type: :static, conf: config_element('root', '', {}, [config_element('service', '', { 'host' => '127.0.0.2', 'port' => '5432' })]) }, + ], + ) + + assert_equal 3, sdm.services.size + assert_equal 24224, sdm.services[0].port + assert_equal '127.0.0.1', sdm.services[0].host + + assert_equal 24225, sdm.services[1].port + assert_equal '127.0.0.1', sdm.services[1].host + + assert_equal 5432, sdm.services[2].port + assert_equal '127.0.0.2', sdm.services[2].host + + assert_false sdm.static_config? + end + + test 'no need to timer if only static' do + sdm = Fluent::PluginHelper::ServiceDiscovery::Manager.new(log: $log) + sdm.configure( + [{ type: :static, conf: config_element('root', '', {}, [config_element('service', '', { 'host' => '127.0.0.2', 'port' => '5432' })]) }] + ) + + assert_equal 1, sdm.services.size + assert_equal 5432, sdm.services[0].port + assert_equal '127.0.0.2', sdm.services[0].host + + assert_true sdm.static_config? + end + end + + sub_test_case '#run_once' do + test 'if new service added and deleted' do + sdm = Fluent::PluginHelper::ServiceDiscovery::Manager.new(log: $log) + t = TestSdPlugin.new + mock(Fluent::Plugin).new_sd(:sd_test, anything) { t } + sdm.configure([{ type: :sd_test, conf: config_element('service_discovery', '', {})}]) + sdm.start + + assert_equal 0, sdm.services.size + + t.service_in('127.0.0.1', '1234') + + sdm.run_once + assert_equal 1, sdm.services.size + assert_equal '127.0.0.1', sdm.services[0].host + assert_equal '1234', sdm.services[0].port + + t.service_out('127.0.0.1', '1234') + + sdm.run_once + assert_equal 0, sdm.services.size + end + end +end diff --git a/test/plugin_helper/service_discovery/test_round_robin_balancer.rb b/test/plugin_helper/service_discovery/test_round_robin_balancer.rb new file mode 100644 index 0000000000..957e07e7da --- /dev/null +++ b/test/plugin_helper/service_discovery/test_round_robin_balancer.rb @@ -0,0 +1,21 @@ +require_relative '../../helper' +require 'fluent/plugin_helper/service_discovery/round_robin_balancer' + +class TestRoundRobinBalancer < ::Test::Unit::TestCase + test 'select_service' do + rrb = Fluent::PluginHelper::ServiceDiscovery::RoundRobinBalancer.new + rrb.rebalance([1, 2, 3]) + + rrb.select_service { |n| assert_equal 1, n } + rrb.select_service { |n| assert_equal 2, n } + rrb.select_service { |n| assert_equal 3, n } + rrb.select_service { |n| assert_equal 1, n } + rrb.select_service { |n| assert_equal 2, n } + rrb.select_service { |n| assert_equal 3, n } + rrb.rebalance([1, 2, 3, 4]) + rrb.select_service { |n| assert_equal 1, n } + rrb.select_service { |n| assert_equal 2, n } + rrb.select_service { |n| assert_equal 3, n } + rrb.select_service { |n| assert_equal 4, n } + end +end diff --git a/test/plugin_helper/test_service_discovery.rb b/test/plugin_helper/test_service_discovery.rb new file mode 100644 index 0000000000..a5e5bac7a5 --- /dev/null +++ b/test/plugin_helper/test_service_discovery.rb @@ -0,0 +1,72 @@ +require_relative '../helper' +require 'flexmock/test_unit' +require 'fluent/plugin_helper/service_discovery' +require 'fluent/plugin/output' + +class ServiceDiscoveryHelper < Test::Unit::TestCase + PORT = unused_port + NULL_LOGGER = Logger.new(nil) + + class Dummy < Fluent::Plugin::TestBase + helpers :service_discovery + + # Make these mehtod public + def service_disovery_create_manager(title, configurations:, load_balancer: nil, custom_build_method: nil, interval: 3) + super + end + + def discovery_manager + super + end + end + + setup do + @sd_file_dir = File.expand_path('../plugin/data/sd_file', __dir__) + + @d = nil + end + + teardown do + if @d + @d.stop unless @d.stopped? + @d.shutdown unless @d.shutdown? + @d.close unless @d.closed? + @d.terminate unless @d.terminated? + end + end + + test 'start discovery manager' do + d = @d = Dummy.new + + d.service_disovery_create_manager( + :service_discovery_helper_test, + configurations: [{ type: :static, conf: config_element('root', '', {}, [config_element('service', '', { 'host' => '127.0.0.1', 'port' => '1234' })]) }], + ) + + assert_true !!d.discovery_manager + + mock.proxy(d.discovery_manager).start.once + mock.proxy(d).timer_execute(:service_discovery_helper_test, anything).never + + d.start + + services = d.discovery_manager.services + assert_equal 1, services.size + assert_equal '127.0.0.1', services[0].host + assert_equal 1234, services[0].port + end + + test 'call timer_execute if dynamic configuration' do + d = @d = Dummy.new + + d.service_disovery_create_manager( + :service_discovery_helper_test, + configurations: [{ type: :file, conf: config_element('file_config', '', { 'path' => File.join(@sd_file_dir, 'config.yml') }) }], + ) + + assert_true !!d.discovery_manager + mock.proxy(d.discovery_manager).start.once + mock(d).timer_execute(:service_discovery_helper_test, anything).once + d.start + end +end