Performance benchmark #483

Merged · 15 commits · Dec 6, 2024
56 changes: 56 additions & 0 deletions .github/workflows/perf-bench.yml
@@ -0,0 +1,56 @@
# This workflow installs data-juicer into a Dockerized Ray cluster and runs the performance benchmark
# For more information see: https://docs.github.com/en/actions/automating-builds-and-tests/building-and-testing-python

name: performance_benchmark

on:
workflow_dispatch:
push:
branches:
- main

permissions:
contents: read

env:
ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true

jobs:
unittest-single:
runs-on: [self-hosted, linux]
environment: Testing
steps:
- uses: actions/checkout@v3
with:
path: dj-${{ github.run_id }}

- name: Setup docker compose
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose up -d

- name: Install data-juicer
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head pip install -e .\[all\]

- name: Clean dataset cache
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head rm -rf /data/huggingface/dataset

- name: Run performance benchmark standalone
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
run: |
docker compose exec ray-head bash tests/benchmark_performance/run.sh ${{ secrets.INTERNAL_WANDB_URL }} ${{ secrets.INTERNAL_WANDB_API_KEY }}

- name: Remove docker compose
working-directory: dj-${{ github.run_id }}/.github/workflows/docker
if: always()
run: |
docker compose down --remove-orphans

- name: Cleanup workspace
if: always()
run: |
rm -rf dj-${{ github.run_id }}
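The two secrets are handed straight to the benchmark script, which presumably uses them to authenticate against the internal Weights & Biases instance before reporting results. A minimal sketch of that login step, assuming the standard wandb client API; the argument forwarding is a hypothetical stand-in for whatever run.sh actually does:

    import sys
    import wandb

    # Hypothetical: assume run.sh forwards its two positional arguments here.
    wandb_url, wandb_api_key = sys.argv[1], sys.argv[2]
    wandb.login(host=wandb_url, key=wandb_api_key)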
1 change: 1 addition & 0 deletions configs/config_all.yaml
@@ -15,6 +15,7 @@ text_keys: 'text' # the key name of fi
suffixes: [] # the suffix of files that will be read. For example: '.txt', 'txt' or ['txt', '.pdf', 'docx']
use_cache: true # whether to use the cache management of Hugging Face datasets. It might take up lots of disk space when using cache
ds_cache_dir: null # cache dir for Hugging Face datasets. By default, it's the same as the environment variable `HF_DATASETS_CACHE`, whose default value is usually "~/.cache/huggingface/datasets". If this argument is set to a valid path by users, it will override the default cache dir
open_monitor: true # Whether to open the monitor to trace resource utilization for each OP during data processing. It's True by default.
use_checkpoint: false # whether to use the checkpoint management to save the latest version of dataset to work dir when processing. Rerunning the same config will reload the checkpoint and skip ops before it. Cache will be disabled when using checkpoint. If args of ops before the checkpoint are changed, all ops will be rerun from the beginning.
temp_dir: null # the path to the temp directory to store intermediate caches when cache is disabled; these cache files will be removed on-the-fly. By default, it's None, so the temp dir will be specified by the system. NOTICE: you should be cautious when setting this argument because it might cause unexpected program behaviors when this path is set to an unsafe directory.
open_tracer: false # whether to open the tracer to trace the changes during process. It might take more time when opening tracer
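The new open_monitor entry defaults to true, so existing configs keep their current behavior. A quick way to confirm that from the shipped config, assuming PyYAML is available:

    import yaml

    # Read the reference config and report the monitor setting.
    with open('configs/config_all.yaml') as f:
        cfg = yaml.safe_load(f)
    print(cfg.get('open_monitor', True))  # True: monitoring stays on unless disabled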
6 changes: 6 additions & 0 deletions data_juicer/config/config.py
@@ -230,6 +230,12 @@ def init_configs(args: Optional[List[str]] = None):
help='The compression method of the cache file, which can be '
'specified in ["gzip", "zstd", "lz4"]. If this parameter is '
'None, the cache file will not be compressed.')
parser.add_argument(
'--open_monitor',
type=bool,
default=True,
help='Whether to open the monitor to trace resource utilization for '
'each OP during data processing. It\'s True by default.')
parser.add_argument(
'--use_checkpoint',
type=bool,
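Note that type=bool would be a foot-gun under vanilla argparse, where every non-empty string is truthy, but init_configs builds on jsonargparse, which coerces 'true'/'false' strings into real booleans. A minimal sketch of that behavior, assuming jsonargparse is the parser in use here:

    from jsonargparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument('--open_monitor', type=bool, default=True)
    cfg = parser.parse_args(['--open_monitor', 'false'])
    print(cfg.open_monitor)  # False, not the truthy string 'false'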
35 changes: 23 additions & 12 deletions data_juicer/core/data.py
@@ -164,13 +164,16 @@ def __getitem__(self, key):
res = super().__getitem__(key)
return nested_obj_factory(res)

def process(self,
operators,
*,
work_dir=None,
exporter=None,
checkpointer=None,
tracer=None):
def process(
self,
operators,
*,
work_dir=None,
exporter=None,
checkpointer=None,
tracer=None,
open_monitor=True,
):
if operators is None:
return self

@@ -179,7 +182,8 @@ def process(self,
unforkable_operators = set(UNFORKABLE.modules.keys())

# resource utilization monitor
resource_util_list = []
if open_monitor:
resource_util_list = []

dataset = self
try:
@@ -196,12 +200,16 @@ def process(self,
'exporter': exporter,
'tracer': tracer,
}
dataset, resource_util_per_op = Monitor.monitor_func(
op.run, args=run_args)
if open_monitor:
dataset, resource_util_per_op = Monitor.monitor_func(
op.run, args=run_args)
else:
dataset = op.run(**run_args)
# record processed ops
if checkpointer is not None:
checkpointer.record(op._op_cfg)
resource_util_list.append(resource_util_per_op)
if open_monitor:
resource_util_list.append(resource_util_per_op)
end = time()
logger.info(f'OP [{op._name}] Done in {end - start:.3f}s. '
f'Left {len(dataset)} samples.')
@@ -215,7 +223,10 @@ def process(self,
'last op...')
dataset.cleanup_cache_files()
checkpointer.save_ckpt(dataset)
if work_dir:
if work_dir and open_monitor:
# get the analyzed version
resource_util_list = Monitor.analyze_resource_util_list(
resource_util_list)
monitor_dir = os.path.join(work_dir, 'monitor')
os.makedirs(monitor_dir, exist_ok=True)
with open(os.path.join(monitor_dir, 'monitor.json'),
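The pattern above wraps each op.run call in Monitor.monitor_func only when monitoring is requested, so disabling the flag removes the per-OP sampling overhead entirely. A self-contained sketch of the same gating, using a simplified stand-in for Monitor.monitor_func (the real monitor samples resource usage from a separate process; this stand-in only records wall time):

    import time

    def monitor_func(func, args=None):
        # Simplified stand-in: time the call and return a one-entry stats dict.
        start = time.time()
        result = func(**(args or {}))
        return result, {'time': time.time() - start}

    def run_op(op_run, run_args, open_monitor=True, resource_util_list=None):
        # Mirror the gating above: only pay the monitoring cost when asked to.
        if open_monitor:
            dataset, resource_util_per_op = monitor_func(op_run, args=run_args)
            if resource_util_list is not None:
                resource_util_list.append(resource_util_per_op)
        else:
            dataset = op_run(**run_args)
        return dataset

    # e.g. run_op(lambda dataset: dataset, {'dataset': [1, 2, 3]})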
13 changes: 8 additions & 5 deletions data_juicer/core/executor.py
@@ -193,11 +193,14 @@ def run(self,
# - If checkpoint is open, clean the cache files after each process
logger.info('Processing data...')
tstart = time()
dataset = dataset.process(ops,
work_dir=self.work_dir,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer)
dataset = dataset.process(
ops,
work_dir=self.work_dir,
exporter=self.exporter,
checkpointer=self.ckpt_manager,
tracer=self.tracer,
open_monitor=self.cfg.open_monitor,
)
tend = time()
logger.info(f'All OPs are done in {tend - tstart:.3f}s.')

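With the flag threaded through the executor, monitoring can be toggled from the command line or a config file without touching code. A usage sketch, assuming the usual Executor entry point and the init_configs signature shown above:

    from data_juicer.config import init_configs
    from data_juicer.core import Executor

    # Disable per-OP resource monitoring for this run.
    cfg = init_configs(['--config', 'configs/config_all.yaml',
                        '--open_monitor', 'false'])
    executor = Executor(cfg)
    executor.run()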
5 changes: 4 additions & 1 deletion data_juicer/core/monitor.py
@@ -205,7 +205,10 @@ def monitor_func(func, args=None, sample_interval=0.5):
resource_util_dict = {}

# start monitor
ctx = get_context('fork')
start_method = 'fork'
if os.name == 'nt': # for Windows
start_method = 'spawn'
ctx = get_context(start_method)
with ctx.Manager() as manager:
mdict = manager.dict()
mdict['stop'] = False
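Since get_context('fork') raises a ValueError on Windows, the start method is now chosen per platform. The same selection, runnable standalone (the main guard matters under 'spawn', which re-imports the module in the child process):

    import os
    from multiprocessing import get_context

    def main():
        # 'fork' is the cheap POSIX default; Windows only supports 'spawn'.
        start_method = 'spawn' if os.name == 'nt' else 'fork'
        ctx = get_context(start_method)
        with ctx.Manager() as manager:
            mdict = manager.dict()
            mdict['stop'] = False
            print(dict(mdict))

    if __name__ == '__main__':
        main()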
5 changes: 5 additions & 0 deletions data_juicer/utils/model_utils.py
@@ -51,6 +51,11 @@
'punkt.*.pickle':
'https://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/'
'data_juicer/models/',

# ram
'ram_plus_swin_large_14m.pth':
'http://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/models/'
'ram_plus_swin_large_14m.pth',
}


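The new entry maps the RAM++ checkpoint name to its mirror URL, so data-juicer's model preparation utilities can resolve it when an OP needs the file. A hypothetical direct fetch of the same checkpoint, using only the URL from the table:

    import urllib.request

    # Download the RAM++ checkpoint from the mirror listed above.
    url = ('http://dail-wlcb.oss-cn-wulanchabu.aliyuncs.com/data_juicer/models/'
           'ram_plus_swin_large_14m.pth')
    urllib.request.urlretrieve(url, 'ram_plus_swin_large_14m.pth')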