[WIP] Begin perf recording app #513

Closed · wants to merge 1 commit
8 changes: 8 additions & 0 deletions icechunk-python/perf/array.py
@@ -0,0 +1,8 @@
from enum import StrEnum, auto


class Framework(StrEnum):
    zarr = auto()
    dask = auto()  # dask wrapping zarr
    xarray_zarr = auto()  # Xarray wrapping zarr
    xarray_dask = auto()  # Xarray wrapping dask
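Because `Framework` is a `StrEnum` with `auto()` values, each member's value is its lowercased name, so the enum round-trips cleanly through string-based CLI options. A minimal sketch, assuming the `perf` directory is on `sys.path` (as `tasks.py`'s `import lib` implies):

from array import Framework  # local perf/array.py, shadowing the stdlib module

assert Framework.zarr == "zarr"  # StrEnum members compare equal to their values
assert Framework("xarray_zarr") is Framework.xarray_zarr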
68 changes: 68 additions & 0 deletions icechunk-python/perf/lib.py
@@ -0,0 +1,68 @@
import operator
import time
from contextlib import contextmanager
from dataclasses import dataclass, field
from itertools import product, repeat

import pandas as pd
import toolz as tlz


def slices_from_chunks(shape: tuple[int, ...], chunks: tuple[int, ...]):
"""slightly modified from dask.array.core.slices_from_chunks to be lazy"""

extras = ((s % c,) if s % c > 0 else () for s, c in zip(shape, chunks, strict=True))
    # materialize: `chunks` is iterated twice below (for cumdims and for slices)
chunks = tuple(
tuple(tlz.concatv(repeat(c, s // c), e))
for s, c, e in zip(shape, chunks, extras, strict=True)
)
cumdims = (tlz.accumulate(operator.add, bds[:-1], 0) for bds in chunks)
slices = (
(slice(s, s + dim) for s, dim in zip(starts, shapes, strict=True))
for starts, shapes in zip(cumdims, chunks, strict=True)
)
return product(*slices)
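# Illustration (assumed output): for shape (5,) and chunks (2,),
#   list(slices_from_chunks((5,), (2,)))
# yields [(slice(0, 2),), (slice(2, 4),), (slice(4, 5),)] -- one tuple of
# slices per chunk, with a short final slice covering the 5 % 2 leftover.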


def normalize_chunks(
*, shape: tuple[int, ...], chunks: tuple[int, ...]
) -> tuple[int, ...]:
assert len(shape) == len(chunks)
chunks = tuple(s if c == -1 else c for s, c in zip(shape, chunks, strict=True))
return chunks
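# For example, normalize_chunks(shape=(3, 720, 1440), chunks=(1, -1, -1))
# returns (1, 720, 1440): a -1 means one chunk spanning that whole dimension.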


def get_task_chunk_shape(
    *, task_nchunks: int, shape: tuple[int, ...], chunks: tuple[int, ...]
) -> tuple[int, ...]:
    # Group up to `task_nchunks` store chunks into each task, growing the
    # task chunk shape along the leading dimensions first.
    left = task_nchunks
    task_chunk_shape = []
    for s, c in zip(shape, chunks, strict=True):
        if c == s or left <= 1:
            task_chunk_shape.append(c)
        else:
            q = s // c
            if q > left:
                task_chunk_shape.append(left * c)
                left = 1
            else:
                task_chunk_shape.append(q * c)
                left //= q
    print(f"{task_chunk_shape=!r}")
    return tuple(task_chunk_shape)
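# Worked example: task_nchunks=2, shape=(3, 720, 1440), chunks=(1, 720, 1440)
# gives a task chunk shape of (2, 720, 1440): the first dimension holds three
# store chunks, two of which are grouped into each task.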


@dataclass
class Timer:
diagnostics: list = field(default_factory=list)

@contextmanager
def time(self, **kwargs):
tic = time.perf_counter()
yield
toc = time.perf_counter()
kwargs["runtime"] = toc - tic
self.diagnostics.append(kwargs)

def dataframe(self):
return pd.DataFrame(self.diagnostics)
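`Timer.time` appends one record per timed block, which `dataframe` turns into one row each. A minimal usage sketch (names other than `Timer` are hypothetical):

from lib import Timer

timer = Timer()
with timer.time(op="write", nchunks=4):
    do_work()  # any timed body; a "runtime" entry is added to the record
print(timer.dataframe())  # columns: op, nchunks, runtime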
233 changes: 233 additions & 0 deletions icechunk-python/perf/tasks.py
@@ -0,0 +1,233 @@
#!/usr/bin/env python3

# This app records performance measurements for Icechunk reads/writes orchestrated using plain tasks.
# See array.py, which orchestrates similar operations using an array library.

# Orchestration
# --executor=threads, --executor=processes, with --num-workers
# --executor=dask-threads, --executor=dask-processes, --executor=dask-distributed --num-workers --threads-per-worker
# --framework=dask-array or --framework=tasks
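# Example invocations (illustrative; flag spellings assume typer's default
# kebab-case mapping of the `write` options defined below):
#   python tasks.py write local://~/icechunk-perf --executor=threads --workers=8
#   python tasks.py write s3://some-bucket/some-prefix --executor=processes --task-nchunks=2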

#
import math
import os
import time
from concurrent import futures
from dataclasses import dataclass
from enum import StrEnum, auto
from typing import Annotated, cast
from urllib.parse import urlparse

import lib
import numpy as np
import tqdm
import typer

import zarr
from icechunk import Repository, Session, StorageConfig
from icechunk.distributed import merge_sessions

BRANCH_NAME = "main"


@dataclass
class Task:
"""A read/write task"""

    # The worker will use this Icechunk session to read/write to the dataset
session: Session
# Region of the array to write to
region: tuple[slice, ...]


@dataclass
class TaskResult:
session: Session
    time: float  # seconds, as measured by time.perf_counter()


def generate_task_array(task: Task, shape: tuple[int, ...]) -> np.typing.ArrayLike:
"""Generates a random array with the given shape and using the seed in the Task"""
seed = math.prod(slicer.stop for slicer in task.region)
np.random.seed(seed)
return np.random.rand(*shape)


def execute_write_task(task: Task) -> TaskResult:
"""Execute task as a write task.

This will read the time coordinade from `task` and write a "pancake" in that position,
using random data. Random data is generated using the task seed.

Returns the Icechunk store after the write is done.

As you can see Icechunk stores can be passed to remote workers, and returned from them.
The reason to return the store is that we'll need all the remote stores, when they are
done, to be able to do a single, global commit to Icechunk.
"""

store = task.session.store()
group = zarr.group(store=store, overwrite=False)
array = cast(zarr.Array, group["array"])
data = generate_task_array(task, array[task.region].shape)
assert len(task.region) == array.ndim
tic = time.perf_counter()
array[task.region] = data
toc = time.perf_counter()
return TaskResult(task.session, time=toc - tic)


def get_repo(url: str) -> Repository:
    parsed = urlparse(url)
    if parsed.netloc == "" or parsed.params != "" or parsed.query != "" or parsed.fragment != "":
        raise ValueError(f"Invalid {url=!r}")

    if parsed.scheme == "s3":
        if parsed.path == "":
            raise ValueError(f"Invalid {url=!r}")
        storage = StorageConfig.s3_from_env(bucket=parsed.netloc, prefix=parsed.path)

    elif parsed.scheme == "local":
        storage = StorageConfig.filesystem(os.path.expanduser(f"{parsed.netloc}/{parsed.path}"))

    else:
        raise ValueError(f"Unsupported scheme in {url=!r}; expected s3:// or local://")

    return Repository.open_or_create(storage=storage)
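# Accepted URL forms (matching the two schemes above; names are hypothetical):
#   get_repo("s3://my-bucket/some/prefix")  # S3 storage configured from the environment
#   get_repo("local://~/icechunk-perf")     # expanded to a local filesystem path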


class Executor(StrEnum):
threads = auto()
processes = auto()
# dask_threads = auto()
# dask_processes = auto()
dask_distributed = auto()


app = typer.Typer()


@app.command()
def read(
# data
    url: str,
# execution
executor: Executor = Executor.threads,
workers: int | None = None,
threads_per_worker: int | None = None,
) -> None:
pass


def validate_data(*, num_arrays, shape, chunks) -> None:
if len(chunks) != len(shape):
raise ValueError(f"{shape=!r} does not match {chunks=!r}")


def get_executor(
*,
executor: Executor,
workers: int | None = None,
threads_per_worker: int | None = None,
) -> futures.Executor:
if executor is Executor.threads:
return futures.ThreadPoolExecutor(workers)
elif executor is Executor.processes:
return futures.ProcessPoolExecutor(workers)
elif executor is Executor.dask_distributed:
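        # Assumed compatibility: distributed.Client is not a
        # concurrent.futures.Executor subclass, but its submit()/result()
        # interface matches how the returned object is used in write().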
from distributed import Client

return Client(n_workers=workers, threads_per_worker=threads_per_worker)
else:
raise NotImplementedError


def initialize_store(repo: Repository, *, shape, chunks):
session = repo.writable_session(BRANCH_NAME)
store = session.store()
group = zarr.group(store, overwrite=True)
group.create_array(
"array", shape=shape, dtype=np.float32, chunk_shape=chunks, exists_ok=True
)
session.commit("initialized")


@app.command()
def write(
# data
url: str,
num_arrays: Annotated[int, typer.Option(min=1, max=1)] = 1,
# scale: # TODO: [small, medium, large]
shape: list[int] = [3, 720, 1440], # noqa: B006
chunks: list[int] = [1, -1, -1], # noqa: B006
task_nchunks: int = 1,
# execution
executor: Executor = Executor.threads,
workers: int | None = None,
threads_per_worker: int | None = None,
record: bool = False,
) -> None:
timer = lib.Timer()
validate_data(num_arrays=num_arrays, shape=shape, chunks=chunks)
chunks = lib.normalize_chunks(shape=shape, chunks=chunks)
    pool = get_executor(
        executor=executor, workers=workers, threads_per_worker=threads_per_worker
    )
repo = get_repo(url)
initialize_store(repo, shape=shape, chunks=chunks)

    session = repo.writable_session(BRANCH_NAME)

task_chunk_shape = lib.get_task_chunk_shape(
task_nchunks=task_nchunks, shape=shape, chunks=chunks
)

# TODO: record nchunks, chunk size in bytes for all timers?
# TODO: record commit hash of benchmarked commit too.
timer_context = {}
with timer.time(op="total_write_time", **timer_context):
tasks = [
            pool.submit(execute_write_task, Task(session=session, region=slicer))
for slicer in lib.slices_from_chunks(shape, task_chunk_shape)
]
results = [f.result() for f in tqdm.tqdm(tasks)]

timer.diagnostics.append(
{"op": "write_task", "runtime": tuple(r.time for r in results)}
)

# TODO: record time & byte size?
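    # Threads mutate the single in-process `session`, so no merge is needed;
    # process and distributed workers receive pickled session copies whose
    # changes must be merged back before committing.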
with timer.time(op="merge_changeset", **timer_context):
        if not isinstance(pool, futures.ThreadPoolExecutor):
session = merge_sessions(session, *(res.session for res in results))
assert session.has_uncommitted_changes

with timer.time(op="commit", **timer_context):
commit_res = session.commit("committed.")
assert commit_res
    print(timer.dataframe())
print("Written and committed!")


@app.command()
def verify(
# data
# execution
executor: Executor = Executor.threads,
workers: int | None = None,
threads_per_worker: int | None = None,
) -> None:
pass


@app.command()
def create(
# data
# execution
executor: Executor = Executor.threads,
workers: int | None = None,
threads_per_worker: int | None = None,
) -> None:
pass


if __name__ == "__main__":
app()
1 change: 1 addition & 0 deletions icechunk-python/pyproject.toml
@@ -27,6 +27,7 @@
readme = "../README.md"
packages = [{ include = "icechunk", from = "python" }]

[project.optional-dependencies]
perf = ["typer", "tqdm", "toolz"]
test = [
"boto3",
"coverage",