From 305afd2dd81ed26a83f77670be9b84acdf32e66f Mon Sep 17 00:00:00 2001 From: Jim Won Date: Fri, 17 Nov 2017 14:35:43 -0500 Subject: [PATCH] Python Bag class supports encryption; all the rosbag tools work for encrypted bags. (#1206) --- tools/rosbag/package.xml | 2 + tools/rosbag/src/rosbag/bag.py | 397 +++++++++++++++++++-- tools/rosbag/src/rosbag/rosbag_main.py | 14 +- tools/rosbag_storage/include/rosbag/bag.h | 6 +- tools/rosbag_storage/src/aes_encryptor.cpp | 25 +- tools/rosbag_storage/src/bag.cpp | 17 +- 6 files changed, 399 insertions(+), 62 deletions(-) diff --git a/tools/rosbag/package.xml b/tools/rosbag/package.xml index 534cec7937..acc0963b9f 100644 --- a/tools/rosbag/package.xml +++ b/tools/rosbag/package.xml @@ -30,6 +30,8 @@ boost genmsg genpy + python-crypto + python-gnupg python-rospkg rosbag_storage rosconsole diff --git a/tools/rosbag/src/rosbag/bag.py b/tools/rosbag/src/rosbag/bag.py index a89bc8baa2..209e2e3d2c 100644 --- a/tools/rosbag/src/rosbag/bag.py +++ b/tools/rosbag/src/rosbag/bag.py @@ -50,6 +50,10 @@ import time import yaml +from Crypto import Random +from Crypto.Cipher import AES +import gnupg + try: from cStringIO import StringIO # Python 2.x except ImportError: @@ -102,6 +106,13 @@ class ROSBagEncryptNotSupportedException(ROSBagException): def __init__(self, value): ROSBagException.__init__(self, value) +class ROSBagEncryptException(ROSBagException): + """ + Exception raised when encryption or decryption failed. + """ + def __init__(self, value): + ROSBagException.__init__(self, value) + class Compression: """ Allowable compression types @@ -112,6 +123,263 @@ class Compression: BagMessage = collections.namedtuple('BagMessage', 'topic message timestamp') +class _ROSBagEncryptor(object): + """ + Base class for bag encryptor. + """ + _ENCRYPTOR_FIELD_NAME = 'encryptor' + + def __init__(self): + pass + +class _ROSBagNoEncryptor(_ROSBagEncryptor): + """ + Class for unencrypted bags. + """ + def __init__(self): + super(_ROSBagNoEncryptor, self).__init__() + + def initialize(self, _, __): + pass + + def encrypt_chunk(self, chunk_size, _, __): + return chunk_size + + def decrypt_chunk(self, chunk): + return chunk + + def add_fields_to_file_header(self, _): + pass + + def read_fields_from_file_header(self, _): + pass + + def write_encrypted_header(self, write_header, f, header): + return write_header(f, header) + + def read_encrypted_header(self, read_header, f, req_op=None): + return read_header(f, req_op) + + def add_info_rows(self, rows): + pass + + def get_info_str(self): + return None + +class _ROSBagAesCbcEncryptor(_ROSBagEncryptor): + """ + Class for AES-CBC-encrypted bags. + """ + NAME = 'rosbag/AesCbcEncryptor' + _GPG_USER_FIELD_NAME = 'gpg_user' + _ENCRYPTED_KEY_FIELD_NAME = 'encrypted_key' + + def __init__(self): + """ + Create AES encryptor. + """ + super(_ROSBagAesCbcEncryptor, self).__init__() + # User name of GPG key used for symmetric key encryption + self._gpg_key_user = None + # Symmetric key for encryption/decryption + self._symmetric_key = None + # Encrypted symmetric key + self._encrypted_symmetric_key = None + + def initialize(self, bag, gpg_key_user): + """ + Initialize encryptor by composing AES symmetric key. + @param bag: bag to be encrypted/decrypted + @type bag: Bag + @param gpg_key_user: user name of GPG key used for symmetric key encryption + @type gpg_key_user: str + @raise ROSBagException: if GPG key user has already been set + """ + if bag._mode != 'w': + return + if self._gpg_key_user == gpg_key_user: + return + if not self._gpg_key_user: + self._gpg_key_user = gpg_key_user + self._build_symmetric_key() + else: + raise ROSBagException('Encryption user has already been set to {}'.format(self._gpg_key_user)) + + def encrypt_chunk(self, chunk_size, chunk_data_pos, f): + """ + Read chunk from file, encrypt it, and write back to file. + @param chunk_size: size of chunk + @type chunk_size: int + @param chunk_data_pos: position of chunk data portion + @type chunk_data_pos: int + @param f: file stream + @type f: file + @return: size of encrypted chunk + @rtype: int + """ + f.seek(chunk_data_pos) + chunk = _read(f, chunk_size) + # Encrypt chunk + iv = 16 * b'\x00' + cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv) + encrypted_chunk = cipher.encrypt(_add_padding(chunk)) + # Write encrypted chunk + f.seek(chunk_data_pos) + f.write(encrypted_chunk) + f.truncate(f.tell()) + return len(encrypted_chunk) + + def decrypt_chunk(self, encrypted_chunk): + """ + Decrypt chunk. + @param encrypted_chunk: chunk to decrypt + @type encrypted_chunk: str + @return: decrypted chunk + @rtype: str + @raise ROSBagFormatException: if size of input chunk is not multiple of AES block size + """ + if len(encrypted_chunk) % AES.block_size != 0: + raise ROSBagFormatException('Error in encrypted chunk size') + + iv = 16 * b'\x00' + cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv) + decrypted_chunk = cipher.decrypt(encrypted_chunk) + return _remove_padding(decrypted_chunk) + + def add_fields_to_file_header(self, header): + """ + Add encryptor information to bag file header. + @param header: bag file header + @type header: dict + """ + header[self._ENCRYPTOR_FIELD_NAME] = self.NAME + header[self._GPG_USER_FIELD_NAME] = self._gpg_key_user + header[self._ENCRYPTED_KEY_FIELD_NAME] = self._encrypted_symmetric_key + + def read_fields_from_file_header(self, header): + """ + Read encryptor information from bag file header. + @param header: bag file header + @type header: dict + @raise ROSBagFormatException: if GPG key user is not found in header + """ + try: + self._encrypted_symmetric_key = _read_str_field(header, self._ENCRYPTED_KEY_FIELD_NAME) + except ROSBagFormatException: + raise ROSBagFormatException('Encrypted symmetric key is not found in header') + try: + self._gpg_key_user = _read_str_field(header, self._GPG_USER_FIELD_NAME) + except ROSBagFormatException: + raise ROSBagFormatException('GPG key user is not found in header') + try: + self._symmetric_key = _decrypt_string_gpg(self._encrypted_symmetric_key) + except ROSBagFormatException: + raise + + def write_encrypted_header(self, _, f, header): + """ + Write encrypted header to bag file. + @param f: file stream + @type f: file + @param header: unencrypted header + @type header: dict + @return: encrypted string representing header + @rtype: str + """ + header_str = b'' + equal = b'=' + for k, v in header.items(): + if not isinstance(k, bytes): + k = k.encode() + if not isinstance(v, bytes): + v = v.encode() + header_str += _pack_uint32(len(k) + 1 + len(v)) + k + equal + v + + iv = 16 * b'\x00' + cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv) + enc_str = cipher.encrypt(_add_padding(header_str)) + _write_sized(f, enc_str) + return enc_str + + def read_encrypted_header(self, _, f, req_op=None): + """ + Read encrypted header from bag file. + @param f: file stream + @type f: file + @param req_op: expected header op code + @type req_op: int + @return: decrypted header + @rtype: dict + @raise ROSBagFormatException: if error occurs while decrypting/reading header + """ + # Read header + try: + header = self._decrypt_encrypted_header(f) + except ROSBagException as ex: + raise ROSBagFormatException('Error reading header: %s' % str(ex)) + + return _build_header_from_str(header, req_op) + + def add_info_rows(self, rows): + """ + Add rows for rosbag info. + @param rows: information on bag encryption + @type rows: list of tuples + """ + rows.append(('encryption', self.NAME)) + rows.append(('GPG key user', self._gpg_key_user)) + + def get_info_str(self): + """ + Return string for rosbag info. + @return: information on bag encryption + @rtype: str + """ + return 'encryption: %s\nGPG key user: %s\n' % (self.NAME, self._gpg_key_user) + + def _build_symmetric_key(self): + if not self._gpg_key_user: + return + self._symmetric_key = Random.new().read(AES.block_size) + self._encrypted_symmetric_key = _encrypt_string_gpg(self._gpg_key_user, self._symmetric_key) + + def _decrypt_encrypted_header(self, f): + try: + size = _read_uint32(f) + except struct.error as ex: + raise ROSBagFormatException('error unpacking uint32: %s' % str(ex)) + + if size % AES.block_size != 0: + raise ROSBagFormatException('Error in encrypted header size') + encrypted_header = _read(f, size) + + iv = 16 * b'\x00' + cipher = AES.new(self._symmetric_key, AES.MODE_CBC, iv) + header = cipher.decrypt(encrypted_header) + return _remove_padding(header) + +def _add_padding(input_str): + # Add PKCS#7 padding to input string + return input_str + (AES.block_size - len(input_str) % AES.block_size) * chr(AES.block_size - len(input_str) % AES.block_size) + +def _remove_padding(input_str): + # Remove PKCS#7 padding from input string + return input_str[:-ord(input_str[len(input_str) - 1:])] + +def _encrypt_string_gpg(key_user, input): + gpg = gnupg.GPG() + enc_data = gpg.encrypt(input, [key_user], always_trust=True) + if not enc_data.ok: + raise ROSBagEncryptException('Failed to encrypt bag: {}. Have you installed a required public key?'.format(enc_data.status)) + return str(enc_data) + +def _decrypt_string_gpg(input): + gpg = gnupg.GPG() + dec_data = gpg.decrypt(input, passphrase='clearpath') + if not dec_data.ok: + raise ROSBagEncryptException('Failed to decrypt bag: {}. Have you installed a required private key?'.format(dec_data.status)) + return str(dec_data) + class Bag(object): """ Bag serialize messages to and from a single file on disk using the bag format. @@ -177,6 +445,8 @@ def __init__(self, f, mode='r', compression=Compression.NONE, chunk_threshold=76 self._curr_compression = Compression.NONE + self._encryptor = _ROSBagNoEncryptor() + self._open(f, mode, allow_unindexed) self._output_file = self._file @@ -356,8 +626,8 @@ def write(self, topic, msg, t=None, raw=False): header = { 'topic' : topic, 'type' : msg.__class__._type, 'md5sum' : msg.__class__._md5sum, 'message_definition' : msg._full_text } connection_info = _ConnectionInfo(conn_id, topic, header) - - self._write_connection_record(connection_info) + # No need to encrypt connection records in chunk (encrypt=False) + self._write_connection_record(connection_info, False) self._connections[conn_id] = connection_info self._topic_connections[topic] = connection_info @@ -614,7 +884,21 @@ def get_type_and_topic_info(self, topic_filters=None): frequency=frequency) return collections.namedtuple("TypesAndTopicsTuple", ["msg_types", "topics"])(msg_types=types, topics=topics_t) - + + def set_encryptor(self, encryptor=None, param=None): + if self._chunks: + raise ROSBagException('Cannot set encryptor after chunks are written') + if encryptor is None: + self._encryptor = _ROSBagNoEncryptor() + elif encryptor == _ROSBagAesCbcEncryptor.NAME: + if sys.platform == 'win32': + raise ROSBagEncryptNotSupportedException('AES CBC encryptor is not supported for Windows') + else: + self._encryptor = _ROSBagAesCbcEncryptor() + else: + self._encryptor = _ROSBagNoEncryptor() + self._encryptor.initialize(self, param) + def __str__(self): rows = [] @@ -719,6 +1003,8 @@ def __str__(self): rows.append(('uncompressed', '%*s' % (total_size_str_length, total_uncompressed_size_str))) rows.append(('compressed', '%*s' % (total_size_str_length, total_compressed_size_str))) + self._encryptor.add_info_rows(rows) + datatypes = set() datatype_infos = [] for connection in self._connections.values(): @@ -866,6 +1152,8 @@ def _get_yaml_info(self, key=None): s += 'uncompressed: %d\n' % sum((h.uncompressed_size for h in self._chunk_headers.values())) s += 'compressed: %d\n' % sum((h.compressed_size for h in self._chunk_headers.values())) + s += self._encryptor.get_info_str() + datatypes = set() datatype_infos = [] for connection in self._connections.values(): @@ -1272,7 +1560,8 @@ def _stop_writing(self): # Write connection infos for connection_info in self._connections.values(): - self._write_connection_record(connection_info) + # Encrypt connection records in index data (encrypt: True) + self._write_connection_record(connection_info, True) # Write chunk infos for chunk_info in self._chunks: @@ -1321,6 +1610,10 @@ def _stop_writing_chunk(self): self._set_compression_mode(Compression.NONE) compressed_size = self._file.tell() - self._curr_chunk_data_pos + # When encryption is on, compressed_size represents encrypted chunk size; + # When decrypting, the actual compressed size can be deduced from the decrypted chunk + compressed_size = self._encryptor.encrypt_chunk(compressed_size, self._curr_chunk_data_pos, self._file) + # Rewrite the chunk header with the size of the chunk (remembering current offset) end_of_chunk_pos = self._file.tell() self._file.seek(self._curr_chunk_info.pos) @@ -1361,18 +1654,24 @@ def _write_file_header_record(self, index_pos, connection_count, chunk_count): 'conn_count': _pack_uint32(connection_count), 'chunk_count': _pack_uint32(chunk_count) } + self._encryptor.add_fields_to_file_header(header) _write_record(self._file, header, padded_size=_FILE_HEADER_LENGTH) - def _write_connection_record(self, connection_info): + def _write_connection_record(self, connection_info, encrypt): header = { 'op': _pack_uint8(_OP_CONNECTION), 'topic': connection_info.topic, 'conn': _pack_uint32(connection_info.id) } - - _write_header(self._output_file, header) - - _write_header(self._output_file, connection_info.header) + if encrypt: + self._encryptor.write_encrypted_header(_write_header, self._output_file, header) + else: + _write_header(self._output_file, header) + + if encrypt: + self._encryptor.write_encrypted_header(_write_header, self._output_file, connection_info.header) + else: + _write_header(self._output_file, connection_info.header) def _write_message_data_record(self, connection_id, t, serialized_bytes): header = { @@ -1681,6 +1980,9 @@ def _read_header(f, req_op=None): except ROSBagException as ex: raise ROSBagFormatException('Error reading header: %s' % str(ex)) + return _build_header_from_str(header, req_op) + +def _build_header_from_str(header, req_op): # Parse header into a dict header_dict = {} while header != b'': @@ -2114,7 +2416,6 @@ def reindex(self): chunk_pos = f.tell() if chunk_pos >= total_bytes: break - yield chunk_pos try: @@ -2136,10 +2437,21 @@ def _reindex_read_chunk(self, f, chunk_pos, total_bytes): raise ROSBagException('unterminated chunk at %d' % chunk_pos) if chunk_header.compression == Compression.NONE: - chunk_file = f + encrypted_chunk = _read(f, chunk_header.compressed_size) + + chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk) + + if self.decompressed_chunk_io: + self.decompressed_chunk_io.close() + self.decompressed_chunk_io = StringIO(chunk) + + chunk_file = self.decompressed_chunk_io + else: - # Read the compressed chunk - compressed_chunk = _read(f, chunk_header.compressed_size) + # Read the chunk, and decrypt/decompress it + encrypted_chunk = _read(f, chunk_header.compressed_size) + + compressed_chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk) # Decompress it if chunk_header.compression == Compression.BZ2: @@ -2158,18 +2470,15 @@ def _reindex_read_chunk(self, f, chunk_pos, total_bytes): # Read chunk connection and message records self.bag._curr_chunk_info = None - if chunk_header.compression == Compression.NONE: - offset = chunk_file.tell() - chunk_pos - else: - offset = chunk_file.tell() + offset = chunk_file.tell() expected_index_length = 0 while offset < chunk_header.uncompressed_size: op = _peek_next_header_op(chunk_file) - if op == _OP_CONNECTION: - connection_info = self.read_connection_record(chunk_file) + # Connection records in chunk are not encrypted (encrypt: False) + connection_info = self.read_connection_record(chunk_file, False) if connection_info.id not in self.bag._connections: self.bag._connections[connection_info.id] = connection_info @@ -2210,10 +2519,7 @@ def _reindex_read_chunk(self, f, chunk_pos, total_bytes): # Unknown record type so skip _skip_record(chunk_file) - if chunk_header.compression == Compression.NONE: - offset = chunk_file.tell() - chunk_pos - else: - offset = chunk_file.tell() + offset = chunk_file.tell() # Skip over index records, connection records and chunk info records next_op = _peek_next_header_op(f) @@ -2250,7 +2556,7 @@ def _read_terminal_connection_records(self): if self._advance_to_next_record(_OP_CONNECTION): # Read the CONNECTION records while True: - connection_info = r.read_connection_record(f) + connection_info = r.read_connection_record(f, False) b._connections[connection_info.id] = connection_info b._connection_indexes[connection_info.id] = [] @@ -2299,7 +2605,8 @@ def start_reading(self): # Read the connection records self.bag._connection_indexes = {} for i in range(self.bag._connection_count): - connection_info = self.read_connection_record(self.bag._file) + # Connection records in index data are encrypted (encrypt: True) + connection_info = self.read_connection_record(self.bag._file, True) self.bag._connections[connection_info.id] = connection_info self.bag._connection_indexes[connection_info.id] = [] @@ -2317,6 +2624,8 @@ def start_reading(self): except ROSBagEncryptNotSupportedException: raise + except ROSBagEncryptException: + raise except Exception as ex: raise ROSBagUnindexedException() @@ -2357,21 +2666,27 @@ def read_file_header_record(self): self.bag._connection_count = _read_uint32_field(header, 'conn_count') try: encryptor = _read_str_field(header, 'encryptor') - if encryptor and encryptor != 'rosbag/NoEncryptor': - raise ROSBagEncryptNotSupportedException('Encrypted bag cannot be read. Decrypt the bag first using "rosbag decrypt"') + self.bag.set_encryptor(encryptor) + self.bag._encryptor.read_fields_from_file_header(header) except ROSBagFormatException: # If encryptor header is not found, keep going pass _skip_sized(self.bag._file) # skip over the record data, i.e. padding - def read_connection_record(self, f): - header = _read_header(f, _OP_CONNECTION) + def read_connection_record(self, f, encrypt): + if encrypt: + header = self.bag._encryptor.read_encrypted_header(_read_header, f, _OP_CONNECTION) + else: + header = _read_header(f, _OP_CONNECTION) conn_id = _read_uint32_field(header, 'conn') topic = _read_str_field (header, 'topic') - connection_header = _read_header(f) + if encrypt: + connection_header = self.bag._encryptor.read_encrypted_header(_read_header, f) + else: + connection_header = _read_header(f) return _ConnectionInfo(conn_id, topic, connection_header) @@ -2446,13 +2761,25 @@ def seek_and_read_message_data_record(self, position, raw): raise ROSBagException('no chunk at position %d' % chunk_pos) if chunk_header.compression == Compression.NONE: - f = self.bag._file - f.seek(chunk_header.data_pos + offset) + if self.decompressed_chunk_pos != chunk_pos: + f = self.bag._file + f.seek(chunk_header.data_pos) + encrypted_chunk = _read(f, chunk_header.compressed_size) + + chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk) + + self.decompressed_chunk_pos = chunk_pos + + if self.decompressed_chunk_io: + self.decompressed_chunk_io.close() + self.decompressed_chunk_io = StringIO(chunk) else: if self.decompressed_chunk_pos != chunk_pos: # Seek to the chunk data, read and decompress self.bag._file.seek(chunk_header.data_pos) - compressed_chunk = _read(self.bag._file, chunk_header.compressed_size) + encrypted_chunk = _read(self.bag._file, chunk_header.compressed_size) + + compressed_chunk = self.bag._encryptor.decrypt_chunk(encrypted_chunk) if chunk_header.compression == Compression.BZ2: self.decompressed_chunk = bz2.decompress(compressed_chunk) @@ -2467,8 +2794,8 @@ def seek_and_read_message_data_record(self, position, raw): self.decompressed_chunk_io.close() self.decompressed_chunk_io = StringIO(self.decompressed_chunk) - f = self.decompressed_chunk_io - f.seek(offset) + f = self.decompressed_chunk_io + f.seek(offset) # Skip any CONNECTION records while True: diff --git a/tools/rosbag/src/rosbag/rosbag_main.py b/tools/rosbag/src/rosbag/rosbag_main.py index e6dca3ca80..355a035d07 100644 --- a/tools/rosbag/src/rosbag/rosbag_main.py +++ b/tools/rosbag/src/rosbag/rosbag_main.py @@ -47,7 +47,7 @@ import roslib.message import roslib.packages -from .bag import Bag, Compression, ROSBagException, ROSBagFormatException, ROSBagUnindexedException, ROSBagEncryptNotSupportedException +from .bag import Bag, Compression, ROSBagException, ROSBagFormatException, ROSBagUnindexedException, ROSBagEncryptNotSupportedException, ROSBagEncryptException from .migration import MessageMigrator, fixbag2, checkbag def print_trans(old, new, indent): @@ -167,7 +167,7 @@ def info_cmd(argv): if i < len(args) - 1: print('---') - except ROSBagEncryptNotSupportedException as ex: + except (ROSBagEncryptNotSupportedException, ROSBagEncryptException) as ex: print('ERROR: %s' % str(ex), file=sys.stderr) except ROSBagUnindexedException as ex: print('ERROR bag unindexed: %s. Run rosbag reindex.' % arg, @@ -326,7 +326,7 @@ def eval_fn(topic, m, t): try: inbag = Bag(inbag_filename) - except ROSBagEncryptNotSupportedException as ex: + except (ROSBagEncryptNotSupportedException, ROSBagEncryptException) as ex: print('ERROR: %s' % str(ex), file=sys.stderr) return except ROSBagUnindexedException as ex: @@ -426,7 +426,7 @@ def fix_cmd(argv): try: migrations = fixbag2(migrator, inbag_filename, outname, options.force) - except ROSBagEncryptNotSupportedException as ex: + except (ROSBagEncryptNotSupportedException, ROSBagEncryptException) as ex: print('ERROR: %s' % str(ex), file=sys.stderr) return except ROSBagUnindexedException as ex: @@ -476,7 +476,7 @@ def check_cmd(argv): # First check that the bag is not unindexed try: Bag(args[0]) - except ROSBagEncryptNotSupportedException as ex: + except (ROSBagEncryptNotSupportedException, ROSBagEncryptException) as ex: print('ERROR: %s' % str(ex), file=sys.stderr) return except ROSBagUnindexedException as ex: @@ -794,7 +794,7 @@ def reindex_op(inbag, outbag, quiet): try: for offset in outbag.reindex(): pass - except ROSBagEncryptNotSupportedException as ex: + except (ROSBagEncryptNotSupportedException, ROSBagEncryptException) as ex: raise except: pass @@ -803,7 +803,7 @@ def reindex_op(inbag, outbag, quiet): try: for offset in outbag.reindex(): meter.step(offset) - except ROSBagEncryptNotSupportedException as ex: + except (ROSBagEncryptNotSupportedException, ROSBagEncryptException) as ex: raise except: pass diff --git a/tools/rosbag_storage/include/rosbag/bag.h b/tools/rosbag_storage/include/rosbag/bag.h index 7d0f130799..2e877631fa 100644 --- a/tools/rosbag_storage/include/rosbag/bag.h +++ b/tools/rosbag_storage/include/rosbag/bag.h @@ -231,7 +231,7 @@ class ROSBAG_DECL Bag void writeVersion(); void writeFileHeaderRecord(); - void writeConnectionRecord(ConnectionInfo const* connection_info); + void writeConnectionRecord(ConnectionInfo const* connection_info, const bool encrypt); void appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info); template void writeMessageDataRecord(uint32_t conn_id, ros::Time const& time, T const& msg); @@ -581,8 +581,8 @@ void Bag::doWrite(std::string const& topic, ros::Time const& time, T const& msg, (*connection_info->header)["message_definition"] = connection_info->msg_def; } connections_[conn_id] = connection_info; - - writeConnectionRecord(connection_info); + // No need to encrypt connection records in chunks + writeConnectionRecord(connection_info, false); appendConnectionRecordToBuffer(outgoing_chunk_buffer_, connection_info); } diff --git a/tools/rosbag_storage/src/aes_encryptor.cpp b/tools/rosbag_storage/src/aes_encryptor.cpp index 860c77e0eb..05ecbdb57b 100644 --- a/tools/rosbag_storage/src/aes_encryptor.cpp +++ b/tools/rosbag_storage/src/aes_encryptor.cpp @@ -136,7 +136,7 @@ static std::string encryptStringGpg(std::string& user, std::basic_string decryptStringGpg(std::string const& inpu if (err) { gpgme_release(ctx); throw rosbag::BagException( - (boost::format("Failed to decrypt string: gpgme_data_new_from_mem returned %1%") % gpgme_strerror(err)).str()); + (boost::format("Failed to decrypt bag: gpgme_data_new_from_mem returned %1%") % gpgme_strerror(err)).str()); } gpgme_data_t output_data; err = gpgme_data_new(&output_data); @@ -180,14 +180,14 @@ static std::basic_string decryptStringGpg(std::string const& inpu gpgme_data_release(input_data); gpgme_release(ctx); throw rosbag::BagException( - (boost::format("Failed to decrypt string: gpgme_data_new returned %1%") % gpgme_strerror(err)).str()); + (boost::format("Failed to decrypt bag: gpgme_data_new returned %1%") % gpgme_strerror(err)).str()); } err = gpgme_op_decrypt(ctx, input_data, output_data); if (err) { gpgme_data_release(output_data); gpgme_data_release(input_data); gpgme_release(ctx); - throw rosbag::BagException((boost::format("Failed to decrypt string: %1%") % gpgme_strerror(err)).str()); + throw rosbag::BagException((boost::format("Failed to decrypt bag: %1%. Have you installed a required private key?") % gpgme_strerror(err)).str()); } std::size_t output_length = gpgme_data_seek(output_data, 0, SEEK_END); if (output_length != AES_BLOCK_SIZE) { @@ -283,15 +283,16 @@ void AesCbcEncryptor::addFieldsToFileHeader(ros::M_string &header_fields) const } void AesCbcEncryptor::readFieldsFromFileHeader(ros::M_string const& header_fields) { - gpg_key_user_ = readHeaderField(header_fields, GPG_USER_FIELD_NAME); encrypted_symmetric_key_ = readHeaderField(header_fields, ENCRYPTED_KEY_FIELD_NAME); - if (!encrypted_symmetric_key_.empty()) { - if (gpg_key_user_.empty()) { - throw rosbag::BagFormatException("Encrypted symmetric key is found, but no GPG user is specified"); - } - symmetric_key_ = decryptStringGpg(encrypted_symmetric_key_); - AES_set_decrypt_key(&symmetric_key_[0], AES_BLOCK_SIZE*8, &aes_decrypt_key_); + if (encrypted_symmetric_key_.empty()) { + throw rosbag::BagFormatException("Encrypted symmetric key is not found in header"); + } + gpg_key_user_ = readHeaderField(header_fields, GPG_USER_FIELD_NAME); + if (gpg_key_user_.empty()) { + throw rosbag::BagFormatException("GPG key user is not found in header"); } + symmetric_key_ = decryptStringGpg(encrypted_symmetric_key_); + AES_set_decrypt_key(&symmetric_key_[0], AES_BLOCK_SIZE*8, &aes_decrypt_key_); } void AesCbcEncryptor::writeEncryptedHeader(boost::function, ros::M_string const& header_fields, ChunkedFile& file) { @@ -317,7 +318,7 @@ bool AesCbcEncryptor::readEncryptedHeader(boost::function, r uint32_t encrypted_header_len; file.read((char*) &encrypted_header_len, 4); if (encrypted_header_len % AES_BLOCK_SIZE != 0) { - throw BagFormatException("Error in encrypted header length"); + throw BagFormatException((boost::format("Error in encrypted header length, %d") % encrypted_header_len).str()); } // Read encrypted header std::basic_string encrypted_header(encrypted_header_len, 0); diff --git a/tools/rosbag_storage/src/bag.cpp b/tools/rosbag_storage/src/bag.cpp index 9160484cfb..83a6b31394 100644 --- a/tools/rosbag_storage/src/bag.cpp +++ b/tools/rosbag_storage/src/bag.cpp @@ -450,7 +450,7 @@ void Bag::stopWritingChunk() { uint32_t compressed_size = file_.getOffset() - curr_chunk_data_pos_; // When encryption is on, compressed_size represents encrypted chunk size; - // When decrypting, the compressed size can be deduced from the decrypted chunk + // When decrypting, the actual compressed size can be deduced from the decrypted chunk compressed_size = encryptor_->encryptChunk(compressed_size, curr_chunk_data_pos_, file_); // Rewrite the chunk header with the size of the chunk (remembering current offset) @@ -656,11 +656,11 @@ void Bag::readConnectionIndexRecord200() { void Bag::writeConnectionRecords() { for (map::const_iterator i = connections_.begin(); i != connections_.end(); i++) { ConnectionInfo const* connection_info = i->second; - writeConnectionRecord(connection_info); + writeConnectionRecord(connection_info, true); } } -void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) { +void Bag::writeConnectionRecord(ConnectionInfo const* connection_info, const bool encrypt) { CONSOLE_BRIDGE_logDebug("Writing CONNECTION [%llu:%d]: topic=%s id=%d", (unsigned long long) file_.getOffset(), getChunkOffset(), connection_info->topic.c_str(), connection_info->id); @@ -668,9 +668,16 @@ void Bag::writeConnectionRecord(ConnectionInfo const* connection_info) { header[OP_FIELD_NAME] = toHeaderString(&OP_CONNECTION); header[TOPIC_FIELD_NAME] = connection_info->topic; header[CONNECTION_FIELD_NAME] = toHeaderString(&connection_info->id); - encryptor_->writeEncryptedHeader(boost::bind(&Bag::writeHeader, this, _1), header, file_); - encryptor_->writeEncryptedHeader(boost::bind(&Bag::writeHeader, this, _1), *connection_info->header, file_); + if (encrypt) + encryptor_->writeEncryptedHeader(boost::bind(&Bag::writeHeader, this, _1), header, file_); + else + writeHeader(header); + + if (encrypt) + encryptor_->writeEncryptedHeader(boost::bind(&Bag::writeHeader, this, _1), *connection_info->header, file_); + else + writeHeader(*connection_info->header); } void Bag::appendConnectionRecordToBuffer(Buffer& buf, ConnectionInfo const* connection_info) {