Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Restream buffered frames with minimum publish rate #88

Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions include/web_video_server/image_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,19 @@ class ImageStreamer
ros::NodeHandle& nh);

virtual void start() = 0;
virtual ~ImageStreamer();

bool isInactive()
{
return inactive_;
}
;

/**
* Restreams the last received image frame if older than max_age.
*/
virtual void restreamFrame(double max_age) = 0;

std::string getTopic()
{
return topic_;
Expand All @@ -45,19 +51,24 @@ class ImageTransportImageStreamer : public ImageStreamer
public:
ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
ros::NodeHandle& nh);

virtual ~ImageTransportImageStreamer();
virtual void start();

protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time) = 0;

virtual void restreamFrame(double max_age);
virtual void initialize(const cv::Mat &);

image_transport::Subscriber image_sub_;
int output_width_;
int output_height_;
bool invert_;
std::string default_transport_;

ros::Time last_frame;
cv::Mat output_size_image;
boost::mutex send_mutex_;

private:
image_transport::ImageTransport it_;
bool initialized_;
Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/jpeg_streamers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MjpegStreamer : public ImageTransportImageStreamer
public:
MjpegStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
ros::NodeHandle& nh);

~MjpegStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);

Expand All @@ -38,7 +38,7 @@ class JpegSnapshotStreamer : public ImageTransportImageStreamer
public:
JpegSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh);

~JpegSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);

Expand Down
4 changes: 2 additions & 2 deletions include/web_video_server/png_streamers.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class PngStreamer : public ImageTransportImageStreamer
public:
PngStreamer(const async_web_server_cpp::HttpRequest &request, async_web_server_cpp::HttpConnectionPtr connection,
ros::NodeHandle& nh);

~PngStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);

Expand All @@ -38,7 +38,7 @@ class PngSnapshotStreamer : public ImageTransportImageStreamer
public:
PngSnapshotStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh);

~PngSnapshotStreamer();
protected:
virtual void sendImage(const cv::Mat &, const ros::Time &time);

Expand Down
2 changes: 1 addition & 1 deletion include/web_video_server/ros_compressed_streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class RosCompressedStreamer : public ImageStreamer

private:
void imageCallback(const sensor_msgs::CompressedImageConstPtr &msg);

virtual void restreamFrame(double max_age);
MultipartStream stream_;
ros::Subscriber image_sub_;
};
Expand Down
2 changes: 2 additions & 0 deletions include/web_video_server/web_video_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class WebVideoServer
async_web_server_cpp::HttpConnectionPtr connection, const char* begin, const char* end);

private:
void restreamFrames(double max_age);
void cleanup_inactive_streams();

ros::NodeHandle nh_;
Expand All @@ -59,6 +60,7 @@ class WebVideoServer
ros::Timer cleanup_timer_;
#endif
int ros_threads_;
double publish_rate_;
int port_;
std::string address_;
boost::shared_ptr<async_web_server_cpp::HttpServer> server_;
Expand Down
46 changes: 44 additions & 2 deletions src/image_streamer.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "web_video_server/image_streamer.h"
#include <cv_bridge/cv_bridge.h>
#include <iostream>

namespace web_video_server
{
Expand All @@ -11,6 +12,10 @@ ImageStreamer::ImageStreamer(const async_web_server_cpp::HttpRequest &request,
topic_ = request.get_query_param_value_or_default("topic", "");
}

ImageStreamer::~ImageStreamer()
{
}

ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_cpp::HttpRequest &request,
async_web_server_cpp::HttpConnectionPtr connection, ros::NodeHandle& nh) :
ImageStreamer(request, connection, nh), it_(nh), initialized_(false)
Expand All @@ -21,6 +26,10 @@ ImageTransportImageStreamer::ImageTransportImageStreamer(const async_web_server_
default_transport_ = request.get_query_param_value_or_default("default_transport", "raw");
}

ImageTransportImageStreamer::~ImageTransportImageStreamer()
{
}

