Skip to content

Commit

Permalink
Refactoring changes as a preparation for file transfer support (conge…
Browse files Browse the repository at this point in the history
…stion control)
  • Loading branch information
Mikołaj Małecki committed Sep 13, 2017
1 parent c91d5f7 commit 9c7c2fe
Show file tree
Hide file tree
Showing 31 changed files with 3,074 additions and 3,084 deletions.
320 changes: 46 additions & 274 deletions apps/siplex.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,14 @@

#define REQUIRE_CXX11 1

#include "../common/appcommon.hpp" // CreateAddrInet
#include "../common/uriparser.hpp" // UriParser
#include "../common/socketoptions.hpp"
#include "../common/logsupport.hpp"
#include "../common/transmitbase.hpp"
#include "../common/transmitmedia.hpp"
#include "../common/netinet_any.h"
#include "../common/threadname.h"
#include "appcommon.hpp" // CreateAddrInet
#include "uriparser.hpp" // UriParser
#include "socketoptions.hpp"
#include "logsupport.hpp"
#include "transmitbase.hpp"
#include "transmitmedia.hpp"
#include "netinet_any.h"
#include "threadname.h"

#include <srt.h>
#include <logging.h>
Expand All @@ -35,78 +35,15 @@ using namespace std;
// The length of the SRT payload used in srt_recvmsg call.
// So far, this function must be used and up to this length of payload.
const size_t DEFAULT_CHUNK = 1316;
const size_t DEF_MAX_STREAMS = 10;

const logging::LogFA SRT_LOGFA_APP = 10;
logging::Logger applog(SRT_LOGFA_APP, &srt_logger_config, "siplex");

// Some utilities borrowed from tumux, as this is using options
// similar way.
template <class Container, class Value = typename Container::value_type, typename... Args> inline
std::string Printable(const Container& in, Value /*pseudoargument*/, Args&&... args)
{
std::ostringstream os;
Print(os, args...);
os << "[ ";
for (auto i: in)
os << Value(i) << " ";
os << "]";
return os.str();
}

template <class Container> inline
std::string Printable(const Container& in)
{
using Value = typename Container::value_type;
return Printable(in, Value());
}

template <class OutputIterator>
inline void Split(const std::string & str, char delimiter, OutputIterator tokens)
{
if ( str.empty() )
return; // May cause crash and won't extract anything anyway

std::size_t start;
std::size_t end = -1;

do
{
start = end + 1;
end = str.find(delimiter, start);
*tokens = str.substr(
start,
(end == std::string::npos) ? std::string::npos : end - start);
++tokens;
} while (end != std::string::npos);
}

template <class It>
inline size_t safe_advance(It& it, size_t num, It end)
{
while ( it != end && num )
{
--num;
++it;
}

return num; // will be effectively 0, if reached the required point, or >0, if end was by that number earlier
}

template<typename Map, typename Key>
auto map_get(Map& m, const Key& key, typename Map::mapped_type def = typename Map::mapped_type()) -> typename Map::mapped_type
{
auto it = m.find(key);
return it == m.end() ? def : it->second;
}



volatile bool int_state = false;
volatile bool siplex_int_state = false;
void OnINT_SetIntState(int)
{
cerr << "\n-------- REQUESTED INTERRUPT!\n";
int_state = true;
siplex_int_state = true;
if ( transmit_throw_on_interrupt )
throw std::runtime_error("Requested exception interrupt");
}
Expand Down Expand Up @@ -199,7 +136,7 @@ struct MediumPair
break;
}
sout << " sent";
if ( int_state )
if ( siplex_int_state )
{
sout << " --- (interrupted on request)";
applog.Note() << sout.str();
Expand Down Expand Up @@ -269,151 +206,6 @@ class MediaBase

} g_media_base;


