Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release 0.12.1: network protocol improvements and better JPEG error handling #147

Merged
merged 2 commits into from
Feb 1, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions deflect/ImageJpegDecompressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,7 @@ QByteArray ImageJpegDecompressor::decompress( const QByteArray& jpegData )
(unsigned long)jpegData.size(), &width,
&height, &jpegSubsamp );
if( err != 0 )
{
std::cerr << "libjpeg-turbo header decompression failure" << std::endl;
return QByteArray();
}
throw std::runtime_error( "libjpeg-turbo header decompression failed" );

// decompress image data
int pixelFormat = TJPF_RGBX; // Format for OpenGL texture (GL_RGBA)
Expand All @@ -78,10 +75,7 @@ QByteArray ImageJpegDecompressor::decompress( const QByteArray& jpegData )
(unsigned char*)decodedData.data(),
width, pitch, height, pixelFormat, flags );
if( err != 0 )
{
std::cerr << "libjpeg-turbo image decompression failure" << std::endl;
return QByteArray();
}
throw std::runtime_error( "libjpeg-turbo image decompression failed" );

return decodedData;
}
Expand Down
4 changes: 2 additions & 2 deletions deflect/ImageJpegDecompressor.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ class ImageJpegDecompressor
* Decompress a Jpeg image
*
* @param jpegData The compressed Jpeg data
* @return The decompressed image data in (GL_)RGBA format, or an
* empty array if the image could not be decoded.
* @return The decompressed image data in (GL_)RGBA format
* @throw std::runtime_error if a decompression error occured
*/
DEFLECT_API QByteArray decompress( const QByteArray& jpegData );

Expand Down
41 changes: 31 additions & 10 deletions deflect/SegmentDecoder.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*********************************************************************/
/* Copyright (c) 2013, EPFL/Blue Brain Project */
/* Raphael Dumusc <raphael.dumusc@epfl.ch> */
/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */
/* Raphael Dumusc <raphael.dumusc@epfl.ch> */
/* All rights reserved. */
/* */
/* Redistribution and use in source and binary forms, with or */
Expand Down Expand Up @@ -69,20 +69,31 @@ SegmentDecoder::SegmentDecoder()

SegmentDecoder::~SegmentDecoder() {}

void decodeSegment( ImageJpegDecompressor* decompressor, Segment* segment )
void _decodeSegment( ImageJpegDecompressor* decompressor, Segment* segment )
{
QByteArray decodedData = decompressor->decompress( segment->imageData );
if( !segment->parameters.compressed )
return;

if( !decodedData.isEmpty( ))
QByteArray decodedData;
try
{
decodedData = decompressor->decompress( segment->imageData );
}
catch( const std::runtime_error& )
{
segment->imageData = decodedData;
segment->parameters.compressed = false;
throw;
}
const auto& params = segment->parameters;
if( (size_t)decodedData.size() != params.height * params.width * 4 )
throw std::runtime_error( "unexpected segment size" );

segment->imageData = decodedData;
segment->parameters.compressed = false;
}

void SegmentDecoder::decode( Segment& segment )
{
decodeSegment( &_impl->decompressor, &segment );
_decodeSegment( &_impl->decompressor, &segment );
}

void SegmentDecoder::startDecoding( Segment& segment )
Expand All @@ -95,14 +106,24 @@ void SegmentDecoder::startDecoding( Segment& segment )
return;
}

_impl->decodingFuture = QtConcurrent::run( decodeSegment,
_impl->decodingFuture = QtConcurrent::run( _decodeSegment,
&_impl->decompressor,
&segment );
}

void SegmentDecoder::waitDecoding()
{
_impl->decodingFuture.waitForFinished();
try
{
_impl->decodingFuture.waitForFinished();
}
catch( const QUnhandledException& )
{
// Let Qt throws a QUnhandledException and rewrite the error message.
// QtConcurrent::run can only forward QException subclasses, which does
// not even work on 5.7.1: https://bugreports.qt.io/browse/QTBUG-58021
throw std::runtime_error( "Segment decoding failed" );
}
}

bool SegmentDecoder::isRunning() const
Expand Down
11 changes: 8 additions & 3 deletions deflect/SegmentDecoder.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*********************************************************************/
/* Copyright (c) 2013, EPFL/Blue Brain Project */
/* Raphael Dumusc <raphael.dumusc@epfl.ch> */
/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */
/* Raphael Dumusc <raphael.dumusc@epfl.ch> */
/* All rights reserved. */
/* */
/* Redistribution and use in source and binary forms, with or */
Expand Down Expand Up @@ -64,6 +64,7 @@ class SegmentDecoder
* @param segment The segment to decode. Upon success, its imageData member
* will hold the decompressed data and its "compressed" flag will be
* set to false.
* @throw std::runtime_error if a decompression error occured
*/
DEFLECT_API void decode( Segment& segment );

