Skip to content

Commit a181723

Browse files
authored
Auto heartbeat for sync interface (#99)
Auto heartbeat for C/C++ is now complete. Only Qt remains. Also, fixed some potential race conditions and cleaned up some internal interfaces.
1 parent 5f27b1c commit a181723

21 files changed

+142
-187
lines changed

.pubnub.yml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
name: c-core
22
schema: 1
3-
version: 2.12.1
3+
version: 2.12.2
44
scm: github.com/pubnub/c-core
55
changelog:
6+
- version: v2.12.2
7+
date: Dec 5, 2019
8+
changes:
9+
- type: enhancement
10+
text: Add support for automatic sending of heartbeat messages with sync interface (POSIX and Windows)
611
- version: v2.12.1
712
date: Nov 22, 2019
813
changes:

core/fntest/pubnub_fntest_medium.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,9 @@ TEST_DEF(wrong_api_usage)
304304
pubnub_cancel(pbp);
305305
await_timed(10 * SECONDS, PNR_CANCELLED, pbp);
306306

307+
#if !(PUBNUB_USE_AUTO_HEARTBEAT)
307308
expect_pnr(pubnub_subscribe(pbp, NULL, NULL), PNR_INVALID_CHANNEL);
309+
#endif
308310

309311
TEST_POP_DEFERRED;
310312
}

core/pb_sleep_ms.h

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,2 @@
11
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
2-
#if defined _WIN32
3-
void pb_sleep_ms(DWORD dwMilliseconds);
4-
#else
5-
void pb_sleep_ms(long milliseconds);
6-
#endif
2+
void pb_sleep_ms(unsigned long milliseconds);

core/pbauto_heartbeat_callback.c renamed to core/pbauto_heartbeat.c

Lines changed: 65 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#include "core/pubnub_pubsubapi.h"
1313
#include "core/pubnub_coreapi.h"
1414
#include "core/pubnub_free_with_timeout.h"
15+
#include "core/pubnub_blocking_io.h"
1516
#include "lib/pb_strnlen_s.h"
1617
#include "core/pb_sleep_ms.h"
1718
#include "core/pubnub_assert.h"
@@ -60,9 +61,6 @@ static int copy_context_settings(pubnub_t* pb_clone, pubnub_t const* pb)
6061
if (PUBNUB_ORIGIN_SETTABLE) {
6162
pb_clone->origin = pb->origin;
6263
}
63-
#if PUBNUB_BLOCKING_IO_SETTABLE
64-
pb_clone->options.use_blocking_io = pb->options.use_blocking_io;
65-
#endif /* PUBNUB_BLOCKING_IO_SETTABLE */
6664
pb_clone->options.use_http_keep_alive = pb->options.use_http_keep_alive;
6765
#if PUBNUB_USE_IPV6 && defined(PUBNUB_CALLBACK_API)
6866
pb_clone->options.ipv6_connectivity = pb->options.ipv6_connectivity;
@@ -102,6 +100,16 @@ static bool pubsub_keys_changed(pubnub_t const* pb_clone, pubnub_t const* pb)
102100
|| (pb_clone->core.subscribe_key != pb->core.subscribe_key);
103101
}
104102

103+
#if defined(PUBNUB_CALLBACK_API)
104+
#define add_heartbeat_in_progress(thumper_index)
105+
#else
106+
static void add_heartbeat_in_progress(unsigned thumper_index)
107+
{
108+
pubnub_mutex_lock(m_watcher.mutw);
109+
m_watcher.heartbeat_in_progress_index_array[m_watcher.heartbeats_in_progress++] = thumper_index;
110+
pubnub_mutex_unlock(m_watcher.mutw);
111+
}
112+
#endif
105113

