diff --git a/Rakefile b/Rakefile index ff5a5ad05d..8b895d2273 100755 --- a/Rakefile +++ b/Rakefile @@ -14,8 +14,8 @@ Rake::TestTask.new(:base_test) do |t| # $ bundle exec rake base_test TEST=test/test_*.rb t.libs << "test" t.test_files = Dir["test/**/test_*.rb"].sort - t.verbose = false - t.warning = false + t.verbose = true + t.warning = true t.ruby_opts = ["-Eascii-8bit:ascii-8bit"] end diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 5cc097a9fd..f86f6a9deb 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -44,6 +44,7 @@ def initialize(metadata, path, mode, perm: system_config.file_permission || FILE super(metadata) @permission = perm @bytesize = @size = @adding_bytes = @adding_size = 0 + @meta = nil case mode when :create then create_new_chunk(path, perm) diff --git a/lib/fluent/plugin/filter_record_transformer.rb b/lib/fluent/plugin/filter_record_transformer.rb index c2ca53ca9d..4f07f7a033 100644 --- a/lib/fluent/plugin/filter_record_transformer.rb +++ b/lib/fluent/plugin/filter_record_transformer.rb @@ -315,6 +315,7 @@ def expand(str, placeholders, force_stringify = false) class CleanroomExpander def expand(__str_to_eval__, tag, time, record, tag_parts, tag_prefix, tag_suffix, hostname) tags = tag_parts # for old version compatibility + _ = tags # to suppress "unused variable" warning for tags Thread.current[:record_transformer_record] = record # for old version compatibility instance_eval(__str_to_eval__) end diff --git a/lib/fluent/plugin/output.rb b/lib/fluent/plugin/output.rb index 088b537b3f..ebfee6b4cb 100644 --- a/lib/fluent/plugin/output.rb +++ b/lib/fluent/plugin/output.rb @@ -169,6 +169,12 @@ def initialize @buffering = true end @custom_format = implement?(:custom_format) + + @buffer = nil + @secondary = nil + @simple_chunking = nil + @chunk_keys = @chunk_key_time = @chunk_key_tag = nil + @flush_mode = nil end def acts_as_secondary(primary) @@ -623,8 +629,8 @@ def handle_stream_with_standard_format(tag, es, enqueue: false) records += 1 end meta_and_data_bulk = {} - meta_and_data.each_pair do |meta, es| - meta_and_data_bulk[meta] = [es.to_msgpack_stream(time_int: @time_as_integer), es.size] + meta_and_data.each_pair do |meta, m_es| + meta_and_data_bulk[meta] = [m_es.to_msgpack_stream(time_int: @time_as_integer), m_es.size] end write_guard do @buffer.write(meta_and_data_bulk, bulk: true, enqueue: enqueue) @@ -694,7 +700,6 @@ def rollback_write(chunk_id) end def try_rollback_write - now = Time.now @dequeued_chunks_mutex.synchronize do while @dequeued_chunks.first && @dequeued_chunks.first.expired? info = @dequeued_chunks.shift diff --git a/lib/fluent/plugin/storage_local.rb b/lib/fluent/plugin/storage_local.rb index 16102cb6d9..8c21e84822 100644 --- a/lib/fluent/plugin/storage_local.rb +++ b/lib/fluent/plugin/storage_local.rb @@ -34,7 +34,7 @@ def configure(conf) @on_memory = true end elsif @path - path = @path.dup + # ok else # @_plugin_id_configured is true raise NotImplementedError, "implement this feature later with system_config" ## TODO: get process-wide directory for plugin storage, and generate path for this plugin storage instance diff --git a/lib/fluent/plugin_helper/event_emitter.rb b/lib/fluent/plugin_helper/event_emitter.rb index 103a8ef47b..e0b72a2ff7 100644 --- a/lib/fluent/plugin_helper/event_emitter.rb +++ b/lib/fluent/plugin_helper/event_emitter.rb @@ -14,7 +14,6 @@ # limitations under the License. # -require 'fluent/engine' require 'fluent/time' module Fluent @@ -49,6 +48,7 @@ def initialize end def configure(conf) + require 'fluent/engine' super if label_name = conf['@label'] diff --git a/lib/fluent/root_agent.rb b/lib/fluent/root_agent.rb index 00d3a85b7f..916499bca4 100644 --- a/lib/fluent/root_agent.rb +++ b/lib/fluent/root_agent.rb @@ -150,7 +150,7 @@ def flush! log.warn_backtrace end end - flushing_threads = [] + flushing_threads << t end end flushing_threads.each{|t| t.join } diff --git a/lib/fluent/test/input_test.rb b/lib/fluent/test/input_test.rb index 98420b8a86..a04d64e9cb 100644 --- a/lib/fluent/test/input_test.rb +++ b/lib/fluent/test/input_test.rb @@ -99,14 +99,26 @@ def run_should_stop? false end + module EmitStreamWrapper + def emit_stream_callee=(method) + @emit_stream_callee = method + end + def emit_stream(tag, es) + @emit_stream_callee.call(tag, es) + end + end + def run(num_waits = 10, &block) m = method(:emit_stream) - Engine.define_singleton_method(:emit_stream) {|tag,es| - m.call(tag, es) - } - instance.router.define_singleton_method(:emit_stream) {|tag,es| - m.call(tag, es) - } + (class << Engine; self; end).module_eval do + prepend EmitStreamWrapper + end + Engine.emit_stream_callee = m + (class << instance.router; self; end).module_eval do + prepend EmitStreamWrapper + end + instance.router.emit_stream_callee = m + super(num_waits) { block.call if block diff --git a/lib/fluent/test/output_test.rb b/lib/fluent/test/output_test.rb index 04c12ab935..d590688507 100644 --- a/lib/fluent/test/output_test.rb +++ b/lib/fluent/test/output_test.rb @@ -139,7 +139,6 @@ def run(&block) assert_equal(@expected_buffer, buffer) end - chunks = [] lines.keys.each do |meta| chunk = @instance.buffer.generate_chunk(meta) chunk.append(lines[meta]) diff --git a/test/config/test_configurable.rb b/test/config/test_configurable.rb index 3f26824bd0..a8dff79b87 100644 --- a/test/config/test_configurable.rb +++ b/test/config/test_configurable.rb @@ -665,14 +665,14 @@ class TestConfigurable < ::Test::Unit::TestCase checker = lambda { |conf| ConfigurableSpec::Example0.new.configure(conf) } assert_nothing_raised { checker.call(complete) } - assert_raise(Fluent::ConfigError) { checker.call(complete.reject{|k,v| k == "stringvalue" }) } - assert_raise(Fluent::ConfigError) { checker.call(complete.reject{|k,v| k == "boolvalue" }) } - assert_raise(Fluent::ConfigError) { checker.call(complete.reject{|k,v| k == "integervalue"}) } - assert_raise(Fluent::ConfigError) { checker.call(complete.reject{|k,v| k == "sizevalue" }) } - assert_raise(Fluent::ConfigError) { checker.call(complete.reject{|k,v| k == "timevalue" }) } - assert_raise(Fluent::ConfigError) { checker.call(complete.reject{|k,v| k == "floatvalue" }) } - assert_raise(Fluent::ConfigError) { checker.call(complete.reject{|k,v| k == "hashvalue" }) } - assert_raise(Fluent::ConfigError) { checker.call(complete.reject{|k,v| k == "arrayvalue" }) } + assert_raise(Fluent::ConfigError) { c = complete.dup; c.delete("stringvalue"); checker.call(c) } + assert_raise(Fluent::ConfigError) { c = complete.dup; c.delete("boolvalue"); checker.call(c) } + assert_raise(Fluent::ConfigError) { c = complete.dup; c.delete("integervalue"); checker.call(c) } + assert_raise(Fluent::ConfigError) { c = complete.dup; c.delete("sizevalue"); checker.call(c) } + assert_raise(Fluent::ConfigError) { c = complete.dup; c.delete("timevalue"); checker.call(c) } + assert_raise(Fluent::ConfigError) { c = complete.dup; c.delete("floatvalue"); checker.call(c) } + assert_raise(Fluent::ConfigError) { c = complete.dup; c.delete("hashvalue"); checker.call(c) } + assert_raise(Fluent::ConfigError) { c = complete.dup; c.delete("arrayvalue"); checker.call(c) } end test 'generates section with default values for init:true sections' do diff --git a/test/plugin/test_filter_record_transformer.rb b/test/plugin/test_filter_record_transformer.rb index 76e82c4e50..b59ceff620 100644 --- a/test/plugin/test_filter_record_transformer.rb +++ b/test/plugin/test_filter_record_transformer.rb @@ -117,13 +117,13 @@ def emit(config, msgs = ['']) config = %[ enable_ruby yes - message ${hostname} ${tag_parts.last} ${URI.encode(message)} + message ${hostname} ${tag_parts.last} ${"'" + message + "'"} ] msgs = ['1', '2'] es = emit(config, msgs) es.each_with_index do |(_t, r), i| - assert_equal("#{@hostname} #{@tag_parts[-1]} #{msgs[i]}", r['message']) + assert_equal("#{@hostname} #{@tag_parts[-1]} '#{msgs[i]}'", r['message']) end end diff --git a/test/plugin/test_in_http.rb b/test/plugin/test_in_http.rb index 7e7265893f..bf52aa596a 100644 --- a/test/plugin/test_in_http.rb +++ b/test/plugin/test_in_http.rb @@ -251,7 +251,7 @@ def test_with_regexp body = record.map { |k, v| v.to_s }.join(':') - res = post("/#{tag}?time=#{_time.to_s}", body) + res = post("/#{tag}?time=#{_time.to_s}", body, {'Content-Type' => 'application/octet-stream'}) assert_equal "200", res.code } end @@ -273,7 +273,7 @@ def test_with_csv d.run do d.expected_emits.each { |tag, _time, record| body = record.map { |k, v| v }.to_csv - res = post("/#{tag}?time=#{_time.to_s}", body) + res = post("/#{tag}?time=#{_time.to_s}", body, {'Content-Type' => 'text/comma-separated-values'}) assert_equal "200", res.code } end @@ -308,7 +308,7 @@ def test_cors_allowed begin d.run do Net::HTTP.start("127.0.0.1", PORT) do |http| - req = Net::HTTP::Post.new("/foo/bar", {"Origin" => "http://foo.com"}) + req = Net::HTTP::Post.new("/foo/bar", {"Origin" => "http://foo.com", "Content-Type" => "application/octet-stream"}) res = http.request(req) acao = res["Access-Control-Allow-Origin"] @@ -333,7 +333,7 @@ def test_cors_disallowed begin d.run do Net::HTTP.start("127.0.0.1", PORT) do |http| - req = Net::HTTP::Post.new("/foo/bar", {"Origin" => "http://bar.com"}) + req = Net::HTTP::Post.new("/foo/bar", {"Origin" => "http://bar.com", "Content-Type" => "application/octet-stream"}) res = http.request(req) response_code = res.code @@ -352,6 +352,10 @@ def test_cors_disallowed $test_in_http_content_types = [] $test_in_http_content_types_flag = false module ContentTypeHook + def initialize(*args) + @io_handler = nil + super + end def on_headers_complete(headers) super if $test_in_http_content_types_flag @@ -402,8 +406,14 @@ def post(path, params, header = {}) http = Net::HTTP.new("127.0.0.1", PORT) req = Net::HTTP::Post.new(path, header) if params.is_a?(String) + unless header.has_key?('Content-Type') + header['Content-Type'] = 'application/octet-stream' + end req.body = params else + unless header.has_key?('Content-Type') + header['Content-Type'] = 'application/x-www-form-urlencoded' + end req.set_form_data(params) end http.request(req) diff --git a/test/plugin/test_output_as_buffered.rb b/test/plugin/test_output_as_buffered.rb index b7bf1977d8..6793dd5528 100644 --- a/test/plugin/test_output_as_buffered.rb +++ b/test/plugin/test_output_as_buffered.rb @@ -272,7 +272,6 @@ def waiting(seconds) @i.register(:format){|tag,time,record| [tag,time,record].to_json + "\n" } @i.register(:write){|chunk| chunk.read.split("\n").reject{|l| l.empty? }.each{|data| ary << data } } - tag = "test.tag" t = event_time() r = {} (0...10).each do |i| @@ -382,7 +381,6 @@ def waiting(seconds) @i.register(:format){|tag,time,record| [tag,time,record].to_json + "\n" } @i.register(:write){|chunk| chunk.read.split("\n").reject{|l| l.empty? }.each{|data| ary << data } } - tag = "test.tag" t = event_time() r = {} (0...10).each do |i| diff --git a/test/plugin/test_output_as_buffered_overflow.rb b/test/plugin/test_output_as_buffered_overflow.rb index a60d8d7143..acfbfbd71b 100644 --- a/test/plugin/test_output_as_buffered_overflow.rb +++ b/test/plugin/test_output_as_buffered_overflow.rb @@ -15,6 +15,10 @@ def register(name, &block) end end class DummyAsyncOutput < DummyBareOutput + def initialize + super + @format = @write = nil + end def format(tag, time, record) @format ? @format.call(tag, time, record) : [tag, time, record].to_json end @@ -37,7 +41,8 @@ def waiting(seconds) yield end rescue Timeout::Error - STDERR.print *(@i.log.out.logs) + logs = @i.log.out.logs + STDERR.print(*logs) raise end end @@ -133,7 +138,7 @@ def waiting(seconds) assert !@i.buffer.storable? - thread = Thread.new do + Thread.new do sleep 3 failing = false end