Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ld-celluloid-eventsource.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Gem::Specification.new do |spec|

spec.add_dependency 'celluloid-io', '~> 0.17.3'
spec.add_dependency 'celluloid', '~> 0.18.0.pre'
spec.add_dependency 'nio4r', '~> 1.1'
spec.add_dependency 'nio4r', '>= 1.1'
spec.add_dependency 'http_parser.rb', '~> 0.6.0'

spec.add_development_dependency 'atomic', '~> 1.1'
Expand Down
263 changes: 131 additions & 132 deletions lib/celluloid/eventsource.rb
Original file line number Diff line number Diff line change
@@ -1,30 +1,52 @@
require 'celluloid/current'
require 'celluloid/eventsource/version'
require 'celluloid/io'
require 'celluloid/eventsource/event_parser'
require 'celluloid/eventsource/response_parser'
require 'uri'
require 'base64'

module Celluloid
class EventSource
include Celluloid::IO
include Celluloid::Internals::Logger
Celluloid.boot

class UnexpectedContentType < StandardError
end

class ReadTimeout < StandardError
end

attr_reader :url, :with_credentials
attr_reader :ready_state

CONNECTING = 0
OPEN = 1
CLOSED = 2

MAX_RECONNECT_TIME = 30

execute_block_on_receiver :initialize

#
# Constructor for an EventSource.
#
# @param uri [String] the event stream URI
# @param opts [Hash] the configuration options
# @option opts [Hash] :headers Headers to send with the request
# @option opts [Float] :read_timeout Timeout (in seconds) after which to restart the connection if
# the server has sent no data
# @option opts [Float] :reconnect_delay Initial delay (in seconds) between connection attempts; this will
# be increased exponentially if there are repeated failures
#
def initialize(uri, options = {})
self.url = uri
options = options.dup
@ready_state = CONNECTING
@with_credentials = options.delete(:with_credentials) { false }
@headers = default_request_headers.merge(options.fetch(:headers, {}))
@read_timeout = options.fetch(:read_timeout, 0).to_i
proxy = ENV['HTTP_PROXY'] || ENV['http_proxy'] || options[:proxy]
if proxy
proxyUri = URI(proxy)
Expand All @@ -33,15 +55,8 @@ def initialize(uri, options = {})
end
end

@event_type_buffer = ""
@last_event_id_buffer = ""
@data_buffer = ""

@last_event_id = String.new

@reconnect_timeout = 1
@reconnect_timeout = options.fetch(:reconnect_delay, 1)
@on = { open: ->{}, message: ->(_) {}, error: ->(_) {} }
@parser = ResponseParser.new

@chunked = false

Expand All @@ -66,11 +81,13 @@ def listen
while !closed?
begin
establish_connection
chunked? ? process_chunked_stream : process_stream
rescue
# Just reconnect
process_stream
rescue UnexpectedContentType
raise # Let these flow to the top
rescue StandardError => e
info "Reconnecting after exception: #{e}"
# Just reconnect on runtime errors
end
sleep @reconnect_timeout
end
end

Expand All @@ -97,54 +114,79 @@ def on_error(&action)

private

MessageEvent = Struct.new(:type, :data, :last_event_id)

def ssl?
url.scheme == 'https'
end

def establish_connection
if @proxy
sock = ::TCPSocket.new(@proxy.host, @proxy.port)
@socket = Celluloid::IO::TCPSocket.new(sock)

@socket.write(connect_string)
@socket.flush
while (line = @socket.readline.chomp) != '' do @parser << line end
parser = ResponseParser.new
reconnect_attempts = 0
reconnect_jitter_rand = Random.new

unless @parser.status_code == 200
@on[:error].call({status_code: @parser.status_code, body: @parser.chunk})
return
loop do
begin
if @proxy
sock = ::TCPSocket.new(@proxy.host, @proxy.port)
@socket = Celluloid::IO::TCPSocket.new(sock)

@socket.write(connect_string)
@socket.flush
while (line = readline_with_timeout(@socket).chomp) != '' do parser << line end

unless parser.status_code == 200
@on[:error].call({status_code: parser.status_code, body: parser.chunk})
return
end
else
sock = ::TCPSocket.new(@url.host, @url.port)
@socket = Celluloid::IO::TCPSocket.new(sock)
end

if ssl?
@socket = Celluloid::IO::SSLSocket.new(@socket)
@socket.connect
end

@socket.write(request_string)
@socket.flush()

until parser.headers?
parser << readline_with_timeout(@socket)
end

if parser.status_code != 200
until @socket.eof?
parser << readline_with_timeout(@socket)
end
# If the server returns a non-200, we don't want to close-- we just want to
# report an error
# close
@on[:error].call({status_code: parser.status_code, body: parser.chunk})
elsif parser.headers['Content-Type'] && parser.headers['Content-Type'].include?("text/event-stream")
@chunked = !parser.headers["Transfer-Encoding"].nil? && parser.headers["Transfer-Encoding"].include?("chunked")
@ready_state = OPEN
@on[:open].call
return # Success, don't retry
else
close
info "Invalid Content-Type #{parser.headers['Content-Type']}"
@on[:error].call({status_code: parser.status_code, body: "Invalid Content-Type #{parser.headers['Content-Type']}. Expected text/event-stream"})
raise UnexpectedContentType
end

