XRootD
XrdCl::Stream Class Reference

Stream. More...

#include <XrdClStream.hh>

+ Collaboration diagram for XrdCl::Stream:

Public Types

enum  StreamStatus {
  Disconnected = 0 ,
  Connected = 1 ,
  Connecting = 2 ,
  Error = 3
}
 Status of the stream. More...
 

Public Member Functions

 Stream (std::shared_ptr< URL > url, const URL &prefer=URL())
 Constructor. More...
 
 ~Stream ()
 Destructor. More...
 
bool CanCollapse (const URL &url)
 
void DisableIfEmpty (uint16_t subStream)
 Disables respective uplink if empty. More...
 
void Disconnect (bool force=false)
 Disconnect the stream. More...
 
XRootDStatus EnableLink (PathID &path)
 
void ForceConnect ()
 Force connection. More...
 
void ForceError (XRootDStatus status, bool hush=false)
 Force error. More...
 
const std::string & GetName () const
 Return stream name. More...
 
const URLGetURL () const
 Get the URL. More...
 
XRootDStatus Initialize ()
 Initializer. More...
 
uint16_t InspectStatusRsp (uint16_t stream, MsgHandler *&incHandler)
 
MsgHandlerInstallIncHandler (std::shared_ptr< Message > &msg, uint16_t stream)
 
void OnConnect (uint16_t subStream)
 Call back when a message has been reconstructed. More...
 
void OnConnectError (uint16_t subStream, XRootDStatus status)
 On connect error. More...
 
void OnError (uint16_t subStream, XRootDStatus status)
 On error. More...
 
void OnIncoming (uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
 Call back when a message has been reconstructed. More...
 
void OnMessageSent (uint16_t subStream, Message *msg, uint32_t bytesSent)
 
bool OnReadTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On read timeout. More...
 
std::pair< Message *, MsgHandler * > OnReadyToWrite (uint16_t subStream)
 
bool OnWriteTimeout (uint16_t subStream) XRD_WARN_UNUSED_RESULT
 On write timeout. More...
 
Status Query (uint16_t query, AnyObject &result)
 Query the stream. More...
 
void RegisterEventHandler (ChannelEventHandler *handler)
 Register channel event handler. More...
 
void RemoveEventHandler (ChannelEventHandler *handler)
 Remove a channel event handler. More...
 
XRootDStatus Send (Message *msg, MsgHandler *handler, bool stateful, time_t expires)
 Queue the message for sending. More...
 
void SetChannelData (AnyObject *channelData)
 Set the channel data. More...
 
void SetIncomingQueue (InQueue *incomingQueue)
 Set the incoming queue. More...
 
void SetJobManager (JobManager *jobManager)
 Set job manager. More...
 
void SetOnDataConnectHandler (std::shared_ptr< Job > &onConnJob)
 Set the on-connect handler for data streams. More...
 
void SetPoller (Poller *poller)
 Set the poller. More...
 
void SetTaskManager (TaskManager *taskManager)
 Set task manager. More...
 
void SetTransport (TransportHandler *transport)
 Set the transport. More...
 
void Tick (time_t now)
 

Detailed Description

Stream.

Definition at line 51 of file XrdClStream.hh.

Member Enumeration Documentation

◆ StreamStatus

Status of the stream.

Enumerator
Disconnected 

Not connected.

Connected 

Connected.

Connecting 

In the process of being connected.

Error 

Broken.

Definition at line 57 of file XrdClStream.hh.

58  {
59  Disconnected = 0,
60  Connected = 1,
61  Connecting = 2,
62  Error = 3
63  };
@ Disconnected
Not connected.
Definition: XrdClStream.hh:59
@ Error
Broken.
Definition: XrdClStream.hh:62
@ Connected
Connected.
Definition: XrdClStream.hh:60
@ Connecting
In the process of being connected.
Definition: XrdClStream.hh:61

Constructor & Destructor Documentation

◆ Stream()

XrdCl::Stream::Stream ( std::shared_ptr< URL url,
const URL prefer = URL() 
)

Constructor.

Definition at line 96 of file XrdClStream.cc.

96  :
97  pUrl( url ),
98  pPrefer( prefer ),
99  pTransport( 0 ),
100  pPoller( 0 ),
101  pTaskManager( 0 ),
102  pJobManager( 0 ),
103  pIncomingQueue( 0 ),
104  pChannelData( 0 ),
105  pLastStreamError( 0 ),
106  pConnectionCount( 0 ),
107  pConnectionInitTime( 0 ),
108  pAddressType( Utils::IPAll ),
109  pSessionId( 0 ),
110  pTTLDiscJob( nullptr ),
111  pSubsWaitingClose( 0 ),
112  pDiscCV( 0 ),
113  pDiscAllCnt( 0 ),
114  pBytesSent( 0 ),
115  pBytesReceived( 0 )
116  {
117  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
118  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
119 
120  std::ostringstream o;
121  o << pUrl->GetHostId();
122  pStreamName = o.str();
123 
124  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
126  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
128  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
130 
131  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
133 
134  pAddressType = Utils::String2AddressType( netStack );
135  if( pAddressType == Utils::AddressType::IPAuto )
136  {
137  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
138  if( !( stacks & XrdNetUtils::hasIP64 ) )
139  {
140  if( stacks & XrdNetUtils::hasIPv4 )
141  pAddressType = Utils::AddressType::IPv4;
142  else if( stacks & XrdNetUtils::hasIPv6 )
143  pAddressType = Utils::AddressType::IPv6;
144  }
145  }
146 
147  Log *log = DefaultEnv::GetLog();
148  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
149  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
150  "Window: %d", pStreamName.c_str(), netStack.c_str(),
151  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
152  }
static Log * GetLog()
Get default log.
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:716
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
XrdSysError Log
Definition: XrdConfig.cc:113

References XrdCl::Log::Debug(), XrdCl::DefaultConnectionRetry, XrdCl::DefaultConnectionWindow, XrdCl::DefaultNetworkStack, XrdCl::DefaultStreamErrorWindow, XrdCl::Utils::GetIntParameter(), XrdCl::DefaultEnv::GetLog(), XrdCl::Utils::GetStringParameter(), XrdNetUtils::hasIP64, XrdNetUtils::hasIPv4, XrdNetUtils::hasIPv6, XrdNetUtils::NetConfig(), XrdCl::PostMasterMsg, and XrdCl::Utils::String2AddressType().

+ Here is the call graph for this function:

◆ ~Stream()

XrdCl::Stream::~Stream ( )

Destructor.

Definition at line 157 of file XrdClStream.cc.

158  {
159  Disconnect( true );
160 
161  Log *log = DefaultEnv::GetLog();
162  log->Debug( PostMasterMsg, "[%s] Destroying stream",
163  pStreamName.c_str() );
164 
165  MonitorDisconnection( XRootDStatus() );
166 
167  SubStreamList::iterator it;
168  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
169  delete *it;
170  }
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:367

References XrdCl::Log::Debug(), Disconnect(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

+ Here is the call graph for this function:

Member Function Documentation

◆ CanCollapse()

bool XrdCl::Stream::CanCollapse ( const URL url)
Returns
: true is this channel can be collapsed using this URL, false otherwise

Definition at line 1353 of file XrdClStream.cc.

1354  {
1355  Log *log = DefaultEnv::GetLog();
1356 
1357  //--------------------------------------------------------------------------
1358  // Resolve all the addresses of the host we're supposed to connect to
1359  //--------------------------------------------------------------------------
1360  std::vector<XrdNetAddr> prefaddrs;
1361  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1362  if( !st.IsOK() )
1363  {
1364  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1365  , pStreamName.c_str(), url.GetHostName().c_str() );
1366  return false;
1367  }
1368 
1369  //--------------------------------------------------------------------------
1370  // Resolve all the addresses of the alias
1371  //--------------------------------------------------------------------------
1372  std::vector<XrdNetAddr> aliasaddrs;
1373  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1374  if( !st.IsOK() )
1375  {
1376  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1377  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1378  return false;
1379  }
1380 
1381  //--------------------------------------------------------------------------
1382  // Now check if the preferred host is part of the alias
1383  //--------------------------------------------------------------------------
1384  auto itr = prefaddrs.begin();
1385  for( ; itr != prefaddrs.end() ; ++itr )
1386  {
1387  auto itr2 = aliasaddrs.begin();
1388  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1389  if( itr->Same( &*itr2 ) ) return true;
1390  }
1391 
1392  return false;
1393  }
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140

References XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::Channel::CanCollapse().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ DisableIfEmpty()

void XrdCl::Stream::DisableIfEmpty ( uint16_t  subStream)

Disables respective uplink if empty.

Definition at line 607 of file XrdClStream.cc.

608  {
609  XrdSysMutexHelper scopedLock( pMutex );
610  Log *log = DefaultEnv::GetLog();
611 
612  if( pSubStreams[subStream]->outQueue->IsEmpty() )
613  {
614  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
615  pSubStreams[subStream]->socket->GetStreamName().c_str() );
616  pSubStreams[subStream]->socket->DisableUplink();
617  }
618  }

References XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), and XrdCl::PostMasterMsg.

Referenced by XrdCl::AsyncSocketHandler::OnWrite().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Disconnect()

void XrdCl::Stream::Disconnect ( bool  force = false)

Disconnect the stream.

Definition at line 367 of file XrdClStream.cc.

368  {
369  //--------------------------------------------------------------------------
370  // See comment about deadlocks in ForceError() method. We don't expect
371  // to be called from a callback thread.
372  //--------------------------------------------------------------------------
373  XrdSysCondVarHelper discLock( pDiscCV );
374  while ( pDiscAllCnt )
375  {
376  pDiscCV.Wait();
377  }
378  ++pDiscAllCnt;
379  discLock.UnLock();
380 
381  XrdSysMutexHelper scopedLock( pMutex );
382  SubStreamList::iterator it;
383  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
384  {
385  (*it)->socket->Close();
386  (*it)->status = Socket::Disconnected;
387  }
388  pSubsWaitingClose = 0;
389 
390  scopedLock.UnLock();
391  discLock.Lock( &pDiscCV );
392  --pDiscAllCnt;
393  pDiscCV.Signal();
394  }
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50

References XrdCl::Socket::Disconnected, XrdSysCondVarHelper::Lock(), XrdSysCondVar::Signal(), XrdSysCondVarHelper::UnLock(), XrdSysMutexHelper::UnLock(), and XrdSysCondVar::Wait().

Referenced by ~Stream().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ EnableLink()

XRootDStatus XrdCl::Stream::EnableLink ( PathID path)

Connect if needed, otherwise make sure that the underlying socket handler gets write readiness events, it will update the path with what it has actually enabled

Definition at line 191 of file XrdClStream.cc.

