Skip to content

Commit

Permalink
Merge From Develop
Browse files Browse the repository at this point in the history
  • Loading branch information
YuanRisheng committed Nov 1, 2021
2 parents 16e6bf1 + fe81306 commit dfec7a0
Show file tree
Hide file tree
Showing 418 changed files with 28,313 additions and 8,643 deletions.
7 changes: 5 additions & 2 deletions cmake/external/boost.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ set(BOOST_PROJECT "extern_boost")
# checked that the devtools package of CentOS 6 installs boost 1.41.0.
# So we use 1.41.0 here.
set(BOOST_VER "1.41.0")
set(BOOST_TAR "boost_1_41_0" CACHE STRING "" FORCE)
# boost_1_41_0_2021_10.tar.gz is almost the same with boost_1_41_0.tar.gz,
# except in visualc.hpp i comment a warning of "unknown compiler version",
# so if you need to change boost, you may need to block the warning similarly.
set(BOOST_TAR "boost_1_41_0_2021_10" CACHE STRING "" FORCE)
set(BOOST_URL "http://paddlepaddledeps.bj.bcebos.com/${BOOST_TAR}.tar.gz" CACHE STRING "" FORCE)

MESSAGE(STATUS "BOOST_VERSION: ${BOOST_VER}, BOOST_URL: ${BOOST_URL}")
Expand All @@ -46,7 +49,7 @@ ExternalProject_Add(
${BOOST_PROJECT}
${EXTERNAL_PROJECT_LOG_ARGS}
"${BOOST_DOWNLOAD_CMD}"
URL_MD5 f891e8c2c9424f0565f0129ad9ab4aff
URL_MD5 51be7cc203628dc0848e97eee32d79e3
PREFIX ${BOOST_PREFIX_DIR}
DOWNLOAD_DIR ${BOOST_SOURCE_DIR}
SOURCE_DIR ${BOOST_SOURCE_DIR}
Expand Down
2 changes: 1 addition & 1 deletion cmake/external/cinn.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ add_definitions(-w)
include(ExternalProject)
set(CINN_SOURCE_DIR ${THIRD_PARTY_PATH}/CINN)
# TODO(zhhsplendid): Modify git tag after we have release tag
set(CINN_GIT_TAG e422c01b7875301996a2baf67a14ba61b0e6192a)
set(CINN_GIT_TAG 2122413fc74f4020ff4397b54488a793529d581b)
set(CINN_OPTIONAL_ARGS -DPY_VERSION=${PY_VERSION} -DWITH_CUDA=${WITH_GPU} -DWITH_CUDNN=${WITH_GPU} -DPUBLISH_LIBS=ON -DWITH_TESTING=ON)
set(CINN_BUILD_COMMAND $(MAKE) cinnapi -j)
ExternalProject_Add(
Expand Down
2 changes: 1 addition & 1 deletion cmake/external/xpu.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ ELSE ()
ENDIF()

SET(XPU_BASE_URL_WITHOUT_DATE "https://baidu-kunlun-product.cdn.bcebos.com/KL-SDK/klsdk-dev")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20210921")
SET(XPU_BASE_URL "${XPU_BASE_URL_WITHOUT_DATE}/20211020")
SET(XPU_XRE_URL "${XPU_BASE_URL}/${XPU_XRE_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
SET(XPU_XDNN_URL "${XPU_BASE_URL}/${XPU_XDNN_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
SET(XPU_XCCL_URL "${XPU_BASE_URL_WITHOUT_DATE}/20210623/${XPU_XCCL_DIR_NAME}.tar.gz" CACHE STRING "" FORCE)
Expand Down
6 changes: 4 additions & 2 deletions cmake/operators.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ function(op_library TARGET)
list(REMOVE_ITEM hip_srcs "cholesky_op.cu")
list(REMOVE_ITEM hip_srcs "matrix_rank_op.cu")
list(REMOVE_ITEM hip_srcs "svd_op.cu")
list(REMOVE_ITEM hip_srcs "eigvalsh_op.cu")
list(REMOVE_ITEM hip_srcs "qr_op.cu")
list(REMOVE_ITEM hip_srcs "eigh_op.cu")
list(REMOVE_ITEM hip_srcs "multinomial_op.cu")
Expand Down Expand Up @@ -217,7 +218,8 @@ function(op_library TARGET)
"fusion_transpose_flatten_concat_op" "fusion_conv_inception_op"
"sync_batch_norm_op" "sparse_attention_op" "dgc_op" "fused_fc_elementwise_layernorm_op"
"skip_layernorm_op" "multihead_matmul_op" "fusion_group_op" "fused_bn_activation_op" "fused_embedding_eltwise_layernorm_op" "fusion_gru_op" "fusion_lstm_op"
"fused_bn_add_activation_op" "fused_attention_op" "resnet_unit_op")
"fused_bn_add_activation_op" "fused_attention_op" "resnet_unit_op" "fused_feedforward_op")

if ("${TARGET}" STREQUAL "${manual_pybind_op}")
set(pybind_flag 1)
endif()
Expand Down Expand Up @@ -298,7 +300,7 @@ function(op_library TARGET)
file(APPEND ${pybind_file} "USE_OP_DEVICE_KERNEL(${TARGET}, CUDNN);\n")
endif()

if (WITH_XPU AND ${xpu_cc_srcs_len} GREATER 0)
if (WITH_XPU AND ${pybind_flag} EQUAL 0 AND ${xpu_cc_srcs_len} GREATER 0)
file(APPEND ${pybind_file} "USE_OP_DEVICE_KERNEL(${TARGET}, XPU);\n")
endif()

Expand Down
1 change: 1 addition & 0 deletions cmake/third_party.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,7 @@ endif (WITH_LITE)
if (WITH_CINN)
message(STATUS "Compile Paddle with CINN.")
include(external/cinn)
add_definitions(-DPADDLE_WITH_CINN)
endif (WITH_CINN)

if (WITH_CRYPTO)
Expand Down
1 change: 1 addition & 0 deletions paddle/fluid/distributed/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER 7.0)
"${DISTRIBUTE_COMPILE_FLAGS} -faligned-new")
endif()