rescue UnexpectedContentType
raise # Let these flow to the top

rescue StandardError => e
warn "Waiting to try again after exception while connecting: #{e}"
# Just try again after a delay for any other exceptions
end
else
sock = ::TCPSocket.new(@url.host, @url.port)
@socket = Celluloid::IO::TCPSocket.new(sock)
end

if ssl?
@socket = Celluloid::IO::SSLSocket.new(@socket)
@socket.connect
base_sleep_time = ([@reconnect_timeout * (2 ** reconnect_attempts), MAX_RECONNECT_TIME].min).to_f
sleep_time = (base_sleep_time / 2) + reconnect_jitter_rand.rand(base_sleep_time / 2)
sleep sleep_time
reconnect_attempts += 1
end

@socket.write(request_string)
@socket.flush()

until @parser.headers?
@parser << @socket.readline
end

if @parser.status_code != 200
until @socket.eof?
@parser << @socket.readline
end
# If the server returns a non-200, we don't want to close-- we just want to
# report an error
# close
@on[:error].call({status_code: @parser.status_code, body: @parser.chunk})
return
end

handle_headers(@parser.headers)
end

def default_request_headers
Expand All @@ -155,100 +197,59 @@ def default_request_headers
}
end

def clear_buffers!
@data_buffer = ""
@event_type_buffer = ""
end

def dispatch_event(event)
unless closed?
@on[event.type] && @on[event.type].call(event)
end
end

def chunked?
@chunked
end

def process_chunked_stream
until closed? || @socket.eof?
handle_chunked_stream
end
end

def process_stream
until closed? || @socket.eof?
line = @socket.readline
line.strip.empty? ? process_event : parse_line(line)
end
end

def handle_chunked_stream
chunk_header = @socket.readline
bytes_to_read = chunk_header.to_i(16)
bytes_read = 0
while bytes_read < bytes_to_read do
line = @socket.readline
bytes_read += line.size

line.strip.empty? ? process_event : parse_line(line)
end

if !line.nil? && line.strip.empty?
process_event
def read_chunked_lines(socket)
Enumerator.new do |lines|
chunk_header = readline_with_timeout(socket)
bytes_to_read = chunk_header.to_i(16)
bytes_read = 0
while bytes_read < bytes_to_read do
line = readline_with_timeout(@socket)
bytes_read += line.size
lines << line
end
end
end

def parse_line(line)
case line
when /^:.*$/
when /^(\w+): ?(.*)$/
process_field($1, $2)
else
if chunked? && !@data_buffer.empty?
@data_buffer.rstrip!
process_field("data", line.rstrip)
def read_lines
Enumerator.new do |lines|
loop do
break if closed?
if chunked?
for line in read_chunked_lines(@socket) do
break if closed?
lines << line
end
else
lines << readline_with_timeout(@socket)
end
end
end
end

def process_event
@last_event_id = @last_event_id_buffer

return if @data_buffer.empty?

@data_buffer.chomp!("\n") if @data_buffer.end_with?("\n")
event = MessageEvent.new(:message, @data_buffer, @last_event_id)
event.type = @event_type_buffer.to_sym unless @event_type_buffer.empty?

dispatch_event(event)
ensure
clear_buffers!
end

def process_field(field_name, field_value)
case field_name
when "event"
@event_type_buffer = field_value
when "data"
@data_buffer << field_value.concat("\n")
when "id"
@last_event_id_buffer = field_value
when "retry"
if /^(?<num>\d+)$/ =~ field_value
@reconnect_timeout = num.to_i
end
def process_stream
parser = EventParser.new(read_lines, @chunked,->(timeout) { @read_timeout = timeout })
parser.each do |event|
@on[event.type] && @on[event.type].call(event)
@last_event_id = event.id
end
end

def handle_headers(headers)
if headers['Content-Type'].include?("text/event-stream")
@chunked = !headers["Transfer-Encoding"].nil? && headers["Transfer-Encoding"].include?("chunked")
@ready_state = OPEN
@on[:open].call
def readline_with_timeout(socket)
if @read_timeout > 0
begin
timeout(@read_timeout) do
socket.readline
end
rescue Celluloid::TaskTimeout
@on[:error].call({body: "Read timeout, will attempt reconnection"})
raise ReadTimeout
end
else
close
@on[:error].call({status_code: @parser.status_code, body: "Invalid Content-Type #{headers['Content-Type']}. Expected text/event-stream"})
return socket.readline
end
end

Expand All @@ -267,7 +268,5 @@ def connect_string
end
req << "\r\n"
end

end

end
Loading