-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathrmc_pub.h
171 lines (141 loc) · 6.96 KB
/
rmc_pub.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
// Copyright (C) 2018, Jaguar Land Rover
// This program is licensed under the terms and conditions of the
// Mozilla Public License, version 2.0. The full text of the
// Mozilla Public License is at https://www.mozilla.org/MPL/2.0/
//
// Author: Magnus Feuer (mfeuer1@jaguarlandrover.com)
#ifndef __RMC_PUB_H__
#define __RMC_PUB_H__
#include "reliable_multicast.h"
#include "rmc_list.h"
struct pub_subscriber;
//
// A packet that is either waiting to be sent,
// or has been sent and is collecting acks from all subscribers.
//
typedef struct pub_packet {
packet_id_t pid;
// ref_count == 0 -> The packet has not yet been sent.
// ref_count > 0 -> The packet has been sent and can be found
// in 'ref_count' subscriber::inflight_packets
// lists.
uint32_t ref_count;
// Back pointer to pub_context::queued or pub_context::pending, depending
// on if the packet has been queud or sent.
// Allows for quick movement of packet as it changes status.
struct _pub_packet_node* parent_node;
usec_timestamp_t send_ts; // When was the packet sent
void *payload; // Payload provided by pub_queue_packet()
payload_len_t payload_len; // Payload length provided by pub_queue_packet()
user_data_t pkg_user_data; // Provided by queue_packet
} pub_packet_t;
RMC_LIST(pub_packet_list, pub_packet_node, pub_packet_t*)
typedef pub_packet_list pub_packet_list_t;
typedef pub_packet_node pub_packet_node_t;
// Each subscriber is hosted by a context.
// Each subscriber has a list of pointers to pending_packet_t owned by
// context->pending.
//
// A pending packet is added to inflight_packets when it is multicasted
// out.
//
// A pending packet is deleted from inflight_packets when a tcp-sent
// ack is received from the subscriber.
//
// When the pending_packet_t::ref_count reaches 0, all subscribers
// have received an ack, and the packet can be ermoved.
//
typedef struct pub_subscriber {
struct pub_context* context;
// Contains pointers to pending_packet_t sent but not
// acknowledged.
pub_packet_list_t inflight;
user_data_t user_data;
} pub_subscriber_t;
RMC_LIST(pub_sub_list, pub_sub_node, pub_subscriber_t*)
typedef pub_sub_list pub_sub_list_t;
typedef pub_sub_node pub_sub_node_t;
// A publisher context. Each publisher can have one or more
// subscribers, each hosted as a subscriber_t struct pointed to by the
// 'subscribers' list.
//
// When a packet is queued for multicast by pub_queue_packet() it's
// payload and packed id is stored in pending as pending_packet_t
// structs. When the packet is sent out via multicast, it is reported
// by a call to pub_packet_sent(), which will add a pointer to the
// given pending_packet_t to 'subscriber_t::inflight_packets' of all
// subscribers in 'subscriber'
//
// When a packet is acked by a subscriber via a pub_packet_ack() call,
// the corresponding pending_packet_t struct will be removed from the
// subscriber's 'inflight' list, and the pending packets
// reference counter is decreased by 1.
//
// When the reference counter reaches zero, all subscribers have acked
// the packet and the packet is removed from the 'inflight'
// list and is freed.
//
typedef struct pub_context {
pub_sub_list_t subscribers; // of pub_subscriber_t
pub_packet_list_t queued; // of pub_packet_t of packets waiting to be sent.
// List of pub_packet_t sent and awaiting acks. Packets in this list
// are referredd to by subscriber_t::inflight.
pub_packet_list_t inflight;
packet_id_t next_pid;
} pub_context_t;
// Init a new context.
//
// payload_free will be called to free the data pointed to by 'payload'
// in pub_queue_packet() once the packet has been ack:ed by all subscribers.
//
extern void pub_init_context(pub_context_t* ctx);
extern void pub_init_subscriber(pub_subscriber_t* sub,
pub_context_t* ctx,
user_data_t sub_user_data);
// Clean up sub and free all data related to its inflight packets.
void pub_reset_subscriber(pub_subscriber_t* sub,
void (*pub_payload_free)(void* payload,
payload_len_t payload_len,
user_data_t user_data));
// Payload will be freed by callback to (*pub_payload_free)() argument
// of pub_packet_ack()
extern packet_id_t pub_queue_packet(pub_context_t* ctx,
void* payload,
payload_len_t payload_len,
user_data_t pkg_user_data);
// Used to send out pid=0 announcement packets
extern packet_id_t pub_queue_no_acknowledge_packet(pub_context_t* ctx,
void* payload,
payload_len_t payload_len,
user_data_t pkg_user_data);
extern uint32_t pub_queue_size(pub_context_t* ctx);
extern pub_packet_t* pub_next_queued_packet(pub_context_t* ctx);
extern user_data_t pub_packet_user_data(pub_packet_t* ppack);
extern void pub_packet_sent(pub_context_t* ctx,
pub_packet_t* ppack,
usec_timestamp_t send_ts);
extern void pub_packet_ack(pub_subscriber_t* sub,
packet_id_t pid,
// Called if this was the last subscriber acking the packet
// meaning that we can discard it.
void (*pub_payload_free)(void* payload,
payload_len_t payload_len,
user_data_t user_data));
// Collect all subscribers that have unacknowledged
// packets older than or equal to max_age usecs.
extern void pub_get_timed_out_subscribers(pub_context_t* ctx,
usec_timestamp_t current_ts, // as reported by rmc_usec_monotonic_timestamp().
usec_timestamp_t timeout_period, // Number of usecs until timeout
pub_sub_list_t* result);
extern void pub_get_timed_out_packets(pub_subscriber_t* sub,
usec_timestamp_t current_ts, // as reported by rmc_usec_monotonic_timestamp().
usec_timestamp_t timeout_period, // Number of usecs until timeout
pub_packet_list_t* result);
extern uint32_t pub_get_unacknowledged_packet_count(pub_context_t* ctx);
// Get the time when the oldest packet was sent that we still are waiting
// for an acknowledgement on from the subscriber.
extern int pub_get_oldest_unackowledged_packet(pub_context_t* ctx,
usec_timestamp_t* sent_ts);
extern user_data_t pub_user_data(pub_context_t* ctx);
extern user_data_t pub_subscriber_user_data(pub_subscriber_t* sub);
#endif // __RMC_PUB_H__