Skip to content

Commit

Permalink
Add deflect::Observer which can be used only receive events w/o the n…
Browse files Browse the repository at this point in the history
…eed so send images
  • Loading branch information
tribal-tec committed Jul 28, 2017
1 parent 5ff0f8e commit db84bf4
Show file tree
Hide file tree
Showing 19 changed files with 613 additions and 252 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
# Daniel Nachbaur <daniel.nachbaur@epfl.ch>

cmake_minimum_required(VERSION 3.1 FATAL_ERROR)
project(Deflect VERSION 0.13.1)
project(Deflect VERSION 0.14.0)
set(Deflect_VERSION_ABI 6)

list(APPEND CMAKE_MODULE_PATH ${CMAKE_SOURCE_DIR}/CMake/common)
Expand Down
2 changes: 2 additions & 0 deletions deflect/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set(DEFLECT_PUBLIC_HEADERS
Frame.h
ImageWrapper.h
MTQueue.h
Observer.h
Segment.h
SegmentParameters.h
Server.h
Expand Down Expand Up @@ -38,6 +39,7 @@ set(DEFLECT_SOURCES
ImageWrapper.cpp
MessageHeader.cpp
MetaTypeRegistration.cpp
Observer.cpp
ReceiveBuffer.cpp
Server.cpp
ServerWorker.cpp
Expand Down
30 changes: 24 additions & 6 deletions deflect/FrameDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,12 @@ class FrameDispatcher::Impl

typedef std::map<QString, ReceiveBuffer> StreamBuffers;
StreamBuffers streamBuffers;
std::map<QString, size_t> observers;
};

FrameDispatcher::FrameDispatcher()
: _impl(new Impl)
FrameDispatcher::FrameDispatcher(QObject* parent)
: QObject(parent)
, _impl(new Impl)
{
}

Expand All @@ -97,8 +99,21 @@ void FrameDispatcher::removeSource(const QString uri, const size_t sourceIndex)

_impl->streamBuffers[uri].removeSource(sourceIndex);

if (_impl->streamBuffers[uri].getSourceCount() == 0)
deleteStream(uri);
deleteStream(uri);
}

void FrameDispatcher::addObserver(const QString uri)
{
++_impl->observers[uri];
emit pixelStreamOpened(uri);
}

void FrameDispatcher::removeObserver(QString uri)
{
if (_impl->observers[uri] > 0)
--_impl->observers[uri];

deleteStream(uri);
}

void FrameDispatcher::processSegment(const QString uri,
Expand Down Expand Up @@ -145,10 +160,13 @@ void FrameDispatcher::requestFrame(const QString uri)

void FrameDispatcher::deleteStream(const QString uri)
{
if (_impl->streamBuffers.count(uri))
if (_impl->streamBuffers[uri].getSourceCount() == 0 &&
_impl->streamBuffers.count(uri))
{
_impl->streamBuffers.erase(uri);
emit pixelStreamClosed(uri);

if (_impl->observers[uri] == 0)
emit pixelStreamClosed(uri);
}
}
}
18 changes: 17 additions & 1 deletion deflect/FrameDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class FrameDispatcher : public QObject

public:
/** Construct a dispatcher */
FrameDispatcher();
FrameDispatcher(QObject* parent);

/** Destructor. */
~FrameDispatcher();
Expand All @@ -80,6 +80,22 @@ public slots:
*/
void removeSource(QString uri, size_t sourceIndex);

/**
* Add a stream source as an observer which does not contribute segments.
* Emits pixelStreamOpened() if no other observer or source is present.
*
* @param uri Identifier for the stream
*/
void addObserver(QString uri);

/**
* Remove a stream observer, emits pixelStreamClosed() if all observers and
* sources are gone.
*
* @param uri Identifier for the stream
*/
void removeObserver(QString uri);

/**
* Process a new Segment.
*
Expand Down
6 changes: 4 additions & 2 deletions deflect/MessageHeader.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*********************************************************************/
/* Copyright (c) 2013-2016, EPFL/Blue Brain Project */
/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */
/* Raphael.Dumusc@epfl.ch */
/* Daniel.Nachbaur@epfl.ch */
/* Redistribution and use in source and binary forms, with or */
Expand Down Expand Up @@ -67,7 +67,9 @@ enum MessageType
MESSAGE_TYPE_QUIT = 12,
MESSAGE_TYPE_SIZE_HINTS = 13,
MESSAGE_TYPE_DATA = 14,
MESSAGE_TYPE_IMAGE_VIEW = 15
MESSAGE_TYPE_IMAGE_VIEW = 15,
MESSAGE_TYPE_OBSERVER_OPEN = 16,
MESSAGE_TYPE_OBSERVER_QUIT = 17
};