192  {
193  XrdSysMutexHelper scopedLock( pMutex );
194 
195  //--------------------------------------------------------------------------
196  // We are in the process of connecting the main stream, so we do nothing
197  // because when the main stream connection is established it will connect
198  // all the other streams
199  //--------------------------------------------------------------------------
200  if( pSubStreams[0]->status == Socket::Connecting )
201  return XRootDStatus();
202 
203  //--------------------------------------------------------------------------
204  // The main stream is connected, so we can verify whether we have
205  // the up and the down stream connected and ready to handle data.
206  // If anything is not right we fall back to stream 0.
207  //--------------------------------------------------------------------------
208  if( pSubStreams[0]->status == Socket::Connected )
209  {
210  if( pSubStreams[path.down]->status != Socket::Connected )
211  path.down = 0;
212 
213  if( pSubStreams[path.up]->status == Socket::Disconnected )
214  {
215  path.up = 0;
216  return pSubStreams[0]->socket->EnableUplink();
217  }
218 
219  if( pSubStreams[path.up]->status == Socket::Connected )
220  return pSubStreams[path.up]->socket->EnableUplink();
221 
222  return XRootDStatus();
223  }
224 
225  //--------------------------------------------------------------------------
226  // The main stream is not connected, we need to check whether enough time
227  // has passed since we last encountered an error (if any) so that we could
228  // re-attempt the connection
229  //--------------------------------------------------------------------------
230  Log *log = DefaultEnv::GetLog();
231  time_t now = ::time(0);
232 
233  if( now-pLastStreamError < pStreamErrorWindow )
234  return pLastFatalError;
235 
236  gettimeofday( &pConnectionStarted, 0 );
237  ++pConnectionCount;
238 
239  //--------------------------------------------------------------------------
240  // Resolve all the addresses of the host we're supposed to connect to
241  //--------------------------------------------------------------------------
242  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
243  if( !st.IsOK() )
244  {
245  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
246  "the host", pStreamName.c_str() );
247  pLastStreamError = now;
248  st.status = stFatal;
249  pLastFatalError = st;
250  return st;
251  }
252 
253  if( pPrefer.IsValid() )
254  {
255  std::vector<XrdNetAddr> addrresses;
256  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
257  if( !st.IsOK() )
258  {
259  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
260  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
261  }
262  else
263  {
264  std::vector<XrdNetAddr> tmp;
265  tmp.reserve( pAddresses.size() );
266  // first add all remaining addresses
267  auto itr = pAddresses.begin();
268  for( ; itr != pAddresses.end() ; ++itr )
269  {
270  if( !HasNetAddr( *itr, addrresses ) )
271  tmp.push_back( *itr );
272  }
273  // then copy all 'preferred' addresses
274  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
275  // and keep the result
276  pAddresses.swap( tmp );
277  }
278  }
279 
280  Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
281  pAddresses );
282 
283  while( !pAddresses.empty() )
284  {
285  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
286  pAddresses.pop_back();
287  pConnectionInitTime = ::time( 0 );
288  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
289  if( st.IsOK() )
290  {
291  pSubStreams[0]->status = Socket::Connecting;
292  break;
293  }
294  }
295  return st;
296  }
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, XrdCl::PathID::down, XrdCl::Log::Error(), XrdCl::Utils::GetHostAddresses(), XrdCl::URL::GetHostName(), XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::URL::IsValid(), XrdCl::Utils::LogHostAddresses(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stFatal, and XrdCl::PathID::up.

Referenced by ForceConnect(), OnConnectError(), OnError(), and Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceConnect()

void XrdCl::Stream::ForceConnect ( )

Force connection.

Definition at line 351 of file XrdClStream.cc.

352  {
353  XrdSysMutexHelper scopedLock( pMutex );
354  if( pSubStreams[0]->status == Socket::Connecting )
355  {
356  pSubStreams[0]->status = Socket::Disconnected;
357  XrdCl::PathID path( 0, 0 );
358  XrdCl::XRootDStatus st = EnableLink( path );
359  if( !st.IsOK() )
360  OnConnectError( 0, st );
361  }
362  }
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:191
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:786
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124

References XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Status::IsOK(), and OnConnectError().

Referenced by XrdCl::Channel::ForceReconnect().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ ForceError()

void XrdCl::Stream::ForceError ( XRootDStatus  status,
bool  hush = false 
)

Force error.

Definition at line 1042 of file XrdClStream.cc.

1043  {
1044  //----------------------------------------------------------------------
1045  // We can be called in two ways: first is by by a non-poller thread,
1046  // with errOperationInterrupted as error as part of ForceDisconnect. In
1047  // which case the Stream will be destoryed shortly after we return. The
1048  // second way is call by a poller thread with another type of error.
1049  // Further, when we call socket handler Close() for a socket handled a
1050  // callback running we wait for that to complete (unless it is
1051  // ourselves). This raises the possibility of a deadlock. We avoid this
1052  // by returning quickly if we detect we're in a callback thread and
1053  // there's already a disconnect affecting multiple streams in progress.
1054  //----------------------------------------------------------------------
1055  XrdSysCondVarHelper discLock( pDiscCV );
1056  if( pDiscAllCnt &&
1057  !( status.IsError() && status.code == errOperationInterrupted ) )
1058  {
1059  return;
1060  }
1061  while( pDiscAllCnt )
1062  {
1063  pDiscCV.Wait();
1064  }
1065  ++pDiscAllCnt;
1066  discLock.UnLock();
1067 
1068  XrdSysMutexHelper scopedLock( pMutex );
1069  Log *log = DefaultEnv::GetLog();
1070  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
1071  {
1072  if( pSubStreams[substream]->status != Socket::Connected ) continue;
1073  pSubStreams[substream]->socket->Close();
1074  pSubStreams[substream]->status = Socket::Disconnected;
1075 
1076  if( !hush )
1077  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
1078  pStreamName.c_str(), status.ToString().c_str() );
1079 
1080  //--------------------------------------------------------------------
1081  // Reinsert the stuff that we have failed to sent
1082  //--------------------------------------------------------------------
1083  if( pSubStreams[substream]->outMsgHelper.msg )
1084  {
1085  OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
1086  pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
1087  h.stateful );
1088  pIncomingQueue->RemoveMessageHandler(h.handler);
1089  pSubStreams[substream]->outMsgHelper.Reset();
1090  }
1091 
1092  //--------------------------------------------------------------------
1093  // Reinsert the receiving handler and reset any partially read partial
1094  //--------------------------------------------------------------------
1095  if( pSubStreams[substream]->inMsgHelper.handler )
1096  {
1097  InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
1098  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
1099  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
1100  if( xrdHandler ) xrdHandler->PartialReceived();
1101  h.Reset();
1102  }
1103  }
1104 
1105  pConnectionCount = 0;
1106  pSubsWaitingClose = 0;
1107 
1108  //------------------------------------------------------------------------
1109  // We're done here, unlock the stream mutex to avoid deadlocks and
1110  // report the disconnection event to the handlers
1111  //------------------------------------------------------------------------
1112  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1113  "message handlers.", pStreamName.c_str() );
1114 
1115  SubStreamList::iterator it;
1116  OutQueue q;
1117  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1118  q.GrabItems( *(*it)->outQueue );
1119  scopedLock.UnLock();
1120 
1121  q.Report( status );
1122 
1123  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1124  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1125 
1126  discLock.Lock( &pDiscCV );
1127  --pDiscAllCnt;
1128  pDiscCV.Signal();
1129  }
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
@ Broken
The stream is broken.
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91

References XrdCl::MsgHandler::Broken, XrdCl::Status::code, XrdCl::Socket::Connected, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::errOperationInterrupted, XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabItems(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::Status::IsError(), XrdSysCondVarHelper::Lock(), XrdCl::OutQueue::MsgHelper::msg, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdSysCondVar::Signal(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), XrdSysCondVarHelper::UnLock(), XrdSysMutexHelper::UnLock(), and XrdSysCondVar::Wait().

Referenced by XrdCl::Channel::ForceDisconnect(), and XrdCl::AsyncSocketHandler::OnHeaderCorruption().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ GetName()

const std::string& XrdCl::Stream::GetName ( ) const
inline

Return stream name.

Definition at line 170 of file XrdClStream.hh.

171  {
172  return pStreamName;
173  }

◆ GetURL()

const URL* XrdCl::Stream::GetURL ( ) const
inline

Get the URL.

Definition at line 157 of file XrdClStream.hh.

158  {
159  return pUrl.get();
160  }

Referenced by XrdCl::AsyncSocketHandler::OnConnectionReturn().

+ Here is the caller graph for this function:

◆ Initialize()

XRootDStatus XrdCl::Stream::Initialize ( )

Initializer.

Definition at line 175 of file XrdClStream.cc.

176  {
177  if( !pTransport || !pPoller || !pChannelData )
178  return XRootDStatus( stError, errUninitialized );
179 
180  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
181  pChannelData, 0, this );
182  pSubStreams.push_back( new SubStreamData() );
183  pSubStreams[0]->socket = s;
184  return XRootDStatus();
185  }
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32

References XrdCl::errUninitialized, and XrdCl::stError.

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ InspectStatusRsp()

uint16_t XrdCl::Stream::InspectStatusRsp ( uint16_t  stream,
MsgHandler *&  incHandler 
)