// This class is used when we don't know yet whether the given URI
// designates an effective listener or caller. So we create it, initialize,
// then we know what mode we'll be using.
//
// When caller, then we will do connect() using this object, then clone out
// a new object - of a direction specific class - which will steal the socket
// from this one and then roll the data. After this, this object is ready
// to connect again, and will create its own socket for that occasion, and
// the whole procedure repeats.
//
// When listener, then this object will be doing accept() and with every
// successful acceptation it will clone out a new object - of a direction
// specific class - which will steal just the connection socket from this
// object. This object will still live on and accept new connections and
// so on.
class SrtModel: public SrtCommon
{
public:
bool is_caller = false;
string m_host;
int m_port = 0;


SrtModel(string host, int port, map<string,string> par)
{
InitParameters(host, par);
if (m_mode == "caller")
is_caller = true;
else if (m_mode != "listener")
throw std::invalid_argument("Only caller and listener modes supported");

m_host = host;
m_port = port;
}

bool Establish(ref_t<std::string> name)
{
// This does connect or accept.
// When this returned true, the caller should create
// a new SrtSource or SrtTaget then call StealFrom(*this) on it.

// If this is a connector and the peer doesn't have a corresponding
// medium, it should send back a single byte with value 0. This means
// that agent should stop connecting.

if (is_caller)
{
// Establish a connection

PrepareClient();

if (name.get() != "")
{
applog.Note() << "Connect with requesting stream [" << name.get() << "]";
UDT::setstreamid(m_sock, name);
}
else
{
applog.Warn() << "NO STREAM ID for SRT connection";
}

if (m_outgoing_port)
{
applog.Note() << "Setting outgoing port: " << m_outgoing_port;
SetupAdapter("", m_outgoing_port);
}

ConnectClient(m_host, m_port);

if (m_outgoing_port == 0)
{
// Must rely on a randomly selected one. Extract the port
// so that it will be reused next time.
sockaddr_any s(AF_INET);
int namelen = s.size();
if ( srt_getsockname(Socket(), &s, &namelen) == SRT_ERROR )
{
Error(UDT::getlasterror(), "srt_getsockname");
}

m_outgoing_port = s.hport();
applog.Note() << "Extracted outgoing port: " << m_outgoing_port << " - will reuse it for next connections";
}
}
else
{
// Listener - get a socket by accepting.
// Check if the listener is already created first
if (Listener() == SRT_INVALID_SOCK)
{
applog.Note() << "Setting up listener: port=" << m_port << " backlog=5";
PrepareListener(m_adapter, m_port, 5);
}

applog.Note() << "Accepting a client...";
AcceptNewClient();
// This rewrites m_sock with a new SRT socket ("accepted" socket)
name = UDT::getstreamid(m_sock);
applog.Note() << "... GOT CLIENT for stream [" << name.get() << "]";
}

return true;
}

void Stall()
{
// Call this function if everything is running in their own
// threads and there's nothing more to run. Check periodically
// if all threads are still alive, quit if all are dead.

while (!int_state)
{
this_thread::sleep_for(chrono::seconds(1));

// Check all cars if any crashed
for (auto i = g_media_base.media.begin(), i_next = i; i != g_media_base.media.end(); i = i_next)
{
++i_next;
if (i->has_quit)
{
applog.Note() << "Found QUIT mediumpair: " << i->name << " - removing from base";
i->Stop();
g_media_base.media.erase(i);
}
}

if (g_media_base.media.empty())
{
applog.Note() << "All media have quit. Marking exit.";
break;
}
}
}

void Close()
{
if (m_sock != SRT_INVALID_SOCK)
{
srt_close(m_sock);
m_sock = SRT_INVALID_SOCK;
}
}
};

string ResolveFilePattern(int number)
{
vector<string> parts;
Expand Down Expand Up @@ -572,46 +364,37 @@ bool SelectAndLink(SrtModel& m, string id, bool mode_output)
return true;
}

string Join(const vector<string>& in, string sep)
{
if ( in.empty() )
return "";

ostringstream os;

os << in[0];
for (auto i = in.begin()+1; i != in.end(); ++i)
os << sep << *i;
return os.str();
}

typedef map<string,vector<string>> options_t;

struct OutList
void Stall()
{
typedef vector<string> type;
static type process(const options_t::mapped_type& i) { return i; }
};

struct OutString
{
typedef string type;
static type process(const options_t::mapped_type& i) { return Join(i, " "); }
};
// Call this function if everything is running in their own
// threads and there's nothing more to run. Check periodically
// if all threads are still alive, quit if all are dead.

while (!siplex_int_state)
{
this_thread::sleep_for(chrono::seconds(1));

template <class OutType, class OutValue >
typename OutType::type Option(const options_t&, OutValue deflt=OutValue()) { return deflt; }
// Check all cars if any crashed
for (auto i = g_media_base.media.begin(), i_next = i; i != g_media_base.media.end(); i = i_next)
{
++i_next;
if (i->has_quit)
{
Verb() << "Found QUIT mediumpair: " << i->name << " - removing from base";
i->Stop();
g_media_base.media.erase(i);
}
}

template <class OutType, class OutValue, class... Args>
typename OutType::type Option(const options_t& options, OutValue deflt, string key, Args... further_keys)
{
auto i = options.find(key);
if ( i == options.end() )
return Option<OutType>(options, deflt, further_keys...);
return OutType::process(i->second);
if (g_media_base.media.empty())
{
Verb() << "All media have quit. Marking exit.";
break;
}
}
}


void Usage(string program)
{
cerr << "Usage: " << program << " <SRT URI> [-i INPUT...] [-o OUTPUT...]\n";
Expand Down Expand Up @@ -672,25 +455,16 @@ int main( int argc, char** argv )
}
} cleanupobj;

vector<string> args;
copy(argv+1, argv+argc, back_inserter(args));
//vector<string> args;
//copy(argv+1, argv+argc, back_inserter(args));

// Check options
map<string, vector<string>> params;

string current_key = "";

for (string a: args)
{
if ( a[0] == '-' )
{
current_key = a.substr(1);
params[current_key].clear();
continue;
}

params[current_key].push_back(a);
}
vector<OptionScheme> optargs = {
{ {"ll", "loglevel"}, OptionScheme::ARG_ONE },
{ {"i"}, OptionScheme::ARG_VAR },
{ {"o"}, OptionScheme::ARG_VAR }
};
map<string, vector<string>> params = ProcessOptions(argv, argc, optargs);

// The call syntax is:
//
Expand Down Expand Up @@ -812,9 +586,7 @@ int main( int argc, char** argv )
for(;;)
{
string id = *ids.begin();
bool done = m.Establish(Ref(id));
if ( !done )
break;
m.Establish(Ref(id));

// The 'id' could have been altered.
// If Establish did connect(), then it gave this stream id,
Expand All @@ -836,7 +608,7 @@ int main( int argc, char** argv )
}

applog.Note() << "All local stream definitions covered. Waiting for interrupt/broken all connections.";
m.Stall();
Stall();
}
catch (std::exception& x)
{
Expand Down
Loading

0 comments on commit 9c7c2fe

Please sign in to comment.