Skip to content

Commit

Permalink
Add sd helper
Browse files Browse the repository at this point in the history
Signed-off-by: Yuta Iwama <ganmacs@gmail.com>
  • Loading branch information
ganmacs committed Aug 7, 2019
1 parent 61a05cd commit 9883173
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 46 deletions.
45 changes: 18 additions & 27 deletions lib/fluent/plugin/out_forward.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
require 'fluent/plugin/out_forward/error'
require 'fluent/plugin/out_forward/connection_manager'
require 'fluent/plugin/out_forward/ack_handler'
require 'fluent/plugin/service_discovery/service_discovery_manager'

module Fluent::Plugin
class ForwardOutput < Output
Fluent::Plugin.register_output('forward', self)

helpers :socket, :server, :timer, :thread, :compat_parameters
helpers :socket, :server, :timer, :thread, :compat_parameters, :service_discovery

LISTEN_PORT = 24224

Expand Down Expand Up @@ -215,19 +214,19 @@ def configure(conf)
socket_cache: socket_cache,
)

@sd_manager = Fluent::Plugin::ServiceDiscovery::ServiceDiscoveryManager.new(
load_balancer: LoadBalancer.new(log),
log: log,
custom_build_method: method(:build_node),
)

configs = conf.elements(name: 'server').map { |c| { type: :static, conf: c } }
conf.elements(name: 'service_discovery').each_with_index do |c, i|
configs << { type: @service_discovery[i][:@type], conf: c }
end
@sd_manager.configure(configs, parent: self)

@sd_manager.services.each do |server|
create_service_discovery_manager(
:out_forward_service_discovery_watcher,
configurations: configs,
load_balancer: LoadBalancer.new(log),
custom_build_method: method(:build_node),
)

discovery_manager.services.each do |server|
@nodes << server
unless @heartbeat_type == :none
begin
Expand Down Expand Up @@ -270,17 +269,9 @@ def prefer_delayed_commit
def start
super

@sd_manager.start
unless @sd_manager.static_config?
# TODO: interval
timer_execute(:out_forward_service_discovery_watcher, 3) do
@sd_manager.run_once
end
end

unless @heartbeat_type == :none
if @heartbeat_type == :udp
@usock = socket_create_udp(@sd_manager.services.first.host, @sd_manager.services.first.port, nonblock: true)
@usock = socket_create_udp(discovery_manager.services.first.host, discovery_manager.services.first.port, nonblock: true)
server_create_udp(:out_forward_heartbeat_receiver, 0, socket: @usock, max_bytes: @read_length, &method(:on_udp_heatbeat_response_recv))
end
timer_execute(:out_forward_heartbeat_request, @heartbeat_interval, &method(:on_heartbeat_timer))
Expand All @@ -298,7 +289,7 @@ def start
end

if @verify_connection_at_startup
@sd_manager.services.each do |node|
discovery_manager.services.each do |node|
begin
node.verify_connection
rescue StandardError => e
Expand Down Expand Up @@ -334,7 +325,7 @@ def write(chunk)
return if chunk.empty?
tag = chunk.metadata.tag

@sd_manager.select_service { |node| node.send_data(tag, chunk) }
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
end

def try_write(chunk)
Expand All @@ -344,7 +335,7 @@ def try_write(chunk)
return
end
tag = chunk.metadata.tag
@sd_manager.select_service { |node| node.send_data(tag, chunk) }
discovery_manager.select_service { |node| node.send_data(tag, chunk) }
end

def create_transfer_socket(host, port, hostname, &block)
Expand Down Expand Up @@ -387,7 +378,7 @@ def create_transfer_socket(host, port, hostname, &block)

def statistics
stats = super
services = @sd_manager.services
services = discovery_manager.services
healty_nodes_count = 0
registed_nodes_count = services.size
services.each do |s|
Expand Down Expand Up @@ -424,7 +415,7 @@ def build_node(server)

def on_heartbeat_timer
need_rebuild = false
@sd_manager.services.each do |n|
discovery_manager.services.each do |n|
begin
log.trace "sending heartbeat", host: n.host, port: n.port, heartbeat_type: @heartbeat_type
n.usock = @usock if @usock
Expand All @@ -439,16 +430,16 @@ def on_heartbeat_timer
end

if need_rebuild
@sd_manager.rebalance
discovery_manager.rebalance
end
end

def on_udp_heatbeat_response_recv(data, sock)
sockaddr = Socket.pack_sockaddr_in(sock.remote_port, sock.remote_host)
if node = @sd_manager.services.find { |n| n.sockaddr == sockaddr }
if node = discovery_manager.services.find { |n| n.sockaddr == sockaddr }
# log.trace "heartbeat arrived", name: node.name, host: node.host, port: node.port
if node.heartbeat
@sd_manager.rebalance
discovery_manager.rebalance
end
else
log.warn("Unknown heartbeat response received from #{sock.remote_host}:#{sock.remote_port}. It may service out")
Expand Down
3 changes: 3 additions & 0 deletions lib/fluent/plugin/sd_file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

require 'cool.io'

require 'fluent/plugin_helper'
require 'fluent/plugin/service_discovery'
require 'fluent/plugin/service_discovery/discovery_message'

module Fluent
module Plugin
class FileServiceDiscovery < ServiceDiscovery
include PluginHelper::Mixin

Plugin.register_sd('file', self)

DEFAULT_FILE_TYPE = :yaml
Expand Down
2 changes: 0 additions & 2 deletions lib/fluent/plugin/service_discovery.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
require 'fluent/plugin/base'

require 'fluent/log'
require 'fluent/plugin_helper'
require 'fluent/unique_id'
require 'fluent/plugin_id'

Expand All @@ -41,7 +40,6 @@ module Plugin
class ServiceDiscovery < Base
include PluginId
include PluginLoggerMixin
include PluginHelper::Mixin
include UniqueId::Mixin

configured_in :service_discovery
Expand Down
1 change: 1 addition & 0 deletions lib/fluent/plugin_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
require 'fluent/plugin_helper/retry_state'
require 'fluent/plugin_helper/record_accessor'
require 'fluent/plugin_helper/compat_parameters'
require 'fluent/plugin_helper/service_discovery'

module Fluent
module PluginHelper
Expand Down
68 changes: 68 additions & 0 deletions lib/fluent/plugin_helper/service_discovery.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

require 'fluent/plugin_helper/timer'
require 'fluent/plugin_helper/service_discovery/manager'

module Fluent
module PluginHelper
module ServiceDiscovery
include Fluent::PluginHelper::Timer

def start
unless @discovery_manager
log.warn('There is no discovery_manager. skip start them')
super
return
end

@discovery_manager.start
unless @discovery_manager.static_config?
timer_execute(@_plugin_helper_service_discovery_title, @_plugin_helper_service_discovery_iterval) do
@discovery_manager.run_once
end
end

super
end

private

# @param title [Symbol] the thread name. this value should be unique.
# @param configurations [Hash] hash which must has discivery_service type and its configuration like `{ type: :static, conf: <Fluent::Config::Element> }`
# @param load_balancer [Object] object which has two methods #rebalance and #select_service
# @param custom_build_method [Proc]
def create_service_discovery_manager(title, configurations:, load_balancer: nil, custom_build_method: nil, interval: 3)
@_plugin_helper_service_discovery_title = title
@_plugin_helper_service_discovery_iterval = interval

@discovery_manager = Fluent::PluginHelper::ServiceDiscovery::Manager.new(
log: log,
load_balancer: load_balancer,
custom_build_method: custom_build_method,
)

@discovery_manager.configure(configurations, parent: self)

@discovery_manager
end

def discovery_manager
@discovery_manager
end
end
end
end
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,13 @@
# limitations under the License.
#

require 'fluent/plugin'
require 'fluent/plugin/service_discovery'
require 'fluent/plugin/service_discovery/round_robin_balancer'
require 'fluent/plugin/service_discovery/discovery_message'
require 'fluent/plugin_helper/service_discovery/round_robin_balancer'

module Fluent
module Plugin
class ServiceDiscovery
class ServiceDiscoveryManager
module PluginHelper
module ServiceDiscovery
class Manager
def initialize(log:, load_balancer: nil, custom_build_method: nil)
@log = log
@load_balancer = load_balancer || RoundRobinBalancer.new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
# limitations under the License.
#

require 'fluent/plugin/service_discovery'

module Fluent
module Plugin
class ServiceDiscovery
module PluginHelper
module ServiceDiscovery
class RoundRobinBalancer
def initialize
@services = []
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
require_relative '../../helper'
require 'fluent/plugin/service_discovery/service_discovery_manager'
require 'fluent/plugin_helper/service_discovery/manager'

