Skip to content

Commit

Permalink
changed to QtConcurrent::run, fixed hanging bug in zmq receiver when …
Browse files Browse the repository at this point in the history
…closed if no source is available
  • Loading branch information
jontio committed Jul 24, 2021
1 parent a1f0470 commit f5b5778
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 168 deletions.
108 changes: 75 additions & 33 deletions JAERO/audioreceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,53 +4,95 @@
#include "QDataStream"
#include "QDateTime"

#include <unistd.h>

void AudioReceiver::process()
void AudioReceiver::ZMQaudioStart(QString address, QString topic)
{
running = true;
// allocate enough for 96Khz sampling with 1 buffer per second
int recsize = 192000;
context = zmq_ctx_new();
subscriber = zmq_socket(context, ZMQ_SUB);

zmqStatus = zmq_connect(subscriber, _address.toStdString().c_str());
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, _topic.toStdString().c_str(), 5);

char buf [recsize];
char topic[20];

while(running){


zmq_recv(subscriber, topic, 20, 0);
int received = zmq_recv(subscriber, buf, recsize, ZMQ_DONTWAIT);

QByteArray qdata(buf, received);
//stop set prams and start thread
ZMQaudioStop();
setParameters(address,topic);
future = QtConcurrent::run([=]() {
process();
qDebug()<<"Thread finished";
return;
});
//wait till the thread is running so ZMQaudioStop functions correctly
for(int i=0;!running&&i<1000;i++)usleep(1000);
//if it fails after a second then arghhhhh, should not happen
if(!running)
{
qDebug()<<"Failed to start ZMQ receiving thread";
}
}

emit recAudio(qdata);
}
void AudioReceiver::process()
{
// allocate enough for 96Khz sampling with 1 buffer per second
int recsize = 192000;
context = zmq_ctx_new();
subscriber = zmq_socket(context, ZMQ_SUB);

zmqStatus = zmq_connect(subscriber, _address.toStdString().c_str());
zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, _topic.toStdString().c_str(), 5);

char buf [recsize];
char topic[20];

running = true;

while(running)
{
zmq_recv(subscriber, topic, 20, 0);
int received = zmq_recv(subscriber, buf, recsize, ZMQ_DONTWAIT);
if(received>=0)
{
QByteArray qdata(buf, received);
emit recAudio(qdata);
}
else
{
if(running)
{
qDebug()<<"zmq_recv error!!!";
usleep(100000);
}
}
}
}

zmq_disconnect(subscriber,_address.toStdString().c_str());
zmq_ctx_destroy (context);

emit finished();
AudioReceiver::AudioReceiver(QObject *parent):
QObject(parent),
running(false),
context(nullptr),
subscriber(nullptr)
{

}


AudioReceiver::AudioReceiver(QString address, QString topic){
_address = address;
_topic = topic;
AudioReceiver::~AudioReceiver()
{
ZMQaudioStop();
}

void AudioReceiver::setParameters(QString address, QString topic){
_address = address;
_topic = topic;
void AudioReceiver::setParameters(QString address, QString topic)
{
_address = address;
_topic = topic;
}

void AudioReceiver::ZMQaudioStop()
{

running = false;
running = false;

if(subscriber)zmq_close(subscriber);
if(context)zmq_ctx_destroy(context);
if(!future.isFinished())future.waitForFinished();

subscriber=nullptr;
context=nullptr;

emit finished();

}
29 changes: 15 additions & 14 deletions JAERO/audioreceiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <QThread>
#include <qbytearray.h>
#include <zmq.h>
#include <QtConcurrent/QtConcurrent>


class AudioReceiver : public QObject
Expand All @@ -16,16 +17,13 @@ class AudioReceiver : public QObject

public:

AudioReceiver(QString address, QString topic);
void setParameters(QString address, QString topic);

bool running;

explicit AudioReceiver(QObject *parent = 0);//QString address, QString topic);
~AudioReceiver();

public slots:

void process();
void ZMQaudioStop();
void ZMQaudioStop();
void ZMQaudioStart(QString address, QString topic);

signals:

Expand All @@ -34,18 +32,21 @@ public slots:

private:

void* context;
void* subscriber;

int zmqStatus;
void setParameters(QString address, QString topic);
volatile bool running;

QString _address;
QString _topic;
int _rate;
void process();

void * volatile context;
void * volatile subscriber;

int zmqStatus;

QString _address;
QString _topic;
int _rate;

QFuture<void> future;

signals:
void recAudio(const QByteArray & audio);
Expand Down
Loading

0 comments on commit f5b5778

Please sign in to comment.