Skip to content

Commit

Permalink
Adding ability to terminate pipeline when EOF reached (logstash-plugi…
Browse files Browse the repository at this point in the history
…ns#212)

    * Adding exit_when_all_files_read flag to config
    * Disable active discovery when exit_when_all_files_read set to true
    * Remove file from watched_file_collection when EOF reached and flag set to true
    * Adding condition to exit when file collection is empty and flag set to true

Fixes logstash-plugins#212
  • Loading branch information
BigYellowHammer authored and andsel committed Feb 19, 2020
1 parent fcea343 commit 50ea67f
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 3 deletions.
15 changes: 15 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ see <<plugins-{type}s-{plugin}-string_duration,string_duration>> for the details
| <<plugins-{type}s-{plugin}-sincedb_write_interval>> |<<number,number>> or <<plugins-{type}s-{plugin}-string_duration,string_duration>>|No
| <<plugins-{type}s-{plugin}-start_position>> |<<string,string>>, one of `["beginning", "end"]`|No
| <<plugins-{type}s-{plugin}-stat_interval>> |<<number,number>> or <<plugins-{type}s-{plugin}-string_duration,string_duration>>|No
| <<plugins-{type}s-{plugin}-exit_when_all_files_read>> |<<boolean,boolean>>|No
|=======================================================================

Also see <<plugins-{type}s-{plugin}-common-options>> for a list of options supported by all
Expand Down Expand Up @@ -435,6 +436,20 @@ Reading and enqueuing across all grown files can take time, especially if
the pipeline is congested. So the overall loop time is a combination of the
`stat_interval` and the file read time.

[id="plugins-{type}s-{plugin}-exit_when_all_files_read"]
===== `exit_when_all_files_read`

* Value type is <<boolean,boolean>>
* Default value is `false`

This option can be used in `read` mode to enforce closing all watchers when file gets read.
Can be used in situation when content of the file is static and won't change during execution.
When set to `true` it also disables active discovery of the files - only files that were in
the directories when process was started will be read.
It supports `sincedb` entries. When file was processed once, then modified - next run will only
read newly added entries.


[id="plugins-{type}s-{plugin}-common-options"]
include::{include_path}/{type}.asciidoc[]

Expand Down
12 changes: 11 additions & 1 deletion lib/filewatch/read_mode/processor.rb
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,24 @@ def process_active(watched_files)
break if watch.quit?

if watched_file.compressed?
read_zip_file(watched_file)
read_zip_file(watched_file)
else
read_file(watched_file)
end

if @settings.exit_when_all_files_read
common_detach_when_allread(watched_file)
end
# handlers take care of closing and unwatching
end
end

def common_detach_when_allread(watched_file)
watched_file.unwatch
deletable_filepaths << watched_file.path
logger.trace("Whole file read: #{watched_file.path}, removing from collection")
end

def common_deleted_reaction(watched_file, action)
# file has gone away or we can't read it anymore.
watched_file.unwatch
Expand Down
2 changes: 2 additions & 0 deletions lib/filewatch/settings.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class Settings
attr_reader :exclude, :start_new_files_at, :file_chunk_count, :file_chunk_size
attr_reader :sincedb_path, :sincedb_write_interval, :sincedb_expiry_duration
attr_reader :file_sort_by, :file_sort_direction
attr_reader :exit_when_all_files_read

def self.from_options(opts)
new.add_options(opts)
Expand Down Expand Up @@ -50,6 +51,7 @@ def add_options(opts)
@sincedb_expiry_duration = @opts.fetch(:sincedb_clean_after)
@file_sort_by = @opts[:file_sort_by]
@file_sort_direction = @opts[:file_sort_direction]
@exit_when_all_files_read = @opts[:exit_when_all_files_read]
self
end

Expand Down
8 changes: 6 additions & 2 deletions lib/filewatch/watch.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def subscribe(observer, sincedb_collection)
reset_quit
until quit?
iterate_on_state
break if quit?
# Don't discover new files when files to read are known at the beginning
break if quit? || @settings.exit_when_all_files_read
sincedb_collection.write_if_requested
glob += 1
if glob == interval
Expand Down Expand Up @@ -76,7 +77,10 @@ def quit
end

def quit?
@quit.true?
if @settings.exit_when_all_files_read
@exit = @watched_files_collection.empty?
end
@quit.true? || @exit
end

private
Expand Down
6 changes: 6 additions & 0 deletions lib/logstash/inputs/file.rb
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,11 @@ class File < LogStash::Inputs::Base
# perhaps path + asc will help to achieve the goal of controlling the order of file ingestion
config :file_sort_direction, :validate => ["asc", "desc"], :default => "asc"

# When in 'read' mode - this option is closing all file watchers when EOF is hit
# This option also disables discovery of new/changes files. It works only on files found at the beginning
# Sincedb still works, if you run LS once again after doing some changes - only new values will be read
config :exit_when_all_files_read, :validate => :boolean, :default => false

public

class << self
Expand Down Expand Up @@ -260,6 +265,7 @@ def register
:file_chunk_size => @file_chunk_size,
:file_sort_by => @file_sort_by,
:file_sort_direction => @file_sort_direction,
:exit_when_all_files_read => @exit_when_all_files_read,
}

@completed_file_handlers = []
Expand Down

0 comments on commit 50ea67f

Please sign in to comment.