Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove LoDTensor using in fluid (Part 1) #46663

Merged
merged 4 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
8 changes: 4 additions & 4 deletions paddle/fluid/distributed/fleet_executor/dist_model.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ bool IsPersistable(const framework::VarDesc *var) {
}

bool LoadDataFromDistModelTensor(const DistModelTensor &input_data,
framework::LoDTensor *input_tensor,
phi::DenseTensor *input_tensor,
const platform::Place &place) {
VLOG(3) << "Loading data from DistModelTensor for " << input_data.name;
framework::DDim dims = phi::make_ddim(input_data.shape);
Expand Down Expand Up @@ -515,7 +515,7 @@ bool DistModel::FeedData(const std::vector<DistModelTensor> &input_data,
feed_tensors_.resize(feeds_.size());
for (size_t i = 0; i < input_data.size(); ++i) {
// feed each data separately
framework::LoDTensor *input_tensor = &(feed_tensors_[i]);
phi::DenseTensor *input_tensor = &(feed_tensors_[i]);
if (!LoadDataFromDistModelTensor(input_data[i], input_tensor, place_)) {
LOG(ERROR) << "Fail to load data from tensor " << input_data[i].name;
return false;
Expand Down Expand Up @@ -556,7 +556,7 @@ bool DistModel::FetchResults(std::vector<DistModelTensor> *output_data,
i));
framework::FetchType &fetch_var =
framework::GetFetchVariable(*scope, "fetch", idx);
auto &fetch = PADDLE_GET(framework::LoDTensor, fetch_var);
auto &fetch = PADDLE_GET(phi::DenseTensor, fetch_var);
auto type = framework::TransToProtoVarType(fetch.dtype());
auto output = &(output_data->at(i));
output->name = idx_to_fetches_[idx];
Expand Down Expand Up @@ -587,7 +587,7 @@ bool DistModel::FetchResults(std::vector<DistModelTensor> *output_data,
}

template <typename T>
bool DistModel::FetchResult(const framework::LoDTensor &fetch,
bool DistModel::FetchResult(const phi::DenseTensor &fetch,
DistModelTensor *output_data) {
auto shape = phi::vectorize(fetch.dims());
output_data->shape.assign(shape.begin(), shape.end());
Expand Down
5 changes: 2 additions & 3 deletions paddle/fluid/distributed/fleet_executor/dist_model.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,10 @@ class DistModel {
bool FetchResults(std::vector<DistModelTensor>* output_data,
framework::Scope* scope);
template <typename T>
bool FetchResult(const framework::LoDTensor& fetch,
DistModelTensor* output_data);
bool FetchResult(const phi::DenseTensor& fetch, DistModelTensor* output_data);

std::string carrier_id_;
std::vector<framework::LoDTensor> feed_tensors_;
std::vector<phi::DenseTensor> feed_tensors_;
std::vector<framework::OpDesc*> feeds_;
std::map<std::string, int64_t> feed_names_;
std::map<int64_t, std::string> idx_to_feeds_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ std::vector<framework::OperatorBase*> GetOps() {
// Builds a fresh Scope pre-populated with the "x" and "out" variables as
// phi::DenseTensor, for use by the interceptor/compute tests.
// Returns: a heap-allocated Scope; ownership passes to the caller, who is
// responsible for deleting it.
framework::Scope* GetScope() {
  framework::Scope* scope = new framework::Scope();

  // Register each variable exactly once, using phi::DenseTensor (the
  // framework::LoDTensor alias was removed in this migration).
  scope->Var("x")->GetMutable<phi::DenseTensor>();
  scope->Var("out")->GetMutable<phi::DenseTensor>();
  return scope;
}

Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/ps/service/brpc_ps_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1501,7 +1501,7 @@ int32_t BrpcPsClient::RecvAndSaveTable(const uint64_t table_id,
auto &dev_ctx = *pool.Get(place);

framework::Variable *var = scope->Var(var_name);
framework::LoDTensor *var_tensor = var->GetMutable<framework::LoDTensor>();
phi::DenseTensor *var_tensor = var->GetMutable<phi::DenseTensor>();

std::vector<int64_t> vec_dim = {var_num, var_shape};
var_tensor->Resize(phi::make_ddim(vec_dim));
Expand Down
6 changes: 3 additions & 3 deletions paddle/fluid/distributed/ps/service/brpc_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ void SerializeToMultiVarMsgAndIOBuf(

framework::Variable* var = scope->FindVar(send_var_name);

if (var->IsType<framework::LoDTensor>()) {
if (var->IsType<phi::DenseTensor>()) {
SerializeLodTensor(var, ctx, send_var_msg, &temp_iobuf);
} else if (var->IsType<phi::SelectedRows>()) {
SerializeSelectedRows(var, ctx, send_var_msg, &temp_iobuf);
Expand All @@ -92,7 +92,7 @@ void SerializeLodTensor(framework::Variable* var,
const platform::DeviceContext& ctx,
VarMsg* var_msg,
butil::IOBuf* iobuf) {
auto* tensor = var->GetMutable<framework::LoDTensor>();
auto* tensor = var->GetMutable<phi::DenseTensor>();
var_msg->set_type(::paddle::distributed::LOD_TENSOR);
const framework::LoD lod = tensor->lod();
if (lod.size() > 0) {
Expand Down Expand Up @@ -229,7 +229,7 @@ void DeserializeLodTensor(framework::Variable* var,
butil::IOBufBytesIterator& io_buffer_itr, // NOLINT
const platform::DeviceContext& ctx) {
const auto place = ctx.GetPlace();
framework::LoDTensor* tensor = var->GetMutable<framework::LoDTensor>();
phi::DenseTensor* tensor = var->GetMutable<phi::DenseTensor>();
std::vector<int> vec_dim;
for (auto& x : msg.dims()) {
vec_dim.push_back(x);
Expand Down
54 changes: 27 additions & 27 deletions paddle/fluid/distributed/ps/service/communicator/communicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ limitations under the License. */
namespace paddle {
namespace distributed {

using framework::LoDTensor;
using LoDTensor = phi::DenseTensor;
using phi::SelectedRows;

const uint32_t MAX_FEASIGN_NUM = 1024 * 100 * 100;
Expand Down Expand Up @@ -243,7 +243,7 @@ void Communicator::RpcSendSparseParam(const std::string &varname,
std::vector<float *> push_g_vec;

auto *send_var = scope.FindVar(varname);
auto *tensor = send_var->GetMutable<framework::LoDTensor>();
auto *tensor = send_var->GetMutable<phi::DenseTensor>();
auto dim = tensor->dims()[1];
uint64_t sparse_num = static_cast<uint64_t>(tensor->dims()[0]);
std::vector<uint64_t> sparse_push_keys(sparse_num);
Expand Down Expand Up @@ -340,7 +340,7 @@ void Communicator::RpcRecvSparse(const std::string &varname,
platform::TracerEventType::Communication,
1);
auto *send_var = scope->Var(varname);
auto *tensor = send_var->GetMutable<framework::LoDTensor>();
auto *tensor = send_var->GetMutable<phi::DenseTensor>();
auto dim = tensor->dims()[1];
uint64_t sparse_num = static_cast<uint64_t>(tensor->dims()[0]);

Expand Down Expand Up @@ -418,7 +418,7 @@ void Communicator::SendGlobalStep(const CommContext &ctx,

auto &var_name = STEP_COUNTER;
auto *out_var = send_scope->Var(var_name);
auto *out_t = out_var->GetMutable<framework::LoDTensor>();
auto *out_t = out_var->GetMutable<phi::DenseTensor>();
auto *data = out_t->mutable_data<int64_t>({1}, platform::CPUPlace());
data[0] = static_cast<int64_t>(batches);
VLOG(3) << "Communicator::SendGlobalStep send: " << batches;
Expand Down Expand Up @@ -598,12 +598,12 @@ void AsyncCommunicator::PullSparseToTensorSync(
fea_keys.reserve(MAX_FEASIGN_NUM / 100);
pull_result_ptr.reserve(MAX_FEASIGN_NUM / 100);
std::vector<float> init_value(fea_dim, 0);
framework::LoDTensor *output = nullptr;
phi::DenseTensor *output = nullptr;
float *output_data = nullptr;
size_t output_index = -1;
size_t output_len = 0;
for (size_t index = 0; index < inputs->size(); ++index) {
const framework::LoDTensor *tensor = inputs->at(index);
const phi::DenseTensor *tensor = inputs->at(index);
const int64_t *ids = tensor->data<int64_t>();
size_t len = tensor->numel();
for (size_t i = 0; i < len; ++i, output_len += fea_dim) {
Expand Down Expand Up @@ -646,10 +646,10 @@ void AsyncCommunicator::PushSparseFromTensorAsync(
int fea_dim,
uint64_t padding_id,
platform::Place place,
std::vector<const framework::LoDTensor *> *inputs,
const framework::LoDTensor *shows,
const framework::LoDTensor *clks,
std::vector<framework::LoDTensor *> *outputs) {
std::vector<const phi::DenseTensor *> *inputs,
const phi::DenseTensor *shows,
const phi::DenseTensor *clks,
std::vector<phi::DenseTensor *> *outputs) {
int batch_size = -1;
bool batch_size_consist = true;
for (auto *input : *inputs) {
Expand Down Expand Up @@ -688,7 +688,7 @@ void AsyncCommunicator::PushSparseFromTensorAsync(
// const long int* clk_tensor = clks->data<int64_t>();

for (size_t index = 0; index < inputs->size(); ++index) {
framework::LoDTensor *g_tensor = outputs->at(index);
phi::DenseTensor *g_tensor = outputs->at(index);
float *g = g_tensor->data<float>();

if (batch_size_consist) { // TODO(zhaocaibei123): add config
Expand All @@ -700,7 +700,7 @@ void AsyncCommunicator::PushSparseFromTensorAsync(
batch_size; // hard code here, because of cvm_grad op
}

const framework::LoDTensor *tensor = inputs->at(index);
const phi::DenseTensor *tensor = inputs->at(index);
const int64_t *ids = tensor->data<int64_t>();
size_t len = tensor->numel();
output_len = 0;
Expand Down Expand Up @@ -872,7 +872,7 @@ bool AsyncCommunicator::Check(const std::vector<std::string> &var_tables) {
if (table_name == STEP_COUNTER) {
VLOG(3) << "send step_counter into queue";
auto tmp_var = std::make_shared<Variable>();
auto *tensor = tmp_var->GetMutable<framework::LoDTensor>();
auto *tensor = tmp_var->GetMutable<phi::DenseTensor>();
tensor->Resize(phi::make_ddim({1}));
auto *out_d = tensor->mutable_data<int64_t>(platform::CPUPlace());
out_d[0] = 1;
Expand Down Expand Up @@ -1164,13 +1164,13 @@ void GeoCommunicator::InitDense(std::vector<std::string> &varnames,
// copy to old_scope
for (auto &t : varnames) {
auto *global_var = recv_scope_->FindVar(t);
global_var->GetMutable<framework::LoDTensor>();
global_var->GetMutable<phi::DenseTensor>();
auto *old_var = old_scope_->Var(t);
old_var->GetMutable<framework::LoDTensor>();
old_var->GetMutable<phi::DenseTensor>();
framework::CopyVariable(*global_var, old_var); // src, dst
// init pserver_scope_
auto *pserver_var = pserver_scope_->Var(t);
pserver_var->GetMutable<framework::LoDTensor>();
pserver_var->GetMutable<phi::DenseTensor>();
framework::CopyVariable(*global_var, pserver_var);
}
VLOG(1) << "init dense table " << table_id << " done";
Expand All @@ -1196,12 +1196,12 @@ void GeoCommunicator::SendDense(const CommContext &send_ctx) {
platform::errors::Unavailable(
"%s is not initialized, please check", param_name));

auto &t_latest = var_latest->Get<framework::LoDTensor>();
auto t_timestamp = var_timestamp->GetMutable<framework::LoDTensor>();
auto &t_latest = var_latest->Get<phi::DenseTensor>();
auto t_timestamp = var_timestamp->GetMutable<phi::DenseTensor>();

phi::CPUContext cpu_ctx;
auto *var_delta = delta_scope_->Var(varname);
auto *t_delta = var_delta->GetMutable<framework::LoDTensor>();
auto *t_delta = var_delta->GetMutable<phi::DenseTensor>();
t_delta->mutable_data<float>(t_latest.dims(), cpu_ctx.GetPlace());

auto blas = phi::funcs::GetBlas<phi::CPUContext, float>(cpu_ctx);
Expand Down Expand Up @@ -1237,16 +1237,16 @@ void GeoCommunicator::RecvDense(const CommContext &send_ctx) {
phi::CPUContext cpu_ctx;
for (auto &varname : varnames) {
auto *var_latest = recv_scope_->FindVar(varname);
auto t_latest = var_latest->GetMutable<framework::LoDTensor>();
auto t_latest = var_latest->GetMutable<phi::DenseTensor>();

auto *var_old = old_scope_->FindVar(varname);
auto t_old = var_old->GetMutable<framework::LoDTensor>();
auto t_old = var_old->GetMutable<phi::DenseTensor>();

auto *var_pserver = pserver_scope_->FindVar(varname);
auto t_pserver = var_pserver->Get<framework::LoDTensor>();
auto t_pserver = var_pserver->Get<phi::DenseTensor>();

auto *var_delta = delta_scope_->Var(varname);
auto *t_delta = var_delta->GetMutable<framework::LoDTensor>();
auto *t_delta = var_delta->GetMutable<phi::DenseTensor>();
t_delta->mutable_data<float>(t_latest->dims(), cpu_ctx.GetPlace());

auto blas = phi::funcs::GetBlas<phi::CPUContext, float>(cpu_ctx);
Expand Down Expand Up @@ -1347,8 +1347,8 @@ void GeoCommunicator::SendSparse(const std::string &varname,
platform::errors::Unavailable(
"%s is not initialized, please check", param_name));

auto &t_latest = var_latest->Get<framework::LoDTensor>();
auto *t_old = var_old->GetMutable<framework::LoDTensor>();
auto &t_latest = var_latest->Get<phi::DenseTensor>();
auto *t_old = var_old->GetMutable<phi::DenseTensor>();

auto dims1 = t_latest.dims()[1];
phi::CPUContext cpu_ctx;
Expand Down Expand Up @@ -1427,8 +1427,8 @@ void GeoCommunicator::RecvSparse(const std::string &varname,
auto *var_latest = recv_scope_->FindVar(param);
auto *var_old = old_scope_->FindVar(param);

auto *t_latest = var_latest->GetMutable<framework::LoDTensor>();
auto *t_old = var_old->GetMutable<framework::LoDTensor>();
auto *t_latest = var_latest->GetMutable<phi::DenseTensor>();
auto *t_old = var_old->GetMutable<phi::DenseTensor>();

auto dims1 = t_latest->dims()[1];
auto numel = keys.size() * dims1;
Expand Down
33 changes: 16 additions & 17 deletions paddle/fluid/distributed/ps/service/communicator/communicator.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,16 @@ inline void MergeVars(const std::string &var_name,
auto &var0 = vars[0];
auto *out_var = scope->Var(var_name);

if (var0->IsType<framework::LoDTensor>()) {
auto dims = var0->Get<framework::LoDTensor>().dims();
if (var0->IsType<phi::DenseTensor>()) {
auto dims = var0->Get<phi::DenseTensor>().dims();
VLOG(3) << "merge " << var_name << " LoDTensor dims " << dims
<< "; merge add: " << merge_add;
// init output tensor
auto *out_t = out_var->GetMutable<framework::LoDTensor>();
auto *out_t = out_var->GetMutable<phi::DenseTensor>();
out_t->mutable_data<T>(dims, cpu_place);
// check the input dims
for (auto &var : vars) {
auto &var_t = var->Get<framework::LoDTensor>();
auto &var_t = var->Get<phi::DenseTensor>();
PADDLE_ENFORCE_EQ(
var_t.dims(),
dims,
Expand All @@ -192,7 +192,7 @@ inline void MergeVars(const std::string &var_name,
// sum all vars to out
auto result = EigenVector<T>::Flatten(*out_t);
for (auto &var : vars) {
auto &in_t = var->Get<framework::LoDTensor>();
auto &in_t = var->Get<phi::DenseTensor>();
auto in = EigenVector<T>::Flatten(in_t);
result.device(*cpu_ctx.eigen_device()) = result + in;
}
Expand Down Expand Up @@ -484,18 +484,17 @@ class AsyncCommunicator : public Communicator {
uint64_t padding_id,
platform::Place place,
bool is_training,
std::vector<const framework::LoDTensor *> *inputs, // NOLINT
std::vector<framework::LoDTensor *> *outputs); // NOLINT

void PushSparseFromTensorAsync(
const uint64_t table_id,
int fea_dim,
uint64_t padding_id,
platform::Place place,
std::vector<const framework::LoDTensor *> *inputs,
const framework::LoDTensor *shows,
const framework::LoDTensor *clicks,
std::vector<framework::LoDTensor *> *outputs);
std::vector<const phi::DenseTensor *> *inputs, // NOLINT
std::vector<phi::DenseTensor *> *outputs); // NOLINT

void PushSparseFromTensorAsync(const uint64_t table_id,
int fea_dim,
uint64_t padding_id,
platform::Place place,
std::vector<const phi::DenseTensor *> *inputs,
const phi::DenseTensor *shows,
const phi::DenseTensor *clicks,
std::vector<phi::DenseTensor *> *outputs);

protected:
std::unordered_map<std::string,
Expand Down
6 changes: 3 additions & 3 deletions paddle/fluid/distributed/ps/service/heter_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ std::shared_ptr<HeterClient> HeterClient::switch_s_instance_ = nullptr;
int GetMicroId(const platform::DeviceContext& ctx,
const framework::Scope* scope) {
framework::Variable* var = scope->FindVar("microbatch_id");
PADDLE_ENFORCE_EQ(var->IsType<framework::LoDTensor>(),
PADDLE_ENFORCE_EQ(var->IsType<phi::DenseTensor>(),
true,
platform::errors::InvalidArgument(
"the type of micro id shoulde be LoDTensor."));
auto micro_id = -1;
auto* tensor = var->GetMutable<framework::LoDTensor>();
auto* tensor = var->GetMutable<phi::DenseTensor>();
if (platform::is_cpu_place(tensor->place())) {
auto data = reinterpret_cast<const float*>(tensor->data());
micro_id = static_cast<int>(data[0]);
Expand Down Expand Up @@ -251,7 +251,7 @@ int HeterClient::Send(const platform::DeviceContext& ctx,
send_var_msg->set_varname(send_var_name);
framework::Variable* var = p_scope->FindVar(send_var_name);
butil::IOBuf temp_iobuf;
if (var->IsType<framework::LoDTensor>()) {
if (var->IsType<phi::DenseTensor>()) {
SerializeLodTensor(var, ctx, send_var_msg, &temp_iobuf);
} else if (var->IsType<phi::SelectedRows>()) {
SerializeSelectedRows(var, ctx, send_var_msg, &temp_iobuf);
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/ps/service/heter_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ int SendAndRecvVariableHandler::QueryInSwitchWithScope(
LOG(INFO) << "local_scope not find var: " << req_var_name;
}
butil::IOBuf temp_iobuf;
if (var_ptr->IsType<framework::LoDTensor>()) {
if (var_ptr->IsType<phi::DenseTensor>()) {
SerializeLodTensor(var_ptr, cpu_dev_ctx, send_var_msg, &temp_iobuf);
} else if (var_ptr->IsType<phi::SelectedRows>()) {
SerializeSelectedRows(var_ptr, cpu_dev_ctx, send_var_msg, &temp_iobuf);
Expand Down
2 changes: 1 addition & 1 deletion paddle/fluid/distributed/ps/service/heter_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class SendAndRecvVariableHandler final : public ServiceHandlerBase {
nullptr,
platform::errors::InvalidArgument(
"Not find variable microbatch_id in scope."));
auto* tensor = var->GetMutable<framework::LoDTensor>();
auto* tensor = var->GetMutable<phi::DenseTensor>();
auto data = reinterpret_cast<const float*>(tensor->data());
auto micro_id = static_cast<int>(data[0]);
VLOG(4) << "micro_id in heter server: " << micro_id;
Expand Down
Loading