Skip to content

Commit

Permalink
SIGUSR2: Restart new process with zero downtime
Browse files Browse the repository at this point in the history
This replaces the current `SIGUSR2` (#2716) with the new feature.
(Not supported on Windows).

* Restart the new process with zero downtime

The primary motivation is to enable the update of Fluentd
without data loss of plugins such as `in_udp`.

Specification:

* 2 ways to trigger this feature (non-Windows):
  * Signal: `SIGUSR2` to the supervisor.
    * Sending `SIGUSR2` to the workers triggers the traditional
      GracefulReload.
      * (Leave the traditional way, just in case)
  * RPC: `/api/processes.zeroDowntimeRestart`
    * Leave `/api/config.gracefulReload` for the traditional feature.
* This starts the new supervisor and workers with zero downtime
  for some plugins.
  * Input plugins with `zero_downtime_restart` supported work in
    parallel.
    * Supported input plugins:
      * `in_tcp`
      * `in_udp`
      * `in_syslog`
  * The old processes stop after 10s.
* The new supervisor works in `source-only` mode (#4661)
  until the old processes stop.
  * After the old processes stop, the data handled by the new
    processes are loaded and processed.
  * If need, you can configure `source_only_buffer` (see #4661).
* Windows: Not affected at all. Remains the traditional
  GracefulReload.

Mechanism:

1. The supervisor receives SIGUSR2.
2. Spawn a new supervisor.
3. Take over shared sockets.
4. Launch new workers, and stop old processes in parallel.
   * Launch new workers with source-only mode
     * Limit to zero_downtime_restart_ready? input plugin
   * Send SIGTERM to the old supervisor after 10s delay from 3.
5. The old supervisor stops and sends SIGWINCH to the new one.
6. The new workers run fully.

Note: need these feature

* #4661
* treasure-data/serverengine#146

Conditions under which `zero_downtime_restart_ready?` can be enabled:

* Must be able to work in parallel with another Fluentd instance.
* Notes:
  * The sockets provided by server helper are shared with the
    new Fluentd instance.
  * Input plugins managing a position such as `in_tail` should
    not enable its `zero_downtime_restart_ready?`.
    * Such input plugins do not cause data loss on restart, so
      there is no need to enable this in the first place.
  * `in_http` and `in_forward` could also be supported.
    Not supporting them this time is simply a matter of time to
    consider.

The appropriateness of replacing the traditional SIGUSR2:

* The traditional SIGUSR2 feature has some limitations and issues.
  * Limitations:
    1. A change to system_config is ignored because it needs to
       restart(kill/spawn) process.
    2. All plugins must not use class variable when restarting.
  * Issues:
    * #2259
    * #3469
    * #3549
* This new feature allows restarts without downtime and such
  limitations.
  * Although supported plugins are limited, that is not a
    problem for many plugins.
    (The problem is with server-based input plugins where the
    stop results in data loss).
* This new feature has a big advantage that it can also be used
  to update Fluentd.
  * In the future, fluent-package will use this feature to allow
    update with zero downtime by default.
* If needed, we can still use the traditional feature by RPC or
  directly sending `SIGUSR2` to the workers.

Co-authored-by: Shizuo Fujita <fujita@clear-code.com>
Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
  • Loading branch information
daipom and Watson1978 committed Nov 26, 2024
1 parent 76a11ea commit 049b9f7
Show file tree
Hide file tree
Showing 11 changed files with 646 additions and 35 deletions.
4 changes: 2 additions & 2 deletions lib/fluent/engine.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def initialize

attr_reader :root_agent, :system_config, :supervisor_mode

def init(system_config, supervisor_mode: false)
def init(system_config, supervisor_mode: false, start_in_parallel: false)
@system_config = system_config
@supervisor_mode = supervisor_mode

Expand All @@ -60,7 +60,7 @@ def init(system_config, supervisor_mode: false)

@log_event_verbose = system_config.log_event_verbose unless system_config.log_event_verbose.nil?

@root_agent = RootAgent.new(log: log, system_config: @system_config)
@root_agent = RootAgent.new(log: log, system_config: @system_config, start_in_parallel: start_in_parallel)

self
end
Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_syslog.rb
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,10 @@ def multi_workers_ready?
true
end

def zero_downtime_restart_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def multi_workers_ready?
true
end

def zero_downtime_restart_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/in_udp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,10 @@ def multi_workers_ready?
true
end

def zero_downtime_restart_ready?
true
end

def start
super

Expand Down
4 changes: 4 additions & 0 deletions lib/fluent/plugin/input.rb
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@ def metric_callback(es)
def multi_workers_ready?
false
end

def zero_downtime_restart_ready?
false
end
end
end
end
50 changes: 42 additions & 8 deletions lib/fluent/root_agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,43 @@ module Fluent
class RootAgent < Agent
ERROR_LABEL = "@ERROR".freeze # @ERROR is built-in error label

def initialize(log:, system_config: SystemConfig.new)
class SourceOnlyMode
DISABELD = 0
NORMAL = 1
ONLY_ZERO_DOWNTIME_RESTART_READY = 2

def initialize(with_source_only, start_in_parallel)
if start_in_parallel
@mode = ONLY_ZERO_DOWNTIME_RESTART_READY
elsif with_source_only
@mode = NORMAL
else
@mode = DISABELD
end
end

def enabled?
@mode != DISABELD
end

def only_zero_downtime_restart_ready?
@mode == ONLY_ZERO_DOWNTIME_RESTART_READY
end

def disable!
@mode = DISABELD
end
end

def initialize(log:, system_config: SystemConfig.new, start_in_parallel: false)
super(log: log)

@labels = {}
@inputs = []
@suppress_emit_error_log_interval = 0
@next_emit_error_log_time = nil
@without_source = system_config.without_source || false
@with_source_only = system_config.with_source_only || false
@source_only_mode = SourceOnlyMode.new(system_config.with_source_only, start_in_parallel)
@source_only_buffer_agent = nil
@enable_input_metrics = system_config.enable_input_metrics || false

Expand All @@ -67,7 +95,7 @@ def initialize(log:, system_config: SystemConfig.new)
attr_reader :labels

def source_only_router
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @with_source_only
raise "[BUG] 'RootAgent#source_only_router' should not be called when 'with_source_only' is false" unless @source_only_mode.enabled?
@source_only_buffer_agent.event_router
end

Expand Down Expand Up @@ -154,7 +182,7 @@ def configure(conf)

super

setup_source_only_buffer_agent if @with_source_only
setup_source_only_buffer_agent if @source_only_mode.enabled?

# initialize <source> elements
if @without_source
Expand Down Expand Up @@ -187,9 +215,12 @@ def cleanup_source_only_buffer_agent
end

def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
only_zero_downtime_restart_ready = false

unless kind_or_agent_list
if @with_source_only
if @source_only_mode.enabled?
kind_or_agent_list = [:input, @source_only_buffer_agent]
only_zero_downtime_restart_ready = @source_only_mode.only_zero_downtime_restart_ready?
elsif @source_only_buffer_agent
# source_only_buffer_agent can re-reroute events, so the priority is equal to output_with_router.
kind_or_agent_list = [:input, :output_with_router, @source_only_buffer_agent, @labels.values, :filter, :output].flatten
Expand All @@ -214,6 +245,9 @@ def lifecycle(desc: false, kind_callback: nil, kind_or_agent_list: nil)
end
display_kind = (kind == :output_with_router ? :output : kind)
list.each do |instance|
if only_zero_downtime_restart_ready
next unless instance.respond_to?(:zero_downtime_restart_ready?) and instance.zero_downtime_restart_ready?
end
yield instance, display_kind
end
end
Expand Down Expand Up @@ -257,7 +291,7 @@ def flush!
end

def cancel_source_only!
unless @with_source_only
unless @source_only_mode.enabled?
log.info "do nothing for canceling with-source-only because the current mode is not with-source-only."
return
end
Expand Down Expand Up @@ -285,7 +319,7 @@ def cancel_source_only!
setup_source_only_buffer_agent(flush: true)
start(kind_or_agent_list: [@source_only_buffer_agent])

@with_source_only = false
@source_only_mode.disable!
end

def shutdown(kind_or_agent_list: nil)
Expand Down Expand Up @@ -378,7 +412,7 @@ def add_source(type, conf)
# See also 'fluentd/plugin/input.rb'
input.context_router = @event_router
input.configure(conf)
input.event_emitter_apply_source_only if @with_source_only
input.event_emitter_apply_source_only if @source_only_mode.enabled?
if @enable_input_metrics
@event_router.add_metric_callbacks(input.plugin_id, Proc.new {|es| input.metric_callback(es) })
end
Expand Down
Loading

0 comments on commit 049b9f7

Please sign in to comment.