class TestServiceDiscoveryManager < ::Test::Unit::TestCase
setup do
@sd_file_dir = File.expand_path('../data/sd_file', __dir__)
@sd_file_dir = File.expand_path('../../plugin/data/sd_file', __dir__)
end

class TestSdPlugin < Fluent::Plugin::ServiceDiscovery
Expand Down Expand Up @@ -32,7 +32,7 @@ def start(queue)

sub_test_case '#configure' do
test 'build sd plugins and services' do
sdm = Fluent::Plugin::ServiceDiscovery::ServiceDiscoveryManager.new(log: $log)
sdm = Fluent::PluginHelper::ServiceDiscovery::Manager.new(log: $log)
sdm.configure(
[
{ type: :file, conf: config_element('service_discovery', '', { 'path' => File.join(@sd_file_dir, 'config.yml') }) },
Expand All @@ -54,7 +54,7 @@ def start(queue)
end

test 'no need to timer if only static' do
sdm = Fluent::Plugin::ServiceDiscovery::ServiceDiscoveryManager.new(log: $log)
sdm = Fluent::PluginHelper::ServiceDiscovery::Manager.new(log: $log)
sdm.configure(
[{ type: :static, conf: config_element('server', '', { 'host' => '127.0.0.2', 'port' => '5432' }) }],
)
Expand All @@ -69,7 +69,7 @@ def start(queue)

sub_test_case '#run_once' do
test 'if new service added and deleted' do
sdm = Fluent::Plugin::ServiceDiscovery::ServiceDiscoveryManager.new(log: $log)
sdm = Fluent::PluginHelper::ServiceDiscovery::Manager.new(log: $log)
t = TestSdPlugin.new
mock(Fluent::Plugin).new_sd(:sd_test, anything) { t }
sdm.configure([{ type: :sd_test, conf: config_element('service_discovery', '', {})}])
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
require_relative '../../helper'
require 'fluent/plugin/service_discovery/round_robin_balancer'
require 'fluent/plugin_helper/service_discovery/round_robin_balancer'

class TestRoundRobinBalancer < ::Test::Unit::TestCase
test 'select_service' do
rrb = Fluent::Plugin::ServiceDiscovery::RoundRobinBalancer.new
rrb = Fluent::PluginHelper::ServiceDiscovery::RoundRobinBalancer.new
rrb.rebalance([1, 2, 3])

rrb.select_service { |n| assert_equal 1, n }
Expand Down
72 changes: 72 additions & 0 deletions test/plugin_helper/test_service_discovery.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
require_relative '../helper'
require 'flexmock/test_unit'
require 'fluent/plugin_helper/service_discovery'
require 'fluent/plugin/output'

class ServiceDiscoveryHelper < Test::Unit::TestCase
PORT = unused_port
NULL_LOGGER = Logger.new(nil)

class Dummy < Fluent::Plugin::TestBase
helpers :service_discovery

# Make these mehtod public
def create_service_discovery_manager(title, configurations:, load_balancer: nil, custom_build_method: nil, interval: 3)
super
end

def discovery_manager
super
end
end

setup do
@sd_file_dir = File.expand_path('../plugin/data/sd_file', __dir__)

@d = nil
end

teardown do
if @d
@d.stop unless @d.stopped?
@d.shutdown unless @d.shutdown?
@d.close unless @d.closed?
@d.terminate unless @d.terminated?
end
end

test 'start discovery manager' do
d = @d = Dummy.new

d.create_service_discovery_manager(
:service_discovery_helper_test,
configurations: [{ type: :static, conf: config_element('server', '', { 'host' => '127.0.0.1', 'port' => '1234' }) }],
)

assert_true !!d.discovery_manager

mock.proxy(d.discovery_manager).start.once
mock.proxy(d).timer_execute(:service_discovery_helper_test, anything).never

d.start

services = d.discovery_manager.services
assert_equal 1, services.size
assert_equal '127.0.0.1', services[0].host
assert_equal 1234, services[0].port
end

test 'call timer_execute if dynamic configuration' do
d = @d = Dummy.new

d.create_service_discovery_manager(
:service_discovery_helper_test,
configurations: [{ type: :file, conf: config_element('server', '', { 'path' => File.join(@sd_file_dir, 'config.yml') }) }],
)

assert_true !!d.discovery_manager
mock.proxy(d.discovery_manager).start.once
mock(d).timer_execute(:service_discovery_helper_test, anything).once
d.start
end
end

0 comments on commit 9883173

Please sign in to comment.