forked from inSight-mk1/rosbag-tools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dump.py
executable file
·76 lines (67 loc) · 2.59 KB
/
dump.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#!/usr/bin/env python3
"""Dump messages from a bag."""
import argparse
from rosbag import Bag
from tqdm import tqdm
def dump(input_file: Bag, topics: list, output_file: 'file'=None) -> None:
    """
    Dump messages from a bag to the terminal and, optionally, to a file.

    Every message on the selected topics is rendered as the topic name, a
    space, and the string form of the message, followed by a blank line.
    The rendered text is always shown on the terminal; when an output file
    is given the same text is written to it as well. A progress bar tracks
    how many of the selected messages have been processed.

    Args:
        input_file: the input bag to dump topics from
        topics: the topics to dump
        output_file: an optional file to dump to; any object with a
            ``write`` method accepting a string will do

    Returns:
        None

    """
    # size the progress bar by the number of messages on the selected topics
    total = input_file.get_message_count(topic_filters=topics)
    with tqdm(total=total) as prog:
        # iterate over the messages in this input bag; the third element of
        # each record is not needed for dumping
        for topic, msg, _ in input_file.read_messages(topics=topics):
            # update the progress bar with a single iteration
            prog.update(1)
            # create the line to print
            line = '{} {}\n\n'.format(topic, msg)
            # write through tqdm rather than print() so the message text is
            # not interleaved with the live progress bar on the terminal
            tqdm.write(line)
            # if there is an output file, write the line to it
            if output_file is not None:
                output_file.write(line)
# ensure this script is running as the main entry point
if __name__ == '__main__':
    # create an argument parser to read arguments from the command line
    PARSER = argparse.ArgumentParser(description=__doc__)
    # add an argument for the input bag to read data from; it is required
    # because there is nothing to dump without a bag to open
    PARSER.add_argument('--input_bag', '-i',
        type=str,
        help='The input bag file to read from',
        required=True,
    )
    # add an argument for the topics to dump
    PARSER.add_argument('--topics', '-t',
        type=str,
        nargs='+',
        help='A list of topics to dump',
    )
    # add an argument for the optional output file
    PARSER.add_argument('--output_file', '-o',
        type=str,
        help='An optional output file to dump to in addition to the command line.',
        default=None,
        required=False,
    )
    try:
        # get the arguments from the argument parser
        ARGS = PARSER.parse_args()
        # open the input bag with an automatically closing context
        with Bag(ARGS.input_bag, 'r') as input_bag:
            if ARGS.output_file is None:
                # no output file requested: dump to the terminal only
                dump(input_bag, ARGS.topics)
            else:
                # the context manager closes the output file even when the
                # dump raises or is interrupted part-way through
                with open(ARGS.output_file, 'w') as output_file:
                    dump(input_bag, ARGS.topics, output_file)
    except KeyboardInterrupt:
        # Ctrl-C is an expected way to stop a dump early, so exit quietly
        pass
# names exported by a star-import of this module
__all__ = ['dump']