Skip to content

Commit

Permalink
Merge pull request fluent#4619 from Watson1978/async-2.x
Browse files Browse the repository at this point in the history
http_server: Ready to support Async 2.0 gem
  • Loading branch information
ashie authored Oct 9, 2024
2 parents 8369296 + 483be95 commit b4814cb
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 11 deletions.
1 change: 0 additions & 1 deletion fluentd.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ Gem::Specification.new do |gem|
gem.add_development_dependency("test-unit", ["~> 3.3"])
gem.add_development_dependency("test-unit-rr", ["~> 1.0"])
gem.add_development_dependency("oj", [">= 2.14", "< 4"])
gem.add_development_dependency("async", "~> 1.23")
gem.add_development_dependency("async-http", ">= 0.50.0")
gem.add_development_dependency("aws-sigv4", ["~> 1.8"])
gem.add_development_dependency("aws-sdk-core", ["~> 3.191"])
Expand Down
30 changes: 23 additions & 7 deletions lib/fluent/plugin_helper/http_server/server.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil)
scheme = tls_context ? 'https' : 'http'
@uri = URI("#{scheme}://#{@addr}:#{@port}").to_s
@router = Router.new(default_app)
@reactor = Async::Reactor.new(nil, logger: Fluent::Log::ConsoleAdapter.wrap(@logger))
@server_task = nil
Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger)

opts = if tls_context
{ ssl_context: tls_context }
Expand All @@ -54,24 +55,35 @@ def initialize(addr:, port:, logger:, default_app: nil, tls_context: nil)
end

def start(notify = nil)
Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger)
@logger.debug("Start async HTTP server listening #{@uri}")
task = @reactor.run do
@server.run

Async do |task|
Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger)
@server_task = task.async do
Console.logger = Fluent::Log::ConsoleAdapter.wrap(@logger)
@server.run
end
if notify
notify.push(:ready)
end

if async_v2?
@server_task_queue = ::Thread::Queue.new
@server_task_queue.pop
@server_task&.stop
end
end

task.stop
@logger.debug('Finished HTTP server')
end

def stop
@logger.debug('closing HTTP server')

if @reactor
@reactor.stop
if async_v2?
@server_task_queue&.push(:stop)
else
@server_task&.stop
end
end

Expand All @@ -88,6 +100,10 @@ def stop
@router.mount(name, path, app || block)
end
end

private def async_v2?
Gem::Version.new(Async::VERSION) >= Gem::Version.new('2.0')
end
end
end
end
Expand Down
7 changes: 4 additions & 3 deletions test/plugin_helper/test_http_server_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -127,20 +127,21 @@ def start_https_request(addr, port, verify: true, cert_path: nil, selfsigned: tr
end

client = Async::HTTP::Client.new(Async::HTTP::Endpoint.parse("https://#{addr}:#{port}", ssl_context: context))
reactor = Async::Reactor.new(nil, logger: Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER))
Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER)

resp = nil
error = nil

reactor.run do
Sync do
Console.logger = Fluent::Log::ConsoleAdapter.wrap(NULL_LOGGER)
begin
response = yield(client)
rescue => e # Async::Reactor rescue all error. handle it by myself
error = e
end

if response
resp = Response.new(response.status.to_s, response.body.read, response.headers)
resp = Response.new(response.status.to_s, response.read, response.headers)
end
end

Expand Down

0 comments on commit b4814cb

Please sign in to comment.