Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ballle98/AqualinkD#8: replace busy polling with pthread_cond_wait #100

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 62 additions & 40 deletions aq_programmer.c
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ unsigned char _pgm_command = NUL;

bool _last_sent_was_cmd = false;

static pthread_mutex_t _pgm_command_mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t _pgm_command_sent_cond = PTHREAD_COND_INITIALIZER;

// External view of adding to queue
void aq_send_cmd(unsigned char cmd) {
push_aq_cmd(cmd);
Expand Down Expand Up @@ -162,9 +165,12 @@ unsigned char pop_aq_cmd(struct aqualinkdata *aq_data)
if (in_programming_mode(aq_data) && ( in_ot_programming_mode(aq_data) == false && in_iaqt_programming_mode(aq_data) == false )) {
//if (aq_data->active_thread.thread_id != 0) {
if ( _pgm_command != NUL && aq_data->last_packet_type == CMD_STATUS) {
pthread_mutex_lock(&_pgm_command_mutex);
cmd = _pgm_command;
_pgm_command = NUL;
LOG(PROG_LOG, LOG_DEBUG_SERIAL, "RS SEND cmd '0x%02hhx' (programming)\n", cmd);
pthread_cond_signal(&_pgm_command_sent_cond);
pthread_mutex_unlock(&_pgm_command_mutex);
} else if (_pgm_command != NUL) {
LOG(PROG_LOG, LOG_DEBUG_SERIAL, "RS Waiting to send cmd '0x%02hhx' (programming)\n", _pgm_command);
} else {
Expand Down Expand Up @@ -1071,39 +1077,33 @@ void _aq_programmer(program_type r_type, char *args, struct aqualinkdata *aq_dat

void waitForSingleThreadOrTerminate(struct programmingThreadCtrl *threadCtrl, program_type type)
{
//static int tries = 120;
int tries = 120;
static int waitTime = 1;
int i=0;

i = 0;
while (get_aq_cmd_length() > 0 && ( i++ <= tries) ) {
LOG(PROG_LOG, LOG_DEBUG, "Thread %p (%s) sleeping, waiting command queue to empty\n", &threadCtrl->thread_id, ptypeName(type));
sleep(waitTime);
}
if (i >= tries) {
LOG(PROG_LOG, LOG_ERR, "Thread %p (%s) timeout waiting, ending\n",&threadCtrl->thread_id,ptypeName(type));
free(threadCtrl);
pthread_exit(0);
}
int ret = 0;
struct timespec max_wait;
clock_gettime(CLOCK_REALTIME, &max_wait);
max_wait.tv_sec += 30;

while ( (threadCtrl->aq_data->active_thread.thread_id != 0) && ( i++ <= tries) ) {
//LOG(PROG_LOG, LOG_DEBUG, "Thread %d sleeping, waiting for thread %d to finish\n", threadCtrl->thread_id, threadCtrl->aq_data->active_thread.thread_id);
LOG(PROG_LOG, LOG_DEBUG, "Thread %p (%s) sleeping, waiting for thread %p (%s) to finish\n",
&threadCtrl->thread_id, ptypeName(type),
threadCtrl->aq_data->active_thread.thread_id, ptypeName(threadCtrl->aq_data->active_thread.ptype));
sleep(waitTime);
}

if (i >= tries) {
//LOG(PROG_LOG, LOG_ERR, "Thread %d timeout waiting, ending\n",threadCtrl->thread_id);
LOG(PROG_LOG, LOG_ERR, "Thread (%s) %p timeout waiting for thread (%s) %p to finish\n",
ptypeName(type), &threadCtrl->thread_id, ptypeName(threadCtrl->aq_data->active_thread.ptype),
threadCtrl->aq_data->active_thread.thread_id);
free(threadCtrl);
pthread_exit(0);
}

pthread_mutex_lock(&threadCtrl->aq_data->mutex);
while (threadCtrl->aq_data->active_thread.thread_id != 0)
{
LOG(PROG_LOG, LOG_DEBUG, "Thread %d,%p (%s) sleeping, waiting for thread %d,%p (%s) to finish\n",
type, &threadCtrl->thread_id, ptypeName(type),
threadCtrl->aq_data->active_thread.ptype, threadCtrl->aq_data->active_thread.thread_id, ptypeName(threadCtrl->aq_data->active_thread.ptype));
if ((ret = pthread_cond_timedwait(&threadCtrl->aq_data->thread_finished_cond,
&threadCtrl->aq_data->mutex, &max_wait)))
{
LOG(PROG_LOG, LOG_ERR, "Thread %d,%p err %s waiting for thread %d,%p to finish\n",
type, &threadCtrl->thread_id, strerror(ret),
threadCtrl->aq_data->active_thread.ptype,
threadCtrl->aq_data->active_thread.thread_id);

if ((ret = pthread_mutex_unlock(&threadCtrl->aq_data->mutex)))
{
LOG(PROG_LOG, LOG_ERR, "waitForSingleThreadOrTerminate mutex unlock ret %s\n", strerror(ret));
}
free(threadCtrl);
pthread_exit(0);
}
}
// Clear out any messages to the UI.
threadCtrl->aq_data->last_display_message[0] = '\0';
threadCtrl->aq_data->active_thread.thread_id = &threadCtrl->thread_id;
Expand All @@ -1119,11 +1119,12 @@ void waitForSingleThreadOrTerminate(struct programmingThreadCtrl *threadCtrl, pr
threadCtrl->aq_data->active_thread.ptype,
threadCtrl->aq_data->active_thread.thread_id,
ptypeName(threadCtrl->aq_data->active_thread.ptype));
pthread_mutex_unlock(&threadCtrl->aq_data->mutex);
}

void cleanAndTerminateThread(struct programmingThreadCtrl *threadCtrl)
{
waitfor_queue2empty();
pthread_mutex_lock(&threadCtrl->aq_data->mutex);
#ifndef AQ_DEBUG
LOG(PROG_LOG, LOG_DEBUG, "Thread %d,%p (%s) finished\n",threadCtrl->aq_data->active_thread.ptype, threadCtrl->thread_id,ptypeName(threadCtrl->aq_data->active_thread.ptype));
#else
Expand All @@ -1137,10 +1138,11 @@ void cleanAndTerminateThread(struct programmingThreadCtrl *threadCtrl)
elapsed.tv_sec, elapsed.tv_nsec / 1000000L);
#endif

// Quick delay to allow for last message to be sent.
delay(500);
threadCtrl->aq_data->active_thread.thread_id = 0;
threadCtrl->aq_data->active_thread.ptype = AQP_NULL;
pthread_cond_signal(&threadCtrl->aq_data->thread_finished_cond);
pthread_mutex_unlock(&threadCtrl->aq_data->mutex);

threadCtrl->thread_id = 0;
// Force update, change display message
threadCtrl->aq_data->updated = true;
Expand Down Expand Up @@ -2203,14 +2205,34 @@ void longwaitfor_queue2empty()
_waitfor_queue2empty(true);
}

void send_cmd(unsigned char cmd)
bool send_cmd(unsigned char cmd)
{
waitfor_queue2empty();

_pgm_command = cmd;
//delay(200);
bool ret=true;
int pret = 0;
struct timespec max_wait;

clock_gettime(CLOCK_REALTIME, &max_wait);
max_wait.tv_sec += 5;

pthread_mutex_lock(&_pgm_command_mutex);
_pgm_command = cmd;
LOG(PROG_LOG, LOG_INFO, "Queue send '0x%02hhx' to controller (programming)\n", _pgm_command);
while (_pgm_command != NUL)
{
if ((pret = pthread_cond_timedwait(&_pgm_command_sent_cond,
&_pgm_command_mutex, &max_wait)))
{
LOG(PROG_LOG, LOG_ERR, "send_cmd 0x%02hhx err %s\n",
cmd, strerror(pret));
ret = false;
break;
}
}
if (ret) {
LOG(PROG_LOG, LOG_INFO, "sent '0x%02hhx' to controller\n", _pgm_command);
}
pthread_mutex_unlock(&_pgm_command_mutex);
return ret;
}
void force_queue_delete()
{
Expand Down
2 changes: 2 additions & 0 deletions aqualink.h
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ struct aqualinkdata
struct timespec last_active_time;
struct timespec start_active_time;
#endif
pthread_mutex_t mutex;
pthread_cond_t thread_finished_cond;
};


Expand Down
3 changes: 3 additions & 0 deletions aqualinkd.c
Original file line number Diff line number Diff line change
Expand Up @@ -1508,6 +1508,9 @@ void main_loop()
signal(SIGQUIT, intHandler);
signal(SIGRESTART, intHandler);

pthread_mutex_init(&_aqualink_data.mutex, NULL);
pthread_cond_init(&_aqualink_data.thread_finished_cond, NULL);

if (!start_net_services(&_aqualink_data))
{
LOG(AQUA_LOG,LOG_ERR, "Can not start webserver on port %s.\n", _aqconfig_.socket_port);
Expand Down
4 changes: 2 additions & 2 deletions pda_aq_programmer.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ bool set_PDA_aqualink_time(struct aqualinkdata *aq_data);


// These are from aq_programmer.c , exposed here for PDA AQ PROGRAMMER
void send_cmd(unsigned char cmd);
bool send_cmd(unsigned char cmd);
bool push_aq_cmd(unsigned char cmd);
bool waitForMessage(struct aqualinkdata *aq_data, char* message, int numMessageReceived);
void waitfor_queue2empty();
void longwaitfor_queue2empty();

//void pda_programming_thread_check(struct aqualinkdata *aq_data);

#endif // AQ_PDA_PROGRAMMER_H_
#endif // AQ_PDA_PROGRAMMER_H_