Skip to content

Commit

Permalink
[apps] Fixed srt-file-transit (issue #645) (#658)
Browse files Browse the repository at this point in the history
* [apps] Fixed srt-file-transit (issue #645)
  • Loading branch information
maxsharabayko authored and rndi committed Apr 23, 2019
1 parent 3ff0865 commit f5f52c0
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 77 deletions.
9 changes: 4 additions & 5 deletions apps/srt-file-transmit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ int parse_args(FileTransmitConfig &cfg, int argc, char** argv)
return 2;
}

cfg.chunk_size = stoul(Option<OutString>(params, "1316", o_chunk));
cfg.chunk_size = stoul(Option<OutString>(params, "1456", o_chunk));
cfg.skip_flushing = Option<OutBool>(params, false, o_no_flush);
cfg.bw_report = stoi(Option<OutString>(params, "0", o_bwreport));
cfg.stats_report = stoi(Option<OutString>(params, "0", o_statsrep));
Expand Down Expand Up @@ -584,24 +584,23 @@ bool DoDownload(UriParser& us, string directory, string filename,
if (connected)
{
vector<char> buf(cfg.chunk_size);
int n;

if(!ofile.is_open())
if (!ofile.is_open())
{
const char * fn = id.empty() ? filename.c_str() : id.c_str();
directory.append("/");
directory.append(fn);
ofile.open(directory.c_str(), ios::out | ios::trunc | ios::binary);

if(!ofile.is_open())
if (!ofile.is_open())
{
cerr << "Error opening file [" << directory << "]" << endl;
goto exit;
}
cerr << "Writing output to [" << directory << "]" << endl;
}

n = src->Read(cfg.chunk_size, buf, out_stats);
int n = src->Read(cfg.chunk_size, buf, out_stats);
if (n == SRT_ERROR)
{
cerr << "Download: SRT error: " << srt_getlasterror_str() << endl;
Expand Down
16 changes: 15 additions & 1 deletion apps/srt-live-transmit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -712,10 +712,24 @@ int main(int argc, char** argv)
{
std::shared_ptr<bytevector> pdata(
new bytevector(cfg.chunk_size));
if (!src->Read(cfg.chunk_size, *pdata, out_stats) || (*pdata).empty())

const int res = src->Read(cfg.chunk_size, *pdata, out_stats);

if (res == SRT_ERROR && src->uri.type() == UriParser::SRT)
{
if (srt_getlasterror(NULL) == SRT_EASYNCRCV)
break;

throw std::runtime_error(
string("error: recvmsg: ") + string(srt_getlasterror_str())
);
}

if (res == 0 || pdata->empty())
{
break;
}

dataqueue.push_back(pdata);
receivedBytes += (*pdata).size();
}
Expand Down
12 changes: 7 additions & 5 deletions apps/srt-multiplex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,8 @@ struct MediumPair

if (!initial_portion.empty())
{
tar->Write(initial_portion);
if ( tar->Broken() )
tar->Write(initial_portion.data(), initial_portion.size());
if (tar->Broken())
{
applog.Note() << "OUTPUT BROKEN for loop: " << name;
return;
Expand All @@ -121,7 +121,9 @@ struct MediumPair
ostringstream sout;
alarm(1);
bytevector data;
src->Read(chunk, data);
const int read_res = src->Read(chunk, data);


alarm(0);
if (alarm_state)
{
Expand All @@ -138,8 +140,8 @@ struct MediumPair
applog.Note() << sout.str();
break;
}
tar->Write(data);
if ( tar->Broken() )
tar->Write(data.data(), data.size());
if (tar->Broken())
{
sout << " OUTPUT broken";
applog.Note() << sout.str();
Expand Down
5 changes: 2 additions & 3 deletions apps/transmitbase.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Location
class Source: public Location
{
public:
virtual bool Read(size_t chunk, bytevector& data, std::ostream &out_stats = std::cout) = 0;
virtual int Read(size_t chunk, bytevector& data, std::ostream &out_stats = std::cout) = 0;
virtual bool IsOpen() = 0;
virtual bool End() = 0;
static std::unique_ptr<Source> Create(const std::string& url);
Expand All @@ -65,8 +65,7 @@ class Source: public Location
class Target: public Location
{
public:
virtual int Write(const char* data, size_t size, std::ostream &out_stats = std::cout) = 0;
virtual bool Write(const bytevector& portion) = 0;
virtual int Write(const char* data, size_t size, std::ostream &out_stats = std::cout) = 0;
virtual bool IsOpen() = 0;
virtual bool Broken() = 0;
virtual void Close() {}
Expand Down
84 changes: 23 additions & 61 deletions apps/transmitmedia.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FileSource: public Source
throw std::runtime_error(path + ": Can't open file for reading");
}

bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
{
if (data.size() < chunk)
data.resize(chunk);
Expand All @@ -67,12 +67,12 @@ class FileSource: public Source
if ( nread < data.size() )
data.resize(nread);

if ( data.empty() )
if (data.empty())
{
return false;
return 0;
}

return true;
return (int) nread;
}

bool IsOpen() override { return bool(ifile); }
Expand All @@ -86,16 +86,10 @@ class FileTarget: public Target

FileTarget(const string& path): ofile(path, ios::out | ios::trunc | ios::binary) {}

bool Write(const bytevector& data) override
{
ofile.write(data.data(), data.size());
return !(ofile.bad());
}

int Write(const char* data, size_t size, ostream &SRT_ATR_UNUSED = cout) override
{
ofile.write(data, size);
return !(ofile.bad()) ? size : 0;
return !(ofile.bad()) ? (int) size : 0;
}

bool IsOpen() override { return !!ofile; }
Expand Down Expand Up @@ -629,38 +623,22 @@ SrtSource::SrtSource(string host, int port, const map<string,string>& par)
hostport_copy = os.str();
}

bool SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats)
int SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats)
{
static unsigned long counter = 1;

if (data.size() < chunk)
data.resize(chunk);

bool ready = true;
int stat;
do
const int stat = srt_recvmsg(m_sock, data.data(), (int) chunk);
if (stat <= 0)
{
stat = srt_recvmsg(m_sock, data.data(), chunk);
if ( stat == SRT_ERROR )
{
// EAGAIN for SRT READING
if ( srt_getlasterror(NULL) == SRT_EASYNCRCV )
{
data.clear();
return false;
}
Error(UDT::getlasterror(), "recvmsg");
}

if ( stat == 0 )
{
throw ReadEOF(hostport_copy);
}
data.clear();
return stat;
}
while (!ready);

chunk = size_t(stat);
if ( chunk < data.size() )
if (chunk < data.size())
data.resize(chunk);

const bool need_bw_report = transmit_bw_report && (counter % transmit_bw_report) == transmit_bw_report - 1;
Expand All @@ -682,7 +660,7 @@ bool SrtSource::Read(size_t chunk, bytevector& data, ostream &out_stats)

++counter;

return true;
return stat;
}

int SrtTarget::ConfigurePre(SRTSOCKET sock)
Expand All @@ -707,7 +685,7 @@ int SrtTarget::Write(const char* data, size_t size, ostream &out_stats)
{
static unsigned long counter = 1;

int stat = srt_sendmsg2(m_sock, data, size, nullptr);
int stat = srt_sendmsg2(m_sock, data, (int) size, nullptr);
if (stat == SRT_ERROR)
{
return stat;
Expand Down Expand Up @@ -735,10 +713,6 @@ int SrtTarget::Write(const char* data, size_t size, ostream &out_stats)
return stat;
}

bool SrtTarget::Write(const bytevector& data)
{
return -1 != Write(data.data(), data.size());
}

SrtModel::SrtModel(string host, int port, map<string,string> par)
{
Expand Down Expand Up @@ -840,25 +814,23 @@ class ConsoleSource: public Source
#endif
}

bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
{
if (data.size() < chunk)
data.resize(chunk);

bool st = cin.read(data.data(), chunk).good();
chunk = cin.gcount();
if ( chunk == 0 && !st )
if (chunk == 0 || !st)
{
data.clear();
return false;
return 0;
}

if ( chunk < data.size() )
if (chunk < data.size())
data.resize(chunk);
if ( data.empty() )
return false;

return true;
return (int) chunk;
}

bool IsOpen() override { return cin.good(); }
Expand All @@ -882,12 +854,7 @@ class ConsoleTarget: public Target
int Write(const char* data, size_t len, ostream &SRT_ATR_UNUSED = cout) override
{
cout.write(data, len);
return len;
}

bool Write(const bytevector& data) override
{
return 0 != Write(data.data(), data.size());
return (int) len;
}

bool IsOpen() override { return cout.good(); }
Expand Down Expand Up @@ -1109,27 +1076,27 @@ class UdpSource: public Source, public UdpCommon
eof = false;
}

bool Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
int Read(size_t chunk, bytevector& data, ostream &SRT_ATR_UNUSED = cout) override
{
if (data.size() < chunk)
data.resize(chunk);

sockaddr_in sa;
socklen_t si = sizeof(sockaddr_in);
int stat = recvfrom(m_sock, data.data(), chunk, 0, (sockaddr*)&sa, &si);
if ( stat < 1 )
if (stat < 1)
{
if (SysError() != EWOULDBLOCK)
eof = true;
data.clear();
return false;
return stat;
}

chunk = size_t(stat);
if ( chunk < data.size() )
data.resize(chunk);

return true;
return stat;
}

bool IsOpen() override { return m_sock != -1; }
Expand Down Expand Up @@ -1158,11 +1125,6 @@ class UdpTarget: public Target, public UdpCommon
return stat;
}

bool Write(const bytevector& data) override
{
return -1 != Write(data.data(), data.size());
}

bool IsOpen() override { return m_sock != -1; }
bool Broken() override { return false; }

Expand Down
3 changes: 1 addition & 2 deletions apps/transmitmedia.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class SrtSource: public Source, public SrtCommon
// Do nothing - create just to prepare for use
}

bool Read(size_t chunk, bytevector& data, ostream& out_stats = cout) override;
int Read(size_t chunk, bytevector& data, ostream& out_stats = cout) override;

/*
In this form this isn't needed.
Expand Down Expand Up @@ -135,7 +135,6 @@ class SrtTarget: public Target, public SrtCommon

int ConfigurePre(SRTSOCKET sock) override;
int Write(const char* data, size_t size, ostream &out_stats = cout) override;
bool Write(const bytevector& data) override;
bool IsOpen() override { return IsUsable(); }
bool Broken() override { return IsBroken(); }
void Close() override { return SrtCommon::Close(); }
Expand Down

0 comments on commit f5f52c0

Please sign in to comment.