Merge pull request #9743 from JiayiFeng/modify_readers_to_fit_parallel_executor

Modify readers to fit the parallel executor
JiayiFeng authored Apr 11, 2018
2 parents 718e180 + 8c1eb86 commit 90084a2
Showing 22 changed files with 390 additions and 197 deletions.
32 changes: 17 additions & 15 deletions paddle/fluid/framework/lod_tensor.cc
@@ -12,21 +12,21 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include "paddle/fluid/framework/lod_tensor.h"
#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <iterator>

#include "paddle/fluid/framework/data_type.h"
#include "paddle/fluid/framework/framework.pb.h"
#include "paddle/fluid/framework/lod_tensor.h"

#include "paddle/fluid/memory/memcpy.h"
#include "paddle/fluid/memory/memory.h"

#include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h"

#include <stdint.h>
#include <string.h>
#include <algorithm>
#include <iterator>

namespace paddle {
namespace framework {

@@ -294,7 +294,7 @@ void DeserializeFromStream(std::istream &is, LoDTensor *tensor,
TensorFromStream(is, static_cast<Tensor *>(tensor), dev_ctx);
}

void WriteToRecordIO(recordio::Writer &writer,
void WriteToRecordIO(recordio::Writer *writer,
const std::vector<LoDTensor> &tensor,
const platform::DeviceContext &dev_ctx) {
std::stringstream buffer;
@@ -303,18 +303,20 @@ void WriteToRecordIO(recordio::Writer &writer,
for (auto &each : tensor) {
SerializeToStream(buffer, each, dev_ctx);
}
writer.Write(buffer.str());
writer->Write(buffer.str());
}

std::vector<LoDTensor> ReadFromRecordIO(
recordio::Scanner &scanner, const platform::DeviceContext &dev_ctx) {
std::istringstream sin(scanner.Next());
uint32_t sz;
sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
recordio::Scanner *scanner, const platform::DeviceContext &dev_ctx) {
std::vector<LoDTensor> result;
result.resize(sz);
for (uint32_t i = 0; i < sz; ++i) {
DeserializeFromStream(sin, &result[i], dev_ctx);
if (scanner->HasNext()) {
std::istringstream sin(scanner->Next());
uint32_t sz;
sin.read(reinterpret_cast<char *>(&sz), sizeof(uint32_t));
result.resize(sz);
for (uint32_t i = 0; i < sz; ++i) {
DeserializeFromStream(sin, &result[i], dev_ctx);
}
}
return result;
}
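For orientation, a minimal sketch of how callers use the reworked helpers after this change: the writer and scanner are now passed by pointer, and `ReadFromRecordIO` returns an empty vector once the scanner is exhausted. The constructor calls mirror the updated unit test in `lod_tensor_test.cc` further down; `tensor` is assumed to be an already-initialized `LoDTensor`, and the in-memory stream is only for illustration.

```cpp
// Sketch only: assumes the fluid headers used in this file and a prepared
// LoDTensor `tensor`; stream handling mirrors lod_tensor_test.cc below.
#include <memory>
#include <sstream>
#include <vector>

#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/place.h"
#include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h"

void RoundTripThroughRecordIO(const paddle::framework::LoDTensor& tensor) {
  namespace fw = paddle::framework;
  auto& ctx = *paddle::platform::DeviceContextPool::Instance().Get(
      paddle::platform::CPUPlace());

  // Writer and scanner share one in-memory stream, as in the test.
  std::stringstream* stream = new std::stringstream();
  {
    paddle::recordio::Writer writer(stream, paddle::recordio::Compressor::kSnappy);
    fw::WriteToRecordIO(&writer, {tensor, tensor}, ctx);  // pointer, not reference
    writer.Flush();
  }

  paddle::recordio::Scanner scanner(std::unique_ptr<std::istream>(stream));
  for (;;) {
    auto tensors = fw::ReadFromRecordIO(&scanner, ctx);
    if (tensors.empty()) break;  // empty result now signals end-of-data
    // ... consume `tensors` ...
  }
}
```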
7 changes: 5 additions & 2 deletions paddle/fluid/framework/lod_tensor.h
@@ -15,6 +15,9 @@ limitations under the License. */
#pragma once

#include <memory>
#include <string>
#include <utility>
#include <vector>
#ifdef PADDLE_WITH_CUDA
#include <thrust/device_vector.h>
#include <thrust/host_vector.h>
@@ -216,12 +219,12 @@ void SerializeToStream(std::ostream& os, const LoDTensor& tensor,
void DeserializeFromStream(std::istream& is, LoDTensor* tensor,
const platform::DeviceContext& dev_ctx);

extern void WriteToRecordIO(recordio::Writer& writer,
extern void WriteToRecordIO(recordio::Writer* writer,
const std::vector<LoDTensor>& tensor,
const platform::DeviceContext& dev_ctx);

extern std::vector<LoDTensor> ReadFromRecordIO(
recordio::Scanner& scanner, const platform::DeviceContext& dev_ctx);
recordio::Scanner* scanner, const platform::DeviceContext& dev_ctx);

} // namespace framework
} // namespace paddle
18 changes: 9 additions & 9 deletions paddle/fluid/framework/lod_tensor_test.cc
@@ -12,17 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "paddle/fluid/framework/lod_tensor.h"

#include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h"

#include <glog/logging.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <memory>
#include <vector>

#include "paddle/fluid/framework/lod_tensor.h"

#include "paddle/fluid/recordio/scanner.h"
#include "paddle/fluid/recordio/writer.h"

namespace paddle {
namespace framework {

@@ -240,8 +240,8 @@ TEST(LoDTensor, RecordIO) {
*platform::DeviceContextPool::Instance().Get(platform::CPUPlace());
{
recordio::Writer writer(stream, recordio::Compressor::kSnappy);
WriteToRecordIO(writer, {tensor, tensor}, ctx);
WriteToRecordIO(writer, {tensor, tensor}, ctx);
WriteToRecordIO(&writer, {tensor, tensor}, ctx);
WriteToRecordIO(&writer, {tensor, tensor}, ctx);
writer.Flush();
}

@@ -254,11 +254,11 @@
{
std::unique_ptr<std::istream> stream_ptr(stream);
recordio::Scanner scanner(std::move(stream_ptr));
auto tensors = ReadFromRecordIO(scanner, ctx);
auto tensors = ReadFromRecordIO(&scanner, ctx);
ASSERT_EQ(tensors.size(), 2);
assert_tensor_ok(tensors[0]);
assert_tensor_ok(tensors[1]);
tensors = ReadFromRecordIO(scanner, ctx);
tensors = ReadFromRecordIO(&scanner, ctx);
ASSERT_EQ(tensors.size(), 2);
assert_tensor_ok(tensors[0]);
assert_tensor_ok(tensors[1]);
4 changes: 1 addition & 3 deletions paddle/fluid/framework/parallel_executor.cc
@@ -115,14 +115,12 @@ void ParallelExecutor::BCastParamsToGPUs(

for (auto &var : vars) {
auto *main_var = main_scope->FindVar(var);
if (!main_var->IsType<LoDTensor>()) {
if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
continue;
}

auto &main_tensor = main_var->Get<LoDTensor>();

auto &dims = main_tensor.dims();

if (paddle::platform::is_gpu_place(main_tensor.place())) {
size_t numel = main_tensor.numel();
ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
4 changes: 3 additions & 1 deletion paddle/fluid/framework/reader.cc
@@ -22,7 +22,9 @@ FileReader::FileReader(const std::vector<DDim> &dims) : dims_(dims) {}

void FileReader::ReadNext(std::vector<LoDTensor> *out) {
ReadNextImpl(out);
PADDLE_ENFORCE_EQ(out->size(), dims_.size());
if (out->empty()) {
return;
}
for (size_t i = 0; i < dims_.size(); ++i) {
auto &actual = out->at(i).dims();
auto &expect = dims_[i];
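With this change an exhausted `FileReader` (and, by the same convention, any other reader) reports end-of-data by leaving `*out` empty instead of exposing `HasNext()`. A minimal consumer under the new contract might look like the following sketch; `DrainOnePass` is an illustrative helper, not part of this PR.

```cpp
#include <vector>
#include "paddle/fluid/framework/reader.h"

// Sketch: drain any ReaderBase until it returns an empty batch, then rewind
// it with ReInit() so it can be consumed again.
void DrainOnePass(paddle::framework::ReaderBase* reader) {
  std::vector<paddle::framework::LoDTensor> batch;
  for (;;) {
    reader->ReadNext(&batch);
    if (batch.empty()) break;  // end-of-data for the current pass
    // ... feed `batch` to downstream ops ...
  }
  reader->ReInit();  // rewind before the next pass
}
```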
13 changes: 3 additions & 10 deletions paddle/fluid/framework/reader.h
@@ -14,14 +14,13 @@

#pragma once

#include <memory>
#include <vector>

#include "paddle/fluid/framework/ddim.h"
#include "paddle/fluid/framework/lod_tensor_array.h"
#include "paddle/fluid/platform/place.h"

#include <memory>
#include <thread>
#include <vector>

namespace paddle {
namespace framework {

@@ -31,8 +30,6 @@ class ReaderBase {

virtual void ReInit() = 0;

virtual bool HasNext() const = 0;

virtual ~ReaderBase();
};

@@ -44,8 +41,6 @@ class DecoratedReader : public ReaderBase {

void ReInit() override { reader_->ReInit(); }

bool HasNext() const override { return reader_->HasNext(); }

protected:
ReaderBase* reader_;
};
@@ -80,8 +75,6 @@ class ReaderHolder {
reader_->ReInit();
}

bool HasNext() const { return reader_->HasNext(); }

private:
std::unique_ptr<ReaderBase> reader_;
};
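Since `HasNext()` is no longer part of `ReaderBase`, a decorated reader only has to forward `ReadNext()` and let an empty output vector propagate as the end-of-data signal. A hypothetical pass-through decorator under the new interface (not part of this PR) reduces to:

```cpp
#include <vector>
#include "paddle/fluid/framework/reader.h"

// Hypothetical decorator: nothing to do beyond forwarding ReadNext();
// an empty *out from the underlying reader already means "exhausted".
class PassThroughReader : public paddle::framework::DecoratedReader {
 public:
  explicit PassThroughReader(paddle::framework::ReaderBase* reader)
      : DecoratedReader(reader) {}

  void ReadNext(std::vector<paddle::framework::LoDTensor>* out) override {
    reader_->ReadNext(out);
  }
};
```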
8 changes: 1 addition & 7 deletions paddle/fluid/operators/read_op.cc
@@ -66,13 +66,7 @@ class ReadOp : public framework::OperatorBase {
std::vector<std::string> out_arg_names = Outputs("Out");
std::vector<framework::LoDTensor> ins;
reader->ReadNext(&ins);
if (ins.empty()) {
reader->ReInit();
reader->ReadNext(&ins);
PADDLE_ENFORCE(
!ins.empty(),
"Reader can not read the next data even it has been re-initialized.");
}
PADDLE_ENFORCE(!ins.empty(), "There is no next data.");
PADDLE_ENFORCE_EQ(ins.size(), out_arg_names.size());
for (size_t i = 0; i < ins.size(); ++i) {
auto* out =
1 change: 1 addition & 0 deletions paddle/fluid/operators/reader/CMakeLists.txt
@@ -22,5 +22,6 @@ reader_library(create_batch_reader_op SRCS create_batch_reader_op.cc)
reader_library(create_recordio_file_reader_op SRCS create_recordio_file_reader_op.cc)
reader_library(create_double_buffer_reader_op SRCS create_double_buffer_reader_op.cc)
reader_library(create_multi_pass_reader_op SRCS create_multi_pass_reader_op.cc)
reader_library(create_threaded_reader_op SRCS create_threaded_reader_op.cc)
# Export local libraries to parent
set(READER_LIBRARY ${LOCAL_READER_LIBS} PARENT_SCOPE)
47 changes: 27 additions & 20 deletions paddle/fluid/operators/reader/create_double_buffer_reader_op.cc
@@ -63,13 +63,14 @@ class DoubleBufferReader : public framework::DecoratedReader {
StartPrefetcher();
}

bool HasNext() const override;
void ReadNext(std::vector<framework::LoDTensor>* out) override;
void ReInit() override;

~DoubleBufferReader() { EndPrefetcher(); }

private:
bool HasNext() const;

void StartPrefetcher() {
channel_ = framework::MakeChannel<Item>(kChannelSize);
prefetcher_ = std::thread([this] { PrefetchThreadFunc(); });
@@ -109,7 +110,9 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {

auto place_str = Attr<std::string>("place");
platform::Place place;
if (place_str == "CPU") {
if (place_str == "AUTO") {
place = dev_place;
} else if (place_str == "CPU") {
place = platform::CPUPlace();
} else {
std::istringstream sin(place_str);
@@ -140,28 +143,22 @@ class CreateDoubleBufferReaderOpMaker : public DecoratedReaderMakerBase {
enum_range.insert(string::Sprintf("CUDA:%d", i));
}
enum_range.insert("CPU");
AddAttr<std::string>("place", "The double buffer place, default is CPU")
.SetDefault("CPU")
enum_range.insert("AUTO");
AddAttr<std::string>("place", "The double buffer place")
.SetDefault("AUTO")
.InEnum({enum_range});
}
};

bool DoubleBufferReader::HasNext() const {
while (!channel_->IsClosed() && !channel_->CanReceive()) {
}
return channel_->CanReceive();
}

void DoubleBufferReader::ReadNext(std::vector<framework::LoDTensor>* out) {
if (!HasNext()) {
PADDLE_THROW("There is no next data!");
}

Item batch;
channel_->Receive(&batch);
*out = batch.payloads_;
if (batch.ctx_) {
batch.ctx_->Wait();
out->clear();
if (HasNext()) {
Item batch;
channel_->Receive(&batch);
*out = batch.payloads_;
if (batch.ctx_) {
batch.ctx_->Wait();
}
}
}

@@ -171,16 +168,26 @@ void DoubleBufferReader::ReInit() {
StartPrefetcher();
}

bool DoubleBufferReader::HasNext() const {
while (!channel_->IsClosed() && !channel_->CanReceive()) {
}
return channel_->CanReceive();
}

void DoubleBufferReader::PrefetchThreadFunc() {
VLOG(5) << "A new prefetch thread starts.";
std::vector<std::vector<framework::LoDTensor>> cpu_tensor_cache(kCacheSize);
std::vector<std::vector<framework::LoDTensor>> gpu_tensor_cache(kCacheSize);
size_t cached_tensor_id = 0;

while (reader_->HasNext()) {
while (true) {
Item batch;
auto& cpu_batch = cpu_tensor_cache[cached_tensor_id];
reader_->ReadNext(&cpu_batch);
if (cpu_batch.empty()) {
// The underlying reader has no next data.
break;
}
if (platform::is_gpu_place(place_)) {
auto& gpu_batch = gpu_tensor_cache[cached_tensor_id];
auto* gpu_ctx = ctxs_[cached_tensor_id].get();
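The prefetching thread above now runs until the underlying reader hands back an empty batch, rather than polling `HasNext()`, and closes the channel when it is done. The same producer/consumer shape, reduced to standard C++ as a simplified sketch: a plain mutex-guarded queue stands in for `framework::Channel`, and no bounded capacity or device transfer is modeled.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Simplified stand-in for a prefetched batch of tensors.
struct Batch {
  std::vector<float> payload;
};

class Prefetcher {
 public:
  // `read_next` plays the role of reader_->ReadNext(): it returns an empty
  // payload once the underlying source is exhausted.
  explicit Prefetcher(std::function<Batch()> read_next) {
    worker_ = std::thread([this, read_next] {
      for (;;) {
        Batch b = read_next();
        if (b.payload.empty()) break;  // underlying source exhausted
        {
          std::lock_guard<std::mutex> lock(mu_);
          queue_.push(std::move(b));
        }
        cv_.notify_one();
      }
      {
        std::lock_guard<std::mutex> lock(mu_);
        closed_ = true;  // mirrors channel_->Close()
      }
      cv_.notify_all();
    });
  }

  // Returns false once the queue is drained and the producer has finished,
  // which is what lets the consumer's ReadNext() produce an empty batch.
  bool Receive(Batch* out) {
    std::unique_lock<std::mutex> lock(mu_);
    cv_.wait(lock, [this] { return !queue_.empty() || closed_; });
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop();
    return true;
  }

  ~Prefetcher() {
    if (worker_.joinable()) worker_.join();
  }

 private:
  std::thread worker_;
  std::mutex mu_;
  std::condition_variable cv_;
  std::queue<Batch> queue_;
  bool closed_ = false;
};
```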
16 changes: 3 additions & 13 deletions paddle/fluid/operators/reader/create_multi_pass_reader_op.cc
@@ -25,22 +25,12 @@ class MultiPassReader : public framework::DecoratedReader {
: DecoratedReader(reader), pass_num_(pass_num), pass_count_(0) {}

void ReadNext(std::vector<framework::LoDTensor>* out) override {
if (!HasNext()) {
PADDLE_THROW("There is no next data!");
}
reader_->ReadNext(out);
}

bool HasNext() const override {
if (reader_->HasNext()) {
return true;
} else {
if (out->empty()) {
++pass_count_;
if (pass_count_ >= pass_num_) {
return false;
} else {
if (pass_count_ < pass_num_) {
reader_->ReInit();
return true;
reader_->ReadNext(out);
}
}
}
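With this rewrite, `MultiPassReader` detects the end of a pass from the empty batch itself: as long as `pass_count_` is below `pass_num_`, it re-initializes the underlying reader and retries the read, so the caller only sees an empty result after the final pass. A sketch of the expected behavior, assuming a 3-batch underlying reader wrapped with `pass_num = 2` (`multi_pass_reader` is an illustrative variable name):

```cpp
// Expected sequence of results from consecutive ReadNext() calls:
//   batch 0, batch 1, batch 2   <- pass 1
//   batch 0, batch 1, batch 2   <- pass 2 (ReInit() ran transparently)
//   empty                       <- pass_count_ reached pass_num_
std::vector<paddle::framework::LoDTensor> out;
for (;;) {
  multi_pass_reader.ReadNext(&out);
  if (out.empty()) break;  // all passes have been consumed
  // ... run one training step on `out` ...
}
```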
paddle/fluid/operators/reader/create_random_data_generator_op.cc
@@ -52,8 +52,6 @@ class RandomDataGenerator : public framework::ReaderBase {

void ReInit() override { return; }

bool HasNext() const override { return true; }

private:
float min_;
float max_;
@@ -74,7 +72,7 @@ class CreateRandomDataGeneratorOp : public framework::OperatorBase {
const auto& ranks = Attr<std::vector<int>>("ranks");
PADDLE_ENFORCE(!shape_concat.empty() && !ranks.empty());
PADDLE_ENFORCE_EQ(std::accumulate(ranks.begin(), ranks.end(), 0),
int(shape_concat.size()),
static_cast<int>(shape_concat.size()),
"The accumulate of all ranks should be equal to the "
"shape concat's length.");
std::vector<framework::DDim> shapes = RestoreShapes(shape_concat, ranks);
(The remaining changed files in this commit are not shown.)