add_subdirectory(common)
add_subdirectory(service)
add_subdirectory(table)
add_subdirectory(test)
Expand Down
4 changes: 4 additions & 0 deletions paddle/fluid/distributed/common/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

cc_library(afs_wrapper SRCS afs_warpper.cc DEPS fs ps_framework_proto)

#set_property(GLOBAL PROPERTY COMMON_DEPS afs_warpper)
89 changes: 89 additions & 0 deletions paddle/fluid/distributed/common/afs_warpper.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/distributed/common/afs_warpper.h"
#include "paddle/fluid/framework/io/fs.h"

namespace paddle {
namespace distributed {
// AfsClient impl
int AfsClient::initialize(const FsClientParameter& fs_client_param) {
// temporarily implemented with hdfs-client
return initialize(fs_client_param.hadoop_bin(), fs_client_param.uri(),
fs_client_param.user(), fs_client_param.passwd(),
fs_client_param.buffer_size());
}
int AfsClient::initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& user, const std::string& passwd,
int buffer_size_param) {
return initialize(hadoop_bin, uri, paddle::string::format_string(
"%s,%s", user.c_str(), passwd.c_str()),
buffer_size_param);
}
int AfsClient::initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& ugi, int buffer_size_param) {
// temporarily implemented with hdfs-client
size_t buffer_size = 1L << 25; // 32MB
if (buffer_size_param > static_cast<int>(buffer_size)) {
buffer_size = buffer_size_param;
}
paddle::framework::hdfs_set_buffer_size(buffer_size);
paddle::framework::hdfs_set_command(paddle::string::format_string(
"2>>./hdfs_err.log %s fs -Dfs.default.name=%s -Dhadoop.job.ugi=%s "
"-Ddfs.client.block.write.retries=15 -Ddfs.rpc.timeout=300000",
hadoop_bin.c_str(), uri.c_str(), ugi.c_str()));
return 0;
}

// open file in 'w' or 'r'
std::shared_ptr<FsReadChannel> AfsClient::open_r(const FsChannelConfig& config,
uint32_t buffer_size,
int* err_no) {
std::shared_ptr<FsReadChannel> channel =
std::make_shared<FsReadChannel>(buffer_size);
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_read(config.path, err_no, config.deconverter);
channel->open(fp, config);
return channel;
}
std::shared_ptr<FsWriteChannel> AfsClient::open_w(const FsChannelConfig& config,
uint32_t buffer_size,
int* err_no) {
std::shared_ptr<FsWriteChannel> channel =
std::make_shared<FsWriteChannel>(buffer_size);
std::shared_ptr<FILE> fp =
paddle::framework::fs_open_write(config.path, err_no, config.converter);
channel->open(fp, config);
return channel;
}

// remove file in path, path maybe a reg, such as 'part-000-*'
void AfsClient::remove(const std::string& path) {
return paddle::framework::fs_remove(path);
}
void AfsClient::remove_dir(const std::string& dir) {
return paddle::framework::fs_remove(dir);
}

