-
Notifications
You must be signed in to change notification settings - Fork 295
/
rtmpchunk.cpp
380 lines (342 loc) · 8.22 KB
/
rtmpchunk.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
#include "log.h"
#include "tools.h"
#include "rtmp/rtmpchunk.h"
#include <stdexcept>
#include <cstdlib>
/**************************************
* RTMPChunkStreamInfo
* Base information needed to process chunks in a chunk stream
**************************************/
//Creates the per chunk stream state with its initial values.
//Every member read by the accessors of this class gets a defined value here,
//so none of them returns an indeterminate value before its setter is called.
RTMPChunkStreamInfo::RTMPChunkStreamInfo()
{
	//No timestamp nor delta yet
	timestamp = 0;
	timestampDelta = 0;
	//Give the message type a defined value, it was left uninitialized before
	//and could be read by GetMessageTypeId() before any SetMessageTypeId() call
	type = static_cast<RTMPMessage::Type>(0);
	//Message stream id and length start at -1
	//NOTE(review): presumably a "not set yet" sentinel - confirm against the header
	streamId = -1;
	length = -1;
}
//Overwrites the current timestamp of this chunk stream with ts
void RTMPChunkStreamInfo::SetTimestamp(DWORD ts)
{
	this->timestamp = ts;
}
//Overwrites the stored timestamp delta of this chunk stream with delta
void RTMPChunkStreamInfo::SetTimestampDelta(DWORD delta)
{
	this->timestampDelta = delta;
}
//Gives back the timestamp currently stored for this chunk stream
DWORD RTMPChunkStreamInfo::GetTimestamp()
{
	return this->timestamp;
}
//Advances the stored timestamp by the stored timestamp delta
void RTMPChunkStreamInfo::IncreaseTimestampWithDelta()
{
	timestamp = timestamp + timestampDelta;
}
//Stores the message type id, converting the raw byte to RTMPMessage::Type
void RTMPChunkStreamInfo::SetMessageTypeId(BYTE type)
{
	//The parameter hides the member, so the member is reached through this
	this->type = static_cast<RTMPMessage::Type>(type);
}
//Stores the message stream id of this chunk stream
void RTMPChunkStreamInfo::SetMessageStreamId(DWORD streamId)
{
	//The parameter hides the member, so qualify the member explicitly
	RTMPChunkStreamInfo::streamId = streamId;
}
//Stores the message length of this chunk stream
void RTMPChunkStreamInfo::SetMessageLength(DWORD length)
{
	//The parameter hides the member, so qualify the member explicitly
	RTMPChunkStreamInfo::length = length;
}
//Gives back the message stream id currently stored for this chunk stream
DWORD RTMPChunkStreamInfo::GetMessageStreamId()
{
	return this->streamId;
}
//Gives back the stored message type converted to its raw byte value
BYTE RTMPChunkStreamInfo::GetMessageTypeId()
{
	return static_cast<BYTE>(this->type);
}
/***********************************
* RTMPChunkInputStream
* Chunk stream sent by the remote peer
***********************************/
//Creates an input chunk stream with no message being assembled
RTMPChunkInputStream::RTMPChunkInputStream() : RTMPChunkStreamInfo()
{
	//A NULL message is what IsFirstChunk() reports as "first chunk"
	this->message = NULL;
}
//Releases the message being assembled, if any
RTMPChunkInputStream::~RTMPChunkInputStream()
{
	//Deleting a NULL pointer is a no-op, so no explicit check is needed
	delete message;
}
//Makes sure there is a message object to receive the chunk payload.
//An already started message is kept untouched.
void RTMPChunkInputStream::StartChunkData()
{
	//Already assembling a message, nothing to do
	if (message)
		return;
	//Start a new message from the current chunk stream values
	message = new RTMPMessage(streamId,timestamp,type,length);
}
//Forwards the chunk payload to the message being assembled and returns
//whatever the message Parse() call returns.
//The message is used without a check, so StartChunkData() has to be called first.
DWORD RTMPChunkInputStream::Parse(BYTE *data,DWORD size)
{
	const DWORD parsed = message->Parse(data,size);
	return parsed;
}
//Tells whether the message being assembled reports itself as parsed.
//Returns false when there is no message at all.
bool RTMPChunkInputStream::IsParsed()
{
	//No message, so nothing can be parsed
	if (!message)
		return false;
	//Ask the message
	return message->IsParsed();
}
//Hands over the assembled message once it reports itself as parsed.
//Returns the message and forgets it (the internal pointer is set to NULL, so
//the destructor of this stream will not delete it), or NULL when there is no
//message or it is not parsed yet.
RTMPMessage* RTMPChunkInputStream::GetMessage()
{
	//Nothing to return without a message or while it is still incomplete.
	//The NULL check mirrors IsParsed(); without it calling this method before
	//StartChunkData() or twice in a row dereferenced a NULL pointer.
	if (!message || !message->IsParsed())
		return NULL;
	//Take the parsed message
	RTMPMessage *parsed = message;
	//Forget it, so the next chunk starts a new message
	message = NULL;
	//Done
	return parsed;
}
//True while no message is being assembled on this chunk stream
bool RTMPChunkInputStream::IsFirstChunk()
{
	return !message;
}
/***********************************
* RTMPChunkOutputStream
* Chunk stream sent by the local server
***********************************/
//Creates an output chunk stream for the given chunk stream id,
//with no message in progress and a mutex initialized with default attributes
RTMPChunkOutputStream::RTMPChunkOutputStream(DWORD chunkStreamId) : RTMPChunkStreamInfo()
{
	//Remember which chunk stream this object writes
	this->chunkStreamId = chunkStreamId;
	//Nothing is being sent yet
	msgBuffer = NULL;
	message = NULL;
	//Create the mutex with default attributes
	pthread_mutex_init(&mutex,NULL);
}
//Releases everything still owned by the stream: the queued messages, the
//message being sent together with its serialized buffer, and the mutex.
RTMPChunkOutputStream::~RTMPChunkOutputStream()
{
	//Delete the messages still waiting in the queue
	for (RTMPMessages::iterator it=messages.begin(); it!=messages.end(); ++it)
		delete *it;
	//The serialized buffer is allocated with malloc() in GetNextChunk(), so it
	//has to be released with free(); releasing it with delete was undefined behaviour.
	//free(NULL) is a no-op, so no check is needed.
	free(msgBuffer);
	msgBuffer = NULL;
	//Delete the message being sent, if any (deleting NULL is a no-op)
	delete message;
	message = NULL;
	//Destroy mutex
	pthread_mutex_destroy(&mutex);
}
//Appends a message to the end of the send queue while holding the mutex.
//A NULL message is ignored.
void RTMPChunkOutputStream::SendMessage(RTMPMessage *msg)
{
	//Protect the queue
	pthread_mutex_lock(&mutex);
	//Only real messages are queued
	if (msg != NULL)
	{
		messages.push_back(msg);
	}
	//Done with the queue
	pthread_mutex_unlock(&mutex);
}
//Writes the next chunk of this chunk stream into data and returns the number
//of bytes written (headers plus payload), or 0 when there is no message in
//progress and the queue is empty.
//
// data         - output buffer for the chunk
// size         - size of data, only passed to the header Serialize() calls
// maxChunkSize - maximum number of message payload bytes put in one chunk
//
//When no message is in progress the next one is taken from the queue,
//serialized into msgBuffer and sent with a type 0, 1, 2 or 3 chunk header
//depending on what it shares with the stored stream values. The following
//chunks of the same message always use a type 3 header. The whole method
//runs with the mutex held.
//
//NOTE(review): the payload memcpy is not checked against size, the caller
//apparently has to provide room for the headers plus maxChunkSize bytes - confirm.
DWORD RTMPChunkOutputStream::GetNextChunk(BYTE *data,DWORD size,DWORD maxChunkSize)
{
//lock now
pthread_mutex_lock(&mutex);
//Message basic header
RTMPChunkBasicHeader header;
//Set chunk stream id
header.SetStreamId(chunkStreamId);
//Chunk header, stays NULL for type 3 chunks
RTMPObject* chunkHeader = NULL;
//Extended timestamp
RTMPExtendedTimestamp extts;
//Use extended timestamp flag
bool useExtTimestamp = false;
//If we are not in the middle of sending a message
if (!message)
{
//Check we still have data
if (messages.empty())
{
//Unlock
pthread_mutex_unlock(&mutex);
//No more data to send here
return 0;
}
//Get the next message to send
message = messages.front();
//Remove from queue
messages.pop_front();
//Get message values
RTMPMessage::Type msgType = message->GetType();
DWORD msgStreamId = message->GetStreamId();
DWORD msgLength = message->GetLength();
DWORD msgTimestamp = message->GetTimestamp();
//Delta against the timestamp stored from the previous message
DWORD msgTimestampDelta = msgTimestamp-timestamp;
//Start sending from the beginning of the serialized message
pos = 0;
//Allocate data for serialized message
//NOTE(review): the malloc result is not checked before it is used
msgBuffer = (BYTE*)malloc(msgLength);
//Serialize it
message->Serialize(msgBuffer,msgLength);
//Select which header to use
if (!msgStreamId || msgStreamId!=streamId || msgTimestamp<timestamp)
{
//Create chunk header type 0 (last check is for backward time on Seek)
RTMPChunkType0* type0 = new RTMPChunkType0();
//Set header type
header.SetFmt(0);
//Check if the timestamp fits in the chunk header field
if (msgTimestamp>=0xFFFFFF)
{
//Set flag
useExtTimestamp = true;
//Put the marker value in the header
type0->SetTimestamp(0xFFFFFF);
//And the real timestamp in the extended timestamp
extts.SetTimestamp(msgTimestamp);
} else {
//Set timestamp
type0->SetTimestamp(msgTimestamp);
}
//Set data in chunk header
type0->SetMessageLength(msgLength);
type0->SetMessageTypeId(msgType);
type0->SetMessageStreamId(msgStreamId);
//No delta available for next packet
msgTimestampDelta = 0;
//Store object
chunkHeader = type0;
} else if (msgLength!=length || msgType!=type) {
//Create chunk header type 1
RTMPChunkType1* type1 = new RTMPChunkType1();
//Set header type
header.SetFmt(1);
//Set data in chunk header
//NOTE(review): the extended timestamp is only used in the type 0 branch,
//the delta is written here as is - confirm large deltas are handled
type1->SetTimestampDelta(msgTimestampDelta);
type1->SetMessageLength(msgLength);
type1->SetMessageTypeId(msgType);
//Store object
chunkHeader = type1;
} else if (msgTimestampDelta!=timestampDelta) {
//Create chunk header type 2
RTMPChunkType2* type2 = new RTMPChunkType2();
//Set header type
header.SetFmt(2);
//Set data in chunk header
type2->SetTimestampDelta(msgTimestampDelta);
//Store object
chunkHeader = type2;
} else {
//Set header type 3 as it shares all data with previous
header.SetFmt(3);
//No chunk header
chunkHeader = NULL;
}
//And update the stream values with latest message values
SetTimestamp(msgTimestamp);
SetTimestampDelta(msgTimestampDelta);
SetMessageLength(msgLength);
SetMessageTypeId(msgType);
SetMessageStreamId(msgStreamId);
} else {
//Continuation of the current message, header type 3 as it shares all data with previous
header.SetFmt(3);
//No chunk header
chunkHeader = NULL;
}
//Serialize basic header
DWORD headersLen = header.Serialize(data,size);
//Check if we need chunk header
if (chunkHeader)
//Serialize chunk header
headersLen += chunkHeader->Serialize(data+headersLen,size-headersLen);
//Check if need to use extended timestamp
if (useExtTimestamp)
//Serialize extended timestamp
headersLen += extts.Serialize(data+headersLen,size-headersLen);
//Size of the msg data of the chunk
DWORD payloadLen = maxChunkSize;
//If that is more than what is left of the message
if (payloadLen>length-pos)
//Just copy until the end of the message
payloadLen = length-pos;
//Copy the payload after the headers
memcpy(data+headersLen,msgBuffer+pos,payloadLen);
//Increase sent data from msg
pos += payloadLen;
//Check if we have finished with this message
if (pos==length)
{
//Free the serialized buffer
free(msgBuffer);
//Null
msgBuffer = NULL;
//Delete message
delete(message);
//Next call will take the next message from the queue
message = NULL;
}
//Check if a chunk header was created
if (chunkHeader)
//Delete it
delete (chunkHeader);
//Unlock
pthread_mutex_unlock(&mutex);
//Return the number of bytes written
return headersLen+payloadLen;
}
//Tells whether there is anything left to send: a message in progress
//or at least one message waiting in the queue. Checked with the mutex held.
bool RTMPChunkOutputStream::HasData()
{
	pthread_mutex_lock(&mutex);
	//A message in progress is enough
	bool pending = true;
	if (!message)
		//Otherwise it depends on the queue
		pending = !messages.empty();
	pthread_mutex_unlock(&mutex);
	return pending;
}
//Drops everything pending for the given message stream id.
//Queued messages of that stream are removed and deleted, and if the message
//currently being sent belongs to it, it is discarded together with its
//serialized buffer.
//
// id - message stream id to reset
//
//Returns true when a partially sent message was discarded, false otherwise.
//NOTE(review): the name of the flag suggests the caller then has to send an
//abort for this chunk stream - confirm against the caller.
bool RTMPChunkOutputStream::ResetStream(DWORD id)
{
	Log("-ResetStream %d\n",id);
	bool abort = false;
	//lock now
	pthread_mutex_lock(&mutex);
	//Remove any queued message of the stream
	RTMPMessages::iterator it = messages.begin();
	while (it!=messages.end())
	{
		//Get message
		RTMPMessage *msg = *it;
		//Check if it belongs to the stream
		if (msg && msg->GetStreamId()==id)
		{
			//The queue owns its messages (the destructor and GetNextChunk()
			//delete them), so delete it too; it was leaked when only erased
			delete msg;
			//Remove it and continue with the element after it
			it = messages.erase(it);
		} else {
			//Next one
			++it;
		}
	}
	//If the message being sent is of this stream
	if (message && message->GetStreamId()==id)
	{
		//Free the serialized buffer, allocated with malloc() in GetNextChunk()
		free(msgBuffer);
		msgBuffer = NULL;
		//Delete message
		delete message;
		//Next GetNextChunk() call starts with the next queued message
		message = NULL;
		//A message was cut in the middle
		abort = true;
	}
	//Unlock
	pthread_mutex_unlock(&mutex);
	//Tell whether a message was cut in the middle
	return abort;
}