Expand All @@ -79,7 +80,11 @@ class SegmentDecoder
*/
DEFLECT_API void startDecoding( Segment& segment );

/** Waits for the decoding of a segment to finish, initiated by startDecoding(). */
/**
* Waits for the decoding of a segment to finish, initiated by
* startDecoding().
* @throw std::runtime_error if a decompression error occured
*/
DEFLECT_API void waitDecoding();

/** Check if the decoding thread is running. */
Expand Down
30 changes: 21 additions & 9 deletions deflect/ServerWorker.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*********************************************************************/
/* Copyright (c) 2013-2016, EPFL/Blue Brain Project */
/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */
/* Raphael Dumusc <raphael.dumusc@epfl.ch> */
/* Daniel.Nachbaur@epfl.ch */
/* All rights reserved. */
Expand Down Expand Up @@ -47,7 +47,10 @@

#include <QDataStream>

#define RECEIVE_TIMEOUT_MS 3000
namespace
{
const int RECEIVE_TIMEOUT_MS = 3000;
}

namespace deflect
{
Expand All @@ -56,6 +59,7 @@ ServerWorker::ServerWorker( const int socketDescriptor )
// Ensure that tcpSocket_ parent is *this* so it gets moved to thread
: _tcpSocket( new QTcpSocket( this ))
, _sourceId( socketDescriptor )
, _clientProtocolVersion( NETWORK_PROTOCOL_VERSION )
, _registeredToEvents( false )
{
if( !_tcpSocket->setSocketDescriptor( socketDescriptor ))
Expand Down Expand Up @@ -223,6 +227,9 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader,
return;
}
_streamId = uri;
// The version is only sent by deflect clients since v. 0.13.0
if( !byteArray.isEmpty( ))
_parseClientProtocolVersion( byteArray );
emit addStreamSource( _streamId, _sourceId );
break;

Expand Down Expand Up @@ -263,17 +270,22 @@ void ServerWorker::_handleMessage( const MessageHeader& messageHeader,
}
}

void ServerWorker::_handlePixelStreamMessage( const QByteArray& byteArray )
void ServerWorker::_parseClientProtocolVersion( const QByteArray& message )
{
const SegmentParameters* parameters =
reinterpret_cast< const SegmentParameters* >( byteArray.data( ));
bool ok = false;
const int version = message.toInt( &ok );
if( ok )
_clientProtocolVersion = version;
}

void ServerWorker::_handlePixelStreamMessage( const QByteArray& message )
{
Segment segment;
segment.parameters = *parameters;

QByteArray imageData =
byteArray.right( byteArray.size() - sizeof( SegmentParameters ));
segment.imageData = imageData;
const auto data = message.data();
segment.parameters = *reinterpret_cast<const SegmentParameters*>( data );
segment.imageData = message.right( message.size() -
sizeof( SegmentParameters ));

emit( receivedSegment( _streamId, _sourceId, segment ));
}
Expand Down
6 changes: 4 additions & 2 deletions deflect/ServerWorker.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ private slots:

QString _streamId;
int _sourceId;
int _clientProtocolVersion;

bool _registeredToEvents;
QQueue<Event> _events;
Expand All @@ -105,8 +106,9 @@ private slots:
QByteArray _receiveMessageBody( int size );

void _handleMessage( const MessageHeader& messageHeader,
const QByteArray& byteArray );
void _handlePixelStreamMessage( const QByteArray& byteArray );
const QByteArray& message );
void _parseClientProtocolVersion( const QByteArray& message );
void _handlePixelStreamMessage( const QByteArray& message );

void _sendProtocolVersion();
void _sendBindReply( bool successful );
Expand Down
55 changes: 26 additions & 29 deletions deflect/Socket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ const unsigned short Socket::defaultPortNumber = DEFAULT_PORT_NUMBER;
Socket::Socket( const std::string& host, const unsigned short port )
: _host( host )
, _socket( new QTcpSocket( ))
, _remoteProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION )
, _serverProtocolVersion( INVALID_NETWORK_PROTOCOL_VERSION )
{
// disable warnings which occur if no QCoreApplication is present during
// _connect(): QObject::connect: Cannot connect (null)::destroyed() to
Expand Down Expand Up @@ -91,6 +91,11 @@ bool Socket::isConnected() const
return _socket->state() == QTcpSocket::ConnectedState;
}

int32_t Socket::getServerProtocolVersion() const
{
return _serverProtocolVersion;
}

