From d994b3416e6461f134feff512324208c2e134446 Mon Sep 17 00:00:00 2001 From: Raphael Dumusc Date: Mon, 30 Jan 2017 17:19:18 +0100 Subject: [PATCH 1/2] JPEG decompression throws on error instead of failing silently --- deflect/ImageJpegDecompressor.cpp | 10 ++------ deflect/ImageJpegDecompressor.h | 4 +-- deflect/SegmentDecoder.cpp | 41 +++++++++++++++++++++++-------- deflect/SegmentDecoder.h | 11 ++++++--- doc/Changelog.md | 2 ++ tests/cpp/SegmentDecoderTests.cpp | 23 +++++++++++++++-- 6 files changed, 66 insertions(+), 25 deletions(-) diff --git a/deflect/ImageJpegDecompressor.cpp b/deflect/ImageJpegDecompressor.cpp index 3273cba..57d8b7d 100644 --- a/deflect/ImageJpegDecompressor.cpp +++ b/deflect/ImageJpegDecompressor.cpp @@ -62,10 +62,7 @@ QByteArray ImageJpegDecompressor::decompress( const QByteArray& jpegData ) (unsigned long)jpegData.size(), &width, &height, &jpegSubsamp ); if( err != 0 ) - { - std::cerr << "libjpeg-turbo header decompression failure" << std::endl; - return QByteArray(); - } + throw std::runtime_error( "libjpeg-turbo header decompression failed" ); // decompress image data int pixelFormat = TJPF_RGBX; // Format for OpenGL texture (GL_RGBA) @@ -78,10 +75,7 @@ QByteArray ImageJpegDecompressor::decompress( const QByteArray& jpegData ) (unsigned char*)decodedData.data(), width, pitch, height, pixelFormat, flags ); if( err != 0 ) - { - std::cerr << "libjpeg-turbo image decompression failure" << std::endl; - return QByteArray(); - } + throw std::runtime_error( "libjpeg-turbo image decompression failed" ); return decodedData; } diff --git a/deflect/ImageJpegDecompressor.h b/deflect/ImageJpegDecompressor.h index 8b33f63..e71d71a 100644 --- a/deflect/ImageJpegDecompressor.h +++ b/deflect/ImageJpegDecompressor.h @@ -62,8 +62,8 @@ class ImageJpegDecompressor * Decompress a Jpeg image * * @param jpegData The compressed Jpeg data - * @return The decompressed image data in (GL_)RGBA format, or an - * empty array if the image could not be decoded. + * @return The decompressed image data in (GL_)RGBA format + * @throw std::runtime_error if a decompression error occured */ DEFLECT_API QByteArray decompress( const QByteArray& jpegData ); diff --git a/deflect/SegmentDecoder.cpp b/deflect/SegmentDecoder.cpp index fc424ba..379af62 100644 --- a/deflect/SegmentDecoder.cpp +++ b/deflect/SegmentDecoder.cpp @@ -1,6 +1,6 @@ /*********************************************************************/ -/* Copyright (c) 2013, EPFL/Blue Brain Project */ -/* Raphael Dumusc */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ +/* Raphael Dumusc */ /* All rights reserved. */ /* */ /* Redistribution and use in source and binary forms, with or */ @@ -69,20 +69,31 @@ SegmentDecoder::SegmentDecoder() SegmentDecoder::~SegmentDecoder() {} -void decodeSegment( ImageJpegDecompressor* decompressor, Segment* segment ) +void _decodeSegment( ImageJpegDecompressor* decompressor, Segment* segment ) { - QByteArray decodedData = decompressor->decompress( segment->imageData ); + if( !segment->parameters.compressed ) + return; - if( !decodedData.isEmpty( )) + QByteArray decodedData; + try + { + decodedData = decompressor->decompress( segment->imageData ); + } + catch( const std::runtime_error& ) { - segment->imageData = decodedData; - segment->parameters.compressed = false; + throw; } + const auto& params = segment->parameters; + if( (size_t)decodedData.size() != params.height * params.width * 4 ) + throw std::runtime_error( "unexpected segment size" ); + + segment->imageData = decodedData; + segment->parameters.compressed = false; } void SegmentDecoder::decode( Segment& segment ) { - decodeSegment( &_impl->decompressor, &segment ); + _decodeSegment( &_impl->decompressor, &segment ); } void SegmentDecoder::startDecoding( Segment& segment ) @@ -95,14 +106,24 @@ void SegmentDecoder::startDecoding( Segment& segment ) return; } - _impl->decodingFuture = QtConcurrent::run( decodeSegment, + _impl->decodingFuture = QtConcurrent::run( _decodeSegment, &_impl->decompressor, &segment ); } void SegmentDecoder::waitDecoding() { - _impl->decodingFuture.waitForFinished(); + try + { + _impl->decodingFuture.waitForFinished(); + } + catch( const QUnhandledException& ) + { + // Let Qt throws a QUnhandledException and rewrite the error message. + // QtConcurrent::run can only forward QException subclasses, which does + // not even work on 5.7.1: https://bugreports.qt.io/browse/QTBUG-58021 + throw std::runtime_error( "Segment decoding failed" ); + } } bool SegmentDecoder::isRunning() const diff --git a/deflect/SegmentDecoder.h b/deflect/SegmentDecoder.h index 2de7d48..3fdac01 100644 --- a/deflect/SegmentDecoder.h +++ b/deflect/SegmentDecoder.h @@ -1,6 +1,6 @@ /*********************************************************************/ -/* Copyright (c) 2013, EPFL/Blue Brain Project */ -/* Raphael Dumusc */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ +/* Raphael Dumusc */ /* All rights reserved. */ /* */ /* Redistribution and use in source and binary forms, with or */ @@ -64,6 +64,7 @@ class SegmentDecoder * @param segment The segment to decode. Upon success, its imageData member * will hold the decompressed data and its "compressed" flag will be * set to false. + * @throw std::runtime_error if a decompression error occured */ DEFLECT_API void decode( Segment& segment ); @@ -79,7 +80,11 @@ class SegmentDecoder */ DEFLECT_API void startDecoding( Segment& segment ); - /** Waits for the decoding of a segment to finish, initiated by startDecoding(). */ + /** + * Waits for the decoding of a segment to finish, initiated by + * startDecoding(). + * @throw std::runtime_error if a decompression error occured + */ DEFLECT_API void waitDecoding(); /** Check if the decoding thread is running. */ diff --git a/doc/Changelog.md b/doc/Changelog.md index 34493f2..dbcb807 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -5,6 +5,8 @@ Changelog {#Changelog} ### 0.12.1 (git master) +* [147](https://github.com/BlueBrain/Deflect/pull/147): + Deflect server: better reporting of JPEG decompression errors. * [146](https://github.com/BlueBrain/Deflect/pull/146): Unified the command line options and help message of applications. * [145](https://github.com/BlueBrain/Deflect/pull/145): diff --git a/tests/cpp/SegmentDecoderTests.cpp b/tests/cpp/SegmentDecoderTests.cpp index 0ef932f..221e6a8 100644 --- a/tests/cpp/SegmentDecoderTests.cpp +++ b/tests/cpp/SegmentDecoderTests.cpp @@ -1,6 +1,6 @@ /*********************************************************************/ -/* Copyright (c) 2013, EPFL/Blue Brain Project */ -/* Raphael Dumusc */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ +/* Raphael Dumusc */ /* All rights reserved. */ /* */ /* Redistribution and use in source and binary forms, with or */ @@ -136,3 +136,22 @@ BOOST_AUTO_TEST_CASE( testImageSegmentationWithCompressionAndDecompression ) data.data() + segment.imageData.size(), dataOut, dataOut+segment.imageData.size( )); } + +BOOST_AUTO_TEST_CASE( testDecompressionOfInvalidData ) +{ + const QByteArray invalidJpegData{ "notjpeg923%^#8" }; + deflect::ImageJpegDecompressor decompressor; + BOOST_CHECK_THROW( decompressor.decompress( invalidJpegData ), + std::runtime_error ); + + deflect::Segment segment; + segment.parameters.width = 32; + segment.parameters.height = 32; + segment.imageData = invalidJpegData; + + deflect::SegmentDecoder decoder; + BOOST_CHECK_THROW( decoder.decode( segment ), std::runtime_error ); + + BOOST_CHECK_NO_THROW( decoder.startDecoding( segment )); + BOOST_CHECK_THROW( decoder.waitDecoding(), std::runtime_error ); +} From c3e565232f063ea281023fa1b79d10a977a2f5db Mon Sep 17 00:00:00 2001 From: Raphael Dumusc Date: Mon, 30 Jan 2017 18:51:58 +0100 Subject: [PATCH 2/2] Improved network version exchange protocol This commit relaxes the check on the client side to allow connecting to servers with a more recent network protocol in the future. Clients also send their protocol version to the server. This information is simply ignored by older servers and does not break compatibility. The idea is to create a transition period after which the protocol can be modified (to exchange more information at connection time, such as support for stereo streams) without breaking compatibilty for clients and servers based on deflect >= 0.12.1. --- deflect/ServerWorker.cpp | 30 ++++++++++++++------- deflect/ServerWorker.h | 6 +++-- deflect/Socket.cpp | 55 ++++++++++++++++++--------------------- deflect/Socket.h | 23 ++++++++-------- deflect/StreamPrivate.cpp | 6 +++-- doc/Changelog.md | 4 ++- tests/cpp/SocketTests.cpp | 5 ++-- tests/mock/MockServer.h | 4 +-- 8 files changed, 73 insertions(+), 60 deletions(-) diff --git a/deflect/ServerWorker.cpp b/deflect/ServerWorker.cpp index e15acb9..fd759d4 100644 --- a/deflect/ServerWorker.cpp +++ b/deflect/ServerWorker.cpp @@ -1,5 +1,5 @@ /*********************************************************************/ -/* Copyright (c) 2013-2016, EPFL/Blue Brain Project */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ /* Raphael Dumusc */ /* Daniel.Nachbaur@epfl.ch */ /* All rights reserved. */ @@ -47,7 +47,10 @@ #include -#define RECEIVE_TIMEOUT_MS 3000 +namespace +{ +const int RECEIVE_TIMEOUT_MS = 3000; +} namespace deflect { @@ -56,6 +59,7 @@ ServerWorker::ServerWorker( const int socketDescriptor ) // Ensure that tcpSocket_ parent is *this* so it gets moved to thread : _tcpSocket( new QTcpSocket( this )) , _sourceId( socketDescriptor ) + , _clientProtocolVersion( NETWORK_PROTOCOL_VERSION ) , _registeredToEvents( false ) { if( !_tcpSocket->setSocketDescriptor( socketDescriptor )) @@ -223,6 +227,9 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader, return; } _streamId = uri; + // The version is only sent by deflect clients since v. 0.13.0 + if( !byteArray.isEmpty( )) + _parseClientProtocolVersion( byteArray ); emit addStreamSource( _streamId, _sourceId ); break; @@ -263,17 +270,22 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader, } } -void ServerWorker::_handlePixelStreamMessage( const QByteArray& byteArray ) +void ServerWorker::_parseClientProtocolVersion( const QByteArray& message ) { - const SegmentParameters* parameters = - reinterpret_cast< const SegmentParameters* >( byteArray.data( )); + bool ok = false; + const int version = message.toInt( &ok ); + if( ok ) + _clientProtocolVersion = version; +} +void ServerWorker::_handlePixelStreamMessage( const QByteArray& message ) +{ Segment segment; - segment.parameters = *parameters; - QByteArray imageData = - byteArray.right( byteArray.size() - sizeof( SegmentParameters )); - segment.imageData = imageData; + const auto data = message.data(); + segment.parameters = *reinterpret_cast( data ); + segment.imageData = message.right( message.size() - + sizeof( SegmentParameters )); emit( receivedSegment( _streamId, _sourceId, segment )); } diff --git a/deflect/ServerWorker.h b/deflect/ServerWorker.h index 7cc0b03..4717cbe 100644 --- a/deflect/ServerWorker.h +++ b/deflect/ServerWorker.h @@ -96,6 +96,7 @@ private slots: QString _streamId; int _sourceId; + int _clientProtocolVersion; bool _registeredToEvents; QQueue _events; @@ -105,8 +106,9 @@ private slots: QByteArray _receiveMessageBody( int size ); void _handleMessage( const MessageHeader& messageHeader, - const QByteArray& byteArray ); - void _handlePixelStreamMessage( const QByteArray& byteArray ); + const QByteArray& message ); + void _parseClientProtocolVersion( const QByteArray& message ); + void _handlePixelStreamMessage( const QByteArray& message ); void _sendProtocolVersion(); void _sendBindReply( bool successful ); diff --git a/deflect/Socket.cpp b/deflect/Socket.cpp index 2ea6d75..360bf15 100644 --- a/deflect/Socket.cpp +++ b/deflect/Socket.cpp @@ -59,7 +59,7 @@ const unsigned short Socket::defaultPortNumber = DEFAULT_PORT_NUMBER; Socket::Socket( const std::string& host, const unsigned short port ) : _host( host ) , _socket( new QTcpSocket( )) - , _remoteProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION ) + , _serverProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION ) { // disable warnings which occur if no QCoreApplication is present during // _connect(): QObject::connect: Cannot connect (null)::destroyed() to @@ -91,6 +91,11 @@ bool Socket::isConnected() const return _socket->state() == QTcpSocket::ConnectedState; } +int32_t Socket::getServerProtocolVersion() const +{ + return _serverProtocolVersion; +} + int Socket::getFileDescriptor() const { return _socket->socketDescriptor(); @@ -172,11 +177,6 @@ bool Socket::receive( MessageHeader& messageHeader, QByteArray& message ) return true; } -int32_t Socket::getRemoteProtocolVersion() const -{ - return _remoteProtocolVersion; -} - bool Socket::_receiveHeader( MessageHeader& messageHeader ) { while( _socket->bytesAvailable() < qint64(MessageHeader::serializedSize) ) @@ -193,45 +193,42 @@ bool Socket::_receiveHeader( MessageHeader& messageHeader ) bool Socket::_connect( const std::string& host, const unsigned short port ) { - // make sure we're disconnected - _socket->disconnectFromHost(); - - // open connection _socket->connectToHost( host.c_str(), port ); - if( !_socket->waitForConnected( RECEIVE_TIMEOUT_MS )) { - std::cerr << "could not connect to host " << host << ":" << port + std::cerr << "could not connect to " << host << ":" << port << std::endl; return false; } - // handshake - if( _checkProtocolVersion( )) - return true; + if( !_receiveProtocolVersion( )) + { + std::cerr << "server protocol version was not received" << std::endl; + _socket->disconnectFromHost(); + return false; + } + + if( _serverProtocolVersion < NETWORK_PROTOCOL_VERSION ) + { + std::cerr << "server uses unsupported protocol: " + << _serverProtocolVersion << " < " + << NETWORK_PROTOCOL_VERSION << std::endl; + _socket->disconnectFromHost(); + return false; + } - std::cerr << "Protocol version check failed for host: " << host << ":" - << port << std::endl; - _socket->disconnectFromHost(); - return false; + return true; } -bool Socket::_checkProtocolVersion() +bool Socket::_receiveProtocolVersion() { while( _socket->bytesAvailable() < qint64(sizeof(int32_t)) ) { if( !_socket->waitForReadyRead( RECEIVE_TIMEOUT_MS )) return false; } - - _socket->read((char *)&_remoteProtocolVersion, sizeof(int32_t)); - - if( _remoteProtocolVersion == NETWORK_PROTOCOL_VERSION ) - return true; - - std::cerr << "unsupported protocol version " << _remoteProtocolVersion - << " != " << NETWORK_PROTOCOL_VERSION << std::endl; - return false; + _socket->read((char*)&_serverProtocolVersion, sizeof(int32_t)); + return true; } } diff --git a/deflect/Socket.h b/deflect/Socket.h index 228f7b9..36533f3 100644 --- a/deflect/Socket.h +++ b/deflect/Socket.h @@ -85,11 +85,8 @@ class Socket : public QObject /** Is the Socket connected */ DEFLECT_API bool isConnected() const; - /** - * Is there a pending message - * @param messageSize Minimum size of the message - */ - bool hasMessage( const size_t messageSize = 0 ) const; + /** @return the protocol version of the server. */ + int32_t getServerProtocolVersion() const; /** * Get the FileDescriptor for the Socket (for use by poll()) @@ -97,6 +94,12 @@ class Socket : public QObject */ int getFileDescriptor() const; + /** + * Is there a pending message + * @param messageSize Minimum size of the message + */ + bool hasMessage( const size_t messageSize = 0 ) const; + /** * Send a message. * @param messageHeader The message header @@ -113,9 +116,6 @@ class Socket : public QObject */ bool receive( MessageHeader& messageHeader, QByteArray& message ); - /** Get the protocol version of the remote host */ - int32_t getRemoteProtocolVersion() const; - signals: /** Signal that the socket has been disconnected. */ void disconnected(); @@ -123,13 +123,12 @@ class Socket : public QObject private: const std::string _host; QTcpSocket* _socket; - int32_t _remoteProtocolVersion; mutable QMutex _socketMutex; - - bool _connect( const std::string &host, const unsigned short port ); - bool _checkProtocolVersion(); + int32_t _serverProtocolVersion; bool _receiveHeader( MessageHeader& messageHeader ); + bool _connect( const std::string &host, const unsigned short port ); + bool _receiveProtocolVersion(); }; } diff --git a/deflect/StreamPrivate.cpp b/deflect/StreamPrivate.cpp index 2530274..49b1113 100644 --- a/deflect/StreamPrivate.cpp +++ b/deflect/StreamPrivate.cpp @@ -41,6 +41,7 @@ #include "StreamPrivate.h" +#include "NetworkProtocol.h" #include "Segment.h" #include "SegmentParameters.h" #include "SizeHints.h" @@ -119,8 +120,9 @@ StreamPrivate::~StreamPrivate() void StreamPrivate::sendOpen() { - const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, 0, id ); - socket.send( mh, QByteArray( )); + const auto message = QByteArray::number( NETWORK_PROTOCOL_VERSION ); + const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, message.size(), id ); + socket.send( mh, message ); } void StreamPrivate::sendClose() diff --git a/doc/Changelog.md b/doc/Changelog.md index dbcb807..c8b830b 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -3,9 +3,11 @@ Changelog {#Changelog} ## Deflect 0.12 -### 0.12.1 (git master) +### 0.12.1 (01-02-2017) * [147](https://github.com/BlueBrain/Deflect/pull/147): + Improved handling of network protocol updates. Future updates should be + possible without breaking any client/server based on this release. Deflect server: better reporting of JPEG decompression errors. * [146](https://github.com/BlueBrain/Deflect/pull/146): Unified the command line options and help message of applications. diff --git a/tests/cpp/SocketTests.cpp b/tests/cpp/SocketTests.cpp index 575c01b..318e8c3 100644 --- a/tests/cpp/SocketTests.cpp +++ b/tests/cpp/SocketTests.cpp @@ -44,6 +44,7 @@ namespace ut = boost::unit_test; #include "MinimalGlobalQtApp.h" #include "MockServer.h" +#include #include #include @@ -53,14 +54,14 @@ BOOST_GLOBAL_FIXTURE( MinimalGlobalQtApp ); void testSocketConnect( const int32_t versionOffset ) { QThread thread; - MockServer* server = new MockServer( NETWORK_PROTOCOL_VERSION + versionOffset ); + auto server = new MockServer( NETWORK_PROTOCOL_VERSION + versionOffset ); server->moveToThread( &thread ); server->connect( &thread, &QThread::finished, server, &QObject::deleteLater ); thread.start(); deflect::Socket socket( "localhost", server->serverPort( )); - BOOST_CHECK( socket.isConnected() == (versionOffset == 0)); + BOOST_CHECK( socket.isConnected() == (versionOffset >= 0)); thread.quit(); thread.wait(); diff --git a/tests/mock/MockServer.h b/tests/mock/MockServer.h index b63bb21..8dd6f68 100644 --- a/tests/mock/MockServer.h +++ b/tests/mock/MockServer.h @@ -46,7 +46,6 @@ typedef __int32 int32_t; #include #include -#include #include @@ -55,8 +54,7 @@ class MockServer : public QTcpServer Q_OBJECT public: - DEFLECT_API explicit MockServer( int32_t protocolVersion = - NETWORK_PROTOCOL_VERSION ); + DEFLECT_API explicit MockServer( int32_t protocolVersion ); DEFLECT_API virtual ~MockServer(); protected: