From 6a8587e23da7f719be8f51a13ec42c32900f3d58 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 13 Feb 2019 15:20:40 +0900 Subject: [PATCH 1/9] Support multi workers assign syntax on `` section. Fix #2289 Signed-off-by: Hiroshi Hatake --- lib/fluent/root_agent.rb | 42 +++++++++++++++++++------- test/test_root_agent.rb | 64 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+), 10 deletions(-) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index a3db981bb5..6e0e3b905a 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -71,19 +71,41 @@ def configure(conf) raise ConfigError, "Missing worker id on directive" end - target_worker_id = target_worker_id_str.to_i - if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) - raise ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" - end + target_worker_ids = target_worker_id_str.split("-") + if target_worker_ids.size == 2 + first_worker_id = target_worker_ids.first.to_i + last_worker_id = target_worker_ids.last.to_i + if first_worker_id > last_worker_id + raise ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by directive is not allowed. Available multi worker assign syntax is -" + end + first_worker_id.step(last_worker_id, 1) do |worker_id| + target_worker_id = worker_id.to_i + if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) + raise ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" + end + + e.elements.each do |elem| + unless ['source', 'match', 'filter', 'label'].include?(elem.name) + raise ConfigError, " section cannot have <#{elem.name}> directive" + end + elem.set_target_worker_id(target_worker_id) + end + end + else + target_worker_id = target_worker_id_str.to_i + if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) + raise ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" + end - ## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). - target_worker_id = 0 if Fluent::Engine.dry_run_mode + ## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). + target_worker_id = 0 if Fluent::Engine.dry_run_mode - e.elements.each do |elem| - unless ['source', 'match', 'filter', 'label'].include?(elem.name) - raise ConfigError, " section cannot have <#{elem.name}> directive" + e.elements.each do |elem| + unless ['source', 'match', 'filter', 'label'].include?(elem.name) + raise ConfigError, " section cannot have <#{elem.name}> directive" + end + elem.set_target_worker_id(target_worker_id) end - elem.set_target_worker_id(target_worker_id) end conf += e end diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index 78d6eb0a5b..25beac9449 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -686,6 +686,28 @@ def configure_ra(conf_str) end end + test 'raises configuration error for too big worker id on multi workers syntax' do + errmsg = "worker id 4 specified by directive is not allowed. Available worker id is between 0 and 3" + assert_raise Fluent::ConfigError.new(errmsg) do + conf = <<-EOC + + +EOC + configure_ra(conf) + end + end + + test 'raises configuration error for too big worker id on invalid reversed multi workers syntax' do + errmsg = "greater first_worker_id<3> than last_worker_id<0> specified by directive is not allowed. Available multi worker assign syntax is -" + assert_raise Fluent::ConfigError.new(errmsg) do + conf = <<-EOC + + +EOC + configure_ra(conf) + end + end + test 'raises configuration error for invalid elements as a child of worker section' do errmsg = ' section cannot have directive' assert_raise Fluent::ConfigError.new(errmsg) do @@ -836,6 +858,48 @@ def configure_ra(conf_str) +EOC + ra = configure_ra(conf) + assert_equal 0, ra.inputs.size + assert_equal 0, ra.outputs.size + assert_equal 0, ra.filters.size + assert_equal 0, ra.labels.size + refute ra.error_collector + end + + test 'with plugins for workers syntax' do + conf = <<-EOC + + + @type tcp + tag test.worker_group1 + + @type none + + + + @type stdout + + + + + + + @type forward + + + @type stdout + + + EOC ra = configure_ra(conf) assert_equal 0, ra.inputs.size From 4200740fb2a7404630dd2f8bc0416fdbd23be521 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 18 Feb 2019 17:43:30 +0900 Subject: [PATCH 2/9] Use full-qualified class name Signed-off-by: Hiroshi Hatake --- lib/fluent/root_agent.rb | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 6e0e3b905a..9b5ac9304e 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -68,7 +68,7 @@ def configure(conf) conf.elements(name: 'worker').each do |e| target_worker_id_str = e.arg if target_worker_id_str.empty? - raise ConfigError, "Missing worker id on directive" + raise Fluent::ConfigError, "Missing worker id on directive" end target_worker_ids = target_worker_id_str.split("-") @@ -76,17 +76,17 @@ def configure(conf) first_worker_id = target_worker_ids.first.to_i last_worker_id = target_worker_ids.last.to_i if first_worker_id > last_worker_id - raise ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by directive is not allowed. Available multi worker assign syntax is -" + raise Fluent::ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by directive is not allowed. Available multi worker assign syntax is -" end first_worker_id.step(last_worker_id, 1) do |worker_id| target_worker_id = worker_id.to_i if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) - raise ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" + raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end e.elements.each do |elem| unless ['source', 'match', 'filter', 'label'].include?(elem.name) - raise ConfigError, " section cannot have <#{elem.name}> directive" + raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" end elem.set_target_worker_id(target_worker_id) end @@ -94,7 +94,7 @@ def configure(conf) else target_worker_id = target_worker_id_str.to_i if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) - raise ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" + raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end ## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). @@ -102,7 +102,7 @@ def configure(conf) e.elements.each do |elem| unless ['source', 'match', 'filter', 'label'].include?(elem.name) - raise ConfigError, " section cannot have <#{elem.name}> directive" + raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" end elem.set_target_worker_id(target_worker_id) end From 6b6fb9317ebaa0cce8cd0988fbefb1126cb9d79f Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Mon, 18 Feb 2019 18:03:17 +0900 Subject: [PATCH 3/9] Add worker_id collision detector Signed-off-by: Hiroshi Hatake --- lib/fluent/root_agent.rb | 5 +++++ test/test_root_agent.rb | 13 +++++++++++++ 2 files changed, 18 insertions(+) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 9b5ac9304e..89c63e8ae2 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -64,6 +64,7 @@ def initialize(log:, system_config: SystemConfig.new) attr_reader :labels def configure(conf) + used_worker_ids = [] # initialize elements conf.elements(name: 'worker').each do |e| target_worker_id_str = e.arg @@ -83,6 +84,10 @@ def configure(conf) if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end + if used_worker_ids.include?(worker_id) + raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on directive. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)} without collisions" + end + used_worker_ids << worker_id e.elements.each do |elem| unless ['source', 'match', 'filter', 'label'].include?(elem.name) diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index 25beac9449..143a9a63dd 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -697,6 +697,19 @@ def configure_ra(conf_str) end end + test 'raises configuration error for worker id collisions on multi workers syntax' do + errmsg = "specified worker_id<2> collisions is detected on directive. Available worker id is between 0 and 3 without collisions" + assert_raise Fluent::ConfigError.new(errmsg) do + conf = <<-EOC + + + + +EOC + configure_ra(conf) + end + end + test 'raises configuration error for too big worker id on invalid reversed multi workers syntax' do errmsg = "greater first_worker_id<3> than last_worker_id<0> specified by directive is not allowed. Available multi worker assign syntax is -" assert_raise Fluent::ConfigError.new(errmsg) do From 03da2b3b51fcc6ccd9a435a51e73703dd2d1f208 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Tue, 19 Feb 2019 10:42:57 +0900 Subject: [PATCH 4/9] Make more kindly error message when occurring worker_ids collisions Signed-off-by: Hiroshi Hatake --- lib/fluent/root_agent.rb | 7 ++++++- test/test_root_agent.rb | 15 ++++++++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 89c63e8ae2..e726ae8019 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -65,6 +65,10 @@ def initialize(log:, system_config: SystemConfig.new) def configure(conf) used_worker_ids = [] + available_worker_ids = [] + 0.step(Fluent::Engine.system_config.workers - 1, 1).each do |id| + available_worker_ids << id + end # initialize elements conf.elements(name: 'worker').each do |e| target_worker_id_str = e.arg @@ -84,8 +88,9 @@ def configure(conf) if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end + available_worker_ids.delete(worker_id) if available_worker_ids.include?(worker_id) if used_worker_ids.include?(worker_id) - raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on directive. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)} without collisions" + raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on directive. Available worker id(s): #{available_worker_ids}" end used_worker_ids << worker_id diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index 143a9a63dd..054dc1fbdc 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -698,7 +698,7 @@ def configure_ra(conf_str) end test 'raises configuration error for worker id collisions on multi workers syntax' do - errmsg = "specified worker_id<2> collisions is detected on directive. Available worker id is between 0 and 3 without collisions" + errmsg = "specified worker_id<2> collisions is detected on directive. Available worker id(s): [3]" assert_raise Fluent::ConfigError.new(errmsg) do conf = <<-EOC @@ -710,6 +710,19 @@ def configure_ra(conf_str) end end + test 'raises configuration error for worker id collisions on multi workers syntax when multi avaliable worker_ids are left' do + errmsg = "specified worker_id<1> collisions is detected on directive. Available worker id(s): [2, 3]" + assert_raise Fluent::ConfigError.new(errmsg) do + conf = <<-EOC + + + + +EOC + configure_ra(conf) + end + end + test 'raises configuration error for too big worker id on invalid reversed multi workers syntax' do errmsg = "greater first_worker_id<3> than last_worker_id<0> specified by directive is not allowed. Available multi worker assign syntax is -" assert_raise Fluent::ConfigError.new(errmsg) do From 668273b2b717eeb72518a861da20876292aff24c Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 20 Feb 2019 12:03:44 +0900 Subject: [PATCH 5/9] Support multiple worker ids inclusion relation on Config::Element Signed-off-by: Hiroshi Hatake --- lib/fluent/config/element.rb | 19 +++++++++++++------ test/config/test_element.rb | 34 +++++++++++++++++++++++++++++----- 2 files changed, 42 insertions(+), 11 deletions(-) diff --git a/lib/fluent/config/element.rb b/lib/fluent/config/element.rb index 177fa009ab..1359faa5af 100644 --- a/lib/fluent/config/element.rb +++ b/lib/fluent/config/element.rb @@ -36,12 +36,12 @@ def initialize(name, arg, attrs, elements, unused = nil) # it's global logger, not plugin logger: deprecated message should be global warning, not plugin level. @logger = defined?($log) ? $log : nil - @target_worker_id = nil + @target_worker_ids = [] end attr_accessor :name, :arg, :unused, :v1_config, :corresponding_proxies, :unused_in attr_writer :elements - attr_reader :target_worker_id + attr_reader :target_worker_ids RESERVED_PARAMETERS_COMPAT = { '@type' => 'type', @@ -223,22 +223,29 @@ def self.unescape_parameter(v) end def set_target_worker_id(worker_id) - @target_worker_id = worker_id + @target_worker_ids = [worker_id] @elements.each { |e| e.set_target_worker_id(worker_id) } end + def set_target_worker_ids(worker_ids) + @target_worker_ids = worker_ids.uniq + @elements.each { |e| + e.set_target_worker_ids(worker_ids.uniq) + } + end + def for_every_workers? - @target_worker_id == nil + @target_worker_ids.empty? end def for_this_worker? - @target_worker_id == Fluent::Engine.worker_id + @target_worker_ids.include?(Fluent::Engine.worker_id) end def for_another_worker? - @target_worker_id != nil && @target_worker_id != Fluent::Engine.worker_id + !@target_worker_ids.empty? && !@target_worker_ids.include?(Fluent::Engine.worker_id) end end end diff --git a/test/config/test_element.rb b/test/config/test_element.rb index 9e03094bfb..81e871f112 100644 --- a/test/config/test_element.rb +++ b/test/config/test_element.rb @@ -406,11 +406,11 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil) test 'set target_worker_id recursively' do e = element('label', '@mytest', {}, [ element('filter', '**'), element('match', '**', {}, [ element('store'), element('store') ]) ]) e.set_target_worker_id(1) - assert_equal 1, e.target_worker_id - assert_equal 1, e.elements[0].target_worker_id - assert_equal 1, e.elements[1].target_worker_id - assert_equal 1, e.elements[1].elements[0].target_worker_id - assert_equal 1, e.elements[1].elements[1].target_worker_id + assert_equal [1], e.target_worker_ids + assert_equal [1], e.elements[0].target_worker_ids + assert_equal [1], e.elements[1].target_worker_ids + assert_equal [1], e.elements[1].elements[0].target_worker_ids + assert_equal [1], e.elements[1].elements[1].target_worker_ids end end @@ -434,12 +434,24 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil) assert e.for_this_worker? end + test 'target_worker_ids includes current worker_id' do + e = element() + e.set_target_worker_ids([0]) + assert e.for_this_worker? + end + test 'target_worker_id != current worker_id' do e = element() e.set_target_worker_id(1) assert_false e.for_this_worker? end + test 'target_worker_ids does not includes current worker_id' do + e = element() + e.set_target_worker_ids([1, 2]) + assert_false e.for_this_worker? + end + test "doesn't have target_worker_id" do e = element() assert_false e.for_this_worker? @@ -453,12 +465,24 @@ def element(name = 'ROOT', arg = '', attrs = {}, elements = [], unused = nil) assert_false e.for_another_worker? end + test 'target_worker_ids contains current worker_id' do + e = element() + e.set_target_worker_ids([0, 1]) + assert_false e.for_another_worker? + end + test 'target_worker_id != current worker_id' do e = element() e.set_target_worker_id(1) assert e.for_another_worker? end + test 'target_worker_ids does not contains current worker_id' do + e = element() + e.set_target_worker_ids([1, 2]) + assert e.for_another_worker? + end + test "doesn't have target_worker_id" do e = element() assert_false e.for_another_worker? From 66ac4a343eb22445a85ea9ca07e610f848dac6f3 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 20 Feb 2019 15:05:53 +0900 Subject: [PATCH 6/9] Support multi assigned workers not for all Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/base.rb | 3 ++- lib/fluent/root_agent.rb | 17 +++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 6011ae137e..1e1e785ac3 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -52,7 +52,8 @@ def fluentd_worker_id def configure(conf) if conf.respond_to?(:for_this_worker?) && conf.for_this_worker? - system_config_override(workers: 1) + workers = conf.target_worker_ids.size || 1 + system_config_override(workers: workers) end super @_state ||= State.new(false, false, false, false, false, false, false, false, false) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index e726ae8019..7b92ae5336 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -83,22 +83,31 @@ def configure(conf) if first_worker_id > last_worker_id raise Fluent::ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by directive is not allowed. Available multi worker assign syntax is -" end + target_worker_ids = [] first_worker_id.step(last_worker_id, 1) do |worker_id| target_worker_id = worker_id.to_i + target_worker_ids << target_worker_id + if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1) raise Fluent::ConfigError, "worker id #{target_worker_id} specified by directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}" end - available_worker_ids.delete(worker_id) if available_worker_ids.include?(worker_id) - if used_worker_ids.include?(worker_id) + available_worker_ids.delete(target_worker_id) if available_worker_ids.include?(target_worker_id) + if used_worker_ids.include?(target_worker_id) && !Fluent::Engine.dry_run_mode raise Fluent::ConfigError, "specified worker_id<#{worker_id}> collisions is detected on directive. Available worker id(s): #{available_worker_ids}" end - used_worker_ids << worker_id + used_worker_ids << target_worker_id e.elements.each do |elem| unless ['source', 'match', 'filter', 'label'].include?(elem.name) raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" end - elem.set_target_worker_id(target_worker_id) + end + + # On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0). + target_worker_ids = [0] if Fluent::Engine.dry_run_mode + + unless target_worker_ids.empty? + e.set_target_worker_ids(target_worker_ids.uniq) end end else From 9b898c1274e56cb2bd162acf8c0fe462bba26b3f Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Wed, 20 Feb 2019 15:07:41 +0900 Subject: [PATCH 7/9] Fix test for multi workers assign syntax on RootAgent Signed-off-by: Hiroshi Hatake --- test/test_root_agent.rb | 36 +++++++++++------------------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/test/test_root_agent.rb b/test/test_root_agent.rb index 054dc1fbdc..73ecd601c3 100644 --- a/test/test_root_agent.rb +++ b/test/test_root_agent.rb @@ -893,30 +893,16 @@ def configure_ra(conf_str) refute ra.error_collector end - test 'with plugins for workers syntax' do + test 'with plugins for workers syntax should match worker_id equals to 2' do conf = <<-EOC - - - @type tcp - tag test.worker_group1 - - @type none - - - - @type stdout - - - - - + @type forward + + @type test_filter + @id test_filter + @type stdout @@ -927,12 +913,12 @@ def configure_ra(conf_str) EOC + ra = configure_ra(conf) - assert_equal 0, ra.inputs.size - assert_equal 0, ra.outputs.size - assert_equal 0, ra.filters.size - assert_equal 0, ra.labels.size - refute ra.error_collector + assert_kind_of Fluent::Plugin::ForwardInput, ra.inputs.first + assert_kind_of Fluent::Plugin::StdoutOutput, ra.outputs.first + assert_kind_of FluentTestFilter, ra.filters.first + assert ra.error_collector end end end From 44f206a991a9b14971aaebdc0637bac9a1fb81e6 Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 21 Feb 2019 12:45:30 +0900 Subject: [PATCH 8/9] Use range syntax for available_worker_ids Signed-off-by: Hiroshi Hatake --- lib/fluent/root_agent.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 7b92ae5336..e5d8a9c7f4 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -65,10 +65,7 @@ def initialize(log:, system_config: SystemConfig.new) def configure(conf) used_worker_ids = [] - available_worker_ids = [] - 0.step(Fluent::Engine.system_config.workers - 1, 1).each do |id| - available_worker_ids << id - end + available_worker_ids = (0..Fluent::Engine.system_config.workers - 1).to_a # initialize elements conf.elements(name: 'worker').each do |e| target_worker_id_str = e.arg From 44b67c99290cc29eedb5717edf31cb6b7172310f Mon Sep 17 00:00:00 2001 From: Hiroshi Hatake Date: Thu, 21 Feb 2019 12:49:11 +0900 Subject: [PATCH 9/9] Ensure workers to 1 when target_worker_ids nil or false Signed-off-by: Hiroshi Hatake --- lib/fluent/plugin/base.rb | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/lib/fluent/plugin/base.rb b/lib/fluent/plugin/base.rb index 1e1e785ac3..70ded69bd3 100644 --- a/lib/fluent/plugin/base.rb +++ b/lib/fluent/plugin/base.rb @@ -52,7 +52,11 @@ def fluentd_worker_id def configure(conf) if conf.respond_to?(:for_this_worker?) && conf.for_this_worker? - workers = conf.target_worker_ids.size || 1 + workers = if conf.target_worker_ids && !conf.target_worker_ids.empty? + conf.target_worker_ids.size + else + 1 + end system_config_override(workers: workers) end super