-
-
Notifications
You must be signed in to change notification settings - Fork 5.4k
/
srs_app_conn.hpp
299 lines (284 loc) · 9.5 KB
/
srs_app_conn.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
//
// Copyright (c) 2013-2024 The SRS Authors
//
// SPDX-License-Identifier: MIT
//
#ifndef SRS_APP_CONN_HPP
#define SRS_APP_CONN_HPP
#include <srs_core.hpp>
#include <string>
#include <vector>
#include <map>
#include <openssl/ssl.h>
#include <openssl/err.h>
#include <srs_app_st.hpp>
#include <srs_protocol_kbps.hpp>
#include <srs_app_reload.hpp>
#include <srs_protocol_conn.hpp>
#include <srs_core_autofree.hpp>
class SrsWallClock;
class SrsBuffer;
// Hooks for connection manager, to handle the event when disposing connections.
class ISrsDisposingHandler
{
public:
ISrsDisposingHandler();
virtual ~ISrsDisposingHandler();
public:
// When before disposing resource, trigger when manager.remove(c), sync API.
// @remark Recommend to unref c, after this, no other objects refs to c.
virtual void on_before_dispose(ISrsResource* c) = 0;
// When disposing resource, async API, c is freed after it.
// @remark Recommend to stop any thread/timer of c, after this, fields of c is able
// to be deleted in any order.
virtual void on_disposing(ISrsResource* c) = 0;
};
// The item to identify the fast id object.
class SrsResourceFastIdItem
{
public:
// If available, use the resource in item.
bool available;
// How many resource have the same fast-id, which contribute a collision.
int nn_collisions;
// The first fast-id of resources.
uint64_t fast_id;
// The first resource object.
ISrsResource* impl;
public:
SrsResourceFastIdItem() {
available = false;
nn_collisions = 0;
fast_id = 0;
impl = NULL;
}
};
// The resource manager remove resource and delete it asynchronously.
class SrsResourceManager : public ISrsCoroutineHandler, public ISrsResourceManager
{
private:
std::string label_;
SrsContextId cid_;
bool verbose_;
private:
SrsCoroutine* trd;
srs_cond_t cond;
// Callback handlers.
std::vector<ISrsDisposingHandler*> handlers_;
// Unsubscribing handlers, skip it for notifying.
std::vector<ISrsDisposingHandler*> unsubs_;
// Whether we are removing resources.
bool removing_;
// The zombie connections, we will delete it asynchronously.
std::vector<ISrsResource*> zombies_;
std::vector<ISrsResource*>* p_disposing_;
private:
// The connections without any id.
std::vector<ISrsResource*> conns_;
// The connections with resource id.
std::map<std::string, ISrsResource*> conns_id_;
// The connections with resource fast(int) id.
std::map<uint64_t, ISrsResource*> conns_fast_id_;
// The level-0 fast cache for fast id.
int nn_level0_cache_;
SrsResourceFastIdItem* conns_level0_cache_;
// The connections with resource name.
std::map<std::string, ISrsResource*> conns_name_;
public:
SrsResourceManager(const std::string& label, bool verbose = false);
virtual ~SrsResourceManager();
public:
srs_error_t start();
bool empty();
size_t size();
// Interface ISrsCoroutineHandler
public:
virtual srs_error_t cycle();
public:
void add(ISrsResource* conn, bool* exists = NULL);
void add_with_id(const std::string& id, ISrsResource* conn);
void add_with_fast_id(uint64_t id, ISrsResource* conn);
void add_with_name(const std::string& name, ISrsResource* conn);
ISrsResource* at(int index);
ISrsResource* find_by_id(std::string id);
ISrsResource* find_by_fast_id(uint64_t id);
ISrsResource* find_by_name(std::string name);
public:
void subscribe(ISrsDisposingHandler* h);
void unsubscribe(ISrsDisposingHandler* h);
// Interface ISrsResourceManager
public:
virtual void remove(ISrsResource* c);
private:
void do_remove(ISrsResource* c);
void check_remove(ISrsResource* c, bool& in_zombie, bool& in_disposing);
void clear();
void do_clear();
void dispose(ISrsResource* c);
};
// This class implements the ISrsResource interface using a smart pointer, allowing the Manager to delete this
// smart pointer resource, such as by implementing delayed release.
//
// It embeds an SrsSharedPtr to provide the same interface, but it is not an inheritance relationship. Its usage
// is identical to SrsSharedPtr, but they cannot replace each other. They are not related and cannot be converted
// to one another.
//
// Note that we don't need to implement the move constructor and move assignment operator, because we directly
// use SrsSharedPtr as instance member, so we can only copy it.
//
// Usage:
// SrsSharedResource<MyClass>* ptr = new SrsSharedResource<MyClass>(new MyClass());
// (*ptr)->do_something();
//
// ISrsResourceManager* manager = ...;
// manager->remove(ptr);
template<typename T>
class SrsSharedResource : public ISrsResource
{
private:
SrsSharedPtr<T> ptr_;
public:
SrsSharedResource(T* ptr = NULL) : ptr_(ptr) {
}
SrsSharedResource(const SrsSharedResource<T>& cp) : ptr_(cp.ptr_) {
}
virtual ~SrsSharedResource() {
}
public:
// Get the object.
T* get() {
return ptr_.get();
}
// Overload the -> operator.
T* operator->() {
return ptr_.operator->();
}
// The assign operator.
SrsSharedResource<T>& operator=(const SrsSharedResource<T>& cp) {
if (this != &cp) {
ptr_ = cp.ptr_;
}
return *this;
}
private:
// Overload the * operator.
T& operator*() {
return ptr_.operator*();
}
// Overload the bool operator.
operator bool() const {
return ptr_.operator bool();
}
// Interface ISrsResource
public:
virtual const SrsContextId& get_id() {
return ptr_->get_id();
}
virtual std::string desc() {
return ptr_->desc();
}
};
// If a connection is able be expired, user can use HTTP-API to kick-off it.
class ISrsExpire
{
public:
ISrsExpire();
virtual ~ISrsExpire();
public:
// Set connection to expired to kick-off it.
virtual void expire() = 0;
};
// The basic connection of SRS, for TCP based protocols,
// all connections accept from listener must extends from this base class,
// server will add the connection to manager, and delete it when remove.
class SrsTcpConnection : public ISrsProtocolReadWriter
{
private:
// The underlayer st fd handler.
srs_netfd_t stfd;
// The underlayer socket.
SrsStSocket* skt;
public:
SrsTcpConnection(srs_netfd_t c);
virtual ~SrsTcpConnection();
public:
// Set socket option TCP_NODELAY.
virtual srs_error_t set_tcp_nodelay(bool v);
// Set socket option SO_SNDBUF in srs_utime_t.
virtual srs_error_t set_socket_buffer(srs_utime_t buffer_v);
// Interface ISrsProtocolReadWriter
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
// With a small fast read buffer, to support peek for protocol detecting. Note that directly write to io without any
// cache or buffer.
class SrsBufferedReadWriter : public ISrsProtocolReadWriter
{
private:
// The under-layer transport.
ISrsProtocolReadWriter* io_;
// Fixed, small and fast buffer. Note that it must be very small piece of cache, make sure matches all protocols,
// because we will full fill it when peeking.
char cache_[16];
// Current reading position.
SrsBuffer* buf_;
public:
SrsBufferedReadWriter(ISrsProtocolReadWriter* io);
virtual ~SrsBufferedReadWriter();
public:
// Peek the head of cache to buf in size of bytes.
srs_error_t peek(char* buf, int* size);
private:
srs_error_t reload_buffer();
// Interface ISrsProtocolReadWriter
public:
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
// The SSL connection over TCP transport, in server mode.
class SrsSslConnection : public ISrsProtocolReadWriter
{
private:
// The under-layer plaintext transport.
ISrsProtocolReadWriter* transport;
private:
SSL_CTX* ssl_ctx;
SSL* ssl;
BIO* bio_in;
BIO* bio_out;
public:
SrsSslConnection(ISrsProtocolReadWriter* c);
virtual ~SrsSslConnection();
public:
virtual srs_error_t handshake(std::string key_file, std::string crt_file);
// Interface ISrsProtocolReadWriter
public:
virtual void set_recv_timeout(srs_utime_t tm);
virtual srs_utime_t get_recv_timeout();
virtual srs_error_t read_fully(void* buf, size_t size, ssize_t* nread);
virtual int64_t get_recv_bytes();
virtual int64_t get_send_bytes();
virtual srs_error_t read(void* buf, size_t size, ssize_t* nread);
virtual void set_send_timeout(srs_utime_t tm);
virtual srs_utime_t get_send_timeout();
virtual srs_error_t write(void* buf, size_t size, ssize_t* nwrite);
virtual srs_error_t writev(const iovec *iov, int iov_size, ssize_t* nwrite);
};
#endif