Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for async-websocket. #219

Merged
merged 10 commits into from
Sep 7, 2018
46 changes: 24 additions & 22 deletions lib/slack/real_time/concurrency/async.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,40 +6,45 @@ module Concurrency
module Async
class Client < ::Async::WebSocket::Client
extend ::Forwardable
def_delegators :driver, :on
def_delegators :@driver, :on

def text(message)
driver.text(message)
@driver.text(message)
end

def binary(data)
socket.write(data)
@driver.binary(data)
end
end

class Socket < Slack::RealTime::Socket
attr_reader :client

def start_async(client)
@client = client
client.run_loop
Thread.new do
Copy link
Collaborator Author

@dblock dblock Aug 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We generally want the caller to be responsible for calling this, otherwise we're spawing a thread per client (or a bot)? Or this this necessary even if you run the code within a Async::Reactor.run?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it to make it the same as how Celluloid is working, which spawns one thread per actor AFAIK. However, it's not necessary. We can push that requirement further up the call chain, but it might mean that the top level needs to embed the code in a reactor. Let me try to figure out the race conditions first then we can figure out how to push that code further up.

$stderr.puts "start_async: @driver = #{@driver}"
::Async::Reactor.run do
client.run_loop
end
end
end

def connect!
super
run_loop
::Async::Reactor.run do
super

run_loop
end
end

def close
driver.close
@driver.close
super
end

def run_loop
@socket = build_socket
@connected = @socket.connect
while event = driver.next_event

while @driver and event = @driver.next_event
# $stderr.puts event.inspect
end
end

Expand All @@ -56,18 +61,15 @@ def build_tcp_options
}
end

def build_socket
socket = ::Async::IO::Endpoint.tcp(addr, port, build_tcp_options)
socket = ::Async::IO::SSLEndpoint.new(socket, build_ssl_context) if secure?
socket
def build_endpoint
endpoint = ::Async::IO::Endpoint.tcp(addr, port, build_tcp_options)
endpoint = ::Async::IO::SSLEndpoint.new(endpoint, build_ssl_context) if secure?

return endpoint
end

def build_driver
Client.new(build_socket.connect, url)
end


def connect
@driver = build_driver
@driver = Client.new(build_endpoint.connect, url)
end
end
end
Expand Down
9 changes: 0 additions & 9 deletions spec/integration/integration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,6 @@ def stop_server
wait_for_server
end

if ENV['CONCURRENCY'] == 'async-websocket'
around do |example|
require 'async/reactor'
::Async::Reactor.run do
example.run
end
end
end

context 'client connected' do
before do
start_server
Expand Down