void ImageTransportImageStreamer::start()
{
image_transport::TransportHints hints(default_transport_);
Expand All @@ -41,6 +50,37 @@ void ImageTransportImageStreamer::initialize(const cv::Mat &)
{
}

void ImageTransportImageStreamer::restreamFrame(double max_age)
{
if (inactive_ || !initialized_ )
return;
try {
if ( last_frame + ros::Duration(max_age) < ros::Time::now() ) {
boost::mutex::scoped_lock lock(send_mutex_);
sendImage(output_size_image, ros::Time::now() ); // don't update last_frame, it may remain an old value.
}
}
catch (boost::system::system_error &e)
{
// happens when client disconnects
ROS_DEBUG("system_error exception: %s", e.what());
inactive_ = true;
return;
}
catch (std::exception &e)
{
ROS_ERROR_THROTTLE(30, "exception: %s", e.what());
inactive_ = true;
return;
}
catch (...)
{
ROS_ERROR_THROTTLE(30, "exception");
inactive_ = true;
return;
}
}

void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr &msg)
{
if (inactive_)
Expand Down Expand Up @@ -84,7 +124,7 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr
cv::flip(img, img, true);
}

cv::Mat output_size_image;
boost::mutex::scoped_lock lock(send_mutex_); // protects output_size_image
if (output_width_ != input_width || output_height_ != input_height)
{
cv::Mat img_resized;
Expand All @@ -102,7 +142,9 @@ void ImageTransportImageStreamer::imageCallback(const sensor_msgs::ImageConstPtr
initialize(output_size_image);
initialized_ = true;
}
sendImage(output_size_image, msg->header.stamp);

last_frame = ros::Time::now();
sendImage(output_size_image, last_frame );

}
catch (cv_bridge::Exception &e)
Expand Down
32 changes: 25 additions & 7 deletions src/jpeg_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ MjpegStreamer::MjpegStreamer(const async_web_server_cpp::HttpRequest &request,
stream_.sendInitialHeader();
}

MjpegStreamer::~MjpegStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void MjpegStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
{
std::vector<int> encode_params;
Expand Down Expand Up @@ -48,6 +54,12 @@ JpegSnapshotStreamer::JpegSnapshotStreamer(const async_web_server_cpp::HttpReque
quality_ = request.get_query_param_value_or_default<int>("quality", 95);
}

JpegSnapshotStreamer::~JpegSnapshotStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
{
std::vector<int> encode_params;
Expand All @@ -59,13 +71,19 @@ void JpegSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)

char stamp[20];
sprintf(stamp, "%.06lf", time.toSec());
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header(
"Server", "web_video_server").header("Cache-Control",
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header(
"X-Timestamp", stamp).header("Pragma", "no-cache").header("Content-type", "image/jpeg").header(
"Access-Control-Allow-Origin", "*").header("Content-Length",
boost::lexical_cast<std::string>(encoded_buffer.size())).write(
connection_);
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
.header("Connection", "close")
.header("Server", "web_video_server")
.header("Cache-Control",
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, "
"max-age=0")
.header("X-Timestamp", stamp)
.header("Pragma", "no-cache")
.header("Content-type", "image/jpeg")
.header("Access-Control-Allow-Origin", "*")
.header("Content-Length",
boost::lexical_cast<std::string>(encoded_buffer.size()))
.write(connection_);
connection_->write_and_clear(encoded_buffer);
inactive_ = true;
}
Expand Down
32 changes: 25 additions & 7 deletions src/png_streamers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ PngStreamer::PngStreamer(const async_web_server_cpp::HttpRequest &request,
stream_.sendInitialHeader();
}

PngStreamer::~PngStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void PngStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
{
std::vector<int> encode_params;
Expand Down Expand Up @@ -48,6 +54,12 @@ PngSnapshotStreamer::PngSnapshotStreamer(const async_web_server_cpp::HttpRequest
quality_ = request.get_query_param_value_or_default<int>("quality", 3);
}

PngSnapshotStreamer::~PngSnapshotStreamer()
{
this->inactive_ = true;
boost::mutex::scoped_lock lock(send_mutex_); // protects sendImage.
}

void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)
{
std::vector<int> encode_params;
Expand All @@ -59,13 +71,19 @@ void PngSnapshotStreamer::sendImage(const cv::Mat &img, const ros::Time &time)

char stamp[20];
sprintf(stamp, "%.06lf", time.toSec());
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok).header("Connection", "close").header(
"Server", "web_video_server").header("Cache-Control",
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, max-age=0").header(
"X-Timestamp", stamp).header("Pragma", "no-cache").header("Content-type", "image/png").header(
"Access-Control-Allow-Origin", "*").header("Content-Length",
boost::lexical_cast<std::string>(encoded_buffer.size())).write(
connection_);
async_web_server_cpp::HttpReply::builder(async_web_server_cpp::HttpReply::ok)
.header("Connection", "close")
.header("Server", "web_video_server")
.header("Cache-Control",
"no-cache, no-store, must-revalidate, pre-check=0, post-check=0, "
"max-age=0")
.header("X-Timestamp", stamp)
.header("Pragma", "no-cache")
.header("Content-type", "image/png")
.header("Access-Control-Allow-Origin", "*")
.header("Content-Length",
boost::lexical_cast<std::string>(encoded_buffer.size()))
.write(connection_);
connection_->write_and_clear(encoded_buffer);
inactive_ = true;
}
Expand Down
4 changes: 4 additions & 0 deletions src/ros_compressed_streamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ RosCompressedStreamer::RosCompressedStreamer(const async_web_server_cpp::HttpReq
stream_.sendInitialHeader();
}

