Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
name: c-core
schema: 1
version: 2.12.1
version: 2.12.2
scm: github.com/pubnub/c-core
changelog:
- version: v2.12.2
date: Dec 5, 2019
changes:
- type: enhancement
text: Add support for automatic sending of heartbeat messages with sync interface (POSIX and Windows)
- version: v2.12.1
date: Nov 22, 2019
changes:
Expand Down
2 changes: 2 additions & 0 deletions core/fntest/pubnub_fntest_medium.c
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,9 @@ TEST_DEF(wrong_api_usage)
pubnub_cancel(pbp);
await_timed(10 * SECONDS, PNR_CANCELLED, pbp);

#if !(PUBNUB_USE_AUTO_HEARTBEAT)
expect_pnr(pubnub_subscribe(pbp, NULL, NULL), PNR_INVALID_CHANNEL);
#endif

TEST_POP_DEFERRED;
}
Expand Down
6 changes: 1 addition & 5 deletions core/pb_sleep_ms.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,2 @@
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
#if defined _WIN32
void pb_sleep_ms(DWORD dwMilliseconds);
#else
void pb_sleep_ms(long milliseconds);
#endif
void pb_sleep_ms(unsigned long milliseconds);
80 changes: 65 additions & 15 deletions core/pbauto_heartbeat_callback.c → core/pbauto_heartbeat.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "core/pubnub_pubsubapi.h"
#include "core/pubnub_coreapi.h"
#include "core/pubnub_free_with_timeout.h"
#include "core/pubnub_blocking_io.h"
#include "lib/pb_strnlen_s.h"
#include "core/pb_sleep_ms.h"
#include "core/pubnub_assert.h"
Expand Down Expand Up @@ -60,9 +61,6 @@ static int copy_context_settings(pubnub_t* pb_clone, pubnub_t const* pb)
if (PUBNUB_ORIGIN_SETTABLE) {
pb_clone->origin = pb->origin;
}
#if PUBNUB_BLOCKING_IO_SETTABLE
pb_clone->options.use_blocking_io = pb->options.use_blocking_io;
#endif /* PUBNUB_BLOCKING_IO_SETTABLE */
pb_clone->options.use_http_keep_alive = pb->options.use_http_keep_alive;
#if PUBNUB_USE_IPV6 && defined(PUBNUB_CALLBACK_API)
pb_clone->options.ipv6_connectivity = pb->options.ipv6_connectivity;
Expand Down Expand Up @@ -102,6 +100,16 @@ static bool pubsub_keys_changed(pubnub_t const* pb_clone, pubnub_t const* pb)
|| (pb_clone->core.subscribe_key != pb->core.subscribe_key);
}

#if defined(PUBNUB_CALLBACK_API)
#define add_heartbeat_in_progress(thumper_index)
#else
static void add_heartbeat_in_progress(unsigned thumper_index)
{
pubnub_mutex_lock(m_watcher.mutw);
m_watcher.heartbeat_in_progress_index_array[m_watcher.heartbeats_in_progress++] = thumper_index;
pubnub_mutex_unlock(m_watcher.mutw);
}
#endif

static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
{
Expand All @@ -118,6 +126,8 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
pubnub_mutex_unlock(heartbeat_pb->monitor);

if (keys_changed) {
/** Used in sync environment while for callback it's an empty macro */
add_heartbeat_in_progress(pb->thumperIndex);
pubnub_mutex_unlock(pb->monitor);
pubnub_cancel(heartbeat_pb);
return;
Expand All @@ -132,7 +142,7 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
"--->heartbeat_thump(pb=%p, heartbeat_pb=%p).\n", pb, heartbeat_pb);
copy_context_settings(heartbeat_pb, pb);
res = pubnub_heartbeat(heartbeat_pb, channel, channel_group);
if (res != PNR_STARTED) {
if ((res != PNR_STARTED) && (res != PNR_OK)) {
PUBNUB_LOG_ERROR("heartbeat_thump(pb=%p, heartbeat_pb) - "
"pubnub_heartbeat(heartbeat_pb=%p) returned "
"unexpected: %d('%s')\n",
Expand All @@ -141,6 +151,8 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
res,
pubnub_res_2_string(res));
}
/** Used in sync environment while for callback it's an empty macro */
add_heartbeat_in_progress(pb->thumperIndex);
}
pubnub_mutex_unlock(pb->monitor);
}
Expand Down Expand Up @@ -209,20 +221,50 @@ static pubnub_t* init_new_thumper_pb(pubnub_t* pb, unsigned i)
pubnub_mutex_unlock(pb->monitor);

pubnub_mutex_lock(pb_new->monitor);
pubnub_set_non_blocking_io(pb_new);
pb_new->thumperIndex = i;
pubnub_mutex_unlock(pb_new->monitor);