In case the message is a kXR_status response it needs further attention

Returns
: a MsgHandler in case we need to read out raw data

Definition at line 1322 of file XrdClStream.cc.

1324  {
1325  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1326  if( !mh.handler )
1328 
1329  uint16_t action = mh.handler->InspectStatusRsp();
1330  mh.action |= action;
1331 
1332  if( action & MsgHandler::RemoveHandler )
1333  pIncomingQueue->RemoveMessageHandler( mh.handler );
1334 
1335  if( action & MsgHandler::Raw )
1336  {
1337  incHandler = mh.handler;
1338  return MsgHandler::Raw;
1339  }
1340 
1341  if( action & MsgHandler::Corrupted )
1342  return MsgHandler::Corrupted;
1343 
1344  if( action & MsgHandler::More )
1345  return MsgHandler::More;
1346 
1347  return MsgHandler::None;
1348  }
@ More
there are more (non-raw) data to be read

References XrdCl::InMessageHelper::action, XrdCl::MsgHandler::Corrupted, XrdCl::InMessageHelper::handler, XrdCl::MsgHandler::InspectStatusRsp(), XrdCl::MsgHandler::More, XrdCl::MsgHandler::None, XrdCl::MsgHandler::Raw, XrdCl::MsgHandler::RemoveHandler, and XrdCl::InQueue::RemoveMessageHandler().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ InstallIncHandler()

MsgHandler * XrdCl::Stream::InstallIncHandler ( std::shared_ptr< Message > &  msg,
uint16_t  stream 
)

Install a message handler for the given message if there is one available, if the handler want's to be called in the raw mode it will be returned, the message ownership flag is returned in any case

Parameters
msgmessage header
streamstream concerned
Returns
a pair containing the handler and ownership flag

Definition at line 1301 of file XrdClStream.cc.

1302  {
1303  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1304  if( !mh.handler )
1305  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1306  mh.expires,
1307  mh.action );
1308 
1309  if( !mh.handler )
1310  return nullptr;
1311 
1312  if( mh.action & MsgHandler::Raw )
1313  return mh.handler;
1314  return nullptr;
1315  }
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66

References XrdCl::InMessageHelper::action, XrdCl::InMessageHelper::expires, XrdCl::InQueue::GetHandlerForMessage(), XrdCl::InMessageHelper::handler, and XrdCl::MsgHandler::Raw.

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnect()

void XrdCl::Stream::OnConnect ( uint16_t  subStream)

Call back when a message has been reconstructed.

Definition at line 645 of file XrdClStream.cc.

646  {
647  XrdSysMutexHelper scopedLock( pMutex );
648  if( subStream == 0 )
649  {
650  int nsubconn = 0;
651  if( pSubStreams.size() > 1 )
652  {
653  for( size_t i = 1; i < pSubStreams.size(); ++i )
654  if( pSubStreams[i]->status != Socket::Disconnected ) nsubconn++;
655  }
656  if( nsubconn )
657  {
658  pSubsWaitingClose = nsubconn;
659  pSubStreams[0]->socket->DisableUplink();
660  return;
661  }
662  else
663  pSubStreams[0]->socket->EnableUplink();
664  }
665  else
666  {
667  if( pSubsWaitingClose > 0 )
668  {
669  pSubStreams[subStream]->socket->Close();
670  pSubStreams[subStream]->status = Socket::Disconnected;
671  if( --pSubsWaitingClose == 0 )
672  {
673  scopedLock.UnLock();
674  OnConnect( 0 );
675  }
676  return;
677  }
678  }
679 
680  pSubStreams[subStream]->status = Socket::Connected;
681 
682  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
683  Log *log = DefaultEnv::GetLog();
684  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
685  subStream, ipstack.c_str() );
686 
687  if( subStream == 0 )
688  {
689  pLastStreamError = 0;
690  pLastFatalError = XRootDStatus();
691  pConnectionCount = 0;
692  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
693  pSessionId = ++sSessCntGen;
694 
695  //------------------------------------------------------------------------
696  // Create the streams if they don't exist yet
697  //------------------------------------------------------------------------
698  if( pSubStreams.size() == 1 && numSub > 1 )
699  {
700  for( uint16_t i = 1; i < numSub; ++i )
701  {
702  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
703  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
704  pChannelData, i, this );
705  pSubStreams.push_back( new SubStreamData() );
706  pSubStreams[i]->socket = s;
707  }
708  }
709 
710  //------------------------------------------------------------------------
711  // Connect the extra streams, if we fail we move all the outgoing items
712  // to stream 0, we don't need to enable the uplink here, because it
713  // should be already enabled after the handshaking process is completed.
714  //------------------------------------------------------------------------
715  if( pSubStreams.size() > 1 )
716  {
717  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
718  pStreamName.c_str(), pSubStreams.size() - 1 );
719  for( size_t i = 1; i < pSubStreams.size(); ++i )
720  {
721  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
722  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
723  if( !st.IsOK() )
724  {
725  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
726  // mark as disconnected. We don't try to actively Close here as
727  // we're in a poller callback thread and the i'th substream here
728  // may be hadled by a different poller callback thread, raising
729  // the possibility of deadlock.
730  pSubStreams[i]->status = Socket::Disconnected;
731  }
732  else
733  {
734  pSubStreams[i]->status = Socket::Connecting;
735  }
736  }
737  }
738 
739  //------------------------------------------------------------------------
740  // Inform monitoring
741  //------------------------------------------------------------------------
742  pBytesSent = 0;
743  pBytesReceived = 0;
744  gettimeofday( &pConnectionDone, 0 );
745  Monitor *mon = DefaultEnv::GetMonitor();
746  if( mon )
747  {
748  Monitor::ConnectInfo i;
749  i.server = pUrl->GetHostId();
750  i.sTOD = pConnectionStarted;
751  i.eTOD = pConnectionDone;
752  i.streams = pSubStreams.size();
753 
754  AnyObject qryResult;
755  std::string *qryResponse = nullptr;
756  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
757  qryResult.Get( qryResponse );
758 
759  if (qryResponse) {
760  i.auth = *qryResponse;
761  delete qryResponse;
762  } else {
763  i.auth = "";
764  }
765 
766  mon->Event( Monitor::EvConnect, &i );
767  }
768 
769  //------------------------------------------------------------------------
770  // For every connected control-stream call the global on-connect handler
771  //------------------------------------------------------------------------
773  }
774  else if( pOnDataConnJob )
775  {
776  //------------------------------------------------------------------------
777  // For every connected data-stream call the on-connect handler
778  //------------------------------------------------------------------------
779  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
780  }
781  }
static Monitor * GetMonitor()
Get the monitor object.
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
@ EvConnect
ConnectInfo: Login into a server.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:645
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
static const uint16_t Auth
Transport name, returns std::string *.

