Skip to content

Commit

Permalink
Adds ability to get a file from a running Gazebo instance (#164)
Browse files Browse the repository at this point in the history
* Updates

Signed-off-by: Nate Koenig <nate@openrobotics.org>

* Update documentation

Signed-off-by: Nate Koenig <nate@openrobotics.org>

* Added seperate asset function

Signed-off-by: Nate Koenig <nate@openrobotics.org>

* Fix documentation

Signed-off-by: Nate Koenig <nate@openrobotics.org>

* Fix documentation

Signed-off-by: Nate Koenig <nate@openrobotics.org>

Co-authored-by: Nate Koenig <nate@openrobotics.org>
  • Loading branch information
nkoenig and Nate Koenig authored May 31, 2022
1 parent 8c78b6c commit 0005cc2
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 20 deletions.
97 changes: 79 additions & 18 deletions plugins/websocket_server/WebsocketServer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -286,26 +286,28 @@ int rootCallback(struct lws *_wsi,

// Handle incoming messages
case LWS_CALLBACK_RECEIVE:
igndbg << "LWS_CALLBACK_RECEIVE\n";

// Prevent too many connections.
if (self->maxConnections >= 0 &&
self->connections.size()+1 > self->maxConnections)
{
ignerr << "Skipping new connection, limit of "
<< self->maxConnections << " has been reached\n";
igndbg << "LWS_CALLBACK_RECEIVE\n";

// Prevent too many connections.
if (self->maxConnections >= 0 &&
self->connections.size()+1 > self->maxConnections)
{
ignerr << "Skipping new connection, limit of "
<< self->maxConnections << " has been reached\n";

// This will return an error code of 1008 with a reason of
// "max_connections".
std::string reason = "max_connections";
lws_close_reason(_wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION,
reinterpret_cast<unsigned char *>(reason.data()), reason.size());
// This will return an error code of 1008 with a reason of
// "max_connections".
std::string reason = "max_connections";
lws_close_reason(_wsi, LWS_CLOSE_STATUS_POLICY_VIOLATION,
reinterpret_cast<unsigned char *>(reason.data()), reason.size());

// Return non-zero to close the connection.
return -1;
// Return non-zero to close the connection.
return -1;
}
self->OnMessage(fd, std::string((const char *)_in).substr(0, _len));
break;
}
self->OnMessage(fd, std::string((const char *)_in));
break;

default:
// Do nothing on default.
Expand Down Expand Up @@ -659,7 +661,7 @@ void WebsocketServer::OnDisconnect(int _socketId)
}

//////////////////////////////////////////////////
void WebsocketServer::OnMessage(int _socketId, const std::string &_msg)
void WebsocketServer::OnMessage(int _socketId, const std::string _msg)
{
// Skip invalid sockets
if (this->connections.find(_socketId) == this->connections.end())
Expand All @@ -673,7 +675,7 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg)
// Count the number of commas to handle a frame like "sub,,,"
std::count(_msg.begin(), _msg.end(), ',') != 3)
{
ignerr << "Received an invalid frame with " << frameParts.size()
ignerr << "Received an invalid frame[" << _msg << "] with " << frameParts.size()
<< "components when 4 is expected.\n";
return;
}
Expand Down Expand Up @@ -992,6 +994,65 @@ void WebsocketServer::OnMessage(int _socketId, const std::string &_msg)
}
}
}
else if (frameParts[0] == "asset")
{
this->OnAsset(_socketId, frameParts);
}
}

//////////////////////////////////////////////////
void WebsocketServer::OnAsset(int _socketId,
const std::vector<std::string> &_frameParts)
{
if (_frameParts.size() <= 1)
{
ignerr << "Asset requested, but asset URI is missing\n";
return;
}

std::string assetUri = _frameParts[1];

std::string resolvedPath;

// Short circuit the case where the assetURI is already a valid path.
if (common::exists(assetUri))
{
resolvedPath = assetUri;
}
else
{
ignition::msgs::StringMsg req, rep;
req.set_data(assetUri);
bool result;
unsigned int timeout = 2000;

// Request the file path from Gazebo
bool executed = this->node.Request("/gazebo/resource_paths/resolve",
req, timeout, rep, result);
if (executed && result)
resolvedPath = rep.data();
}

if (!resolvedPath.empty())
{
// Read the file
std::ifstream infile(resolvedPath, std::ios_base::binary);
std::string fileBuffer = std::string(
std::istreambuf_iterator<char>(infile),
std::istreambuf_iterator<char>());

// Store the file in a protobuf message
ignition::msgs::Bytes bytes;
bytes.set_data(fileBuffer);

// Construct the response message
std::string data = BUILD_MSG(this->operations[ASSET], assetUri,
std::string("ignition.msgs.Bytes"), bytes.SerializeAsString());

// Queue the message for delivery.
this->QueueMessage(this->connections[_socketId].get(),
data.c_str(), data.length());
}
}

//////////////////////////////////////////////////
Expand Down
19 changes: 17 additions & 2 deletions plugins/websocket_server/WebsocketServer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ namespace ignition
/// 6. "particle_emitters": Get the list of particle emitters.
/// definitions.
/// 7. "unsub": Unsubscribe from the topic in the `topic_name` component
/// 8. "asset": Get a file as a byte array from a running Gazebo
/// server. Set the payload to the file URI that is
/// being requested.
///
/// The `topic_name` component is mandatory for the "sub", "pub", and
/// "unsub" operations. If present, it must be the name of an Ignition
Expand Down Expand Up @@ -178,7 +181,10 @@ namespace ignition
/// \param[in] _socketId ID of the socket.
public: void OnDisconnect(int _socketId);

public: void OnMessage(int _socketId, const std::string &_msg);
/// \brief Handles incoming websocket messages
/// \param[in] _socketId Id of the socket associated with the message.
/// \param[in] _msg The incoming message.
public: void OnMessage(int _socketId, const std::string _msg);

public: void OnRequestMessage(int _socketId, const std::string &_msg);

Expand All @@ -195,6 +201,12 @@ namespace ignition
public: bool UpdateMsgTypeSubscriptionCount(const std::string &_topic,
int _socketId, bool _subscribe);

/// \brief Handles asset requests.
/// \param[in] _socketId Id of the socket associated with the message.
/// \param[in] _frameParts The request message in frame parts.
private: void OnAsset(int _socketId,
const std::vector<std::string> &_frameParts);

private: ignition::transport::Node node;

private: bool run = true;
Expand Down Expand Up @@ -289,13 +301,16 @@ namespace ignition

/// \brief Get the protobuf definitions.
PROTOS = 3,

/// \brief Get an asset as a byte array.
ASSET = 4,
};

/// \brief The set of valid operations, in string form. These values
/// can be sent in websocket message frames.
/// These valus must align with the `Operation` enum.
private: std::vector<std::string> operations{
"sub", "pub", "topics", "protos"};
"sub", "pub", "topics", "protos", "asset"};

/// \brief Store publish headers for topics. This is here to improve
/// performance. Keys are topic names and values are frame headers.
Expand Down

0 comments on commit 0005cc2

Please sign in to comment.