-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathAsioSocketInitiator.cpp
147 lines (121 loc) · 4.54 KB
/
AsioSocketInitiator.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
/*
AsioQuickfix (c) by Michiel van Slobbe
AsioQuickfix is licensed under a
Creative Commons Attribution-ShareAlike 3.0 Unported License.
You should have received a copy of the license along with this
work. If not, see <http://creativecommons.org/licenses/by-sa/3.0/>.
*/
#include "AsioSocketInitiator.hpp"
namespace FIX
{
const char * AsioSocketInitiator::m_connection_type_label ( "ConnectionType" );
const char * AsioSocketInitiator::m_initiator_label ( "initiator" );
AsioSocketInitiator::AsioSocketInitiator( boost::asio::io_service & service,
Application& application,
MessageStoreFactory& factory,
const SessionSettings& settings ) :
m_service ( service ),
m_application( application ),
m_messageStoreFactory( factory ),
m_settings( settings ),
m_pLogFactory( 0 ),
m_pLog( 0 )
{
initialize();
}
AsioSocketInitiator::~AsioSocketInitiator()
{
stop();
}
void AsioSocketInitiator::start()
{
}
void AsioSocketInitiator::stop()
{
for ( boost::unordered_map < std::string, AsioSocketConnection_ptr >::iterator iter = m_connections.begin();
iter != m_connections.end();
iter ++ )
iter->second->disconnect();
m_connections.clear();
}
// just like the Initiator::initialize function
void AsioSocketInitiator::initialize()
{
std::set < SessionID > sessions = m_settings.getSessions();
std::set < SessionID > ::iterator i;
if ( !sessions.size() )
throw ConfigError( "No sessions defined" );
SessionFactory factory( m_application, m_messageStoreFactory,
m_pLogFactory );
for ( i = sessions.begin(); i != sessions.end(); ++i )
{
if ( m_settings.get( *i ).getString( m_connection_type_label ).compare ( m_initiator_label ) == 0 )
{
m_sessions[ *i ] = factory.create( *i, m_settings.get( *i ) );
ConnectSession ( *i, m_settings.get( *i ) );
}
}
if ( !m_sessions.size() )
throw ConfigError( "No sessions defined for initiator" );
}
void AsioSocketInitiator::ConnectSession( const SessionID& s, const Dictionary& d )
{
std::string address;
short port = 0;
Session* session = Session::lookupSession( s );
assert ( session != 0 ) ;
if( !session->isSessionTime(UtcTimeStamp()) ) return;
getHost( s, d, address, port );
boost::shared_ptr < boost::asio::ip::tcp::socket > socket ( new boost::asio::ip::tcp::socket ( m_service ) );
boost::asio::ip::tcp::resolver::query query ( boost::asio::ip::tcp::v4(),
address,
boost::lexical_cast < std::string > ( port ) );
boost::asio::ip::tcp::resolver resolver ( m_service );
boost::system::error_code ec;
boost::asio::ip::tcp::resolver::iterator iterator = resolver.resolve ( query, ec );
if ( ec != 0 )
throw new std::runtime_error ( ( boost::format ( "Unable to resolve [%1%][%2%]" ) % address % port ).str().c_str() );
assert ( socket.get() != 0 );
// we could have done this async, but it's easier this way ..
socket->connect ( *iterator, ec );
if ( ec != 0 )
throw new std::runtime_error ( ( boost::format ( "Unable to connect to [%1%][%2%]" ) % address % port ).str().c_str() );
AsioSocketConnection_ptr connection ( boost::make_shared < AsioSocketConnection > ( socket,
s,
session ) );
m_connections.insert ( std::make_pair ( s.toString(), connection ) );
}
void AsioSocketInitiator::getHost( const SessionID& s, const Dictionary& d,
std::string& address, short& port )
{
int num = 0;
SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
if ( i != m_sessionToHostNum.end() ) num = i->second;
std::stringstream hostStream;
hostStream << SOCKET_CONNECT_HOST << num;
std::string hostString = hostStream.str();
std::stringstream portStream;
std::string portString = portStream.str();
portStream << SOCKET_CONNECT_PORT << num;
if( d.has(hostString) && d.has(portString) )
{
address = d.getString( hostString );
port = ( short ) d.getLong( portString );
}
else
{
num = 0;
address = d.getString( SOCKET_CONNECT_HOST );
port = ( short ) d.getLong( SOCKET_CONNECT_PORT );
}
m_sessionToHostNum[ s ] = ++num;
}
void AsioSocketInitiator::onConnect( SocketConnector&, int socket )
{
}
void AsioSocketInitiator::onWrite( SocketConnector&, int socket ) {}
bool AsioSocketInitiator::onData( SocketConnector&, int socket ) { return true; }
void AsioSocketInitiator::onDisconnect( SocketConnector&, int socket ) {}
void AsioSocketInitiator::onError( SocketConnector& ) {}
void AsioSocketInitiator::onTimeout( SocketConnector& ) {}
}