Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Peek function and size reporting macros changes. #1

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 131 additions & 28 deletions rpa_queue.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,22 +20,31 @@
#include <unistd.h>
#include <pthread.h>
#include <string.h>
#include <stdbool.h>

// uncomment to print debug messages
//#define QUEUE_DEBUG

/**
* @struct rpa_queue_t
* @brief Opaque structure representing a thread-safe circular queue.
*
* This structure defines a thread-safe circular queue that supports concurrent access
* by multiple threads. The queue uses a fixed-size array to store elements and is designed
* to handle both producer and consumer threads efficiently.
*/
struct rpa_queue_t {
void **data;
volatile uint32_t nelts; /**< # elements */
uint32_t in; /**< next empty location */
uint32_t out; /**< next filled location */
uint32_t bounds;/**< max size of queue */
uint32_t full_waiters;
uint32_t empty_waiters;
pthread_mutex_t *one_big_mutex;
pthread_cond_t *not_empty;
pthread_cond_t *not_full;
int terminated;
void **data; /**< Array to store elements */
volatile uint32_t nelts; /**< Number of elements in the queue */
uint32_t in; /**< Next empty location in the queue */
uint32_t out; /**< Next filled location in the queue */
uint32_t bounds; /**< Maximum size of the queue */
uint32_t full_waiters; /**< Number of threads waiting on a full queue */
uint32_t empty_waiters; /**< Number of threads waiting on an empty queue */
pthread_mutex_t *one_big_mutex;/**< Mutex for controlling access to the queue */
pthread_cond_t *not_empty; /**< Condition variable for signaling non-empty queue */
pthread_cond_t *not_full; /**< Condition variable for signaling non-full queue */
int terminated; /**< Flag indicating whether the queue is terminated */
};

#ifdef QUEUE_DEBUG
Expand All @@ -50,16 +59,92 @@ static void Q_DBG(const char*msg, rpa_queue_t *q) {
#endif

/**
* Detects when the rpa_queue_t is full. This utility function is expected
* to be called from within critical sections, and is not threadsafe.
* @brief Macro to check if the rpa_queue_t is full.
*
* This macro checks if the number of elements in the queue is equal to the maximum
* size of the queue, indicating that the queue is full.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return 1 if the queue is full, 0 otherwise.
*/
#define MC_rpa_queue_full(queue) ((queue)->nelts == (queue)->bounds)

/**
* @brief Macro to get the number of free slots in the rpa_queue_t.
*
* This macro calculates the number of free slots in the queue by subtracting
* the current number of elements from the maximum size of the queue.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return The number of free slots in the queue.
*/
#define MC_rpa_queue_get_free(queue) (((queue)->bounds) - ((queue)->nelts))

/**
* @brief Macro to get the number of taken slots in the rpa_queue_t.
*
* This macro retrieves the current number of elements in the queue, indicating
* the number of slots that have been taken.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return The number of taken slots in the queue.
*/
#define MC_rpa_queue_get_taken(queue) ((queue)->nelts)

/**
* @brief Macro to check if the rpa_queue_t is empty.
*
* This macro checks if the number of elements in the queue is zero, indicating
* that the queue is empty.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return 1 if the queue is empty, 0 otherwise.
*/
#define MC_rpa_queue_empty(queue) ((queue)->nelts == 0)

/**
* @brief Macro to check if the rpa_queue_t is full.
*
* This macro checks if the number of elements in the queue is equal to the maximum
* size of the queue, indicating that the queue is full.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return 1 if the queue is full, 0 otherwise.
*/
#define rpa_queue_full(queue) ((queue)->nelts == (queue)->bounds)


/**
* Detects when the rpa_queue_t is empty. This utility function is expected
* to be called from within critical sections, and is not threadsafe.
* @brief Macro to check if the rpa_queue_t is empty.
*
* This macro checks if the number of elements in the queue is zero, indicating
* that the queue is empty.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return 1 if the queue is empty, 0 otherwise.
*/
#define rpa_queue_empty(queue) ((queue)->nelts == 0)

bool rpa_queue_empty(const rpa_queue_t *queue)
{
return MC_rpa_queue_empty(queue);
}

bool rpa_queue_full(const rpa_queue_t *queue)
{
return MC_rpa_queue_full(queue);
}

unsigned rpa_queue_get_free(const rpa_queue_t *queue)
{
return MC_rpa_queue_get_free(queue);
}

unsigned rpa_queue_get_taken(const rpa_queue_t *queue)
{
return MC_rpa_queue_get_taken(queue);
}


static bool _rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms, bool remove_from_queue);

static void set_timeout(struct timespec * abstime, int wait_ms)
{
Expand Down Expand Up @@ -178,7 +263,7 @@ bool rpa_queue_timedpush(rpa_queue_t *queue, void *data, int wait_ms)
return false;
}

