Merge pull request #25 from qingshui/paddlebox
add ssd cache mode
qingshui authored Jan 26, 2022
2 parents 4da95e5 + a43c1ab commit 6ba0254
Showing 15 changed files with 740 additions and 142 deletions.
338 changes: 274 additions & 64 deletions paddle/fluid/framework/data_feed.cc

Large diffs are not rendered by default.

62 changes: 45 additions & 17 deletions paddle/fluid/framework/data_feed.h
@@ -11,7 +11,6 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once
#if defined _WIN32 || defined __APPLE__
#else
@@ -229,6 +228,7 @@ class DataFeed {
}
virtual const paddle::platform::Place& GetPlace() const { return place_; }
virtual void SetSampleRate(float r) { sample_rate_ = r; }
+ virtual void SetLoadArchiveFile(bool archive) { is_archive_file_ = archive; }

protected:
// The following three functions are used to check if it is executed in this
@@ -289,6 +289,7 @@ class DataFeed {
// The input type of pipe reader, 0 for one sample, 1 for one batch
int input_type_;
float sample_rate_ = 1.0f;
+ bool is_archive_file_ = false;
};

// PrivateQueueDataFeed is the base virtual class for other DataFeeds.
@@ -838,6 +839,11 @@ struct SlotRecordObject {
slot_uint64_feasigns_.clear(shrink);
slot_float_feasigns_.clear(shrink);
}
+ void debug(void) {
+   VLOG(0) << "ins:" << ins_id_
+           << ", uint64:" << slot_uint64_feasigns_.slot_values.size()
+           << ", float:" << slot_float_feasigns_.slot_values.size();
+ }
};
using SlotRecord = SlotRecordObject*;

@@ -1049,6 +1055,11 @@ inline SlotObjPool& SlotRecordPool() {
static SlotObjPool pool;
return pool;
}
+ // down disk pool
+ inline SlotObjPool& SlotRecordDownPool() {
+   static SlotObjPool pool;
+   return pool;
+ }
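
For context: SlotRecordPool() above and the new SlotRecordDownPool() are Meyers singletons — each function-local static pool is created on first call and then shared process-wide, and the two accessors give two independent pools. A minimal sketch of the pattern (SimpleObjPool, Rec, and the accessor names below are illustrative stand-ins, not the real SlotObjPool API):

#include <mutex>
#include <vector>

// Illustrative stand-in for SlotObjPool: a mutex-guarded free list.
template <typename T>
class SimpleObjPool {
 public:
  T* Get() {
    std::lock_guard<std::mutex> lk(mu_);
    if (free_.empty()) return new T();
    T* p = free_.back();
    free_.pop_back();
    return p;
  }
  void Put(T* p) {
    std::lock_guard<std::mutex> lk(mu_);
    free_.push_back(p);
  }

 private:
  std::mutex mu_;
  std::vector<T*> free_;
};

struct Rec {};  // stand-in for SlotRecordObject

// Like SlotRecordPool(): the main in-memory pool.
inline SimpleObjPool<Rec>& RecPool() {
  static SimpleObjPool<Rec> pool;  // constructed once, on first call
  return pool;
}
// Like SlotRecordDownPool(): a second, independent instance, presumably so
// records staged to or from the SSD cache do not contend with the main pool.
inline SimpleObjPool<Rec>& RecDownPool() {
  static SimpleObjPool<Rec> pool;
  return pool;
}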

using FeasignValues = SlotValues<uint64_t>;

@@ -1437,13 +1448,13 @@ class MiniBatchGpuPack {
// tensor gpu memory reused
void resize_tensor(void) {
if (used_float_num_ > 0) {
-   int float_total_len = buf_.h_float_lens.back();
+   int float_total_len = buf_.h_float_lens.back() + used_float_num_;
if (float_total_len > 0) {
float_tensor_.mutable_data<float>({float_total_len, 1}, this->place_);
}
}
if (used_uint64_num_ > 0) {
-   int uint64_total_len = buf_.h_uint64_lens.back();
+   int uint64_total_len = buf_.h_uint64_lens.back() + used_uint64_num_;
if (uint64_total_len > 0) {
uint64_tensor_.mutable_data<int64_t>({uint64_total_len, 1},
this->place_);
@@ -1458,18 +1469,14 @@
HostBuffer<size_t>& offsets(void) { return offsets_; }
HostBuffer<void*>& h_tensor_ptrs(void) { return h_tensor_ptrs_; }

- void* gpu_slot_offsets(void) { return gpu_slot_offsets_->ptr(); }
+ size_t* gpu_slot_offsets(void) {
+   return reinterpret_cast<size_t*>(gpu_slot_offsets_.data<int64_t>());
+ }

void* slot_buf_ptr(void) { return slot_buf_ptr_->ptr(); }

- void resize_gpu_slot_offsets(const size_t slot_total_bytes) {
-   if (gpu_slot_offsets_ == nullptr) {
-     gpu_slot_offsets_ = memory::AllocShared(place_, slot_total_bytes);
-   } else if (gpu_slot_offsets_->size() < slot_total_bytes) {
-     auto buf = memory::AllocShared(place_, slot_total_bytes);
-     gpu_slot_offsets_.swap(buf);
-     buf = nullptr;
-   }
+ void resize_gpu_slot_offsets(const int64_t slot_total_num) {
+   gpu_slot_offsets_.mutable_data<int64_t>({slot_total_num, 1}, this->place_);
}
const std::string& get_lineid(int idx) {
if (enable_pv_) {
@@ -1531,9 +1538,8 @@
// batch
HostBuffer<size_t> offsets_;
HostBuffer<void*> h_tensor_ptrs_;
-
- std::shared_ptr<paddle::memory::allocation::Allocation> gpu_slot_offsets_ =
-     nullptr;
+ // slot offset
+ LoDTensor gpu_slot_offsets_;
std::shared_ptr<paddle::memory::allocation::Allocation> slot_buf_ptr_ =
nullptr;
// pcoc
@@ -1584,7 +1590,25 @@ inline MiniBatchGpuPackMgr& BatchGpuPackMgr() {
return mgr;
}
#endif
+ /**
+  * @brief binary archive file
+  */
+ class BinaryArchiveWriter {
+  public:
+   BinaryArchiveWriter();
+   ~BinaryArchiveWriter();
+   bool open(const std::string& path);
+   bool write(const SlotRecord& rec);
+   void close(void);
+
+  private:
+   std::mutex mutex_;
+   int fd_;
+   char* buff_ = nullptr;
+   int woffset_ = 0;
+   int capacity_ = 0;
+   char* head_ = nullptr;
+ };
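
BinaryArchiveWriter is only declared in this header; its implementation and call sites live in data_feed.cc, whose diff is not rendered above. The std::mutex member suggests write() is meant to be callable from several preprocessing threads, and the fd_/buff_/woffset_/capacity_ members point to buffered writes into a single file descriptor. A hedged usage sketch — the function, path, and buffering comments below are assumptions, not code from this commit:

// Hypothetical call site: dump a batch of parsed records into one
// archive file on the local SSD cache path.
bool DumpToArchive(const std::vector<SlotRecord>& records,
                   const std::string& path) {  // e.g. a local SSD path
  BinaryArchiveWriter writer;
  if (!writer.open(path)) {
    return false;
  }
  for (const SlotRecord& rec : records) {
    writer.write(rec);  // mutex_-guarded, so multiple parser threads
                        // could in principle share one writer
  }
  writer.close();  // presumably flushes the tail of buff_ to fd_
  return true;
}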
class SlotPaddleBoxDataFeed : public DataFeed {
public:
SlotPaddleBoxDataFeed() { finish_start_ = false; }
@@ -1648,6 +1672,7 @@ class SlotPaddleBoxDataFeed : public DataFeed {
bool EnablePvMerge(void);
int GetPackInstance(SlotRecord** ins);
int GetPackPvInstance(SlotPvInstance** pv_ins);
+ void SetSlotRecordPool(SlotObjPool* pool) { slot_pool_ = pool; }

public:
virtual void Init(const DataFeedDesc& data_feed_desc);
@@ -1673,6 +1698,8 @@ class SlotPaddleBoxDataFeed : public DataFeed {
virtual void LoadIntoMemoryByLine(void);
// split all file
virtual void LoadIntoMemoryByFile(void);
+ // load local archive file
+ virtual void LoadIntoMemoryByArchive(void);

private:
#if defined(PADDLE_WITH_CUDA) && defined(_LINUX)
@@ -1734,6 +1761,7 @@ class SlotPaddleBoxDataFeed : public DataFeed {
platform::Timer data_timer_;
platform::Timer trans_timer_;
platform::Timer copy_timer_;
+ SlotObjPool* slot_pool_ = nullptr;
};

class SlotPaddleBoxDataFeedWithGpuReplicaCache : public SlotPaddleBoxDataFeed {
@@ -1783,7 +1811,7 @@ class InputIndexDataFeed : public DataFeed {
template <class AR, class T>
paddle::framework::Archive<AR>& operator<<(paddle::framework::Archive<AR>& ar,
const SlotValues<T>& r) {
- uint16_t value_len = static_cast<uint16_t>(r.slot_values.size());
+ uint32_t value_len = static_cast<uint32_t>(r.slot_values.size());
ar << value_len;
if (value_len > 0) {
ar.Write(&r.slot_values[0], value_len * sizeof(T));
@@ -1800,7 +1828,7 @@ paddle::framework::Archive<AR>& operator<<(paddle::framework::Archive<AR>& ar,
template <class AR, class T>
paddle::framework::Archive<AR>& operator>>(paddle::framework::Archive<AR>& ar,
SlotValues<T>& r) {
- uint16_t value_len = 0;
+ uint32_t value_len = 0;
ar >> value_len;
if (value_len > 0) {
r.slot_values.resize(value_len);
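
Both archive operators use a length-prefixed layout — a value count followed by the raw bytes — so widening the count from uint16_t to uint32_t matters: a uint16_t prefix silently wraps once a slot carries 65,536 or more values, and the reader would then reconstruct a truncated slot. A standalone sketch of the same layout, with plain memcpy standing in for framework::Archive (all names here are illustrative):

#include <cstdint>
#include <cstring>
#include <vector>

// Append one length-prefixed value block to `out`.
template <typename T>
void WriteValues(const std::vector<T>& vals, std::vector<char>* out) {
  uint32_t len = static_cast<uint32_t>(vals.size());  // uint16_t would wrap at 65536
  size_t pos = out->size();
  out->resize(pos + sizeof(len) + len * sizeof(T));
  std::memcpy(out->data() + pos, &len, sizeof(len));
  if (len > 0) {
    std::memcpy(out->data() + pos + sizeof(len), vals.data(), len * sizeof(T));
  }
}

// Read one block back; returns the number of bytes consumed.
template <typename T>
size_t ReadValues(const char* in, std::vector<T>* vals) {
  uint32_t len = 0;
  std::memcpy(&len, in, sizeof(len));
  vals->resize(len);
  if (len > 0) {
    std::memcpy(vals->data(), in + sizeof(len), len * sizeof(T));
  }
  return sizeof(len) + len * sizeof(T);
}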