Skip to content

Commit

Permalink
Add compatibility mode with Device Client 1.9 and Greengrass Secure T…
Browse files Browse the repository at this point in the history
…unneling Component (#140)

* fix compatibility with Device Client 1.9
  • Loading branch information
RogerZhongAWS authored Dec 22, 2023
1 parent d3150e0 commit f63f8fd
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 16 deletions.
1 change: 1 addition & 0 deletions src/LocalproxyConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ namespace aws {
* The end point will store either source listening or destination service depends on the mode of local proxy.
*/
std::unordered_map<std::string, std::string> serviceId_to_endpoint_map;

/**
* A flag to judge if v2 local proxy needs to fallback to communicate using v1 local proxy message format.
* v1 local proxy format fallback will be enabled when a tunnel is opened with no or 1 service id.
Expand Down
82 changes: 66 additions & 16 deletions src/TcpAdapterProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,12 @@ namespace aws { namespace iot { namespace securedtunneling {
{
if (tac.serviceId_to_tcp_server_map.find(service_id) == tac.serviceId_to_tcp_server_map.end())
{
BOOST_LOG_SEV(log, debug) << "No serviceId_to_tcp_server mapping for service_id: " << service_id;
return connection_ptr;
if (tac.serviceId_to_tcp_server_map.find(tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first) == tac.serviceId_to_tcp_server_map.end())
{
BOOST_LOG_SEV(log, debug) << "No serviceId_to_tcp_server mapping for service_id: " << service_id;
return connection_ptr;
}
service_id = tac.adapter_config.serviceId_to_endpoint_map.cbegin()->first;;
}
tcp_server::pointer server = tac.serviceId_to_tcp_server_map[service_id];
BOOST_LOG_SEV(log, trace) << "num active connections for service id " << service_id << ": " << server->connectionId_to_tcp_connection_map.size();
Expand Down Expand Up @@ -376,6 +380,7 @@ namespace aws { namespace iot { namespace securedtunneling {
BOOST_LOG_SEV(this->log, trace) << "Post-reset TCP drain complete. Closing TCP socket for service id " << service_id << " connection id " << connection_id;
BOOST_LOG_SEV(this->log, info) << "Disconnected from: " << connection_to_reset->socket().remote_endpoint();
connection_to_reset->socket_.close();
delete_tcp_socket(tac, service_id, connection_id);
*tcp_write_buffer_drain_complete = true;
if (*web_socket_write_buffer_drain_complete)
{
Expand Down Expand Up @@ -515,8 +520,9 @@ namespace aws { namespace iot { namespace securedtunneling {
tcp_connection::pointer socket_connection = get_tcp_connection(tac, service_id, connection_id);

// if simultaneous connections are not enabled, then send a stream reset
if (tac.adapter_config.is_v2_message_format)
if (tac.adapter_config.is_v2_message_format || tac.adapter_config.is_v1_message_format)
{
BOOST_LOG_SEV(log, info) << "simultaneous connections are not enabled, sending stream reset";
socket_connection->after_send_message = std::bind(&tcp_adapter_proxy::setup_tcp_socket, this, std::ref(tac), service_id);
tac.serviceId_to_control_message_handler_map[service_id] = std::bind(&tcp_adapter_proxy::ignore_message_and_stop, this, std::ref(tac), std::placeholders::_1);
async_send_stream_reset(tac, service_id, connection_id);
Expand Down Expand Up @@ -600,8 +606,16 @@ namespace aws { namespace iot { namespace securedtunneling {

BOOST_LOG_SEV(log, debug) << "Sending stream start, setting new stream ID to: " << new_stream_id << ", service id: " << service_id;

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

outgoing_message.set_type(Message_Type_STREAM_START);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(new_stream_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
Expand Down Expand Up @@ -638,8 +652,16 @@ namespace aws { namespace iot { namespace securedtunneling {
}
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

outgoing_message.set_type(Message_Type_CONNECTION_START);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(stream_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
Expand All @@ -657,12 +679,20 @@ namespace aws { namespace iot { namespace securedtunneling {
return;
}

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now.
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];
outgoing_message.set_type(Message_Type_STREAM_RESET);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(stream_id);
outgoing_message.set_connectionid(0);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
outgoing_message.clear_payload();
async_send_message(tac, outgoing_message, service_id, connection_id);
Expand All @@ -677,10 +707,19 @@ namespace aws { namespace iot { namespace securedtunneling {
BOOST_LOG_SEV(log, warning) << "No stream id mapping found for service id " << service_id << " . Skip connection reset.";
return;
}

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// NOTE: serviceIds -> streamId mapping will be updated when send/receive stream start, no action needed now.
std::int32_t stream_id = tac.serviceId_to_streamId_map[service_id];
outgoing_message.set_type(Message_Type_CONNECTION_RESET);
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(stream_id);
outgoing_message.set_connectionid(connection_id);
outgoing_message.set_ignorable(false);
Expand Down Expand Up @@ -1073,7 +1112,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// backward compatibility: set connection id to 1 if first received a message with no connection id (id value will be 0)
if (!connection_id)
{
connection_id = 1;
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
tac.adapter_config.is_v2_message_format = true;
}
string service_id = message.serviceid();
Expand Down Expand Up @@ -1305,6 +1344,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// Remove empty string map and put new mapping
tac.adapter_config.serviceId_to_endpoint_map.erase("");
tac.adapter_config.serviceId_to_endpoint_map[service_id] = endpoint;

BOOST_LOG_SEV(log, info) << "Updated port mapping for v1 format: ";
for (auto m : tac.adapter_config.serviceId_to_endpoint_map)
{
Expand All @@ -1328,7 +1368,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// backward compatibility: set connection id to 1 if first received a message with no connection id (id value will be 0)
if (!connection_id)
{
connection_id = 1;
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
tac.adapter_config.is_v2_message_format = true;
}
string service_id = message.serviceid();
Expand Down Expand Up @@ -1431,7 +1471,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// backward compatibility: set connection id to 1 if first received a message with no connection id (id value will be 0)
if (!connection_id)
{
connection_id = 1;
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
tac.adapter_config.is_v2_message_format = true;
}
/**
Expand Down Expand Up @@ -1562,7 +1602,7 @@ namespace aws { namespace iot { namespace securedtunneling {
// backward compatibility: set connection id to 1 if first received a message with no connection id (id value will be 0)
if (!connection_id)
{
connection_id = 1;
BOOST_LOG_SEV(log, info) << "reverting to v2 message format";
tac.adapter_config.is_v2_message_format = true;
}
tcp_connection::pointer connection = get_tcp_connection(tac, service_id, connection_id);
Expand Down Expand Up @@ -1762,8 +1802,17 @@ namespace aws { namespace iot { namespace securedtunneling {
throw proxy_exception((boost::format("No streamId exists for the service Id %1%") % service_id).str());
}
BOOST_LOG_SEV(log, debug) << "Prepare to send data message: service id: " << service_id << " stream id: " << tac.serviceId_to_streamId_map[service_id] << " connection id: " << connection_id;

if (tac.adapter_config.is_v1_message_format)
{
outgoing_message.set_serviceid("");
}
else
{
outgoing_message.set_serviceid(service_id);
}

// Construct outgoing message
outgoing_message.set_serviceid(service_id);
outgoing_message.set_streamid(tac.serviceId_to_streamId_map[service_id]);
outgoing_message.set_connectionid(connection_id);
size_t const send_size = std::min<std::size_t>(GET_SETTING(settings, MESSAGE_MAX_PAYLOAD_SIZE),
Expand Down Expand Up @@ -1988,9 +2037,10 @@ namespace aws { namespace iot { namespace securedtunneling {
uint32_t new_connection_id = ++server->highest_connection_id;

// backward compatibility: set connection id to 1 if simultaneous connections is not enabled
if (tac.adapter_config.is_v2_message_format)
if (tac.adapter_config.is_v2_message_format || tac.adapter_config.is_v1_message_format)
{
new_connection_id = 1;
BOOST_LOG_SEV(log, info) << "Falling back to older protocol, setting new connection id to 0";
new_connection_id = 0;
}
BOOST_LOG_SEV(log, info) << "creating tcp connection id " << new_connection_id;

Expand All @@ -2009,7 +2059,7 @@ namespace aws { namespace iot { namespace securedtunneling {
server->connectionId_to_tcp_connection_map[new_connection_id]->socket() = std::move(new_socket);
BOOST_LOG_SEV(log, info) << "Accepted tcp connection on port " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().local_endpoint().port() << " from " << server->connectionId_to_tcp_connection_map[new_connection_id]->socket().remote_endpoint();

if (is_first_connection)
if (is_first_connection || tac.adapter_config.is_v1_message_format || tac.adapter_config.is_v2_message_format)
{
async_send_stream_start(tac, service_id, new_connection_id);
}
Expand Down

0 comments on commit f63f8fd

Please sign in to comment.