Experimenting with queues and threads #1553

Open

betatim wants to merge 5 commits into master from fix/multi_threading_bugs

Conversation

@betatim (Member) commented Dec 13, 2016

This is an experimental branch to both address the bugs mentioned in #1248 and see if we can improve the speed of filter-abund and friends.

Current ideas are based on:

  • reading from disk isn't slow or CPU intensive (except when the input is gzipped, and gzip decompression can't be multithreaded)
  • making a call from Python -> C is "expensive"
  • we want to release the GIL for as long as possible
  • we want to release/acquire the GIL as few times as possible

Use one thread to read the input and dump batches of reads into a queue. Several consumer threads each get a batch from the queue, call into C land, convert the sequences in the batch to char*s, release the GIL, enter them into the hashtable, and then re-acquire the GIL.
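As a rough illustration of the pattern described above (not the code in this branch): one producer thread fills a bounded queue with batches of reads, and several consumer threads hand each batch to a single C-level call. The consume_batch callable is a made-up stand-in for the call that converts a batch to char*s, releases the GIL, and fills the table.

```python
# Sketch of the producer/consumer layout described above, not this branch's code.
# `consume_batch` is a hypothetical stand-in for the one Python -> C call per batch.
import threading
import queue

BATCH_SIZE = 1000
N_CONSUMERS = 3
SENTINEL = None

def producer(reads, q):
    batch = []
    for read in reads:
        batch.append(read)
        if len(batch) == BATCH_SIZE:
            q.put(batch)
            batch = []
    if batch:
        q.put(batch)
    for _ in range(N_CONSUMERS):
        q.put(SENTINEL)          # one sentinel per consumer to shut it down

def consumer(q, consume_batch):
    while True:
        batch = q.get()
        if batch is SENTINEL:
            break
        consume_batch(batch)     # one C call per batch; GIL assumed released inside

def run(reads, consume_batch):
    q = queue.Queue(maxsize=10)  # bounded, so the reader can't run away
    threads = [threading.Thread(target=producer, args=(reads, q))]
    threads += [threading.Thread(target=consumer, args=(q, consume_batch))
                for _ in range(N_CONSUMERS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```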

I'm focusing on comparing scripts/abundance-dist-single.py with one thread to scripts/abundance-dist-single-threaded.py, which is the experimental version.

  • Is it mergeable?
  • make test: Did it pass the tests?
  • make clean diff-cover: If it introduces new functionality in scripts/, is it tested?
  • make format diff_pylint_report cppcheck doc pydocstyle: Is it well formatted?
  • Did it change the command-line interface? Only backwards-compatible additions are allowed without a major version increment. Changing file formats also requires a major version number increment.
  • For substantial changes or changes to the command-line interface, is it documented in CHANGELOG.md? See keepachangelog for more details.
  • Was a spellchecker run on the source code and documentation after changes were made?
  • Do the changes respect streaming IO? (Are they tested for streaming IO?)

@betatim (Member, Author) commented Dec 13, 2016

Why do we get a different number of unique kmers with the two versions of the script?

@betatim (Member, Author) commented Dec 13, 2016

With only one thread modifying the countgraph the discrepancy goes away.

@betatim (Member, Author) commented Dec 13, 2016

Idea: one bloom filter per thread, then merge them at the end.

@ctb (Member) commented Dec 13, 2016 via email

@ctb (Member) commented Dec 13, 2016

To be a little clearer: in the search for speed, would a better target be the data structures & support for cache locality?

A concern with the bloom filters approach is that in memory limited situations (or with data sets that require big mem) it won't actually help to do that :). A potentially productive alternate approach would be to support easy chunking of files on different machines -- that is, if you have a 50 GB file and 5 machines, tell machine 1 to handle GB 0-10, machine 2 to handle GB 10-20, etc. Then you could do various kinds of processing in parallel and merge/whatever afterwards. This strategy could work well for load-into-counting, load-graph, normalize-by-median, and trim-low-abund - all of the major use cases, basically.
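A minimal sketch of the byte-range chunking idea, assuming an uncompressed FASTQ input: each chunk boundary is snapped forward to the next line starting with '@'. The chunk_offsets helper is made up for illustration, and note the caveat that '@' can also begin a quality line, so a robust implementation needs a stricter record-boundary check.

```python
# Hedged sketch of splitting a file into byte ranges for separate machines,
# assuming uncompressed FASTQ. chunk_offsets is a made-up helper, not khmer code.
# Caveat: '@' may also start a quality line, so snapping to the next '@' line
# is only approximate.
import os

def chunk_offsets(path, n_chunks):
    size = os.path.getsize(path)
    step = size // n_chunks
    offsets = [0]
    with open(path, 'rb') as fh:
        for i in range(1, n_chunks):
            fh.seek(i * step)
            fh.readline()                 # skip the partial line we landed in
            while True:
                pos = fh.tell()
                line = fh.readline()
                if not line or line.startswith(b'@'):
                    offsets.append(pos)   # candidate start of a record
                    break
    offsets.append(size)
    return list(zip(offsets[:-1], offsets[1:]))
```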

@betatim (Member, Author) commented Dec 13, 2016

Query: our current data structures do not support cache locality and cross-CPU NUMA memory access. How big a difference would that make? Could be tested by changing graph sizes.

Probably a lot. You'd measure how many instructions per cycle are executed, or directly how many cache misses you get. For a (large) bloom filter I'm not sure how much you can do, since you want the index into table N+1 to be essentially random with respect to the index into table N, and the index of the next k-mer to be random with respect to the index of the previous one (even though the two k-mers differ by only one base). If that weren't the case, it might make sense to handle insertion into table N for several k-mers before moving on to table N+1.

I think cuckoo filters promise to be better in this regard.

@ctb (Member) commented Dec 13, 2016 via email

@betatim (Member, Author) commented Dec 13, 2016

I think it is OK to practice splitting things into components connected by a queue, using threads. We will need that if we want to go to multiple machines (maybe via multiple processes first?).

Data structures that are inherently parallel would be super useful.

If we make one BF per thread to handle 1/Nth of the reads, would each BF have to be as large as a BF that can handle all reads? How does this work if you want to merge them later on?

Do you know the typical ratio of allocated vs. used memory for these very large BFs? I'm wondering what trickery the kernel has available for allocating more memory than is available if you never use it. I'm not enough of a memory expert to know off the top of my head.


As it stands, this branch achieves 166% CPU and 55.235s elapsed (81.93s user) to run through the E. coli data set with one thread for reading and one for filling the countgraph, compared to 99% CPU and 1:18.11 elapsed (75.15s user) for the single-threaded version. So there is some overhead, but if you have two cores, less wall-clock time passes to do the same job. (🚧 Doing the same with two consumer threads gives ~270% CPU and 31s elapsed (80.96s user), but the answer is "wrong" because the two threads somehow step on each other's toes 🚧)

@standage (Member)

The binary bloom filter (node table/graph) can be built in pieces and merged: see the update_from method. However, this requires each of the pieces to be the same size as the final product, so memory consumption would scale linearly with threads. :(

This approach will not work for the counting bloom filter (count table/graph).
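A sketch of what building in pieces and merging could look like, assuming the update_from method mentioned above is exposed under that name at the Python level (the exact spelling may differ between khmer versions) and using illustrative, unbenchmarked parameters:

```python
# Sketch only: per-thread node graphs merged at the end, assuming update_from
# is callable from Python as described above and that every piece is allocated
# with exactly the same parameters as the final graph.
import khmer

K, TABLE_SIZE, N_TABLES = 17, int(1e8), 2   # illustrative parameters

pieces = [khmer.Nodegraph(K, TABLE_SIZE, N_TABLES) for _ in range(4)]
# ... each thread consumes its share of the reads into its own piece ...

merged = khmer.Nodegraph(K, TABLE_SIZE, N_TABLES)
for piece in pieces:
    merged.update_from(piece)   # bitwise OR of the presence bits; sizes must match
```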

@ctb (Member) commented Dec 13, 2016 via email

@ctb (Member) commented Dec 13, 2016 via email

@standage (Member)

> > This approach will not work for the counting bloom filter (count table/graph).
>
> I think it will, no?

I guess it should work for this use case: you should be able to simply add the counts. The update_from method had me thinking of the more general case where it doesn't really make sense: combining multiple samples into a single countgraph.
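A toy illustration of why simply adding the counts works when both pieces use identical hash functions and table sizes; this models saturating byte counters with numpy and is not khmer's API:

```python
# Toy model only: per-thread count tables merged by element-wise addition
# reproduce the single-pass table, as long as both pieces hash into tables of
# the same size with the same hash function. Saturation at 255 mimics a byte counter.
import numpy as np

TABLE_SIZE = 11  # toy table size

def count(kmers, table_size=TABLE_SIZE):
    table = np.zeros(table_size, dtype=np.uint16)
    for kmer in kmers:
        table[hash(kmer) % table_size] += 1
    return table

piece_a = count(["ACGTA", "CGTAC", "ACGTA"])   # counted by thread A
piece_b = count(["ACGTA", "GTACG"])            # counted by thread B

merged = np.minimum(piece_a.astype(np.uint32) + piece_b, 255).astype(np.uint8)
combined = count(["ACGTA", "CGTAC", "ACGTA", "ACGTA", "GTACG"])
assert (merged == np.minimum(combined, 255)).all()
```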

@ctb (Member) commented Dec 13, 2016 via email

loading_thread.join()

log_info('Total number of unique k-mers: {nk}',
         nk=countgraph.n_unique_kmers())
@betatim (Member, Author) commented on the diff above, Dec 16, 2016

Any general thoughts on the yay or nay-ness of reworking things into this kind of pattern (read from L142 to here)?

Member

Looks familiar to me :) -- see https://github.com/dib-lab/khmer/blob/master/khmer/thread_utils.py#L73, which @camillescott once convinced me didn't add anything to the speed of things.

So I like the pattern, if it can be made fast!

This fixes the discrepancy with the single-threaded abundance-dist-single.py script.

@betatim force-pushed the fix/multi_threading_bugs branch from 0dfcedf to ba7e3a8 on December 16, 2016 at 13:42
@betatim (Member, Author) commented Dec 16, 2016

Producing values is faster than consuming them: one thread reading from a .gz can keep the queue full (10 entries) for three consuming threads, while with four the consumers sometimes stall. This was measured by printing out the size of the queue every once in a while.

@betatim mentioned this pull request Dec 16, 2016
@betatim (Member, Author) commented Dec 20, 2016

Related to #1551 (comment): does someone understand why this is not threadsafe? Running python scripts/abundance-dist-single-threaded.py -s -x 1e8 -N 2 -k 17 -z ecoli_ref-5m.fastq /tmp/test.dist with multiple consumers (even with the mutex enabled, i.e. not commented out) leads to a different total k-mer count each time you run it.

Does someone have time to take a look or listen to me explain it to them?

@ctb mentioned this pull request Dec 21, 2016
@betatim (Member, Author) commented Dec 22, 2016

And yes, there is no way to achieve locality with bloom filters. You could, though, keep a cache of things to insert into the bloom filters, organized by location in the filter, and then flush from there periodically.

To insert something into one of the tables we need to compute the index (hash the k-mer), fetch that chunk of memory, and twiddle some bits. Two things we could do:

  • maintain a cache of kmer -> hashvalue to avoid recomputing it. This would most likely be implemented with a map, which itself requires computing a hash to look up entries. No wins possible??

  • maintain a buffer that groups updates to different chunks of memory and apply them as one batch once you have "enough" for a certain region (rough sketch below). How do we keep things in sync across threads? Are we smarter than a compiler/prefetcher?

(mainly to help me think about this.)
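A rough sketch of the second idea above (buffering updates and applying them grouped by position). TABLE_SIZE, BUFFER_LIMIT, and the bytearray stand-in for a countgraph table are illustrative assumptions, not khmer internals:

```python
# Buffer pending hash positions and flush them in sorted order, so updates to
# nearby memory happen close together in time. Whether this beats the CPU's
# prefetcher in practice is exactly the open question above.
TABLE_SIZE = 1_000_003
BUFFER_LIMIT = 4096

table = bytearray(TABLE_SIZE)   # stand-in for one countgraph table
buffer = []                     # pending positions to increment

def insert(hashval):
    buffer.append(hashval % TABLE_SIZE)
    if len(buffer) >= BUFFER_LIMIT:
        flush()

def flush():
    for pos in sorted(buffer):  # grouping/sorting is the point of the buffer
        if table[pos] < 255:    # saturating byte counter
            table[pos] += 1
    buffer.clear()
```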
