Skip to content

Commit

Permalink
Aggregate reads & writes in disk_io (#1205)
Browse files Browse the repository at this point in the history
Follow up to this discussion ( #925 (comment) )

* Preallocates buffers before reading
* Uses NumPy `uint8` arrays for all host memory (benefits from hugepages on transfers)
* Handles IO asynchronously with KvikIO and waits at the end
* Uses vectorized IO for host reads & writes

Authors:
  - https://github.com/jakirkham
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)
  - Peter Andreas Entschev (https://github.com/pentschev)

URL: #1205
  • Loading branch information
jakirkham authored Jun 29, 2023
1 parent eafde5f commit a19ef43
Showing 1 changed file with 21 additions and 20 deletions.
41 changes: 21 additions & 20 deletions dask_cuda/disk_io.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import itertools
import os
import os.path
import pathlib
Expand Down Expand Up @@ -164,17 +165,19 @@ def disk_write(path: str, frames: Iterable, shared_filesystem: bool, gds=False)
A dict of metadata
"""
cuda_frames = tuple(hasattr(f, "__cuda_array_interface__") for f in frames)
frame_lengths = tuple(map(nbytes, frames))

if gds and any(cuda_frames):
import kvikio

# Write each frame consecutively into `path` in parallel
with kvikio.CuFile(path, "w") as f:
for frame, length in zip(frames, frame_lengths):
f.pwrite(buf=frame, count=length, file_offset=0, buf_offset=0).get()
file_offsets = itertools.accumulate(map(nbytes, frames), initial=0)
futures = [f.pwrite(b, file_offset=o) for b, o in zip(frames, file_offsets)]
for each_fut in futures:
each_fut.get()
else:
with open(path, "wb") as f:
for frame in frames:
f.write(frame)
os.writev(f.fileno(), frames) # type: ignore
return {
"method": "stdio",
"path": SpillToDiskFile(path),
Expand All @@ -200,24 +203,22 @@ def disk_read(header: Mapping, gds=False) -> list:
frames: list
List of read frames
"""
ret = []
ret: list = [
get_new_cuda_buffer()(length)
if gds and is_cuda
else np.empty((length,), dtype="u1")
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"])
]
if gds:
import kvikio # isort:skip

with kvikio.CuFile(header["path"], "rb") as f:
file_offset = 0
for length, is_cuda in zip(header["frame-lengths"], header["cuda-frames"]):
if is_cuda:
buf = get_new_cuda_buffer()(length)
else:
buf = np.empty((length,), dtype="u1")
f.pread(
buf=buf, count=length, file_offset=file_offset, buf_offset=0
).get()
file_offset += length
ret.append(buf)
with kvikio.CuFile(str(header["path"]), "r") as f:
# Read each frame consecutively from `path` in parallel
file_offsets = itertools.accumulate((b.nbytes for b in ret), initial=0)
futures = [f.pread(b, file_offset=o) for b, o in zip(ret, file_offsets)]
for each_fut in futures:
each_fut.get()
else:
with open(str(header["path"]), "rb") as f:
for length in header["frame-lengths"]:
ret.append(f.read(length))
os.readv(f.fileno(), ret) # type: ignore
return ret

0 comments on commit a19ef43

Please sign in to comment.