Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multithreaded v2 #38

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 79 additions & 67 deletions bquery/ctable_ext.pyx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import numpy as np
from numpy cimport ndarray, dtype, npy_intp, npy_int32, \
npy_uint64, npy_int64, npy_float64, npy_bool
npy_uint64, npy_int64, npy_float64, npy_bool, uint64_t

import cython
import bcolz as bz
Expand Down Expand Up @@ -130,13 +130,11 @@ def factorize_str(carray carray_, carray labels=None):
@cython.wraparound(False)
@cython.boundscheck(False)
cdef void _factorize_int64_helper(Py_ssize_t iter_range,
Py_ssize_t allocation_size,
ndarray[npy_int64] in_buffer,
ndarray[npy_uint64] out_buffer,
npy_int64[:] in_buffer,
uint64_t[:] out_buffer,
kh_int64_t *table,
Py_ssize_t * count,
dict reverse,
):
) nogil:
cdef:
Py_ssize_t i, idx
int ret
Expand All @@ -154,7 +152,6 @@ cdef void _factorize_int64_helper(Py_ssize_t iter_range,
else:
k = kh_put_int64(table, element, &ret)
table.vals[k] = idx = count[0]
reverse[count[0]] = element
count[0] += 1
out_buffer[i] = idx

Expand All @@ -168,6 +165,8 @@ def factorize_int64(carray carray_, carray labels=None):
ndarray[npy_int64] in_buffer
ndarray[npy_uint64] out_buffer
kh_int64_t *table
npy_int64[:] in_buffer_view
uint64_t[:] out_buffer_view

count = 0
ret = 0
Expand All @@ -179,52 +178,55 @@ def factorize_int64(carray carray_, carray labels=None):
labels = carray([], dtype='int64', expectedlen=n)
# in-buffer isn't typed, because cython doesn't support string arrays (?)
out_buffer = np.empty(chunklen, dtype='uint64')
out_buffer_view = out_buffer
in_buffer = np.empty(chunklen, dtype='int64')
table = kh_init_int64()

for i in range(carray_.nchunks):
chunk_ = carray_.chunks[i]
# decompress into in_buffer
chunk_._getitem(0, chunklen, in_buffer.data)
_factorize_int64_helper(chunklen,
carray_.dtype.itemsize + 1,
in_buffer,
out_buffer,
table,
&count,
reverse,
)
in_buffer_view = in_buffer
with nogil:
_factorize_int64_helper(chunklen,
in_buffer_view,
out_buffer_view,
table,
&count
)
# compress out_buffer into labels
labels.append(out_buffer.astype(np.int64))

leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
if leftover_elements > 0:
_factorize_int64_helper(leftover_elements,
carray_.dtype.itemsize + 1,
carray_.leftover_array,
out_buffer,
table,
&count,
reverse,
)
in_buffer_view = carray_.leftover_array
with nogil:
_factorize_int64_helper(leftover_elements,
in_buffer_view,
out_buffer_view,
table,
&count
)

# compress out_buffer into labels
labels.append(out_buffer[:leftover_elements].astype(np.int64))

for i in range(table.n_buckets):
if not kh_exist_int64(table, i): # adjust function name to hash-table data-type
continue
reverse[table.vals[i]] = table.keys[i]
kh_destroy_int64(table)

return labels, reverse

@cython.wraparound(False)
@cython.boundscheck(False)
cdef void _factorize_int32_helper(Py_ssize_t iter_range,
Py_ssize_t allocation_size,
ndarray[npy_int32] in_buffer,
ndarray[npy_uint64] out_buffer,
npy_int32[:] in_buffer,
uint64_t[:] out_buffer,
kh_int32_t *table,
Py_ssize_t * count,
dict reverse,
):
) nogil:
cdef:
Py_ssize_t i, idx
int ret
Expand All @@ -242,7 +244,6 @@ cdef void _factorize_int32_helper(Py_ssize_t iter_range,
else:
k = kh_put_int32(table, element, &ret)
table.vals[k] = idx = count[0]
reverse[count[0]] = element
count[0] += 1
out_buffer[i] = idx

Expand All @@ -256,6 +257,8 @@ def factorize_int32(carray carray_, carray labels=None):
ndarray[npy_int32] in_buffer
ndarray[npy_uint64] out_buffer
kh_int32_t *table
npy_int32[:] in_buffer_view
uint64_t[:] out_buffer_view

count = 0
ret = 0
Expand All @@ -267,52 +270,55 @@ def factorize_int32(carray carray_, carray labels=None):
labels = carray([], dtype='int64', expectedlen=n)
# in-buffer isn't typed, because cython doesn't support string arrays (?)
out_buffer = np.empty(chunklen, dtype='uint64')
out_buffer_view = out_buffer
in_buffer = np.empty(chunklen, dtype='int32')
table = kh_init_int32()

for i in range(carray_.nchunks):
chunk_ = carray_.chunks[i]
# decompress into in_buffer
chunk_._getitem(0, chunklen, in_buffer.data)
_factorize_int32_helper(chunklen,
carray_.dtype.itemsize + 1,
in_buffer,
out_buffer,
table,
&count,
reverse,
)
in_buffer_view = in_buffer
with nogil:
_factorize_int32_helper(chunklen,
in_buffer_view,
out_buffer_view,
table,
&count
)
# compress out_buffer into labels
labels.append(out_buffer.astype(np.int64))

leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
if leftover_elements > 0:
_factorize_int32_helper(leftover_elements,
carray_.dtype.itemsize + 1,
carray_.leftover_array,
out_buffer,
table,
&count,
reverse,
)
in_buffer_view = carray_.leftover_array
with nogil:
_factorize_int32_helper(leftover_elements,
in_buffer_view,
out_buffer_view,
table,
&count
)

# compress out_buffer into labels
labels.append(out_buffer[:leftover_elements].astype(np.int64))

for i in range(table.n_buckets):
if not kh_exist_int32(table, i): # adjust function name to hash-table data-type
continue
reverse[table.vals[i]] = table.keys[i]
kh_destroy_int32(table)

return labels, reverse

@cython.wraparound(False)
@cython.boundscheck(False)
cdef void _factorize_float64_helper(Py_ssize_t iter_range,
Py_ssize_t allocation_size,
ndarray[npy_float64] in_buffer,
ndarray[npy_uint64] out_buffer,
npy_float64[:] in_buffer,
uint64_t[:] out_buffer,
kh_float64_t *table,
Py_ssize_t * count,
dict reverse,
):
) nogil:
cdef:
Py_ssize_t i, idx
int ret
Expand All @@ -330,7 +336,6 @@ cdef void _factorize_float64_helper(Py_ssize_t iter_range,
else:
k = kh_put_float64(table, element, &ret)
table.vals[k] = idx = count[0]
reverse[count[0]] = element
count[0] += 1
out_buffer[i] = idx

Expand All @@ -344,6 +349,8 @@ def factorize_float64(carray carray_, carray labels=None):
ndarray[npy_float64] in_buffer
ndarray[npy_uint64] out_buffer
kh_float64_t *table
npy_float64[:] in_buffer_view
uint64_t[:] out_buffer_view

count = 0
ret = 0
Expand All @@ -355,38 +362,43 @@ def factorize_float64(carray carray_, carray labels=None):
labels = carray([], dtype='int64', expectedlen=n)
# in-buffer isn't typed, because cython doesn't support string arrays (?)
out_buffer = np.empty(chunklen, dtype='uint64')
out_buffer_view = out_buffer
in_buffer = np.empty(chunklen, dtype='float64')
table = kh_init_float64()

for i in range(carray_.nchunks):
chunk_ = carray_.chunks[i]
# decompress into in_buffer
chunk_._getitem(0, chunklen, in_buffer.data)
_factorize_float64_helper(chunklen,
carray_.dtype.itemsize + 1,
in_buffer,
out_buffer,
table,
&count,
reverse,
)
in_buffer_view = in_buffer
with nogil:
_factorize_float64_helper(chunklen,
in_buffer_view,
out_buffer_view,
table,
&count
)
# compress out_buffer into labels
labels.append(out_buffer.astype(np.int64))

leftover_elements = cython.cdiv(carray_.leftover, carray_.atomsize)
if leftover_elements > 0:
_factorize_float64_helper(leftover_elements,
carray_.dtype.itemsize + 1,
carray_.leftover_array,
out_buffer,
table,
&count,
reverse,
)
in_buffer_view = carray_.leftover_array
with nogil:
_factorize_float64_helper(leftover_elements,
in_buffer_view,
out_buffer_view,
table,
&count
)

# compress out_buffer into labels
labels.append(out_buffer[:leftover_elements].astype(np.int64))

for i in range(table.n_buckets):
if not kh_exist_float64(table, i): # adjust function name to hash-table data-type
continue
reverse[table.vals[i]] = table.keys[i]
kh_destroy_float64(table)

return labels, reverse
Expand Down
12 changes: 6 additions & 6 deletions bquery/khash.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ cdef extern from "khash_python.h":
inline kh_int64_t* kh_init_int64()
inline void kh_destroy_int64(kh_int64_t*)
inline void kh_clear_int64(kh_int64_t*)
inline khint_t kh_get_int64(kh_int64_t*, int64_t)
inline khint_t kh_get_int64(kh_int64_t*, int64_t) nogil
inline void kh_resize_int64(kh_int64_t*, khint_t)
inline khint_t kh_put_int64(kh_int64_t*, int64_t, int*)
inline khint_t kh_put_int64(kh_int64_t*, int64_t, int*) nogil
inline void kh_del_int64(kh_int64_t*, khint_t)

bint kh_exist_int64(kh_int64_t*, khiter_t)
Expand All @@ -81,9 +81,9 @@ cdef extern from "khash_python.h":
inline kh_float64_t* kh_init_float64()
inline void kh_destroy_float64(kh_float64_t*)
inline void kh_clear_float64(kh_float64_t*)
inline khint_t kh_get_float64(kh_float64_t*, float64_t)
inline khint_t kh_get_float64(kh_float64_t*, float64_t) nogil
inline void kh_resize_float64(kh_float64_t*, khint_t)
inline khint_t kh_put_float64(kh_float64_t*, float64_t, int*)
inline khint_t kh_put_float64(kh_float64_t*, float64_t, int*) nogil
inline void kh_del_float64(kh_float64_t*, khint_t)

bint kh_exist_float64(kh_float64_t*, khiter_t)
Expand All @@ -97,9 +97,9 @@ cdef extern from "khash_python.h":
inline kh_int32_t* kh_init_int32()
inline void kh_destroy_int32(kh_int32_t*)
inline void kh_clear_int32(kh_int32_t*)
inline khint_t kh_get_int32(kh_int32_t*, int32_t)
inline khint_t kh_get_int32(kh_int32_t*, int32_t) nogil
inline void kh_resize_int32(kh_int32_t*, khint_t)
inline khint_t kh_put_int32(kh_int32_t*, int32_t, int*)
inline khint_t kh_put_int32(kh_int32_t*, int32_t, int*) nogil
inline void kh_del_int32(kh_int32_t*, khint_t)

bint kh_exist_int32(kh_int32_t*, khiter_t)
Expand Down
Loading