Skip to content

Commit

Permalink
HPCC-32945 Add support for queue clients with priorities
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
  • Loading branch information
ghalliday committed Nov 19, 2024
1 parent d6dcd8c commit 05caafa
Show file tree
Hide file tree
Showing 3 changed files with 349 additions and 100 deletions.
231 changes: 196 additions & 35 deletions common/workunit/wujobq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "platform.h"
#include <algorithm>
#include <cstdlib>
#include "limits.h"
#include "jlib.hpp"
#include "jbuff.hpp"
Expand Down Expand Up @@ -57,6 +58,89 @@ JobQueues
#endif


static void deserializePriorities(Unsigned64Array & priorities, const char * text)
{
if (isEmptyString(text))
return;

for (;;)
{
char * end = nullptr;
unsigned __int64 next = strtoll(text, &end, 10);
priorities.append(next);
if (*end != ',')
return;
text = end + 1;
}
}

static void serializePriorities(StringBuffer & out, const Unsigned64Array & priorities)
{
ForEachItemIn(i, priorities)
{
if (i)
out.append(",");
out.append(priorities.item(i));
}
}

//Insert in order, the list is likely to be small, so use a linear scan
static void insertPriority(Unsigned64Array & priorities, __uint64 prio)
{
unsigned pos = 0;
for (; priorities.isItem(pos); pos++)
{
if (prio > priorities.item(pos))
break;
}

priorities.add(prio, pos);
}

static void removePriority(Unsigned64Array & priorities, __uint64 prio)
{
ForEachItemIn(i, priorities)
{
if (priorities.item(i) == prio)
{
priorities.remove(i);
return;
}
}
throwUnexpected();
}

static unsigned countHigherPriorities(Unsigned64Array & priorities, __uint64 prio, unsigned threshold)
{
unsigned numPrios = priorities.ordinality();
for (unsigned pos = 0; pos < numPrios; pos++)
{
//If we have already exceeds the threshold, then we can stop
if (pos == threshold)
return pos;

if (prio >= priorities.item(pos))
return pos;
}
return numPrios;
}

static unsigned countHigherPriorities(const char * priorities, __uint64 prio, unsigned threshold)
{
unsigned pos = 0;
for (; pos < threshold; pos++)
{
char * end = nullptr;
unsigned __int64 nextPrio = strtoll(priorities, &end, 10);
if (prio >= nextPrio)
break;

if (*end != ',')
return pos+1;
priorities = end + 1;
}
return pos;
}

class CJobQueueItem: implements IJobQueueItem, public CInterface
{
Expand Down Expand Up @@ -789,16 +873,17 @@ class CJobQueueConst: public CJobQueueBase
class CJobQueue: public CJobQueueBase, implements IJobQueue
{
public:
sQueueData *activeq;
sQueueData *activeq = nullptr;
SessionId sessionid;
unsigned locknest;
bool writemode;
bool connected;
unsigned locknest = 0;
bool writemode = false;
bool connected = false;
Owned<IConversation> initiateconv;
StringAttr initiatewu;
bool dequeuestop;
bool cancelwaiting;
bool validateitemsessions;
std::atomic<bool> isProcessingDequeue = 0;
bool dequeuestop = false;
bool cancelwaiting = false;
bool validateitemsessions = false;

class csubs: implements ISDSSubscription, public CInterface
{
Expand All @@ -811,15 +896,20 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
{
CriticalBlock block(parent->crit);
parent->notifysem.signal();
//There is a race condition - a callback may be at this point while the CJobQueue is deleted.
//Adding a critical section in parent makes it much more likely to be hit.
//Ultimately the semaphore should be moved to this class instead
parent->noteQueueChange(xpath);
}
} subs;
};

Owned<csubs> subs;

IMPLEMENT_IINTERFACE;

CJobQueue(const char *_qname) : CJobQueueBase(_qname), subs(this)
CJobQueue(const char *_qname) : CJobQueueBase(_qname)
{
subs.setown(new csubs(this));
activeq = qdata;
sessionid = myProcessSession();
validateitemsessions = false;
Expand Down Expand Up @@ -960,6 +1050,12 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
}

void noteQueueChange(const char * xpath)
{
//CriticalBlock block(crit);
notifysem.signal();
}

class Cconnlockblock: public CriticalBlock
{
CJobQueue *parent;
Expand Down Expand Up @@ -1037,7 +1133,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *subs, false);
}
}

