Skip to content

Commit

Permalink
speed up dumpfile (PaddlePaddle#42)
Browse files Browse the repository at this point in the history
* speed up gpugraph infer,using sprintf
  • Loading branch information
seemingwang authored Jun 23, 2022
1 parent c176c00 commit 35a704f
Show file tree
Hide file tree
Showing 2 changed files with 183 additions and 19 deletions.
196 changes: 178 additions & 18 deletions paddle/fluid/framework/device_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ limitations under the License. */

#include "paddle/fluid/framework/device_worker.h"

#include <chrono>
#include "paddle/fluid/framework/convert_utils.h"

namespace phi {
class DenseTensor;
} // namespace phi
Expand Down Expand Up @@ -52,7 +52,55 @@ std::string PrintLodTensorType(Tensor* tensor, int64_t start, int64_t end,
}
return os.str();
}
template <typename T>
void PrintLodTensorType(Tensor* tensor, int64_t start, int64_t end,
std::string& out_val, char separator = ':',
bool need_leading_separator = true) {
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
out_val += "access violation";
return;
}
if (start >= end) return;
if (!need_leading_separator) {
out_val += std::to_string(tensor->data<T>()[start]);
// os << tensor->data<T>()[start];
start++;
}
for (int64_t i = start; i < end; i++) {
// os << ":" << tensor->data<T>()[i];
// os << separator << tensor->data<T>()[i];
out_val += separator;
out_val += std::to_string(tensor->data<T>()[i]);
}
}

#define FLOAT_EPS 1e-8
#define MAX_FLOAT_BUFF_SIZE 40
template <>
void PrintLodTensorType<float>(Tensor* tensor, int64_t start, int64_t end,
std::string& out_val, char separator,
bool need_leading_separator) {
char buf[MAX_FLOAT_BUFF_SIZE];
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
out_val += "access violation";
return;
}
if (start >= end) return;
for (int64_t i = start; i < end; i++) {
if (i != start || need_leading_separator) out_val += separator;
if (tensor->data<float>()[i] > -FLOAT_EPS &&
tensor->data<float>()[i] < FLOAT_EPS)
out_val += "0";
else {
sprintf(buf, "%.9f", tensor->data<float>()[i]);
out_val += buf;
}
}
}
std::string PrintLodTensorIntType(Tensor* tensor, int64_t start, int64_t end,
char separator = ':',
bool need_leading_separator = true) {
Expand All @@ -74,6 +122,31 @@ std::string PrintLodTensorIntType(Tensor* tensor, int64_t start, int64_t end,
return os.str();
}

void PrintLodTensorIntType(Tensor* tensor, int64_t start, int64_t end,
std::string& out_val, char separator = ':',
bool need_leading_separator = true) {
auto count = tensor->numel();
if (start < 0 || end > count) {
VLOG(3) << "access violation";
out_val += "access violation";
return;
}
if (start >= end) return;
if (!need_leading_separator) {
out_val +=
std::to_string(static_cast<uint64_t>(tensor->data<int64_t>()[start]));
start++;
}
for (int64_t i = start; i < end; i++) {
// os << ":" << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
// os << separator << static_cast<uint64_t>(tensor->data<int64_t>()[i]);
out_val += separator;
out_val +=
std::to_string(static_cast<uint64_t>(tensor->data<int64_t>()[i]));
}
// return os.str();
}

std::string PrintLodTensor(Tensor* tensor, int64_t start, int64_t end,
char separator, bool need_leading_separator) {
std::string out_val;
Expand All @@ -94,6 +167,25 @@ std::string PrintLodTensor(Tensor* tensor, int64_t start, int64_t end,
return out_val;
}

void PrintLodTensor(Tensor* tensor, int64_t start, int64_t end,
std::string& out_val, char separator,
bool need_leading_separator) {
if (framework::TransToProtoVarType(tensor->dtype()) == proto::VarType::FP32) {
PrintLodTensorType<float>(tensor, start, end, out_val, separator,
need_leading_separator);
} else if (framework::TransToProtoVarType(tensor->dtype()) ==
proto::VarType::INT64) {
PrintLodTensorIntType(tensor, start, end, out_val, separator,
need_leading_separator);
} else if (framework::TransToProtoVarType(tensor->dtype()) ==
proto::VarType::FP64) {
PrintLodTensorType<double>(tensor, start, end, out_val, separator,
need_leading_separator);
} else {
out_val += "unsupported type";
}
}

std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index) {
auto& dims = tensor->dims();
if (tensor->lod().size() != 0) {
Expand Down Expand Up @@ -164,7 +256,9 @@ void DeviceWorker::DumpField(const Scope& scope, int dump_mode,
int dump_interval) { // dump_mode: 0: no random,
// 1: random with insid hash,
// 2: random with random
// 3: simple mode
// 3: simple mode using multi-threads, for gpugraphps-mode
auto start1 = std::chrono::steady_clock::now();

size_t batch_size = device_reader_->GetCurBatchSize();
auto& ins_id_vec = device_reader_->GetInsIdVec();
auto& ins_content_vec = device_reader_->GetInsContentVec();
Expand All @@ -173,7 +267,81 @@ void DeviceWorker::DumpField(const Scope& scope, int dump_mode,
}
std::vector<std::string> ars(batch_size);
std::vector<bool> hit(batch_size, false);
if (dump_mode_ == 3) {
if (dump_fields_ == NULL || (*dump_fields_).size() == 0) {
return;
}
auto set_output_str = [&, this](size_t begin, size_t end,
LoDTensor* tensor) {
for (size_t i = begin; i < end; ++i) {
auto bound = GetTensorBound(tensor, i);
if (ars[i].size() > 0) ars[i] += "\t";
// ars[i] += '[';
PrintLodTensor(tensor, bound.first, bound.second, ars[i], ' ', false);
// ars[i] += ']';
// ars[i] += "<" + PrintLodTensor(tensor, bound.first, bound.second, '
// ', false) + ">";
}
};
std::thread threads[tensor_iterator_thread_num];
for (auto& field : *dump_fields_) {
Variable* var = scope.FindVar(field);
if (var == nullptr) {
VLOG(0) << "Note: field[" << field
<< "] cannot be find in scope, so it was skipped.";
continue;
}
LoDTensor* tensor = var->GetMutable<LoDTensor>();
if (!tensor->IsInitialized()) {
VLOG(0) << "Note: field[" << field
<< "] is not initialized, so it was skipped.";
continue;
}
framework::LoDTensor cpu_tensor;
if (platform::is_gpu_place(tensor->place())) {
TensorCopySync(*tensor, platform::CPUPlace(), &cpu_tensor);
cpu_tensor.set_lod(tensor->lod());
tensor = &cpu_tensor;
}
if (!CheckValidOutput(tensor, batch_size)) {
VLOG(0) << "Note: field[" << field << "] cannot pass check, so it was "
"skipped. Maybe the dimension is "
"wrong ";
continue;
}
size_t acutal_thread_num =
std::min((size_t)batch_size, tensor_iterator_thread_num);
for (size_t i = 0; i < acutal_thread_num; i++) {
size_t average_size = batch_size / acutal_thread_num;
size_t begin =
average_size * i + std::min(batch_size % acutal_thread_num, i);
size_t end =
begin + average_size + (i < batch_size % acutal_thread_num ? 1 : 0);
threads[i] = std::thread(set_output_str, begin, end, tensor);
}
for (size_t i = 0; i < acutal_thread_num; i++) threads[i].join();
}
auto end1 = std::chrono::steady_clock::now();
auto tt =
std::chrono::duration_cast<std::chrono::microseconds>(end1 - start1);
VLOG(1) << "writing a batch takes " << tt.count() << " us";

size_t acutal_thread_num =
std::min((size_t)batch_size, tensor_iterator_thread_num);
for (size_t i = 0; i < acutal_thread_num; i++) {
size_t average_size = batch_size / acutal_thread_num;
size_t begin =
average_size * i + std::min(batch_size % acutal_thread_num, i);
size_t end =
begin + average_size + (i < batch_size % acutal_thread_num ? 1 : 0);
for (size_t j = begin + 1; j < end; j++) {
if (ars[begin].size() > 0 && ars[j].size() > 0) ars[begin] += "\n";
ars[begin] += ars[j];
}
if (ars[begin].size() > 0) writer_ << ars[begin];
}
return;
}
std::default_random_engine engine(0);
std::uniform_int_distribution<size_t> dist(0U, INT_MAX);
for (size_t i = 0; i < batch_size; i++) {
Expand All @@ -189,14 +357,12 @@ void DeviceWorker::DumpField(const Scope& scope, int dump_mode,
hit[i] = true;
}

if (dump_mode_ != 3) {
for (size_t i = 0; i < ins_id_vec.size(); i++) {
if (!hit[i]) {
continue;
}
ars[i] += ins_id_vec[i];
ars[i] = ars[i] + "\t" + ins_content_vec[i];
for (size_t i = 0; i < ins_id_vec.size(); i++) {
if (!hit[i]) {
continue;
}
ars[i] += ins_id_vec[i];
ars[i] += "\t" + ins_content_vec[i];
}
for (auto& field : *dump_fields_) {
Variable* var = scope.FindVar(field);
Expand All @@ -223,22 +389,16 @@ void DeviceWorker::DumpField(const Scope& scope, int dump_mode,
"wrong ";
continue;
}

for (size_t i = 0; i < batch_size; ++i) {
if (!hit[i]) {
continue;
}
auto bound = GetTensorBound(tensor, i);
if (dump_mode_ == 3) {
if (ars[i].size() > 0) ars[i] += "\t";
ars[i] += PrintLodTensor(tensor, bound.first, bound.second, ' ', false);
} else {
ars[i] = ars[i] + "\t" + field + ":" +
std::to_string(bound.second - bound.first);
ars[i] += PrintLodTensor(tensor, bound.first, bound.second);
}
ars[i] += "\t" + field + ":" + std::to_string(bound.second - bound.first);
ars[i] += PrintLodTensor(tensor, bound.first, bound.second);
}
}

// #pragma omp parallel for
for (size_t i = 0; i < ars.size(); i++) {
if (ars[i].length() == 0) {
Expand Down
6 changes: 5 additions & 1 deletion paddle/fluid/framework/device_worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ limitations under the License. */
#include "paddle/fluid/distributed/ps/wrapper/fleet.h"
#endif

#include <map>
#include "paddle/fluid/framework/data_feed.h"
#include "paddle/fluid/framework/executor_gc_helper.h"
#include "paddle/fluid/framework/heter_util.h"
Expand Down Expand Up @@ -62,6 +63,9 @@ namespace framework {
std::string PrintLodTensor(Tensor* tensor, int64_t start, int64_t end,
char separator = ',',
bool need_leading_separator = false);
void PrintLodTensor(Tensor* tensor, int64_t start, int64_t end,
std::string& output_str, char separator = ',',
bool need_leading_separator = false);
std::pair<int64_t, int64_t> GetTensorBound(LoDTensor* tensor, int index);
bool CheckValidOutput(LoDTensor* tensor, size_t batch_size);

Expand Down Expand Up @@ -231,6 +235,7 @@ class DeviceWorker {
int dump_mode_ = 0;
int dump_interval_ = 10000;
ChannelWriter<std::string> writer_;
const size_t tensor_iterator_thread_num = 16;
platform::DeviceContext* dev_ctx_ = nullptr;
};

Expand Down Expand Up @@ -770,7 +775,6 @@ class HeterSectionWorker : public DeviceWorker {
static uint64_t batch_id_;
uint64_t total_ins_num_ = 0;
platform::DeviceContext* dev_ctx_ = nullptr;

bool debug_ = false;
std::vector<double> op_total_time_;
std::vector<std::string> op_name_;
Expand Down

0 comments on commit 35a704f

Please sign in to comment.