From 0c6ab6ecd0ce3eea73cdb5dfb1dd9c448f94ba37 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Mon, 19 Mar 2018 12:41:19 +0000 Subject: [PATCH 1/5] add tensor --- paddle/fluid/operators/detail/CMakeLists.txt | 6 +- .../fluid/operators/detail/tensor_parser.cc | 393 ++++++++++++++++++ paddle/fluid/operators/detail/tensor_parser.h | 73 ++++ paddle/fluid/operators/detail/test_serde.cc | 177 ++++---- 4 files changed, 574 insertions(+), 75 deletions(-) create mode 100644 paddle/fluid/operators/detail/tensor_parser.cc create mode 100644 paddle/fluid/operators/detail/tensor_parser.h diff --git a/paddle/fluid/operators/detail/CMakeLists.txt b/paddle/fluid/operators/detail/CMakeLists.txt index 94395ccfbcbd7..ec2375b6a8e2e 100644 --- a/paddle/fluid/operators/detail/CMakeLists.txt +++ b/paddle/fluid/operators/detail/CMakeLists.txt @@ -1,6 +1,8 @@ if(WITH_DISTRIBUTE) - grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc grpc_server.cc PROTO send_recv.proto DEPS lod_tensor selected_rows) + grpc_library(sendrecvop_grpc SRCS bytebuffer_stream.cc sendrecvop_utils.cc grpc_client.cc + grpc_server.cc tensor_parser.cc PROTO send_recv.proto DEPS lod_tensor selected_rows) set(DISTRIBUTE_COMPILE_FLAGS "-Wno-non-virtual-dtor -Wno-error=non-virtual-dtor -Wno-error=delete-non-virtual-dtor") set_source_files_properties(test_serde.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) - cc_test(serde_test SRCS test_serde.cc DEPS grpc++_unsecure grpc_unsecure gpr cares zlib protobuf sendrecvop_grpc) + cc_test(serde_test SRCS test_serde.cc tensor_parser.cc DEPS grpc++_unsecure grpc_unsecure gpr + cares zlib protobuf sendrecvop_grpc) endif() diff --git a/paddle/fluid/operators/detail/tensor_parser.cc b/paddle/fluid/operators/detail/tensor_parser.cc new file mode 100644 index 0000000000000..ef1f0914cab8f --- /dev/null +++ b/paddle/fluid/operators/detail/tensor_parser.cc @@ -0,0 +1,393 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "tensor_parser.h" +#include +#include "paddle/fluid/operators/detail/send_recv.pb.h" +#include "sendrecvop_utils.h" + +namespace paddle { +namespace operators { +namespace detail { + +enum WireType { + WIRETYPE_VARINT = 0, + WIRETYPE_LENGTH_DELIMITED = 2, +}; + +inline int GetTagFieldNumber(uint32_t tag) { return tag >> 3; } + +inline WireType GetTagWireType(uint32_t tag) { + return static_cast(tag & 0x7); +} + +bool ReadVarintSizeAsInt(::google::protobuf::io::CodedInputStream* input, + int* result) { + uint64_t v; + if (input->ReadVarint64(&v) && v <= static_cast(INT_MAX)) { + *result = static_cast(v); + return true; + } else { + return false; + } +} + +bool ReadRaw(::google::protobuf::io::CodedInputStream* input, + const platform::DeviceContext& dev_ctx, platform::Place place, + void* dest, int size) { + const void* data = NULL; + int size_to_write = 0; + + if (platform::is_gpu_place(place)) { +#ifdef PADDLE_WITH_CUDA + auto& gpu_dev_ctx = + static_cast(dev_ctx); + platform::CPUPlace cpu; + + char* p = reinterpret_cast(dest); + while (size > 0) { + if (!input->GetDirectBufferPointer(&data, &size_to_write)) { + return false; + } + + memory::Copy(boost::get(place), + reinterpret_cast(p), cpu, data, size_to_write, + gpu_dev_ctx.stream()); + p += size_to_write; + size -= size_to_write; + + input->Skip(size_to_write); + } + gpu_dev_ctx.Wait(); +#else + PADDLE_THROW("Unexpected branch"); +#endif + return true; + } + + char* p = reinterpret_cast(dest); + while (size > 0) { + if (!input->GetDirectBufferPointer(&data, &size_to_write)) { + return false; + } + // TODO(gongwb): don't copy if it's aligned? + memcpy(reinterpret_cast(p), data, size_to_write); + + p += size_to_write; + size -= size_to_write; + + input->Skip(size_to_write); + } + + return true; +} + +bool TensorResponse::CopyLodTensorData( + ::google::protobuf::io::CodedInputStream* input, + const platform::DeviceContext& ctx, framework::DDim& dims, int length) { + auto var = scope_->FindVar(meta_.varname()); + auto* tensor = var->GetMutable(); + tensor->Resize(dims); + + framework::LoD lod; + for (int i = 0; i < meta_.lod_level(); ++i) { + framework::Vector v; + for (int j = 0; j < meta_.lod(i).lod_data_size(); ++j) { + v.push_back(meta_.lod(i).lod_data(j)); + } + lod.push_back(v); + } + tensor->set_lod(lod); + + void* tensor_data = + tensor->mutable_data(ctx.GetPlace(), ToTypeIndex(meta_.data_type())); + + if (!ReadRaw(input, ctx, tensor->place(), tensor_data, length)) { + return false; + } + + return true; +} + +inline framework::DDim GetDims( + const ::google::protobuf::RepeatedField<::google::protobuf::int64>& dims) { + std::vector vecdims; + for (auto& d : dims) { + vecdims.push_back(d); + } + return framework::make_ddim(vecdims); +} + +bool TensorResponse::CopySelectRowsTensorData( + ::google::protobuf::io::CodedInputStream* input, + const platform::DeviceContext& ctx, framework::DDim& dims, int length) { + auto var = scope_->FindVar(meta_.varname()); + auto* slr = var->GetMutable(); + auto* tensor = slr->mutable_value(); + tensor->Resize(dims); + void* tensor_data = tensor->mutable_data( + ctx.GetPlace(), + paddle::operators::detail::ToTypeIndex(meta_.data_type())); + + if (!ReadRaw(input, ctx, tensor->place(), tensor_data, length)) { + return false; + } + + return true; +} + +bool TensorResponse::CopySelectRowsData( + ::google::protobuf::io::CodedInputStream* input, + const platform::DeviceContext& ctx, int length) { + auto var = scope_->FindVar(meta_.varname()); + auto* slr = var->GetMutable(); + int64_t* rows_data = slr->mutable_rows()->data(); + + // copy rows CPU data, GPU data will be copied lazly + platform::CPUPlace cpu; + if (!ReadRaw(input, ctx, cpu, rows_data, length)) { + return false; + } + + return true; +} + +bool ParseLodData(::google::protobuf::io::CodedInputStream* input, + std::vector* lod) { + while (true) { + auto p = input->ReadTagWithCutoff(127); + int tag = GetTagFieldNumber(p.first); + WireType wt = GetTagWireType(p.first); + + if (!p.second) { + return (tag == 0); + } + + switch (tag) { + case sendrecv::VariableMessage_LodData::kLodDataFieldNumber: { + uint64_t v; + if (wt == WIRETYPE_VARINT) { + if (!input->ReadVarint64(&v)) { + return false; + } + lod->push_back(v); + break; + } + + if (wt == WIRETYPE_LENGTH_DELIMITED) { + int length = 0; + if (!input->ReadVarintSizeAsInt(&length)) { + return tag; + } + + for (int i = 0; i < length; i++) { + uint64_t v; + if (!input->ReadVarint64(&v)) { + return false; + } + lod->push_back(v); + } + break; + } + + return false; + } + default: { return false; } + } + } + + return true; +} + +int TensorResponse::Parse(::grpc::ByteBuffer& byte_buffer, + const platform::DeviceContext& dev_ctx) { + GrpcByteBufferSource source; + source.Init(byte_buffer); + + ::google::protobuf::io::ZeroCopyInputStream* input_stream = &source; + ::google::protobuf::io::CodedInputStream input(input_stream); + input.SetTotalBytesLimit(INT_MAX, INT_MAX); + + while (true) { + auto p = input.ReadTagWithCutoff(127); + int tag = GetTagFieldNumber(p.first); + WireType wt = GetTagWireType(p.first); + if (!p.second) { + if (tag != 0) { + return -1; + } + + return 0; + } + + switch (tag) { + case sendrecv::VariableMessage::kVarnameFieldNumber: { + uint32_t length; + if ((wt != WIRETYPE_LENGTH_DELIMITED) || !input.ReadVarint32(&length)) { + return tag; + } + + std::string temp; + if (!input.ReadString(&temp, length)) { + return tag; + } + + meta_.set_varname(temp); + break; + } + case sendrecv::VariableMessage::kTypeFieldNumber: { + uint64_t v; + if ((wt != WIRETYPE_VARINT) || !input.ReadVarint64(&v)) { + return tag; + } + + meta_.set_type(static_cast<::sendrecv::VarType>(v)); + break; + } + case sendrecv::VariableMessage::kDataTypeFieldNumber: { + uint64_t v = 0; + if ((wt != WIRETYPE_VARINT) || !input.ReadVarint64(&v)) { + return tag; + } + + meta_.set_data_type(static_cast<::sendrecv::VariableMessage_Type>(v)); + break; + } + case sendrecv::VariableMessage::kDimsFieldNumber: { + // not packed + if (wt == WIRETYPE_VARINT) { + uint64_t v; + if (!input.ReadVarint64(&v)) { + return tag; + } + meta_.add_dims(v); + break; + } + + // packed + if (wt == WIRETYPE_LENGTH_DELIMITED) { + int length = 0; + if (!input.ReadVarintSizeAsInt(&length)) { + return tag; + } + for (int i = 0; i < length; i++) { + uint64_t v; + if (!input.ReadVarint64(&v)) { + return tag; + } + meta_.add_dims(v); + } + break; + } + + return tag; + } + case sendrecv::VariableMessage::kLodLevelFieldNumber: { + uint64_t v = 0; + if ((wt != WIRETYPE_VARINT) || !input.ReadVarint64(&v)) { + return tag; + } + meta_.set_lod_level(static_cast(v)); + break; + } + case sendrecv::VariableMessage::kLodFieldNumber: { + int length = 0; + if (wt != WIRETYPE_LENGTH_DELIMITED || + !ReadVarintSizeAsInt(&input, &length)) { + return tag; + } + + std::pair<::google::protobuf::io::CodedInputStream::Limit, int> p = + input.IncrementRecursionDepthAndPushLimit(length); + + std::vector lod_data; + if (p.second < 0 || !ParseLodData(&input, &lod_data)) { + return tag; + } + + if (!input.DecrementRecursionDepthAndPopLimit(p.first)) { + return false; + } + + if (lod_data.size() == 0) { + break; + } + + auto lod = meta_.add_lod(); + for (uint32_t i = 0; i < lod_data.size(); i++) { + lod->add_lod_data(lod_data[i]); + } + break; + } + case sendrecv::VariableMessage::kSerializedFieldNumber: { + PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS || + meta_.type() == sendrecv::LOD_TENSOR) && + meta_.varname() != "", + "meta info should be got first!"); + + int length = 0; + if (wt != WIRETYPE_LENGTH_DELIMITED || + !ReadVarintSizeAsInt(&input, &length)) { + return tag; + } + + framework::DDim dims = GetDims(meta_.dims()); + if (meta_.type() == sendrecv::LOD_TENSOR) { + PADDLE_ENFORCE(meta_.lod_size() > 0, "lod info should be got first!"); + if (!CopyLodTensorData(&input, dev_ctx, dims, length)) { + return tag; + } + break; + } + + if (meta_.type() == sendrecv::SELECTED_ROWS) { + if (!CopySelectRowsTensorData(&input, dev_ctx, dims, length)) { + return tag; + } + break; + } + + return tag; + } + case sendrecv::VariableMessage::kRowsFieldNumber: { + PADDLE_ENFORCE((meta_.type() == sendrecv::SELECTED_ROWS || + meta_.type() == sendrecv::LOD_TENSOR) && + meta_.varname() != "", + "meta info should be got first!"); + + int length = 0; + if (wt != WIRETYPE_LENGTH_DELIMITED || + !ReadVarintSizeAsInt(&input, &length)) { + return tag; + } + + if (!CopySelectRowsData(&input, dev_ctx, length)) { + return tag; + } + break; + } + + default: { + // Unknown tag, return unknown error. + return -1; + } + } + } + + return 0; +} + +}; // namespace detail +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/detail/tensor_parser.h b/paddle/fluid/operators/detail/tensor_parser.h new file mode 100644 index 0000000000000..3a0545ea67efb --- /dev/null +++ b/paddle/fluid/operators/detail/tensor_parser.h @@ -0,0 +1,73 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include "paddle/fluid/framework/data_type.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/framework/selected_rows.h" +#include "paddle/fluid/framework/var_type.h" + +#include "paddle/fluid/operators/detail/send_recv.grpc.pb.h" +#include "paddle/fluid/operators/detail/send_recv.pb.h" + +#include "google/protobuf/io/coded_stream.h" +#include "google/protobuf/io/zero_copy_stream.h" +#include "paddle/fluid/framework/tensor.h" +#include "paddle/fluid/operators/detail/bytebuffer_stream.h" + +namespace paddle { +namespace operators { +namespace detail { + +class TensorResponse { + public: + TensorResponse(framework::Scope* scope) : scope_(scope){}; + + virtual ~TensorResponse(){}; + + // return: + // 0:ok. + // -1: unkown error. + // other: number of error field. + int Parse(::grpc::ByteBuffer& byte_buffer, + const platform::DeviceContext& dev_ctx); + + // should call parse first. + framework::Variable* GetVar() { return scope_->FindVar(meta_.varname()); } + + private: + bool CopySelectRowsTensorData(::google::protobuf::io::CodedInputStream* input, + const platform::DeviceContext& ctx, + framework::DDim& dims, int length); + + bool CopySelectRowsData(::google::protobuf::io::CodedInputStream* input, + const platform::DeviceContext& ctx, int length); + + bool CopyLodTensorData(::google::protobuf::io::CodedInputStream* input, + const platform::DeviceContext& ctx, + framework::DDim& dims, int length); + + private: + private: + framework::Variable* var_; + framework::Scope* scope_; + // only Skeleton + sendrecv::VariableMessage meta_; +}; + +}; // namespace detail +}; // namespace operators +}; // namespace paddle diff --git a/paddle/fluid/operators/detail/test_serde.cc b/paddle/fluid/operators/detail/test_serde.cc index 2f06e5a686b99..2bae6f11cf523 100644 --- a/paddle/fluid/operators/detail/test_serde.cc +++ b/paddle/fluid/operators/detail/test_serde.cc @@ -16,11 +16,13 @@ limitations under the License. */ #include #include +#include #include "gtest/gtest.h" #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/tensor_util.h" #include "paddle/fluid/framework/variable.h" #include "paddle/fluid/operators/detail/sendrecvop_utils.h" +#include "paddle/fluid/operators/detail/tensor_parser.h" #include "paddle/fluid/operators/math/math_function.h" #include "paddle/fluid/platform/place.h" #include "paddle/fluid/string/printf.h" @@ -31,19 +33,21 @@ namespace operators = paddle::operators; namespace math = paddle::operators::math; namespace memory = paddle::memory; -void RunSerdeTestTensor(platform::Place place) { - // serialize var to ByteBuffer - framework::Variable var; - auto* tensor = var.GetMutable(); - tensor->Resize(framework::make_ddim({4, 8, 4, 2})); - framework::LoD lod; - lod.push_back(framework::Vector({1, 3, 8})); - tensor->set_lod(lod); - int tensor_numel = 4 * 8 * 4 * 2; +void RunSerdeTestSelectedRows(platform::Place place) { platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); auto& ctx = *pool.Get(place); + + // serialize var to ByteBuffer + framework::Variable var; + auto* slr = var.GetMutable(); + auto* tensor = slr->mutable_value(); + auto* rows = slr->mutable_rows(); + tensor->Resize(framework::make_ddim({2, 10})); tensor->mutable_data(place); - math::set_constant(ctx, tensor, 31.9); + int tensor_numel = 2 * 10; + math::set_constant(ctx, tensor, 32.7); + rows->push_back(3); + rows->push_back(10); ::grpc::ByteBuffer msg; operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg); @@ -56,62 +60,67 @@ void RunSerdeTestTensor(platform::Place place) { for (const auto& s : slices) { tmp.append(reinterpret_cast(s.begin()), s.size()); } + sendrecv::VariableMessage varmsg; EXPECT_TRUE(varmsg.ParseFromString(tmp)); + EXPECT_EQ(varmsg.varname(), "myvar"); - EXPECT_EQ(varmsg.type(), 0); - EXPECT_EQ(varmsg.dims()[0], 4); - EXPECT_EQ(varmsg.dims()[1], 8); - EXPECT_EQ(varmsg.dims()[2], 4); - EXPECT_EQ(varmsg.dims()[3], 2); - EXPECT_EQ(varmsg.lod_level(), 1); - EXPECT_EQ(varmsg.lod(0).lod_data(0), 1); - EXPECT_EQ(varmsg.lod(0).lod_data(1), 3); - EXPECT_EQ(varmsg.lod(0).lod_data(2), 8); + EXPECT_EQ(varmsg.type(), 1); const float* tensor_data = reinterpret_cast(varmsg.serialized().data()); + const int64_t* rows_data = + reinterpret_cast(varmsg.rows().data()); for (int i = 0; i < tensor_numel; ++i) { - EXPECT_FLOAT_EQ(tensor_data[i], 31.9); + EXPECT_FLOAT_EQ(tensor_data[i], 32.7); } - + EXPECT_EQ(rows_data[0], 3); + EXPECT_EQ(rows_data[1], 10); // deserialize zero-copy - framework::Variable var2; - operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2); - auto tensor2 = var2.Get(); + // framework::Variable var2; + // operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2); + framework::Scope scope; + scope.Var("myvar"); + operators::detail::TensorResponse resp(&scope); + EXPECT_EQ(resp.Parse(msg, ctx), 0); + + framework::Variable* var2 = resp.GetVar(); + + auto* slr2 = var2->GetMutable(); + auto* tensor2 = slr2->mutable_value(); + auto* rows2 = slr2->mutable_rows(); float* tensor_data2 = nullptr; framework::Tensor tmp_tensor; if (platform::is_gpu_place(ctx.GetPlace())) { platform::CPUPlace cpu; - framework::TensorCopy(tensor2, cpu, &tmp_tensor); + framework::TensorCopy(*tensor2, cpu, &tmp_tensor); tensor_data2 = tmp_tensor.data(); } else { - tensor_data2 = const_cast(tensor2.data()); + tensor_data2 = const_cast(tensor2->data()); } + const int64_t* rows_data2 = rows2->data(); - EXPECT_EQ(varmsg.lod_level(), 1); - EXPECT_EQ(varmsg.lod(0).lod_data(0), 1); - EXPECT_EQ(varmsg.lod(0).lod_data(1), 3); - EXPECT_EQ(varmsg.lod(0).lod_data(2), 8); - for (int i = 0; i < tensor_numel; ++i) EXPECT_FLOAT_EQ(tensor_data2[i], 31.9); + for (int i = 0; i < tensor_numel; ++i) { + EXPECT_FLOAT_EQ(tensor_data2[i], 32.7); + } + EXPECT_EQ(rows_data2[0], 3); + EXPECT_EQ(rows_data2[1], 10); } -void RunSerdeTestSelectedRows(platform::Place place) { - platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto& ctx = *pool.Get(place); - +void RunTestLodTensor(platform::Place place, int from_type = 0) { // serialize var to ByteBuffer framework::Variable var; - auto* slr = var.GetMutable(); - auto* tensor = slr->mutable_value(); - auto* rows = slr->mutable_rows(); - tensor->Resize(framework::make_ddim({2, 10})); + auto* tensor = var.GetMutable(); + tensor->Resize(framework::make_ddim({4, 8, 4, 2})); + framework::LoD lod; + lod.push_back(framework::Vector({1, 3, 8})); + tensor->set_lod(lod); + int tensor_numel = 4 * 8 * 4 * 2; + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + auto& ctx = *pool.Get(place); tensor->mutable_data(place); - int tensor_numel = 2 * 10; - math::set_constant(ctx, tensor, 32.7); - rows->push_back(3); - rows->push_back(10); + math::set_constant(ctx, tensor, 31.9); ::grpc::ByteBuffer msg; operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg); @@ -126,43 +135,75 @@ void RunSerdeTestSelectedRows(platform::Place place) { } sendrecv::VariableMessage varmsg; EXPECT_TRUE(varmsg.ParseFromString(tmp)); - EXPECT_EQ(varmsg.varname(), "myvar"); - EXPECT_EQ(varmsg.type(), 1); + EXPECT_EQ(varmsg.type(), 0); + EXPECT_EQ(varmsg.dims()[0], 4); + EXPECT_EQ(varmsg.dims()[1], 8); + EXPECT_EQ(varmsg.dims()[2], 4); + EXPECT_EQ(varmsg.dims()[3], 2); + EXPECT_EQ(varmsg.lod_level(), 1); + EXPECT_EQ(varmsg.lod(0).lod_data(0), 1); + EXPECT_EQ(varmsg.lod(0).lod_data(1), 3); + EXPECT_EQ(varmsg.lod(0).lod_data(2), 8); const float* tensor_data = reinterpret_cast(varmsg.serialized().data()); - const int64_t* rows_data = - reinterpret_cast(varmsg.rows().data()); for (int i = 0; i < tensor_numel; ++i) { - EXPECT_FLOAT_EQ(tensor_data[i], 32.7); + EXPECT_FLOAT_EQ(tensor_data[i], 31.9); } - EXPECT_EQ(rows_data[0], 3); - EXPECT_EQ(rows_data[1], 10); + + // message binary + std::string str; + varmsg.SerializeToString(&str); + + // message bytebuffer + ::grpc::Slice slices_2[1]; + int num_slices = 1; + slices_2[0] = ::grpc::Slice(str.length()); + memcpy(const_cast(slices_2[0].begin()), str.c_str(), str.length()); + ::grpc::ByteBuffer bytebuffer2(&slices_2[0], num_slices); + // deserialize zero-copy - framework::Variable var2; - operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2); + framework::Scope scope; + scope.Var("myvar"); + operators::detail::TensorResponse resp(&scope); + if (from_type == 0) { + EXPECT_EQ(resp.Parse(msg, ctx), 0); + } else { + EXPECT_EQ(resp.Parse(bytebuffer2, ctx), 0); + } - auto* slr2 = var2.GetMutable(); - auto* tensor2 = slr2->mutable_value(); - auto* rows2 = slr2->mutable_rows(); + framework::Variable* var2 = resp.GetVar(); + + auto tensor2 = var2->Get(); float* tensor_data2 = nullptr; framework::Tensor tmp_tensor; if (platform::is_gpu_place(ctx.GetPlace())) { platform::CPUPlace cpu; - framework::TensorCopy(*tensor2, cpu, &tmp_tensor); + framework::TensorCopy(tensor2, cpu, &tmp_tensor); tensor_data2 = tmp_tensor.data(); } else { - tensor_data2 = const_cast(tensor2->data()); + tensor_data2 = const_cast(tensor2.data()); } - const int64_t* rows_data2 = rows2->data(); - for (int i = 0; i < tensor_numel; ++i) { - EXPECT_FLOAT_EQ(tensor_data2[i], 32.7); - } - EXPECT_EQ(rows_data2[0], 3); - EXPECT_EQ(rows_data2[1], 10); + EXPECT_EQ(varmsg.lod_level(), 1); + EXPECT_EQ(varmsg.lod(0).lod_data(0), 1); + EXPECT_EQ(varmsg.lod(0).lod_data(1), 3); + EXPECT_EQ(varmsg.lod(0).lod_data(2), 8); + for (int i = 0; i < tensor_numel; ++i) EXPECT_FLOAT_EQ(tensor_data2[i], 31.9); +} + +TEST(LodTensor, GPU) { + platform::CUDAPlace place; + RunTestLodTensor(place); + RunTestLodTensor(place, 1); +} + +TEST(LodTensor, CPU) { + platform::CPUPlace place; + RunTestLodTensor(place); + RunTestLodTensor(place, 1); } TEST(SelectedRows, CPU) { @@ -174,13 +215,3 @@ TEST(SelectedRows, GPU) { platform::CUDAPlace place; RunSerdeTestSelectedRows(place); } - -TEST(Tensor, CPU) { - platform::CPUPlace place; - RunSerdeTestTensor(place); -} - -TEST(Tensor, GPU) { - platform::CUDAPlace place; - RunSerdeTestTensor(place); -} \ No newline at end of file From 24067c11b66054d78097fc9b64985da6b28c2e58 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Mon, 19 Mar 2018 12:46:25 +0000 Subject: [PATCH 2/5] modify style --- paddle/fluid/operators/detail/test_serde.cc | 150 ++++++++++---------- 1 file changed, 75 insertions(+), 75 deletions(-) diff --git a/paddle/fluid/operators/detail/test_serde.cc b/paddle/fluid/operators/detail/test_serde.cc index 2bae6f11cf523..6d3ec75fcb080 100644 --- a/paddle/fluid/operators/detail/test_serde.cc +++ b/paddle/fluid/operators/detail/test_serde.cc @@ -33,81 +33,6 @@ namespace operators = paddle::operators; namespace math = paddle::operators::math; namespace memory = paddle::memory; -void RunSerdeTestSelectedRows(platform::Place place) { - platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); - auto& ctx = *pool.Get(place); - - // serialize var to ByteBuffer - framework::Variable var; - auto* slr = var.GetMutable(); - auto* tensor = slr->mutable_value(); - auto* rows = slr->mutable_rows(); - tensor->Resize(framework::make_ddim({2, 10})); - tensor->mutable_data(place); - int tensor_numel = 2 * 10; - math::set_constant(ctx, tensor, 32.7); - rows->push_back(3); - rows->push_back(10); - - ::grpc::ByteBuffer msg; - operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg); - EXPECT_GT(msg.Length(), 0); - - // deserialize - std::vector<::grpc::Slice> slices; - (void)msg.Dump(&slices); - std::string tmp; - for (const auto& s : slices) { - tmp.append(reinterpret_cast(s.begin()), s.size()); - } - - sendrecv::VariableMessage varmsg; - EXPECT_TRUE(varmsg.ParseFromString(tmp)); - - EXPECT_EQ(varmsg.varname(), "myvar"); - EXPECT_EQ(varmsg.type(), 1); - - const float* tensor_data = - reinterpret_cast(varmsg.serialized().data()); - const int64_t* rows_data = - reinterpret_cast(varmsg.rows().data()); - for (int i = 0; i < tensor_numel; ++i) { - EXPECT_FLOAT_EQ(tensor_data[i], 32.7); - } - EXPECT_EQ(rows_data[0], 3); - EXPECT_EQ(rows_data[1], 10); - // deserialize zero-copy - // framework::Variable var2; - // operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2); - framework::Scope scope; - scope.Var("myvar"); - operators::detail::TensorResponse resp(&scope); - EXPECT_EQ(resp.Parse(msg, ctx), 0); - - framework::Variable* var2 = resp.GetVar(); - - auto* slr2 = var2->GetMutable(); - auto* tensor2 = slr2->mutable_value(); - auto* rows2 = slr2->mutable_rows(); - float* tensor_data2 = nullptr; - framework::Tensor tmp_tensor; - - if (platform::is_gpu_place(ctx.GetPlace())) { - platform::CPUPlace cpu; - framework::TensorCopy(*tensor2, cpu, &tmp_tensor); - tensor_data2 = tmp_tensor.data(); - } else { - tensor_data2 = const_cast(tensor2->data()); - } - const int64_t* rows_data2 = rows2->data(); - - for (int i = 0; i < tensor_numel; ++i) { - EXPECT_FLOAT_EQ(tensor_data2[i], 32.7); - } - EXPECT_EQ(rows_data2[0], 3); - EXPECT_EQ(rows_data2[1], 10); -} - void RunTestLodTensor(platform::Place place, int from_type = 0) { // serialize var to ByteBuffer framework::Variable var; @@ -194,6 +119,81 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) { for (int i = 0; i < tensor_numel; ++i) EXPECT_FLOAT_EQ(tensor_data2[i], 31.9); } +void RunSerdeTestSelectedRows(platform::Place place) { + platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance(); + auto& ctx = *pool.Get(place); + + // serialize var to ByteBuffer + framework::Variable var; + auto* slr = var.GetMutable(); + auto* tensor = slr->mutable_value(); + auto* rows = slr->mutable_rows(); + tensor->Resize(framework::make_ddim({2, 10})); + tensor->mutable_data(place); + int tensor_numel = 2 * 10; + math::set_constant(ctx, tensor, 32.7); + rows->push_back(3); + rows->push_back(10); + + ::grpc::ByteBuffer msg; + operators::detail::SerializeToByteBuffer("myvar", &var, ctx, &msg); + EXPECT_GT(msg.Length(), 0); + + // deserialize + std::vector<::grpc::Slice> slices; + (void)msg.Dump(&slices); + std::string tmp; + for (const auto& s : slices) { + tmp.append(reinterpret_cast(s.begin()), s.size()); + } + + sendrecv::VariableMessage varmsg; + EXPECT_TRUE(varmsg.ParseFromString(tmp)); + + EXPECT_EQ(varmsg.varname(), "myvar"); + EXPECT_EQ(varmsg.type(), 1); + + const float* tensor_data = + reinterpret_cast(varmsg.serialized().data()); + const int64_t* rows_data = + reinterpret_cast(varmsg.rows().data()); + for (int i = 0; i < tensor_numel; ++i) { + EXPECT_FLOAT_EQ(tensor_data[i], 32.7); + } + EXPECT_EQ(rows_data[0], 3); + EXPECT_EQ(rows_data[1], 10); + // deserialize zero-copy + // framework::Variable var2; + // operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2); + framework::Scope scope; + scope.Var("myvar"); + operators::detail::TensorResponse resp(&scope); + EXPECT_EQ(resp.Parse(msg, ctx), 0); + + framework::Variable* var2 = resp.GetVar(); + + auto* slr2 = var2->GetMutable(); + auto* tensor2 = slr2->mutable_value(); + auto* rows2 = slr2->mutable_rows(); + float* tensor_data2 = nullptr; + framework::Tensor tmp_tensor; + + if (platform::is_gpu_place(ctx.GetPlace())) { + platform::CPUPlace cpu; + framework::TensorCopy(*tensor2, cpu, &tmp_tensor); + tensor_data2 = tmp_tensor.data(); + } else { + tensor_data2 = const_cast(tensor2->data()); + } + const int64_t* rows_data2 = rows2->data(); + + for (int i = 0; i < tensor_numel; ++i) { + EXPECT_FLOAT_EQ(tensor_data2[i], 32.7); + } + EXPECT_EQ(rows_data2[0], 3); + EXPECT_EQ(rows_data2[1], 10); +} + TEST(LodTensor, GPU) { platform::CUDAPlace place; RunTestLodTensor(place); From 1404402cbfe49ecb47fbe541018a8a0f0f38bbce Mon Sep 17 00:00:00 2001 From: gongweibao Date: Mon, 19 Mar 2018 12:47:49 +0000 Subject: [PATCH 3/5] clean up --- paddle/fluid/operators/detail/test_serde.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/paddle/fluid/operators/detail/test_serde.cc b/paddle/fluid/operators/detail/test_serde.cc index 6d3ec75fcb080..27666e10a6625 100644 --- a/paddle/fluid/operators/detail/test_serde.cc +++ b/paddle/fluid/operators/detail/test_serde.cc @@ -163,8 +163,6 @@ void RunSerdeTestSelectedRows(platform::Place place) { EXPECT_EQ(rows_data[0], 3); EXPECT_EQ(rows_data[1], 10); // deserialize zero-copy - // framework::Variable var2; - // operators::detail::DeserializeFromByteBuffer(msg, ctx, &var2); framework::Scope scope; scope.Var("myvar"); operators::detail::TensorResponse resp(&scope); From 0c8b9a723459d768cd9b8dd807224398fec2bc7d Mon Sep 17 00:00:00 2001 From: gongweibao Date: Tue, 20 Mar 2018 04:00:53 +0000 Subject: [PATCH 4/5] follow comments --- paddle/fluid/operators/detail/sendrecvop_utils.cc | 11 +++++++++++ paddle/fluid/operators/detail/sendrecvop_utils.h | 5 +++++ paddle/fluid/operators/detail/tensor_parser.cc | 7 ++++--- paddle/fluid/operators/detail/tensor_parser.h | 4 +--- paddle/fluid/operators/detail/test_serde.cc | 15 +++++++-------- 5 files changed, 28 insertions(+), 14 deletions(-) diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.cc b/paddle/fluid/operators/detail/sendrecvop_utils.cc index 39117eeeb611b..5d8ff9c6a87c8 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.cc +++ b/paddle/fluid/operators/detail/sendrecvop_utils.cc @@ -18,6 +18,7 @@ limitations under the License. */ #include "paddle/fluid/framework/data_type.h" #include "paddle/fluid/operators/detail/bytebuffer_stream.h" #include "paddle/fluid/operators/detail/proto_encoder_helper.h" +#include "paddle/fluid/operators/detail/tensor_parser.h" namespace paddle { namespace operators { @@ -295,6 +296,16 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, } } +void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, + const platform::DeviceContext& ctx, + framework::Scope* scope, + framework::Variable*& var) { + operators::detail::TensorResponse resp(scope); + PADDLE_ENFORCE(resp.Parse(msg, ctx) == 0, + "parse bytebuffer to tensor error!"); + var = resp.GetVar(); +} + } // namespace detail } // namespace operators } // namespace paddle diff --git a/paddle/fluid/operators/detail/sendrecvop_utils.h b/paddle/fluid/operators/detail/sendrecvop_utils.h index 4fa6aefd3e0b1..48a6dafd02cf5 100644 --- a/paddle/fluid/operators/detail/sendrecvop_utils.h +++ b/paddle/fluid/operators/detail/sendrecvop_utils.h @@ -52,6 +52,11 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, const platform::DeviceContext& ctx, framework::Variable* var); +void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg, + const platform::DeviceContext& ctx, + framework::Scope* scope, + framework::Variable*& var); + inline std::type_index ToTypeIndex(sendrecv::VariableMessage::Type type) { switch (type) { case sendrecv::VariableMessage::FP32: diff --git a/paddle/fluid/operators/detail/tensor_parser.cc b/paddle/fluid/operators/detail/tensor_parser.cc index ef1f0914cab8f..0e2a245851dd2 100644 --- a/paddle/fluid/operators/detail/tensor_parser.cc +++ b/paddle/fluid/operators/detail/tensor_parser.cc @@ -82,7 +82,8 @@ bool ReadRaw(::google::protobuf::io::CodedInputStream* input, return false; } // TODO(gongwb): don't copy if it's aligned? - memcpy(reinterpret_cast(p), data, size_to_write); + platform::CPUPlace cpu; + memory::Copy(cpu, reinterpret_cast(p), cpu, data, size_to_write); p += size_to_write; size -= size_to_write; @@ -154,7 +155,7 @@ bool TensorResponse::CopySelectRowsData( auto* slr = var->GetMutable(); int64_t* rows_data = slr->mutable_rows()->data(); - // copy rows CPU data, GPU data will be copied lazly + // copy rows CPU data, GPU data will be copied lazliy. platform::CPUPlace cpu; if (!ReadRaw(input, ctx, cpu, rows_data, length)) { return false; @@ -210,7 +211,7 @@ bool ParseLodData(::google::protobuf::io::CodedInputStream* input, return true; } -int TensorResponse::Parse(::grpc::ByteBuffer& byte_buffer, +int TensorResponse::Parse(const ::grpc::ByteBuffer& byte_buffer, const platform::DeviceContext& dev_ctx) { GrpcByteBufferSource source; source.Init(byte_buffer); diff --git a/paddle/fluid/operators/detail/tensor_parser.h b/paddle/fluid/operators/detail/tensor_parser.h index 3a0545ea67efb..f37aa104695b8 100644 --- a/paddle/fluid/operators/detail/tensor_parser.h +++ b/paddle/fluid/operators/detail/tensor_parser.h @@ -42,7 +42,7 @@ class TensorResponse { // 0:ok. // -1: unkown error. // other: number of error field. - int Parse(::grpc::ByteBuffer& byte_buffer, + int Parse(const ::grpc::ByteBuffer& byte_buffer, const platform::DeviceContext& dev_ctx); // should call parse first. @@ -61,8 +61,6 @@ class TensorResponse { framework::DDim& dims, int length); private: - private: - framework::Variable* var_; framework::Scope* scope_; // only Skeleton sendrecv::VariableMessage meta_; diff --git a/paddle/fluid/operators/detail/test_serde.cc b/paddle/fluid/operators/detail/test_serde.cc index 27666e10a6625..00e23ab0772ab 100644 --- a/paddle/fluid/operators/detail/test_serde.cc +++ b/paddle/fluid/operators/detail/test_serde.cc @@ -91,14 +91,15 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) { // deserialize zero-copy framework::Scope scope; scope.Var("myvar"); - operators::detail::TensorResponse resp(&scope); + framework::Variable* var2 = NULL; if (from_type == 0) { - EXPECT_EQ(resp.Parse(msg, ctx), 0); + operators::detail::DeserializeFromByteBuffer(msg, ctx, &scope, var2); } else { - EXPECT_EQ(resp.Parse(bytebuffer2, ctx), 0); + operators::detail::DeserializeFromByteBuffer(bytebuffer2, ctx, &scope, + var2); } - framework::Variable* var2 = resp.GetVar(); + // resp.GetVar(); auto tensor2 = var2->Get(); float* tensor_data2 = nullptr; @@ -165,10 +166,8 @@ void RunSerdeTestSelectedRows(platform::Place place) { // deserialize zero-copy framework::Scope scope; scope.Var("myvar"); - operators::detail::TensorResponse resp(&scope); - EXPECT_EQ(resp.Parse(msg, ctx), 0); - - framework::Variable* var2 = resp.GetVar(); + framework::Variable* var2 = NULL; + operators::detail::DeserializeFromByteBuffer(msg, ctx, &scope, var2); auto* slr2 = var2->GetMutable(); auto* tensor2 = slr2->mutable_value(); From be8cae4b36352a38962518e2c56a68eb5ba99e74 Mon Sep 17 00:00:00 2001 From: gongweibao Date: Tue, 20 Mar 2018 04:02:30 +0000 Subject: [PATCH 5/5] clean up --- paddle/fluid/operators/detail/test_serde.cc | 2 -- 1 file changed, 2 deletions(-) diff --git a/paddle/fluid/operators/detail/test_serde.cc b/paddle/fluid/operators/detail/test_serde.cc index 00e23ab0772ab..3e56cbb07f74d 100644 --- a/paddle/fluid/operators/detail/test_serde.cc +++ b/paddle/fluid/operators/detail/test_serde.cc @@ -99,8 +99,6 @@ void RunTestLodTensor(platform::Place place, int from_type = 0) { var2); } - // resp.GetVar(); - auto tensor2 = var2->Get(); float* tensor_data2 = nullptr; framework::Tensor tmp_tensor;