int Socket::getFileDescriptor() const
{
return _socket->socketDescriptor();
Expand Down Expand Up @@ -172,11 +177,6 @@ bool Socket::receive( MessageHeader& messageHeader, QByteArray& message )
return true;
}

int32_t Socket::getRemoteProtocolVersion() const
{
return _remoteProtocolVersion;
}

bool Socket::_receiveHeader( MessageHeader& messageHeader )
{
while( _socket->bytesAvailable() < qint64(MessageHeader::serializedSize) )
Expand All @@ -193,45 +193,42 @@ bool Socket::_receiveHeader( MessageHeader& messageHeader )

bool Socket::_connect( const std::string& host, const unsigned short port )
{
// make sure we're disconnected
_socket->disconnectFromHost();

// open connection
_socket->connectToHost( host.c_str(), port );

if( !_socket->waitForConnected( RECEIVE_TIMEOUT_MS ))
{
std::cerr << "could not connect to host " << host << ":" << port
std::cerr << "could not connect to " << host << ":" << port
<< std::endl;
return false;
}

// handshake
if( _checkProtocolVersion( ))
return true;
if( !_receiveProtocolVersion( ))
{
std::cerr << "server protocol version was not received" << std::endl;
_socket->disconnectFromHost();
return false;
}

if( _serverProtocolVersion < NETWORK_PROTOCOL_VERSION )
{
std::cerr << "server uses unsupported protocol: "
<< _serverProtocolVersion << " < "
<< NETWORK_PROTOCOL_VERSION << std::endl;
_socket->disconnectFromHost();
return false;
}

std::cerr << "Protocol version check failed for host: " << host << ":"
<< port << std::endl;
_socket->disconnectFromHost();
return false;
return true;
}

bool Socket::_checkProtocolVersion()
bool Socket::_receiveProtocolVersion()
{
while( _socket->bytesAvailable() < qint64(sizeof(int32_t)) )
{
if( !_socket->waitForReadyRead( RECEIVE_TIMEOUT_MS ))
return false;
}

_socket->read((char *)&_remoteProtocolVersion, sizeof(int32_t));

if( _remoteProtocolVersion == NETWORK_PROTOCOL_VERSION )
return true;

std::cerr << "unsupported protocol version " << _remoteProtocolVersion
<< " != " << NETWORK_PROTOCOL_VERSION << std::endl;
return false;
_socket->read((char*)&_serverProtocolVersion, sizeof(int32_t));
return true;
}

}
23 changes: 11 additions & 12 deletions deflect/Socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,21 @@ class Socket : public QObject
/** Is the Socket connected */
DEFLECT_API bool isConnected() const;

/**
* Is there a pending message
* @param messageSize Minimum size of the message
*/
bool hasMessage( const size_t messageSize = 0 ) const;
/** @return the protocol version of the server. */
int32_t getServerProtocolVersion() const;

/**
* Get the FileDescriptor for the Socket (for use by poll())
* @return The file descriptor if available, otherwise return -1.
*/
int getFileDescriptor() const;

/**
* Is there a pending message
* @param messageSize Minimum size of the message
*/
bool hasMessage( const size_t messageSize = 0 ) const;

/**
* Send a message.
* @param messageHeader The message header
Expand All @@ -113,23 +116,19 @@ class Socket : public QObject
*/
bool receive( MessageHeader& messageHeader, QByteArray& message );

/** Get the protocol version of the remote host */
int32_t getRemoteProtocolVersion() const;

signals:
/** Signal that the socket has been disconnected. */
void disconnected();

private:
const std::string _host;
QTcpSocket* _socket;
int32_t _remoteProtocolVersion;
mutable QMutex _socketMutex;

bool _connect( const std::string &host, const unsigned short port );
bool _checkProtocolVersion();
int32_t _serverProtocolVersion;

bool _receiveHeader( MessageHeader& messageHeader );
bool _connect( const std::string &host, const unsigned short port );
bool _receiveProtocolVersion();
};

}
Expand Down
6 changes: 4 additions & 2 deletions deflect/StreamPrivate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@

#include "StreamPrivate.h"

#include "NetworkProtocol.h"
#include "Segment.h"
#include "SegmentParameters.h"
#include "SizeHints.h"
Expand Down Expand Up @@ -119,8 +120,9 @@ StreamPrivate::~StreamPrivate()

void StreamPrivate::sendOpen()
{
const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, 0, id );
socket.send( mh, QByteArray( ));
const auto message = QByteArray::number( NETWORK_PROTOCOL_VERSION );
const MessageHeader mh( MESSAGE_TYPE_PIXELSTREAM_OPEN, message.size(), id );
socket.send( mh, message );
}

void StreamPrivate::sendClose()
Expand Down
Loading