Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi worker processing #1386

Merged
merged 34 commits into from
Jan 10, 2017
Merged

Multi worker processing #1386

merged 34 commits into from
Jan 10, 2017

Conversation

tagomoris
Copy link
Member

@tagomoris tagomoris commented Dec 22, 2016

This feature request is to implement "symmetric multi worker processing", which runs specified number of Fluentd worker processes, to use 2 or more CPU cores by just one configuration file.

All plugins used in this configuration MUST support multi worker feature, and MUST show it by #multi_worker_ready? method (returns true or false after #configure).
In default, all input and output plugins return false, and all other plugins return true. 3rd party plugins SHOULD claim whether it supports multi worker processing or not.

Points:

  • buffer paths / local plugin storages are automatically configured to use "workerN" directory under server root directory (or directory specified by "path")
  • worker_id=0 takes care about buffer chunk files under the directory for single worker process configuration
  • when users decrease the number of workers, they should take care about buffer chunk files
    • it works well to move buffer chunk files between directories (workers 4 -> 3, mv worker3/* worker2)
  • logger will print worker id in logs from each worker processes

@tagomoris tagomoris added enhancement Feature request or improve operations v0.14 work-in-progress labels Dec 22, 2016
@tagomoris tagomoris self-assigned this Dec 22, 2016
@tagomoris
Copy link
Member Author

tagomoris commented Dec 22, 2016

I'm getting the workers works well:

``` $ bundle exec bin/fluentd -c example/in_forward_workers.conf 2016-12-22 20:02:15 +0900 [info]: reading config file path="example/in_forward_workers.conf" 2016-12-22 20:02:15 +0900 [info]: starting fluentd-0.14.10 pid=21028 2016-12-22 20:02:15 +0900 [info]: spawn command to main: cmdline=["/Users/tagomoris/.rbenv/versions/2.3.1/bin/ruby", "-Eascii-8bit:ascii-8bit", "-rbundler/setup", "bin/fluentd", "-c", "example/in_forward_workers.conf", "--under-supervisor"] 2016-12-22 20:02:16 +0900 [info]: reading config file path="example/in_forward_workers.conf" 2016-12-22 20:02:16 +0900 [info]: starting fluentd-0.14.10 without supervision pid=21059 2016-12-22 20:02:16 +0900 [info]: gem 'fluentd' version '0.14.10' 2016-12-22 20:02:16 +0900 [info]: adding match pattern="test" type="stdout" 2016-12-22 20:02:16 +0900 [info]: reading config file path="example/in_forward_workers.conf" 2016-12-22 20:02:16 +0900 [info]: starting fluentd-0.14.10 without supervision pid=21058 2016-12-22 20:02:16 +0900 [info]: gem 'fluentd' version '0.14.10' 2016-12-22 20:02:16 +0900 [info]: adding match pattern="test" type="stdout" 2016-12-22 20:02:16 +0900 [info]: adding source type="forward" 2016-12-22 20:02:16 +0900 [info]: using configuration file: workers 3 @type forward @type stdout worker_id_key "worker_id" 2016-12-22 20:02:16 +0900 [info]: starting fluentd worker pid=21059 ppid=21028 worker=2 2016-12-22 20:02:16 +0900 [info]: fluentd worker is now running 2016-12-22 20:02:16 +0900 [info]: reading config file path="example/in_forward_workers.conf" 2016-12-22 20:02:16 +0900 [info]: adding source type="forward" 2016-12-22 20:02:16 +0900 [info]: starting fluentd-0.14.10 without supervision pid=21057 2016-12-22 20:02:16 +0900 [info]: gem 'fluentd' version '0.14.10' 2016-12-22 20:02:16 +0900 [info]: adding match pattern="test" type="stdout" 2016-12-22 20:02:16 +0900 [info]: using configuration file: workers 3 @type forward @type stdout worker_id_key "worker_id" 2016-12-22 20:02:16 +0900 [info]: starting fluentd worker pid=21058 ppid=21028 worker=1 2016-12-22 20:02:16 +0900 [info]: fluentd worker is now running 2016-12-22 20:02:16 +0900 [info]: adding source type="forward" 2016-12-22 20:02:16 +0900 [info]: using configuration file: workers 3 @type forward @type stdout worker_id_key "worker_id" 2016-12-22 20:02:16 +0900 [info]: starting fluentd worker pid=21057 ppid=21028 worker=0 2016-12-22 20:02:16 +0900 [info]: fluentd worker is now running 2016-12-22 20:02:20.210008000 +0900 test: {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaay","worker_id":2} 2016-12-22 20:02:21.137974000 +0900 test: {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaay","worker_id":0} 2016-12-22 20:02:21.972813000 +0900 test: {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaay","worker_id":0} 2016-12-22 20:02:22.808682000 +0900 test: {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaay","worker_id":1} 2016-12-22 20:02:23.612803000 +0900 test: {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaay","worker_id":1} 2016-12-22 20:02:29.273108000 +0900 test: {"message":"yaaaaaaaaaaaaaaaaaaaaaaaaay","worker_id":2} ```

@tagomoris
Copy link
Member Author

Current status:

$ bundle exec bin/fluentd -c example/in_forward_workers.conf 
2016-12-27 11:55:07 +0900 [info]: reading config file path="example/in_forward_workers.conf"
2016-12-27 11:55:07 +0900 [info]: starting fluentd-0.14.11 pid=8877
2016-12-27 11:55:07 +0900 [info]: spawn command to main:  cmdline=["/Users/tagomoris/.rbenv/versions/2.4.0/bin/ruby", "-Eascii-8bit:ascii-8bit", "-rbundler/setup", "bin/fluentd", "-c", "example/in_forward_workers.conf", "--under-supervisor"]
2016-12-27 11:55:08 +0900 [info]: gem 'fluentd' version '0.14.11'
2016-12-27 11:55:08 +0900 [info]: adding match pattern="test" type="stdout"
2016-12-27 11:55:08 +0900 [info]: #2 starting fluentd worker pid=8908 ppid=8877 worker=2
2016-12-27 11:55:08 +0900 [info]: #2 [forward_in_1] listening a tcp port port=24224 bind="0.0.0.0"
2016-12-27 11:55:08 +0900 [info]: #2 fluentd worker is now running worker=2
2016-12-27 11:55:08 +0900 [info]: adding source type="forward"
2016-12-27 11:55:08 +0900 [info]: using configuration file: <ROOT>
  <system>
    workers 3
    root_dir "/Users/tagomoris/github/fluentd/test/tmp/root"
  </system>
  <source>
    @type forward
    @id forward_in_1
  </source>
  <match test>
    @type stdout
    @id stdout_out_1
    <inject>
      worker_id_key "worker_id"
    </inject>
    <buffer>
      @type "file"
      flush_interval 1s
    </buffer>
  </match>
</ROOT>
2016-12-27 11:55:08 +0900 [info]: #0 starting fluentd worker pid=8906 ppid=8877 worker=0
2016-12-27 11:55:08 +0900 [info]: #0 [forward_in_1] listening a tcp port port=24224 bind="0.0.0.0"
2016-12-27 11:55:08 +0900 [info]: #0 fluentd worker is now running worker=0
2016-12-27 11:55:08 +0900 [info]: #1 starting fluentd worker pid=8907 ppid=8877 worker=1
2016-12-27 11:55:08 +0900 [info]: #1 [forward_in_1] listening a tcp port port=24224 bind="0.0.0.0"
2016-12-27 11:55:08 +0900 [info]: #1 fluentd worker is now running worker=1
2016-12-27 11:55:21.513142000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":1}
2016-12-27 11:55:22.239894000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":0}
2016-12-27 11:55:22.606160000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":2}
2016-12-27 11:55:23.564816000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":2}
2016-12-27 11:55:23.950919000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":0}
2016-12-27 11:55:24.484824000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":0}
2016-12-27 11:55:23.159434000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":1}
2016-12-27 11:55:24.180292000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":1}
2016-12-27 11:55:24.792380000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":2}
2016-12-27 11:55:25.168374000 +0900 test: {"message":"yaaaaaaaaaaaaay","worker_id":0}
^C2016-12-27 11:56:04 +0900 [info]: Received graceful stop
2016-12-27 11:56:05 +0900 [info]: #0 shutting down fluentd worker worker=0
2016-12-27 11:56:05 +0900 [info]: #2 shutting down fluentd worker worker=2
2016-12-27 11:56:05 +0900 [info]: #1 shutting down fluentd worker worker=1
2016-12-27 11:56:06 +0900 [info]: Worker 2 finished with status 0
2016-12-27 11:56:06 +0900 [info]: Worker 1 finished with status 0
2016-12-27 11:56:06 +0900 [info]: Worker 0 finished with status 0

@repeatedly
Copy link
Member

Before, we considered following syntax:

<workers 3>
  <source>
    @type forward
  </source>
  <match app.**>
    @type s3
  </match>
</workers>
<source>
  @type dstat
</source>
<match stat.**>
  @type mackerel
</match>

Is this dropped?

@tagomoris
Copy link
Member Author

@repeatedly Yes, in my current opinion. It's not "symmetric".
I can understand such configuration workloads, but it's difficult to implement it in reasonable way.

@tagomoris
Copy link
Member Author

I just found an idea to enable a configuration section only in specified worker process:

<system>
  workers 3
</system>
<source>
  # ...
</source>
<match data.**>
  # ...
</match>

<worker 0>
  <source>
    @type dstat
  </source>
  <match stat.**>
    @type mackerel
  </match>
</worker>

It's much easy to implement, and also easy to understand which worker runs the configuration specified by <worker> section.

@repeatedly
Copy link
Member

It is acceptable configuration for me.

@tagomoris
Copy link
Member Author

OK, i'll create an another issue for that feature request. It's too large to implement in this feature branch, and acceptable to implement in future versions, I think.

@tagomoris
Copy link
Member Author

@repeatedly could you review this feature?

@tagomoris
Copy link
Member Author

@repeatedly ping

worker_id_part = if type == :default && (@process_type == :worker0 || @process_type == :workers)
@worker_id_part
else
""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

.freeze

end

if plugin_id_configured?
@log.optional_header = "[#{@id}] "
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need #{self.class.name}?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Including plugin class name looks too verbose for me.
It's not useful for end users, and it's clear when -v specified via filename.

@@ -67,33 +73,42 @@ def configure(conf)

@@buffer_paths[@path] = type_of_owner

if File.exist?(@path)
if File.directory?(@path)
if File.exist?(@path) && File.directory?(@path) || !File.exist?(@path) && !@path.include?('.*') # directory
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need both File.exist?(@path) and !File.exist?(@path)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because this clause must NOT match the condition of File.exist?(@path) && !File.directory?(@path).

Copy link
Member

@repeatedly repeatedly Jan 4, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If so, please use () to separate conditions.
&& || && combination in one line is hard to read and maintain.

end

def multi_workers_ready?
### TODO: add hack to synchronize for multi workers
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need more tasks to improve multi workers support in out_file?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, it's just mistake not to remove this comment.

show_plugin_config if @show_plugin_config
read_config
set_system_config

if @workers < 1
raise Fluent::ConfigError, "invalid number of workers:#{@workers}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Invalid number of workers. Must be "> 0" is more clear for users.

else
""
end
log_msg = "#{time.strftime(@time_format)}[#{LEVEL_TEXT[level]}]: #{worker_id_part}"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about inserting worker_id_part before :?
It seems clear that this is system header, not message body.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing there may breaks user-side integrations to match log level parts.

@repeatedly
Copy link
Member

OT: We should write append related NOTICE in the out_file article.

@tagomoris
Copy link
Member Author

I've added some commits for review comments.

@repeatedly
Copy link
Member

when users decrease the number of workers, they should take care about buffer chunk files

Is it manual operation, right?

@tagomoris
Copy link
Member Author

Exactly.

@tagomoris
Copy link
Member Author

Updated/added some commits based on current master HEAD.

@tagomoris
Copy link
Member Author

I'm pushing some other pull-requests for loggers and out_forward. So it's needed to resolve conflicts.
@repeatedly But anyway, are there any comments? Can I merge this after that?

@repeatedly
Copy link
Member

LGTM.
After merged, users can test this feature with actual plugins.

…hown or not.

In multi workers status, the same log messages will be shown 2 or more times without any controls.
With this patch, loggger will show logs about process-wide control just once.
@tagomoris tagomoris merged commit 19ecdd0 into master Jan 10, 2017
@tagomoris tagomoris deleted the multi-worker-processing branch January 10, 2017 17:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Feature request or improve operations v0.14
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants