SocketConnection.cpp
Go to the documentation of this file.
1/****************************************************************************
2** Copyright (c) 2001-2014
3**
4** This file is part of the QuickFIX FIX Engine
5**
6** This file may be distributed under the terms of the quickfixengine.org
7** license as defined by quickfixengine.org and appearing in the file
8** LICENSE included in the packaging of this file.
9**
10** This file is provided AS IS with NO WARRANTY OF ANY KIND, INCLUDING THE
11** WARRANTY OF DESIGN, MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
12**
13** See http://www.quickfixengine.org/LICENSE for licensing information.
14**
15** Contact ask@quickfixengine.org if any conditions of this licensing are
16** not clear to you.
17**
18****************************************************************************/
19
20#ifdef _MSC_VER
21#include "stdafx.h"
22#else
23#include "config.h"
24#endif
25
26#include "SocketConnection.h"
27#include "SocketAcceptor.h"
28#include "SocketConnector.h"
29#include "SocketInitiator.h"
30#include "Session.h"
31#include "Utility.h"
32
33namespace FIX
34{
36 SocketMonitor* pMonitor )
37: m_socket( s ), m_sendLength( 0 ),
38 m_sessions(sessions), m_pSession( 0 ), m_pMonitor( pMonitor )
39{
// Constructor variant that receives a set of sessions. It stores the socket,
// the session set and the monitor pointer, starts with no bound session
// (m_pSession is 0) and with a send offset of 0.
// NOTE(review): the first line of this signature (listing line 35) is not
// present in this rendering of the file.
//
// m_fds is cleared and then given m_socket as its only member; other code in
// this file copies m_fds before passing the copy to select().
40 FD_ZERO( &m_fds );
41 FD_SET( m_socket, &m_fds );
42}
43
45 const SessionID& sessionID, int s,
46 SocketMonitor* pMonitor )
47: m_socket( s ), m_sendLength( 0 ),
48 m_pSession( i.getSession( sessionID, *this ) ),
49 m_pMonitor( pMonitor )
50{
// Constructor variant bound to a single SessionID. The session pointer is
// obtained up front from i.getSession( sessionID, *this ), passing this
// connection as the second argument.
// NOTE(review): `i` is declared on the first signature line (listing line 44),
// which is missing from this rendering; presumably it is the SocketInitiator
// -- confirm against the header.
//
// m_fds is cleared and then given m_socket as its only member.
51 FD_ZERO( &m_fds );
52 FD_SET( m_socket, &m_fds );
// m_sessions is not in the initializer list, so it is default-constructed and
// then receives this one session id.
53 m_sessions.insert( sessionID );
54}
55
61
// Appends msg to the back of m_sendQueue while holding m_mutex, then calls
// signal().
//
// msg: the text to enqueue; it is copied into the queue.
// Returns true unconditionally in the code visible here.
//
// NOTE(review): listing line 67 (between the push_back and signal()) is
// missing from this rendering, so one statement of the real function is not
// shown here.
62bool SocketConnection::send( const std::string& msg )
63{
// Held until the function returns.
64 Locker l( m_mutex );
65
66 m_sendQueue.push_back( msg );
// presumably notifies whoever drains the queue -- signal() is defined outside
// this file; confirm its effect there.
68 signal();
69 return true;
70}
71
73{
// Body of the routine that transmits queued outgoing data.
// NOTE(review): its signature (listing line 72) is missing from this
// rendering.
//
// Each call makes at most one socket_send() attempt for the message at the
// front of m_sendQueue. The return value is true when the queue is empty on
// exit (or was already empty on entry), false otherwise.
//
// Held until the function returns.
74 Locker l( m_mutex );
75
// Nothing queued: report "all sent".
76 if( !m_sendQueue.size() ) return true;
77
// Zero timeout: select() only polls whether the socket is writable right now.
// The check runs on a copy so m_fds itself is never modified.
78 struct timeval timeout = { 0, 0 };
79 fd_set writeset = m_fds;
// A result <= 0 covers both "not writable" (0) and a select() error (< 0).
80 if( select( 1 + m_socket, 0, &writeset, 0, &timeout ) <= 0 )
81 return false;
82
83 const std::string& msg = m_sendQueue.front();
84
// m_sendLength is the count of bytes of the front message already sent, so
// only the remaining tail is offered to socket_send().
85 ssize_t result = socket_send
86 ( m_socket, msg.c_str() + m_sendLength, msg.length() - m_sendLength );
87
// A zero or negative result is neither counted nor reported here; the message
// simply stays at the front of the queue.
88 if( result > 0 )
89 m_sendLength += result;
90
// Whole message sent: reset the offset and discard it from the queue.
91 if( m_sendLength == msg.length() )
92 {
93 m_sendLength = 0;
94 m_sendQueue.pop_front();
95 }
96
97 return !m_sendQueue.size();
98}
99
105
107{
// Body of a read routine that requires an already-bound session.
// NOTE(review): the signature (listing line 106) and the statements inside
// the try block (listing lines 112-113) are missing from this rendering, so
// the actual receive/dispatch calls are not visible here.
//
// Returns false when no session is bound or when SocketRecvFailed is caught;
// returns true when the try block finishes without that exception.
108 if ( !m_pSession ) return false;
109
110 try
111 {
114 }
115 catch( SocketRecvFailed& e )
116 {
// The failure text goes to the session's log before reporting false.
117 m_pSession->getLog()->onEvent( e.what() );
118 return false;
119 }
120 return true;
121}
122
124{
// Body of a read routine that can also bind a session from the first message.
// NOTE(review): the signature (listing line 123) is missing from this
// rendering. `a` and `s` are its parameters: `a` is used through getLog() and
// getSession(), `s` through getMonitor(). Listing lines 137, 164 and 169-170
// are missing as well, so some statements are not visible here.
//
// Visible behaviour:
//  - with no session bound: wait for a first message, look the session up
//    from it, validate it, and hand the message to the session;
//  - with a session bound: the branch body is not visible (lines 169-170);
//  - on SocketRecvFailed or InvalidMessage: drop the socket, return false.
125 std::string msg;
126 try
127 {
128 if ( !m_pSession )
129 {
// NOTE(review): timeout and readset are set up once, outside the loop, and
// the same objects are passed to every select() call below. select() may
// modify both of its arguments, so later iterations may not see the original
// one-second timeout or descriptor set -- confirm whether that is intended.
130 struct timeval timeout = { 1, 0 };
131 fd_set readset = m_fds;
132
// Loop until readMessage() yields a message.
133 while( !readMessage( msg ) )
134 {
135 int result = select( 1 + m_socket, &readset, 0, 0, &timeout );
// The statement executed when result > 0 (listing line 137) is not visible.
// A timeout (0) or an error (< 0) both end the call with false.
136 if( result > 0 )
138 else if( result == 0 )
139 return false;
140 else if( result < 0 )
141 return false;
142 }
143
144 m_pSession = Session::lookupSession( msg, true );
// isValidSession() reads the m_pSession that was just assigned.
145 if( !isValidSession() )
146 {
// Rejected: forget the session; if the acceptor has a log, record both the
// event and the raw incoming message there.
147 m_pSession = 0;
148 if( a.getLog() )
149 {
150 a.getLog()->onEvent( "Session not found for incoming message: " + msg );
151 a.getLog()->onIncoming( msg );
152 }
153 }
// Each step below re-tests m_pSession because the preceding step may have
// cleared or replaced it.
154 if( m_pSession )
155 m_pSession = a.getSession( msg, *this );
156 if( m_pSession )
157 m_pSession->next( msg, UtcTimeStamp() );
158 if( !m_pSession )
159 {
160 s.getMonitor().drop( m_socket );
161 return false;
162 }
163
165 return true;
166 }
167 else
168 {
171 return true;
172 }
173 }
174 catch ( SocketRecvFailed& e )
175 {
// Log only when a session exists to log to; the socket is dropped either way.
176 if( m_pSession )
177 m_pSession->getLog()->onEvent( e.what() );
178 s.getMonitor().drop( m_socket );
179 }
180 catch ( InvalidMessage& )
181 {
182 s.getMonitor().drop( m_socket );
183 }
// Reached only after one of the two catch blocks above has run.
184 return false;
185}
186
188{
// Body of the session validity check.
// NOTE(review): its signature (listing line 187) is missing from this
// rendering.
//
// Returns true only when all three conditions hold: a session is bound, its
// id is not reported as registered by Session::isSessionRegistered(), and the
// id is a member of m_sessions.
189 if( m_pSession == 0 )
190 return false;
191 SessionID sessionID = m_pSession->getSessionID();
// An id that is already registered is rejected.
192 if( Session::isSessionRegistered(sessionID) )
193 return false;
// Equivalent to "sessionID is contained in m_sessions".
194 return !( m_sessions.find(sessionID) == m_sessions.end() );
195}
196
198throw( SocketRecvFailed )
199{
// Body of the raw socket receive step.
// NOTE(review): the first signature line (listing line 197) and the statement
// on listing line 202, which follows the size check, are missing from this
// rendering.
// NOTE(review): `throw( SocketRecvFailed )` above is a dynamic exception
// specification; that syntax was removed in C++17, so this only builds with
// an older language standard.
//
// Receives at most sizeof(m_buffer) bytes from m_socket into m_buffer.
200 ssize_t size = socket_recv( m_socket, m_buffer, sizeof(m_buffer) );
// A zero or negative result is raised as SocketRecvFailed carrying that value.
201 if( size <= 0 ) throw SocketRecvFailed( size );
203}
204
205bool SocketConnection::readMessage( std::string& msg )
206{
207 try
208 {
209 return m_parser.readFixMessage( msg );
210 }
211 catch ( MessageParseError& ) {}
212 return true;
213}
214
216{
// Body of the routine that hands every available message to the bound
// session.
// NOTE(review): its signature (listing line 215) is missing from this
// rendering; `s` is its parameter and is used only through s.drop().
//
// Does nothing when no session is bound.
217 if( !m_pSession ) return;
218
219 std::string msg;
// Keep going for as long as readMessage() reports a message.
220 while( readMessage( msg ) )
221 {
222 try
223 {
224 m_pSession->next( msg, UtcTimeStamp() );
225 }
226 catch ( InvalidMessage& )
227 {
// An invalid message causes a drop request only while the session is not
// logged on; otherwise it is ignored. There is no break or return here, so
// the loop continues with the next message in either case.
228 if( !m_pSession->isLoggedOn() )
229 s.drop( m_socket );
230 }
231 }
232}
233
238} // namespace FIX
Session * getSession(const std::string &msg, Responder &)
Definition Acceptor.cpp:104
Log * getLog()
Definition Acceptor.h:59
Locks/Unlocks a mutex using RAII.
Definition Mutex.h:96
virtual void onIncoming(const std::string &)=0
virtual void onEvent(const std::string &)=0
void addToStream(const char *str, size_t len)
Definition Parser.h:48
bool readFixMessage(std::string &str)
Definition Parser.cpp:59
static Session * registerSession(const SessionID &)
Definition Session.cpp:1537
static void unregisterSession(const SessionID &)
Definition Session.cpp:1547
static Session * lookupSession(const SessionID &)
Definition Session.cpp:1496
const SessionID & getSessionID() const
Definition Session.h:75
Log * getLog()
Definition Session.h:227
void next()
Definition Session.cpp:125
static bool isSessionRegistered(const SessionID &)
Definition Session.cpp:1531
bool isLoggedOn()
Definition Session.h:65
Unique session id consists of BeginString, SenderCompID and TargetCompID.
Definition SessionID.h:31
Socket implementation of Acceptor.
void readMessages(SocketMonitor &s)
SocketMonitor * m_pMonitor
bool send(const std::string &)
bool readMessage(std::string &msg)
std::set< SessionID > Sessions
bool read(SocketConnector &s)
SocketConnection(int s, Sessions sessions, SocketMonitor *pMonitor)
Connects sockets to remote ports and addresses.
SocketMonitor & getMonitor()
Socket implementation of Initiator.
Monitors events on a collection of sockets.
bool drop(int socket)
Listens for and accepts incoming socket connections on a port.
SocketMonitor & getMonitor()
Date and Time represented in UTC.
Definition FieldTypes.h:583
ssize_t socket_recv(int s, char *buf, size_t length)
Definition Utility.cpp:170
ssize_t socket_send(int s, const char *msg, size_t length)
Definition Utility.cpp:175
Not a recognizable message.
Definition Exceptions.h:81
Unable to parse message.
Definition Exceptions.h:74
Socket recv operation failed.
Definition Exceptions.h:279

Generated on Sat Feb 3 2024 04:23:15 for QuickFIX by doxygen 1.9.8 written by Dimitri van Heesch, © 1997-2001