#define MESSAGE_HEADER_URI_LENGTH 64
Expand Down
172 changes: 172 additions & 0 deletions deflect/Observer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*********************************************************************/
/* Copyright (c) 2013-2017, EPFL/Blue Brain Project */
/* Raphael Dumusc <raphael.dumusc@epfl.ch> */
/* Stefan.Eilemann@epfl.ch */
/* Daniel.Nachbaur@epfl.ch */
/* All rights reserved. */
/* */
/* Redistribution and use in source and binary forms, with or */
/* without modification, are permitted provided that the following */
/* conditions are met: */
/* */
/* 1. Redistributions of source code must retain the above */
/* copyright notice, this list of conditions and the following */
/* disclaimer. */
/* */
/* 2. Redistributions in binary form must reproduce the above */
/* copyright notice, this list of conditions and the following */
/* disclaimer in the documentation and/or other materials */
/* provided with the distribution. */
/* */
/* THIS SOFTWARE IS PROVIDED BY THE UNIVERSITY OF TEXAS AT */
/* AUSTIN ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, */
/* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF */
/* MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE */
/* DISCLAIMED. IN NO EVENT SHALL THE UNIVERSITY OF TEXAS AT */
/* AUSTIN OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, */
/* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES */
/* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE */
/* GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR */
/* BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF */
/* LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT */
/* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT */
/* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE */
/* POSSIBILITY OF SUCH DAMAGE. */
/* */
/* The views and conclusions contained in the software and */
/* documentation are those of the authors and should not be */
/* interpreted as representing official policies, either expressed */
/* or implied, of The University of Texas at Austin. */
/*********************************************************************/

#include "Observer.h"
#include "StreamPrivate.h"

#include <QDataStream>

#include <cassert>
#include <iostream>

namespace deflect
{
Observer::Observer()
: _impl(new StreamPrivate("", "", Socket::defaultPortNumber, true))
{
}

Observer::Observer(const std::string& id, const std::string& host,
const unsigned short port)
: _impl(new StreamPrivate(id, host, port, true))
{
}

Observer::Observer(StreamPrivate* impl)
: _impl(impl)
{
}

Observer::~Observer()
{
}

bool Observer::isConnected() const
{
return _impl->socket.isConnected();
}

const std::string& Observer::getId() const
{
return _impl->id;
}

const std::string& Observer::getHost() const
{
return _impl->socket.getHost();
}

bool Observer::registerForEvents(const bool exclusive)
{
if (!isConnected())
{
std::cerr << "deflect::Stream::registerForEvents: Stream is not "
<< "connected, operation failed" << std::endl;
return false;
}

if (isRegisteredForEvents())
return true;

// Send the bind message
if (!_impl->sendWorker.enqueueBindRequest(exclusive).get())
{
std::cerr << "deflect::Stream::registerForEvents: sending bind message "
<< "failed" << std::endl;
return false;
}

// Wait for bind reply
MessageHeader mh;
QByteArray message;
if (!_impl->socket.receive(mh, message))
{
std::cerr << "deflect::Stream::registerForEvents: receive bind reply "
<< "failed" << std::endl;
return false;
}
if (mh.type != MESSAGE_TYPE_BIND_EVENTS_REPLY)
{
std::cerr << "deflect::Stream::registerForEvents: received unexpected "
<< "message type (" << int(mh.type) << ")" << std::endl;
return false;
}
_impl->registeredForEvents = *(bool*)(message.data());

return isRegisteredForEvents();
}

bool Observer::isRegisteredForEvents() const
{
return _impl->registeredForEvents;
}

int Observer::getDescriptor() const
{
return _impl->socket.getFileDescriptor();
}

bool Observer::hasEvent() const
{
return _impl->socket.hasMessage(Event::serializedSize);
}

Event Observer::getEvent()
{
MessageHeader mh;
QByteArray message;
if (!_impl->socket.receive(mh, message))
{
std::cerr << "deflect::Stream::getEvent: receive failed" << std::endl;
return Event();
}
if (mh.type != MESSAGE_TYPE_EVENT)
{
std::cerr << "deflect::Stream::getEvent: received unexpected message "
<< "type (" << int(mh.type) << ")" << std::endl;
return Event();
}

assert((size_t)message.size() == Event::serializedSize);

Event event;
{
QDataStream stream(message);
stream >> event;
}
return event;
}

void Observer::setDisconnectedCallback(const std::function<void()> callback)
{
_impl->disconnectedCallback = callback;
}
}
Loading

0 comments on commit db84bf4

Please sign in to comment.