void RosCompressedStreamer::restreamFrame(double max_age)
{
}
viktorku marked this conversation as resolved.
Show resolved Hide resolved

void RosCompressedStreamer::start() {
std::string compressed_topic = topic_ + "/compressed";
image_sub_ = nh_.subscribe(compressed_topic, 1, &RosCompressedStreamer::imageCallback, this);
Expand Down
32 changes: 29 additions & 3 deletions src/web_video_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ WebVideoServer::WebVideoServer(ros::NodeHandle &nh, ros::NodeHandle &private_nh)
private_nh.param("server_threads", server_threads, 1);

private_nh.param("ros_threads", ros_threads_, 2);
private_nh.param("publish_rate", publish_rate_, -1.0);

private_nh.param<std::string>("default_stream_type", __default_stream_type, "mjpeg");

Expand Down Expand Up @@ -100,11 +101,36 @@ void WebVideoServer::spin()
{
server_->run();
ROS_INFO_STREAM("Waiting For connections on " << address_ << ":" << port_);
ros::MultiThreadedSpinner spinner(ros_threads_);
spinner.spin();

ros::AsyncSpinner spinner(ros_threads_);
spinner.start();

if ( publish_rate_ > 0 ) {
ros::Rate r(publish_rate_);

while( ros::ok() ) {
this->restreamFrames( 1.0 / publish_rate_ );
r.sleep();
}
} else {
ros::waitForShutdown();
}

server_->stop();
}

void WebVideoServer::restreamFrames( double max_age )
{
boost::mutex::scoped_lock lock(subscriber_mutex_);

typedef std::vector<boost::shared_ptr<ImageStreamer> >::iterator itr_type;

for (itr_type itr = image_subscribers_.begin(); itr < image_subscribers_.end(); ++itr)
{
(*itr)->restreamFrame( max_age );
}
}

void WebVideoServer::cleanup_inactive_streams()
{
boost::mutex::scoped_lock lock(subscriber_mutex_, boost::try_to_lock);
Expand Down Expand Up @@ -189,7 +215,7 @@ bool WebVideoServer::handle_stream_viewer(const async_web_server_cpp::HttpReques
// Fallback for topics without corresponding compressed topics
if (type == std::string("ros_compressed"))
{

std::string compressed_topic_name = topic + "/compressed";
ros::master::V_TopicInfo topics;
ros::master::getTopics(topics);
Expand Down