Parallelised string factorisation #22

Open · wants to merge 8 commits into master
Conversation

@ARF1 commented Mar 6, 2015

Shared variables with manual locking:

  • hash table
  • count
  • reverse_keys
  • reverse_values
  • out_buffer
  • chunk_

Shared variables without a locking requirement:

  • locks

Thread-local variables:

  • thread_id
  • in_buffer_ptr (points to a thread-local buffer)
  • out_buffer_ptr (points to a thread-local buffer)

Locking scheme:

  • For each thread there is a lock on the hash table (and the other associated shared variables).
  • Each thread processing a chunk begins by acquiring its own lock on the shared hash table.
  • That lock is released when the thread encounters a value that is new to the hash table.
  • Once the thread is ready to write the new value to the hash table, it waits to acquire the locks of all threads.
  • After the write, all locks are released.
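
A minimal Cython/OpenMP sketch of this scheme (an illustration only, not the PR's actual code: hash-table lookups, chunk decoding and buffer handling are elided, and `N_THREADS`, `insert_new_value` and `factorize_chunks` are placeholder names):

```
# sketch of the per-thread locking scheme described above
from cython.parallel import prange, threadid
cimport openmp

DEF N_THREADS = 2            # placeholder; the PR determines this at runtime

cdef openmp.omp_lock_t locks[N_THREADS]
cdef int _i
for _i in range(N_THREADS):
    openmp.omp_init_lock(&locks[_i])

cdef void insert_new_value() nogil:
    # writer: take *every* per-thread lock (always in the same order, so two
    # writers cannot deadlock) before touching the shared hash table
    cdef int i
    for i in range(N_THREADS):
        openmp.omp_set_lock(&locks[i])
    # ... add the new key to the shared hash table, bump the shared count ...
    for i in range(N_THREADS):
        openmp.omp_unset_lock(&locks[i])

def factorize_chunks(int n_chunks):
    cdef int c, tid
    for c in prange(n_chunks, nogil=True, num_threads=N_THREADS):
        tid = threadid()
        openmp.omp_set_lock(&locks[tid])    # reader holds only its own lock
        # ... look up this chunk's values in the shared hash table ...
        # on a miss: release the read lock, write under all locks, re-acquire
        openmp.omp_unset_lock(&locks[tid])
        insert_new_value()
        openmp.omp_set_lock(&locks[tid])
        # ... keep translating values into the thread-local out buffer ...
        openmp.omp_unset_lock(&locks[tid])
```

Building such a module requires the usual OpenMP compile and link flags (e.g. -fopenmp with gcc).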


---
Uncompressed bcolz timings:
```
--- uncached unique() ---
pandas (in-memory):
In [10]: %timeit -r 10 c.unique()
1 loops, best of 10: 881 ms per loop

bquery master over bcolz (persistent):
In [12]: %timeit -r 10 a.unique('mycol')
1 loops, best of 10: 2.1 s per loop
==> x2.38 slower than pandas

pull request over bcolz (persistent):
In [8]: %timeit -r 10 a.unique('mycol')
1 loops, best of 10: 834 ms per loop
==> x1.05 FASTER than pandas

---- cache_factor ---
bquery master over bcolz (persistent):
In [3]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 2.51 s per loop

pull request with 2 threads over bcolz (persistent):
In [3]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 1.16 s per loop
==> x2.16 faster than master

pull request with 1 thread over bcolz (persistent):
In [3]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 1.69 s per loop
==> x1.48 faster than master (c.f. x1.48 from single-threaded PR visualfabriq#21)
==> parallel code seems to have no performance penalty on single-core machines
```

Compressed bcolz timings:
```
--- uncached unique() ---
pandas (in-memory):
In [10]: %timeit -r 10 c.unique()
1 loops, best of 10: 881 ms per loop

bquery master over bcolz (persistent):
In [12]: %timeit -r 10 a.unique('mycol')
1 loops, best of 10: 3.39 s per loop
==> x3.85 slower than pandas

pull request over bcolz (persistent):
In [8]: %timeit -r 10 a.unique('mycol')
1 loops, best of 10: 1.9 s per loop
==> x2.16 slower than pandas

---- cache_factor ---
bquery master over bcolz (persistent):
In [5]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 4.09 s per loop

pull request with 2 threads over bcolz (persistent):
In [5]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 2.48 s per loop
==> x1.65 faster than master

pull request with 1 thread over bcolz (persistent):
In [5]: %timeit -r 10 a.cache_factor(['mycol'], refresh=True)
1 loops, best of 10: 3.26 s per loop
==> x1.25 faster than master (c.f. x1.28 from single-threaded PR visualfabriq#21)
```
@ARF1 commented Mar 6, 2015

@CarstVaartjes Had a slight bug resulting in a faulty factor: chunk results were appended out-of-order, e.g. chunk 0, chunk 50, chunk 1, etc.

Possible solutions:

  1. Keep full-sized out-buffer in memory before compressing the whole thing into a carray (The previous fix does this. But the locking needs to be cleaned up if this is to be the final solution.)
  2. Find a way to write carray chunks for labels out of order, e.g. not labels.append(out-buffer) but something like labels.write(out-buffer, chunk_number).
  3. Append chunk results directly to the labels carray but enforce the proper order. This would require more careful scheduling of chunk processing than we have currently, and waiting for "slow" chunks to complete before faster threads can work on new chunks. A more elaborate implementation could use a "chunk writing queue" to mitigate the waiting issue; that would be a middle ground between a full-size buffer and a strict waiting scheme.

For solution 2:
Could we create labels with chunklen=carray_.chunklen, then append(...) the chunks out-of-order and finally rename the chunk files of labels on disk? Even better, of course, would be writing them with the correct file names to begin with. The downside might be that reading the carray becomes slower because the files are not arranged in "natural order" on disk, possibly leading to additional seeks.
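
A rough sketch of the renaming idea, assuming bcolz keeps the chunks of a persistent carray as `<rootdir>/data/__<n>.blp` files (worth verifying against the bcolz version in use); `append_order` is a hypothetical bookkeeping list recording which logical chunk each appended chunk belongs to, and the leftover area is not covered here:

```
import os

def reorder_label_chunks(labels_rootdir, append_order):
    # append_order[j] = logical chunk number of the j-th chunk that was appended
    data_dir = os.path.join(labels_rootdir, 'data')
    # rename in two passes via temporary names so renames cannot clobber each other
    for j, logical in enumerate(append_order):
        os.rename(os.path.join(data_dir, '__%d.blp' % j),
                  os.path.join(data_dir, '__%d.blp.tmp' % logical))
    for logical in append_order:
        os.rename(os.path.join(data_dir, '__%d.blp.tmp' % logical),
                  os.path.join(data_dir, '__%d.blp' % logical))
```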

Any preferences or advice?

@CarstVaartjes commented:

I was actually wondering about ordering with the parallelization! (For groupby it is less relevant, but for the factorization it very much is.)
Ideally it would be "find a way to write carray chunks for labels out of order"; other people were asking for that functionality on the bcolz mailing list too, but I do not think it is possible at the moment, right @esc? So I think renaming the filenames is a great idea, except for the leftover array (we always need to end with that, so we still have to handle it separately). Also: the carray that we are writing might have a different chunk length than the input, which further complicates matters.
Keeping the full-sized out-buffer in memory might kill some use cases (we use ctables with billions of records that do not fit in memory, for instance).

I am going to sleep on it for a night, I think. The great thing about your solution is that it also works for other use cases of ours that are in-core (the groupby functions), so we should be able to speed those up as well.

exhibits performance issues with unique(), presumably somehow linked to the in-memory labels carray
@ARF1 commented Mar 7, 2015

@CarstVaartjes I managed to find a way to write the carray chunks out-of-order. bcolz does expose methods that make this fairly painless.

What is much more painful is that labels and carray_ cannot be forced to have the same chunklen (which would have made reordering easy): the associated frequent writes of labels chunks seem to hinder efficient reading of the carray_ chunks, and performance really drops off.

What I have implemented now is forcing labels to have a chunklen that is a multiple of the carray_ chunklen. The results from several carray_ chunks are then collated and written as a single labels chunk. This, however, requires dealing with yet another "leftover" loop for the trailing carray_.nchunks % chunk_multiple carray_ chunks.
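
Roughly, the chunk-length matching and the extra leftover loop boil down to arithmetic like this (a sketch with assumed names, not the PR code; the empty-carray trick is only used to ask bcolz for its advised chunklen):

```
import numpy as np
import bcolz

def plan_label_chunks(carray_):
    # advised chunklen for an int64 labels carray of the same total length
    advised = bcolz.carray(np.empty(0, dtype='int64'),
                           expectedlen=len(carray_)).chunklen
    # force the labels chunklen to a multiple of the input chunklen
    chunk_multiple = max(1, advised // carray_.chunklen)
    labels_chunklen = chunk_multiple * carray_.chunklen

    n_super_chunks = carray_.nchunks // chunk_multiple   # processed in parallel
    leftover_chunks = carray_.nchunks % chunk_multiple   # extra serial loop
    # the partial final chunk (bcolz's leftover area) still needs its own pass
    return labels_chunklen, chunk_multiple, n_super_chunks, leftover_chunks
```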

To make matters worse, writing in-memory chunks out-of-order requires a different API. On the whole, the code is now in urgent need of some refactoring...

But it works... well sort of: out-of-core factorize() is x2.2 faster than master for uncompressed, x1.75 for compressed bcolz for my test case.

What has me completely stumped is that in-memory factorize() is MUCH slower than out-of-core factorize(). It is even slower than the single-threaded code!

@CarstVaartjes commented:

Trying to summarize it for myself and thinking out loud:

  • The output array will always have the same length as the input array, but the chunk length may differ (strings have a different chunk length than integers or floats).
    • However, we need to sync the chunk lengths, because otherwise we cannot handle it chunk by chunk.
    • For performance reasons we now have a factor x to combine chunks from each thread (I think this means that the number of chunks and the number of threads also need to be in sync, or you would have multiple incomplete chunks at the end?)
    • I do know that integer columns have a much larger chunk size than strings, so the performance hit is likely caused by bcolz making far too many (and too small) chunks. Normally it would first put them in the leftover array until that gets full.
    • Small FYI: the new label carray is uint64, which we transform into int64 before saving because numexpr has issues with uint64.
  • Each thread has its own local output buffer; the hash table is shared between threads.
  • We can read thread-safely from the shared hash table (checking whether a value is already known).
  • A locking mechanism is in place so that when we find a new value it is added to the hash table at that moment. I think theoretically this might mean that one value could be inserted into the hash table twice (two threads finding the same value at the same time and both awaiting the lock), but if the hash table de-duplicates (i.e. gives the same index value when you insert something twice), this should not be an issue (need to check that; see the sketch below).
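
On the "need to check that" point: khash does de-duplicate. kh_put reports through its ret argument whether the key was already present and, if so, returns the existing bucket. A tiny sketch, assuming the kh_str_t declarations the factorisation code already uses (an illustration, not code from this PR):

```
cdef kh_str_t *table = kh_init_str()
cdef khiter_t k
cdef int ret
cdef int count = 0

k = kh_put_str(table, b"foo", &ret)
if ret:                       # non-zero: the key was not present before
    table.vals[k] = count     # hand out the next reverse index
    count += 1

k = kh_put_str(table, b"foo", &ret)
# ret == 0: "foo" is already in the table and k points at the existing bucket,
# so table.vals[k] still holds the original index.  As long as the put and the
# vals assignment happen while all locks are held, two threads racing on the
# same new value cannot create a duplicate entry.
```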

Discussion points are:

  • Every thread could write to its own in-memory or disk-based carray, and at the end we combine the chunks (by moving and renaming them for disk, and re-assigning them for in-memory).
    • Argh! No, that will not work, because the original chunks get combined and split into new chunks with a different chunk length.
    • You could solve that by giving each thread an index-based part of the original (4 threads -> 25% each), but that is slower AND it also gives issues with the last chunks (each individual carray ends with a leftover array, which means we cannot simply concatenate the individual results).
    • A way to work around it would be to get the advised chunk length for an integer-based carray with the length of the original carray, round that to the nearest multiple of the original carray's chunklen, and save the multiple "x" of the chunklen difference <- this means that each thread has to process x chunks serially at a time (which might not make it much faster for small carrays, only for large ones).
  • There are issues with creating a typed numpy array in a local thread. See also: http://stackoverflow.com/questions/28884643/cython-parallel-how-to-initialise-thread-local-ndarray-buffer
  • We have a list that, based on the index, lets us know what the original values were: reverse_values. We do an insert for each new hashed value from each thread; I'm not sure whether that is thread-safe (I guess so, otherwise it would fail, right?). But also: wouldn't it be much smarter to just read the actual kh_str_t table at the end (as it contains all values and all indexes), so we don't need to manage a list per thread? (I think directly creating the list was faster in the single-core situation, but with multiple threads it might be smarter to do this as a single sweep at the end; in most use cases the number of unique values should be relatively low compared to the input.)
  • I'm not sure whether np.empty really is bad for the out-buffer, because (except for leftovers) it will always have the same fixed length and be filled completely.

God, this is complicated stuff. Really great insights though. And for most functions on the aggregation side your code could already be used -> I think writing to the same ndarray would still require locking (sort of defeating the parallel purpose there, as every row triggers a write), but instead each thread could have its own array and we use numpy to add the results of the individual ndarrays together at the end. (For sum and count this will work; for sorted_count_distinct it will not, as that requires sequential reading from a point where you know the previous value was different.)
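
A sketch of that last idea for the aggregation side: one partial-result row per thread, written without locks, then reduced with numpy at the end (assumed names and signature, not code from this repository):

```
import numpy as np
from cython.parallel import prange, threadid

def parallel_group_sum(double[:] values, long[:] labels,
                       int n_groups, int n_threads):
    # one row of partial sums per thread, so writes need no locking
    partial = np.zeros((n_threads, n_groups), dtype=np.float64)
    cdef double[:, :] part = partial
    cdef Py_ssize_t i
    for i in prange(values.shape[0], nogil=True, num_threads=n_threads):
        part[threadid(), labels[i]] += values[i]
    # combine the per-thread results in a single numpy reduction
    return partial.sum(axis=0)
```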

broke due to performance debugging
@ARF1 commented Mar 13, 2015

@CarstVaartjes I was implementing one of your ideas and ran into trouble. You wrote:

We have a list that based on the index lets us know what the original values were: reverse_values. [...] wouldn't it be much smarter to just read the actual kh_str_t table in the end (as it contains all values and all indexes)

I am in the process of implementing this and I think it is not possible: since we are using a hash table, the position of the elements in the table does not indicate their order of insertion but is determined by their hash value.

I think the kh_str_t table does not contain the indexes. Am I overlooking a feature of khash here?

@CarstVaartjes @FrancescElies
If not, what we could do is make a hash table of a struct type. In the struct we could store the string value (or char*) and the reverse value. We would then need to implement our own comparison function, similar to what was done for the python object hash table implementation. (See pyobject_cmp.)

What do you think? It could make for a cleaner implementation, since we would not need to carry around the reverse_values pointer. On the other hand, we would have to create a whole slew of hash-table implementations (one for each data type) rather than use those already defined.


Re speed: creating the dict from the reverse vector is definitely faster, even for single-threaded operation. See my (now slightly outdated) PR #21 with performance enhancements.

This was surprising to me as well: insertion into a dict should take the same time no matter where it happens. I believe the reason for the increased performance is that without Python objects as arguments the helper function can have a nogil signature, which seems to speed things up somehow. Possibly the performance increase only manifests if the GIL is actually released when calling the helper. Or maybe my recollection is just wrong.

@ARF1 commented Mar 15, 2015

@CarstVaartjes Just wanted to let you know to ignore my previous post. I finally understood how khash works. The keys are indeed stored: table.keys. Should have been obvious...

@ARF1 commented Mar 16, 2015

@FrancescElies @CarstVaartjes OK, here are a few revisions that can serve as a basis for further discussion:

  1. rev 5a3b487 ('stock khash') is about a week old and does not yet use iterblocks: it serves me as a reference implementation but is fairly messy.
  2. rev b2453aa ('pandas khash') is identical to the previous one, except that instead of stock khash it uses pandas' khash v0.2.6: it exposes the performance problems of stock khash.
  3. rev a1092c6 ('iterblocks') is an iterblocks based implementation: looks very nice but is slow.

All revisions pass my own test cases. I finally ferreted out the synchronisation issue I mentioned to @FrancescElies, which was leading to duplicate entries in the reverse dictionary.

Performance measurements were done on a 12-character column with 1014 unique values and about 9 million entries in total, using the following commands:

  1. on-disk factorisation: %timeit -r 10 a.cache_factor(['isin'], refresh=True)
  2. in-memory factorisation: %timeit -r 10 bquery.ctable_ext.factorize(a['mycol'])
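
For anyone who wants to reproduce something comparable, a rough sketch of building a test set with those characteristics (the exact benchmark data is not part of this PR; the column name, row count and string width are taken from the description above):

```
import numpy as np
import bquery

# ~9 million rows drawn from 1014 distinct 12-character strings
uniques = np.array(['%012d' % i for i in range(1014)], dtype='S12')
mycol = uniques[np.random.randint(0, len(uniques), size=9 * 10 ** 6)]
a = bquery.ctable([mycol], names=['mycol'], rootdir='bench_ct', mode='w')

a.cache_factor(['mycol'], refresh=True)     # on-disk factorisation
bquery.ctable_ext.factorize(a['mycol'])     # in-memory factorisation
```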

For comparison the current visualfabriq/bquery master (rev 3ec8eb8, 'master') was used as a reference.


Uncompressed, in-memory factorisation:

master:       1.98 s
stock khash:  0.820 s (x2.41)
pandas khash: 0.719 s (x2.6)
iterblocks:   1.73 s  (x1.1)

Uncompressed, on-disk factorisation:

master:       2.48 s
stock khash:  0.940 s (x2.6)
pandas khash: 0.808 s (x3.1)
iterblocks:   0.942 s (x2.6)

Compressed, in-memory factorisation:

master:       3.34 s
stock khash:  1.76 s (x1.9)
pandas khash: 1.76 s (x1.9)
iterblocks:   2.85 s (x1.2)

Compressed, on-disk factorisation:

master:       3.99 s
stock khash:  2.24 s (x1.8)
pandas khash: 2.20 s (x1.8)
iterblocks:   2.25 s (x1.8)

Conclusions:

  1. Using stock khash v0.2.8, which we just merged into master, seems to incur a non-negligible performance penalty.
  2. iterblocks seems to incur a significant performance penalty compared to reading in chunks. This could be due either to iterblocks being slow or to my implementation in rev a1092c6 having a problem. To narrow down the problem, I am working on a comparable implementation that avoids iterblocks while keeping the overall interface constant, but the chunk scheduling is non-trivial.
  3. My iterblocks implementation (rev a1092c6) seems to have a problem with in-memory performance. In-memory is consistently slower than on-disk!

@ARF1 commented Mar 16, 2015

To compare the changes while disregarding the iterblocks revision's Windows line endings, use:
ARF1/bquery@b2453aa...ARF1:a1092c6

@FrancescElies commented:

Hi,

Apologies for the late answer; we are under some deadline pressure and I am afraid we do not have much spare time at the moment. I just had a first look. Note that I have no practical experience with C++, so please be gentle with my mistakes.

About benchmarking: keeping track of all the different scenarios is going to be difficult. Here are some ideas taken from other projects: maybe we could consider using vbench https://github.com/wesm/pandas/tree/master/vb_suite (pandas) or airspeed velocity http://spacetelescope.github.io/asv/ (under consideration in bcolz, Blosc/bcolz#116).

I saw very nice stuff, even using omp pragma directives that are not supported directly in Cython. I tried something similar once but found it a bit tricky to use objects inside prange; it seems you managed to do that without problems.
About EOLs, maybe we could try #28, what do you think?
I am also not sure why new commits on this branch are not being run against the tests in Travis.

Your numba suggestion is also very interesting. This topic will certainly require some time; hopefully this whole situation is fine with you.

@FrancescElies mentioned this pull request May 21, 2015