Skip to content

Commit

Permalink
Merge pull request #34 from YusukeIwaki/revert-32-revert_workaround_f…
Browse files Browse the repository at this point in the history
…or_send_recv

Revert "Revert deletion of workaround for unordered SEND/RECV "
  • Loading branch information
YusukeIwaki authored Dec 2, 2020
2 parents 397c909 + 375886b commit 578b940
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 32 deletions.
1 change: 0 additions & 1 deletion lib/puppeteer/browser_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def initialize(executable_path, process_arguments, temp_directory)
@proc = nil
@connection = nil
@closed = true
@listeners = []
end

attr_reader :proc, :connection
Expand Down
30 changes: 8 additions & 22 deletions lib/puppeteer/cdp_session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ class Error < StandardError; end
# @param {string} targetType
# @param {string} sessionId
def initialize(connection, target_type, session_id)
@callbacks = {}
@callbacks = Concurrent::Hash.new
@connection = connection
@target_type = target_type
@session_id = session_id
@pending_messages = {}
end

attr_reader :connection
Expand All @@ -32,15 +31,14 @@ def async_send_message(method, params = {})
if !@connection
raise Error.new("Protocol error (#{method}): Session closed. Most likely the #{@target_type} has been closed.")
end
id = @connection.raw_send(message: { sessionId: @session_id, method: method, params: params })

promise = resolvable_future
callback = Puppeteer::Connection::MessageCallback.new(method: method, promise: promise)
if pending_message = @pending_messages.delete(id)
debug_puts "Pending message (id: #{id}) is handled"
callback_with_message(callback, pending_message)
else
@callbacks[id] = callback

@connection.generate_id do |id|
@callbacks[id] = Puppeteer::Connection::MessageCallback.new(method: method, promise: promise)
@connection.raw_send(id: id, message: { sessionId: @session_id, method: method, params: params })
end

promise
end

Expand All @@ -50,19 +48,7 @@ def handle_message(message)
if callback = @callbacks.delete(message['id'])
callback_with_message(callback, message)
else
debug_puts "unknown id: #{id}. Store it into pending message"

# RECV is sometimes notified before SEND.
# Something is wrong (thread-unsafe)...
# As a Workaround,
# wait about 10 frames before throwing an error.
message_id = message['id']
@pending_messages[message_id] = message
Concurrent::Promises.schedule(0.16, message_id) do |id|
if @pending_messages.delete(id)
raise Error.new("unknown id: #{id}")
end
end
raise Error.new("unknown id: #{message['id']}")
end
else
emit_event(message['method'], message['params'])
Expand Down
37 changes: 28 additions & 9 deletions lib/puppeteer/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def reject(error)
def initialize(url, transport, delay = 0)
@url = url
@last_id = 0
@callbacks = {}
@callbacks = Concurrent::Hash.new
@delay = delay

@transport = transport
Expand All @@ -52,7 +52,7 @@ def initialize(url, transport, delay = 0)
handle_close
end

@sessions = {}
@sessions = Concurrent::Hash.new
@closed = false
end

Expand Down Expand Up @@ -92,22 +92,41 @@ def send_message(method, params = {})
end

def async_send_message(method, params = {})
id = raw_send(message: { method: method, params: params })
promise = resolvable_future
@callbacks[id] = MessageCallback.new(method: method, promise: promise)

generate_id do |id|
@callbacks[id] = MessageCallback.new(method: method, promise: promise)
raw_send(id: id, message: { method: method, params: params })
end

promise
end

private def generate_id
@last_id += 1
# package private. not intended to use externally.
#
# ```usage
# connection.generate_id do |generated_id|
# # play with generated_id
# end
# ````
#
def generate_id(&block)
block.call(@last_id += 1)
end

def raw_send(message:)
id = generate_id
# package private. not intended to use externally.
def raw_send(id:, message:)
# In original puppeteer (JS) implementation,
# id is generated here using #generate_id and the id argument is not passed to #raw_send.
#
# However with concurrent-ruby, '#handle_message' is sometimes called
# just soon after @transport.send_text and **before returning the id.**
#
# So we have to know the message id in advance before send_text.
#
payload = JSON.fast_generate(message.compact.merge(id: id))
@transport.send_text(payload)
request_debug_printer.handle_payload(payload)
id
end

# Just for effective debugging :)
Expand Down
38 changes: 38 additions & 0 deletions spec/puppeteer/cdp_session_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
require 'spec_helper'

RSpec.describe Puppeteer::CDPSession do
let(:connection) { double(Puppeteer::Connection) }
let(:cdp_session_id) { SecureRandom.hex(16) }
let(:cdp_session) { Puppeteer::CDPSession.new(connection, 'page', cdp_session_id) }

describe '#send_message' do
before {
allow(connection).to receive(:generate_id) { |&block| block.call(SecureRandom.hex(16)) }
allow(connection).to receive(:raw_send) do |id:, message:|
Thread.new(id) do |message_id|
resp = {
'sessionId' => cdp_session_id,
'id' => message_id,
'result' => "pong",
}
cdp_session.handle_message(resp)
end
end
}

it 'should be thread safe' do
Timeout.timeout(5) do
await_all(1000.times.map { cdp_session.async_send_message('ping') })
end
end

it 'should raise error for unknown id' do
resp = {
'sessionId' => cdp_session_id,
'id' => -123,
'result' => "pong",
}
expect { cdp_session.handle_message(resp) }.to raise_error(/unknown id: -123/)
end
end
end

0 comments on commit 578b940

Please sign in to comment.