FIX::ThreadedSocketInitiator Class Reference

Threaded Socket implementation of Initiator. More...

#include <ThreadedSocketInitiator.h>

[Inheritance diagram for FIX::ThreadedSocketInitiator]
[Collaboration diagram for FIX::ThreadedSocketInitiator]

Public Member Functions

 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 ThreadedSocketInitiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~ThreadedSocketInitiator ()
 
- Public Member Functions inherited from FIX::Initiator
 Initiator (Application &, MessageStoreFactory &, const SessionSettings &) throw ( ConfigError )
 
 Initiator (Application &, MessageStoreFactory &, const SessionSettings &, LogFactory &) throw ( ConfigError )
 
virtual ~Initiator ()
 
void start () throw ( ConfigError, RuntimeError )
 Start initiator.
 
void block () throw ( ConfigError, RuntimeError )
 Block on the initiator.
 
bool poll (double timeout=0.0) throw ( ConfigError, RuntimeError )
 Poll the initiator.
 
void stop (bool force=false)
 Stop initiator.
 
bool isLoggedOn ()
 Check to see if any sessions are currently logged on.
 
Session * getSession (const SessionID &sessionID, Responder &)
 
const std::set< SessionID > & getSessions () const
 
Session * getSession (const SessionID &sessionID) const
 
const Dictionary *const getSessionSettings (const SessionID &sessionID) const
 
bool has (const SessionID &id)
 
bool isStopped ()
 
Application & getApplication ()
 
MessageStoreFactory & getMessageStoreFactory ()
 
Log * getLog ()
 

Private Types

typedef std::map< int, thread_id > SocketToThread
 
typedef std::map< SessionID, int > SessionToHostNum
 
typedef std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection * > ThreadPair
 

Private Member Functions

void onConfigure (const SessionSettings &) throw ( ConfigError )
 Implemented to configure initiator.
 
void onInitialize (const SessionSettings &) throw ( RuntimeError )
 Implemented to initialize initiator.
 
void onStart ()
 Implemented to start connecting to targets.
 
bool onPoll (double timeout)
 Implemented to connect and poll for events.
 
void onStop ()
 Implemented to stop a running initiator.
 
void doConnect (const SessionID &s, const Dictionary &d)
 Implemented to connect a session to its target.
 
void addThread (int s, thread_id t)
 
void removeThread (int s)
 
void lock ()
 
void getHost (const SessionID &, const Dictionary &, std::string &, short &, std::string &, short &)
 

Static Private Member Functions

static THREAD_PROC socketThread (void *p)
 

Private Attributes

SessionSettings m_settings
 
SessionToHostNum m_sessionToHostNum
 
time_t m_lastConnect
 
int m_reconnectInterval
 
bool m_noDelay
 
int m_sendBufSize
 
int m_rcvBufSize
 
SocketToThread m_threads
 
Mutex m_mutex
 

Additional Inherited Members

- Protected Member Functions inherited from FIX::Initiator
void setPending (const SessionID &)
 
void setConnected (const SessionID &)
 
void setDisconnected (const SessionID &)
 
bool isPending (const SessionID &)
 
bool isConnected (const SessionID &)
 
bool isDisconnected (const SessionID &)
 
void connect ()
 
- Protected Attributes inherited from FIX::Initiator
SessionSettings m_settings
 

Detailed Description

Threaded Socket implementation of Initiator.

Definition at line 39 of file ThreadedSocketInitiator.h.

Member Typedef Documentation

◆ SessionToHostNum

typedef std::map< SessionID, int > FIX::ThreadedSocketInitiator::SessionToHostNum
private

Definition at line 52 of file ThreadedSocketInitiator.h.

◆ SocketToThread

typedef std::map< int, thread_id > FIX::ThreadedSocketInitiator::SocketToThread
private

Definition at line 51 of file ThreadedSocketInitiator.h.

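◆ ThreadPair

typedef std::pair< ThreadedSocketInitiator *, ThreadedSocketConnection * > FIX::ThreadedSocketInitiator::ThreadPair
private

Definition at line 53 of file ThreadedSocketInitiator.h.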

Constructor & Destructor Documentation

◆ ThreadedSocketInitiator() [1/2]

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator ( Application &  application,
MessageStoreFactory &  factory,
const SessionSettings &  settings 
)
throw ( ConfigError )

Definition at line 32 of file ThreadedSocketInitiator.cpp.

ThreadedSocketInitiator::ThreadedSocketInitiator(
  Application& application,
  MessageStoreFactory& factory,
  const SessionSettings& settings ) throw( ConfigError )
: Initiator( application, factory, settings ),
  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
{
  socket_init();
}

References FIX::socket_init().

◆ ThreadedSocketInitiator() [2/2]

FIX::ThreadedSocketInitiator::ThreadedSocketInitiator ( Application &  application,
MessageStoreFactory &  factory,
const SessionSettings &  settings,
LogFactory &  logFactory 
)
throw ( ConfigError )

Definition at line 43 of file ThreadedSocketInitiator.cpp.

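ThreadedSocketInitiator::ThreadedSocketInitiator(
  Application& application,
  MessageStoreFactory& factory,
  const SessionSettings& settings,
  LogFactory& logFactory ) throw( ConfigError )
: Initiator( application, factory, settings, logFactory ),
  m_lastConnect( 0 ), m_reconnectInterval( 30 ), m_noDelay( false ),
  m_sendBufSize( 0 ), m_rcvBufSize( 0 )
{
  socket_init();
}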

References FIX::socket_init().

◆ ~ThreadedSocketInitiator()

FIX::ThreadedSocketInitiator::~ThreadedSocketInitiator ( )
virtual

Definition at line 55 of file ThreadedSocketInitiator.cpp.

ThreadedSocketInitiator::~ThreadedSocketInitiator()
{
  socket_term();
}

References FIX::socket_term().

Member Function Documentation

◆ addThread()

void FIX::ThreadedSocketInitiator::addThread ( int  s,
thread_id  t 
)
private

Definition at line 183 of file ThreadedSocketInitiator.cpp.

void ThreadedSocketInitiator::addThread( int s, thread_id t )
{
  Locker l(m_mutex);

  m_threads[ s ] = t;
}

References m_mutex, and m_threads.

Referenced by doConnect().

◆ doConnect()

void FIX::ThreadedSocketInitiator::doConnect ( const SessionID &  s,
const Dictionary &  d 
)
private virtual

Implemented to connect a session to its target.

Implements FIX::Initiator.

Definition at line 133 of file ThreadedSocketInitiator.cpp.

void ThreadedSocketInitiator::doConnect( const SessionID& s, const Dictionary& d )
{
  try
  {
    Session* session = Session::lookupSession( s );
    if( !session->isSessionTime(UtcTimeStamp()) ) return;

    Log* log = session->getLog();

    std::string address;
    short port = 0;
    std::string sourceAddress;
    short sourcePort = 0;
    getHost( s, d, address, port, sourceAddress, sourcePort );

    int socket = socket_createConnector();
    if( m_noDelay )
      socket_setsockopt( socket, TCP_NODELAY );
    if( m_sendBufSize )
      socket_setsockopt( socket, SO_SNDBUF, m_sendBufSize );
    if( m_rcvBufSize )
      socket_setsockopt( socket, SO_RCVBUF, m_rcvBufSize );

    setPending( s );
    log->onEvent( "Connecting to " + address + " on port " + IntConvertor::convert((unsigned short)port) + " (Source " + sourceAddress + ":" + IntConvertor::convert((unsigned short)sourcePort) + ")");

    ThreadedSocketConnection* pConnection =
      new ThreadedSocketConnection( s, socket, address, port, getLog(), sourceAddress, sourcePort );

    ThreadPair* pair = new ThreadPair( this, pConnection );

    {
      Locker l( m_mutex );
      thread_id thread;
      if ( thread_spawn( &socketThread, pair, thread ) )
      {
        addThread( socket, thread );
      }
      else
      {
        delete pair;
        pConnection->disconnect();
        delete pConnection;
        setDisconnected( s );
      }
    }
  }
  catch ( std::exception& ) {}
}

References addThread(), FIX::IntConvertor::convert(), FIX::ThreadedSocketConnection::disconnect(), getHost(), FIX::Initiator::getLog(), FIX::Session::getLog(), FIX::Session::isSessionTime(), FIX::Session::lookupSession(), m_mutex, m_noDelay, m_rcvBufSize, m_sendBufSize, FIX::Log::onEvent(), FIX::Initiator::setDisconnected(), FIX::Initiator::setPending(), FIX::socket_createConnector(), FIX::socket_setsockopt(), socketThread(), and FIX::thread_spawn().
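
The listing above keeps the port in a `short` and casts it to `unsigned short` only when building the log message. The following is a minimal sketch of why that cast matters, assuming a 16-bit `short` and the wrap-around narrowing that mainstream compilers perform (guaranteed only since C++20). The helper names `storeAsShort` and `portForDisplay` are hypothetical and are not part of QuickFIX.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper: narrow a configured port to a signed 16-bit value,
// as the (short) casts in getHost() do. Values above 32767 come out negative.
std::int16_t storeAsShort(int port) {
    return static_cast<std::int16_t>(port);
}

// Hypothetical helper: recover the original port number for display,
// as the (unsigned short) casts in the log message do.
std::uint16_t portForDisplay(std::int16_t stored) {
    return static_cast<std::uint16_t>(stored);
}
```

Under these assumptions a port such as 50000 is held as a negative `short`, and the cast back to `unsigned short` restores the value that should appear in the log.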

◆ getHost()

void FIX::ThreadedSocketInitiator::getHost ( const SessionID &  s,
const Dictionary &  d,
std::string &  address,
short &  port,
std::string &  sourceAddress,
short &  sourcePort 
)
private

Definition at line 240 of file ThreadedSocketInitiator.cpp.

void ThreadedSocketInitiator::getHost( const SessionID& s, const Dictionary& d,
                                       std::string& address, short& port,
                                       std::string& sourceAddress, short& sourcePort )
{
  int num = 0;
  SessionToHostNum::iterator i = m_sessionToHostNum.find( s );
  if ( i != m_sessionToHostNum.end() ) num = i->second;

  std::stringstream hostStream;
  hostStream << SOCKET_CONNECT_HOST << num;
  std::string hostString = hostStream.str();

  std::stringstream portStream;
  portStream << SOCKET_CONNECT_PORT << num;
  std::string portString = portStream.str();

  if( d.has(hostString) && d.has(portString) )
  {
    address = d.getString( hostString );
    port = ( short ) d.getInt( portString );

    std::stringstream sourceHostStream;
    sourceHostStream << SOCKET_CONNECT_SOURCE_HOST << num;
    hostString = sourceHostStream.str();
    if( d.has(hostString) )
      sourceAddress = d.getString( hostString );

    std::stringstream sourcePortStream;
    sourcePortStream << SOCKET_CONNECT_SOURCE_PORT << num;
    portString = sourcePortStream.str();
    if( d.has(portString) )
      sourcePort = ( short ) d.getInt( portString );
  }
  else
  {
    num = 0;
    address = d.getString( SOCKET_CONNECT_HOST );
    port = ( short ) d.getInt( SOCKET_CONNECT_PORT );

    if( d.has(SOCKET_CONNECT_SOURCE_HOST) )
      sourceAddress = d.getString( SOCKET_CONNECT_SOURCE_HOST );
    if( d.has(SOCKET_CONNECT_SOURCE_PORT) )
      sourcePort = ( short ) d.getInt( SOCKET_CONNECT_SOURCE_PORT );
  }

  m_sessionToHostNum[ s ] = ++num;
}

References FIX::Dictionary::getInt(), FIX::Dictionary::getString(), FIX::Dictionary::has(), m_sessionToHostNum, FIX::SOCKET_CONNECT_HOST, FIX::SOCKET_CONNECT_PORT, FIX::SOCKET_CONNECT_SOURCE_HOST, and FIX::SOCKET_CONNECT_SOURCE_PORT.

Referenced by doConnect().
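
The rotation that getHost() performs across numbered host entries can be tried in isolation. The following is a minimal sketch, not QuickFIX code: `Settings` is a plain `std::map` standing in for FIX::Dictionary, `Endpoint` and `nextEndpoint` are hypothetical names, and the key strings `SocketConnectHost` and `SocketConnectPort` are assumed values for the SOCKET_CONNECT_HOST and SOCKET_CONNECT_PORT constants.

```cpp
#include <cassert>
#include <map>
#include <string>

// Stand-in for FIX::Dictionary: plain string keys and values (assumption).
using Settings = std::map<std::string, std::string>;

struct Endpoint {
    std::string host;
    int port = 0;
};

// Hypothetical helper mirroring the rotation in getHost(): look up the keys
// suffixed with the current index, fall back to the unsuffixed keys when no
// such pair exists, then advance the index for the next attempt.
Endpoint nextEndpoint(const Settings& d, int& hostNum) {
    const std::string hostKey = "SocketConnectHost" + std::to_string(hostNum);
    const std::string portKey = "SocketConnectPort" + std::to_string(hostNum);

    Endpoint e;
    const auto h = d.find(hostKey);
    const auto p = d.find(portKey);
    if (h != d.end() && p != d.end()) {
        e.host = h->second;
        e.port = std::stoi(p->second);
    } else {
        // No alternate with this index: wrap around to the primary entry.
        hostNum = 0;
        e.host = d.at("SocketConnectHost");
        e.port = std::stoi(d.at("SocketConnectPort"));
    }
    ++hostNum;
    return e;
}
```

With one primary and one alternate entry configured, successive calls in this sketch yield the primary, then the alternate, then the primary again, which matches the way the listing resets `num` to 0 before incrementing it.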

◆ lock()

void FIX::ThreadedSocketInitiator::lock ( )
inline private

Definition at line 66 of file ThreadedSocketInitiator.h.

void lock() { Locker l(m_mutex); }

References m_mutex.

Referenced by socketThread().
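
lock() acquires m_mutex and releases it immediately. Because doConnect() holds the same mutex while it spawns the thread and calls addThread(), this appears to act as a start barrier: the new thread cannot get past lock() until its entry has been registered. The following is a minimal sketch of that pattern using `std::mutex` and `std::thread` in place of the QuickFIX `Mutex`, `Locker`, and `thread_spawn`; the `Spawner` type and its members are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <thread>

// Hypothetical illustration of the "lock and release" start barrier.
struct Spawner {
    std::mutex m;
    std::map<int, int> registry;
    bool sawEntry = false;

    // Same shape as lock(): block until the mutex is free, then let go.
    void barrier() { std::lock_guard<std::mutex> l(m); }

    void worker(int key) {
        barrier();  // returns only after spawn() has released the mutex
        sawEntry = registry.count(key) == 1;
    }

    void spawn(int key) {
        std::thread t;
        {
            std::lock_guard<std::mutex> l(m);
            t = std::thread(&Spawner::worker, this, key);
            registry[key] = 1;  // registered before the mutex is released
        }
        t.join();
    }
};
```

In this sketch the worker always observes its own registry entry, whichever thread the scheduler runs first.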

◆ onConfigure()

void FIX::ThreadedSocketInitiator::onConfigure ( const SessionSettings &  s )
throw ( ConfigError )
private virtual

Implemented to configure initiator.

Reimplemented from FIX::Initiator.

Definition at line 60 of file ThreadedSocketInitiator.cpp.

void ThreadedSocketInitiator::onConfigure( const SessionSettings& s )
throw ( ConfigError )
{
  const Dictionary& dict = s.get();

  if( dict.has( RECONNECT_INTERVAL ) )
    m_reconnectInterval = dict.getInt( RECONNECT_INTERVAL );
  if( dict.has( SOCKET_NODELAY ) )
    m_noDelay = dict.getBool( SOCKET_NODELAY );
  if( dict.has( SOCKET_SEND_BUFFER_SIZE ) )
    m_sendBufSize = dict.getInt( SOCKET_SEND_BUFFER_SIZE );
  if( dict.has( SOCKET_RECEIVE_BUFFER_SIZE ) )
    m_rcvBufSize = dict.getInt( SOCKET_RECEIVE_BUFFER_SIZE );
}

References FIX::Dictionary::getBool(), FIX::Dictionary::getInt(), FIX::Dictionary::has(), FIX::RECONNECT_INTERVAL, FIX::SOCKET_NODELAY, FIX::SOCKET_RECEIVE_BUFFER_SIZE, and FIX::SOCKET_SEND_BUFFER_SIZE.
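
onConfigure() overrides a default only when the corresponding key is present. The following is a minimal sketch of that pattern with the defaults from the constructors' initializer lists. It is not QuickFIX code: `Settings` stands in for FIX::Dictionary, `SocketOptions` and `readOptions` are hypothetical names, and the key strings and the `"Y"` boolean encoding are assumptions about the setting names and about what getBool() accepts.

```cpp
#include <cassert>
#include <map>
#include <string>

// Stand-in for FIX::Dictionary (assumption).
using Settings = std::map<std::string, std::string>;

struct SocketOptions {
    int reconnectInterval = 30;  // seconds, same default as the constructors
    bool noDelay = false;
    int sendBufSize = 0;         // 0: doConnect() leaves the socket option unset
    int rcvBufSize = 0;
};

// Hypothetical helper: each setting keeps its default unless the key exists.
SocketOptions readOptions(const Settings& d) {
    SocketOptions o;
    auto it = d.find("ReconnectInterval");
    if (it != d.end()) o.reconnectInterval = std::stoi(it->second);
    it = d.find("SocketNodelay");
    if (it != d.end()) o.noDelay = (it->second == "Y");
    it = d.find("SocketSendBufferSize");
    if (it != d.end()) o.sendBufSize = std::stoi(it->second);
    it = d.find("SocketReceiveBufferSize");
    if (it != d.end()) o.rcvBufSize = std::stoi(it->second);
    return o;
}
```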

◆ onInitialize()

void FIX::ThreadedSocketInitiator::onInitialize ( const SessionSettings &  )
throw ( RuntimeError )
private virtual

Implemented to initialize initiator.

Reimplemented from FIX::Initiator.

Definition at line 75 of file ThreadedSocketInitiator.cpp.

void ThreadedSocketInitiator::onInitialize( const SessionSettings& )
throw ( RuntimeError )
{
}

◆ onPoll()

bool FIX::ThreadedSocketInitiator::onPoll ( double  timeout)
private virtual

Implemented to connect and poll for events.

Implements FIX::Initiator.

Definition at line 98 of file ThreadedSocketInitiator.cpp.

bool ThreadedSocketInitiator::onPoll( double timeout )
{
  return false;
}

◆ onStart()

void FIX::ThreadedSocketInitiator::onStart ( )
private virtual

Implemented to start connecting to targets.

Implements FIX::Initiator.

Definition at line 80 of file ThreadedSocketInitiator.cpp.

void ThreadedSocketInitiator::onStart()
{
  while ( !isStopped() )
  {
    time_t now;
    ::time( &now );

    if ( (now - m_lastConnect) >= m_reconnectInterval )
    {
      Locker l( m_mutex );
      connect();
      m_lastConnect = now;
    }

    process_sleep( 1 );
  }
}

References FIX::Initiator::connect(), FIX::Initiator::isStopped(), m_lastConnect, m_mutex, m_reconnectInterval, and FIX::process_sleep().
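
The loop in onStart() wakes once per second and calls connect() only when at least m_reconnectInterval seconds have passed since the last attempt. The following is a minimal sketch of that check on its own, with a hypothetical `ReconnectTimer` type that takes the current time as a parameter so it can be exercised without sleeping.

```cpp
#include <cassert>
#include <ctime>

// Hypothetical helper isolating the interval check from onStart().
struct ReconnectTimer {
    std::time_t lastConnect = 0;  // 0 makes the first check succeed
    int interval = 30;            // seconds, same default as m_reconnectInterval

    // Returns true when a connection attempt is due and records the attempt.
    bool due(std::time_t now) {
        if (now - lastConnect >= interval) {
            lastConnect = now;
            return true;
        }
        return false;
    }
};
```

Because the stored timestamp starts at 0, the first pass through the loop attempts a connection straight away. Later attempts are spaced at least one interval apart.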

◆ onStop()

void FIX::ThreadedSocketInitiator::onStop ( )
private virtual

Implemented to stop a running initiator.

Implements FIX::Initiator.

Definition at line 103 of file ThreadedSocketInitiator.cpp.

void ThreadedSocketInitiator::onStop()
{
  SocketToThread threads;
  SocketToThread::iterator i;

  {
    Locker l(m_mutex);

    time_t start = 0;
    time_t now = 0;

    ::time( &start );
    while ( isLoggedOn() )
    {
      if( ::time(&now) -5 >= start )
        break;
    }

    threads = m_threads;
    m_threads.clear();
  }

  for ( i = threads.begin(); i != threads.end(); ++i )
    socket_close( i->first );

  for ( i = threads.begin(); i != threads.end(); ++i )
    thread_join( i->second );
  threads.clear();
}

References FIX::Initiator::isLoggedOn(), m_mutex, m_threads, FIX::socket_close(), FIX::Initiator::start(), and FIX::thread_join().
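
onStop() gives sessions up to five seconds to log out before it closes the sockets and joins the threads. The following is a minimal sketch of a bounded wait of that kind. It deliberately differs from the listing in one respect: it sleeps between checks instead of spinning. `waitUntil` and its parameters are hypothetical names.

```cpp
#include <cassert>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper: poll `done` until it returns true or `limit` elapses.
// Returns true if the condition was met, false on timeout.
bool waitUntil(const std::function<bool()>& done,
               std::chrono::milliseconds limit,
               std::chrono::milliseconds step = std::chrono::milliseconds(1)) {
    const auto deadline = std::chrono::steady_clock::now() + limit;
    while (!done()) {
        if (std::chrono::steady_clock::now() >= deadline) return false;
        std::this_thread::sleep_for(step);  // yield instead of busy-waiting
    }
    return true;
}
```

A caller modelled on onStop() would pass a predicate equivalent to `!isLoggedOn()` and a five-second limit, then proceed with the shutdown whichever way the wait ends.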

◆ removeThread()

void FIX::ThreadedSocketInitiator::removeThread ( int  s)
private

Definition at line 190 of file ThreadedSocketInitiator.cpp.

void ThreadedSocketInitiator::removeThread( int s )
{
  Locker l(m_mutex);
  SocketToThread::iterator i = m_threads.find( s );

  if ( i != m_threads.end() )
  {
    thread_detach( i->second );
    m_threads.erase( i );
  }
}

References m_mutex, m_threads, and FIX::thread_detach().

Referenced by socketThread().
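
addThread(), removeThread(), and onStop() together maintain a mutex-guarded map from socket to thread. The following is a minimal sketch of that bookkeeping using `std::mutex`. `SocketRegistry` is a hypothetical name, and `Handle` is an integer stand-in for `thread_id`, so the sketch records handles without detaching or joining real threads.

```cpp
#include <cassert>
#include <map>
#include <mutex>

// Hypothetical registry mirroring the m_threads bookkeeping.
class SocketRegistry {
public:
    using Handle = unsigned long;  // stand-in for thread_id (assumption)

    // Like addThread(): record the handle under the lock.
    void add(int socket, Handle h) {
        std::lock_guard<std::mutex> l(m_mutex);
        m_threads[socket] = h;
    }

    // Like removeThread(): erase the entry if present; report whether it was.
    bool remove(int socket) {
        std::lock_guard<std::mutex> l(m_mutex);
        return m_threads.erase(socket) == 1;
    }

    // Like the copy-and-clear step in onStop(): take every entry under the
    // lock so the caller can close sockets and join threads outside it.
    std::map<int, Handle> drain() {
        std::lock_guard<std::mutex> l(m_mutex);
        std::map<int, Handle> taken;
        taken.swap(m_threads);
        return taken;
    }

private:
    std::mutex m_mutex;
    std::map<int, Handle> m_threads;
};
```

Draining under the lock and doing the slow work afterwards keeps the critical section short, which is the same split the onStop() listing makes between its locked block and its two loops.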

◆ socketThread()

THREAD_PROC FIX::ThreadedSocketInitiator::socketThread ( void *  p)
static private

Definition at line 202 of file ThreadedSocketInitiator.cpp.

THREAD_PROC ThreadedSocketInitiator::socketThread( void* p )
{
  ThreadPair * pair = reinterpret_cast < ThreadPair* > ( p );

  ThreadedSocketInitiator* pInitiator = pair->first;
  ThreadedSocketConnection* pConnection = pair->second;
  FIX::SessionID sessionID = pConnection->getSession()->getSessionID();
  FIX::Session* pSession = FIX::Session::lookupSession( sessionID );
  int socket = pConnection->getSocket();
  delete pair;

  pInitiator->lock();

  if( !pConnection->connect() )
  {
    pInitiator->getLog()->onEvent( "Connection failed" );
    pConnection->disconnect();
    delete pConnection;
    pInitiator->removeThread( socket );
    pInitiator->setDisconnected( sessionID );
    return 0;
  }

  pInitiator->setConnected( sessionID );
  pInitiator->getLog()->onEvent( "Connection succeeded" );

  pSession->next();

  while ( pConnection->read() ) {}

  delete pConnection;
  if( !pInitiator->isStopped() )
    pInitiator->removeThread( socket );

  pInitiator->setDisconnected( sessionID );
  return 0;
}

References FIX::ThreadedSocketConnection::connect(), FIX::ThreadedSocketConnection::disconnect(), FIX::Initiator::getLog(), FIX::ThreadedSocketConnection::getSession(), FIX::Session::getSessionID(), FIX::ThreadedSocketConnection::getSocket(), FIX::Initiator::isStopped(), lock(), FIX::Session::lookupSession(), FIX::Session::next(), FIX::Log::onEvent(), FIX::ThreadedSocketConnection::read(), removeThread(), FIX::Initiator::setConnected(), and FIX::Initiator::setDisconnected().

Referenced by doConnect().
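
socketThread() receives a heap-allocated ThreadPair through a `void*`, copies out both pointers, and deletes the pair before doing any work. The connection object is deleted later by the same thread. The following is a minimal sketch of that ownership handoff, with hypothetical `Owner`, `Connection`, and `entry` names standing in for the QuickFIX types, and a direct function call standing in for a spawned thread.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-ins for the initiator and the connection.
struct Owner { int finished = 0; };
struct Connection { std::string name; };
using Pair = std::pair<Owner*, Connection*>;

// C-style entry point: the argument arrives as void* and is cast back.
void* entry(void* p) {
    Pair* pair = static_cast<Pair*>(p);
    Owner* owner = pair->first;
    Connection* conn = pair->second;
    delete pair;  // the pair is only a carrier, so it is freed right away

    // ... the real thread would connect and read here ...

    delete conn;        // the thread owns the connection and frees it on exit
    ++owner->finished;  // the owner outlives the thread and is not freed
    return nullptr;
}
```

The spawning side allocates both objects with `new` and gives up ownership once the thread has started. If the spawn fails it must free them itself, as the else branch in doConnect() does.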

Member Data Documentation

◆ m_lastConnect

time_t FIX::ThreadedSocketInitiator::m_lastConnect
private

Definition at line 73 of file ThreadedSocketInitiator.h.

Referenced by onStart().

◆ m_mutex

Mutex FIX::ThreadedSocketInitiator::m_mutex
private

Definition at line 79 of file ThreadedSocketInitiator.h.

Referenced by addThread(), doConnect(), lock(), onStart(), onStop(), and removeThread().

◆ m_noDelay

bool FIX::ThreadedSocketInitiator::m_noDelay
private

Definition at line 75 of file ThreadedSocketInitiator.h.

Referenced by doConnect().

◆ m_rcvBufSize

int FIX::ThreadedSocketInitiator::m_rcvBufSize
private

Definition at line 77 of file ThreadedSocketInitiator.h.

Referenced by doConnect().

◆ m_reconnectInterval

int FIX::ThreadedSocketInitiator::m_reconnectInterval
private

Definition at line 74 of file ThreadedSocketInitiator.h.

Referenced by onStart().

◆ m_sendBufSize

int FIX::ThreadedSocketInitiator::m_sendBufSize
private

Definition at line 76 of file ThreadedSocketInitiator.h.

Referenced by doConnect().

◆ m_sessionToHostNum

SessionToHostNum FIX::ThreadedSocketInitiator::m_sessionToHostNum
private

Definition at line 72 of file ThreadedSocketInitiator.h.

Referenced by getHost().

◆ m_settings

SessionSettings FIX::ThreadedSocketInitiator::m_settings
private

Definition at line 71 of file ThreadedSocketInitiator.h.

◆ m_threads

SocketToThread FIX::ThreadedSocketInitiator::m_threads
private

Definition at line 78 of file ThreadedSocketInitiator.h.

Referenced by addThread(), onStop(), and removeThread().


The documentation for this class was generated from the following files:

ThreadedSocketInitiator.h
ThreadedSocketInitiator.cpp

Generated on Sat Feb 3 2024 04:23:15 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001