References XrdCl::Monitor::ConnectInfo::auth, XrdCl::TransportQuery::Auth, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::Monitor::ConnectInfo::eTOD, XrdCl::Monitor::EvConnect, XrdCl::Monitor::Event(), XrdCl::AnyObject::Get(), XrdCl::TransportHandler::GetBindPreference(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetMonitor(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnectHandler(), XrdCl::PostMasterMsg, XrdCl::TransportHandler::Query(), XrdCl::JobManager::QueueJob(), XrdCl::Monitor::ConnectInfo::server, XrdCl::Monitor::ConnectInfo::sTOD, XrdCl::Monitor::ConnectInfo::streams, XrdCl::TransportHandler::SubStreamNumber(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::HandShakeNextStep(), OnConnectError(), OnError(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnConnectError()

void XrdCl::Stream::OnConnectError ( uint16_t  subStream,
XRootDStatus  status 
)

On connect error.

Definition at line 786 of file XrdClStream.cc.

787  {
788  XrdSysMutexHelper scopedLock( pMutex );
789  Log *log = DefaultEnv::GetLog();
790  pSubStreams[subStream]->socket->Close();
791  time_t now = ::time(0);
792 
793  //--------------------------------------------------------------------------
794  // For every connection error call the global connection error handler
795  //--------------------------------------------------------------------------
797 
798  //--------------------------------------------------------------------------
799  // If we connected subStream == 0 and cannot connect >0 then we just give
800  // up and move the outgoing messages to another queue
801  //--------------------------------------------------------------------------
802  if( subStream > 0 )
803  {
804  const Socket::SocketStatus oldstate = pSubStreams[subStream]->status;
805  pSubStreams[subStream]->status = Socket::Disconnected;
806 
807  if( pSubsWaitingClose > 0 && oldstate != Socket::Disconnected )
808  {
809  if( --pSubsWaitingClose == 0 )
810  {
811  scopedLock.UnLock();
812  OnConnect( 0 );
813  }
814  return;
815  }
816 
817  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
818  if( pSubStreams[0]->status == Socket::Connected )
819  {
820  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
821  if( !st.IsOK() )
822  OnFatalError( 0, st, scopedLock );
823  return;
824  }
825 
826  if( pSubStreams[0]->status == Socket::Connecting )
827  return;
828 
829  OnFatalError( subStream, status, scopedLock );
830  return;
831  }
832 
833  //--------------------------------------------------------------------------
834  // Check if we still have time to try and do something in the current window
835  //--------------------------------------------------------------------------
836  time_t elapsed = now-pConnectionInitTime;
837  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
838  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
839 
840  //------------------------------------------------------------------------
841  // If we have some IP addresses left we try them
842  //------------------------------------------------------------------------
843  if( !pAddresses.empty() )
844  {
845  XRootDStatus st;
846  do
847  {
848  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
849  pAddresses.pop_back();
850  pConnectionInitTime = ::time( 0 );
851  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
852  }
853  while( !pAddresses.empty() && !st.IsOK() );
854 
855  if( !st.IsOK() )
856  OnFatalError( subStream, st, scopedLock );
857 
858  return;
859  }
860  //------------------------------------------------------------------------
861  // If we still can retry with the same host name, we sleep until the end
862  // of the connection window and try
863  //------------------------------------------------------------------------
864  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
865  && !status.IsFatal() )
866  {
867  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
868  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
869 
870  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
871  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
872  return;
873  }
874  //--------------------------------------------------------------------------
875  // We are out of the connection window, the only thing we can do here
876  // is re-resolving the host name and retrying if we still can
877  //--------------------------------------------------------------------------
878  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
879  {
880  pAddresses.clear();
881  pSubStreams[0]->status = Socket::Disconnected;
882  PathID path( 0, 0 );
883  XRootDStatus st = EnableLink( path );
884  if( !st.IsOK() )
885  OnFatalError( subStream, st, scopedLock );
886  return;
887  }
888 
889  //--------------------------------------------------------------------------
890  // Else, we fail
891  //--------------------------------------------------------------------------
892  OnFatalError( subStream, status, scopedLock );
893  }
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
SocketStatus
Status of the socket.
Definition: XrdClSocket.hh:49
void RegisterTask(Task *task, time_t time, bool own=true)

References XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Socket::Disconnected, EnableLink(), XrdCl::Log::Error(), XrdCl::DefaultEnv::GetLog(), XrdCl::DefaultEnv::GetPostMaster(), XrdCl::Log::Info(), XrdCl::Status::IsFatal(), XrdCl::Status::IsOK(), XrdCl::PostMaster::NotifyConnErrHandler(), OnConnect(), XrdCl::PostMasterMsg, XrdCl::TaskManager::RegisterTask(), and XrdSysMutexHelper::UnLock().

Referenced by ForceConnect(), XrdCl::AsyncSocketHandler::OnConnectionReturn(), and XrdCl::AsyncSocketHandler::OnFaultWhileHandshaking().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnError()

void XrdCl::Stream::OnError ( uint16_t  subStream,
XRootDStatus  status 
)

On error.

Definition at line 898 of file XrdClStream.cc.

899  {
900  //--------------------------------------------------------------------------
901  // See comment about deadlocks in ForceError() method. We expect to be
902  // called form a callback thread. However we take care to only potentially
903  // disconnect the socket for our own subStream. We require no ongoing
904  // disconnect of all substreams and ensure that remains true throughout
905  // our execution by releasing discLock only after acquiring pMutex.
906  //--------------------------------------------------------------------------
907 
908  XrdSysCondVarHelper discLock( pDiscCV );
909  if( pDiscAllCnt ) return;
910 
911  XrdSysMutexHelper scopedLock( pMutex );
912  discLock.UnLock();
913  Log *log = DefaultEnv::GetLog();
914  const Socket::SocketStatus oldstate = pSubStreams[subStream]->status;
915  pSubStreams[subStream]->socket->Close();
916  pSubStreams[subStream]->status = Socket::Disconnected;
917 
918  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
919  pStreamName.c_str(), subStream, status.ToString().c_str() );
920 
921  //--------------------------------------------------------------------------
922  // Reinsert the stuff that we have failed to sent
923  //--------------------------------------------------------------------------
924  if( pSubStreams[subStream]->outMsgHelper.msg )
925  {
926  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
927  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
928  h.stateful );
929  pIncomingQueue->RemoveMessageHandler(h.handler);
930  pSubStreams[subStream]->outMsgHelper.Reset();
931  }
932 
933  //--------------------------------------------------------------------------
934  // Reinsert the receiving handler and reset any partially read partial
935  //--------------------------------------------------------------------------
936  if( pSubStreams[subStream]->inMsgHelper.handler )
937  {
938  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
939  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
940  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
941  if( xrdHandler ) xrdHandler->PartialReceived();
942  h.Reset();
943  }
944 
945  //--------------------------------------------------------------------------
946  // We are dealing with an error of a peripheral stream. If we don't have
947  // anything to send don't bother recovering. Otherwise move the requests
948  // to stream 0 if possible.
949  //--------------------------------------------------------------------------
950  if( subStream > 0 )
951  {
952  if( pSubsWaitingClose > 0 && oldstate != Socket::Disconnected )
953  {
954  if( --pSubsWaitingClose == 0 )
955  {
956  scopedLock.UnLock();
957  OnConnect( 0 );
958  }
959  return;
960  }
961 
962  if( pSubStreams[0]->status != Socket::Disconnected )
963  {
964  pSubStreams[subStream]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
965  XRootDStatus st = pSubStreams[subStream]->socket->Connect( pConnectionWindow );
966  if( !st.IsOK() )
967  {
968  pSubStreams[subStream]->socket->Close();
969  if( pSubStreams[subStream]->outQueue->IsEmpty() )
970  return;
971  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
972  if( pSubStreams[0]->status == Socket::Connected )
973  {
974  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
975  if( !st.IsOK() )
976  OnFatalError( 0, st, scopedLock );
977  return;
978  }
979  if( pSubStreams[0]->status == Socket::Connecting )
980  return;
981  }
982  else
983  {
984  pSubStreams[subStream]->status = Socket::Connecting;
985  return;
986  }
987  OnFatalError( subStream, status, scopedLock );
988  return;
989  }
990  if( pSubStreams[subStream]->outQueue->IsEmpty() )
991  return;
992  OnFatalError( subStream, status, scopedLock );
993  return;
994  }
995 
996  //--------------------------------------------------------------------------
997  // If we lost the stream 0 we have lost the session, we re-enable the
998  // stream if we still have things in one of the outgoing queues, otherwise
999  // there is not point to recover at this point.
1000  //--------------------------------------------------------------------------
1001  if( subStream == 0 )
1002  {
1003  MonitorDisconnection( status );
1004 
1005  SubStreamList::iterator it;
1006  size_t outstanding = 0;
1007  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1008  outstanding += (*it)->outQueue->GetSizeStateless();
1009 
1010  if( outstanding )
1011  {
1012  PathID path( 0, 0 );
1013  XRootDStatus st = EnableLink( path );
1014  if( !st.IsOK() )
1015  {
1016  OnFatalError( 0, st, scopedLock );
1017  return;
1018  }
1019  }
1020 
1021  //------------------------------------------------------------------------
1022  // We're done here, unlock the stream mutex to avoid deadlocks and
1023  // report the disconnection event to the handlers
1024  //------------------------------------------------------------------------
1025  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1026  "message handlers.", pStreamName.c_str() );
1027  OutQueue q;
1028  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1029  q.GrabStateful( *(*it)->outQueue );
1030  scopedLock.UnLock();
1031 
1032  q.Report( status );
1033  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1034  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1035  return;
1036  }
1037  }

