Commit

Merge pull request #52 from reyoung/custom_stack_trace_refines

Custom stack trace refines

emailweixu authored Sep 12, 2016
2 parents dcd87fd + 50c3dbf commit 3fc99a2
Showing 8 changed files with 314 additions and 43 deletions.
1 change: 1 addition & 0 deletions paddle/gserver/gradientmachines/NeuralNetwork.cpp
@@ -277,6 +277,7 @@ void NeuralNetwork::getState(MachineState& machineState) {
}

void NeuralNetwork::backward(const UpdateCallback& callback) {
  gLayerStackTrace.pop("");  // mark that the layer trace has entered the backward pass.
  FOR_EACH_R(layer, layers_) {
    REGISTER_TIMER_INFO("BackwardTimer", (*layer)->getName().c_str());
    if ((*layer)->needGradient()) {
35 changes: 35 additions & 0 deletions paddle/utils/CustomStackTrace.cpp
@@ -14,9 +14,44 @@ limitations under the License. */


#include "CustomStackTrace.h"
#include "CommandLineParser.h"
#include <iostream>

P_DEFINE_bool(layer_stack_error_only_current_thread,
              true,
              "Dump only the current thread's layer stack when a fatal error "
              "occurs; if false, dump the layer stacks of the whole process.");

namespace paddle {

CustomStackTrace<std::string> gLayerStackTrace;

static std::mutex gLayerStackTraceMtx;
void installLayerStackTracer() {
  logging::installFailureWriter([](const char* data, int sz) {
    std::lock_guard<std::mutex> guard(gLayerStackTraceMtx);
    if (!gLayerStackTrace.empty()) {
      size_t curTid = -1UL;
      std::hash<std::thread::id> hasher;
      gLayerStackTrace.dump([&curTid, &hasher](std::thread::id tid,
                                               bool* isForwarding,
                                               const std::string& layerName) {
        if (curTid != hasher(tid)) {
          if (curTid != -1UL) {
            std::cerr << std::endl;
          }
          curTid = hasher(tid);
          std::cerr << "Thread [" << tid << "] ";
          if (isForwarding) {
            std::cerr << (*isForwarding ? "Forwarding " : "Backwarding ");
          }
        }
        std::cerr << layerName << ", ";
      }, FLAGS_layer_stack_error_only_current_thread);
      std::cerr << std::endl;
    }
    std::cerr.write(data, sz);
  });
}

} // namespace paddle
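
For orientation (this illustration is not part of the commit; the thread id and layer names are invented): with the default flag value, a fatal error raised while a layer's backward pass is running would make the failure writer above print the current thread's remaining layer stack as a single line, e.g.

    Thread [139898241455872] Backwarding cost_layer, fc_layer_1, data_layer,

and then write out the original failure message it was given (the data/sz buffer). With layer_stack_error_only_current_thread=false, one such line is printed for every thread that still has layers on its stack.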
164 changes: 128 additions & 36 deletions paddle/utils/CustomStackTrace.h
@@ -15,6 +15,9 @@ limitations under the License. */
#pragma once

#include <stack>
#include <thread>
#include <unordered_map>
#include <functional>

#include "ThreadLocal.h"

@@ -29,71 +32,160 @@ namespace paddle {
 * @code{.cpp}
 *
 * paddle::CustomStackTrace<std::string> stack;
 * for (auto& layer : layers){
 *   stack.push(layer->getName());
 *   layer->forward();
 * }
 *
 * stack.pop("");  // mark under pop stage.
 *
 * for (auto it = layers.rbegin(); it != layers.rend(); ++it){
 *   auto& layer = *it;
 *   layer->backward(passType);
 *   stack.pop(layer->getName());
 * }
 *
 * @endcode
 */
template <typename T>
class CustomStackTrace{
public:
  /**
   * @brief Pop an item from the top of the current thread's stack if it equals
   *        item; otherwise just record that the stack has left the pushing stage.
   */
  void pop(const T& item) {
    pushing() = false;
    auto& s = this->stack();
    if (item == s.top()) {
      s.pop();
    }
  }

  /**
   * @brief Clear the current thread's stack.
   */
  void clear() {
    auto& s = stack();
    while (!s.empty()) {
      s.pop();
    }
  }

  /**
   * @brief Check whether every thread's stack is empty.
   * @return true if all stacks are empty.
   */
  bool empty() const {
    std::lock_guard<std::mutex> g(this->mtx_);
    for (auto p : this->stackBuffers_) {
      std::stack<T>& s = *p.second;
      if (!s.empty()) {
        return false;
      }
    }
    return true;
  }


  /**
   * @brief Type of the callback invoked by the dump method, once per stack item.
   *
   * The first parameter is the id of the thread that owns the stack.
   * The second parameter points to a flag telling whether that stack's last
   * action was a push (it may be nullptr if unknown).
   * The third parameter is the item on the stack.
   */
  typedef std::function<void(const std::thread::id& /*threadId*/,
                             bool* /*isPushing*/,
                             const T& /*item*/)> DumpCallback;

  /**
   * @brief Dump every thread's stack (or only the current thread's if
   *        onlyCurrentThread is true). Each dumped stack is cleared.
   */
  void dump(const DumpCallback& callback, bool onlyCurrentThread = false) {
    std::lock_guard<std::mutex> g(this->mtx_);
    for (auto p : this->stackBuffers_) {
      std::thread::id tid = p.first;
      if (onlyCurrentThread && tid != std::this_thread::get_id()) {
        continue;
      }
      std::stack<T>& s = *p.second;
      bool* isPush = nullptr;
      auto it = this->pushingBuffers_.find(tid);
      if (it != this->pushingBuffers_.end()) {
        isPush = it->second;
      }

      while (!s.empty()) {
        callback(tid, isPush, s.top());
        s.pop();
      }
    }
  }

  /**
   * @brief Push an item onto the current thread's stack.
   */
  void push(const T& item) {
    pushing() = true;
    auto& p = this->stack();
    p.push(item);
  }

private:
  /**
   * @brief Get a thread-local attribute and register it in a map
   *        (threadId => TYPE*) so that other threads can find it.
   *
   * @tparam TYPE thread-local attribute type.
   * @param threadLocal the ThreadLocal object.
   * @param buffers a map from threadId to TYPE*.
   */
  template <typename TYPE>
  inline TYPE& getThreadLocal(
      ThreadLocal<TYPE>& threadLocal,
      std::unordered_map<std::thread::id, TYPE*>& buffers) {
    TYPE* retv = threadLocal.get(false);
    if (retv) {
      return *retv;
    } else {
      std::lock_guard<std::mutex> guard(this->mtx_);
      retv = threadLocal.get();
      auto id = std::this_thread::get_id();
      buffers.insert({id, retv});
      return *retv;
    }
  }

  /**
   * @brief Get thread local stack reference.
   */
  std::stack<T>& stack() {
    return this->getThreadLocal(this->logStack_,
                                this->stackBuffers_);
  }

  /**
   * @brief Get thread local pushing flag.
   */
  bool& pushing() {
    return this->getThreadLocal(this->isPushing_,
                                this->pushingBuffers_);
  }

private:
  mutable std::mutex mtx_;

  std::unordered_map<std::thread::id, std::stack<T>* > stackBuffers_;
  std::unordered_map<std::thread::id, bool* > pushingBuffers_;
  ThreadLocal<bool> isPushing_;
  ThreadLocal<std::stack<T> > logStack_;
};

extern CustomStackTrace<std::string> gLayerStackTrace;

/**
 * @brief Install a failure handler that prints the layer stack when a fatal error occurs.
*/
extern void installLayerStackTracer();

} // namespace paddle
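
Aside (not part of the diff): the snippet below is a minimal, self-contained sketch of how the interface declared above could be driven directly — push() during the forward pass, pop("") to mark the switch to the backward pass, and dump() to walk whatever is left on the per-thread stacks. It assumes only the CustomStackTrace members shown in this file; the layer names and the main() harness are made up for illustration.

#include <iostream>
#include <string>
#include <thread>

#include "paddle/utils/CustomStackTrace.h"

int main() {
  paddle::CustomStackTrace<std::string> tracer;

  // Forward pass: push each layer as it starts running.
  tracer.push("data_layer");
  tracer.push("fc_layer_1");
  tracer.push("cost_layer");

  // Entering the backward pass: record that the stack is now popping.
  tracer.pop("");

  // Backward pass finished for the top layer only; pretend an error follows.
  tracer.pop("cost_layer");

  // Dump whatever is still on the current thread's stack (the layers whose
  // backward pass has not completed). Dumping also clears the stack.
  tracer.dump([](const std::thread::id& tid,
                 bool* isPushing,
                 const std::string& layerName) {
    std::cerr << "Thread [" << tid << "] "
              << (isPushing && *isPushing ? "Forwarding " : "Backwarding ")
              << layerName << std::endl;
  }, /*onlyCurrentThread=*/true);

  return 0;
}

Because each thread's stack lives in a ThreadLocal object that registers itself in stackBuffers_, dump() can report every worker thread's position from whichever thread handles the failure.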
8 changes: 1 addition & 7 deletions paddle/utils/Util.cpp
@@ -129,13 +129,7 @@ void runInitFunctions() {

void initMain(int argc, char** argv) {
  initializeLogging(argc, argv);
  logging::installFailureWriter([](const char* data, int sz) {
    std::cerr << "Current Layer forward/backward stack is " << std::endl;
    gLayerStackTrace.dump([](const std::string& layername){
      std::cerr << "LayerName: " << layername << std::endl;
    });
    std::cerr.write(data, sz);
  });
  installLayerStackTracer();
  std::string line;
  for (int i = 0; i < argc; ++i) {
    line += argv[i];
10 changes: 10 additions & 0 deletions paddle/utils/tests/CMakeLists.txt
@@ -2,3 +2,13 @@ add_simple_unittest(test_CommandLineParser)
add_simple_unittest(test_Logging)
add_simple_unittest(test_Thread)
add_simple_unittest(test_StringUtils)
add_simple_unittest(test_CustomStackTrace)

add_executable(
  test_CustomStackTracePrint
  test_CustomStackTracePrint.cpp
)
link_paddle_exe(test_CustomStackTracePrint)
add_test(NAME test_CustomStackTracePrint
  COMMAND ${PROJ_ROOT}/paddle/utils/tests/test_CustomStackTracePrint.sh
  WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR})
95 changes: 95 additions & 0 deletions paddle/utils/tests/test_CustomStackTrace.cpp
@@ -0,0 +1,95 @@
/* Copyright (c) 2016 Baidu, Inc. All Rights Reserve.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#include <gtest/gtest.h>
#include <chrono>

#include "paddle/utils/CustomStackTrace.h"
#include "paddle/utils/CommandLineParser.h"
#include "paddle/utils/Util.h"
#include "paddle/utils/Locks.h"

P_DEFINE_int32(test_thread_num, 10, "testing thread number");

void testNormalImpl(const std::function<void(
                        paddle::CustomStackTrace<std::string>&,
                        size_t, size_t,
                        paddle::ThreadBarrier&,
                        paddle::ThreadBarrier&)>& callback) {
  paddle::CustomStackTrace<std::string> tracer;
  paddle::ThreadBarrier doneBarrier(FLAGS_test_thread_num + 1);
  paddle::ThreadBarrier startBarrier(FLAGS_test_thread_num + 1);
  constexpr size_t countDown = 10;
  constexpr size_t layerSize = 1000;
  std::vector<std::unique_ptr<std::thread>> threads;
  threads.reserve(FLAGS_test_thread_num);

  for (int32_t i=0; i < FLAGS_test_thread_num; ++i) {
    threads.emplace_back(new std::thread([&tracer, &countDown, &layerSize,
                                          &startBarrier, &doneBarrier,
                                          &callback]{
      callback(tracer, countDown, layerSize, startBarrier, doneBarrier);
    }));
  }
  size_t cntDown = countDown;
  while (cntDown-- > 0) {
    startBarrier.wait();
    doneBarrier.wait();
    ASSERT_TRUE(tracer.empty());
  }

  for (auto& thread : threads) {
    thread->join();
  }
}


TEST(CustomStackTrace, normalTrain) {
  testNormalImpl([](paddle::CustomStackTrace<std::string>& tracer,
                    size_t countDown, size_t layerSize,
                    paddle::ThreadBarrier& start, paddle::ThreadBarrier& finish){
    while (countDown-- > 0) {
      start.wait();
      for (size_t i=0; i < layerSize; ++i) {
        tracer.push("layer_" + std::to_string(i));
      }
      tracer.pop("");
      for (size_t i=0; i < layerSize; ++i) {
        tracer.pop("layer_" + std::to_string(layerSize - 1 - i));
      }
      finish.wait();
    }
  });
}

TEST(CustomStackTrace, normalTest) {
  testNormalImpl([] (paddle::CustomStackTrace<std::string>& tracer,
                     size_t countDown, size_t layerSize,
                     paddle::ThreadBarrier& start, paddle::ThreadBarrier& finish){
    while (countDown-- > 0) {
      start.wait();
      for (size_t i=0; i < layerSize; ++i) {
        tracer.push("layer_" + std::to_string(i));
      }
      tracer.clear();  // in a forward-only (test) pass, the tracer is cleared after the forward pass.
      finish.wait();
    }
  });
}

int main(int argc, char** argv) {
  testing::InitGoogleTest(&argc, argv);
  paddle::initMain(argc, argv);
  return RUN_ALL_TESTS();
}