-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Migrate exec plugins to v0.14 api #1297
Conversation
618069b
to
a15583e
Compare
a15583e
to
853342e
Compare
@repeatedly Could you review this change? |
8fcb357
to
046c787
Compare
Rebased on #1305. |
@tagomoris I will review it. You pushed some changes after previous comment, so I waited to finish your additional commit ;p |
:) |
Hmm, |
@sonots Could you also check this PR? This patch is very large so need more reviewers. |
class TSVFormatter < Formatter | ||
Plugin.register_formatter('tsv', self) | ||
|
||
desc 'Names of fields included in each lines' |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Field names" is more simpler than "Names of fields"
|
||
def on_record(time, record) | ||
tag = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line is not needed.
router.emit(tag, time, record) | ||
rescue => e | ||
log.error "exec failed to emit", error: e, tag: tag, record: Yajl.dump(record) | ||
log.error "exec failed to emit", tag: tag, record: Yajl.dump(record), error: e | ||
router.emit_error_event(tag, time, record, "exec failed to emit") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
4th error argument should be an exception object.
prog = "#{@command} #{chunk.path}" | ||
record = inject_values_to_record(tag, time, record) | ||
if @formatter.formatter_type == :text_per_line | ||
@formatter.format(tag, time, record).chomp + "\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using constant value for "\n" is better for reducing temporary object allocation.
prog = if chunk.respond_to?(:path) | ||
"#{@command} #{chunk.path}" | ||
else | ||
tmpfile = Tempfile.new("fluent-plugin-exec-") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"fluent-plugin-out-exec-" is more better.
chunk_id = chunk.unique_id | ||
callback = ->(status){ | ||
begin | ||
if tmpfile |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code should be moved to enusre in try_write
instead of here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No. ensure
of #try_write
doesn't take care about child process status.
This callback is called after child process plugin helper find the child process's exit status. So here is the only place to delete tmpfile.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm... what happen if exception happens before calling callback?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Of course, temp files will leak.
But all exceptions about child process status must be rescued by child process plugin helper code. Only bugs should be able to raise errors before callback.
|
||
def on_record(time, record) | ||
tag = nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
tag = nil | ||
tag = extract_tag_from_record(record) | ||
tag = @added_prefix_string + tag if tag && @add_prefix | ||
tag ||= @tag |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Off topic. We should care extrated result and @tag
is nil
case.
@next_log_time = Time.now.to_i + @suppress_error_log_interval | ||
end | ||
router.emit_error_event(tag, time, record, "exec_filter failed to emit") if tag && time && record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto. 4th argument should be an error object.
end | ||
alias parse_partial_data parse | ||
|
||
def parse_io(io) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should be use parse_io(io, &block)
for API signature?
@@ -77,10 +79,20 @@ module CompatParameters | |||
"utc" => nil, | |||
} | |||
|
|||
EXTRACT_PARAMS = { | |||
"time_key" => "time_key", | |||
"time_format" => "time_format", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
More spaces than expected.
…to show situation clearly
This reverts commit 2ea9540.
046c787
to
4686902
Compare
I've pushed commits for review comments. |
555e7a2
to
d96b08f
Compare
d96b08f
to
9d8b6e7
Compare
@repeatedly ping |
tag = tag_remove_prefix(tag) | ||
record = inject_values_to_record(tag, time, record) | ||
if @formatter.formatter_type == :text_per_line | ||
@formatter.format(tag, time, record).chomp + "\n" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use NEWLINE = "\n"
like above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh
LGTM. But my code review doesn't guarantee new process handling code has no problem on actual workloads. |
This change is to migrate
out_exec_filter
to v0.14 API, and makein/out_exec
plugins free fromFluent::ExecUtil::*
classes.This change includes the changes to:
This change is successor of #1048.