Expand All @@ -1048,7 +1144,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
if (!qd->subscriberid) {
StringBuffer path;
path.appendf("/JobQueues/Queue[@name=\"%s\"]/Edition",qd->qname.get());
qd->subscriberid = querySDS().subscribe(path.str(), subs, false);
qd->subscriberid = querySDS().subscribe(path.str(), *subs, false);
}
unsigned e = (unsigned)qd->root->getPropInt("Edition", 1);
if (e!=qd->lastWaitEdition) {
Expand Down Expand Up @@ -1128,7 +1224,25 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
}
}

sQueueData *findbestqueue(bool useprev,int minprio,unsigned numqueues,sQueueData **queues)
bool hasHigherPriorityClients(IPropertyTree * queueTree, __uint64 clientPrio, unsigned threshold)
{
unsigned higher = 0;
Owned<IPropertyTreeIterator> iter = queueTree->getElements("Client");
ForEach(*iter)
{
const char * priorities = iter->query().queryProp("@priorities");
if (!isEmptyString(priorities))
{
unsigned numHigher = countHigherPriorities(priorities, clientPrio, threshold - higher);
higher += numHigher;
if (higher >= threshold)
return true;
}
}
return false;
}

sQueueData *findbestqueue(bool useprev,int minprio,__uint64 clientPrio,unsigned numqueues,sQueueData **queues)
{
if (numqueues==0)
return NULL;
Expand All @@ -1139,7 +1253,11 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
for (unsigned i=0;i<numqueues;i++) {
sQueueData *qd = queues[i];
unsigned count = qd->root->getPropInt("@count");
if (count) {
if (count)
{
if (hasHigherPriorityClients(qd->root, clientPrio, count))
continue;

int mpr = useprev?std::max(qd->root->getPropInt("@prevpriority"),minprio):minprio;
if (count&&((minprio==INT_MIN)||checkprio(*qd,mpr))) {
StringBuffer path;
Expand All @@ -1160,17 +1278,38 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return best;
}

void setWaiting(unsigned numqueues,sQueueData **queues, bool set)
void setWaiting(unsigned numqueues,sQueueData **queues, unsigned __int64 clientPrio, bool set)
{
for (unsigned i=0; i<numqueues; i++) {
IPropertyTree *croot = queryClientRootSession(*queues[i]);
croot->setPropInt64("@waiting",croot->getPropInt64("@waiting",0)+(set?1:-1));
//If a non-zero client priority has been specified, add (or remove) it from the list of priorities
if (clientPrio)
{
Unsigned64Array priorities;
deserializePriorities(priorities, croot->queryProp("@priorities"));
if (set)
insertPriority(priorities, clientPrio);
else
removePriority(priorities, clientPrio);
StringBuffer prioText;
serializePriorities(prioText, priorities);
croot->setProp("@priorities", prioText.str());
}
}
}

// 'simple' queuing
IJobQueueItem *dodequeue(int minprio,unsigned timeout=INFINITE, bool useprev=false, bool *timedout=NULL)
IJobQueueItem *dodequeue(int minprio, __uint64 clientPrio, unsigned timeout, bool useprev, bool * timedout)
{
//If more than one thread is waiting on the queue, then the queue code does not work correctly
//It is undefined which thread the semaphore signal will wake up.
//E.g. there is one thread with a minimum priority of 0, and another with a minimum of 100, and an item of
//priority 50 is queued. If the minimum priority of 100 is woken twice nothing will be dequeued.
//Similar problems occur when the clientPriority is mixed.
if (isProcessingDequeue.exchange(true))
throw MakeStringException(0, "Nested dequeue not supported");

bool hasminprio=(minprio!=INT_MIN);
if (timedout)
*timedout = false;
Expand Down Expand Up @@ -1200,23 +1339,30 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
active.append(qd);
}
if (stopped==total)
{
isProcessingDequeue.store(false);
return NULL; // all stopped
}
sQueueData **activeqds = (sQueueData **)active.getArray();
unsigned activenum = active.ordinality();
if (activenum) {
sQueueData *bestqd = findbestqueue(useprev,minprio,activenum,activeqds);
sQueueData *bestqd = findbestqueue(useprev,minprio,clientPrio,activenum,activeqds);
unsigned count = bestqd?bestqd->root->getPropInt("@count"):0;
// load minp from cache
if (count) {
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
if (!hasminprio||checkprio(*bestqd,mpr)) {
block.setRollback(false);
ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
if (ret) // think it must be!
timeout = 0; // so mark that done
else if (!hasminprio) {
WARNLOG("Resetting queue %s",bestqd->qname.get());
clear(*bestqd); // reset queue as seems to have become out of sync
if (count)
{
if (!hasHigherPriorityClients(bestqd->root, clientPrio, count))
{
int mpr = useprev?std::max(bestqd->root->getPropInt("@prevpriority"),minprio):minprio;
if (!hasminprio||checkprio(*bestqd,mpr)) {
block.setRollback(false);
ret = dotake(*bestqd,NULL,true,hasminprio,mpr);
if (ret) // think it must be!
timeout = 0; // so mark that done
else if (!hasminprio) {
WARNLOG("Resetting queue %s",bestqd->qname.get());
clear(*bestqd); // reset queue as seems to have become out of sync
}
}
}
}
Expand All @@ -1226,15 +1372,15 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
block.setRollback(false);
}
if (!waitingset) {
setWaiting(activenum,activeqds,true);
setWaiting(activenum, activeqds, clientPrio, true);
block.commit();
waitingset = true;
}
}
}
if (timeout==0) {
if (waitingset) {
setWaiting(activenum,activeqds,false);
setWaiting(activenum, activeqds, clientPrio, false);
block.commit();
}
if (timedout)
Expand All @@ -1255,12 +1401,14 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
timeout = 0;
}
}

isProcessingDequeue.store(false);
return ret;
}

IJobQueueItem *dequeue(unsigned timeout=INFINITE)
{
return dodequeue(INT_MIN,timeout);
return dodequeue(INT_MIN, 0, timeout, false, nullptr);
}


Expand All @@ -1271,13 +1419,18 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
{
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minPrio, timeout, usePrevPrio, nullptr));
item.setown(dodequeue(minPrio, 0, timeout, usePrevPrio, nullptr));
}
if (!item)
item.setown(dodequeue(minPrio, timeout-prioritytransitiondelay, false, nullptr));
item.setown(dodequeue(minPrio, 0, timeout-prioritytransitiondelay, false, nullptr));
return item.getClear();
}

IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)
{
return dodequeue(INT_MIN, priority, timeout, false, nullptr);
}

void placeonqueue(sQueueData &qd, IJobQueueItem *qitem,unsigned idx) // takes ownership of qitem
{
Owned<IJobQueueItem> qi = qitem;
Expand Down Expand Up @@ -1628,6 +1781,14 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
return (state&&(strcmp(state,"stopped")==0));
}

void removeClient(sQueueData & qd, IPropertyTree * croot)
{
Unsigned64Array priorities;
deserializePriorities(priorities, croot->queryProp("@priorities"));
//MORE: Remove from the global list??
qd.root->removeTree(croot);
}

void doGetStats(sQueueData &qd,unsigned &connected,unsigned &waiting,unsigned &enqueued)
{
Cconnlockblock block(this,false);
Expand All @@ -1640,7 +1801,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
break;
if (validateitemsessions && !validSession(croot)) {
Cconnlockblock block(this,true);
qd.root->removeTree(croot);
removeClient(qd, croot);
}
else {
waiting += croot->getPropInt("@waiting");
Expand Down Expand Up @@ -1772,7 +1933,7 @@ class CJobQueue: public CJobQueueBase, implements IJobQueue
int minprio = 0;
unsigned timeout = prioritytransitiondelay;
bool usePrevPrio = true;
item.setown(dodequeue(minprio, timeout, usePrevPrio, &timedout));
item.setown(dodequeue(minprio, 0, timeout, usePrevPrio, &timedout));
}
else
item.setown(dequeue(INFINITE));
Expand Down
1 change: 1 addition & 0 deletions common/workunit/wujobq.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ interface IJobQueue: extends IJobQueueConst
// validateitemsessions ensures that all queue items have running session
virtual IJobQueueItem *dequeue(unsigned timeout=INFINITE)=0;
virtual IJobQueueItem *dequeue(int minPrio, unsigned timeout, unsigned prioritytransitiondelay)=0;
virtual IJobQueueItem *dequeuePriority(unsigned __int64 priority, unsigned timeout=INFINITE)=0;
virtual void disconnect()=0; // signal no longer wil be dequeing (optional - done automatically on release)
virtual void getStats(unsigned &connected,unsigned &waiting, unsigned &enqueued)=0; // this not quick as validates clients still running
virtual bool waitStatsChange(unsigned timeout)=0;
Expand Down
Loading

0 comments on commit 05caafa

Please sign in to comment.