Skip to content

Commit

Permalink
Removed explicit Enumerator usage. Closes #66.
Browse files Browse the repository at this point in the history
Most methods are now dual-mode, either accepting a block or returning
an enumerator created with enum_for.
  • Loading branch information
csw committed Jul 11, 2012
1 parent 61a30d5 commit e77d682
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 49 deletions.
3 changes: 2 additions & 1 deletion lib/bio/maf/index.rb
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ def self.build(parser, path)
# @param [Parser] parser MAF parser for file to fetch blocks
# from.
# @param [Hash] filter Block filter expression.
# @return [Array<Block>]
# @yield [block] each {Block} matched, in turn
# @return [Enumerable<Block>] each matching {Block}, if no block given
# @api public
def find(intervals, parser, filter={}, &blk)
# start = Time.now
Expand Down
88 changes: 40 additions & 48 deletions lib/bio/maf/parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -419,19 +419,21 @@ def set_last_block_pos!
# @param [Array] block_offsets Offsets of blocks to parse.
# @return [Array<Block>]
def fetch_blocks(offset, len, block_offsets)
start_chunk_read_if_needed(offset, len)
# read chunks until we have the entire merged set of
# blocks ready to parse
# to avoid fragment joining
append_chunks_to(len)
# parse the blocks
Enumerator.new do |y|
if block_given?
start_chunk_read_if_needed(offset, len)
# read chunks until we have the entire merged set of
# blocks ready to parse
# to avoid fragment joining
append_chunks_to(len)
# parse the blocks
block_offsets.each do |expected_offset|
block = parse_block
ctx.parse_error("expected a block at offset #{expected_offset} but could not parse one!") unless block
ctx.parse_error("got block with offset #{block.offset}, expected #{expected_offset}!") unless block.offset == expected_offset
y << block
yield block
end
else
enum_for(:fetch_blocks, offset, len, block_offsets)
end
end

Expand Down Expand Up @@ -578,7 +580,8 @@ def sequence_filter=(filter)
# `fetch_list` should be an array of `[offset, length]` tuples.
#
# @param [Array] fetch_list the fetch list
# @return [Array<Block>] the requested alignment blocks
# @yield [block] each block matched, in turn
# @return [Enumerable<Block>] each matching {Block}, if no block given
def fetch_blocks(fetch_list, &blk)
if blk
merged = merge_fetch_list(fetch_list)
Expand All @@ -601,10 +604,7 @@ def fetch_blocks_merged(fetch_list, &blk)
total_size = fetch_list.collect { |e| e[1] }.reduce(:+)
with_context(@random_access_chunk_size) do |ctx|
fetch_list.each do |e|
ctx.fetch_blocks(*e).each do |block|
yield block
#total_size += block.size
end
ctx.fetch_blocks(*e, &blk)
end
end
elapsed = Time.now - start
Expand Down Expand Up @@ -725,23 +725,27 @@ def _parse_header
# Delegates to {#parse_blocks_parallel} if `:threads` is set
# under JRuby.
#
# @return [Enumerator<Block>] enumerator of alignment blocks.
# @return [Enumerator<Block>] enumerator of {Block}s if no block given.
# @yield [block] Passes each {Block} in turn to a block
# @api public
def parse_blocks
if RUBY_PLATFORM == 'java' && @opts.has_key?(:threads)
parse_blocks_parallel
else
Enumerator.new do |y|
def each_block(&blk)
if block_given?
if RUBY_PLATFORM == 'java' && @opts.has_key?(:threads)
parse_blocks_parallel(&blk)
else
until at_end
y << parse_block()
yield parse_block()
end
end
else
enum_for(:parse_blocks)
end
end
alias_method :parse_blocks, :each_block

# Parse alignment blocks with a worker thread.
#
# @return [Enumerator<Block>] enumerator of alignment blocks.
# @block block handler
# @api private
def parse_blocks_parallel
queue = java.util.concurrent.LinkedBlockingQueue.new(128)
Expand All @@ -756,35 +760,23 @@ def parse_blocks_parallel
$stderr.puts $!.backtrace.join("\n")
end
end
Enumerator.new do |y|
saw_eof = false
n_final_poll = 0
while true
block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
if block == :eof
saw_eof = true
break
elsif block
y << block
else
# timed out
n_final_poll += 1 unless worker.alive?
end
break if n_final_poll > 1
end
unless saw_eof
raise "worker exited unexpectedly!"
saw_eof = false
n_final_poll = 0
while true
block = queue.poll(1, java.util.concurrent.TimeUnit::SECONDS)
if block == :eof
saw_eof = true
break
elsif block
yield block
else
# timed out
n_final_poll += 1 unless worker.alive?
end
break if n_final_poll > 1
end
end

def each_block
if block_given?
until at_end
yield parse_block()
end
else
enum_for(:each_block)
unless saw_eof
raise "worker exited unexpectedly!"
end
end

Expand Down

0 comments on commit e77d682

Please sign in to comment.