Skip to content

Commit 611595d

Browse files
Stefan EilemannStefan Eilemann
authored andcommitted
Async stream WIP
1 parent e5b4304 commit 611595d

File tree

11 files changed

+193
-132
lines changed

11 files changed

+193
-132
lines changed

apps/DesktopStreamer/Stream.cpp

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -193,8 +193,13 @@ class Stream::Impl
193193
deflectImage.compressionQuality = std::max(1, std::min(quality, 100));
194194
deflectImage.subsampling = subsamp;
195195

196-
if (!_stream.send(deflectImage) || !_stream.finishFrame())
197-
return "Streaming failure, connection closed";
196+
for (auto& future : _lastSend)
197+
if (!future.get())
198+
return "Streaming failure, connection closed";
199+
200+
_lastSend.clear();
201+
_lastSend.push_back(_stream.send(deflectImage));
202+
_lastSend.push_back(_stream.finishFrame());
198203
return std::string();
199204
}
200205

@@ -293,6 +298,9 @@ class Stream::Impl
293298
EventQueue _events;
294299
#endif
295300

301+
using Futures = std::vector<deflect::Stream::Future>;
302+
Futures _lastSend;
303+
296304
QTimer _mouseActiveTimer;
297305
QPoint _mousePos;
298306
};

apps/SimpleStreamer/main.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -264,7 +264,7 @@ bool send(const Image& image, const deflect::View view)
264264
: deflect::COMPRESSION_OFF;
265265
deflectImage.compressionQuality = deflectCompressionQuality;
266266
deflectImage.view = view;
267-
return deflectStream->send(deflectImage);
267+
return deflectStream->send(deflectImage).get();
268268
}
269269

270270
bool timeout(const float sec)

deflect/Stream.cpp

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,20 +84,22 @@ const std::string& Stream::getHost() const
8484
return _impl->socket.getHost();
8585
}
8686

87-
bool Stream::send(const ImageWrapper& image)
87+
Stream::Future Stream::send(const ImageWrapper& image)
8888
{
8989
return _impl->send(image);
9090
}
9191

92-
bool Stream::finishFrame()
92+
Stream::Future Stream::finishFrame()
9393
{
9494
return _impl->finishFrame();
9595
}
9696

97+
#if 0
9798
Stream::Future Stream::asyncSend(const ImageWrapper& image)
9899
{
99100
return _impl->asyncSend(image);
100101
}
102+
#endif
101103

