From 4356ff64198dc6f3a45d34c407cb9d01937e4677 Mon Sep 17 00:00:00 2001 From: wanglei Date: Tue, 11 Jan 2022 16:14:10 +0800 Subject: [PATCH] bugfix: SrsMetaCache memleak; getaddrinfo use delete. --- trunk/src/app/srs_app_source.cpp | 4 +- trunk/src/kernel/srs_kernel_utility.cpp | 292 ++++++++++++------------ trunk/src/protocol/srs_service_st.cpp | 77 ++++--- 3 files changed, 187 insertions(+), 186 deletions(-) diff --git a/trunk/src/app/srs_app_source.cpp b/trunk/src/app/srs_app_source.cpp index 623e5162b2..fd42b95449 100755 --- a/trunk/src/app/srs_app_source.cpp +++ b/trunk/src/app/srs_app_source.cpp @@ -1526,8 +1526,8 @@ void SrsMetaCache::dispose() clear(); srs_freep(previous_video); srs_freep(previous_audio); - srs_freep(previous_video); - srs_freep(previous_audio); + srs_freep(vformat); + srs_freep(aformat); } void SrsMetaCache::clear() diff --git a/trunk/src/kernel/srs_kernel_utility.cpp b/trunk/src/kernel/srs_kernel_utility.cpp index b7fcb80ff2..3298970ab2 100644 --- a/trunk/src/kernel/srs_kernel_utility.cpp +++ b/trunk/src/kernel/srs_kernel_utility.cpp @@ -36,11 +36,11 @@ using namespace std; srs_error_t srs_avc_nalu_read_uev(SrsBitBuffer* stream, int32_t& v) { srs_error_t err = srs_success; - + if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "empty stream"); } - + // ue(v) in 9.1 Parsing process for Exp-Golomb codes // ISO_IEC_14496-10-AVC-2012.pdf, page 227. // Syntax elements coded as ue(v), me(v), or se(v) are Exp-Golomb-coded. @@ -53,34 +53,34 @@ srs_error_t srs_avc_nalu_read_uev(SrsBitBuffer* stream, int32_t& v) for (int8_t b = 0; !b && !stream->empty(); leadingZeroBits++) { b = stream->read_bit(); } - + if (leadingZeroBits >= 31) { return srs_error_new(ERROR_AVC_NALU_UEV, "%dbits overflow 31bits", leadingZeroBits); } - + v = (1 << leadingZeroBits) - 1; for (int i = 0; i < (int)leadingZeroBits; i++) { if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "no bytes for leadingZeroBits=%d", leadingZeroBits); } - + int32_t b = stream->read_bit(); v += b << (leadingZeroBits - 1 - i); } - + return err; } srs_error_t srs_avc_nalu_read_bit(SrsBitBuffer* stream, int8_t& v) { srs_error_t err = srs_success; - + if (stream->empty()) { return srs_error_new(ERROR_AVC_NALU_UEV, "empty stream"); } - + v = stream->read_bit(); - + return err; } @@ -92,7 +92,7 @@ srs_utime_t srs_get_system_time() if (_srs_system_time_us_cache <= 0) { srs_update_system_time(); } - + return _srs_system_time_us_cache; } @@ -113,15 +113,15 @@ srs_gettimeofday_t _srs_gettimeofday = (srs_gettimeofday_t)::gettimeofday; srs_utime_t srs_update_system_time() { timeval now; - + if (_srs_gettimeofday(&now, NULL) < 0) { srs_warn("gettimeofday failed, ignore"); return -1; } - + // we must convert the tv_sec/tv_usec to int64_t. int64_t now_us = ((int64_t)now.tv_sec) * 1000 * 1000 + (int64_t)now.tv_usec; - + // for some ARM os, the starttime maybe invalid, // for example, on the cubieboard2, the srs_startup_time is 1262304014640, // while now is 1403842979210 in ms, diff is 141538964570 ms, 1638 days @@ -132,7 +132,7 @@ srs_utime_t srs_update_system_time() _srs_system_time_startup_time = _srs_system_time_us_cache = now_us; return _srs_system_time_us_cache; } - + // use relative time. int64_t diff = now_us - _srs_system_time_us_cache; diff = srs_max(0, diff); @@ -140,10 +140,10 @@ srs_utime_t srs_update_system_time() srs_warn("clock jump, history=%" PRId64 "us, now=%" PRId64 "us, diff=%" PRId64 "us", _srs_system_time_us_cache, now_us, diff); _srs_system_time_startup_time += diff; } - + _srs_system_time_us_cache = now_us; srs_info("clock updated, startup=%" PRId64 "us, now=%" PRId64 "us", _srs_system_time_startup_time, _srs_system_time_us_cache); - + return _srs_system_time_us_cache; } @@ -249,7 +249,7 @@ void srs_parse_endpoint(string hostport, string& ip, int& port) // Handle IP address ip = hostport.substr(0, pos); } - + const string sport = hostport.substr(pos + 1); port = ::atoi(sport.c_str()); } else { @@ -267,12 +267,12 @@ bool srs_check_ip_addr_valid(string ip) if (ret > 0) { return true; } - + ret = inet_pton(AF_INET6, ip.data(), buf); if (ret > 0) { return true; } - + return false; } @@ -301,84 +301,84 @@ bool srs_is_little_endian() // convert to network(big-endian) order, if not equals, // the system is little-endian, so need to convert the int64 static int little_endian_check = -1; - + if(little_endian_check == -1) { union { int32_t i; int8_t c; } little_check_union; - + little_check_union.i = 0x01; little_endian_check = little_check_union.c; } - + return (little_endian_check == 1); } string srs_string_replace(string str, string old_str, string new_str) { std::string ret = str; - + if (old_str == new_str) { return ret; } - + size_t pos = 0; while ((pos = ret.find(old_str, pos)) != std::string::npos) { ret = ret.replace(pos, old_str.length(), new_str); pos += new_str.length(); } - + return ret; } string srs_string_trim_end(string str, string trim_chars) { std::string ret = str; - + for (int i = 0; i < (int)trim_chars.length(); i++) { char ch = trim_chars.at(i); - + while (!ret.empty() && ret.at(ret.length() - 1) == ch) { ret.erase(ret.end() - 1); - + // ok, matched, should reset the search i = -1; } } - + return ret; } string srs_string_trim_start(string str, string trim_chars) { std::string ret = str; - + for (int i = 0; i < (int)trim_chars.length(); i++) { char ch = trim_chars.at(i); - + while (!ret.empty() && ret.at(0) == ch) { ret.erase(ret.begin()); - + // ok, matched, should reset the search i = -1; } } - + return ret; } string srs_string_remove(string str, string remove_chars) { std::string ret = str; - + for (int i = 0; i < (int)remove_chars.length(); i++) { char ch = remove_chars.at(i); - + for (std::string::iterator it = ret.begin(); it != ret.end();) { if (ch == *it) { it = ret.erase(it); - + // ok, matched, should reset the search i = -1; } else { @@ -386,7 +386,7 @@ string srs_string_remove(string str, string remove_chars) } } } - + return ret; } @@ -399,7 +399,7 @@ string srs_erase_first_substr(string str, string erase_string) if (pos != std::string::npos) { ret.erase(pos, erase_string.length()); } - + return ret; } @@ -412,7 +412,7 @@ string srs_erase_last_substr(string str, string erase_string) if (pos != std::string::npos) { ret.erase(pos, erase_string.length()); } - + return ret; } @@ -490,7 +490,7 @@ vector srs_string_split(string s, string seperator) result.push_back(s); return result; } - + size_t posBegin = 0; size_t posSeperator = s.find(seperator); while (posSeperator != string::npos) { @@ -506,42 +506,42 @@ vector srs_string_split(string s, string seperator) string srs_string_min_match(string str, vector seperators) { string match; - + if (seperators.empty()) { return str; } - + size_t min_pos = string::npos; for (vector::iterator it = seperators.begin(); it != seperators.end(); ++it) { string seperator = *it; - + size_t pos = str.find(seperator); if (pos == string::npos) { continue; } - + if (min_pos == string::npos || pos < min_pos) { min_pos = pos; match = seperator; } } - + return match; } vector srs_string_split(string str, vector seperators) { vector arr; - + size_t pos = string::npos; string s = str; - + while (true) { string seperator = srs_string_min_match(s, seperators); if (seperator.empty()) { break; } - + if ((pos = s.find(seperator)) == string::npos) { break; } @@ -549,23 +549,23 @@ vector srs_string_split(string str, vector seperators) arr.push_back(s.substr(0, pos)); s = s.substr(pos + seperator.length()); } - + if (!s.empty()) { arr.push_back(s); } - + return arr; } int srs_do_create_dir_recursively(string dir) { int ret = ERROR_SUCCESS; - + // stat current dir, if exists, return error. if (srs_path_exists(dir)) { return ERROR_SYSTEM_DIR_EXISTS; } - + // create parent first. size_t pos; if ((pos = dir.rfind("/")) != std::string::npos) { @@ -578,7 +578,7 @@ int srs_do_create_dir_recursively(string dir) // parent exists, set to ok. ret = ERROR_SUCCESS; } - + // create curren dir. #ifdef _WIN32 if (::_mkdir(dir.c_str()) < 0) { @@ -589,59 +589,59 @@ int srs_do_create_dir_recursively(string dir) if (errno == EEXIST) { return ERROR_SYSTEM_DIR_EXISTS; } - + ret = ERROR_SYSTEM_CREATE_DIR; srs_error("create dir %s failed. ret=%d", dir.c_str(), ret); return ret; } - + srs_info("create dir %s success.", dir.c_str()); - + return ret; } - + bool srs_bytes_equals(void* pa, void* pb, int size) { uint8_t* a = (uint8_t*)pa; uint8_t* b = (uint8_t*)pb; - + if (!a && !b) { return true; } - + if (!a || !b) { return false; } - + for(int i = 0; i < size; i++){ if(a[i] != b[i]){ return false; } } - + return true; } srs_error_t srs_create_dir_recursively(string dir) { int ret = srs_do_create_dir_recursively(dir); - + if (ret == ERROR_SYSTEM_DIR_EXISTS || ret == ERROR_SUCCESS) { return srs_success; } - + return srs_error_new(ret, "create dir %s", dir.c_str()); } bool srs_path_exists(std::string path) { struct stat st; - + // stat current dir, if exists, return error. if (stat(path.c_str(), &st) == 0) { return true; } - + return false; } @@ -669,7 +669,7 @@ string srs_path_basename(string path) { std::string dirname = path; size_t pos = string::npos; - + if ((pos = dirname.rfind("/")) != string::npos) { // the basename("/") is "/" if (dirname.length() == 1) { @@ -677,7 +677,7 @@ string srs_path_basename(string path) } dirname = dirname.substr(pos + 1); } - + return dirname; } @@ -685,22 +685,22 @@ string srs_path_filename(string path) { std::string filename = path; size_t pos = string::npos; - + if ((pos = filename.rfind(".")) != string::npos) { return filename.substr(0, pos); } - + return filename; } string srs_path_filext(string path) { size_t pos = string::npos; - + if ((pos = path.rfind(".")) != string::npos) { return path.substr(pos); } - + return ""; } @@ -709,20 +709,20 @@ bool srs_avc_startswith_annexb(SrsBuffer* stream, int* pnb_start_code) if (!stream) { return false; } - + char* bytes = stream->data() + stream->pos(); char* p = bytes; - + for (;;) { if (!stream->require((int)(p - bytes + 3))) { return false; } - + // not match if (p[0] != (char)0x00 || p[1] != (char)0x00) { return false; } - + // match N[00] 00 00 01, where N>=0 if (p[2] == (char)0x01) { if (pnb_start_code) { @@ -730,10 +730,10 @@ bool srs_avc_startswith_annexb(SrsBuffer* stream, int* pnb_start_code) } return true; } - + p++; } - + return false; } @@ -742,53 +742,53 @@ bool srs_aac_startswith_adts(SrsBuffer* stream) if (!stream) { return false; } - + char* bytes = stream->data() + stream->pos(); char* p = bytes; - + if (!stream->require((int)(p - bytes) + 2)) { return false; } - + // matched 12bits 0xFFF, // @remark, we must cast the 0xff to char to compare. if (p[0] != (char)0xff || (char)(p[1] & 0xf0) != (char)0xf0) { return false; } - + return true; } - + // @see pycrc reflect at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L107 uint64_t __crc32_reflect(uint64_t data, int width) { uint64_t res = data & 0x01; - + for (int i = 0; i < (int)width - 1; i++) { data >>= 1; res = (res << 1) | (data & 0x01); } - + return res; } - + // @see pycrc gen_table at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L178 void __crc32_make_table(uint32_t t[256], uint32_t poly, bool reflect_in) { int width = 32; // 32bits checksum. uint64_t msb_mask = (uint32_t)(0x01 << (width - 1)); uint64_t mask = (uint32_t)(((msb_mask - 1) << 1) | 1); - + int tbl_idx_width = 8; // table index size. int tbl_width = 0x01 << tbl_idx_width; // table size: 256 - + for (int i = 0; i < (int)tbl_width; i++) { uint64_t reg = uint64_t(i); - + if (reflect_in) { reg = __crc32_reflect(reg, tbl_idx_width); } - + reg = reg << (width - tbl_idx_width); for (int j = 0; j < tbl_idx_width; j++) { if ((reg&msb_mask) != 0) { @@ -797,53 +797,53 @@ void __crc32_make_table(uint32_t t[256], uint32_t poly, bool reflect_in) reg = reg << 1; } } - + if (reflect_in) { reg = __crc32_reflect(reg, width); } - + t[i] = (uint32_t)(reg & mask); } } - + // @see pycrc table_driven at https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L207 uint32_t __crc32_table_driven(uint32_t* t, const void* buf, int size, uint32_t previous, bool reflect_in, uint32_t xor_in, bool reflect_out, uint32_t xor_out) { int width = 32; // 32bits checksum. uint64_t msb_mask = (uint32_t)(0x01 << (width - 1)); uint64_t mask = (uint32_t)(((msb_mask - 1) << 1) | 1); - + int tbl_idx_width = 8; // table index size. - + uint8_t* p = (uint8_t*)buf; uint64_t reg = 0; - + if (!reflect_in) { reg = xor_in; - + for (int i = 0; i < (int)size; i++) { uint8_t tblidx = (uint8_t)((reg >> (width - tbl_idx_width)) ^ p[i]); reg = t[tblidx] ^ (reg << tbl_idx_width); } } else { reg = previous ^ __crc32_reflect(xor_in, width); - + for (int i = 0; i < (int)size; i++) { uint8_t tblidx = (uint8_t)(reg ^ p[i]); reg = t[tblidx] ^ (reg >> tbl_idx_width); } - + reg = __crc32_reflect(reg, width); } - + if (reflect_out) { reg = __crc32_reflect(reg, width); } - + reg ^= xor_out; return (uint32_t)(reg & mask); } - + // @see pycrc https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L207 // IEEETable is the table for the IEEE polynomial. static uint32_t __crc32_IEEE_table[256]; @@ -868,20 +868,20 @@ uint32_t srs_crc32_ieee(const void* buf, int size, uint32_t previous) // @remark The poly of CRC32 IEEE is 0x04C11DB7, its reverse is 0xEDB88320, // please read https://en.wikipedia.org/wiki/Cyclic_redundancy_check uint32_t poly = 0x04C11DB7; - + bool reflect_in = true; uint32_t xor_in = 0xffffffff; bool reflect_out = true; uint32_t xor_out = 0xffffffff; - + if (!__crc32_IEEE_table_initialized) { __crc32_make_table(__crc32_IEEE_table, poly, reflect_in); __crc32_IEEE_table_initialized = true; } - + return __crc32_table_driven(__crc32_IEEE_table, buf, size, previous, reflect_in, xor_in, reflect_out, xor_out); } - + // @see pycrc https://github.com/winlinvip/pycrc/blob/master/pycrc/algorithms.py#L238 // IEEETable is the table for the MPEG polynomial. static uint32_t __crc32_MPEG_table[256]; @@ -906,17 +906,17 @@ uint32_t srs_crc32_mpegts(const void* buf, int size) // @remark The poly of CRC32 IEEE is 0x04C11DB7, its reverse is 0xEDB88320, // please read https://en.wikipedia.org/wiki/Cyclic_redundancy_check uint32_t poly = 0x04C11DB7; - + bool reflect_in = false; uint32_t xor_in = 0xffffffff; bool reflect_out = false; uint32_t xor_out = 0x0; - + if (!__crc32_MPEG_table_initialized) { __crc32_make_table(__crc32_MPEG_table, poly, reflect_in); __crc32_MPEG_table_initialized = true; } - + return __crc32_table_driven(__crc32_MPEG_table, buf, size, 0x00, reflect_in, xor_in, reflect_out, xor_out); } @@ -931,51 +931,51 @@ namespace { srs_error_t srs_av_base64_decode(string cipher, string& plaintext) { srs_error_t err = srs_success; - + uint8_t decodeMap[256]; memset(decodeMap, 0xff, sizeof(decodeMap)); - + for (int i = 0; i < (int)encoder.length(); i++) { decodeMap[(uint8_t)encoder.at(i)] = uint8_t(i); } - + // decode is like Decode but returns an additional 'end' value, which // indicates if end-of-message padding or a partial quantum was encountered // and thus any additional data is an error. int si = 0; - + // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } - + for (bool end = false; si < (int)cipher.length() && !end;) { // Decode quantum using the base64 alphabet uint8_t dbuf[4]; memset(dbuf, 0x00, sizeof(dbuf)); - + int dinc = 3; int dlen = 4; srs_assert(dinc > 0); - + for (int j = 0; j < (int)sizeof(dbuf); j++) { if (si == (int)cipher.length()) { if (padding != -1 || j < 2) { return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } - + dinc = j - 1; dlen = j; end = true; break; } - + char in = cipher.at(si); - + si++; // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } - + if (in == padding) { // We've reached the end and there's padding switch (j) { @@ -992,13 +992,13 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) // incorrect padding return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } - + si++; // skip over newlines for (; si < (int)cipher.length() && (cipher.at(si) == '\n' || cipher.at(si) == '\r'); si++) { } } - + if (si < (int)cipher.length()) { // trailing garbage err = srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); @@ -1008,13 +1008,13 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) end = true; break; } - + dbuf[j] = decodeMap[(uint8_t)in]; if (dbuf[j] == 0xff) { return srs_error_new(ERROR_BASE64_DECODE, "corrupt input at %d", si); } } - + // Convert 4x 6bit source bytes into 3 bytes uint32_t val = uint32_t(dbuf[0])<<18 | uint32_t(dbuf[1])<<12 | uint32_t(dbuf[2])<<6 | uint32_t(dbuf[3]); if (dlen >= 2) { @@ -1027,7 +1027,7 @@ srs_error_t srs_av_base64_decode(string cipher, string& plaintext) plaintext.append(1, char(val)); } } - + return err; } @@ -1037,7 +1037,7 @@ srs_error_t srs_av_base64_encode(std::string plaintext, std::string& cipher) srs_error_t err = srs_success; uint8_t decodeMap[256]; memset(decodeMap, 0xff, sizeof(decodeMap)); - + for (int i = 0; i < (int)encoder.length(); i++) { decodeMap[(uint8_t)encoder.at(i)] = uint8_t(i); } @@ -1098,7 +1098,7 @@ int av_toupper(int c) } return c; } - + // fromHexChar converts a hex character into its value and a success flag. uint8_t srs_from_hex_char(uint8_t c) { @@ -1122,11 +1122,11 @@ char* srs_data_to_hex(char* des, const u_int8_t* src, int len) } const char *hex_table = "0123456789ABCDEF"; - + for (int i=0; i> 4]; des[i * 2 + 1] = hex_table[src[i] & 0x0F]; - } + } return des; } @@ -1152,21 +1152,21 @@ int srs_hex_to_data(uint8_t* data, const char* p, int size) if (size <= 0 || (size%2) == 1) { return -1; } - + for (int i = 0; i < (int)size / 2; i++) { uint8_t a = srs_from_hex_char(p[i*2]); if (a == (uint8_t)-1) { return -1; } - + uint8_t b = srs_from_hex_char(p[i*2 + 1]); if (b == (uint8_t)-1) { return -1; } - + data[i] = (a << 4) | b; } - + return size / 2; } @@ -1174,18 +1174,18 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng { // to directly set the field. char* pp = NULL; - + // generate the header. char* p = cache; - + // no header. if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT0_HEADER_SIZE) { return 0; } - + // write new chunk stream header, fmt is 0 *p++ = 0x00 | (perfer_cid & 0x3F); - + // chunk message header, 11 bytes // timestamp, 3bytes, big-endian if (timestamp < RTMP_EXTENDED_TIMESTAMP) { @@ -1198,23 +1198,23 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng *p++ = (char)0xFF; *p++ = (char)0xFF; } - + // message_length, 3bytes, big-endian pp = (char*)&payload_length; *p++ = pp[2]; *p++ = pp[1]; *p++ = pp[0]; - + // message_type, 1bytes *p++ = message_type; - + // stream_id, 4bytes, little-endian pp = (char*)&stream_id; *p++ = pp[0]; *p++ = pp[1]; *p++ = pp[2]; *p++ = pp[3]; - + // for c0 // chunk extended timestamp header, 0 or 4 bytes, big-endian // @@ -1240,7 +1240,7 @@ int srs_chunk_header_c0(int perfer_cid, uint32_t timestamp, int32_t payload_leng *p++ = pp[1]; *p++ = pp[0]; } - + // always has header return (int)(p - cache); } @@ -1249,20 +1249,20 @@ int srs_chunk_header_c3(int perfer_cid, uint32_t timestamp, char* cache, int nb_ { // to directly set the field. char* pp = NULL; - + // generate the header. char* p = cache; - + // no header. if (nb_cache < SRS_CONSTS_RTMP_MAX_FMT3_HEADER_SIZE) { return 0; } - + // write no message header chunk stream, fmt is 3 // @remark, if perfer_cid > 0x3F, that is, use 2B/3B chunk header, // SRS will rollback to 1B chunk header. *p++ = 0xC0 | (perfer_cid & 0x3F); - + // for c0 // chunk extended timestamp header, 0 or 4 bytes, big-endian // @@ -1288,7 +1288,7 @@ int srs_chunk_header_c3(int perfer_cid, uint32_t timestamp, char* cache, int nb_ *p++ = pp[1]; *p++ = pp[0]; } - + // always has header return (int)(p - cache); } diff --git a/trunk/src/protocol/srs_service_st.cpp b/trunk/src/protocol/srs_service_st.cpp index b5c12cdfb3..a7edeb1ea8 100644 --- a/trunk/src/protocol/srs_service_st.cpp +++ b/trunk/src/protocol/srs_service_st.cpp @@ -27,12 +27,12 @@ using namespace std; bool srs_st_epoll_is_supported(void) { struct epoll_event ev; - + ev.events = EPOLLIN; ev.data.ptr = NULL; /* Guaranteed to fail */ epoll_ctl(-1, EPOLL_CTL_ADD, -1, &ev); - + return (errno != ENOSYS); } #endif @@ -45,7 +45,7 @@ srs_error_t srs_st_init() return srs_error_new(ERROR_ST_SET_EPOLL, "linux epoll disabled"); } #endif - + // Select the best event system available on the OS. In Linux this is // epoll(). On BSD it will be kqueue. if (st_set_eventsys(ST_EVENTSYS_ALT) == -1) { @@ -57,7 +57,7 @@ srs_error_t srs_st_init() if (cid.empty()) { cid = _srs_context->generate_id(); } - + int r0 = 0; if((r0 = st_init()) != 0){ return srs_error_new(ERROR_ST_INITIALIZE, "st initialize failed, r0=%d", r0); @@ -66,7 +66,7 @@ srs_error_t srs_st_init() // Switch to the background cid. _srs_context->set_id(cid); srs_info("st_init success, use %s", st_get_eventsys_name()); - + return srs_success; } @@ -277,6 +277,7 @@ srs_error_t srs_tcp_listen(std::string ip, int port, srs_netfd_t* pfd) return err; } + srs_error_t do_srs_udp_listen(int fd, addrinfo* r, srs_netfd_t* pfd) { srs_error_t err = srs_success; @@ -500,18 +501,18 @@ int64_t SrsStSocket::get_send_bytes() srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; - + ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { nb_read = st_read((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_read = st_read((st_netfd_t)stfd, buf, size, rtm); } - + if (nread) { *nread = nb_read; } - + // On success a non-negative integer indicating the number of bytes actually read is returned // (a value of 0 means the network connection is closed or end of file is reached). // Otherwise, a value of -1 is returned and errno is set to indicate the error. @@ -519,34 +520,34 @@ srs_error_t SrsStSocket::read(void* buf, size_t size, ssize_t* nread) if (nb_read < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm)); } - + if (nb_read == 0) { errno = ECONNRESET; } - + return srs_error_new(ERROR_SOCKET_READ, "read"); } - + rbytes += nb_read; - + return err; } srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) { srs_error_t err = srs_success; - + ssize_t nb_read; if (rtm == SRS_UTIME_NO_TIMEOUT) { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_read = st_read_fully((st_netfd_t)stfd, buf, size, rtm); } - + if (nread) { *nread = nb_read; } - + // On success a non-negative integer indicating the number of bytes actually read is returned // (a value less than nbyte means the network connection is closed or end of file is reached) // Otherwise, a value of -1 is returned and errno is set to indicate the error. @@ -554,76 +555,76 @@ srs_error_t SrsStSocket::read_fully(void* buf, size_t size, ssize_t* nread) if (nb_read < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "timeout %d ms", srsu2msi(rtm)); } - + if (nb_read >= 0) { errno = ECONNRESET; } - + return srs_error_new(ERROR_SOCKET_READ_FULLY, "read fully"); } - + rbytes += nb_read; - + return err; } srs_error_t SrsStSocket::write(void* buf, size_t size, ssize_t* nwrite) { srs_error_t err = srs_success; - + ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { nb_write = st_write((st_netfd_t)stfd, buf, size, ST_UTIME_NO_TIMEOUT); } else { nb_write = st_write((st_netfd_t)stfd, buf, size, stm); } - + if (nwrite) { *nwrite = nb_write; } - + // On success a non-negative integer equal to nbyte is returned. // Otherwise, a value of -1 is returned and errno is set to indicate the error. if (nb_write <= 0) { if (nb_write < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "write timeout %d ms", srsu2msi(stm)); } - + return srs_error_new(ERROR_SOCKET_WRITE, "write"); } - + sbytes += nb_write; - + return err; } srs_error_t SrsStSocket::writev(const iovec *iov, int iov_size, ssize_t* nwrite) { srs_error_t err = srs_success; - + ssize_t nb_write; if (stm == SRS_UTIME_NO_TIMEOUT) { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, ST_UTIME_NO_TIMEOUT); } else { nb_write = st_writev((st_netfd_t)stfd, iov, iov_size, stm); } - + if (nwrite) { *nwrite = nb_write; } - + // On success a non-negative integer equal to nbyte is returned. // Otherwise, a value of -1 is returned and errno is set to indicate the error. if (nb_write <= 0) { if (nb_write < 0 && errno == ETIME) { return srs_error_new(ERROR_SOCKET_TIMEOUT, "writev timeout %d ms", srsu2msi(stm)); } - + return srs_error_new(ERROR_SOCKET_WRITE, "writev"); } - + sbytes += nb_write; - + return err; } @@ -631,7 +632,7 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) { stfd = NULL; io = new SrsStSocket(); - + host = h; port = p; timeout = tm; @@ -640,25 +641,25 @@ SrsTcpClient::SrsTcpClient(string h, int p, srs_utime_t tm) SrsTcpClient::~SrsTcpClient() { close(); - + srs_freep(io); } srs_error_t SrsTcpClient::connect() { srs_error_t err = srs_success; - + close(); - + srs_assert(stfd == NULL); if ((err = srs_tcp_connect(host, port, timeout, &stfd)) != srs_success) { return srs_error_wrap(err, "tcp: connect %s:%d to=%dms", host.c_str(), port, srsu2msi(timeout)); } - + if ((err = io->initialize(stfd)) != srs_success) { return srs_error_wrap(err, "tcp: init socket object"); } - + return err; } @@ -668,7 +669,7 @@ void SrsTcpClient::close() if (!io) { return; } - + srs_close_stfd(stfd); }