Skip to content

Commit

Permalink
Merge pull request #1121 from knorth55/fix-light-weight-logger
Browse files Browse the repository at this point in the history
fix to work LightweightLogger
  • Loading branch information
k-okada authored Dec 18, 2019
2 parents f1fd45b + 9acac07 commit 37ba34c
Show file tree
Hide file tree
Showing 4 changed files with 208 additions and 40 deletions.
7 changes: 7 additions & 0 deletions .travis.rosinstall.indigo
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,10 @@
local-name: jsk_3rdparty/respeaker_ros
uri: https://github.com/tork-a/jsk_3rdparty-release/archive/release/kinetic/respeaker_ros/2.1.13-1.tar.gz
version: jsk_3rdparty-release-release-kinetic-respeaker_ros-2.1.13-1
# jsk_robot_startup LightweightLogger requires mongodb_store 0.4.5.
# indigo is already EOL and 0.4.5 is never released. (2019/08/20)
# more detailed information, see https://github.com/jsk-ros-pkg/jsk_robot/pull/1121
- git:
local-name: strands-project/mongodb_store
uri: https://github.com/strands-project/mongodb_store.git
version: 0.4.5
13 changes: 11 additions & 2 deletions jsk_robot_common/jsk_robot_startup/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ find_package(catkin REQUIRED COMPONENTS
urdf
)

find_package(Boost REQUIRED)
find_package(Boost REQUIRED COMPONENTS program_options thread system)
if ("${BOOST_MAJOR_VERSION}" VERSION_EQUAL "1" AND "${Boost_MINOR_VERSION}" VERSION_GREATER "46")
find_package(Boost REQUIRED COMPONENTS chrono)
endif()

find_package(OpenCV REQUIRED)

catkin_python_setup()
Expand Down Expand Up @@ -59,8 +63,13 @@ macro(add_lifelog_nodelet _cpp _cls _bin)
endmacro()

# jsk_topic_tools >= 2.2.27 only available from indigo
# mongodb_store >= 0.4.4 required in kinetic
# mongodb_store >= 0.5.2 required in melodic
if($ENV{ROS_DISTRO} STRGREATER "hydro")
add_lifelog_nodelet(src/lightweight_logger_nodelet.cpp "jsk_robot_lifelog/LightweightLogger" "lightweight_logger")
if (("${mongodb_store_VERSION}" VERSION_GREATER "0.4.3" AND "${mongodb_store_VERSION}" VERSION_LESS "0.5.0")
OR "${mongodb_store_VERSION}" VERSION_GREATER "0.5.1")
add_lifelog_nodelet(src/lightweight_logger_nodelet.cpp "jsk_robot_lifelog/LightweightLogger" "lightweight_logger")
endif()
endif()