References XrdCl::MsgHandler::Broken, XrdCl::Socket::Connected, XrdCl::Socket::Connecting, XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, EnableLink(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::InMessageHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::OutQueue::GrabStateful(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::InMessageHelper::handler, XrdCl::Status::IsOK(), XrdCl::OutQueue::MsgHelper::msg, OnConnect(), XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::InQueue::ReAddMessageHandler(), XrdCl::InQueue::RemoveMessageHandler(), XrdCl::OutQueue::Report(), XrdCl::ChannelHandlerList::ReportEvent(), XrdCl::InQueue::ReportStreamEvent(), XrdCl::InMessageHelper::Reset(), XrdCl::OutQueue::MsgHelper::stateful, XrdCl::ChannelEventHandler::StreamBroken, XrdCl::Status::ToString(), XrdSysCondVarHelper::UnLock(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnFault(), and OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnIncoming()

void XrdCl::Stream::OnIncoming ( uint16_t  subStream,
std::shared_ptr< Message msg,
uint32_t  bytesReceived 
)

Call back when a message has been reconstructed.

Definition at line 493 of file XrdClStream.cc.

496  {
497  msg->SetSessionId( pSessionId );
498  pBytesReceived += bytesReceived;
499 
500  MsgHandler *handler = nullptr;
501  uint16_t action = 0;
502  {
503  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
504  handler = mh.handler;
505  action = mh.action;
506  mh.Reset();
507  }
508 
509  if( !IsPartial( *msg ) )
510  {
511  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
512  *pChannelData );
513  if( streamAction & TransportHandler::DigestMsg )
514  return;
515 
516  if( streamAction & TransportHandler::RequestClose )
517  {
518  RequestClose( *msg );
519  return;
520  }
521  }
522 
523  Log *log = DefaultEnv::GetLog();
524 
525  //--------------------------------------------------------------------------
526  // No handler, we discard the message ...
527  //--------------------------------------------------------------------------
528  if( !handler )
529  {
530  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
531  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
532  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
533  pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
534  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
535  return;
536  }
537 
538  //--------------------------------------------------------------------------
539  // We have a handler, so we call the callback
540  //--------------------------------------------------------------------------
541  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
542  pStreamName.c_str(), (void*)msg.get() );
543 
545  {
546  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
547  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
548 
549  // if we are handling partial response we have to take down the timeout fence
550  if( IsPartial( *msg ) )
551  {
552  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
553  if( xrdHandler ) xrdHandler->PartialReceived();
554  }
555 
556  return;
557  }
558 
559  Job *job = new HandleIncMsgJob( handler );
560  pJobManager->QueueJob( job );
561  }
kXR_char streamid[2]
Definition: XProtocol.hh:914
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
@ Ignore
Ignore the message.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.

References XrdCl::InMessageHelper::action, XrdCl::TransportHandler::DigestMsg, XrdCl::Log::Dump(), XrdCl::DefaultEnv::GetLog(), XrdCl::InMessageHelper::handler, ServerResponse::hdr, XrdCl::MsgHandler::Ignore, XrdCl::TransportHandler::MessageReceived(), XrdCl::MsgHandler::NoProcess, XrdCl::XRootDMsgHandler::PartialReceived(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), XrdCl::TransportHandler::RequestClose, XrdCl::InMessageHelper::Reset(), ServerResponseHeader::status, ServerResponseHeader::streamid, and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgReader::Read().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnMessageSent()

void XrdCl::Stream::OnMessageSent ( uint16_t  subStream,
Message msg,
uint32_t  bytesSent 
)

Definition at line 623 of file XrdClStream.cc.

626  {
627  pTransport->MessageSent( msg, subStream, bytesSent,
628  *pChannelData );
629  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
630  pBytesSent += bytesSent;
631  if( h.handler )
632  {
633  // ensure expiration time is assigned if still in queue
634  pIncomingQueue->AssignTimeout( h.handler );
635  // OnStatusReady may cause the handler to delete itself, in
636  // which case the handler or the user callback may also delete msg
637  h.handler->OnStatusReady( msg, XRootDStatus() );
638  }
639  pSubStreams[subStream]->outMsgHelper.Reset();
640  }
void AssignTimeout(MsgHandler *handler)
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.

References XrdCl::InQueue::AssignTimeout(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::TransportHandler::MessageSent(), and XrdCl::MsgHandler::OnStatusReady().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadTimeout()

bool XrdCl::Stream::OnReadTimeout ( uint16_t  subStream)

On read timeout.

Definition at line 1188 of file XrdClStream.cc.

1189  {
1190  //--------------------------------------------------------------------------
1191  // We only take the main stream into account
1192  //--------------------------------------------------------------------------
1193  if( substream != 0 )
1194  {
1195  if( pSubsWaitingClose )
1196  {
1197  XrdSysMutexHelper scopedLock( pMutex );
1198  if( !pSubsWaitingClose ) return true;
1199  if( pSubStreams[substream]->status == Socket::Disconnected ) return true;
1200  pSubStreams[substream]->socket->Close();
1201  pSubStreams[substream]->status = Socket::Disconnected;
1202  if( --pSubsWaitingClose == 0 )
1203  {
1204  scopedLock.UnLock();
1205  OnConnect( 0 );
1206  }
1207  }
1208  return true;
1209  }
1210 
1211  //--------------------------------------------------------------------------
1212  // Check if there is no outgoing messages and if the stream TTL is elapesed.
1213  // It is assumed that the underlying transport makes sure that there is no
1214  // pending requests that are not answered, ie. all possible virtual streams
1215  // are de-allocated
1216  //--------------------------------------------------------------------------
1217  Log *log = DefaultEnv::GetLog();
1218  SubStreamList::iterator it;
1219  time_t now = time(0);
1220 
1221  XrdSysMutexHelper scopedLock( pMutex );
1222  uint32_t outgoingMessages = 0;
1223  time_t lastActivity = 0;
1224  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1225  {
1226  outgoingMessages += (*it)->outQueue->GetSize();
1227  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1228  if( lastActivity < sockLastActivity )
1229  lastActivity = sockLastActivity;
1230  }
1231 
1232  if( !outgoingMessages )
1233  {
1234  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1235  *pChannelData );
1236  if( disconnect )
1237  {
1238  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1239  pStreamName.c_str() );
1240  //----------------------------------------------------------------------
1241  // Important note!
1242  //
1243  // This job destroys the Stream object itself, the underlying
1244  // AsyncSocketHandler object (that called this method) and the Channel
1245  // object that aggregates this Stream.
1246  //
1247  // Additionally &(*pUrl) is used by ForceDisconnect to check if we are
1248  // in a Channel that was previously collapsed in a redirect.
1249  //----------------------------------------------------------------------
1250  if( !pTTLDiscJob )
1251  {
1252  pTTLDiscJob = new ForceDisconnectJob( pUrl );
1253  pJobManager->QueueJob( pTTLDiscJob );
1254  }
1255  return false;
1256  }
1257  }
1258 
1259  //--------------------------------------------------------------------------
1260  // Check if the stream is broken
1261  //--------------------------------------------------------------------------
1262  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1263  *pChannelData );
1264  if( !st.IsOK() )
1265  {
1266  scopedLock.UnLock();
1267  OnError( substream, st );
1268  return false;
1269  }
1270  return true;
1271  }
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:898
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0

