Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multi workers assign syntax on <worker> section. Fix #2289 #2292

Merged
merged 9 commits into from
Feb 25, 2019
42 changes: 32 additions & 10 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -71,19 +71,41 @@ def configure(conf)
raise ConfigError, "Missing worker id on <worker> directive"
end

target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
end
target_worker_ids = target_worker_id_str.split("-")
if target_worker_ids.size == 2
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add duplication check to avoid following case?

<worker 0-3>
</worker>

<worker 3-7> # should be 4-7
</worker>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added a collisions checker in d900a73.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And I added more concreate error message commit: 03da2b3.

first_worker_id = target_worker_ids.first.to_i
last_worker_id = target_worker_ids.last.to_i
if first_worker_id > last_worker_id
raise ConfigError, "greater first_worker_id<#{first_worker_id}> than last_worker_id<#{last_worker_id}> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fluent::ConfigError

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used full-qualified name: 4200740.

end
first_worker_id.step(last_worker_id, 1) do |worker_id|
target_worker_id = worker_id.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

end

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

end
elem.set_target_worker_id(target_worker_id)
end
end
else
target_worker_id = target_worker_id_str.to_i
if target_worker_id < 0 || target_worker_id > (Fluent::Engine.system_config.workers - 1)
raise ConfigError, "worker id #{target_worker_id} specified by <worker> directive is not allowed. Available worker id is between 0 and #{(Fluent::Engine.system_config.workers - 1)}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

end

## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_id = 0 if Fluent::Engine.dry_run_mode
## On dry_run mode, all worker sections have to be configured on supervisor (recognized as worker_id = 0).
target_worker_id = 0 if Fluent::Engine.dry_run_mode

e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
e.elements.each do |elem|
unless ['source', 'match', 'filter', 'label'].include?(elem.name)
raise ConfigError, "<worker> section cannot have <#{elem.name}> directive"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

end
elem.set_target_worker_id(target_worker_id)
end
elem.set_target_worker_id(target_worker_id)
end
conf += e
end
Expand Down
64 changes: 64 additions & 0 deletions test/test_root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -686,6 +686,28 @@ def configure_ra(conf_str)
end
end

test 'raises configuration error for too big worker id on multi workers syntax' do
errmsg = "worker id 4 specified by <worker> directive is not allowed. Available worker id is between 0 and 3"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 1-4>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for too big worker id on invalid reversed multi workers syntax' do
errmsg = "greater first_worker_id<3> than last_worker_id<0> specified by <worker> directive is not allowed. Available multi worker assign syntax is <smaller_worker_id>-<greater_worker_id>"
assert_raise Fluent::ConfigError.new(errmsg) do
conf = <<-EOC
<worker 3-0>
</worker>
EOC
configure_ra(conf)
end
end

test 'raises configuration error for invalid elements as a child of worker section' do
errmsg = '<worker> section cannot have <system> directive'
assert_raise Fluent::ConfigError.new(errmsg) do
Expand Down Expand Up @@ -836,6 +858,48 @@ def configure_ra(conf_str)
</match>
</label>
</worker>
EOC
ra = configure_ra(conf)
assert_equal 0, ra.inputs.size
assert_equal 0, ra.outputs.size
assert_equal 0, ra.filters.size
assert_equal 0, ra.labels.size
refute ra.error_collector
end

test 'with plugins for workers syntax' do
conf = <<-EOC
<worker 0-1>
<source>
@type tcp
tag test.worker_group1
<parse>
@type none
</parse>
</source>
<match pattern>
@type stdout
</match>
<label @ERROR>
<match>
@type null
</match>
</label>
</worker>

<worker 2-3>
<source>
@type forward
</source>
<match pattern>
@type stdout
</match>
<label @ERROR>
<match>
@type null
</match>
</label>
</worker>
EOC
ra = configure_ra(conf)
assert_equal 0, ra.inputs.size
Expand Down