From 6e0ae68cc1da413edd41974a415942610ced56b4 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=82=A2=E6=9C=9D=E9=BE=99?=
Date: Wed, 26 Jan 2022 11:45:26 +0800
Subject: [PATCH] fix the excessively high memory usage of data processing

---
 requirements.txt      |   2 +
 setup.py              |   1 +
 tests/st/deepfm.py    |   3 +-
 tinyms/data/loader.py | 149 +++++++++++++++++++++++++-----------------
 tinyms/data/utils.py  |  29 +++++---
 5 files changed, 115 insertions(+), 69 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index 77f7f7a3..efd519f1 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -15,4 +15,6 @@ opencv-python==4.1.2.30
 flask_cors>=3.0.10
 pycocotools>=2.0.0 # for st test
 wget==3.2
+tqdm==4.62.3
+psutil==5.8.0
 scikit-learn==1.0.1
diff --git a/setup.py b/setup.py
index 6dd6456b..ee48aaad 100644
--- a/setup.py
+++ b/setup.py
@@ -58,6 +58,7 @@ def _write_version(file):
     'wget==3.2',
     'scikit-learn==1.0.1',
     'tqdm==4.62.3',
+    'psutil==5.8.0',
 ]
 
 test_required_package = [
diff --git a/tests/st/deepfm.py b/tests/st/deepfm.py
index c423d5f9..0bb85918 100644
--- a/tests/st/deepfm.py
+++ b/tests/st/deepfm.py
@@ -71,13 +71,14 @@ def create_dataset(data_path, batch_size=16000):
     batch_size = args_opt.batch_size
     dataset_path = args_opt.dataset_path
     dataset_sink_mode = not args_opt.device_target == "CPU"
+    convert_dtype = not args_opt.device_target == "CPU"
     checkpoint_dir = args_opt.checkpoint_dir if args_opt.checkpoint_dir is not None else "."
     # create train and eval dataset
     train_ds, eval_ds = create_dataset(data_path=dataset_path, batch_size=batch_size)
     # build base network
     data_size = train_ds.get_dataset_size()
-    net = DeepFM(field_size=39, vocab_size=184965, embed_size=80, convert_dtype=True)
+    net = DeepFM(field_size=39, vocab_size=184965, embed_size=80, convert_dtype=convert_dtype)
     # build train network
     train_net = DeepFMTrainModel(DeepFMWithLoss(net))
     # build eval network
diff --git a/tinyms/data/loader.py b/tinyms/data/loader.py
index da2042f9..667f2600 100644
--- a/tinyms/data/loader.py
+++ b/tinyms/data/loader.py
@@ -28,6 +28,7 @@
 import codecs
 import collections
 import pickle
+import psutil
 from itertools import chain
 from tqdm import tqdm
 
@@ -351,8 +352,7 @@ def get_imdb_data(features, labels):
 
 class KaggleDisplayAdvertisingDataset:
     """
-    parse aclImdb data to features and labels.
-    sentence->tokenized->encoded->padding->features
+    Convert KaggleDisplayAdvertisingDataset to MindRecord format.
 
     Args:
         data_dir (str): The path where the uncompressed dataset stored.
@@ -376,8 +376,9 @@ def __init__(self, data_dir, num_parallel_workers=None, shuffle=True):
         self.skip_id_convert = False
         self.train_line_count = 45840617
         self.test_size = 0.1
-        self.seed = 20191005
-        self.line_per_sample = 1000
+        self.seed = 2020
+        self.line_per_sample = 100
+        self.write_per_sample = 1000
         self.epochs = 1
         self.num_parallel_workers = num_parallel_workers
         self._check_num_parallel_workers()
@@ -525,6 +526,20 @@ def __map_cat2id(self, values, cats):
             weight_list.append(1.0)
         return id_list, weight_list
 
+    def __set_write_per_sample(self):
+        mem = psutil.virtual_memory()
+        mem_free = float(mem.free) / 1024 / 1024 / 1024
+        if mem_free > 32:
+            self.write_per_sample = 40000
+        elif mem_free > 16:
+            self.write_per_sample = 20000
+        elif mem_free > 8:
+            self.write_per_sample = 10000
+        elif mem_free > 4:
+            self.write_per_sample = 1000
+        else:
+            self.write_per_sample = 100
+
     def stats_data(self):
         """
         stats data
@@ -543,11 +558,7 @@ def stats_data(self):
                 items = line.split("\t")
                 if len(items) != num_splits:
                     error_stat_lines_num.append(num_line)
-                    # print("Found line length: {}, suppose to be {}, the line is {}".format(
-                    #     len(items), num_splits, line))
                     continue
-                # if num_line % 1000000 == 0:
-                #     print("Have handled {}w lines.".format(num_line // 10000))
                 values = items[1: self.dense_dim + 1]
                 cats = items[self.dense_dim + 1:]
                 assert len(values) == self.dense_dim, "values.size: {}".format(len(values))
@@ -556,26 +567,26 @@ def stats_data(self):
                 self.__stats_cats(cats)
         self.__save_stats_dict()
 
+        print("\n************** Size of error_stat_lines_num: {} **************".format(len(error_stat_lines_num)), flush=True)
         error_stat_path = os.path.join(self.data_dir, "error_stat_lines_num.npy")
         np.save(error_stat_path, error_stat_lines_num)
 
     def convert_to_mindrecord(self):
+        # set write_per_sample based on free memory.
+        self.__set_write_per_sample()
+        print("************** Flush mindrecord data every {} samples **************".
+              format(self.write_per_sample * self.line_per_sample), flush=True)
+
         test_size = int(self.train_line_count * self.test_size)
         all_indices = [i for i in range(self.train_line_count)]
         np.random.seed(self.seed)
         np.random.shuffle(all_indices)
         test_indices_set = set(all_indices[:test_size])
 
-        train_data_list = []
-        test_data_list = []
-        ids_list = []
-        wts_list = []
-        label_list = []
-
         schema = {
-            "label": {"type": "float32", "shape": [-1]},
+            "feat_ids": {"type": "int32", "shape": [-1]},
             "feat_vals": {"type": "float32", "shape": [-1]},
-            "feat_ids": {"type": "int32", "shape": [-1]}
+            "label": {"type": "float32", "shape": [-1]}
         }
 
         train_writer = FileWriter(os.path.join(self.mindrecord_dir, "train_input_part.mindrecord"), 21)
@@ -583,7 +594,6 @@ def convert_to_mindrecord(self):
         train_writer.add_schema(schema, "CRITEO_TRAIN")
         test_writer.add_schema(schema, "CRITEO_TEST")
 
-        part_rows = 2000000
         num_splits = self.dense_dim + self.slot_dim + 1
         error_conv_lines_num = []
 
@@ -592,12 +602,18 @@ def convert_to_mindrecord(self):
             t_f = tqdm(f, total=self.train_line_count)
             t_f.set_description("Processing Convert2MR")
             num_line = 0
-            train_part_number = 0
-            test_part_number = 0
+            train_samples = []
+            test_samples = []
+
+            extend_train_ids = []
+            extend_train_wts = []
+            extend_train_labels = []
+
+            extend_test_ids = []
+            extend_test_wts = []
+            extend_test_labels = []
             for line in t_f:
                 num_line += 1
-                # if num_line % 1000000 == 0:
-                #     print("Converting to MindRecord. Have handle {}w lines.".format(num_line // 10000), flush=True)
                 line = line.strip("\n")
                 items = line.split("\t")
                 if len(items) != num_splits:
@@ -611,54 +627,69 @@ def convert_to_mindrecord(self):
                 assert len(cats) == self.slot_dim, "cats.size: {}".format(len(cats))
                 ids, wts = self.__map_cat2id(values, cats)
-                ids_list.extend(ids)
-                wts_list.extend(wts)
-                label_list.append(label)
-
-                if num_line % self.line_per_sample == 0:
-                    if num_line not in test_indices_set:
-                        train_data_list.append({"feat_ids": np.array(ids_list, dtype=np.int32),
-                                                "feat_vals": np.array(wts_list, dtype=np.float32),
-                                                "label": np.array(label_list, dtype=np.float32)
-                                                })
-                    else:
-                        test_data_list.append({"feat_ids": np.array(ids_list, dtype=np.int32),
-                                               "feat_vals": np.array(wts_list, dtype=np.float32),
-                                               "label": np.array(label_list, dtype=np.float32)
-                                               })
-                    if train_data_list and len(train_data_list) % part_rows == 0:
-                        train_writer.write_raw_data(train_data_list)
-                        train_data_list.clear()
-                        train_part_number += 1
-
-                    if test_data_list and len(test_data_list) % part_rows == 0:
-                        test_writer.write_raw_data(test_data_list)
-                        test_data_list.clear()
-                        test_part_number += 1
-
-                    ids_list.clear()
-                    wts_list.clear()
-                    label_list.clear()
-            if train_data_list:
-                train_writer.write_raw_data(train_data_list)
-            if test_data_list:
-                test_writer.write_raw_data(test_data_list)
+
+                if num_line not in test_indices_set:
+                    extend_train_ids.extend(ids)
+                    extend_train_wts.extend(wts)
+                    extend_train_labels.append(label)
+                    if len(extend_train_labels) % self.line_per_sample == 0:
+                        sample = {
+                            "feat_ids": np.array(extend_train_ids, dtype=np.int32),
+                            "feat_vals": np.array(extend_train_wts, dtype=np.float32),
+                            "label": np.array(extend_train_labels, dtype=np.float32)
+                        }
+                        extend_train_ids.clear()
+                        extend_train_wts.clear()
+                        extend_train_labels.clear()
+                        train_samples.append(sample)
+                        if len(train_samples) % self.write_per_sample == 0:
+                            train_writer.write_raw_data(train_samples)
+                            train_samples.clear()
+                else:
+                    extend_test_ids.extend(ids)
+                    extend_test_wts.extend(wts)
+                    extend_test_labels.append(label)
+                    if len(extend_test_labels) % self.line_per_sample == 0:
+                        sample = {
+                            "feat_ids": np.array(extend_test_ids, dtype=np.int32),
+                            "feat_vals": np.array(extend_test_wts, dtype=np.float32),
+                            "label": np.array(extend_test_labels, dtype=np.float32)
+                        }
+                        extend_test_ids.clear()
+                        extend_test_wts.clear()
+                        extend_test_labels.clear()
+                        test_samples.append(sample)
+                        if len(test_samples) % self.write_per_sample == 0:
+                            test_writer.write_raw_data(test_samples)
+                            test_samples.clear()
+            # Write any remaining samples that did not fill a full flush batch
+            if train_samples:
+                train_writer.write_raw_data(train_samples)
+            if test_samples:
+                test_writer.write_raw_data(test_samples)
         train_writer.commit()
         test_writer.commit()
 
+        print("\n************** Size of error_conv_lines_num: {} **************".
+              format(len(error_conv_lines_num)), flush=True)
         error_stat_path = os.path.join(self.data_dir, "error_conv_lines_num.npy")
         np.save(error_stat_path, error_conv_lines_num)
 
-    def load_mindreocrd_dataset(self, usage='train', batch_size=1000):
+    def load_mindreocrd_dataset(self, usage='train', batch_size=16000):
         """
         load mindrecord dataset.
 
         Args:
             usage (str): Dataset mode. Default: 'train'.
-            batch_size (int): batch size. Default: 1000.
+            batch_size (int): batch size. Default: 16000.
 
         Returns:
             MindDataset
         """
+        real_batch_size = int(batch_size / self.line_per_sample)
+        if real_batch_size < 1:
+            real_batch_size = 1
+        num_samples = real_batch_size * self.line_per_sample
+
         if usage == 'train':
             train_mode = True
         else:
@@ -673,10 +704,10 @@ def load_mindreocrd_dataset(self, usage='train', batch_size=1000):
                                   num_parallel_workers=self.num_parallel_workers, shuffle=self.shuffle,
                                   num_shards=None, shard_id=None)
-        dataset = dataset.batch(int(batch_size / self.line_per_sample), drop_remainder=True)
-        dataset = dataset.map(operations=(lambda x, y, z: (np.array(x).flatten().reshape(batch_size, 39),
-                                                           np.array(y).flatten().reshape(batch_size, 39),
-                                                           np.array(z).flatten().reshape(batch_size, 1))),
+        dataset = dataset.batch(real_batch_size, drop_remainder=True)
+        dataset = dataset.map(operations=(lambda x, y, z: (np.array(x).flatten().reshape(num_samples, 39),
+                                                           np.array(y).flatten().reshape(num_samples, 39),
+                                                           np.array(z).flatten().reshape(num_samples, 1))),
                               input_columns=['feat_ids', 'feat_vals', 'label'],
                               column_order=['feat_ids', 'feat_vals', 'label'],
                               num_parallel_workers=self.num_parallel_workers)
diff --git a/tinyms/data/utils.py b/tinyms/data/utils.py
index 87d5deca..a0f27865 100644
--- a/tinyms/data/utils.py
+++ b/tinyms/data/utils.py
@@ -121,13 +121,13 @@ def _fetch_and_unzip_by_wget(url, file_name):
         file_name: str, local path of downloaded file
     """
     # function to show download progress
-    def bar_progress(current, total, width=80):
+    def _bar_progress(current, total, width=80):
        progress_message = "Downloading: %d%% [%d / %d] bytes" % (current / total * 100, current, total)
        # Don't use print() as it will print in new line every time.
        sys.stdout.write("\r" + progress_message)
        sys.stdout.flush()
     # using wget is faster than fetch.
-    wget.download(url, out=file_name, bar=bar_progress)
+    wget.download(url, out=file_name, bar=_bar_progress)
     print("\n============== {} is ready ==============".format(file_name))
     _unzip(file_name)
     os.remove(file_name)
@@ -230,18 +230,29 @@ def _download_kaggle_display_advertising(local_path):
     if not os.path.exists(dataset_path):
         os.makedirs(dataset_path)
 
-    print("************** Downloading the Kaggle Display Advertising Challenge dataset **************")
     remote_url = "http://go.criteo.net/criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz"
     file_name = os.path.join(dataset_path, remote_url.split('/')[-1])
-    # already exist criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz
+
+    # existing uncompressed kaggle-display-advertising data
+    if _check_uncompressed_kaggle_display_advertising_files(dataset_path):
+        print("************** Uncompressed kaggle-display-advertising data already exists **************",
+              flush=True)
+        return os.path.join(dataset_path)
+
+    # existing criteo-research-kaggle-display-advertising-challenge-dataset.tar.gz
     if os.path.exists(file_name):
         if os.path.getsize(file_name) == 4576820670:
-            print("************** Uncompress already exists tar format data **************", flush=True)
+            print("************** Uncompress existing tar.gz format file **************", flush=True)
             _unzip(file_name)
-    if not _check_uncompressed_kaggle_display_advertising_files(dataset_path):
-        _fetch_and_unzip_by_wget(remote_url, file_name)
-    else:
-        print("{} already have uncompressed kaggle display advertising dataset.".format(dataset_path), flush=True)
+            # check uncompressed files
+            if _check_uncompressed_kaggle_display_advertising_files(dataset_path):
+                return os.path.join(dataset_path)
+            else:
+                print("************** {} **************".
+                      format("Uncompress existing tar.gz format file failed, need to download again"), flush=True)
+
+    print("************** Downloading the Kaggle Display Advertising Challenge dataset **************", flush=True)
+    _fetch_and_unzip_by_wget(remote_url, file_name)
 
     return os.path.join(dataset_path)
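A note on the new batching math in load_mindreocrd_dataset, spelled out from the diff above: each MindRecord row now packs line_per_sample = 100 raw lines, so the requested batch_size is first converted to rows, real_batch_size = int(16000 / 100) = 160, and the map operation then flattens those rows back into num_samples = 160 * 100 = 16000 individual records reshaped to (16000, 39) for feat_ids and feat_vals and (16000, 1) for the label. The tensors a model such as DeepFM receives therefore keep the same per-batch shape as before the change; a batch_size smaller than 100 is clamped to one row, which still yields 100 samples.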
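For readers trying out the converted pipeline, the following is a minimal usage sketch rather than part of the patch. It assumes the class and method names exactly as they appear in tinyms/data/loader.py above (including the load_mindreocrd_dataset spelling), a hypothetical local directory holding the uncompressed Criteo data, and that stats_data() is intended to run before convert_to_mindrecord() so the value/category statistics exist when raw lines are mapped to ids.

    # Sketch only; "/data/criteo" and num_parallel_workers=8 are placeholders, not values defined by the patch.
    from tinyms.data.loader import KaggleDisplayAdvertisingDataset

    kaggle_ds = KaggleDisplayAdvertisingDataset(data_dir="/data/criteo", num_parallel_workers=8)
    kaggle_ds.stats_data()              # scan the raw lines and save the feature statistics
    kaggle_ds.convert_to_mindrecord()   # stream samples to MindRecord, flushing every
                                        # write_per_sample * line_per_sample input lines
    train_ds = kaggle_ds.load_mindreocrd_dataset(usage='train', batch_size=16000)
    eval_ds = kaggle_ds.load_mindreocrd_dataset(usage='test', batch_size=16000)  # any non-'train' value selects the test split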