References XrdCl::Log::Debug(), XrdCl::Socket::Disconnected, XrdCl::DefaultEnv::GetLog(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::IsStreamBroken(), XrdCl::TransportHandler::IsStreamTTLElapsed(), OnConnect(), OnError(), XrdCl::PostMasterMsg, XrdCl::JobManager::QueueJob(), and XrdSysMutexHelper::UnLock().

Referenced by XrdCl::AsyncSocketHandler::OnReadTimeout().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnReadyToWrite()

std::pair< Message *, MsgHandler * > XrdCl::Stream::OnReadyToWrite ( uint16_t  subStream)

Definition at line 567 of file XrdClStream.cc.

568  {
569  XrdSysMutexHelper scopedLock( pMutex );
570  Log *log = DefaultEnv::GetLog();
571  if( pSubStreams[subStream]->outQueue->IsEmpty() )
572  {
573  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
574  pSubStreams[subStream]->socket->GetStreamName().c_str() );
575 
576  pSubStreams[subStream]->socket->DisableUplink();
577  return std::make_pair( (Message *)0, (MsgHandler *)0 );
578  }
579 
580  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
581  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
582  h.expires,
583  h.stateful );
584 
585  log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
586  "from out-queue to in-queue, starting to send outgoing.",
587  pUrl->GetHostId().c_str(), (void*)h.handler,
588  h.msg->GetObfuscatedDescription().c_str() );
589 
590  scopedLock.UnLock();
591 
592  if( h.handler )
593  {
594  bool rmMsg = false;
595  pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
596  if( rmMsg )
597  {
598  Log *log = DefaultEnv::GetLog();
599  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
600  pStreamName.c_str() );
601  }
602  h.handler->OnReadyToSend( h.msg );
603  }
604  return std::make_pair( h.msg, h.handler );
605  }
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54

References XrdCl::InQueue::AddMessageHandler(), XrdCl::Log::Debug(), XrdCl::Log::Dump(), XrdCl::OutQueue::MsgHelper::expires, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::OutQueue::MsgHelper::handler, XrdCl::OutQueue::MsgHelper::msg, XrdCl::MsgHandler::OnReadyToSend(), XrdCl::PostMasterMsg, XrdCl::OutQueue::MsgHelper::stateful, XrdSysMutexHelper::UnLock(), and XrdCl::Log::Warning().

Referenced by XrdCl::AsyncMsgWriter::Write().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ OnWriteTimeout()

bool XrdCl::Stream::OnWriteTimeout ( uint16_t  subStream)

On write timeout.

Definition at line 1276 of file XrdClStream.cc.

1277  {
1278  return true;
1279  }

Referenced by XrdCl::AsyncSocketHandler::OnWriteTimeout().

+ Here is the caller graph for this function:

◆ Query()

Status XrdCl::Stream::Query ( uint16_t  query,
AnyObject result 
)

Query the stream.

Definition at line 1398 of file XrdClStream.cc.