return pb_new;
}


static void take_the_timer_out(unsigned* indexes, unsigned i, unsigned* active_timers)
void pbauto_take_the_node_out(unsigned* indexes, unsigned i, unsigned* dimension)
{
unsigned* timer_out = indexes + i;
--*active_timers;
memmove(timer_out, timer_out + 1, (*active_timers - i) * sizeof(unsigned));
unsigned* node_out = indexes + i;
--*dimension;
memmove(node_out, node_out + 1, (*dimension - i) * sizeof(unsigned));
}

#if defined(PUBNUB_CALLBACK_API)
#define handle_heartbeats_in_progress()
#else
static void handle_heartbeats_in_progress(void)
{
unsigned i;
enum pubnub_res result;
struct pubnub_heartbeat_data* heartbeat_data = m_watcher.heartbeat_data;
unsigned* heartbeat_indexes = m_watcher.heartbeat_in_progress_index_array;

pubnub_mutex_lock(m_watcher.mutw);
for (i = 0; i < m_watcher.heartbeats_in_progress;) {
pubnub_t* heartbeat_pb = heartbeat_data[heartbeat_indexes[i]].heartbeat_pb;
result = pubnub_last_result(heartbeat_pb);
if (result != PNR_STARTED) {
/** auto heartbeat transaction is finished(not in progress any more) */
pbauto_take_the_node_out(heartbeat_indexes, i, &m_watcher.heartbeats_in_progress);

pubnub_mutex_lock(heartbeat_pb->monitor);
auto_heartbeat_callback(heartbeat_pb, heartbeat_pb->trans, result, NULL);
pubnub_mutex_unlock(heartbeat_pb->monitor);
}
else {
++i;
}
}
pubnub_mutex_unlock(m_watcher.mutw);
}
#endif

static void handle_heartbeat_timers(int elapsed_ms)
{
Expand All @@ -240,7 +282,7 @@ static void handle_heartbeat_timers(int elapsed_ms)
pubnub_t* heartbeat_pb;

/* Taking out one that has expired */
take_the_timer_out(indexes, i, &active_timers);
pbauto_take_the_node_out(indexes, i, &active_timers);

pubnub_mutex_lock(m_watcher.mutw);
pb = thumper->pb;
Expand All @@ -252,8 +294,9 @@ static void handle_heartbeat_timers(int elapsed_ms)
if (NULL == heartbeat_pb) {
continue;
}
#if defined(PUBNUB_CALLBACK_API)
pubnub_register_callback(heartbeat_pb, auto_heartbeat_callback, NULL);

#endif
pubnub_mutex_lock(m_watcher.mutw);
thumper->heartbeat_pb = heartbeat_pb;
pubnub_mutex_unlock(m_watcher.mutw);
Expand Down Expand Up @@ -290,7 +333,8 @@ pubnub_watcher_t pbauto_heartbeat_watcher_thread(void* arg)
if (stop_thread) {
break;
}

/** Used in sync environment while in callback it is an empty macro */
handle_heartbeats_in_progress();
pb_sleep_ms(1);
#if !defined(_WIN32)
monotonic_clock_get_time(&timspec);
Expand All @@ -314,7 +358,7 @@ pubnub_watcher_t pbauto_heartbeat_watcher_thread(void* arg)


/** Initializes auto heartbeat thumper for @p pb context and if its called for
the first time starts auto heartbeat watcher thread.
the first time starts auto heartbeat watcher thread.
*/
static int form_heartbeat_thumper(pubnub_t* pb)
{
Expand All @@ -328,7 +372,6 @@ static int form_heartbeat_thumper(pubnub_t* pb)
pbauto_heartbeat_init(&m_watcher);
s_began = true;
}

pubnub_mutex_lock(m_watcher.mutw);
if (m_watcher.thumpers_in_use >= PUBNUB_MAX_HEARTBEAT_THUMPERS) {
PUBNUB_LOG_WARNING(
Expand All @@ -352,7 +395,9 @@ static int form_heartbeat_thumper(pubnub_t* pb)
pubnub_mutex_unlock(m_watcher.mutw);
return -1;
}
#if defined(PUBNUB_CALLBACK_API)
pubnub_register_callback(heartbeat_pb, auto_heartbeat_callback, NULL);
#endif
thumper->heartbeat_pb = heartbeat_pb;
}
pb->thumperIndex = i;
Expand Down Expand Up @@ -423,7 +468,7 @@ static void auto_heartbeat_stop_timer(unsigned thumper_index)
for (i = 0, indexes = m_watcher.timer_index_array; i < active_timers; i++) {
if (indexes[i] == thumper_index) {
/* Taking timer out */
take_the_timer_out(indexes, i, &active_timers);
pbauto_take_the_node_out(indexes, i, &active_timers);
m_watcher.active_timers = active_timers;
break;
}
Expand Down Expand Up @@ -464,11 +509,16 @@ static void release_thumper(unsigned thumper_index)
}
}