add_library(jsk_robot_lifelog SHARED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,35 +41,96 @@
#ifndef LIGHTWEIGHT_LOGGER_H__
#define LIGHTWEIGHT_LOGGER_H__

#include <boost/circular_buffer.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <diagnostic_updater/diagnostic_updater.h>
#include <diagnostic_updater/publisher.h>
#include <topic_tools/shape_shifter.h>
#include <jsk_topic_tools/diagnostic_utils.h>
#include <jsk_topic_tools/stealth_relay.h>
#include <jsk_topic_tools/timered_diagnostic_updater.h>
#include <jsk_topic_tools/vital_checker.h>
#include <jsk_robot_startup/message_store_singleton.h>
#include <mongodb_store/message_store.h>

namespace jsk_robot_startup
{
namespace lifelog
{

template <typename T>
class blocking_circular_buffer {
/* Thread safe blocking circular buffer */
public:
blocking_circular_buffer() {}

void put(const T& value) {
boost::mutex::scoped_lock lock(mutex_);
const bool prev_empty = empty();
buffer_.push_back(value);
if (prev_empty) empty_wait_.notify_all();
}

bool get(T& value, double timeout=0.0) {
boost::posix_time::ptime start = boost::posix_time::microsec_clock::local_time();
boost::posix_time::time_duration elapsed;
while (ros::ok()) {
elapsed = boost::posix_time::microsec_clock::local_time() - start;
if (timeout > 0 &&
(double)(elapsed.total_milliseconds() / 1000.0) > timeout)
break;
boost::mutex::scoped_lock lock(mutex_);
if (empty()) empty_wait_.wait(lock);
if (!empty()) {
value = buffer_.front();
buffer_.pop_front();
return true;
}
}
return false;
}

void clear() {
buffer_.clear();
}

const bool empty() const {
return buffer_.empty();
}

const bool full() const {
return buffer_.full();
}

const int size() const {
return buffer_.size();
}

void set_capacity(int cap) {
buffer_.set_capacity(cap);
}
protected:
boost::circular_buffer<T> buffer_;
boost::condition_variable empty_wait_;
boost::mutex mutex_;
};

class LightweightLogger : public jsk_topic_tools::StealthRelay
{
protected:
virtual void onInit();
virtual ~LightweightLogger();
virtual void loadThread();
virtual void inputCallback(const ros::MessageEvent<topic_tools::ShapeShifter>& event);
virtual void loggerThread();
virtual void inputCallback(const ros::MessageEvent<topic_tools::ShapeShifter const>& event);
virtual void updateDiagnostic(diagnostic_updater::DiagnosticStatusWrapper &stat);

mongodb_store::MessageStoreProxy* msg_store_;
boost::thread deferred_load_thread_;
boost::shared_ptr<mongodb_store::MessageStoreProxy> msg_store_;
boost::thread logger_thread_;
bool wait_for_insert_;
bool vital_check_;
bool initialized_;
std::string input_topic_name_;
std::string db_name_, col_name_;
blocking_circular_buffer<ros::MessageEvent<topic_tools::ShapeShifter const> > buffer_;

// diagnostics
ros::Time init_time_;
Expand Down
157 changes: 124 additions & 33 deletions jsk_robot_common/jsk_robot_startup/src/lightweight_logger_nodelet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace jsk_robot_startup
initialized_ = false;
jsk_topic_tools::StealthRelay::onInit();

// settings for database
nh_->param<std::string>("/robot/database", db_name_, "jsk_robot_lifelog");
nh_->param<std::string>("/robot/name", col_name_, std::string());
if (col_name_.empty())
Expand All @@ -56,74 +57,164 @@ namespace jsk_robot_startup
return;
}

pnh_->param("wait_for_insert", wait_for_insert_, false);
// settings for buffering logged messages
int buffer_capacity;
pnh_->param("buffer_capacity", buffer_capacity, 100);
buffer_.set_capacity(buffer_capacity);

// settings for blocking/non-blocking message insertion
pnh_->param("wait_for_insert", wait_for_insert_, false);
pnh_->param("vital_check", vital_check_, true);

input_topic_name_ = pnh_->resolveName("input", true);

diagnostic_updater_.reset(
new jsk_topic_tools::TimeredDiagnosticUpdater(*pnh_, ros::Duration(1.0)));
diagnostic_updater_->setHardwareID("LightweightLogger");
diagnostic_updater_->add(
"LightweightLogger::" + input_topic_name_,
boost::bind(&LightweightLogger::updateDiagnostic, this, _1));

// settings for diagnostics
double vital_rate;
pnh_->param("vital_rate", vital_rate, 1.0);
vital_checker_.reset(
new jsk_topic_tools::VitalChecker(1.0 / vital_rate));

diagnostic_updater_->start();
bool enable_diagnostics;
pnh_->param<bool>("enable_diagnostics", enable_diagnostics, true);
if (enable_diagnostics) {
diagnostic_updater_.reset(
new jsk_topic_tools::TimeredDiagnosticUpdater(*pnh_, ros::Duration(1.0)));
diagnostic_updater_->setHardwareID("LightweightLogger");
diagnostic_updater_->add(
"LightweightLogger::" + input_topic_name_,
boost::bind(&LightweightLogger::updateDiagnostic, this, _1));
diagnostic_updater_->start();
}

inserted_count_ = 0;
insert_error_count_ = 0;
prev_insert_error_count_ = 0;
init_time_ = ros::Time::now();

deferred_load_thread_ = boost::thread(
boost::bind(&LightweightLogger::loadThread, this));
// start logger thread
logger_thread_ = boost::thread(
boost::bind(&LightweightLogger::loggerThread, this));
}

LightweightLogger::~LightweightLogger() {
if (!initialized_) {
NODELET_DEBUG_STREAM("Shutting down deferred load thread");
deferred_load_thread_.join();
NODELET_DEBUG_STREAM("deferred load thread stopped");
NODELET_DEBUG_STREAM("destructor called");

// stop logger thread
if (logger_thread_.joinable()) {
NODELET_DEBUG_STREAM("Shutting down logger thread");
logger_thread_.interrupt();
bool ok = logger_thread_.try_join_for(boost::chrono::seconds(5));
if (ok) {
NODELET_INFO_STREAM("successsfully stopped logger thread");
} else {
NODELET_WARN_STREAM("Timed out to join logger thread");
}
}

// deinit message store object
if (msg_store_) {
msg_store_.reset();
}
}

void LightweightLogger::loadThread()
void LightweightLogger::loggerThread()
{
NODELET_INFO_STREAM("Connecting to database " << db_name_ << "/" << col_name_ << "...");
msg_store_ = MessageStoreSingleton::getInstance(col_name_, db_name_);
NODELET_INFO_STREAM("Successfully connected to database!");
NODELET_DEBUG_STREAM("logger thread started");

// The message store object is initialized here, since the object waits for connection
// until the connection to the server is established.
msg_store_.reset(new mongodb_store::MessageStoreProxy(*nh_, col_name_, db_name_));
initialized_ = true;

// After message store object is initialized, this thread is re-used for
// lazy document insertion.
bool being_interrupted = false;
while (ros::ok()) {
try {
// check interruption
if (being_interrupted) {
if (!buffer_.empty()) {
NODELET_WARN_STREAM(
"The thread is interrupted through buffer is still not empty. "
"Continue to insert");
} else {
NODELET_DEBUG_STREAM("buffer is empty. interrupted.");
break;
}
}

// lazy document insertion
ros::MessageEvent<topic_tools::ShapeShifter const> event;
const double timeout = 0.5;
if (buffer_.get(event, timeout)) {
const std::string& publisher_name = event.getPublisherName();
const boost::shared_ptr<topic_tools::ShapeShifter const>& msg = event.getConstMessage();
jsk_topic_tools::StealthRelay::inputCallback(msg);
try
{
mongo::BSONObjBuilder meta;
meta.append("input_topic", input_topic_name_);
meta.append("published_by", publisher_name);
std::string doc_id = msg_store_->insert(*msg, meta.obj(), true);
NODELET_DEBUG_STREAM("Lazy inserted (" << input_topic_name_ << "): " << doc_id);
}
catch (...) {
NODELET_ERROR_STREAM("Failed to lazy insert");
if (being_interrupted) {
NODELET_WARN_STREAM("Force exiting");
break;
}
}
} else {
NODELET_DEBUG_STREAM("waiting for buffer...");
}
} catch (boost::thread_interrupted e) {
NODELET_DEBUG_STREAM("logger thread being interrupted");
being_interrupted = true;
}
}

NODELET_DEBUG_STREAM("logger thread ended");
}

void LightweightLogger::inputCallback(const ros::MessageEvent<topic_tools::ShapeShifter>& event)
void LightweightLogger::inputCallback(const ros::MessageEvent<topic_tools::ShapeShifter const>& event)
{
const std::string& publisher_name = event.getPublisherName();
const boost::shared_ptr<topic_tools::ShapeShifter const>& msg = event.getConstMessage();
jsk_topic_tools::StealthRelay::inputCallback(msg);

if (!initialized_) return;
vital_checker_->poke();

try
{
mongo::BSONObjBuilder meta;
meta.append("input_topic", input_topic_name_);
meta.append("published_by", publisher_name);
std::string doc_id = msg_store_->insert(*msg, meta.obj(), wait_for_insert_);
if (doc_id.empty())
NODELET_DEBUG_STREAM("Inserted (" << input_topic_name_ << ")");
else
NODELET_DEBUG_STREAM("Inserted (" << input_topic_name_ << "): " << doc_id);
bool on_the_fly = initialized_ && buffer_.empty();
if (!wait_for_insert_ && msg_store_->getNumInsertSubscribers() == 0) {
// subscriber for message_store/insert does not exists
on_the_fly = false;
}
catch (...) {
NODELET_ERROR_STREAM("Failed to insert to db");

if (on_the_fly) {
try
{
mongo::BSONObjBuilder meta;
meta.append("input_topic", input_topic_name_);
meta.append("published_by", publisher_name);
std::string doc_id = msg_store_->insert(*msg, meta.obj(), wait_for_insert_);
if (doc_id.empty())
NODELET_DEBUG_STREAM("Inserted (" << input_topic_name_ << ")");
else
NODELET_DEBUG_STREAM("Inserted (" << input_topic_name_ << "): " << doc_id);
}
catch (...) {
NODELET_ERROR_STREAM("Failed to insert to db");
}
} else {
if (!initialized_) {
NODELET_WARN_THROTTLE(1.0, "nodelet is not yet initialized");
}
if (buffer_.full()) {
NODELET_WARN_THROTTLE(1.0, "buffer is full. discarded old elements");
}
buffer_.put(event);
NODELET_DEBUG_STREAM("Put into buffer for lazy insertion");
}
}

Expand Down

0 comments on commit 37ba34c

Please sign in to comment.