From db84bf4c70e5ba51a19e3f015f3c38df0f1d9eff Mon Sep 17 00:00:00 2001 From: Daniel Nachbaur Date: Fri, 28 Jul 2017 10:28:41 +0200 Subject: [PATCH] Add deflect::Observer which can be used only receive events w/o the need so send images --- CMakeLists.txt | 2 +- deflect/CMakeLists.txt | 2 + deflect/FrameDispatcher.cpp | 30 ++++-- deflect/FrameDispatcher.h | 18 +++- deflect/MessageHeader.h | 6 +- deflect/Observer.cpp | 172 ++++++++++++++++++++++++++++++ deflect/Observer.h | 199 +++++++++++++++++++++++++++++++++++ deflect/ReceiveBuffer.cpp | 4 +- deflect/Server.cpp | 34 +++--- deflect/ServerWorker.cpp | 19 +++- deflect/ServerWorker.h | 3 + deflect/Stream.cpp | 117 +------------------- deflect/Stream.h | 101 +----------------- deflect/StreamPrivate.cpp | 16 ++- deflect/StreamPrivate.h | 5 +- deflect/StreamSendWorker.cpp | 14 +++ deflect/StreamSendWorker.h | 8 +- doc/Changelog.md | 9 +- tests/cpp/ServerTests.cpp | 106 ++++++++++++++++++- 19 files changed, 613 insertions(+), 252 deletions(-) create mode 100644 deflect/Observer.cpp create mode 100644 deflect/Observer.h diff --git a/CMakeLists.txt b/CMakeLists.txt index ed78ecc..a8a83ca 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -4,7 +4,7 @@ # Daniel Nachbaur cmake_minimum_required(VERSION 3.1 FATAL_ERROR) -project(Deflect VERSION 0.13.1) +project(Deflect VERSION 0.14.0) set(Deflect_VERSION_ABI 6) list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/CMake/common) diff --git a/deflect/CMakeLists.txt b/deflect/CMakeLists.txt index 1760f8d..43b916f 100644 --- a/deflect/CMakeLists.txt +++ b/deflect/CMakeLists.txt @@ -10,6 +10,7 @@ set(DEFLECT_PUBLIC_HEADERS Frame.h ImageWrapper.h MTQueue.h + Observer.h Segment.h SegmentParameters.h Server.h @@ -38,6 +39,7 @@ set(DEFLECT_SOURCES ImageWrapper.cpp MessageHeader.cpp MetaTypeRegistration.cpp + Observer.cpp ReceiveBuffer.cpp Server.cpp ServerWorker.cpp diff --git a/deflect/FrameDispatcher.cpp b/deflect/FrameDispatcher.cpp index 8c0c68b..699c0d4 100644 --- a/deflect/FrameDispatcher.cpp +++ b/deflect/FrameDispatcher.cpp @@ -71,10 +71,12 @@ class FrameDispatcher::Impl typedef std::map StreamBuffers; StreamBuffers streamBuffers; + std::map observers; }; -FrameDispatcher::FrameDispatcher() - : _impl(new Impl) +FrameDispatcher::FrameDispatcher(QObject* parent) + : QObject(parent) + , _impl(new Impl) { } @@ -97,8 +99,21 @@ void FrameDispatcher::removeSource(const QString uri, const size_t sourceIndex) _impl->streamBuffers[uri].removeSource(sourceIndex); - if (_impl->streamBuffers[uri].getSourceCount() == 0) - deleteStream(uri); + deleteStream(uri); +} + +void FrameDispatcher::addObserver(const QString uri) +{ + ++_impl->observers[uri]; + emit pixelStreamOpened(uri); +} + +void FrameDispatcher::removeObserver(QString uri) +{ + if (_impl->observers[uri] > 0) + --_impl->observers[uri]; + + deleteStream(uri); } void FrameDispatcher::processSegment(const QString uri, @@ -145,10 +160,13 @@ void FrameDispatcher::requestFrame(const QString uri) void FrameDispatcher::deleteStream(const QString uri) { - if (_impl->streamBuffers.count(uri)) + if (_impl->streamBuffers[uri].getSourceCount() == 0 && + _impl->streamBuffers.count(uri)) { _impl->streamBuffers.erase(uri); - emit pixelStreamClosed(uri); + + if (_impl->observers[uri] == 0) + emit pixelStreamClosed(uri); } } } diff --git a/deflect/FrameDispatcher.h b/deflect/FrameDispatcher.h index 7e929a8..836c1ea 100644 --- a/deflect/FrameDispatcher.h +++ b/deflect/FrameDispatcher.h @@ -58,7 +58,7 @@ class FrameDispatcher : public QObject public: /** Construct a dispatcher */ - FrameDispatcher(); + FrameDispatcher(QObject* parent); /** Destructor. */ ~FrameDispatcher(); @@ -80,6 +80,22 @@ public slots: */ void removeSource(QString uri, size_t sourceIndex); + /** + * Add a stream source as an observer which does not contribute segments. + * Emits pixelStreamOpened() if no other observer or source is present. + * + * @param uri Identifier for the stream + */ + void addObserver(QString uri); + + /** + * Remove a stream observer, emits pixelStreamClosed() if all observers and + * sources are gone. + * + * @param uri Identifier for the stream + */ + void removeObserver(QString uri); + /** * Process a new Segment. * diff --git a/deflect/MessageHeader.h b/deflect/MessageHeader.h index 3d1f0ff..8733078 100644 --- a/deflect/MessageHeader.h +++ b/deflect/MessageHeader.h @@ -1,5 +1,5 @@ /*********************************************************************/ -/* Copyright (c) 2013-2016, EPFL/Blue Brain Project */ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ /* Raphael.Dumusc@epfl.ch */ /* Daniel.Nachbaur@epfl.ch */ /* Redistribution and use in source and binary forms, with or */ @@ -67,7 +67,9 @@ enum MessageType MESSAGE_TYPE_QUIT = 12, MESSAGE_TYPE_SIZE_HINTS = 13, MESSAGE_TYPE_DATA = 14, - MESSAGE_TYPE_IMAGE_VIEW = 15 + MESSAGE_TYPE_IMAGE_VIEW = 15, + MESSAGE_TYPE_OBSERVER_OPEN = 16, + MESSAGE_TYPE_OBSERVER_QUIT = 17 }; #define MESSAGE_HEADER_URI_LENGTH 64 diff --git a/deflect/Observer.cpp b/deflect/Observer.cpp new file mode 100644 index 0000000..f96e118 --- /dev/null +++ b/deflect/Observer.cpp @@ -0,0 +1,172 @@ +/*********************************************************************/ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ +/* Raphael Dumusc */ +/* Stefan.Eilemann@epfl.ch */ +/* Daniel.Nachbaur@epfl.ch */ +/* All rights reserved. */ +/* */ +/* Redistribution and use in source and binary forms, with or */ +/* without modification, are permitted provided that the following */ +/* conditions are met: */ +/* */ +/* 1. Redistributions of source code must retain the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer. */ +/* */ +/* 2. Redistributions in binary form must reproduce the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer in the documentation and/or other materials */ +/* provided with the distribution. */ +/* */ +/* THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, */ +/* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF */ +/* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE */ +/* DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, */ +/* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES */ +/* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE */ +/* GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR */ +/* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF */ +/* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT */ +/* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT */ +/* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE */ +/* POSSIBILITY OF SUCH DAMAGE. */ +/* */ +/* The views and conclusions contained in the software and */ +/* documentation are those of the authors and should not be */ +/* interpreted as representing official policies, either expressed */ +/* or implied, of The University of Texas at Austin. */ +/*********************************************************************/ + +#include "Observer.h" +#include "StreamPrivate.h" + +#include + +#include +#include + +namespace deflect +{ +Observer::Observer() + : _impl(new StreamPrivate("", "", Socket::defaultPortNumber, true)) +{ +} + +Observer::Observer(const std::string& id, const std::string& host, + const unsigned short port) + : _impl(new StreamPrivate(id, host, port, true)) +{ +} + +Observer::Observer(StreamPrivate* impl) + : _impl(impl) +{ +} + +Observer::~Observer() +{ +} + +bool Observer::isConnected() const +{ + return _impl->socket.isConnected(); +} + +const std::string& Observer::getId() const +{ + return _impl->id; +} + +const std::string& Observer::getHost() const +{ + return _impl->socket.getHost(); +} + +bool Observer::registerForEvents(const bool exclusive) +{ + if (!isConnected()) + { + std::cerr << "deflect::Stream::registerForEvents: Stream is not " + << "connected, operation failed" << std::endl; + return false; + } + + if (isRegisteredForEvents()) + return true; + + // Send the bind message + if (!_impl->sendWorker.enqueueBindRequest(exclusive).get()) + { + std::cerr << "deflect::Stream::registerForEvents: sending bind message " + << "failed" << std::endl; + return false; + } + + // Wait for bind reply + MessageHeader mh; + QByteArray message; + if (!_impl->socket.receive(mh, message)) + { + std::cerr << "deflect::Stream::registerForEvents: receive bind reply " + << "failed" << std::endl; + return false; + } + if (mh.type != MESSAGE_TYPE_BIND_EVENTS_REPLY) + { + std::cerr << "deflect::Stream::registerForEvents: received unexpected " + << "message type (" << int(mh.type) << ")" << std::endl; + return false; + } + _impl->registeredForEvents = *(bool*)(message.data()); + + return isRegisteredForEvents(); +} + +bool Observer::isRegisteredForEvents() const +{ + return _impl->registeredForEvents; +} + +int Observer::getDescriptor() const +{ + return _impl->socket.getFileDescriptor(); +} + +bool Observer::hasEvent() const +{ + return _impl->socket.hasMessage(Event::serializedSize); +} + +Event Observer::getEvent() +{ + MessageHeader mh; + QByteArray message; + if (!_impl->socket.receive(mh, message)) + { + std::cerr << "deflect::Stream::getEvent: receive failed" << std::endl; + return Event(); + } + if (mh.type != MESSAGE_TYPE_EVENT) + { + std::cerr << "deflect::Stream::getEvent: received unexpected message " + << "type (" << int(mh.type) << ")" << std::endl; + return Event(); + } + + assert((size_t)message.size() == Event::serializedSize); + + Event event; + { + QDataStream stream(message); + stream >> event; + } + return event; +} + +void Observer::setDisconnectedCallback(const std::function callback) +{ + _impl->disconnectedCallback = callback; +} +} diff --git a/deflect/Observer.h b/deflect/Observer.h new file mode 100644 index 0000000..6559bb6 --- /dev/null +++ b/deflect/Observer.h @@ -0,0 +1,199 @@ +/*********************************************************************/ +/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */ +/* Raphael Dumusc */ +/* Stefan.Eilemann@epfl.ch */ +/* Daniel.Nachbaur@epfl.ch */ +/* All rights reserved. */ +/* */ +/* Redistribution and use in source and binary forms, with or */ +/* without modification, are permitted provided that the following */ +/* conditions are met: */ +/* */ +/* 1. Redistributions of source code must retain the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer. */ +/* */ +/* 2. Redistributions in binary form must reproduce the above */ +/* copyright notice, this list of conditions and the following */ +/* disclaimer in the documentation and/or other materials */ +/* provided with the distribution. */ +/* */ +/* THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, */ +/* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF */ +/* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE */ +/* DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF TEXAS AT */ +/* AUSTIN OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, */ +/* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES */ +/* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE */ +/* GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR */ +/* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF */ +/* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT */ +/* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT */ +/* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE */ +/* POSSIBILITY OF SUCH DAMAGE. */ +/* */ +/* The views and conclusions contained in the software and */ +/* documentation are those of the authors and should not be */ +/* interpreted as representing official policies, either expressed */ +/* or implied, of The University of Texas at Austin. */ +/*********************************************************************/ + +#ifndef DEFLECT_OBSERVER_H +#define DEFLECT_OBSERVER_H + +#include +#include +#include + +#include +#include +#include + +namespace deflect +{ +class StreamPrivate; + +/** + */ +class Observer +{ +public: + /** + * Open a new connection to the Server using environment variables. + * + * DEFLECT_HOST The address of the target Server instance (required). + * DEFLECT_ID The identifier for the stream. If not provided, a random + * unique identifier will be used. + * @throw std::runtime_error if DEFLECT_HOST was not provided. + * @version 1.3 + */ + DEFLECT_API Observer(); + + /** + * Open a new connection to the Server. + * + * The user can check if the connection was successfully established with + * isConnected(). + * + * Different Streams can contribute to a single window by using the same + * identifier. All the Streams which contribute to the same window should be + * created before any of them starts sending images. + * + * @param id The identifier for the stream. If left empty, the environment + * variable DEFLECT_ID will be used. If both values are empty, + * a random unique identifier will be used. + * @param host The address of the target Server instance. It can be a + * hostname like "localhost" or an IP in string format like + * "192.168.1.83". If left empty, the environment variable + * DEFLECT_HOST will be used instead. + * @param port Port of the Server instance, default 1701. + * @throw std::runtime_error if no host was provided. + * @version 1.0 + */ + DEFLECT_API Observer(const std::string& id, const std::string& host, + unsigned short port = 1701); + + /** Destruct the Stream, closing the connection. @version 1.0 */ + DEFLECT_API virtual ~Observer(); + + /** @return true if the stream is connected, false otherwise. @version 1.0*/ + DEFLECT_API bool isConnected() const; + + /** @return the identifier defined by the constructor. @version 1.3 */ + DEFLECT_API const std::string& getId() const; + + /** @return the host defined by the constructor. @version 1.3 */ + DEFLECT_API const std::string& getHost() const; + + /** + * Register to receive Events. + * + * After registering, the Server application will send Events whenever a + * user is interacting with this Stream's window. + * + * Events can be retrieved using hasEvent() and getEvent(). + * + * The current registration status can be checked with + * isRegisteredForEvents(). + * + * This method is synchronous and waits for a registration reply from the + * Server before returning. + * + * @param exclusive Binds only one stream source for the same identifier. + * @return true if the registration could be or was already established. + * @version 1.0 + */ + DEFLECT_API bool registerForEvents(bool exclusive = false); + + /** + * Is this stream registered to receive events. + * + * Check if the stream has already successfully registered with + * registerForEvents(). + * + * @return true after the Server application has acknowledged the + * registration request, false otherwise + * @version 1.0 + */ + DEFLECT_API bool isRegisteredForEvents() const; + + /** + * Get the native descriptor for the data stream. + * + * This descriptor can for instance be used by poll() on UNIX systems. + * Having this descriptor lets a Stream class user detect when the Stream + * has received any data. The user can the use query the state of the + * Stream, for example using hasEvent(), and process the events accordingly. + * + * @return The native descriptor if available; otherwise returns -1. + * @version 1.0 + */ + DEFLECT_API int getDescriptor() const; + + /** + * Check if a new Event is available. + * + * This method is non-blocking. Use this method prior to calling getEvent(), + * for example as the condition for a while() loop to process all pending + * events. + * + * @return True if an Event is available, false otherwise + * @version 1.0 + */ + DEFLECT_API bool hasEvent() const; + + /** + * Get the next Event. + * + * This method is synchronous and waits until an Event is available before + * returning (or a 1 second timeout occurs). + * + * Check if an Event is available with hasEvent() before calling this + * method. + * + * @return The next Event if available, otherwise an empty (default) Event. + * @version 1.0 + */ + DEFLECT_API Event getEvent(); + + /** + * Set a function to be be called just after the stream gets disconnected. + * + * @param callback the function to call + * @note replaces the previous disconnected signal + * @version 1.5 + */ + DEFLECT_API void setDisconnectedCallback(std::function callback); + +protected: + Observer(const Stream&) = delete; + const Observer& operator=(const Observer&) = delete; + + std::unique_ptr _impl; + + Observer(StreamPrivate* impl); +}; +} + +#endif diff --git a/deflect/ReceiveBuffer.cpp b/deflect/ReceiveBuffer.cpp index 76a3959..b6787d1 100644 --- a/deflect/ReceiveBuffer.cpp +++ b/deflect/ReceiveBuffer.cpp @@ -93,8 +93,6 @@ void ReceiveBuffer::finishFrameForSource(const size_t sourceIndex) bool ReceiveBuffer::hasCompleteFrame() const { - assert(!_sourceBuffers.empty()); - // Check if all sources for Stream have reached the same index for (const auto& kv : _sourceBuffers) { @@ -102,7 +100,7 @@ bool ReceiveBuffer::hasCompleteFrame() const if (buffer.getBackFrameIndex() <= _lastFrameComplete) return false; } - return true; + return !_sourceBuffers.empty(); } Segments ReceiveBuffer::popFrame() diff --git a/deflect/Server.cpp b/deflect/Server.cpp index 5684bbb..d7b3daa 100644 --- a/deflect/Server.cpp +++ b/deflect/Server.cpp @@ -55,11 +55,17 @@ const int Server::defaultPortNumber = DEFAULT_PORT_NUMBER; class Server::Impl { public: - FrameDispatcher frameDispatcher; + Impl(QObject* parent) + : frameDispatcher( + new FrameDispatcher(parent)) // we be deleted by parent + { + } + + FrameDispatcher* frameDispatcher; }; Server::Server(const int port) - : _impl(new Impl) + : _impl(new Impl(this)) { setProxy(QNetworkProxy::NoProxy); if (!listen(QHostAddress::Any, port)) @@ -71,13 +77,13 @@ Server::Server(const int port) } // Forward FrameDispatcher signals - connect(&_impl->frameDispatcher, &FrameDispatcher::pixelStreamOpened, this, + connect(_impl->frameDispatcher, &FrameDispatcher::pixelStreamOpened, this, &Server::pixelStreamOpened); - connect(&_impl->frameDispatcher, &FrameDispatcher::pixelStreamClosed, this, + connect(_impl->frameDispatcher, &FrameDispatcher::pixelStreamClosed, this, &Server::pixelStreamClosed); - connect(&_impl->frameDispatcher, &FrameDispatcher::sendFrame, this, + connect(_impl->frameDispatcher, &FrameDispatcher::sendFrame, this, &Server::receivedFrame); - connect(&_impl->frameDispatcher, &FrameDispatcher::bufferSizeExceeded, this, + connect(_impl->frameDispatcher, &FrameDispatcher::bufferSizeExceeded, this, &Server::closePixelStream); } @@ -95,13 +101,13 @@ Server::~Server() void Server::requestFrame(const QString uri) { - _impl->frameDispatcher.requestFrame(uri); + _impl->frameDispatcher->requestFrame(uri); } void Server::closePixelStream(const QString uri) { emit _closePixelStream(uri); - _impl->frameDispatcher.deleteStream(uri); + _impl->frameDispatcher->deleteStream(uri); } void Server::incomingConnection(const qintptr socketHandle) @@ -132,14 +138,18 @@ void Server::incomingConnection(const qintptr socketHandle) &ServerWorker::closeConnection); // FrameDispatcher - connect(worker, &ServerWorker::addStreamSource, &_impl->frameDispatcher, + connect(worker, &ServerWorker::addStreamSource, _impl->frameDispatcher, &FrameDispatcher::addSource); - connect(worker, &ServerWorker::receivedSegment, &_impl->frameDispatcher, + connect(worker, &ServerWorker::receivedSegment, _impl->frameDispatcher, &FrameDispatcher::processSegment); connect(worker, &ServerWorker::receivedFrameFinished, - &_impl->frameDispatcher, &FrameDispatcher::processFrameFinished); - connect(worker, &ServerWorker::removeStreamSource, &_impl->frameDispatcher, + _impl->frameDispatcher, &FrameDispatcher::processFrameFinished); + connect(worker, &ServerWorker::removeStreamSource, _impl->frameDispatcher, &FrameDispatcher::removeSource); + connect(worker, &ServerWorker::addObserver, _impl->frameDispatcher, + &FrameDispatcher::addObserver); + connect(worker, &ServerWorker::removeObserver, _impl->frameDispatcher, + &FrameDispatcher::removeObserver); workerThread->start(); } diff --git a/deflect/ServerWorker.cpp b/deflect/ServerWorker.cpp index 9cc233a..b99a425 100644 --- a/deflect/ServerWorker.cpp +++ b/deflect/ServerWorker.cpp @@ -86,7 +86,10 @@ ServerWorker::~ServerWorker() // if other senders are still active / resp. the window gets closed if no // more senders contribute to it. if (!_streamId.isEmpty()) + { emit removeStreamSource(_streamId, _sourceId); + emit removeObserver(_streamId, _sourceId); + } if (_isConnected()) _sendQuit(); @@ -194,9 +197,11 @@ void ServerWorker::_handleMessage(const MessageHeader& messageHeader, closeConnection(_streamId); return; } - if (uri != _streamId && messageHeader.type != MESSAGE_TYPE_PIXELSTREAM_OPEN) + if (uri != _streamId && + messageHeader.type != MESSAGE_TYPE_PIXELSTREAM_OPEN && + messageHeader.type != MESSAGE_TYPE_OBSERVER_OPEN) { - std::cerr << "Warning: ingnoring message with incorrect stream id: '" + std::cerr << "Warning: ignoring message with incorrect stream id: '" << messageHeader.uri << "', expected: '" << _streamId.toStdString() << "'" << std::endl; return; @@ -222,6 +227,16 @@ void ServerWorker::_handleMessage(const MessageHeader& messageHeader, emit addStreamSource(_streamId, _sourceId); break; + case MESSAGE_TYPE_OBSERVER_QUIT: + emit removeObserver(_streamId, _sourceId); + _streamId = QString(); + break; + + case MESSAGE_TYPE_OBSERVER_OPEN: + _streamId = uri; + emit addObserver(_streamId, _sourceId); + break; + case MESSAGE_TYPE_PIXELSTREAM_FINISH_FRAME: emit receivedFrameFinished(_streamId, _sourceId); break; diff --git a/deflect/ServerWorker.h b/deflect/ServerWorker.h index c1f9529..65086c6 100644 --- a/deflect/ServerWorker.h +++ b/deflect/ServerWorker.h @@ -71,6 +71,9 @@ public slots: void addStreamSource(QString uri, size_t sourceIndex); void removeStreamSource(QString uri, size_t sourceIndex); + void addObserver(QString uri, size_t sourceIndex); + void removeObserver(QString uri, size_t sourceIndex); + void receivedSegment(QString uri, size_t sourceIndex, deflect::Segment segment); void receivedFrameFinished(QString uri, size_t sourceIndex); diff --git a/deflect/Stream.cpp b/deflect/Stream.cpp index 2e1aeb3..1f5b767 100644 --- a/deflect/Stream.cpp +++ b/deflect/Stream.cpp @@ -42,28 +42,16 @@ #include "Stream.h" #include "StreamPrivate.h" -#include "Event.h" -#include "ImageWrapper.h" -#include "MessageHeader.h" -#include "Segment.h" -#include "SegmentParameters.h" -#include "Socket.h" - -#include - -#include -#include - namespace deflect { Stream::Stream() - : _impl(new StreamPrivate("", "", Socket::defaultPortNumber)) + : Observer(new StreamPrivate("", "", Socket::defaultPortNumber, false)) { } Stream::Stream(const std::string& id, const std::string& host, const unsigned short port) - : _impl(new StreamPrivate(id, host, port)) + : Observer(new StreamPrivate(id, host, port, false)) { } @@ -71,21 +59,6 @@ Stream::~Stream() { } -bool Stream::isConnected() const -{ - return _impl->socket.isConnected(); -} - -const std::string& Stream::getId() const -{ - return _impl->id; -} - -const std::string& Stream::getHost() const -{ - return _impl->socket.getHost(); -} - Stream::Future Stream::send(const ImageWrapper& image) { return _impl->sendWorker.enqueueImage(image, false); @@ -101,97 +74,11 @@ Stream::Future Stream::sendAndFinish(const ImageWrapper& image) return _impl->sendWorker.enqueueImage(image, true); } -bool Stream::registerForEvents(const bool exclusive) -{ - if (!isConnected()) - { - std::cerr << "deflect::Stream::registerForEvents: Stream is not " - << "connected, operation failed" << std::endl; - return false; - } - - if (isRegisteredForEvents()) - return true; - - // Send the bind message - if (!_impl->sendWorker.enqueueBindRequest(exclusive).get()) - { - std::cerr << "deflect::Stream::registerForEvents: sending bind message " - << "failed" << std::endl; - return false; - } - - // Wait for bind reply - MessageHeader mh; - QByteArray message; - if (!_impl->socket.receive(mh, message)) - { - std::cerr << "deflect::Stream::registerForEvents: receive bind reply " - << "failed" << std::endl; - return false; - } - if (mh.type != MESSAGE_TYPE_BIND_EVENTS_REPLY) - { - std::cerr << "deflect::Stream::registerForEvents: received unexpected " - << "message type (" << int(mh.type) << ")" << std::endl; - return false; - } - _impl->registeredForEvents = *(bool*)(message.data()); - - return isRegisteredForEvents(); -} - -bool Stream::isRegisteredForEvents() const -{ - return _impl->registeredForEvents; -} - -int Stream::getDescriptor() const -{ - return _impl->socket.getFileDescriptor(); -} - -bool Stream::hasEvent() const -{ - return _impl->socket.hasMessage(Event::serializedSize); -} - -Event Stream::getEvent() -{ - MessageHeader mh; - QByteArray message; - if (!_impl->socket.receive(mh, message)) - { - std::cerr << "deflect::Stream::getEvent: receive failed" << std::endl; - return Event(); - } - if (mh.type != MESSAGE_TYPE_EVENT) - { - std::cerr << "deflect::Stream::getEvent: received unexpected message " - << "type (" << int(mh.type) << ")" << std::endl; - return Event(); - } - - assert((size_t)message.size() == Event::serializedSize); - - Event event; - { - QDataStream stream(message); - stream >> event; - } - return event; -} - void Stream::sendSizeHints(const SizeHints& hints) { _impl->sendWorker.enqueueSizeHints(hints); } -void Stream::setDisconnectedCallback(const std::function callback) -{ - _impl->disconnectedCallback = callback; -} - bool Stream::sendData(const char* data, const size_t count) { return _impl->sendWorker diff --git a/deflect/Stream.h b/deflect/Stream.h index b954b21..aa9e336 100644 --- a/deflect/Stream.h +++ b/deflect/Stream.h @@ -42,20 +42,13 @@ #ifndef DEFLECT_STREAM_H #define DEFLECT_STREAM_H -#include #include +#include #include #include -#include -#include -#include -#include - namespace deflect { -class StreamPrivate; - /** * Stream visual data to a deflect::Server. * @@ -66,7 +59,7 @@ class StreamPrivate; * The methods in this class are reentrant (all instances are independant) but * are not thread-safe. */ -class Stream +class Stream : public Observer { public: /** @@ -107,15 +100,6 @@ class Stream /** Destruct the Stream, closing the connection. @version 1.0 */ DEFLECT_API virtual ~Stream(); - /** @return true if the stream is connected, false otherwise. @version 1.0*/ - DEFLECT_API bool isConnected() const; - - /** @return the identifier defined by the constructor. @version 1.3 */ - DEFLECT_API const std::string& getId() const; - - /** @return the host defined by the constructor. @version 1.3 */ - DEFLECT_API const std::string& getHost() const; - /** @name Asynchronous send API */ //@{ /** Future signaling success of asyncSend(). @version 1.5 */ @@ -165,77 +149,6 @@ class Stream Future asyncSend(const ImageWrapper& image) { return sendAndFinish(image); } //@} - /** - * Register to receive Events. - * - * After registering, the Server application will send Events whenever a - * user is interacting with this Stream's window. - * - * Events can be retrieved using hasEvent() and getEvent(). - * - * The current registration status can be checked with - * isRegisteredForEvents(). - * - * This method is synchronous and waits for a registration reply from the - * Server before returning. - * - * @param exclusive Binds only one stream source for the same identifier. - * @return true if the registration could be or was already established. - * @version 1.0 - */ - DEFLECT_API bool registerForEvents(bool exclusive = false); - - /** - * Is this stream registered to receive events. - * - * Check if the stream has already successfully registered with - * registerForEvents(). - * - * @return true after the Server application has acknowledged the - * registration request, false otherwise - * @version 1.0 - */ - DEFLECT_API bool isRegisteredForEvents() const; - - /** - * Get the native descriptor for the data stream. - * - * This descriptor can for instance be used by poll() on UNIX systems. - * Having this descriptor lets a Stream class user detect when the Stream - * has received any data. The user can the use query the state of the - * Stream, for example using hasEvent(), and process the events accordingly. - * - * @return The native descriptor if available; otherwise returns -1. - * @version 1.0 - */ - DEFLECT_API int getDescriptor() const; - - /** - * Check if a new Event is available. - * - * This method is non-blocking. Use this method prior to calling getEvent(), - * for example as the condition for a while() loop to process all pending - * events. - * - * @return True if an Event is available, false otherwise - * @version 1.0 - */ - DEFLECT_API bool hasEvent() const; - - /** - * Get the next Event. - * - * This method is synchronous and waits until an Event is available before - * returning (or a 1 second timeout occurs). - * - * Check if an Event is available with hasEvent() before calling this - * method. - * - * @return The next Event if available, otherwise an empty (default) Event. - * @version 1.0 - */ - DEFLECT_API Event getEvent(); - /** * Send size hints to the stream server to indicate sizes that should be * respected by resize operations on the server side. @@ -257,20 +170,10 @@ class Stream */ DEFLECT_API bool sendData(const char* data, size_t count); - /** - * Set a function to be be called just after the stream gets disconnected. - * - * @param callback the function to call - * @note replaces the previous disconnected signal - * @version 1.5 - */ - DEFLECT_API void setDisconnectedCallback(std::function callback); - private: Stream(const Stream&) = delete; const Stream& operator=(const Stream&) = delete; - std::unique_ptr _impl; friend class deflect::test::Application; }; } diff --git a/deflect/StreamPrivate.cpp b/deflect/StreamPrivate.cpp index e90c4bc..8373bc3 100644 --- a/deflect/StreamPrivate.cpp +++ b/deflect/StreamPrivate.cpp @@ -80,10 +80,11 @@ std::string _getStreamId(const std::string& id) namespace deflect { StreamPrivate::StreamPrivate(const std::string& id_, const std::string& host, - const unsigned short port) + const unsigned short port, const bool observer) : id{_getStreamId(id_)} , socket{_getStreamHost(host), port} , sendWorker{socket, id} + , _observer(observer) { if (!socket.isConnected()) return; @@ -95,12 +96,21 @@ StreamPrivate::StreamPrivate(const std::string& id_, const std::string& host, socket.moveToThread(&sendWorker); sendWorker.start(); - sendWorker.enqueueOpen().wait(); + + if (observer) + sendWorker.enqueueObserverOpen().wait(); + else + sendWorker.enqueueOpen().wait(); } StreamPrivate::~StreamPrivate() { if (socket.isConnected()) - sendWorker.enqueueClose().wait(); + { + if (_observer) + sendWorker.enqueueObserverClose().wait(); + else + sendWorker.enqueueClose().wait(); + } } } diff --git a/deflect/StreamPrivate.h b/deflect/StreamPrivate.h index 82dde6d..81dbba1 100644 --- a/deflect/StreamPrivate.h +++ b/deflect/StreamPrivate.h @@ -62,7 +62,7 @@ class StreamPrivate * @param port Port of the target Server instance. */ StreamPrivate(const std::string& id, const std::string& host, - unsigned short port); + unsigned short port, bool observer); /** Destructor, close the Stream. */ ~StreamPrivate(); @@ -81,6 +81,9 @@ class StreamPrivate /** The worker doing all the socket send operations. */ StreamSendWorker sendWorker; + +private: + const bool _observer; }; } #endif diff --git a/deflect/StreamSendWorker.cpp b/deflect/StreamSendWorker.cpp index 6324f4f..8f85470 100644 --- a/deflect/StreamSendWorker.cpp +++ b/deflect/StreamSendWorker.cpp @@ -164,6 +164,20 @@ Stream::Future StreamSendWorker::enqueueClose() return _enqueueRequest({[this] { return _send(MESSAGE_TYPE_QUIT, {}); }}); } +Stream::Future StreamSendWorker::enqueueObserverOpen() +{ + return _enqueueRequest({[this] { + return _send(MESSAGE_TYPE_OBSERVER_OPEN, + QByteArray::number(NETWORK_PROTOCOL_VERSION)); + }}); +} + +Stream::Future StreamSendWorker::enqueueObserverClose() +{ + return _enqueueRequest( + {[this] { return _send(MESSAGE_TYPE_OBSERVER_QUIT, {}); }}); +} + Stream::Future StreamSendWorker::enqueueBindRequest(const bool exclusive) { return _enqueueRequest({[this, exclusive] { diff --git a/deflect/StreamSendWorker.h b/deflect/StreamSendWorker.h index 8636664..b554ff4 100644 --- a/deflect/StreamSendWorker.h +++ b/deflect/StreamSendWorker.h @@ -76,9 +76,11 @@ class StreamSendWorker : public QThread /** Enqueue an image to be send during the execution of run(). */ Stream::Future enqueueImage(const ImageWrapper& image, bool finish); - Stream::Future enqueueFinish(); //!< Enqueue a finishFrame() - Stream::Future enqueueOpen(); //!< Enqueue an open message - Stream::Future enqueueClose(); //!< Enqueue a close message + Stream::Future enqueueFinish(); //!< Enqueue a finishFrame() + Stream::Future enqueueOpen(); //!< Enqueue an open message + Stream::Future enqueueClose(); //!< Enqueue a close message + Stream::Future enqueueObserverOpen(); //!< Enqueue an observer open message + Stream::Future enqueueObserverClose(); //!< Enqueue a observer close message /** @sa Stream::registerForEvents */ Stream::Future enqueueBindRequest(bool exclusive); diff --git a/doc/Changelog.md b/doc/Changelog.md index 4e64a37..f4965eb 100644 --- a/doc/Changelog.md +++ b/doc/Changelog.md @@ -1,16 +1,21 @@ Changelog {#Changelog} ============ -## Deflect 0.13 +## Deflect 0.14 -### 0.13.1 (git master) +### 0.14.0 (git master) +* [175](https://github.com/BlueBrain/Deflect/pull/175): + Add deflect::Observer which can be used only receive events w/o the need so + send images * [174](https://github.com/BlueBrain/Deflect/pull/174): Performance improvements for sending small images (tested with 64x64): directly compress them in the call thread * [173](https://github.com/BlueBrain/Deflect/pull/173): Fix server: disabling system proxy which are default starting Qt 5.8 +## Deflect 0.13 + ### Deflect 0.13.0 (10-05-2017) * [163](https://github.com/BlueBrain/Deflect/pull/163): Stereo streaming accepts side-by-side images as input. diff --git a/tests/cpp/ServerTests.cpp b/tests/cpp/ServerTests.cpp index 5d6b6f3..a962721 100644 --- a/tests/cpp/ServerTests.cpp +++ b/tests/cpp/ServerTests.cpp @@ -1,6 +1,6 @@ /*********************************************************************/ -/* Copyright (c) 2015, EPFL/Blue Brain Project */ -/* Daniel.Nachbaur@epfl.ch */ +/* Copyright (c) 2015-2017, EPFL/Blue Brain Project */ +/* Daniel.Nachbaur@epfl.ch */ /* All rights reserved. */ /* */ /* Redistribution and use in source and binary forms, with or */ @@ -44,6 +44,7 @@ namespace ut = boost::unit_test; #include "MinimalGlobalQtApp.h" #include +#include #include #include @@ -235,3 +236,104 @@ BOOST_AUTO_TEST_CASE(testDataReceivedByServer) } BOOST_CHECK(!"reachable"); } + +BOOST_AUTO_TEST_CASE(testOneObserverAndOneStream) +{ + QThread serverThread; + deflect::Server* server = new deflect::Server(0 /* OS-chosen port */); + server->moveToThread(&serverThread); + serverThread.connect(&serverThread, &QThread::finished, server, + &deflect::Server::deleteLater); + serverThread.start(); + + QWaitCondition received; + QMutex mutex; + + size_t openedStreams = 0; + bool receivedState = false; + server->connect(server, &deflect::Server::pixelStreamOpened, + [&](const QString) { + ++openedStreams; + if (openedStreams == 2) + { + mutex.lock(); + receivedState = true; + received.wakeAll(); + mutex.unlock(); + } + }); + + server->connect(server, &deflect::Server::registerToEvents, + [&](const QString, const bool, deflect::EventReceiver*, + deflect::BoolPromisePtr success) { + success->set_value(true); + }); + + const size_t expectedFrames = 2; + size_t receivedFrames = 0; + server->connect(server, &deflect::Server::receivedFrame, + [&](deflect::FramePtr frame) { + BOOST_CHECK_EQUAL(frame->segments.size(), 1); + BOOST_CHECK_EQUAL(frame->uri.toStdString(), + testStreamId.toStdString()); + ++receivedFrames; + mutex.lock(); + receivedState = true; + received.wakeAll(); + mutex.unlock(); + }); + + deflect::Stream stream(testStreamId.toStdString(), "localhost", + server->serverPort()); + BOOST_REQUIRE(stream.isConnected()); + + deflect::Observer observer(testStreamId.toStdString(), "localhost", + server->serverPort()); + BOOST_REQUIRE(observer.isConnected()); + + BOOST_CHECK(observer.registerForEvents(true)); + + // handle connects first before sending and receiving frames + while (true) + { + mutex.lock(); + received.wait(&mutex, 100 /*ms*/); + if (receivedState) + { + mutex.unlock(); + break; + } + mutex.unlock(); + } + + const unsigned int width = 4; + const unsigned int height = 4; + const unsigned int byte = width * height * 4; + std::unique_ptr pixels(new uint8_t[byte]); + ::memset(pixels.get(), 0, byte); + deflect::ImageWrapper image(pixels.get(), width, height, deflect::RGBA); + + for (size_t i = 0; i < expectedFrames; ++i) + { + receivedState = false; + stream.sendAndFinish(image).wait(); + server->requestFrame(testStreamId); + + for (size_t j = 0; j < 20; ++i) + { + mutex.lock(); + received.wait(&mutex, 100 /*ms*/); + if (receivedState) + { + mutex.unlock(); + break; + } + mutex.unlock(); + } + } + + serverThread.quit(); + serverThread.wait(); + + BOOST_CHECK_EQUAL(receivedFrames, expectedFrames); +}