// list files in path, path maybe a dir with reg
std::vector<std::string> AfsClient::list(const std::string& path) {
return paddle::framework::fs_list(path);
}

// exist or not
bool AfsClient::exist(const std::string& dir) {
return paddle::framework::fs_exists(dir);
}
} // namespace distributed
} // namespace paddle
156 changes: 156 additions & 0 deletions paddle/fluid/distributed/common/afs_warpper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
// Copyright (c) 2021 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <functional>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include "paddle/fluid/distributed/ps.pb.h"
#include "paddle/fluid/string/string_helper.h"

namespace paddle {
namespace distributed {
struct FsDataConverter {
std::string converter;
std::string deconverter;
};

struct FsChannelConfig {
std::string path; // path of file
std::string converter; // data converter
std::string deconverter;
};

class FsReadChannel {
public:
FsReadChannel() : _buffer_size(0) {}
explicit FsReadChannel(uint32_t buffer_size) : _buffer_size(buffer_size) {}
virtual ~FsReadChannel() {}
FsReadChannel(FsReadChannel&&) = delete;
FsReadChannel(const FsReadChannel&) = delete;
int open(std::shared_ptr<FILE> fp, const FsChannelConfig& config) {
_file = fp;
return 0;
}
inline int close() {
_file.reset();
return 0;
}

inline uint32_t read_line(std::string& line_data) { // NOLINT
line_data.clear();
char buffer = '\0';
size_t read_count = 0;
while (1 == fread(&buffer, 1, 1, _file.get()) && buffer != '\n') {
++read_count;
line_data.append(&buffer, 1);
}
if (read_count == 0 && buffer != '\n') {
return -1;
}
return 0;
}

private:
uint32_t _buffer_size;
FsChannelConfig _config;
std::shared_ptr<FILE> _file;
};
class FsWriteChannel {
public:
FsWriteChannel() : _buffer_size(0) {}
explicit FsWriteChannel(uint32_t buffer_size) : _buffer_size(buffer_size) {}
virtual ~FsWriteChannel() {}
FsWriteChannel(FsWriteChannel&&) = delete;
FsWriteChannel(const FsWriteChannel&) = delete;

int open(std::shared_ptr<FILE> fp, const FsChannelConfig& config) {
_file = fp;

// the buffer has set in fs.cc
// if (_buffer_size != 0) {
// _buffer = std::shared_ptr<char>(new char[_buffer_size]);

// CHECK(0 == setvbuf(&*_file, _buffer.get(), _IOFBF, _buffer_size));
//}
return 0;
}

inline void flush() { return; }

inline int close() {
flush();
_file.reset();
return 0;
}

inline uint32_t write_line(const char* data, uint32_t size) {
size_t write_count = fwrite_unlocked(data, 1, size, _file.get());
if (write_count != size) {
return -1;
}
write_count = fwrite_unlocked("\n", 1, 1, _file.get());
if (write_count != 1) {
return -1;
}
return 0;
}
inline uint32_t write_line(const std::string& data) {
return write_line(data.c_str(), data.size());
}

private:
uint32_t _buffer_size;
FsChannelConfig _config;
std::shared_ptr<FILE> _file;
std::shared_ptr<char> _buffer;
};

class AfsClient {
public:
AfsClient() {}
virtual ~AfsClient() {}
AfsClient(AfsClient&&) = delete;
AfsClient(const AfsClient&) = delete;

int initialize(const FsClientParameter& fs_client_param);
int initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& user, const std::string& passwd,
int buffer_size_param = (1L << 25));
int initialize(const std::string& hadoop_bin, const std::string& uri,
const std::string& ugi, int buffer_size_param = (1L << 25));

// open file in 'w' or 'r'
std::shared_ptr<FsReadChannel> open_r(const FsChannelConfig& config,
uint32_t buffer_size = 0,
int* err_no = nullptr);
std::shared_ptr<FsWriteChannel> open_w(const FsChannelConfig& config,
uint32_t buffer_size = 0,
int* err_no = nullptr);

// remove file in path, path maybe a reg, such as 'part-000-*'
void remove(const std::string& path);
void remove_dir(const std::string& dir);

// list files in path, path maybe a dir with reg
std::vector<std::string> list(const std::string& path);

// exist or not
bool exist(const std::string& dir);
};
} // namespace distributed
} // namespace paddle
Loading

0 comments on commit dfec7a0

Please sign in to comment.