Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added dlp for spawned workers pytorch #136

Merged
merged 4 commits into from
Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-package-conda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ jobs:
run: |
echo "Profiler ${DLIO_PROFILER} gcc $CC"
source ${VENV}/bin/activate
pip install --force-reinstall dlio-profiler-py==0.0.2
pip install --force-reinstall dlio_profiler_py
- name: test_gen_data
run: |
source ${VENV}/bin/activate
Expand Down
7 changes: 6 additions & 1 deletion dlio_benchmark/data_loader/torch_data_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,20 +48,25 @@ def __init__(self, format_type, dataset_type, epoch, num_samples, num_workers, b
self.batch_size = batch_size
args = ConfigArguments.get_instance()
self.serial_args = pickle.dumps(args)
self.dlp_logger = None
if num_workers == 0:
self.worker_init(-1)

# Per-worker setup: reloads the pickled configuration, configures logging and
# the profiler for this worker, then builds the reader bound to `worker_id`.
# __init__ calls this directly with worker_id=-1 when num_workers == 0.
@dlp.log
def worker_init(self, worker_id):
# The unpickled object is discarded.
# NOTE(review): presumably unpickling repopulates the ConfigArguments
# singleton inside this process -- confirm against ConfigArguments.
pickle.loads(self.serial_args)
_args = ConfigArguments.get_instance()
# NOTE(review): the next two calls look like the old and new sides of a single
# changed diff line; only one is expected in the real file -- confirm.
_args.configure_dlio_logging(True)
_args.configure_dlio_logging(is_child=True)
# NOTE(review): the stored value may be None (configure_dlio_profiler appears
# to return nothing for a forked child) -- __del__ must tolerate that.
self.dlp_logger = _args.configure_dlio_profiler(is_child=True, use_pid=True)
logging.debug(f"{utcnow()} worker initialized {worker_id} with format {self.format_type}")
# The reader is created per worker, using worker_id as its thread index.
self.reader = ReaderFactory.get_reader(type=self.format_type,
dataset_type=self.dataset_type,
thread_index=worker_id,
epoch_number=self.epoch_number)

def __del__(self):
    """Finalize the profiler logger owned by this object, if there is one.

    ``__del__`` also runs for objects whose ``__init__`` raised part-way
    through, in which case ``dlp_logger`` may never have been assigned.
    The attribute is therefore read defensively, and it is cleared before
    finalizing so that a repeated call cannot finalize the same logger twice.
    """
    logger = getattr(self, "dlp_logger", None)
    if logger:
        # Drop the reference first: if finalize() raises, we still never retry.
        self.dlp_logger = None
        logger.finalize()
@dlp.log
def __len__(self):
    """Return ``self.num_samples`` as the length of this object."""
    sample_count = self.num_samples
    return sample_count
Expand Down
37 changes: 15 additions & 22 deletions dlio_benchmark/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,33 +75,26 @@ def __init__(self, cfg):

self.output_folder = self.args.output_folder
os.makedirs(self.args.output_folder, mode=0o755, exist_ok=True)
dlp_trace = get_trace_name(self.args.output_folder)
self.comm = DLIOMPI.get_instance().comm()
self.my_rank = self.args.my_rank = DLIOMPI.get_instance().rank()
self.comm_size = self.args.comm_size = DLIOMPI.get_instance().size()
self.dlp_logger = PerfTrace.initialize_log(logfile=dlp_trace,
data_dir=f"{os.path.abspath(self.args.data_folder)}:"
f"{self.args.data_folder}:./{self.args.data_folder}:"
f"{self.args.checkpoint_folder}:./{self.args.checkpoint_folder}:"
f"{os.path.abspath(self.args.checkpoint_folder)}",
process_id=self.my_rank)
self.data_folder = self.args.data_folder
self.storage_root = self.args.storage_root
if self.args.storage_root:
self.storage.create_namespace(exist_ok=True)
self.framework = FrameworkFactory().get_framework(self.args.framework,
self.args.do_profiling)

# Delete previous logfile
if self.my_rank == 0:
if os.path.isfile(self.args.logfile_path):
os.remove(self.args.logfile_path)
self.comm.barrier()
# Configure the logging library
self.args.configure_dlio_logging(is_child=False)
self.dlp_logger = self.args.configure_dlio_profiler(is_child=False, use_pid=False)
with Profile(name=f"{self.__init__.__qualname__}", cat=MODULE_DLIO_BENCHMARK):
self.data_folder = self.args.data_folder
self.storage_root = self.args.storage_root
if self.args.storage_root:
self.storage.create_namespace(exist_ok=True)
self.framework = FrameworkFactory().get_framework(self.args.framework,
self.args.do_profiling)

# Delete previous logfile
if self.my_rank == 0:
if os.path.isfile(self.args.logfile_path):
os.remove(self.args.logfile_path)
self.comm.barrier()
# Configure the logging library
self.args.configure_dlio_logging(False)
if self.args.my_rank == 0:
logging.info(f"{utcnow()} Profiling DLIO {dlp_trace}")
logging.info(f"{utcnow()} Running DLIO with {self.args.comm_size} process(es)")
try:
logging.info(
Expand Down
18 changes: 16 additions & 2 deletions dlio_benchmark/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@
from dlio_benchmark.common.enumerations import StorageType, FormatType, Shuffle, ReadType, FileAccess, Compression, \
FrameworkType, \
DataLoaderType, Profiler, DatasetType, DataLoaderSampler, CheckpointLocationType
from dlio_benchmark.utils.utility import DLIOMPI
from dlio_benchmark.utils.utility import DLIOMPI, get_trace_name, utcnow
from dataclasses import dataclass
import math
import os
import numpy as np

from dlio_profiler.logger import fn_interceptor as Profile
from dlio_profiler.logger import dlio_logger as PerfTrace, fn_interceptor as Profile
dlp = Profile(MODULE_CONFIG)
@dataclass
class ConfigArguments:
Expand Down Expand Up @@ -173,6 +173,20 @@ def configure_dlio_logging(self, is_child=False):
# logging's max timestamp resolution is msecs, we will pass in usecs in the message
)

def configure_dlio_profiler(self, is_child=False, use_pid=False):
    """Initialise the profiler trace log for the calling process.

    Args:
        is_child: when true and ``self.multiprocessing_context`` is
            ``"fork"``, nothing is initialised and ``None`` is returned.
        use_pid: forwarded to ``get_trace_name`` when building the trace
            file name.

    Returns:
        Whatever ``PerfTrace.initialize_log`` returns, or ``None`` for a
        forked child.
    """
    # with "multiprocessing_context=fork" the profiler file remains open in the child process
    forked_child = is_child and self.multiprocessing_context == "fork"
    if forked_child:
        return None

    trace_path = get_trace_name(self.output_folder, use_pid)
    logging.info(f"{utcnow()} Profiling DLIO {trace_path}")

    # Colon-separated spellings (absolute, as configured, "./"-prefixed) of the
    # data and checkpoint folders, in the same order as before.
    folder_spellings = [
        f"{os.path.abspath(self.data_folder)}",
        f"{self.data_folder}",
        f"./{self.data_folder}",
        f"{self.checkpoint_folder}",
        f"./{self.checkpoint_folder}",
        f"{os.path.abspath(self.checkpoint_folder)}",
    ]
    return PerfTrace.initialize_log(
        logfile=trace_path,
        data_dir=":".join(folder_spellings),
        process_id=self.my_rank,
    )

@dlp.log
def validate(self):
""" validate whether the parameters are set correctly"""
Expand Down
7 changes: 5 additions & 2 deletions dlio_benchmark/utils/utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,5 +230,8 @@ def create_dur_event(name, cat, ts, dur, args={}):
return d


# NOTE(review): this looks like the pre-change side of the diff hunk; the same
# name is defined again directly below with an extra `use_pid` parameter.
# Builds "<output_folder>/trace-<rank>-of-<size>.pfw" from the DLIOMPI instance.
def get_trace_name(output_folder):
return f"{output_folder}/trace-{DLIOMPI.get_instance().rank()}-of-{DLIOMPI.get_instance().size()}.pfw"
def get_trace_name(output_folder, use_pid=False):
    """Build the path of the trace file for the calling process.

    The name has the form ``<output_folder>/trace-<rank>-of-<size>.pfw``,
    with rank and size taken from the ``DLIOMPI`` instance. When ``use_pid``
    is true, ``-<pid>`` is inserted before the ``.pfw`` extension.
    """
    pid_suffix = f"-{os.getpid()}" if use_pid else ""
    rank = DLIOMPI.get_instance().rank()
    size = DLIOMPI.get_instance().size()
    return f"{output_folder}/trace-{rank}-of-{size}{pid_suffix}.pfw"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
'h5py',
'pandas',
'psutil',
'dlio_profiler_py==0.0.2'
'dlio_profiler_py==0.0.3'
]
x86_deps = [
'hydra-core == 1.2.0',
Expand Down