Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregate reads & writes in disk_io #1205

Merged
12 commits were merged on Jun 29, 2023.
38 changes: 21 additions & 17 deletions dask_cuda/disk_io.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -169,12 +169,16 @@ def disk_write(path: str, frames: Iterable, shared_filesystem: bool, gds=False)
import kvikio

with kvikio.CuFile(path, "w") as f:
for frame, length in zip(frames, frame_lengths):
f.pwrite(buf=frame, count=length, file_offset=0, buf_offset=0).get()
futs = []
file_offset = 0
for b, length in zip(frames, frame_lengths):
futs.append(f.pwrite(b, file_offset=file_offset))
file_offset += length
for each_fut in futs:
each_fut.get()
jakirkham marked this conversation as resolved.
Show resolved Hide resolved
jakirkham marked this conversation as resolved.
Show resolved Hide resolved
else:
with open(path, "wb") as f:
for frame in frames:
f.write(frame)
os.writev(f.fileno(), frames) # type: ignore
return {
"method": "stdio",
"path": SpillToDiskFile(path),
Expand All @@ -200,24 +204,24 @@ def disk_read(header: Mapping, gds=False) -> list:
frames: list
List of read frames
"""
ret = []
ret: list = [
get_new_cuda_buffer()(length)
if gds and is_cuda
else np.empty((length,), dtype="u1")
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"])
]
if gds:
import kvikio # isort:skip

with kvikio.CuFile(header["path"], "rb") as f:
futs = []
file_offset = 0
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"]):
if is_cuda:
buf = get_new_cuda_buffer()(length)
else:
buf = np.empty((length,), dtype="u1")
f.pread(
buf=buf, count=length, file_offset=file_offset, buf_offset=0
).get()
file_offset += length
ret.append(buf)
for b in ret:
futs.append(f.pread(b, file_offset=file_offset))
file_offset += b.nbytes
for each_fut in futs:
each_fut.get()
jakirkham marked this conversation as resolved.
Show resolved Hide resolved
else:
with open(str(header["path"]), "rb") as f:
for length in header["frame-lengths"]:
ret.append(f.read(length))
os.readv(f.fileno(), ret) # type: ignore
return ret