Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate reads & writes in disk_io #1205

Merged
merged 12 commits on
Jun 29, 2023
41 changes: 21 additions & 20 deletions dask_cuda/disk_io.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import os
import os.path
import pathlib
@@ -164,17 +165,19 @@ def disk_write(path: str, frames: Iterable, shared_filesystem: bool, gds=False)
A dict of metadata
"""
cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames)
frame_lengths = tuple(map(nbytes, frames))

if gds and any(cuda_frames):
import kvikio

# Write each frame consecutively into `path` in parallel
with kvikio.CuFile(path, "w") as f:
for frame, length in zip(frames, frame_lengths):
f.pwrite(buf=frame, count=length, file_offset=0, buf_offset=0).get()
file_offsets = itertools.accumulate(map(nbytes, frames), initial=0)
futures = [f.pwrite(b, file_offset=o) for b, o in zip(frames, file_offsets)]
for each_fut in futures:
each_fut.get()
else:
with open(path, "wb") as f:
for frame in frames:
f.write(frame)
os.writev(f.fileno(), frames) # type: ignore
return {
"method": "stdio",
"path": SpillToDiskFile(path),
@@ -200,24 +203,22 @@ def disk_read(header: Mapping, gds=False) -> list:
frames: list
List of read frames
"""
ret = []
ret: list = [
get_new_cuda_buffer()(length)
if gds and is_cuda
else np.empty((length,), dtype="u1")
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"])
]
if gds:
import kvikio # isort:skip

with kvikio.CuFile(header["path"], "rb") as f:
file_offset = 0
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"]):
if is_cuda:
buf = get_new_cuda_buffer()(length)
else:
buf = np.empty((length,), dtype="u1")
f.pread(
buf=buf, count=length, file_offset=file_offset, buf_offset=0
).get()
file_offset += length
ret.append(buf)
with kvikio.CuFile(str(header["path"]), "r") as f:
# Read each frame consecutively from `path` in parallel
file_offsets = itertools.accumulate((b.nbytes for b in ret), initial=0)
futures = [f.pread(b, file_offset=o) for b, o in zip(ret, file_offsets)]
for each_fut in futures:
each_fut.get()
else:
with open(str(header["path"]), "rb") as f:
for length in header["frame-lengths"]:
ret.append(f.read(length))
os.readv(f.fileno(), ret) # type: ignore
return ret