support ray actor #511

Merged: 14 commits, Dec 25, 2024
5 changes: 5 additions & 0 deletions data_juicer/config/config.py
@@ -464,6 +464,11 @@ def init_setup_from_cfg(cfg: Namespace):

# check number of processes np
sys_cpu_count = os.cpu_count()
if not cfg.np:
cfg.np = sys_cpu_count
logger.warning(
f'Number of processes `np` is not set, '
f'set it to cpu count [{sys_cpu_count}] as default value.')
if cfg.np > sys_cpu_count:
logger.warning(f'Number of processes `np` is set as [{cfg.np}], which '
f'is larger than the cpu count [{sys_cpu_count}]. Due '
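(Not part of the PR: a minimal sketch, assuming only the standard library, of the `np` fallback this hunk adds; the function name is made up, and only the visible part of the hunk is mirrored since the rest is truncated above.)

import os

def resolve_np(requested=None):
    # Hypothetical stand-in for the cfg.np handling shown above.
    sys_cpu_count = os.cpu_count()
    if not requested:
        return sys_cpu_count  # `np` unset: default to the CPU count
    if requested > sys_cpu_count:
        print(f'np={requested} is larger than the cpu count {sys_cpu_count}')
    return requested

print(resolve_np())   # CPU count of the machine
print(resolve_np(4))  # 4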
42 changes: 34 additions & 8 deletions data_juicer/core/ray_data.py
@@ -122,15 +122,41 @@ def _run_single_op(self, op):
batch_size = getattr(op, 'batch_size',
1) if op.is_batched_op() else 1
if isinstance(op, Mapper):
self.data = self.data.map_batches(op.process,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.use_cuda():
op_kwargs = op._op_cfg[op._name]
self.data = self.data.map_batches(
op.__class__,
fn_args=None,
fn_kwargs=None,
fn_constructor_args=None,
fn_constructor_kwargs=op_kwargs,
batch_size=batch_size,
num_gpus=num_gpus,
concurrency=op_proc,
batch_format='pyarrow')
else:
self.data = self.data.map_batches(op.process,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
elif isinstance(op, Filter):
self.data = self.data.map_batches(op.compute_stats,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.use_cuda():
op_kwargs = op._op_cfg[op._name]
self.data = self.data.map_batches(
op.__class__,
fn_args=None,
fn_kwargs=None,
fn_constructor_args=None,
fn_constructor_kwargs=op_kwargs,
batch_size=batch_size,
num_gpus=num_gpus,
concurrency=op_proc,
batch_format='pyarrow')
else:
self.data = self.data.map_batches(op.compute_stats,
batch_size=batch_size,
batch_format='pyarrow',
num_gpus=num_gpus)
if op.stats_export_path is not None:
self.data.write_json(op.stats_export_path,
force_ascii=False)
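Aside (not part of the diff): a minimal, self-contained sketch of the Ray Data pattern the CUDA branch above switches to; the op class, data, and parameter values here are invented for illustration. Passing a callable class to `map_batches` makes Ray run it as a long-lived actor, so state built in `__init__` (for a CUDA op, the loaded model, configured via `fn_constructor_kwargs`) is created once per worker and reused across batches, instead of shipping a bound `op.process` function for every batch.

import ray

class UppercaseOp:  # invented stand-in for a CUDA op
    def __init__(self, suffix='!'):
        # fn_constructor_kwargs arrive here once per actor; a real op would
        # load its model (e.g. onto the GPU) at this point.
        self.suffix = suffix

    def __call__(self, batch):
        # Invoked per batch; the state from __init__ is reused.
        batch['text'] = [t.upper() + self.suffix for t in batch['text']]
        return batch

ds = ray.data.from_items([{'text': 'a'}, {'text': 'b'}])
ds = ds.map_batches(UppercaseOp,
                    fn_constructor_kwargs={'suffix': '?'},
                    concurrency=2,  # number of actor workers
                    batch_size=1)
print(ds.take_all())  # [{'text': 'A?'}, {'text': 'B?'}]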
6 changes: 6 additions & 0 deletions data_juicer/ops/base_op.py
@@ -288,6 +288,9 @@ def __init_subclass__(cls, **kwargs):
f'{cls.__name__}. Please implement {method_name}_single '
f'or {method_name}_batched.')

def __call__(self, *args, **kwargs):
return self.process(*args, **kwargs)

def process_batched(self, samples, *args, **kwargs):
keys = samples.keys()
first_key = next(iter(keys))
@@ -378,6 +381,9 @@ def __init_subclass__(cls, **kwargs):
f'{cls.__name__}. Please implement {method_name}_single '
f'or {method_name}_batched.')

def __call__(self, *args, **kwargs):
return self.compute_stats(*args, **kwargs)

def compute_stats_batched(self, samples, *args, **kwargs):
keys = samples.keys()
num_samples = len(samples[Fields.stats])
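Side note (not from the PR): a tiny hypothetical sketch of what these `__call__` additions enable. Ray Data calls an actor instance itself as the per-batch function, so delegating `__call__` to `process` (and `compute_stats` for filters) lets an op instance stand in wherever a plain function used to be passed.

class StripMapper:  # invented example class
    def process(self, samples):
        samples['text'] = [t.strip() for t in samples['text']]
        return samples

    def __call__(self, *args, **kwargs):
        # Same delegation as the base_op.py change above.
        return self.process(*args, **kwargs)

op = StripMapper()
print(op({'text': ['  hi ']}))  # {'text': ['hi']}, identical to op.process(...)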
46 changes: 32 additions & 14 deletions data_juicer/utils/process_utils.py
@@ -57,32 +57,50 @@ def calculate_np(name,
"""Calculate the optimum number of processes for the given OP"""
eps = 1e-9 # about 1 byte

if num_proc is None:
num_proc = psutil.cpu_count()

if use_cuda:
auto_num_proc = None
cuda_mem_available = get_min_cuda_memory() / 1024
op_proc = min(
num_proc,
math.floor(cuda_mem_available / (mem_required + eps)) *
cuda_device_count())
if use_cuda and mem_required == 0:
if mem_required == 0:
logger.warning(f'The required cuda memory of Op[{name}] '
f'has not been specified. '
f'Please specify the mem_required field in the '
f'config file, or you might encounter CUDA '
f'out of memory error. You can reference '
f'the mem_required field in the '
f'config_all.yaml file.')
if op_proc < 1.0:
logger.warning(f'The required cuda memory:{mem_required}GB might '
f'be more than the available cuda memory:'
f'{cuda_mem_available}GB.'
f'This Op[{name}] might '
f'require more resource to run.')
else:
auto_num_proc = math.floor(
cuda_mem_available / mem_required) * cuda_device_count()
if cuda_mem_available / mem_required < 1.0:
logger.warning(
f'The required cuda memory:{mem_required}GB might '
f'be more than the available cuda memory:'
f'{cuda_mem_available}GB.'
f'This Op[{name}] might '
f'require more resource to run.')

if auto_num_proc and num_proc:
op_proc = min(auto_num_proc, num_proc)
if num_proc > auto_num_proc:
logger.warning(
f'The given num_proc: {num_proc} is greater than '
f'the value {auto_num_proc} auto calculated based '
f'on the mem_required of Op[{name}]. '
f'Set the `num_proc` to {auto_num_proc}.')
elif not auto_num_proc and not num_proc:
op_proc = cuda_device_count()
logger.warning(
f'Both mem_required and num_proc of Op[{name}] are not set.'
f'Set the `num_proc` to number of GPUs {op_proc}.')
else:
op_proc = auto_num_proc if auto_num_proc else num_proc

op_proc = max(op_proc, 1)
return op_proc
else:
if num_proc is None:
num_proc = psutil.cpu_count()

op_proc = num_proc
cpu_available = psutil.cpu_count()
mem_available = psutil.virtual_memory().available
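For readability, a condensed sketch (not part of the diff) of the GPU branch of `calculate_np` above; the function name and the 24 GB / 2-GPU numbers are assumed for illustration. The automatic value is how many workers fit into the smallest free GPU memory, scaled by the GPU count, and it is then reconciled with any user-supplied `num_proc`.

import math

def gpu_num_proc(mem_required_gb, num_proc=None,
                 min_free_gpu_mem_gb=24.0, num_gpus=2):
    # mem_required_gb == 0 means "not specified", matching the warning above.
    auto = None
    if mem_required_gb > 0:
        auto = math.floor(min_free_gpu_mem_gb / mem_required_gb) * num_gpus
    if auto and num_proc:
        return max(min(auto, num_proc), 1)  # user value capped by what fits
    if not auto and not num_proc:
        return num_gpus                     # neither set: one worker per GPU
    return max(auto or num_proc, 1)

print(gpu_num_proc(8.0))              # floor(24 / 8) * 2 = 6
print(gpu_num_proc(8.0, num_proc=4))  # min(auto=6, num_proc=4) = 4
print(gpu_num_proc(0))                # nothing specified -> 2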
103 changes: 101 additions & 2 deletions tests/tools/test_process_data.py
@@ -4,19 +4,49 @@
import subprocess
import tempfile
import unittest
import uuid
import yaml

from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase


def run_in_subprocess(cmd):
try:
with subprocess.Popen(
cmd, shell=True, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) as return_info:
while True:
next_line = return_info.stdout.readline()
return_line = next_line.decode('utf-8', 'ignore').strip()
if return_line == '' and return_info.poll() != None:
break
if return_line != '':
print(return_line)

err_lines = ''
while True:
next_line = return_info.stderr.readline()
return_line = next_line.decode('utf-8', 'ignore').strip()
if return_line == '' and return_info.poll() != None:
break
if return_line != '':
print(return_line)
err_lines += return_line + '\n'

return_code = return_info.wait()
if return_code:
raise RuntimeError(err_lines)
except Exception as e:
raise e


class ProcessDataTest(DataJuicerTestCaseBase):

def setUp(self):
super().setUp()

self.tmp_dir = tempfile.TemporaryDirectory().name
if not osp.exists(self.tmp_dir):
os.makedirs(self.tmp_dir)
os.makedirs(self.tmp_dir, exist_ok=True)

def tearDown(self):
super().tearDown()
@@ -66,5 +96,74 @@ def test_status_code_1(self):
self.assertFalse(osp.exists(tmp_out_path))


class ProcessDataRayTest(DataJuicerTestCaseBase):

def setUp(self):
super().setUp()

cur_dir = osp.dirname(osp.abspath(__file__))
self.tmp_dir = osp.join(cur_dir, f'tmp_{uuid.uuid4().hex}')
os.makedirs(self.tmp_dir, exist_ok=True)

def tearDown(self):
super().tearDown()

if osp.exists(self.tmp_dir):
shutil.rmtree(self.tmp_dir)

import ray
ray.shutdown()

def test_ray_image(self):
tmp_yaml_file = osp.join(self.tmp_dir, 'config_0.yaml')
tmp_out_path = osp.join(self.tmp_dir, 'output_0.json')
text_keys = 'text'

data_path = osp.join(osp.dirname(osp.dirname(osp.dirname(osp.realpath(__file__)))),
'demos', 'data', 'demo-dataset-images.jsonl')
yaml_config = {
'dataset_path': data_path,
'executor_type': 'ray',
'ray_address': 'auto',
'text_keys': text_keys,
'image_key': 'images',
'export_path': tmp_out_path,
'process': [
{
'image_nsfw_filter': {
'hf_nsfw_model': 'Falconsai/nsfw_image_detection',
'trust_remote_code': True,
'score_threshold': 0.5,
'any_or_all': 'any',
'mem_required': '8GB'
},
'image_aspect_ratio_filter':{
'min_ratio': 0.5,
'max_ratio': 2.0
}
}
]
}

with open(tmp_yaml_file, 'w') as file:
yaml.dump(yaml_config, file)

run_in_subprocess(f'python tools/process_data.py --config {tmp_yaml_file}')

self.assertTrue(osp.exists(tmp_out_path))

from datasets import load_dataset
jsonl_files = [os.path.join(tmp_out_path, f) \
for f in os.listdir(tmp_out_path) \
if f.endswith('.json')]
dataset = load_dataset(
'json',
data_files={'jsonl': jsonl_files})

self.assertEqual(len(dataset['jsonl']), 3)
for item in dataset['jsonl']:
self.assertIn('aspect_ratios', item['__dj__stats__'])


if __name__ == '__main__':
unittest.main()
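Note (not part of the PR): because the file ends with `unittest.main()`, the new test can be run directly from the repository root, e.g. `python tests/tools/test_process_data.py ProcessDataRayTest.test_ray_image` (unittest accepts test names as command-line arguments). It assumes a reachable Ray cluster, since the generated config sets `ray_address: 'auto'`.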