Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/indigo-devel' into indigo-devel
Browse files Browse the repository at this point in the history
* upstream/indigo-devel:
  fix regression of PR ros#515
  Adding support for fixed-width floating-point and integer array values as per issue ros#400
  Fix exception at roslaunch startup if python is build without ipv6 support.
  Fix exception at roscore startup if python has ipv6 disabled.
  use fileno comparison only as an alternative and when available
  fix removal of QueuedConnection leading to wrong subscriber count (fix ros#526)
  fix comment (fix ros#529)
  unregister statistics publisher
  Fix: TCPROS header validation crash when `callerid` header is not set
  avoid storing subscriber reference in SubscriberStatisticsLogger
  only create SubscriberStatisticsLogger when enabled
  make param functions thread-safe
  Do not use Python when building for Android
  Do not use ifaddrs on Android as it is not natively supported
  • Loading branch information
Michael Jae-Yoon Chung committed Dec 10, 2014
2 parents 3672746 + bb13439 commit f9a2086
Show file tree
Hide file tree
Showing 11 changed files with 98 additions and 44 deletions.
2 changes: 1 addition & 1 deletion clients/roscpp/include/ros/param.h
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,6 @@ void param(const std::string& param_name, T& param_val, const T& default_val)

} // namespace param

} // namespace param
} // namespace ros

#endif // ROSCPP_PARAM_H
7 changes: 6 additions & 1 deletion clients/roscpp/src/libros/transport/transport.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,14 @@

#include "ros/transport/transport.h"
#include "ros/console.h"
#include <ifaddrs.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <netdb.h>

#if !defined(__ANDROID__)
#include <ifaddrs.h>
#endif

#ifndef NI_MAXHOST
#define NI_MAXHOST 1025
#endif
Expand Down Expand Up @@ -68,6 +71,7 @@ Transport::Transport()
gethostname(our_hostname, sizeof(our_hostname)-1);
allowed_hosts_.push_back(std::string(our_hostname));
allowed_hosts_.push_back("localhost");
#if !defined(__ANDROID__)
// for ipv4 loopback, we'll explicitly search for 127.* in isHostAllowed()
// now we need to iterate all local interfaces and add their addresses
// from the getifaddrs manpage: (maybe something similar for windows ?)
Expand Down Expand Up @@ -96,6 +100,7 @@ Transport::Transport()
}
allowed_hosts_.push_back(std::string(addr));
}
#endif
}