106114
static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
107115
{
@@ -118,6 +126,8 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
118126
pubnub_mutex_unlock(heartbeat_pb->monitor);
119127

120128
if (keys_changed) {
129+
/** Used in sync environment while for callback it's an empty macro */
130+
add_heartbeat_in_progress(pb->thumperIndex);
121131
pubnub_mutex_unlock(pb->monitor);
122132
pubnub_cancel(heartbeat_pb);
123133
return;
@@ -132,7 +142,7 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
132142
"--->heartbeat_thump(pb=%p, heartbeat_pb=%p).\n", pb, heartbeat_pb);
133143
copy_context_settings(heartbeat_pb, pb);
134144
res = pubnub_heartbeat(heartbeat_pb, channel, channel_group);
135-
if (res != PNR_STARTED) {
145+
if ((res != PNR_STARTED) && (res != PNR_OK)) {
136146
PUBNUB_LOG_ERROR("heartbeat_thump(pb=%p, heartbeat_pb) - "
137147
"pubnub_heartbeat(heartbeat_pb=%p) returned "
138148
"unexpected: %d('%s')\n",
@@ -141,6 +151,8 @@ static void heartbeat_thump(pubnub_t* pb, pubnub_t* heartbeat_pb)
141151
res,
142152
pubnub_res_2_string(res));
143153
}
154+
/** Used in sync environment while for callback it's an empty macro */
155+
add_heartbeat_in_progress(pb->thumperIndex);
144156
}
145157
pubnub_mutex_unlock(pb->monitor);
146158
}
@@ -209,20 +221,50 @@ static pubnub_t* init_new_thumper_pb(pubnub_t* pb, unsigned i)
209221
pubnub_mutex_unlock(pb->monitor);
210222

211223
pubnub_mutex_lock(pb_new->monitor);
224+
pubnub_set_non_blocking_io(pb_new);
212225
pb_new->thumperIndex = i;
213226
pubnub_mutex_unlock(pb_new->monitor);
214227

215228
return pb_new;
216229
}
217230

218231

219-
static void take_the_timer_out(unsigned* indexes, unsigned i, unsigned* active_timers)
232+
void pbauto_take_the_node_out(unsigned* indexes, unsigned i, unsigned* dimension)
220233
{
221-
unsigned* timer_out = indexes + i;
222-
--*active_timers;
223-
memmove(timer_out, timer_out + 1, (*active_timers - i) * sizeof(unsigned));
234+
unsigned* node_out = indexes + i;
235+
--*dimension;
236+
memmove(node_out, node_out + 1, (*dimension - i) * sizeof(unsigned));
224237
}
225238

239+
#if defined(PUBNUB_CALLBACK_API)
240+
#define handle_heartbeats_in_progress()
241+
#else
242+
static void handle_heartbeats_in_progress(void)
243+
{
244+
unsigned i;
245+
enum pubnub_res result;
246+
struct pubnub_heartbeat_data* heartbeat_data = m_watcher.heartbeat_data;
247+
unsigned* heartbeat_indexes = m_watcher.heartbeat_in_progress_index_array;
248+
249+
pubnub_mutex_lock(m_watcher.mutw);
250+
for (i = 0; i < m_watcher.heartbeats_in_progress;) {
251+
pubnub_t* heartbeat_pb = heartbeat_data[heartbeat_indexes[i]].heartbeat_pb;
252+
result = pubnub_last_result(heartbeat_pb);
253+
if (result != PNR_STARTED) {
254+
/** auto heartbeat transaction is finished(not in progress any more) */
255+
pbauto_take_the_node_out(heartbeat_indexes, i, &m_watcher.heartbeats_in_progress);
256+
257+
pubnub_mutex_lock(heartbeat_pb->monitor);
258+
auto_heartbeat_callback(heartbeat_pb, heartbeat_pb->trans, result, NULL);
259+
pubnub_mutex_unlock(heartbeat_pb->monitor);
260+
}
261+
else {
262+
++i;
263+
}
264+
}
265+
pubnub_mutex_unlock(m_watcher.mutw);
266+
}
267+
#endif
226268

