Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] add connection pool class #4

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
4 changes: 4 additions & 0 deletions lib/ok_hbase/no_connections_available.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
module OkHbase
class NoConnectionsAvailable < RuntimeError
end
end
83 changes: 83 additions & 0 deletions lib/ok_hbase/pool.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
require 'thread'
require 'timeout'

require 'ok_hbase/connection'
require 'ok_hbase/no_connections_available'


module OkHbase
class Pool
@_lock
@_connection_queue

def initialize(size, opts={})
raise TypeError.new("'size' must be an integer") unless size.is_a? Integer
raise ArgumentError.new("'size' must be > 0") unless size > 0

OkHbase.logger.debug("Initializing connection pool with #{size} connections.")

@_lock = Mutex.new
@_connection_queue = Queue.new

connection_opts = opts

connection_opts[:auto_connect] = false

size.times do
connection = OkHbase::Connection.new(connection_opts)
@_connection_queue << connection
end

# The first connection is made immediately so that trivial
# mistakes like unresolvable host names are raised immediately.
# Subsequent connections are connected lazily.
self.connection {}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is something that you should allow the app to decide to do. In Phoenix, there may be instances where the thread pool gets created due to some initialization code, but we don't want to immediately try connection. For example, it would be great if Phoenix could do a rake -T without attempting to connect to hbase.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll add a flag to handle this

end

def connection(timeout = nil)
connection = Thread.current[:current_connection]

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to use a symbol name like :ok_hbase_connection, just in case something else like ActiveRecord uses a symbol called :current_connection

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good call


return_after_use = false

unless connection
return_after_use = true
connection = _acquire_connection(timeout)
@_lock.synchronize do
Thread.current[:current_connection] = connection

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are you synchronizing this block? Thread.current[:current_connection] would be distinct for each thread, so there isn't any sort of a resource conflict here.

I think it might be the line above (the _acquire_connection) that you want to make sure only one thread is doing at a time.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an artifact of the Python code I started from. Python has a bug where acquiring the lock there is needed. I'll fix it.

end
end

begin
connection.open()
yield connection
rescue Apache::Hadoop::Hbase::Thrift::IOError, Thrift::TransportException, SocketError => e
OkHbase.logger.info("Replacing tainted pool connection")

connection.send(:_refresh_thrift_client)
connection.open
raise e
ensure
if return_after_use
Thread.current.delete[:current_connection]
_return_connection(connection)
end
end
end

private

def _acquire_connection(timeout = nil)
begin
Timeout.timeout(timeout) do
return @_connection_queue.deq
end
rescue TimeoutError
raise OkHbase::NoConnectionsAvailable.new("No connection available from pool within specified timeout: #{timeout}")
end
end

def _return_connection(connection)
@_connection_queue << connection

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you checked whether enqueueing an element like this is thread safe? Or might you need to wrap that in a synchronize call?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, the Queue class in ruby is meant specifically for inter-thread usage: http://www.ruby-doc.org/stdlib-1.9.3/libdoc/thread/rdoc/Queue.html

end
end
end