From c3d08d412f9fd960e8978233cc3e43a246bd1968 Mon Sep 17 00:00:00 2001 From: sfalexrog Date: Mon, 28 Jan 2019 06:05:12 +0300 Subject: [PATCH] Add a workaround for MultipartStream constant busy state (#83) * Add a workaround for MultipartStream constant busy state * Remove C++11 features --- include/web_video_server/multipart_stream.h | 9 ++++++-- src/multipart_stream.cpp | 25 ++++++++++++++++----- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/include/web_video_server/multipart_stream.h b/include/web_video_server/multipart_stream.h index 95826ff..476ee73 100644 --- a/include/web_video_server/multipart_stream.h +++ b/include/web_video_server/multipart_stream.h @@ -9,6 +9,11 @@ namespace web_video_server { +struct PendingFooter { + ros::Time timestamp; + boost::weak_ptr contents; +}; + class MultipartStream { public: MultipartStream(async_web_server_cpp::HttpConnectionPtr& connection, @@ -17,7 +22,7 @@ class MultipartStream { void sendInitialHeader(); void sendPartHeader(const ros::Time &time, const std::string& type, size_t payload_size); - void sendPartFooter(); + void sendPartFooter(const ros::Time &time); void sendPartAndClear(const ros::Time &time, const std::string& type, std::vector &data); void sendPart(const ros::Time &time, const std::string& type, const boost::asio::const_buffer &buffer, async_web_server_cpp::HttpConnection::ResourcePtr resource); @@ -29,7 +34,7 @@ class MultipartStream { const std::size_t max_queue_size_; async_web_server_cpp::HttpConnectionPtr connection_; std::string boundry_; - std::queue > pending_footers_; + std::queue pending_footers_; }; } diff --git a/src/multipart_stream.cpp b/src/multipart_stream.cpp index 8f999c4..5ce235d 100644 --- a/src/multipart_stream.cpp +++ b/src/multipart_stream.cpp @@ -32,10 +32,13 @@ void MultipartStream::sendPartHeader(const ros::Time &time, const std::string& t connection_->write(async_web_server_cpp::HttpReply::to_buffers(*headers), headers); } -void MultipartStream::sendPartFooter() { +void MultipartStream::sendPartFooter(const ros::Time &time) { boost::shared_ptr str(new std::string("\r\n--"+boundry_+"\r\n")); + PendingFooter pf; + pf.timestamp = time; + pf.contents = str; connection_->write(boost::asio::buffer(*str), str); - if (max_queue_size_ > 0) pending_footers_.push(str); + if (max_queue_size_ > 0) pending_footers_.push(pf); } void MultipartStream::sendPartAndClear(const ros::Time &time, const std::string& type, @@ -44,7 +47,7 @@ void MultipartStream::sendPartAndClear(const ros::Time &time, const std::string& { sendPartHeader(time, type, data.size()); connection_->write_and_clear(data); - sendPartFooter(); + sendPartFooter(time); } } @@ -55,14 +58,24 @@ void MultipartStream::sendPart(const ros::Time &time, const std::string& type, { sendPartHeader(time, type, boost::asio::buffer_size(buffer)); connection_->write(buffer, resource); - sendPartFooter(); + sendPartFooter(time); } } bool MultipartStream::isBusy() { - while (!pending_footers_.empty() && pending_footers_.front().expired()) + ros::Time currentTime = ros::Time::now(); + while (!pending_footers_.empty()) { - pending_footers_.pop(); + if (pending_footers_.front().contents.expired()) { + pending_footers_.pop(); + } else { + ros::Time footerTime = pending_footers_.front().timestamp; + if ((currentTime - footerTime).toSec() > 0.5) { + pending_footers_.pop(); + } else { + break; + } + } } return !(max_queue_size_ == 0 || pending_footers_.size() < max_queue_size_); }