1399  {
1400  switch( query )
1401  {
1402  case StreamQuery::IpAddr:
1403  {
1404  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1405  return Status();
1406  }
1407 
1408  case StreamQuery::IpStack:
1409  {
1410  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1411  return Status();
1412  }
1413 
1414  case StreamQuery::HostName:
1415  {
1416  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1417  return Status();
1418  }
1419 
1420  default:
1421  return Status( stError, errQueryNotSupported );
1422  }
1423  }
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack

References XrdCl::errQueryNotSupported, XrdCl::StreamQuery::HostName, XrdCl::StreamQuery::IpAddr, XrdCl::StreamQuery::IpStack, XrdCl::AnyObject::Set(), and XrdCl::stError.

Referenced by XrdCl::Channel::QueryTransport().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RegisterEventHandler()

void XrdCl::Stream::RegisterEventHandler ( ChannelEventHandler handler)

Register channel event handler.

Definition at line 1284 of file XrdClStream.cc.

1285  {
1286  pChannelEvHandlers.AddHandler( handler );
1287  }
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.

References XrdCl::ChannelHandlerList::AddHandler().

Referenced by XrdCl::Channel::RegisterEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ RemoveEventHandler()

void XrdCl::Stream::RemoveEventHandler ( ChannelEventHandler handler)

Remove a channel event handler.

Definition at line 1292 of file XrdClStream.cc.

1293  {
1294  pChannelEvHandlers.RemoveHandler( handler );
1295  }
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.

References XrdCl::ChannelHandlerList::RemoveHandler().

Referenced by XrdCl::Channel::RemoveEventHandler().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ Send()

XRootDStatus XrdCl::Stream::Send ( Message msg,
MsgHandler handler,
bool  stateful,
time_t  expires 
)

Queue the message for sending.

Definition at line 301 of file XrdClStream.cc.

305  {
306  XrdSysMutexHelper scopedLock( pMutex );
307  Log *log = DefaultEnv::GetLog();
308 
309  //--------------------------------------------------------------------------
310  // Check the session ID and bounce if needed
311  //--------------------------------------------------------------------------
312  if( msg->GetSessionId() &&
313  (pSubStreams[0]->status != Socket::Connected ||
314  pSessionId != msg->GetSessionId()) )
315  return XRootDStatus( stError, errInvalidSession );
316 
317  //--------------------------------------------------------------------------
318  // Decide on the path to send the message
319  //--------------------------------------------------------------------------
320  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
321  if( pSubStreams.size() <= path.up )
322  {
323  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
324  "substream %d, using 0 instead", pStreamName.c_str(),
325  msg->GetObfuscatedDescription().c_str(), path.up );
326  path.up = 0;
327  }
328 
329  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
330  "substream %d expecting answer at %d", pStreamName.c_str(),
331  msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
332 
333  //--------------------------------------------------------------------------
334  // Enable *a* path and insert the message to the right queue
335  //--------------------------------------------------------------------------
336  XRootDStatus st = EnableLink( path );
337  if( st.IsOK() )
338  {
339  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
340  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
341  expires, stateful );
342  }
343  else
344  st.status = stFatal;
345  return st;
346  }
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79

References XrdCl::Socket::Connected, XrdCl::PathID::down, XrdCl::Log::Dump(), EnableLink(), XrdCl::errInvalidSession, XrdCl::DefaultEnv::GetLog(), XrdCl::Message::GetObfuscatedDescription(), XrdCl::Message::GetSessionId(), XrdCl::Status::IsOK(), XrdCl::TransportHandler::MultiplexSubStream(), XrdCl::PostMasterMsg, XrdCl::Status::status, XrdCl::stError, XrdCl::stFatal, XrdCl::PathID::up, and XrdCl::Log::Warning().

Referenced by XrdCl::Channel::Send().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

◆ SetChannelData()

void XrdCl::Stream::SetChannelData ( AnyObject channelData)
inline

Set the channel data.

Definition at line 115 of file XrdClStream.hh.

116  {
117  pChannelData = channelData;
118  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetIncomingQueue()

void XrdCl::Stream::SetIncomingQueue ( InQueue incomingQueue)
inline

Set the incoming queue.

Definition at line 107 of file XrdClStream.hh.

108  {
109  pIncomingQueue = incomingQueue;
110  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetJobManager()

void XrdCl::Stream::SetJobManager ( JobManager jobManager)
inline

Set job manager.

Definition at line 131 of file XrdClStream.hh.

132  {
133  pJobManager = jobManager;
134  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetOnDataConnectHandler()

void XrdCl::Stream::SetOnDataConnectHandler ( std::shared_ptr< Job > &  onConnJob)
inline

Set the on-connect handler for data streams.

Definition at line 263 of file XrdClStream.hh.

264  {
265  XrdSysMutexHelper scopedLock( pMutex );
266  pOnDataConnJob = onConnJob;
267  }

Referenced by XrdCl::Channel::SetOnDataConnectHandler().

+ Here is the caller graph for this function:

◆ SetPoller()

void XrdCl::Stream::SetPoller ( Poller poller)
inline

Set the poller.

Definition at line 99 of file XrdClStream.hh.

100  {
101  pPoller = poller;
102  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTaskManager()

void XrdCl::Stream::SetTaskManager ( TaskManager taskManager)
inline

Set task manager.

Definition at line 123 of file XrdClStream.hh.

124  {
125  pTaskManager = taskManager;
126  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ SetTransport()

void XrdCl::Stream::SetTransport ( TransportHandler transport)
inline

Set the transport.

Definition at line 91 of file XrdClStream.hh.

92  {
93  pTransport = transport;
94  }

Referenced by XrdCl::Channel::Channel().

+ Here is the caller graph for this function:

◆ Tick()

void XrdCl::Stream::Tick ( time_t  now)

Handle a clock event generated either by socket timeout, or by the task manager event

Definition at line 399 of file XrdClStream.cc.

400  {
401  //--------------------------------------------------------------------------
402  // Check for timed-out requests and incoming handlers
403  //--------------------------------------------------------------------------
404  pMutex.Lock();
405  OutQueue q;
406  SubStreamList::iterator it;
407  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
408  q.GrabExpired( *(*it)->outQueue, now );
409  pMutex.UnLock();
410 
411  q.Report( XRootDStatus( stError, errOperationExpired ) );
412  pIncomingQueue->ReportTimeout( now );
413  }
void ReportTimeout(time_t now=0)
Timeout handlers.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90

References XrdCl::errOperationExpired, XrdCl::OutQueue::GrabExpired(), XrdSysMutex::Lock(), XrdCl::OutQueue::Report(), XrdCl::InQueue::ReportTimeout(), XrdCl::stError, and XrdSysMutex::UnLock().

Referenced by XrdCl::Channel::Tick().

+ Here is the call graph for this function:
+ Here is the caller graph for this function:

The documentation for this class was generated from the following files: