Commit

S3 support (#129)

* add github workflow for building containers

* get s3 download working

* save correct validation strain in augmented batch

* add poetry lock

* update lightray

* have fs utils be able to take paths

* remove tune script for now

* fix s3 path parsing error
EthanMarx authored Sep 10, 2024
1 parent 9b7d8e3 commit 2d4b87a
Showing 11 changed files with 1,937 additions and 714 deletions.
14 changes: 14 additions & 0 deletions .github/project-filters.yaml
@@ -0,0 +1,14 @@
workflow: &workflow
- '.github/lib-filters.yaml'
- '.github/workflows/lib-tests.yaml'

data:
- *workflow
- 'projects/data/**'
- 'mldatafind/**'

train:
- *workflow
- 'projects/train/**'
- 'amplfi/architectures/**'
- 'ml4gw/**'
119 changes: 119 additions & 0 deletions .github/workflows/build.yaml
@@ -0,0 +1,119 @@
# workflow that builds docker container images
# from the singularity apptainer definition files
# we use for local development, and runs any tests.
# Uses tar archives to convert between the formats
# in order to handle the large memory footprints of
# our containers without toppling over the
# GitHub runner nodes this executes on.
name: project build and tests

on:
push:
branches:
- main

# TODO: currently combining path for nvidia variables
# necessary to launch triton in export container
# with path required by train / data projects.
# Should add conditionals in this workflow to set these dynamically
env:
REGISTRY: ghcr.io
PATH: /opt/env/bin/:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/local/cuda-11.8/bin


jobs:
changes:
runs-on: ubuntu-latest
outputs:
projects: ${{ steps.filter.outputs.changes }}
steps:
- name: Checkout repository
uses: actions/checkout@v2
- name: Filter project changes
id: filter
uses: dorny/paths-filter@v2
with:
filters: .github/project-filters.yaml
if: github.event.pull_request.draft == false

build-test:
runs-on: ubuntu-latest
needs: changes
permissions:
contents: read
packages: write
strategy:
fail-fast: false
matrix:
project: ${{ fromJSON(needs.changes.outputs.projects) }}
exclude:
- project: 'workflow' # 'workflow' is a shared filter, not a buildable project
steps:
-
name: delete huge unnecessary tools folder
run: rm -rf /opt/hostedtoolcache
-
name: Checkout repository
uses: actions/checkout@v4
with:
submodules: recursive
-
name: log in to the Container registry
uses: docker/login-action@65b78e6e13532edd9afa3aa52ac7964289d1a9c1
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}

# build the singularity image as a sandbox directory
# inside a docker container that has singularity
# installed (take a big breath). Then tar that directory
# so that we can import it into docker. Doing everything
# in one fell swoop because of permissions discrepancies
# inside and outside the container.
-
name: build singularity image
run: |
docker run \
--rm \
-v ${{ github.workspace }}:/opt/amplfi \
--workdir /opt/amplfi/projects/${{ matrix.project }} \
--privileged \
--entrypoint /bin/bash \
quay.io/singularity/singularity:v3.8.1 \
-c 'singularity build --sandbox /opt/amplfi/sandbox apptainer.def'
# run tests inside the sandbox;
# if this is a push event, tar the sandbox
# so that we can import it into docker
# container and push it to the registry;
# otherwise (e.g. for a PR) just run the tests
- name: run tests and tar sandbox
run: |
docker run \
--rm \
-v ${{ github.workspace }}:/opt/amplfi \
--workdir /opt/amplfi/projects/${{ matrix.project }} \
--privileged \
--entrypoint /bin/bash \
quay.io/singularity/singularity:v3.8.1 \
-c 'singularity exec --env PATH=${{ env.PATH }} /opt/amplfi/sandbox tar -czf /opt/amplfi/app.tar.gz -C /opt/amplfi/sandbox .'
# now copy the filesystem contents into an empty
# container and push it to the registry, using a
# lowercase version of the tag since docker image
# names must be lowercase but the repository name
# may contain uppercase characters
-
name: build and push docker image
# only run on pushes so that we aren't
# building containers for PRs
if: ${{ github.event_name == 'push' }}
env:
tag: ${{ env.REGISTRY }}/${{ github.repository }}/${{ matrix.project }}:${{ github.ref_name }}
run: |
export TAG_LC=${tag,,}
cat app.tar.gz | docker import --change "ENV PATH=${{ env.PATH }}" - $TAG_LC
docker push $TAG_LC
2,349 changes: 1,646 additions & 703 deletions projects/train/poetry.lock

Large diffs are not rendered by default.

6 changes: 4 additions & 2 deletions projects/train/pyproject.toml
@@ -1,15 +1,15 @@
[tool.poetry]
name = "train"
version = "0.1.0"
description = "Train PE models with pytorch lightning"
description = "Train AMPLFI models with PyTorch Lightning"
authors = ["Deep Chatterjee, Ethan Marx"]
license = "MIT"

[tool.poetry.dependencies]
python = ">=3.9,<3.12" #^3.8,<3.12"
numpy = "<2.0.0"
lightning = "<=2.2.1"
ml4gw = ">=0.4"
ml4gw = "^0.5"
h5py = "^3.10.0"
ray = "^2.9.3"
bilby = "^2.2.3"
@@ -20,6 +20,8 @@ wandb = "^0.16.3"

"amplfi.architectures" = {path = "../../amplfi/architectures", develop = true}
omegaconf = "^2.3.0"
lightray = "^0.1.5"


[tool.poetry.group.dev.dependencies]
jupyter = "^1.0.0"
2 changes: 1 addition & 1 deletion projects/train/train/callbacks.py
@@ -44,7 +44,7 @@ def on_train_start(self, trainer, pl_module):
background, cross, plus, parameters
)
with h5py.File(os.path.join(save_dir, "val-batch.h5"), "w") as f:
f["strain"] = X.cpu().numpy()
f["strain"] = strain.cpu().numpy()
f["parameters"] = parameters.cpu().numpy()


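For reference, a minimal sketch (not part of the commit; the file path is illustrative) of reading back the validation batch that the callback above writes, using the same "strain" and "parameters" keys:

import h5py

# open the validation batch saved by the callback during on_train_start
with h5py.File("val-batch.h5", "r") as f:
    strain = f["strain"][:]          # augmented validation strain
    parameters = f["parameters"][:]  # corresponding source parameters
print(strain.shape, parameters.shape)
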
28 changes: 23 additions & 5 deletions projects/train/train/data/datasets/base.py
@@ -1,7 +1,6 @@
import logging
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional, Sequence

import h5py
@@ -11,7 +10,8 @@
from ml4gw.transforms import ChannelWiseScaler, Whiten

from train.augmentations import PsdEstimator, WaveformProjector
from train.data.utils import ZippedDataset
from train.data.utils import fs as fs_utils
from train.data.utils.utils import ZippedDataset
from train.data.waveforms.sampler import WaveformSampler

Tensor = torch.Tensor
@@ -67,7 +67,7 @@ class AmplfiDataset(pl.LightningDataModule):

def __init__(
self,
data_dir: Path,
data_dir: str,
inference_params: list[str],
highpass: float,
sample_rate: float,
@@ -85,9 +85,11 @@ def __init__(
super().__init__()
self.save_hyperparameters(ignore=["waveform_sampler"])
self.init_logging(verbose)
self.data_dir = data_dir
self.waveform_sampler = waveform_sampler
self.train_fnames, self.val_fnames = self.train_val_split()

# generate our local node data directory
# if our specified data source is remote
self.data_dir = fs_utils.get_data_dir(self.hparams.data_dir)

def init_logging(self, verbose: bool):
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
@@ -97,6 +99,21 @@ def init_logging(self, verbose: bool):
stream=sys.stdout,
)

def prepare_data(self):
"""
Download s3 data if it doesn't exist.
"""
logger = logging.getLogger("AframeDataset")
bucket, _ = fs_utils.split_data_dir(self.hparams.data_dir)
if bucket is None:
return
logger.info(
"Downloading data from S3 bucket {} to {}".format(
bucket, self.data_dir
)
)
fs_utils.download_training_data(bucket, self.data_dir)

# ================================================ #
# Distribution utilities
# ================================================ #
@@ -253,6 +270,7 @@ def build_transforms(self, stage):
def setup(self, stage: str) -> None:
world_size, rank = self.get_world_size_and_rank()
self._logger = self.get_logger(world_size, rank)
self.train_fnames, self.val_fnames = self.train_val_split()

self._logger.info(f"Setting up data for stage {stage}")

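For context, a minimal sketch (not part of the diff; bucket and path names are illustrative) of the decision prepare_data now makes: local directories are used as-is, while s3:// sources are downloaded into the resolved local data directory:

from train.data.utils import fs as fs_utils

# a plain local path: no bucket is returned, so prepare_data does nothing
bucket, local = fs_utils.split_data_dir("/home/user/amplfi-data")
assert bucket is None

# an s3 path with an explicit local target: the bucket contents get
# downloaded into that directory before training starts
bucket, local = fs_utils.split_data_dir("s3://amplfi-bucket:/tmp/amplfi-data")
if bucket is not None:
    fs_utils.download_training_data(bucket, local)
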
127 changes: 127 additions & 0 deletions projects/train/train/data/utils/fs.py
@@ -0,0 +1,127 @@
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from pathlib import Path
from tempfile import gettempdir
from typing import Optional, Tuple, Union

import s3fs
from botocore.exceptions import ClientError, ResponseStreamingError
from filelock import FileLock
from fsspec.exceptions import FSTimeoutError

# s3 retry configuration
retry_config = {"retries": {"total_max_attempts": 10, "mode": "adaptive"}}


def split_data_dir(data_dir: Union[str, Path]) -> Tuple[Optional[str], str]:
"""
Check if a data directory specifies a remote s3
source by including `"s3://"` at the start of
the directory name.
"""
data_dir = str(data_dir)
if data_dir.startswith("s3://"):
bucket = data_dir.replace("s3://", "")

# check if specified a target location to map to
# by adding a colon at the end of our bucket
bucket, *data_dir = bucket.split(":")
data_dir = data_dir[0] if data_dir else None
return bucket, data_dir
else:
return None, data_dir


def get_data_dir(data_dir: str) -> Path:
# generate our local node data directory
# if our specified data source is remote
bucket, data_dir = split_data_dir(data_dir)
if bucket is not None and data_dir is None:
# we have remote data, but we didn't explicitly
# specify a directory to download it to, so create
# a tmp directory using the worker id so that each
# worker process downloads its own copy of the data
# only on its first training run
tmpdir = gettempdir()
logging.info("Downloading data to local tmp directory")
data_dir = f"{tmpdir}/data-tmp"

logging.info(f"Downloading data to {data_dir}")
os.makedirs(data_dir, exist_ok=True)
return Path(data_dir)


def _download(
s3: s3fs.S3FileSystem, source: str, target: str, num_retries: int = 5
):
"""
Cheap wrapper around s3.get to try to avoid issues
from interrupted reads.
"""

lockfile = target + ".lock"
logging.info(f"Downloading {source} to {target}")
for i in range(num_retries):
with FileLock(lockfile):
if os.path.exists(target):
logging.info(
f"Object {source} already downloaded by another process"
)
return
try:
s3.get(source, target)
break
except (ResponseStreamingError, FSTimeoutError, ClientError):
logging.info(
"Download attempt {} for object {} "
"was interrupted, retrying".format(i + 1, source)
)
time.sleep(5)
try:
os.remove(target)
except FileNotFoundError:
continue

else:
raise RuntimeError(
"Failed to download object {} due to repeated "
"connection interruptions".format(source)
)


def download_training_data(bucket: str, data_dir: str):
"""
Download s3 data if it doesn't exist.
"""
data_dir = str(data_dir)
logging.info(
"Downloading data from S3 bucket {} to "
"local directory {}".format(bucket, data_dir)
)

# make a local directory to cache data if it
# doesn't already exist
background_dir = f"{data_dir}/train/background"
os.makedirs(background_dir, exist_ok=True)

# check to make sure the specified bucket
# actually has data to download
s3 = s3fs.S3FileSystem(
key=os.getenv("AWS_ACCESS_KEY_ID"),
secret=os.getenv("AWS_SECRET_ACCESS_KEY"),
endpoint_url=os.getenv("AWS_ENDPOINT_URL"),
config_kwargs=retry_config,
)
sources = s3.glob(f"{bucket}/train/background/*.hdf5")
if not sources:
raise ValueError(f"No data at {bucket} to download")

# multiprocess download of training data
targets = [data_dir + f.replace(f"{bucket}", "") for f in sources]

download = partial(_download, s3)
with ThreadPoolExecutor() as executor:
executor.map(download, sources, targets)
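A short usage sketch of the helpers above (values are illustrative, not part of the commit); note that both strings and pathlib.Path objects are accepted, per the "have fs utils be able to take paths" change:

from pathlib import Path

from train.data.utils import fs as fs_utils

# s3 source with an explicit local target after the trailing colon
assert fs_utils.split_data_dir("s3://my-bucket:/data/cache") == ("my-bucket", "/data/cache")

# s3 source with no target: get_data_dir falls back to <tmpdir>/data-tmp
data_dir = fs_utils.get_data_dir("s3://my-bucket")

# local directories (str or Path) pass through unchanged
assert fs_utils.split_data_dir(Path("/data/train")) == (None, "/data/train")
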
File renamed without changes.
2 changes: 1 addition & 1 deletion projects/train/train/data/waveforms/generator/generator.py
@@ -2,7 +2,7 @@

import torch

from train.data.utils import ParameterSampler
from train.data.utils.utils import ParameterSampler
from train.data.waveforms.sampler import WaveformSampler

if TYPE_CHECKING:
2 changes: 1 addition & 1 deletion projects/train/train/data/waveforms/sampler.py
@@ -2,7 +2,7 @@

import torch

from train.data.utils import ParameterTransformer
from train.data.utils.utils import ParameterTransformer

Distribution = torch.distributions.Distribution

2 changes: 1 addition & 1 deletion projects/train/train/priors.py
@@ -4,7 +4,7 @@
from ml4gw import distributions
from torch.distributions import Uniform

from train.data.utils import ParameterSampler, ParameterTransformer
from train.data.utils.utils import ParameterSampler, ParameterTransformer

# prior and parameter transformer for sg use case
sg_prior = ParameterSampler(