227269
static void handle_heartbeat_timers(int elapsed_ms)
228270
{
@@ -240,7 +282,7 @@ static void handle_heartbeat_timers(int elapsed_ms)
240282
pubnub_t* heartbeat_pb;
241283

242284
/* Taking out one that has expired */
243-
take_the_timer_out(indexes, i, &active_timers);
285+
pbauto_take_the_node_out(indexes, i, &active_timers);
244286

245287
pubnub_mutex_lock(m_watcher.mutw);
246288
pb = thumper->pb;
@@ -252,8 +294,9 @@ static void handle_heartbeat_timers(int elapsed_ms)
252294
if (NULL == heartbeat_pb) {
253295
continue;
254296
}
297+
#if defined(PUBNUB_CALLBACK_API)
255298
pubnub_register_callback(heartbeat_pb, auto_heartbeat_callback, NULL);
256-
299+
#endif
257300
pubnub_mutex_lock(m_watcher.mutw);
258301
thumper->heartbeat_pb = heartbeat_pb;
259302
pubnub_mutex_unlock(m_watcher.mutw);
@@ -290,7 +333,8 @@ pubnub_watcher_t pbauto_heartbeat_watcher_thread(void* arg)
290333
if (stop_thread) {
291334
break;
292335
}
293-
336+
/** Used in sync environment while in callback it is an empty macro */
337+
handle_heartbeats_in_progress();
294338
pb_sleep_ms(1);
295339
#if !defined(_WIN32)
296340
monotonic_clock_get_time(&timspec);
@@ -314,7 +358,7 @@ pubnub_watcher_t pbauto_heartbeat_watcher_thread(void* arg)
314358

315359

316360
/** Initializes auto heartbeat thumper for @p pb context and if its called for
317-
the first time starts auto heartbeat watcher thread.
361+
the first time starts auto heartbeat watcher thread.
318362
*/
319363
static int form_heartbeat_thumper(pubnub_t* pb)
320364
{
@@ -328,7 +372,6 @@ static int form_heartbeat_thumper(pubnub_t* pb)
328372
pbauto_heartbeat_init(&m_watcher);
329373
s_began = true;
330374
}
331-
332375
pubnub_mutex_lock(m_watcher.mutw);
333376
if (m_watcher.thumpers_in_use >= PUBNUB_MAX_HEARTBEAT_THUMPERS) {
334377
PUBNUB_LOG_WARNING(
@@ -352,7 +395,9 @@ static int form_heartbeat_thumper(pubnub_t* pb)
352395
pubnub_mutex_unlock(m_watcher.mutw);
353396
return -1;
354397
}
398+
#if defined(PUBNUB_CALLBACK_API)
355399
pubnub_register_callback(heartbeat_pb, auto_heartbeat_callback, NULL);
400+
#endif
356401
thumper->heartbeat_pb = heartbeat_pb;
357402
}
358403
pb->thumperIndex = i;
@@ -423,7 +468,7 @@ static void auto_heartbeat_stop_timer(unsigned thumper_index)
423468
for (i = 0, indexes = m_watcher.timer_index_array; i < active_timers; i++) {
424469
if (indexes[i] == thumper_index) {
425470
/* Taking timer out */
426-
take_the_timer_out(indexes, i, &active_timers);
471+
pbauto_take_the_node_out(indexes, i, &active_timers);
427472
m_watcher.active_timers = active_timers;
428473
break;
429474
}
@@ -464,11 +509,16 @@ static void release_thumper(unsigned thumper_index)
464509
}
465510
}
466511