102104
bool Stream::registerForEvents(const bool exclusive)
103105
{

deflect/Stream.h

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,9 @@ class StreamPrivate;
6161
/**
6262
* Stream visual data to a deflect::Server.
6363
*
64-
* A Stream can be subdivided into one or more images. This allows to have
65-
* different applications each responsible for sending one part of the global
66-
* image.
64+
* A Stream can be subdivided into one or more images and eye passes. This
65+
* allows to have different applications each responsible for sending one part
66+
* of the global image.
6767
*
6868
* The methods in this class are reentrant (all instances are independant) but
6969
* are not thread-safe.
@@ -123,49 +123,31 @@ class Stream
123123
/** Future signaling success of asyncSend(). @version 1.5 */
124124
using Future = std::future<bool>;
125125

126-
/**
127-
* Send an image and finish the frame asynchronously.
128-
*
129-
* The send (and the optional compression) and finishFrame() are executed in
130-
* a different thread. The result of this operation can be obtained by the
131-
* returned future object.
132-
*
133-
* @param image The image to send. Note that the image is not copied, so the
134-
* referenced must remain valid until the send is finished
135-
* @return true if the image data could be sent, false otherwise
136-
* @see send()
137-
* @version 1.1
138-
*/
139-
DEFLECT_API Future asyncSend(const ImageWrapper& image);
140-
//@}
126+
DEFLECT_API Future asyncSend(const ImageWrapper& image); //<! @deprecated
141127

142-
/** @name Synchronous send API */
143-
//@{
144128
/**
145-
* Send an image synchronously.
129+
* Send an image asynchronously.
146130
*
147-
* @note A call to send() while an asyncSend() is pending is undefined.
148131
* @param image The image to send
149132
* @return true if the image data could be sent, false otherwise
150133
* @version 1.0
151134
* @sa finishFrame()
152135
*/
153-
DEFLECT_API bool send(const ImageWrapper& image);
136+
DEFLECT_API Future send(const ImageWrapper& image);
154137

155138
/**
156-
* Notify that all the images for this frame have been sent.
139+
* Asynchronously notify that all the images for this frame have been sent.
157140
*
158141
* This method must be called everytime this Stream instance has finished
159142
* sending its image(s) for the current frame. The receiver will display
160143
* the images once all the senders which use the same identifier have
161-
* finished a frame.
144+
* finished a frame. This is only to be called once per frame, even for
145+
* stereo rendering.
162146
*
163-
* @note A call to finishFrame() while an asyncSend() is pending is
164-
* undefined.
165-
* @see send()
147+
* @sa send()
166148
* @version 1.0
167149
*/
168-
DEFLECT_API bool finishFrame();
150+
DEFLECT_API Future finishFrame();
169151
//@}
170152

171153
/**
@@ -240,17 +222,19 @@ class Stream
240222
DEFLECT_API Event getEvent();
241223

242224
/**
243-
* Send size hints to the stream host to indicate sizes that should be
244-
* respected by resize operations on the host side.
225+
* Send size hints to the stream server to indicate sizes that should be
226+
* respected by resize operations on the server side.
245227
*
246-
* @param hints the new size hints for the host
228+
* @note do not use while asynchronous send operations are pending.
229+
* @param hints the new size hints for the server
247230
* @version 1.2
248231
*/
249232
DEFLECT_API void sendSizeHints(const SizeHints& hints);
250233

251234
/**
252235
* Send data to the Server.
253236
*
237+
* @note do not use while asynchronous send operations are pending.
254238
* @param data the pointer to the data buffer.
255239
* @param count the number of bytes to send.
256240
* @return true if the data could be sent, false otherwise

deflect/StreamPrivate.cpp

Lines changed: 12 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@
5454

5555
namespace
5656
{
57-
const unsigned int SEGMENT_SIZE = 512;
5857
const char* STREAM_ID_ENV_VAR = "DEFLECT_ID";
5958
const char* STREAM_HOST_ENV_VAR = "DEFLECT_HOST";
6059
}
@@ -92,9 +91,8 @@ StreamPrivate::StreamPrivate(const std::string& id_, const std::string& host,
9291
: id(_getStreamId(id_))
9392
, socket(_getStreamHost(host), port)
9493
, registeredForEvents(false)
94+
, _sendWorker(*this)
9595
{
96-
imageSegmenter.setNominalSegmentDimensions(SEGMENT_SIZE, SEGMENT_SIZE);
97-
9896
if (!socket.isConnected())
9997
return;
10098

@@ -108,8 +106,7 @@ StreamPrivate::StreamPrivate(const std::string& id_, const std::string& host,
108106

109107
StreamPrivate::~StreamPrivate()
110108
{
111-
_sendWorker.reset();
112-
109+
_sendWorker.stop();
113110
if (!socket.isConnected())
114111
return;
115112

@@ -129,37 +126,19 @@ void StreamPrivate::sendClose()
129126
socket.send(mh, QByteArray());
130127
}
131128

132-
bool StreamPrivate::send(const ImageWrapper& image)
129+
Stream::Future StreamPrivate::send(const ImageWrapper& image)
133130
{
134-
if (image.compressionPolicy != COMPRESSION_ON && image.pixelFormat != RGBA)
135-
{
136-
std::cerr << "Currently, RAW images can only be sent in RGBA format. "
137-
"Other formats support remain to be implemented."
138-
<< std::endl;
139-
return false;
140-
}
141-
142-
if (!sendImageView(image.view))
143-
return false;
144-
145-
const auto sendFunc = std::bind(&StreamPrivate::sendPixelStreamSegment,
146-
this, std::placeholders::_1);
147-
return imageSegmenter.generate(image, sendFunc);
131+
return _sendWorker.enqueueImage(image, false);
148132
}
149133

150134
Stream::Future StreamPrivate::asyncSend(const ImageWrapper& image)
151135
{
152-
if (!_sendWorker)
153-
_sendWorker.reset(new StreamSendWorker(*this));
154-
155-
return _sendWorker->enqueueImage(image);
136+
return _sendWorker.enqueueImage(image, true);
156137
}
157138

158-
bool StreamPrivate::finishFrame()
139+
Stream::Future StreamPrivate::finishFrame()
159140
{
160-
// Open a window for the PixelStream
161-
const MessageHeader mh(MESSAGE_TYPE_PIXELSTREAM_FINISH_FRAME, 0, id);
162-
return socket.send(mh, QByteArray());
141+
return _sendWorker.enqueueFinish();
163142
}
164143

165144
bool StreamPrivate::sendImageView(const View view)
@@ -191,6 +170,11 @@ bool StreamPrivate::sendPixelStreamSegment(const Segment& segment)
191170
return socket.send(mh, message);
192171
}
193172

173+
bool StreamPrivate::sendFinish()
174+
{
175+
return socket.send({MESSAGE_TYPE_PIXELSTREAM_FINISH_FRAME, 0, id}, {});
176+
}
177+
194178
bool StreamPrivate::sendSizeHints(const SizeHints& hints)
195179
{
196180
const MessageHeader mh(MESSAGE_TYPE_SIZE_HINTS, sizeof(hints), id);

deflect/StreamPrivate.h

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -45,22 +45,18 @@
4545
#include <deflect/api.h>
4646

4747
#include "Event.h"
48-
#include "ImageSegmenter.h"
4948
#include "MessageHeader.h"
50-
#include "Socket.h" // member
51-
#include "Stream.h" // Stream::Future
49+
#include "Socket.h" // member
50+
#include "Stream.h" // Stream::Future
51+
#include "StreamSendWorker.h" // member
5252

5353
#include <functional>
5454
#include <memory>
5555
#include <string>
5656

5757
namespace deflect
5858
{
59-
class StreamSendWorker;
60-
61-
/**
62-
* Private implementation for the Stream class.
63-
*/
59+
/** Private implementation for the Stream class. */
6460
class StreamPrivate
6561
{
6662
public:
@@ -89,17 +85,11 @@ class StreamPrivate
8985
*/
9086
bool close();
9187

92-
/** @sa Stream::send */
93-
bool send(const ImageWrapper& image);
94-
95-
/** @sa Stream::asyncSend */
88+
/** @deprecated async-send-and-finish for mono images. */
9689
Stream::Future asyncSend(const ImageWrapper& image);
9790

98-
/** @sa Stream::finishFrame */
99-
bool finishFrame();
100-
101-
/** Send the view for the image to be sent with sendPixelStreamSegment. */
102-
bool sendImageView(View view);
91+
/** @sa Stream::send */
92+
Stream::Future send(const ImageWrapper& image);
10393

10494
/**
10595
* Send a Segment through the Stream.
@@ -108,6 +98,9 @@ class StreamPrivate
10898
*/
10999
DEFLECT_API bool sendPixelStreamSegment(const Segment& segment);
110100

101+
/** @sa Stream::finishFrame */
102+
Stream::Future finishFrame();
103+
111104
/** @sa Stream::sendSizeHints */
112105
bool sendSizeHints(const SizeHints& hints);
113106

@@ -120,16 +113,20 @@ class StreamPrivate
120113
/** The communication socket instance */
121114
Socket socket;
122115

123-
/** The image segmenter */
124-
ImageSegmenter imageSegmenter;
125-
126116
/** Has a successful event registration reply been received */
127117
bool registeredForEvents;
128118

129119
std::function<void()> disconnectedCallback;
130120

131121
private:
132-
std::unique_ptr<StreamSendWorker> _sendWorker;
122+
friend class StreamSendWorker;
123+
124+
/** Send the view for the image to be sent with sendPixelStreamSegment. */
125+
bool sendImageView(View view);
126+
127+
bool sendFinish(); //<! Send the finish frame message
128+
129+
StreamSendWorker _sendWorker;
133130
};
134131
}
135132
#endif

0 commit comments

Comments
 (0)