bool Transport::isHostAllowed(const std::string &host) const
Expand Down
32 changes: 20 additions & 12 deletions clients/rospy/src/rospy/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import socket
import struct
import sys
from threading import Lock
import time
import random
import yaml
Expand Down Expand Up @@ -339,6 +340,7 @@ def init_node(name, argv=None, anonymous=False, log_level=None, disable_rostime=

#_master_proxy is a MasterProxy wrapper
_master_proxy = None
_master_proxy_lock = Lock()

def get_master(env=os.environ):
"""
Expand All @@ -351,10 +353,12 @@ def get_master(env=os.environ):
@raise Exception: if server cannot be located or system cannot be
initialized
"""
global _master_proxy
if _master_proxy is not None:
return _master_proxy
_master_proxy = rospy.msproxy.MasterProxy(rosgraph.get_master_uri())
global _master_proxy, _master_proxy_lock
if _master_proxy is None:
with _master_proxy_lock:
if _master_proxy is None:
_master_proxy = rospy.msproxy.MasterProxy(
rosgraph.get_master_uri())
return _master_proxy

#########################################################
Expand Down Expand Up @@ -422,13 +426,17 @@ def wait_for_message(topic, topic_type, timeout=None):
# Param Server Access

_param_server = None
_param_server_lock = Lock()
def _init_param_server():
"""
Initialize parameter server singleton
"""
global _param_server
global _param_server, _param_server_lock
if _param_server is None:
_param_server = get_master() #in the future param server will be a service
with _param_server_lock:
if _param_server is None:
_param_server = get_master()
return _param_server_lock

# class and singleton to distinguish whether or not user has passed us a default value
class _Unspecified(object): pass
Expand All @@ -438,7 +446,7 @@ def get_param(param_name, default=_unspecified):
"""
Retrieve a parameter from the param server
NOTE: this method is not thread-safe.
NOTE: this method is thread-safe.
@param default: (optional) default value to return if key is not set
@type default: any
Expand All @@ -460,7 +468,7 @@ def get_param_names():
"""
Retrieve list of parameter names.
NOTE: this method is not thread-safe.
NOTE: this method is thread-safe.
@return: parameter names
@rtype: [str]
Expand All @@ -477,7 +485,7 @@ def set_param(param_name, param_value):
"""
Set a parameter on the param server
NOTE: this method is not thread-safe.
NOTE: this method is thread-safe.
@param param_name: parameter name
@type param_name: str
Expand All @@ -497,7 +505,7 @@ def search_param(param_name):
"""
Search for a parameter on the param server
NOTE: this method is not thread-safe.
NOTE: this method is thread-safe.
@param param_name: parameter name
@type param_name: str
Expand All @@ -512,7 +520,7 @@ def delete_param(param_name):
"""
Delete a parameter on the param server
NOTE: this method is not thread-safe.
NOTE: this method is thread-safe.
@param param_name: parameter name
@type param_name: str
Expand All @@ -526,7 +534,7 @@ def has_param(param_name):
"""
Test if parameter exists on the param server
NOTE: this method is not thread-safe.
NOTE: this method is thread-safe.
@param param_name: parameter name
@type param_name: str
Expand Down
36 changes: 20 additions & 16 deletions clients/rospy/src/rospy/impl/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,17 @@ class SubscriberStatisticsLogger():
this class basically just keeps a collection of ConnectionStatisticsLogger.
"""

@classmethod
def is_enabled(cls):
# disable statistics if node can't talk to parameter server
# which is the case in unit tests
try:
return rospy.get_param("/enable_statistics", False)
except Exception:
return False

def __init__(self, subscriber):
self.subscriber = subscriber
self.subscriber_name = subscriber.name
self.connections = dict()
self.read_parameters()

Expand All @@ -59,13 +68,6 @@ def read_parameters(self):
Fetch window parameters from parameter server
"""

# disable statistics if node can't talk to parameter server which is the case in unit tests
try:
self.enabled = rospy.get_param("/enable_statistics", False)
except:
self.enabled = False
return

# Range of window length, in seconds
self.min_elements = rospy.get_param("/statistics_window_min_elements", 10)
self.max_elements = rospy.get_param("/statistics_window_max_elements", 100)
Expand All @@ -76,9 +78,6 @@ def read_parameters(self):
self.max_window = rospy.get_param("/statistics_window_max_size", 64)
self.min_window = rospy.get_param("/statistics_window_min_size", 4)

def is_enable_statistics(self):
return self.enabled

def callback(self, msg, publisher, stat_bytes):
"""
This method is called for every message that has been received.
Expand All @@ -93,26 +92,28 @@ def callback(self, msg, publisher, stat_bytes):
instance.
"""

if not self.enabled:
return

# /clock is special, as it is subscribed very early
# also exclude /statistics to reduce noise.
if self.subscriber.name == "/clock" or self.subscriber.name == "/statistics":
if self.subscriber_name == "/clock" or self.subscriber_name == "/statistics":
return

try:
# create ConnectionStatisticsLogger for new connections
logger = self.connections.get(publisher)
if logger is None:
logger = ConnectionStatisticsLogger(self.subscriber.name, rospy.get_name(), publisher)
logger = ConnectionStatisticsLogger(self.subscriber_name, rospy.get_name(), publisher)
self.connections[publisher] = logger

# delegate stuff to that instance
logger.callback(self, msg, stat_bytes)
except Exception as e:
rospy.logerr("Unexpected error during statistics measurement: %s", str(e))

def shutdown(self):
for logger in self.connections.values():
logger.shutdown()
self.connections.clear()


class ConnectionStatisticsLogger():
"""
Expand Down Expand Up @@ -259,3 +260,6 @@ def callback(self, subscriber_statistics_logger, msg, stat_bytes):
if self.last_pub_time + self.pub_frequency < arrival_time:
self.last_pub_time = arrival_time
self.sendStatistics(subscriber_statistics_logger)

def shutdown(self):
self.pub.unregister()
5 changes: 3 additions & 2 deletions clients/rospy/src/rospy/impl/tcpros_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -581,9 +581,10 @@ def _validate_header(self, header):
for required in ['md5sum', 'type']:
if not required in header:
raise TransportInitError("header missing required field [%s]"%required)
self.md5sum = header['md5sum']
self.callerid_pub = header['callerid']
self.type = header['type']
self.md5sum = header['md5sum']
if 'callerid' in header:
self.callerid_pub = header['callerid']
if header.get('latching', '0') == '1':
self.is_latched = True

Expand Down
19 changes: 17 additions & 2 deletions clients/rospy/src/rospy/topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,8 +351,17 @@ def _remove_connection(self, connections, c):
c.close()
except:
pass
# while c might be a rospy.impl.tcpros_base.TCPROSTransport instance
# connections might only contain the rospy.impl.tcpros_pubsub.QueuedConnection proxy
# finding the "right" connection is more difficult then
if c in connections:
connections.remove(c)
# therefore additionally check for fileno equality if available
elif c.fileno():
matching_connections = [
conn for conn in connections if conn.fileno() == c.fileno()]
if len(matching_connections) == 1:
connections.remove(matching_connections[0])

def add_connection(self, c):
"""
Expand Down Expand Up @@ -560,14 +569,19 @@ def __init__(self, name, data_class):
self.queue_size = None
self.buff_size = DEFAULT_BUFF_SIZE
self.tcp_nodelay = False
self.statistics_logger = SubscriberStatisticsLogger(self);
self.statistics_logger = SubscriberStatisticsLogger(self) \
if SubscriberStatisticsLogger.is_enabled() \
else None

def close(self):
"""close I/O and release resources"""
_TopicImpl.close(self)
if self.callbacks:
del self.callbacks[:]
self.callbacks = None
if self.statistics_logger:
self.statistics_logger.shutdown()
self.statistics_logger = None

def set_tcp_nodelay(self, tcp_nodelay):
"""
Expand Down Expand Up @@ -701,7 +715,8 @@ def receive_callback(self, msgs, connection):
# save reference to avoid lock
callbacks = self.callbacks
for msg in msgs:
self.statistics_logger.callback(msg, connection.callerid_pub, connection.stat_bytes)
if self.statistics_logger:
self.statistics_logger.callback(msg, connection.callerid_pub, connection.stat_bytes)
for cb, cb_args in callbacks:
self._invoke_callback(msg, cb, cb_args)

Expand Down
5 changes: 4 additions & 1 deletion tools/rosgraph/src/rosgraph/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,10 @@ def is_local_address(hostname):
:returns True: if hostname maps to a local address, False otherwise. False conditions include invalid hostnames.
"""
try:
reverse_ips = [host[4][0] for host in socket.getaddrinfo(hostname, 0, 0, 0, socket.SOL_TCP)]
if use_ipv6():
reverse_ips = [host[4][0] for host in socket.getaddrinfo(hostname, 0, 0, 0, socket.SOL_TCP)]
else:
reverse_ips = [host[4][0] for host in socket.getaddrinfo(hostname, 0, socket.AF_INET, 0, socket.SOL_TCP)]
except socket.error:
return False
local_addresses = ['localhost'] + get_local_addresses()
Expand Down
3 changes: 2 additions & 1 deletion tools/roslaunch/src/roslaunch/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def is_machine_local(machine):
:returns: True if machine is local and doesn't require remote login, ``bool``
"""
try:
machine_ips = [host[4][0] for host in socket.getaddrinfo(machine.address, 0, 0, 0, socket.SOL_TCP)]
# If Python has ipv6 disabled but machine.address can be resolved somehow to an ipv6 address, then host[4][0] will be int
machine_ips = [host[4][0] for host in socket.getaddrinfo(machine.address, 0, 0, 0, socket.SOL_TCP) if isinstance(host[4][0], str)]
except socket.gaierror:
raise RLException("cannot resolve host address for machine [%s]"%machine.address)
local_addresses = ['localhost'] + rosgraph.network.get_local_addresses()
Expand Down
1 change: 1 addition & 0 deletions tools/rostopic/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

<build_depend>rostest</build_depend>

<run_depend version_gte="0.5.4">genpy</run_depend>
<run_depend>rosbag</run_depend>
<run_depend>rospy</run_depend>

Expand Down
Loading

0 comments on commit f9a2086

Please sign in to comment.