From bef6002b5d29fde048f08e10e85ecd0c1d307bc6 Mon Sep 17 00:00:00 2001 From: tianshuo78520a <707759223@qq.com> Date: Tue, 20 Apr 2021 14:53:24 +0800 Subject: [PATCH 1/4] Delete WITH_GRPC flag and Distributed old code --- CMakeLists.txt | 4 - cmake/configure.cmake | 4 - paddle/fluid/pybind/pybind.cc | 5 - paddle/scripts/paddle_build.sh | 6 +- paddle/testing/paddle_gtest_main.cc | 3 +- python/paddle/fluid/__init__.py | 1 - python/paddle/fluid/contrib/reader/README.md | 25 - .../paddle/fluid/contrib/reader/__init__.py | 20 - .../contrib/reader/distributed_reader.py | 65 -- python/paddle/fluid/contrib/utils/__init__.py | 23 - .../paddle/fluid/contrib/utils/hdfs_utils.py | 603 ------------------ .../fluid/contrib/utils/lookup_table_utils.py | 496 -------------- .../fluid/incubate/data_generator/__init__.py | 375 ----------- .../data_generator/test_data_generator.py | 36 -- python/paddle/incubate/__init__.py | 2 - python/setup.py.in | 3 - 16 files changed, 2 insertions(+), 1669 deletions(-) delete mode 100644 python/paddle/fluid/contrib/reader/README.md delete mode 100644 python/paddle/fluid/contrib/reader/__init__.py delete mode 100644 python/paddle/fluid/contrib/reader/distributed_reader.py delete mode 100644 python/paddle/fluid/contrib/utils/__init__.py delete mode 100644 python/paddle/fluid/contrib/utils/hdfs_utils.py delete mode 100644 python/paddle/fluid/contrib/utils/lookup_table_utils.py delete mode 100644 python/paddle/fluid/incubate/data_generator/__init__.py delete mode 100644 python/paddle/fluid/incubate/data_generator/test_data_generator.py diff --git a/CMakeLists.txt b/CMakeLists.txt index 30f9e3a3dcdd2..306228ced3e08 100755 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -182,7 +182,6 @@ option(WITH_PSLIB "Compile with pslib support" OFF) option(WITH_BOX_PS "Compile with box_ps support" OFF) option(WITH_XBYAK "Compile with xbyak support" ON) option(WITH_CONTRIB "Compile the third-party contributation" OFF) -option(WITH_GRPC "Use grpc as the default rpc framework" ${WITH_DISTRIBUTE}) option(WITH_PSCORE "Compile with parameter server support" ${WITH_DISTRIBUTE}) option(WITH_HETERPS "Compile with heterps" OFF}) option(WITH_INFERENCE_API_TEST "Test fluid inference C++ high-level api interface" OFF) @@ -259,9 +258,6 @@ endif() if(WITH_BRPC_RDMA) message(STATUS "Use brpc with rdma.") - if(WITH_GRPC) - message(FATAL_ERROR "Can't use grpc with brpc rdma.") - endif() if(NOT WITH_DISTRIBUTE) message(FATAL_ERROR "Can't use brpc rdma in no distribute env.") endif() diff --git a/cmake/configure.cmake b/cmake/configure.cmake index bf1352d4e1147..e7f125269be1f 100644 --- a/cmake/configure.cmake +++ b/cmake/configure.cmake @@ -177,10 +177,6 @@ if(WITH_HETERPS) add_definitions(-DPADDLE_WITH_HETERPS) endif() -if(WITH_GRPC) - add_definitions(-DPADDLE_WITH_GRPC) -endif(WITH_GRPC) - if(WITH_BRPC_RDMA) add_definitions(-DPADDLE_WITH_BRPC_RDMA) endif(WITH_BRPC_RDMA) diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 038bcc7f85099..e31c8f2529404 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -268,11 +268,6 @@ bool IsCompiledWithBrpc() { #ifndef PADDLE_WITH_DISTRIBUTE return false; #endif - -#ifdef PADDLE_WITH_GRPC - return false; -#endif - return true; } diff --git a/paddle/scripts/paddle_build.sh b/paddle/scripts/paddle_build.sh index 404099f5e8b00..80a1ddeef606f 100755 --- a/paddle/scripts/paddle_build.sh +++ b/paddle/scripts/paddle_build.sh @@ -227,7 +227,6 @@ function cmake_base() { fi distibuted_flag=${WITH_DISTRIBUTE:-OFF} - grpc_flag="OFF" gloo_flag=${distibuted_flag} cat < envs; std::vector undefok; -#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_GRPC) && \ - !defined(PADDLE_WITH_PSLIB) +#if defined(PADDLE_WITH_DISTRIBUTE) && !defined(PADDLE_WITH_PSLIB) std::string str_max_body_size; if (::GFLAGS_NAMESPACE::GetCommandLineOption("max_body_size", &str_max_body_size)) { diff --git a/python/paddle/fluid/__init__.py b/python/paddle/fluid/__init__.py index 6dd1478dc1f45..b37fe5538dbb1 100644 --- a/python/paddle/fluid/__init__.py +++ b/python/paddle/fluid/__init__.py @@ -71,7 +71,6 @@ from .core import LoDTensor, LoDTensorArray, Scope, _Scope from .core import CPUPlace, XPUPlace, CUDAPlace, CUDAPinnedPlace, NPUPlace from .incubate import fleet -from .incubate import data_generator from .transpiler import DistributeTranspiler, \ memory_optimize, release_memory, DistributeTranspilerConfig from .lod_tensor import create_lod_tensor, create_random_int_lodtensor diff --git a/python/paddle/fluid/contrib/reader/README.md b/python/paddle/fluid/contrib/reader/README.md deleted file mode 100644 index f043a17493ec2..0000000000000 --- a/python/paddle/fluid/contrib/reader/README.md +++ /dev/null @@ -1,25 +0,0 @@ -## CTR READER - -An multi-thread cpp reader that has the same interface with py_reader. It -uses cpp multi-thread to read file and is much more faster then the Python read -thread in py_reader. - -Currently, it support two types of file: - - gzip - - plain text file - -and two types of data format: - - cvs data format is : - * label dense_fea,dense_fea sparse_fea,sparse_fea - - the svm data format is : - * label slot1:fea_sign slot2:fea_sign slot1:fea_sign - -## Distributed reader - -The distributed reader is mainly used by multi-process tasks, and the input must be a batch reader. - -Cons: - - It can be operated conveniently so that different processes can read different data. - -Pros: - - If batch_reader produces training data, and batch_reader loads or preprocesses data for a long time, this data reading method may be slower. diff --git a/python/paddle/fluid/contrib/reader/__init__.py b/python/paddle/fluid/contrib/reader/__init__.py deleted file mode 100644 index 32054d1421a27..0000000000000 --- a/python/paddle/fluid/contrib/reader/__init__.py +++ /dev/null @@ -1,20 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -from .distributed_reader import * - -__all__ = [] -__all__ += distributed_reader.__all__ diff --git a/python/paddle/fluid/contrib/reader/distributed_reader.py b/python/paddle/fluid/contrib/reader/distributed_reader.py deleted file mode 100644 index ecee769218f54..0000000000000 --- a/python/paddle/fluid/contrib/reader/distributed_reader.py +++ /dev/null @@ -1,65 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -import os - -__all__ = ["distributed_batch_reader"] - - -def distributed_batch_reader(batch_reader): - """ - Create a reader for multi-process training. The input must be a batch reader. - - Args: - batch_reader (callable): The input reader should be a batch reader. - - Examples: - - .. code-block:: python - import paddle - import paddle.fluid as fluid - - train_reader = paddle.batch(paddle.dataset.mnist.train(), - batch_size=32,drop_last=True) - train_reader = fluid.contrib.reader.distributed_batch_reader( - train_reader) - - """ - trainers_num = int(os.environ.get('PADDLE_TRAINERS_NUM', 1)) - trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0)) - assert trainer_id < trainers_num - - def decorate_for_multi_process(): - if trainers_num > 1: - print("start data reader (trainers_num: {}, trainer_id: {})".format( - trainers_num, trainer_id)) - - train_data, idx = None, 1 - for batch_id, data in enumerate(batch_reader()): - if trainers_num > 1: - if idx < trainers_num: - if idx == trainer_id + 1: - train_data = data - idx += 1 - else: - if idx == trainer_id + 1: - train_data = data - assert train_data is not None, "train data should not be None." - yield train_data - train_data, idx = None, 1 - else: - yield data - - return decorate_for_multi_process diff --git a/python/paddle/fluid/contrib/utils/__init__.py b/python/paddle/fluid/contrib/utils/__init__.py deleted file mode 100644 index 1c1c2fb227091..0000000000000 --- a/python/paddle/fluid/contrib/utils/__init__.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserve. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function -from . import lookup_table_utils -from .lookup_table_utils import * -from . import hdfs_utils -from .hdfs_utils import * - -__all__ = [] -__all__ += lookup_table_utils.__all__ -__all__ += hdfs_utils.__all__ diff --git a/python/paddle/fluid/contrib/utils/hdfs_utils.py b/python/paddle/fluid/contrib/utils/hdfs_utils.py deleted file mode 100644 index 9572552f0f2be..0000000000000 --- a/python/paddle/fluid/contrib/utils/hdfs_utils.py +++ /dev/null @@ -1,603 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""hdfs_utils.py will move to fluid/incubate/fleet/utils/hdfs.py""" - -import os -import sys -import subprocess -import multiprocessing -from datetime import datetime - -import re -import copy -import errno - -import logging -from paddle.fluid.log_helper import get_logger - -__all__ = ["HDFSClient", "multi_download", "multi_upload"] - -_logger = get_logger( - __name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s') - - -class HDFSClient(object): - r""" - A tool of HDFS - - Args: - hadoop_home (string): hadoop_home - configs (dict): hadoop config, it is a dict, please contain \ - key "fs.default.name" and "hadoop.job.ugi" - Can be a float value - Examples: - hadoop_home = "/home/client/hadoop-client/hadoop/" - - configs = { - "fs.default.name": "hdfs://xxx.hadoop.com:54310", - "hadoop.job.ugi": "hello,hello123" - } - - client = HDFSClient(hadoop_home, configs) - - client.ls("/user/com/train-25") - files = client.lsr("/user/com/train-25/models") - """ - - def __init__(self, hadoop_home, configs): - self.pre_commands = [] - hadoop_bin = '%s/bin/hadoop' % hadoop_home - self.pre_commands.append(hadoop_bin) - dfs = 'fs' - self.pre_commands.append(dfs) - - for k, v in configs.items(): - config_command = '-D%s=%s' % (k, v) - self.pre_commands.append(config_command) - - def __run_hdfs_cmd(self, commands, retry_times=5): - whole_commands = copy.deepcopy(self.pre_commands) - whole_commands.extend(commands) - - print('Running system command: {0}'.format(' '.join(whole_commands))) - - ret_code = 0 - ret_out = None - ret_err = None - whole_commands = " ".join(whole_commands) - for x in range(retry_times + 1): - proc = subprocess.Popen( - whole_commands, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - shell=True) - (output, errors) = proc.communicate() - ret_code, ret_out, ret_err = proc.returncode, output, errors - if ret_code: - _logger.warn( - 'Times: %d, Error running command: %s. Return code: %d, Error: %s' - % (x, ' '.join(whole_commands), proc.returncode, errors)) - else: - break - return ret_code, ret_out, ret_err - - def upload(self, hdfs_path, local_path, overwrite=False, retry_times=5): - """ - upload the local file to hdfs - - Args: - hdfs_path(str): the hdfs file path - local_path(str): the local file path - overwrite(bool|None): will overwrite the file on HDFS or not - retry_times(int|5): retry times - - Returns: - True or False - """ - assert hdfs_path is not None - assert local_path is not None and os.path.exists(local_path) - - if os.path.isdir(local_path): - _logger.warn( - "The Local path: {} is dir and I will support it later, return". - format(local_path)) - return False - - base = os.path.basename(local_path) - if not self.is_exist(hdfs_path): - self.makedirs(hdfs_path) - else: - if self.is_exist(os.path.join(hdfs_path, base)): - if overwrite: - _logger.error( - "The HDFS path: {} is exist and overwrite is True, delete it". - format(hdfs_path)) - self.delete(hdfs_path) - else: - _logger.error( - "The HDFS path: {} is exist and overwrite is False, return". - format(hdfs_path)) - return False - - put_commands = ["-put", local_path, hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd(put_commands, - retry_times) - if returncode: - _logger.error("Put local path: {} to HDFS path: {} failed".format( - local_path, hdfs_path)) - return False - else: - _logger.info("Put local path: {} to HDFS path: {} successfully". - format(local_path, hdfs_path)) - return True - - def download(self, hdfs_path, local_path, overwrite=False, unzip=False): - """ - download file from HDFS - - Args: - hdfs_path(str): the hdfs file path - local_path(str): the local file path - overwrite(bool|None): will overwrite the file on HDFS or not - unzip(bool|False): if the download file is compressed by zip, unzip it or not. - - Returns: - True or False - """ - _logger.info('Downloading %r to %r.', hdfs_path, local_path) - _logger.info('Download of %s to %r complete.', hdfs_path, local_path) - - if not self.is_exist(hdfs_path): - print("HDFS path: {} do not exist".format(hdfs_path)) - return False - if self.is_dir(hdfs_path): - _logger.error( - "The HDFS path: {} is dir and I will support it later, return". - format(hdfs_path)) - - if os.path.exists(local_path): - base = os.path.basename(hdfs_path) - local_file = os.path.join(local_path, base) - if os.path.exists(local_file): - if overwrite: - os.remove(local_file) - else: - _logger.error( - "The Local path: {} is exist and overwrite is False, return". - format(local_file)) - return False - - self.make_local_dirs(local_path) - - download_commands = ["-get", hdfs_path, local_path] - returncode, output, errors = self.__run_hdfs_cmd(download_commands) - if returncode: - _logger.error("Get local path: {} from HDFS path: {} failed".format( - local_path, hdfs_path)) - return False - else: - _logger.info("Get local path: {} from HDFS path: {} successfully". - format(local_path, hdfs_path)) - return True - - def is_exist(self, hdfs_path=None): - """ - whether the remote HDFS path exists - - Args: - hdfs_path(str): the hdfs file path - - Returns: - True or False - """ - exist_cmd = ['-test', '-e', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - exist_cmd, retry_times=1) - - if returncode: - _logger.error("HDFS is_exist HDFS path: {} failed".format( - hdfs_path)) - return False - else: - _logger.info("HDFS is_exist HDFS path: {} successfully".format( - hdfs_path)) - return True - - def is_dir(self, hdfs_path=None): - """ - whether the remote HDFS path is directory - - Args: - hdfs_path(str): the hdfs file path - - Returns: - True or False - """ - - if not self.is_exist(hdfs_path): - return False - - dir_cmd = ['-test', '-d', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd(dir_cmd, retry_times=1) - - if returncode: - _logger.error("HDFS path: {} failed is not a directory".format( - hdfs_path)) - return False - else: - _logger.info("HDFS path: {} successfully is a directory".format( - hdfs_path)) - return True - - def delete(self, hdfs_path): - """ - Remove a file or directory from HDFS. - - whether the remote HDFS path exists - - Args: - hdfs_path: HDFS path. - - Returns: - True or False - This function returns `True` if the deletion was successful and `False` if - no file or directory previously existed at `hdfs_path`. - """ - _logger.info('Deleting %r.', hdfs_path) - - if not self.is_exist(hdfs_path): - _logger.warn("HDFS path: {} do not exist".format(hdfs_path)) - return True - - if self.is_dir(hdfs_path): - del_cmd = ['-rmr', hdfs_path] - else: - del_cmd = ['-rm', hdfs_path] - - returncode, output, errors = self.__run_hdfs_cmd(del_cmd, retry_times=0) - - if returncode: - _logger.error("HDFS path: {} delete files failure".format( - hdfs_path)) - return False - else: - _logger.info("HDFS path: {} delete files successfully".format( - hdfs_path)) - return True - - def rename(self, hdfs_src_path, hdfs_dst_path, overwrite=False): - """ - Move a file or folder on HDFS. - - Args: - hdfs_path(str): HDFS path. - overwrite(bool|False): If the path already exists and overwrite is False, will return False. - - Returns: - True or False - """ - assert hdfs_src_path is not None - assert hdfs_dst_path is not None - - if not self.is_exist(hdfs_src_path): - _logger.info("HDFS path do not exist: {}".format(hdfs_src_path)) - if self.is_exist(hdfs_dst_path) and not overwrite: - _logger.error("HDFS path is exist: {} and overwrite=False".format( - hdfs_dst_path)) - - rename_command = ['-mv', hdfs_src_path, hdfs_dst_path] - returncode, output, errors = self.__run_hdfs_cmd( - rename_command, retry_times=1) - - if returncode: - _logger.error("HDFS rename path: {} to {} failed".format( - hdfs_src_path, hdfs_dst_path)) - return False - else: - _logger.info("HDFS rename path: {} to {} successfully".format( - hdfs_src_path, hdfs_dst_path)) - return True - - @staticmethod - def make_local_dirs(local_path): - """ - create a directory local, is same to mkdir - Args: - local_path: local path that wants to create a directory. - """ - try: - os.makedirs(local_path) - except OSError as e: - if e.errno != errno.EEXIST: - raise - - def makedirs(self, hdfs_path): - """ - Create a remote directory, recursively if necessary. - - Args: - hdfs_path(str): Remote path. Intermediate directories will be created appropriately. - - Returns: - True or False - """ - _logger.info('Creating directories to %r.', hdfs_path) - assert hdfs_path is not None - - if self.is_exist(hdfs_path): - _logger.error("HDFS path is exist: {}".format(hdfs_path)) - return - - mkdirs_commands = ['-mkdir', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - mkdirs_commands, retry_times=1) - - if returncode: - _logger.error("HDFS mkdir path: {} failed".format(hdfs_path)) - return False - else: - _logger.error("HDFS mkdir path: {} successfully".format(hdfs_path)) - return True - - def ls(self, hdfs_path): - """ - ls directory contents about HDFS hdfs_path - - Args: - hdfs_path(str): Remote HDFS path will be ls. - - Returns: - List: a contents list about hdfs_path. - """ - assert hdfs_path is not None - - if not self.is_exist(hdfs_path): - return [] - - ls_commands = ['-ls', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - ls_commands, retry_times=1) - - if returncode: - _logger.error("HDFS list path: {} failed".format(hdfs_path)) - return [] - else: - _logger.info("HDFS list path: {} successfully".format(hdfs_path)) - - ret_lines = [] - regex = re.compile(r'\s+') - out_lines = output.strip().split("\n") - for line in out_lines: - re_line = regex.split(line) - if len(re_line) == 8: - ret_lines.append(re_line[7]) - return ret_lines - - def lsr(self, hdfs_path, only_file=True, sort=True): - """ - list directory contents about HDFS hdfs_path recursively - - Args: - hdfs_path(str): Remote HDFS path. - only_file(bool|True): will discard folders. - sort(bool|True): will be sorted by create time. - - Returns: - List: a contents list about hdfs_path. - """ - - def sort_by_time(v1, v2): - v1_time = datetime.strptime(v1[1], '%Y-%m-%d %H:%M') - v2_time = datetime.strptime(v2[1], '%Y-%m-%d %H:%M') - return v1_time > v2_time - - assert hdfs_path is not None - - if not self.is_exist(hdfs_path): - return [] - - ls_commands = ['-lsr', hdfs_path] - returncode, output, errors = self.__run_hdfs_cmd( - ls_commands, retry_times=1) - - if returncode: - _logger.error("HDFS list all files: {} failed".format(hdfs_path)) - return [] - else: - _logger.info("HDFS list all files: {} successfully".format( - hdfs_path)) - lines = [] - regex = re.compile(r'\s+') - out_lines = output.strip().split("\n") - for line in out_lines: - re_line = regex.split(line) - if len(re_line) == 8: - if only_file and re_line[0][0] == "d": - continue - else: - lines.append( - (re_line[7], re_line[5] + " " + re_line[6])) - if sort: - sorted(lines, cmp=sort_by_time) - ret_lines = [ret[0] for ret in lines] - return ret_lines - - -def multi_download(client, - hdfs_path, - local_path, - trainer_id, - trainers, - multi_processes=5): - """ - Download files from HDFS using multi process. - - Args: - client(HDFSClient): instance of HDFSClient - hdfs_path(str): path on hdfs - local_path(str): path on local - trainer_id(int): current trainer id - trainers(int): all trainers number - multi_processes(int|5): the download data process at the same time, default=5 - - Returns: - List: - Download files in local folder. - """ - - def __subprocess_download(datas): - for data in datas: - re_path = os.path.relpath(os.path.dirname(data), hdfs_path) - if re_path == os.curdir: - sub_local_re_path = local_path - else: - sub_local_re_path = os.path.join(local_path, re_path) - client.download(data, sub_local_re_path) - - assert isinstance(client, HDFSClient) - - client.make_local_dirs(local_path) - _logger.info("Make local dir {} successfully".format(local_path)) - - all_need_download = client.lsr(hdfs_path, sort=True) - need_download = all_need_download[trainer_id::trainers] - _logger.info("Get {} files From all {} files need to be download from {}". - format(len(need_download), len(all_need_download), hdfs_path)) - - _logger.info("Start {} multi process to download datas".format( - multi_processes)) - procs = [] - for i in range(multi_processes): - process_datas = need_download[i::multi_processes] - p = multiprocessing.Process( - target=__subprocess_download, args=(process_datas, )) - procs.append(p) - p.start() - - # complete the processes - for proc in procs: - proc.join() - - _logger.info("Finish {} multi process to download datas".format( - multi_processes)) - - local_downloads = [] - for data in need_download: - data_name = os.path.basename(data) - re_path = os.path.relpath(os.path.dirname(data), hdfs_path) - if re_path == os.curdir: - local_re_path = os.path.join(local_path, data_name) - else: - local_re_path = os.path.join(local_path, re_path, data_name) - local_downloads.append(local_re_path) - - return local_downloads - - -def getfilelist(path): - rlist = [] - for dir, folder, file in os.walk(path): - for i in file: - t = os.path.join(dir, i) - rlist.append(t) - for r in rlist: - print(r) - - -def multi_upload(client, - hdfs_path, - local_path, - multi_processes=5, - overwrite=False, - sync=True): - """ - Upload files to HDFS using multi process. - - Args: - client(HDFSClient): instance of HDFSClient - hdfs_path(str): path on hdfs - local_path(str): path on local - multi_processes(int|5): the upload data process at the same time, default=5 - overwrite(bool|False): will overwrite file on HDFS or not - sync(bool|True): upload files sync or not. - - Returns: - None - """ - - def __subprocess_upload(datas): - for data in datas: - re_path = os.path.relpath(os.path.dirname(data), local_path) - hdfs_re_path = os.path.join(hdfs_path, re_path) - client.upload(hdfs_re_path, data, overwrite, retry_times=5) - - def get_local_files(path): - rlist = [] - - if not os.path.isdir(path): - return rlist - - for dirname, folder, files in os.walk(path): - for i in files: - t = os.path.join(dirname, i) - rlist.append(t) - return rlist - - assert isinstance(client, HDFSClient) - - all_files = get_local_files(local_path) - if not all_files: - _logger.info("there are nothing need to upload, exit") - return - _logger.info("Start {} multi process to upload datas".format( - multi_processes)) - procs = [] - for i in range(multi_processes): - process_datas = all_files[i::multi_processes] - p = multiprocessing.Process( - target=__subprocess_upload, args=(process_datas, )) - procs.append(p) - p.start() - - # complete the processes - for proc in procs: - proc.join() - - _logger.info("Finish {} multi process to upload datas".format( - multi_processes)) - - -if __name__ == "__main__": - hadoop_home = "/home/client/hadoop-client/hadoop/" - - configs = { - "fs.default.name": "hdfs://xxx.hadoop.com:54310", - "hadoop.job.ugi": "hello,hello123" - } - - client = HDFSClient(hadoop_home, configs) - - client.ls("/user/com/train-25") - files = client.lsr("/user/com/train-25/models") - - downloads = multi_download( - client, - "/user/com/train-25/model", - "/home/xx/data1", - 1, - 5, - 100, - multi_processes=5) - - multi_upload(client, "/user/com/train-25/model", "/home/xx/data1") diff --git a/python/paddle/fluid/contrib/utils/lookup_table_utils.py b/python/paddle/fluid/contrib/utils/lookup_table_utils.py deleted file mode 100644 index 7d30de565e7a4..0000000000000 --- a/python/paddle/fluid/contrib/utils/lookup_table_utils.py +++ /dev/null @@ -1,496 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""lookup_table_utils.py will move to fluid/incubate/fleet/utils/lookup_table.py""" - -from __future__ import print_function - -import os -import time -import logging - -import paddle -from paddle.fluid import core -from paddle.fluid import io -from paddle.fluid import Program -from paddle.fluid.log_helper import get_logger - -__all__ = [ - "load_persistables_for_increment", "load_persistables_for_inference", - "convert_dist_to_sparse_program" -] - -_logger = get_logger( - 'lookup_table_utils', - logging.INFO, - fmt='%(asctime)s-%(levelname)s: %(message)s') - -model_filename = "__model__" -lookup_table_dir = "__lookup_table__" - - -def __insert_lookup_sparse_table_op(main_program, idx, ids, w, out): - main_program.global_block()._insert_op( - index=idx, - type="lookup_sparse_table", - inputs={"Ids": [ids], - "W": [w]}, - outputs={"Out": [out]}, - attrs={ - "is_distributed": False, - "is_sparse": True, - "grad_inplace": False - }) - - -def __get_prefetch_op_tuples(main_program): - # current lookup tables op is split_ids->prefetch->merge_ids - prefetch_op_tuples = None - op_types = [op.type for op in main_program.global_block().ops] - - for i in range(len(op_types)): - if op_types[i] == "prefetch": - if op_types[i - 1] == "split_ids" and op_types[i + - 1] == "merge_ids": - split_ids_op_id = i - 1 - split_ids_inputs = main_program.global_block().ops[i - 1].input( - "Ids") - prefetch_op_inputs = main_program.global_block().ops[i].input( - "X") - prefetch_op_outputs = main_program.global_block().ops[i].output( - "Out") - merge_ids_outputs = main_program.global_block().ops[ - i + 1].output("Out") - - need_delete_vars = [] - need_delete_vars.extend(prefetch_op_inputs) - need_delete_vars.extend(prefetch_op_outputs) - - prefetch_op_tuples = (split_ids_op_id, split_ids_inputs, - merge_ids_outputs, need_delete_vars) - break - return prefetch_op_tuples - - -def convert_dist_to_sparse_program(program): - """ - WARNING: this function will only be used for distributed training with distributed lookup table. - when we train model with distributed lookup table but want to do the local inference, we can use - this function to convert the train program with distributed lookup table to sparse lookup table. - - Args: - program(Program): the program must be the trainer program, which will be get by the distribute transpiler. - Returns: - program: The `program` is a Program, it's the program replace distributed lookup table to sparse lookup table. - """ - if not program._distributed_lookup_table: - _logger.warn( - "There are no distributed lookup tables need to be converted") - return - - # create table param and grad var in pserver program - origin_emb_var = "{}.origin".format(program._distributed_lookup_table) - emb_var = program._distributed_lookup_table - program.global_block()._rename_var(emb_var, origin_emb_var) - origin_param_var = program.global_block().vars[origin_emb_var] - - param_var = program.global_block().create_var( - name=emb_var, - shape=origin_param_var.shape, - dtype=origin_param_var.dtype, - type=core.VarDesc.VarType.SELECTED_ROWS, - persistable=True) - # parameter must be selected rows - param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS) - program._sync_with_cpp() - - prefetch_op_tuples = __get_prefetch_op_tuples(program) - - split_ids_id = prefetch_op_tuples[0] - - for idx in range(split_ids_id + 2, split_ids_id - 1, -1): - program.global_block()._remove_op(idx) - program.desc.flush() - - in_out_pairs = zip(prefetch_op_tuples[1], prefetch_op_tuples[2]) - - for in_out_pair in in_out_pairs: - idx = split_ids_id - ids = program.global_block().vars[in_out_pair[0]] - out = program.global_block().vars[in_out_pair[1]] - __insert_lookup_sparse_table_op(program, idx, ids, param_var, out) - program.desc.flush() - return program - - -def load_persistables_for_increment(dirname, executor, program, - lookup_table_var, lookup_table_var_path): - """ - WARNING: this function will only be used for distributed training with distributed lookup table. - for increment training, the pserver will not only load dense variables, - but also load the suitable lookup table var. Because of sliced lookup table - var with HASH, we must load the correct sliced var. - - Args: - dirname(str): The directory path - executor(Executor): The executor to run for loading inference model. - program(Program): The parameter server program, which will run on Pserver. - lookup_table_var: the distributed lookup tables var name. - lookup_table_var_path: the the distributed lookup tables var location. - - Returns: - None - """ - - def _load_persistable_vars(executor, dirname, need_load_vars): - load_prog = Program() - load_block = load_prog.global_block() - need_delete_vars = [] - - for param in need_load_vars: - origin_var = param.origin - slice_var = param.slice - is_slice = param.is_slice - offset = param.offset - - if is_slice: - origin = load_block.create_var( - name="{}.load".format(origin_var.name), - type=origin_var.type, - shape=origin_var.shape, - dtype=origin_var.dtype, - persistable=True) - - load_block.append_op( - type='load', - inputs={}, - outputs={'Out': [origin]}, - attrs={ - 'file_path': os.path.join(dirname, origin_var.name) - }) - - slice = load_block.create_var( - name=slice_var.name, - type=slice_var.type, - shape=slice_var.shape, - dtype=slice_var.dtype, - persistable=True) - - dim1_flatten = reduce(lambda x, y: x * y, slice.shape[1:]) - start = int(offset / dim1_flatten) - end = int(offset / dim1_flatten + slice.shape[0]) - - load_block.append_op( - type="slice", - inputs={'Input': origin}, - outputs={'Out': slice}, - attrs={'axes': [0], - 'starts': [start], - 'ends': [end]}) - - need_delete_vars.append(origin) - else: - origin = load_block.create_var( - name="{}".format(origin_var.name), - type=origin_var.type, - shape=origin_var.shape, - dtype=origin_var.dtype, - persistable=True) - load_block.append_op( - type='load', - inputs={}, - outputs={'Out': [origin]}, - attrs={ - 'file_path': os.path.join(dirname, origin_var.name) - }) - - load_block.append_op( - type='delete_var', - inputs={'X': need_delete_vars}, ) - - executor.run(load_prog) - - def __load_lookup_table_vars(executor, main_program, lookup_table_var, - lookup_table_var_path): - emb_var = main_program.global_block().var(lookup_table_var) - - load_program = Program() - load_block = load_program.global_block() - load_block.append_op( - type='load', - inputs={}, - outputs={'Out': [emb_var]}, - attrs={'file_path': lookup_table_var_path}) - executor.run(load_program) - - if not os.path.isdir(dirname): - raise ValueError("There is no directory named '%s'", dirname) - - if not os.path.exists(lookup_table_var_path): - raise ValueError("There is no file named '%s'", lookup_table_var_path) - - if not isinstance(program, Program): - raise ValueError("program must be an instance of fluid.Program") - - _logger.info("Start Load Sparse Program With " - "Distributed Lookup Table Vars from {}, time = {}".format( - dirname, time.ctime())) - - need_load_vars = program._parameters_on_pservers.get_distributed_vars_by_ep( - program._ps_endpoint) - _load_persistable_vars(executor, dirname, need_load_vars) - __load_lookup_table_vars(executor, program, lookup_table_var, - lookup_table_var_path) - - _logger.info("Finish Load Sparse Program With " - "Distributed Lookup Table Vars from {}, time = {}".format( - dirname, time.ctime())) - - -def load_persistables_for_inference(dirname, executor, program, - lookup_table_var_name): - """ - WARNING: this function will only be used for inference with distributed lookup table. - Inference with distributed lookup table is a little funky, this function will load distributed - lookup table vars into sparse var, can be used in local inference mode. - - Args: - dirname(str): The directory path - executor(Executor): The executor to run for loading inference model. - program(Program): The parameter server program, which will run on Pserver. - lookup_table_var_name: the distributed lookup tables var name. - Returns: - None - """ - - def _load_persistable_vars(executor, dirname, program, lookup_table_vars): - def _is_checkpoint_var(exclude_fluid_vars=None): - """ - the checkpoint will not save or load all the variables. - var type is FEED_MINIBATCH/FETCH_LIST/RAW or var name ends with @GRAD are discarded. - - : param var(Variable) - """ - - if exclude_fluid_vars is None: - exclude_fluid_vars = [] - - def is_valid(var): - if var.desc.type() == core.VarDesc.VarType.FEED_MINIBATCH or \ - var.desc.type() == core.VarDesc.VarType.FETCH_LIST or \ - var.desc.type() == core.VarDesc.VarType.RAW: - return False - # @GRAD are named for gradient variables, checkpoint will not save it. - if "@GRAD" in var.name: - return False - # .trainer_ are named for distribute train variables, checkpoint will not save it. - if ".trainer_" in var.name: - return False - - # .block is named for distribute train variables, checkpoint will not save it. - if ".block" in var.name: - return False - - if "tmp_" in var.name: - return False - - if var.name in exclude_fluid_vars: - return False - - return var.persistable - - return is_valid - - io.load_vars( - executor, - dirname=dirname, - main_program=program, - predicate=_is_checkpoint_var(lookup_table_vars), - filename=None) - - def _load_lookup_table_vars(executor, dirname, main_program, - lookup_table_vars): - if not os.path.isdir(dirname): - raise ValueError("There is no directory named '%s'", dirname) - - lookup_table_dirname = os.path.join(dirname, lookup_table_dir) - - emb_var_name = lookup_table_vars[0] - emb_var = main_program.global_block().var(emb_var_name) - - emb_files = [] - for emb_name in os.listdir(lookup_table_dirname): - if emb_var_name in emb_name: - emb_files.append(emb_name) - - convert_program = Program() - global_block = convert_program.global_block() - - emb_var = global_block.create_var( - name=emb_var.name, - shape=emb_var.shape, - dtype=emb_var.dtype, - type=core.VarDesc.VarType.SELECTED_ROWS, - persistable=True) - emb_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS) - - sums = [] - - for i, emb_file in enumerate(emb_files): - var_name = "{}_{}".format(emb_var.name, i) - param_var = global_block.create_var( - name=var_name, - shape=emb_var.shape, - dtype=emb_var.dtype, - type=core.VarDesc.VarType.SELECTED_ROWS, - persistable=True) - param_var.desc.set_type(core.VarDesc.VarType.SELECTED_ROWS) - global_block.append_op( - type='load', - inputs={}, - outputs={'Out': [param_var]}, - attrs={ - 'file_path': os.path.join(lookup_table_dirname, var_name) - }) - sums.append(param_var) - global_block.append_op( - type='merge_sparse_lookup_table', - inputs={"X": sums}, - outputs={'Out': emb_var}, - attrs={}) - global_block.append_op( - type='save', - inputs={"X": [emb_var]}, - outputs={}, - attrs={ - 'file_path': os.path.join(lookup_table_dirname, emb_var.name) - }) - global_block.append_op(type='delete_var', inputs={'X': sums}) - executor.run(convert_program) - - if not os.path.isdir(dirname): - raise ValueError("There is no directory named '%s'", dirname) - - if program: - if not isinstance(program, Program): - raise ValueError("program must be an instance of fluid.Program") - else: - local_model = os.path.join(dirname, model_filename) - - with open(local_model, "rb") as f: - program_desc_str = f.read() - - program = Program.parse_from_string(program_desc_str) - - if not core._is_program_version_supported(program._version()): - raise ValueError("Unsupported program version: %d\n" % - program._version()) - - _logger.info("Start Load Sparse Program With " - "Distributed Lookup Table Vars from {}, time = {}".format( - dirname, time.ctime())) - - _load_persistable_vars(executor, dirname, program, [lookup_table_var_name]) - _load_lookup_table_vars(executor, dirname, program, [lookup_table_var_name]) - - _logger.info("Finish Load Sparse Program With " - "Distributed Lookup Table Vars from {}, time = {}".format( - dirname, time.ctime())) - - return program - - -def get_inference_model(main_program, feeded_var_names, target_vars): - """ - Prune the given `main_program` to build a new program especially for inference with distributed lookup table , - and then add `feeded_vars` and `target_vars` in this program. - - Args: - main_program(Program|None): The original program, which will be pruned to - build the inference model. If is set None, - the default main program will be used. - Default: None. - feeded_var_names(list[str]): Names of variables that need to be fed data - during inference. - target_vars(list[Variable]): Variables from which we can get inference - results. - Returns: - program(Program) - - Raises: - ValueError: If `feed_var_names` is not a list of basestring. - ValueError: If `target_vars` is not a list of Variable. - - """ - - def prepend_feed_ops(inference_program, - feed_target_names, - feed_holder_name='feed'): - if len(feed_target_names) == 0: - return - - global_block = inference_program.global_block() - - feed_var = global_block.create_var( - name=feed_holder_name, - type=core.VarDesc.VarType.FEED_MINIBATCH, - persistable=True) - - for i, name in enumerate(feed_target_names): - out = global_block.var(name) - global_block._prepend_op( - type='feed', - inputs={'X': [feed_var]}, - outputs={'Out': [out]}, - attrs={'col': i}) - - def append_fetch_ops(inference_program, - fetch_target_names, - fetch_holder_name='fetch'): - global_block = inference_program.global_block() - fetch_var = global_block.create_var( - name=fetch_holder_name, - type=core.VarDesc.VarType.FETCH_LIST, - persistable=True) - - for i, name in enumerate(fetch_target_names): - global_block.append_op( - type='fetch', - inputs={'X': [name]}, - outputs={'Out': [fetch_var]}, - attrs={'col': i}) - - origin_program = main_program.clone() - main_program = main_program.clone() - global_block = main_program.global_block() - - need_to_remove_op_index = [] - for i, op in enumerate(global_block.ops): - op.desc.set_is_target(False) - if op.type == "feed" or op.type == "fetch": - need_to_remove_op_index.append(i) - - for index in need_to_remove_op_index[::-1]: - global_block._remove_op(index) - - main_program.desc.flush() - - main_program = main_program._prune(targets=target_vars) - main_program = main_program._inference_optimize(prune_read_op=True) - - fetch_var_names = [v.name for v in target_vars] - - prepend_feed_ops(main_program, feeded_var_names) - append_fetch_ops(main_program, fetch_var_names) - - return main_program diff --git a/python/paddle/fluid/incubate/data_generator/__init__.py b/python/paddle/fluid/incubate/data_generator/__init__.py deleted file mode 100644 index 8d31a68e8083d..0000000000000 --- a/python/paddle/fluid/incubate/data_generator/__init__.py +++ /dev/null @@ -1,375 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -__all__ = ['MultiSlotDataGenerator', 'MultiSlotStringDataGenerator'] - - -class DataGenerator(object): - """ - DataGenerator is a general Base class for user to inherit - A user who wants to define his/her own python processing logic - with paddle.fluid.dataset should inherit this class. - """ - - def __init__(self): - self._proto_info = None - self.batch_size_ = 32 - - def _set_line_limit(self, line_limit): - if not isinstance(line_limit, int): - raise ValueError("line_limit%s must be in int type" % - type(line_limit)) - if line_limit < 1: - raise ValueError("line_limit can not less than 1") - self._line_limit = line_limit - - def set_batch(self, batch_size): - ''' - Set batch size of current DataGenerator - This is necessary only if a user wants to define generator_batch - - Example: - - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - - def generate_sample(self, line): - def local_iter(): - int_words = [int(x) for x in line.split()] - yield ("words", int_words) - return local_iter - - def generate_batch(self, samples): - def local_iter(): - for s in samples: - yield ("words", s[1].extend([s[1][0]])) - mydata = MyData() - mydata.set_batch(128) - - ''' - self.batch_size_ = batch_size - - def run_from_memory(self): - ''' - This function generator data from memory, it is usually used for - debug and benchmarking - - Example: - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - - def generate_sample(self, line): - def local_iter(): - yield ("words", [1, 2, 3, 4]) - return local_iter - - mydata = MyData() - mydata.run_from_memory() - ''' - batch_samples = [] - line_iter = self.generate_sample(None) - for user_parsed_line in line_iter(): - if user_parsed_line == None: - continue - batch_samples.append(user_parsed_line) - if len(batch_samples) == self.batch_size_: - batch_iter = self.generate_batch(batch_samples) - for sample in batch_iter(): - sys.stdout.write(self._gen_str(sample)) - batch_samples = [] - if len(batch_samples) > 0: - batch_iter = self.generate_batch(batch_samples) - for sample in batch_iter(): - sys.stdout.write(self._gen_str(sample)) - - def run_from_stdin(self): - ''' - This function reads the data row from stdin, parses it with the - process function, and further parses the return value of the - process function with the _gen_str function. The parsed data will - be wrote to stdout and the corresponding protofile will be - generated. - - Example: - - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - - def generate_sample(self, line): - def local_iter(): - int_words = [int(x) for x in line.split()] - yield ("words", [int_words]) - return local_iter - - mydata = MyData() - mydata.run_from_stdin() - - ''' - batch_samples = [] - for line in sys.stdin: - line_iter = self.generate_sample(line) - for user_parsed_line in line_iter(): - if user_parsed_line == None: - continue - batch_samples.append(user_parsed_line) - if len(batch_samples) == self.batch_size_: - batch_iter = self.generate_batch(batch_samples) - for sample in batch_iter(): - sys.stdout.write(self._gen_str(sample)) - batch_samples = [] - if len(batch_samples) > 0: - batch_iter = self.generate_batch(batch_samples) - for sample in batch_iter(): - sys.stdout.write(self._gen_str(sample)) - - def _gen_str(self, line): - ''' - Further processing the output of the process() function rewritten by - user, outputting data that can be directly read by the datafeed,and - updating proto_info information. - - Args: - line(str): the output of the process() function rewritten by user. - - Returns: - Return a string data that can be read directly by the datafeed. - ''' - raise NotImplementedError( - "pls use MultiSlotDataGenerator or PairWiseDataGenerator") - - def generate_sample(self, line): - ''' - This function needs to be overridden by the user to process the - original data row into a list or tuple. - - Args: - line(str): the original data row - - Returns: - Returns the data processed by the user. - The data format is list or tuple: - [(name, [feasign, ...]), ...] - or ((name, [feasign, ...]), ...) - - For example: - [("words", [1926, 08, 17]), ("label", [1])] - or (("words", [1926, 08, 17]), ("label", [1])) - - Note: - The type of feasigns must be in int or float. Once the float - element appears in the feasign, the type of that slot will be - processed into a float. - - Example: - - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - - def generate_sample(self, line): - def local_iter(): - int_words = [int(x) for x in line.split()] - yield ("words", [int_words]) - return local_iter - - ''' - raise NotImplementedError( - "Please rewrite this function to return a list or tuple: " + - "[(name, [feasign, ...]), ...] or ((name, [feasign, ...]), ...)") - - def generate_batch(self, samples): - ''' - This function needs to be overridden by the user to process the - generated samples from generate_sample(self, str) function - It is usually used as batch processing when a user wants to - do preprocessing on a batch of samples, e.g. padding according to - the max length of a sample in the batch - - Args: - samples(list tuple): generated sample from generate_sample - - Returns: - a python generator, the same format as return value of generate_sample - - Example: - - .. code-block:: python - import paddle.fluid.incubate.data_generator as dg - class MyData(dg.DataGenerator): - - def generate_sample(self, line): - def local_iter(): - int_words = [int(x) for x in line.split()] - yield ("words", int_words) - return local_iter - - def generate_batch(self, samples): - def local_iter(): - for s in samples: - yield ("words", s[1].extend([s[1][0]])) - mydata = MyData() - mydata.set_batch(128) - ''' - - def local_iter(): - for sample in samples: - yield sample - - return local_iter - - -# TODO: guru4elephant -# add more generalized DataGenerator that can adapt user-defined slot -# for example, [(name, float_list), (name, str_list), (name, int_list)] -class MultiSlotStringDataGenerator(DataGenerator): - def _gen_str(self, line): - ''' - Further processing the output of the process() function rewritten by - user, outputting data that can be directly read by the MultiSlotDataFeed, - and updating proto_info information. - - The input line will be in this format: - >>> [(name, [str(feasign), ...]), ...] - >>> or ((name, [str(feasign), ...]), ...) - The output will be in this format: - >>> [ids_num id1 id2 ...] ... - - For example, if the input is like this: - >>> [("words", ["1926", "08", "17"]), ("label", ["1"])] - >>> or (("words", ["1926", "08", "17"]), ("label", ["1"])) - the output will be: - >>> 3 1234 2345 3456 1 1 - - Args: - line(str): the output of the process() function rewritten by user. - - Returns: - Return a string data that can be read directly by the MultiSlotDataFeed. - ''' - if not isinstance(line, list) and not isinstance(line, tuple): - raise ValueError( - "the output of process() must be in list or tuple type" - "Examples: [('words', ['1926', '08', '17']), ('label', ['1'])]") - output = "" - for index, item in enumerate(line): - name, elements = item - if output: - output += " " - out_str = [] - out_str.append(str(len(elements))) - out_str.extend(elements) - output += " ".join(out_str) - return output + "\n" - - -class MultiSlotDataGenerator(DataGenerator): - def _gen_str(self, line): - ''' - Further processing the output of the process() function rewritten by - user, outputting data that can be directly read by the MultiSlotDataFeed, - and updating proto_info information. - - The input line will be in this format: - >>> [(name, [feasign, ...]), ...] - >>> or ((name, [feasign, ...]), ...) - The output will be in this format: - >>> [ids_num id1 id2 ...] ... - The proto_info will be in this format: - >>> [(name, type), ...] - - For example, if the input is like this: - >>> [("words", [1926, 08, 17]), ("label", [1])] - >>> or (("words", [1926, 08, 17]), ("label", [1])) - the output will be: - >>> 3 1234 2345 3456 1 1 - the proto_info will be: - >>> [("words", "uint64"), ("label", "uint64")] - - Args: - line(str): the output of the process() function rewritten by user. - - Returns: - Return a string data that can be read directly by the MultiSlotDataFeed. - ''' - if not isinstance(line, list) and not isinstance(line, tuple): - raise ValueError( - "the output of process() must be in list or tuple type" - "Example: [('words', [1926, 08, 17]), ('label', [1])]") - output = "" - - if self._proto_info is None: - self._proto_info = [] - for item in line: - name, elements = item - if not isinstance(name, str): - raise ValueError("name%s must be in str type" % type(name)) - if not isinstance(elements, list): - raise ValueError("elements%s must be in list type" % - type(elements)) - if not elements: - raise ValueError( - "the elements of each field can not be empty, you need padding it in process()." - ) - self._proto_info.append((name, "uint64")) - if output: - output += " " - output += str(len(elements)) - for elem in elements: - if isinstance(elem, float): - self._proto_info[-1] = (name, "float") - elif not isinstance(elem, int) and not isinstance(elem, - long): - raise ValueError( - "the type of element%s must be in int or float" % - type(elem)) - output += " " + str(elem) - else: - if len(line) != len(self._proto_info): - raise ValueError( - "the complete field set of two given line are inconsistent.") - for index, item in enumerate(line): - name, elements = item - if not isinstance(name, str): - raise ValueError("name%s must be in str type" % type(name)) - if not isinstance(elements, list): - raise ValueError("elements%s must be in list type" % - type(elements)) - if not elements: - raise ValueError( - "the elements of each field can not be empty, you need padding it in process()." - ) - if name != self._proto_info[index][0]: - raise ValueError( - "the field name of two given line are not match: require<%s>, get<%s>." - % (self._proto_info[index][0], name)) - if output: - output += " " - output += str(len(elements)) - for elem in elements: - if self._proto_info[index][1] != "float": - if isinstance(elem, float): - self._proto_info[index] = (name, "float") - elif not isinstance(elem, int) and not isinstance(elem, - long): - raise ValueError( - "the type of element%s must be in int or float" - % type(elem)) - output += " " + str(elem) - return output + "\n" diff --git a/python/paddle/fluid/incubate/data_generator/test_data_generator.py b/python/paddle/fluid/incubate/data_generator/test_data_generator.py deleted file mode 100644 index dcacd67e92a88..0000000000000 --- a/python/paddle/fluid/incubate/data_generator/test_data_generator.py +++ /dev/null @@ -1,36 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -from __init__ import * - - -class SyntheticData(MultiSlotDataGenerator): - def generate_sample(self, line): - def data_iter(): - for i in range(10000): - yield ("words", [1, 2, 3, 4]), ("label", [0]) - - return data_iter - - -class SyntheticStringData(MultiSlotStringDataGenerator): - def generate_sample(self, line): - def data_iter(): - for i in range(10000): - yield ("words", ["1", "2", "3", "4"], ("label", ["0"])) - - -sd = SyntheticData() -sd.run_from_memory() - -sd2 = SyntheticStringData() -sd.run_from_memory() diff --git a/python/paddle/incubate/__init__.py b/python/paddle/incubate/__init__.py index 662515f0e52b1..f94364a30fd0e 100644 --- a/python/paddle/incubate/__init__.py +++ b/python/paddle/incubate/__init__.py @@ -13,9 +13,7 @@ # limitations under the License. from . import optimizer -from ..fluid.contrib import reader from ..fluid.layer_helper import LayerHelper __all__ = [] -__all__ += ["reader"] __all__ += optimizer.__all__ diff --git a/python/setup.py.in b/python/setup.py.in index fae860464fb33..ab306dddc171d 100644 --- a/python/setup.py.in +++ b/python/setup.py.in @@ -176,11 +176,9 @@ packages=['paddle', 'paddle.fluid.contrib', 'paddle.fluid.contrib.decoder', 'paddle.fluid.contrib.quantize', - 'paddle.fluid.contrib.reader', 'paddle.fluid.contrib.slim', 'paddle.fluid.contrib.slim.quantization', 'paddle.fluid.contrib.slim.quantization.imperative', - 'paddle.fluid.contrib.utils', 'paddle.fluid.contrib.extend_optimizer', 'paddle.fluid.contrib.mixed_precision', 'paddle.fluid.contrib.mixed_precision.bf16', @@ -188,7 +186,6 @@ packages=['paddle', 'paddle.fluid.transpiler', 'paddle.fluid.transpiler.details', 'paddle.fluid.incubate', - 'paddle.fluid.incubate.data_generator', 'paddle.fluid.incubate.fleet', 'paddle.fluid.incubate.checkpoint', 'paddle.fluid.incubate.fleet.base', From d349bf8bb775bd3c1c465698909cbf05e2c3e53d Mon Sep 17 00:00:00 2001 From: tianshuo78520a <707759223@qq.com> Date: Tue, 20 Apr 2021 15:24:49 +0800 Subject: [PATCH 2/4] fix reader --- python/paddle/fluid/contrib/__init__.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/python/paddle/fluid/contrib/__init__.py b/python/paddle/fluid/contrib/__init__.py index df41e649ca8cb..30981f531289a 100644 --- a/python/paddle/fluid/contrib/__init__.py +++ b/python/paddle/fluid/contrib/__init__.py @@ -22,11 +22,7 @@ from .op_frequence import * from . import quantize from .quantize import * -from . import reader -from .reader import * from . import slim -from . import utils -from .utils import * from . import extend_optimizer from .extend_optimizer import * from . import model_stat @@ -42,8 +38,6 @@ __all__ += memory_usage_calc.__all__ __all__ += op_frequence.__all__ __all__ += quantize.__all__ -__all__ += reader.__all__ -__all__ += utils.__all__ __all__ += extend_optimizer.__all__ __all__ += ['mixed_precision'] __all__ += layers.__all__ From af3cde41fd5bf100e977b1b682f1ffda68f61551 Mon Sep 17 00:00:00 2001 From: tianshuo78520a <707759223@qq.com> Date: Tue, 20 Apr 2021 15:50:04 +0800 Subject: [PATCH 3/4] fix reader --- .../contrib/tests/test_distributed_reader.py | 45 ------------------- 1 file changed, 45 deletions(-) delete mode 100644 python/paddle/fluid/contrib/tests/test_distributed_reader.py diff --git a/python/paddle/fluid/contrib/tests/test_distributed_reader.py b/python/paddle/fluid/contrib/tests/test_distributed_reader.py deleted file mode 100644 index b964168eb3a2f..0000000000000 --- a/python/paddle/fluid/contrib/tests/test_distributed_reader.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from __future__ import print_function - -import unittest -import numpy as np -import paddle.fluid as fluid -import os - - -def data_generator(): - data = [0, 1, 2, 3] - for val in data: - yield val - - -class TestDistributedReader(unittest.TestCase): - def test_distributed_reader(self): - trainer_num = 4 - os.environ['PADDLE_TRAINER_ID'] = str(1) - os.environ['PADDLE_TRAINERS_NUM'] = str(trainer_num) - - reader = fluid.contrib.reader.distributed_batch_reader(data_generator) - data = next(reader()) - assert data == 1 - - #Note: windows python3 don't have unsetenv - del os.environ['PADDLE_TRAINER_ID'] - del os.environ['PADDLE_TRAINERS_NUM'] - - -if __name__ == '__main__': - unittest.main() From ffe35dba1fed9d095bdb7a561952a53d131aec38 Mon Sep 17 00:00:00 2001 From: tianshuo78520a <707759223@qq.com> Date: Tue, 20 Apr 2021 17:47:10 +0800 Subject: [PATCH 4/4] fix test_fleet_utils --- .../fleet/utils/fleet_barrier_util.py | 56 ------------------- .../fluid/tests/unittests/test_fleet_utils.py | 10 ---- 2 files changed, 66 deletions(-) delete mode 100644 python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py diff --git a/python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py b/python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py deleted file mode 100644 index a9fd8ac74f428..0000000000000 --- a/python/paddle/fluid/incubate/fleet/utils/fleet_barrier_util.py +++ /dev/null @@ -1,56 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.contrib.utils import HDFSClient -import os -import time - - -def check_all_trainers_ready(ready_path, epoch): - trainer_num = fleet.worker_num() - trainer_id = fleet.worker_index() - - hadoop_home = os.getenv("HADOOP_HOME") - configs = { - "fs.default.name": os.getenv("FS_NAME"), - "hadoop.job.ugi": os.getenv("FS_UGI") - } - - node_ready = "ready.{}.{}.done".format(epoch, trainer_id) - - with open(node_ready, "w") as node: - node.write("") - - client = HDFSClient(hadoop_home, configs) - if not client.is_dir(ready_path): - client.makedirs(ready_path) - client.upload( - hdfs_path=ready_path, - local_path=node_ready, - overwrite=True, - retry_times=0) - - print("PUT {} ON HDFS {} OK".format(node_ready, ready_path)) - - while True: - ready_num = len(client.ls(ready_path)) - print("have {} trainers need to be ready".format(trainer_num - ready_num - % trainer_num)) - if ready_num % trainer_num == 0: - break - time.sleep(10) - ready_num = len(client.ls(ready_path)) - - print("All trainers are ready, continue training") diff --git a/python/paddle/fluid/tests/unittests/test_fleet_utils.py b/python/paddle/fluid/tests/unittests/test_fleet_utils.py index 51c12375948f5..09de4867ef9f4 100644 --- a/python/paddle/fluid/tests/unittests/test_fleet_utils.py +++ b/python/paddle/fluid/tests/unittests/test_fleet_utils.py @@ -24,7 +24,6 @@ from paddle.dataset.common import download, DATA_HOME import paddle.fluid.incubate.fleet.base.role_maker as role_maker from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet -from paddle.fluid.incubate.fleet.utils.fleet_barrier_util import check_all_trainers_ready from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil import paddle.fluid.incubate.fleet.utils.utils as utils @@ -50,15 +49,6 @@ def test_fleet_util_init(self): fleet_util_transpiler = FleetUtil(mode="transpiler") self.assertRaises(Exception, FleetUtil, "other") - def test_fleet_barrier(self): - role = role_maker.UserDefinedRoleMaker( - current_id=0, - role=role_maker.Role.WORKER, - worker_num=1, - server_endpoints=['127.0.0.1']) - fleet.init(role) - check_all_trainers_ready("/ready_path/", 0) - def test_program_type_trans(self): data_dir = self.download_files() program_dir = os.path.join(data_dir, self.pruned_dir)