467-
/** If it is a thumper pubnub context, it is exempted from some usual procedures. */
512+
/** If it is a thumper pubnub context, or one that doesn't have thumper assigned, it is
513+
exempted from auto heartbeat procedures.
514+
*/
468515
static bool is_exempted(pubnub_t const* pb, unsigned thumper_index)
469516
{
470517
pubnub_t const* pb_exempted;
471518

519+
if (UNASSIGNED == thumper_index) {
520+
return true;
521+
}
472522
pubnub_mutex_lock(m_watcher.mutw);
473523
pb_exempted = m_watcher.heartbeat_data[thumper_index].heartbeat_pb;
474524
pubnub_mutex_unlock(m_watcher.mutw);

core/pbauto_heartbeat.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,14 @@ struct pubnub_heartbeat_data {
2929

3030
struct HeartbeatWatcherData {
3131
struct pubnub_heartbeat_data heartbeat_data[PUBNUB_MAX_HEARTBEAT_THUMPERS] pubnub_guarded_by(mutw);
32+
#if !defined(PUBNUB_CALLBACK_API)
33+
/** Array of thumper indices for which auto heartbeat transactions are currently in progress.
34+
Used in sync environment.
35+
*/
36+
unsigned heartbeat_in_progress_index_array[PUBNUB_MAX_HEARTBEAT_THUMPERS] pubnub_guarded_by(mutw);
37+
/** Number of auto heartbeat transactions currently in progress. */
38+
unsigned heartbeats_in_progress pubnub_guarded_by(mutw);
39+
#endif
3240
unsigned thumpers_in_use pubnub_guarded_by(mutw);
3341
/** Times left for each of the thumper timers in progress */
3442
size_t heartbeat_timers[PUBNUB_MAX_HEARTBEAT_THUMPERS] pubnub_guarded_by(timerlock);

core/pbauto_heartbeat_sync.c

Lines changed: 0 additions & 75 deletions
This file was deleted.

core/pubnub_blocking_io.c

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
/* -*- c-file-style:"stroustrup"; indent-tabs-mode: nil -*- */
2-
#include "pubnub_blocking_io.h"
3-
4-
#include "pubnub_assert.h"
52
#include "pubnub_internal.h"
63

7-
4+
#include "pubnub_assert.h"
5+
#include "pubnub_blocking_io.h"
86

97

108
int pubnub_set_non_blocking_io(pubnub_t *p)
119
{
1210
#if PUBNUB_BLOCKING_IO_SETTABLE
13-
p->options.use_blocking_io = false;
14-
return 0;
11+
pubnub_mutex_lock(p->monitor);
12+
p->options.use_blocking_io = false;
13+
pubnub_mutex_unlock(p->monitor);
14+
return 0;
1515
#endif
1616
return -1;
1717
}
@@ -20,8 +20,10 @@ int pubnub_set_non_blocking_io(pubnub_t *p)
2020
int pubnub_set_blocking_io(pubnub_t *p)
2121
{
2222
#if PUBNUB_BLOCKING_IO_SETTABLE
23-
p->options.use_blocking_io = true;
24-
return 0;
23+
pubnub_mutex_lock(p->monitor);
24+
p->options.use_blocking_io = true;
25+
pubnub_mutex_unlock(p->monitor);
26+
return 0;
2527
#endif
2628
return -1;
2729
}

core/pubnub_pubsubapi.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@ pubnub_t* pubnub_init(pubnub_t* p, const char* publish_key, const char* subscrib
3030
p->cb = NULL;
3131
p->user_data = NULL;
3232
p->flags.sent_queries = 0;
33-
#if PUBNUB_USE_AUTO_HEARTBEAT
34-
p->thumperIndex = UNASSIGNED;
35-
p->channelInfo.channel = NULL;
36-
p->channelInfo.channel_group = NULL;
37-
#endif /* PUBNUB_AUTO_HEARTBEAT */
3833
#endif /* defined(PUBNUB_CALLBACK_API) */
3934
if (PUBNUB_ORIGIN_SETTABLE) {
4035
p->origin = PUBNUB_ORIGIN;
@@ -46,6 +41,11 @@ pubnub_t* pubnub_init(pubnub_t* p, const char* publish_key, const char* subscrib
4641
p->options.use_blocking_io = true;
4742
#endif
4843
#endif /* PUBNUB_BLOCKING_IO_SETTABLE */
44+
#if PUBNUB_USE_AUTO_HEARTBEAT
45+
p->thumperIndex = UNASSIGNED;
46+
p->channelInfo.channel = NULL;
47+
p->channelInfo.channel_group = NULL;
48+
#endif /* PUBNUB_AUTO_HEARTBEAT */
4949

5050
p->state = PBS_IDLE;
5151
p->trans = PBTT_NONE;

core/pubnub_version_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
#define INC_PUBNUB_VERSION_INTERNAL
44

55

6-
#define PUBNUB_SDK_VERSION "2.12.1"
6+
#define PUBNUB_SDK_VERSION "2.12.2"
77

88

99
#endif /* !defined INC_PUBNUB_VERSION_INTERNAL */

0 commit comments

Comments
 (0)