forked from CURG-archive/trajectory_planner
-
Notifications
You must be signed in to change notification settings - Fork 0
/
point_cloud2.py
149 lines (130 loc) · 5.45 KB
/
point_cloud2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
#!/usr/bin/env python
"""
Serialization of sensor_msgs.PointCloud2 messages.
Author: Tim Field
"""
import roslib; roslib.load_manifest('sensor_msgs')
import roslib.message
import ctypes
import math
import struct
from sensor_msgs.msg import PointCloud2, PointField
_DATATYPES = {}
_DATATYPES[PointField.INT8] = ('b', 1)
_DATATYPES[PointField.UINT8] = ('B', 1)
_DATATYPES[PointField.INT16] = ('h', 2)
_DATATYPES[PointField.UINT16] = ('H', 2)
_DATATYPES[PointField.INT32] = ('i', 4)
_DATATYPES[PointField.UINT32] = ('I', 4)
_DATATYPES[PointField.FLOAT32] = ('f', 4)
_DATATYPES[PointField.FLOAT64] = ('d', 8)
def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):
"""
Read points from a L{sensor_msgs.PointCloud2} message.
@param cloud: The point cloud to read from.
@type cloud: L{sensor_msgs.PointCloud2}
@param field_names: The names of fields to read. If None, read all fields. [default: None]
@type field_names: iterable
@param skip_nans: If True, then don't return any point with a NaN value.
@type skip_nans: bool [default: False]
@param uvs: If specified, then only return the points at the given coordinates. [default: empty list]
@type uvs: iterable
@return: Generator which yields a list of values for each point.
@rtype: generator
"""
assert isinstance(cloud, roslib.message.Message) and cloud._type == 'sensor_msgs/PointCloud2', 'cloud is not a sensor_msgs.msg.PointCloud2'
fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names)
width, height, point_step, row_step, data, isnan = cloud.width, cloud.height, cloud.point_step, cloud.height*cloud.point_step, cloud.data, math.isnan
unpack_from = struct.Struct(fmt).unpack_from
if skip_nans:
if uvs:
for u, v in uvs:
p = unpack_from(data, (row_step * v) + (point_step * u))
has_nan = False
for pv in p:
if isnan(pv):
has_nan = True
break
if not has_nan:
yield p
else:
for v in xrange(height):
offset = row_step * v
for u in xrange(width):
p = unpack_from(data, offset)
has_nan = False
for pv in p:
if isnan(pv):
has_nan = True
break
if not has_nan:
yield p
offset += point_step
else:
if uvs:
for u, v in uvs:
yield unpack_from(data, (row_step * v) + (point_step * u))
else:
for v in xrange(height):
offset = row_step * v
for u in xrange(width):
yield unpack_from(data, offset)
offset += point_step
def create_cloud(header, fields, points):
"""
Create a L{sensor_msgs.msg.PointCloud2} message.
@param header: The point cloud header.
@type header: L{std_msgs.msg.Header}
@param fields: The point cloud fields.
@type fields: iterable of L{sensor_msgs.msg.PointField}
@param points: The point cloud points.
@type points: list of iterables, i.e. one iterable for each point, with the
elements of each iterable being the values of the fields for
that point (in the same order as the fields parameter)
@return: The point cloud.
@rtype: L{sensor_msgs.msg.PointCloud2}
"""
cloud_struct = struct.Struct(_get_struct_fmt(False, fields))
buff = ctypes.create_string_buffer(cloud_struct.size * len(points))
point_step, pack_into = cloud_struct.size, cloud_struct.pack_into
offset = 0
for p in points:
pack_into(buff, offset, *p)
offset += point_step
return PointCloud2(header=header,
height=1,
width=len(points),
is_dense=False,
is_bigendian=False,
fields=fields,
point_step=cloud_struct.size,
row_step=cloud_struct.size * len(points),
data=buff.raw)
def create_cloud_xyz32(header, points):
"""
Create a L{sensor_msgs.msg.PointCloud2} message with 3 float32 fields (x, y, z).
@param header: The point cloud header.
@type header: L{std_msgs.msg.Header}
@param points: The point cloud points.
@type points: iterable
@return: The point cloud.
@rtype: L{sensor_msgs.msg.PointCloud2}
"""
fields = [PointField('x', 0, PointField.FLOAT32, 1),
PointField('y', 4, PointField.FLOAT32, 1),
PointField('z', 8, PointField.FLOAT32, 1)]
return create_cloud(header, fields, points)
def _get_struct_fmt(is_bigendian, fields, field_names=None):
fmt = '>' if is_bigendian else '<'
offset = 0
for field in (f for f in sorted(fields, key=lambda f: f.offset) if field_names is None or f.name in field_names):
if offset < field.offset:
fmt += 'x' * (field.offset - offset)
offset = field.offset
if field.datatype not in _DATATYPES:
print >> sys.stderr, 'Skipping unknown PointField datatype [%d]' % field.datatype
else:
datatype_fmt, datatype_length = _DATATYPES[field.datatype]
fmt += field.count * datatype_fmt
offset += field.count * datatype_length
return fmt