/** If it is a thumper pubnub context, it is exempted from some usual procedures. */
/** If it is a thumper pubnub context, or one that doesn't have thumper assigned, it is
exempted from auto heartbeat procedures.
*/
static bool is_exempted(pubnub_t const* pb, unsigned thumper_index)
{
pubnub_t const* pb_exempted;

if (UNASSIGNED == thumper_index) {
return true;
}
pubnub_mutex_lock(m_watcher.mutw);
pb_exempted = m_watcher.heartbeat_data[thumper_index].heartbeat_pb;
pubnub_mutex_unlock(m_watcher.mutw);
Expand Down
8 changes: 8 additions & 0 deletions core/pbauto_heartbeat.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@ struct pubnub_heartbeat_data {

struct HeartbeatWatcherData {
struct pubnub_heartbeat_data heartbeat_data[PUBNUB_MAX_HEARTBEAT_THUMPERS] pubnub_guarded_by(mutw);
#if !defined(PUBNUB_CALLBACK_API)
/** Array of thumper indices for which auto heartbeat transactions are currently in progress.
Used in sync environment.
*/
unsigned heartbeat_in_progress_index_array[PUBNUB_MAX_HEARTBEAT_THUMPERS] pubnub_guarded_by(mutw);
/** Number of auto heartbeat transactions currently in progress. */
unsigned heartbeats_in_progress pubnub_guarded_by(mutw);
#endif
unsigned thumpers_in_use pubnub_guarded_by(mutw);
/** Times left for each of the thumper timers in progress */
size_t heartbeat_timers[PUBNUB_MAX_HEARTBEAT_THUMPERS] pubnub_guarded_by(timerlock);
Expand Down
75 changes: 0 additions & 75 deletions core/pbauto_heartbeat_sync.c

This file was deleted.

18 changes: 10 additions & 8 deletions core/pubnub_blocking_io.c
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
#include "pubnub_blocking_io.h"

#include "pubnub_assert.h"
#include "pubnub_internal.h"


#include "pubnub_assert.h"
#include "pubnub_blocking_io.h"


int pubnub_set_non_blocking_io(pubnub_t *p)
{
#if PUBNUB_BLOCKING_IO_SETTABLE
p->options.use_blocking_io = false;
return 0;
pubnub_mutex_lock(p->monitor);
p->options.use_blocking_io = false;
pubnub_mutex_unlock(p->monitor);
return 0;
#endif
return -1;
}
Expand All @@ -20,8 +20,10 @@ int pubnub_set_non_blocking_io(pubnub_t *p)
int pubnub_set_blocking_io(pubnub_t *p)
{
#if PUBNUB_BLOCKING_IO_SETTABLE
p->options.use_blocking_io = true;
return 0;
pubnub_mutex_lock(p->monitor);
p->options.use_blocking_io = true;
pubnub_mutex_unlock(p->monitor);
return 0;
#endif
return -1;
}
10 changes: 5 additions & 5 deletions core/pubnub_pubsubapi.c
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ pubnub_t* pubnub_init(pubnub_t* p, const char* publish_key, const char* subscrib
p->cb = NULL;
p->user_data = NULL;
p->flags.sent_queries = 0;
#if PUBNUB_USE_AUTO_HEARTBEAT
p->thumperIndex = UNASSIGNED;
p->channelInfo.channel = NULL;
p->channelInfo.channel_group = NULL;
#endif /* PUBNUB_AUTO_HEARTBEAT */
#endif /* defined(PUBNUB_CALLBACK_API) */
if (PUBNUB_ORIGIN_SETTABLE) {
p->origin = PUBNUB_ORIGIN;
Expand All @@ -46,6 +41,11 @@ pubnub_t* pubnub_init(pubnub_t* p, const char* publish_key, const char* subscrib
p->options.use_blocking_io = true;
#endif
#endif /* PUBNUB_BLOCKING_IO_SETTABLE */
#if PUBNUB_USE_AUTO_HEARTBEAT
p->thumperIndex = UNASSIGNED;
p->channelInfo.channel = NULL;
p->channelInfo.channel_group = NULL;
#endif /* PUBNUB_AUTO_HEARTBEAT */

p->state = PBS_IDLE;
p->trans = PBTT_NONE;
Expand Down
2 changes: 1 addition & 1 deletion core/pubnub_version_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#define INC_PUBNUB_VERSION_INTERNAL


#define PUBNUB_SDK_VERSION "2.12.1"
#define PUBNUB_SDK_VERSION "2.12.2"


#endif /* !defined INC_PUBNUB_VERSION_INTERNAL */
Loading