if (rpa_queue_full(queue)) {
if (MC_rpa_queue_full(queue)) {
if (!queue->terminated) {
queue->full_waiters++;
if (wait_ms == RPA_WAIT_FOREVER) {
Expand All @@ -196,7 +281,7 @@ bool rpa_queue_timedpush(rpa_queue_t *queue, void *data, int wait_ms)
}
}
/* If we wake up and it's still empty, then we were interrupted */
if (rpa_queue_full(queue)) {
if (MC_rpa_queue_full(queue)) {
Q_DBG("queue full (intr)", queue);
rv = pthread_mutex_unlock(queue->one_big_mutex);
if (rv != 0) {
Expand Down Expand Up @@ -248,7 +333,7 @@ bool rpa_queue_trypush(rpa_queue_t *queue, void *data)
return false;
}

if (rpa_queue_full(queue)) {
if (MC_rpa_queue_full(queue)) {
rv = pthread_mutex_unlock(queue->one_big_mutex);
return false; //EAGAIN;
}
Expand Down Expand Up @@ -280,6 +365,12 @@ uint32_t rpa_queue_size(rpa_queue_t *queue) {
return queue->nelts;
}

bool rpa_queue_timedpeek(rpa_queue_t *queue, void **data, int wait_ms)
{
return _rpa_queue_timedpop(queue, data, wait_ms, false);
}


/**
* Retrieves the next item from the queue. If there are no
* items available, it will block until one becomes available.
Expand All @@ -292,6 +383,11 @@ bool rpa_queue_pop(rpa_queue_t *queue, void **data)
}

bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms)
{
return _rpa_queue_timedpop(queue, data, wait_ms, true);
}

static bool _rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms, bool remove_from_queue)
{
bool rv;

Expand All @@ -307,7 +403,7 @@ bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms)
}

/* Keep waiting until we wake up and find that the queue is not empty. */
if (rpa_queue_empty(queue)) {
if (MC_rpa_queue_empty(queue)) {
if (!queue->terminated) {
queue->empty_waiters++;
if (wait_ms == RPA_WAIT_FOREVER) {
Expand All @@ -325,7 +421,7 @@ bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms)
}
}
/* If we wake up and it's still empty, then we were interrupted */
if (rpa_queue_empty(queue)) {
if (MC_rpa_queue_empty(queue)) {
Q_DBG("queue empty (intr)", queue);
rv = pthread_mutex_unlock(queue->one_big_mutex);
if (rv != 0) {
Expand All @@ -340,12 +436,17 @@ bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms)
}

*data = queue->data[queue->out];
queue->nelts--;

queue->out++;
if (queue->out >= queue->bounds) {
queue->out -= queue->bounds;
if(remove_from_queue)
{
queue->nelts--;

queue->out++;
if (queue->out >= queue->bounds)
{
queue->out -= queue->bounds;
}
}

if (queue->full_waiters) {
Q_DBG("signal !full", queue);
rv = pthread_cond_signal(queue->not_full);
Expand All @@ -355,6 +456,8 @@ bool rpa_queue_timedpop(rpa_queue_t *queue, void **data, int wait_ms)
}
}

//function_return:

pthread_mutex_unlock(queue->one_big_mutex);
return true;
}
Expand All @@ -377,7 +480,7 @@ bool rpa_queue_trypop(rpa_queue_t *queue, void **data)
return false;
}

if (rpa_queue_empty(queue)) {
if (MC_rpa_queue_empty(queue)) {
rv = pthread_mutex_unlock(queue->one_big_mutex);
return false; //EAGAIN;
}
Expand Down
61 changes: 60 additions & 1 deletion rpa_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <stdbool.h>
#include <stdint.h>
#include <stddef.h>
#include <pthread.h>

#define RPA_WAIT_NONE 0
#define RPA_WAIT_FOREVER -1
Expand All @@ -39,10 +40,11 @@
*/

/**
* opaque structure
* @brief Opaque structure representing a thread-safe circular queue.
*/
typedef struct rpa_queue_t rpa_queue_t;


/**
* create a FIFO queue
* @param queue The new queue
Expand Down Expand Up @@ -74,6 +76,18 @@ bool rpa_queue_push(rpa_queue_t *queue, void *data);
*/
bool rpa_queue_timedpush(rpa_queue_t *queue, void *data, int wait_ms);

/**
* peek an object from the queue without removing it from the queue, blocking if the queue is already empty
*
* @param queue the queue
* @param data the data
* @param wait_ms milliseconds to wait
* @returns RPA_EINTR the blocking was interrupted (try again)
* @returns RPA_EOF if the queue has been terminated
* @returns RPA_SUCCESS on a successful pop
*/
bool rpa_queue_timedpeek(rpa_queue_t *queue, void **data, int wait_ms);

/**
* pop/get an object from the queue, blocking if the queue is already empty
*
Expand Down Expand Up @@ -158,4 +172,49 @@ void rpa_queue_destroy(rpa_queue_t * queue);
*/
void rpa_queue_free(rpa_queue_t * queue);


/**
* @brief Check if the rpa_queue_t is empty.
*
* This function checks if the number of elements in the queue is zero, indicating
* that the queue is empty.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return true if the queue is empty, false otherwise.
*/
bool rpa_queue_empty(const rpa_queue_t *queue);

/**
* @brief Check if the rpa_queue_t is full.
*
* This function checks if the number of elements in the queue is equal to the maximum
* size of the queue, indicating that the queue is full.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return true if the queue is full, false otherwise.
*/
bool rpa_queue_full(const rpa_queue_t *queue);

/**
* @brief Get the number of free slots in the rpa_queue_t.
*
* This function calculates the number of free slots in the queue by subtracting
* the current number of elements from the maximum size of the queue.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return The number of free slots in the queue.
*/
unsigned rpa_queue_get_free(const rpa_queue_t *queue);

/**
* @brief Get the number of taken slots in the rpa_queue_t.
*
* This function retrieves the current number of elements in the queue, indicating
* the number of slots that have been taken.
*
* @param queue Pointer to the rpa_queue_t instance.
* @return The number of taken slots in the queue.
*/
unsigned rpa_queue_get_taken(const rpa_queue_t *queue);

#endif /* RPAQUEUE_H */