XRootD
XrdClStream.cc
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #include "XrdCl/XrdClStream.hh"
26 #include "XrdCl/XrdClSocket.hh"
27 #include "XrdCl/XrdClChannel.hh"
28 #include "XrdCl/XrdClConstants.hh"
29 #include "XrdCl/XrdClLog.hh"
30 #include "XrdCl/XrdClMessage.hh"
31 #include "XrdCl/XrdClDefaultEnv.hh"
32 #include "XrdCl/XrdClUtils.hh"
33 #include "XrdCl/XrdClOutQueue.hh"
34 #include "XrdCl/XrdClMonitor.hh"
39 
40 #include <sys/types.h>
41 #include <algorithm>
42 #include <sys/socket.h>
43 #include <sys/time.h>
44 
45 namespace XrdCl
46 {
47  //----------------------------------------------------------------------------
48  // Statics
49  //----------------------------------------------------------------------------
// Counter shared by all streams from which session ids are drawn:
// OnConnect() stores ++sSessCntGen in pSessionId each time sub-stream 0
// gets connected.
50  RAtomic_uint64_t Stream::sSessCntGen{0};
51 
52  //----------------------------------------------------------------------------
53  // Incoming message helper
54  //----------------------------------------------------------------------------
56  {
  // Every constructor argument defaults to 0, which yields an empty helper
57  InMessageHelper( Message *message = 0,
58  MsgHandler *hndlr = 0,
59  time_t expir = 0,
60  uint16_t actio = 0 ):
61  msg( message ), handler( hndlr ), expires( expir ), action( actio ) {}
  // Put the helper back into its empty state (all four fields set to 0)
62  void Reset()
63  {
64  msg = 0; handler = 0; expires = 0; action = 0;
65  }
  // Expiration time kept together with the handler; OnError() hands it to
  // ReAddMessageHandler() when the handler has to be put back
68  time_t expires;
  // Action flags; OnIncoming() copies them out before resetting the helper
69  uint16_t action;
70  };
71 
72  //----------------------------------------------------------------------------
73  // Sub stream helper
74  //----------------------------------------------------------------------------
76  {
  // A new sub-stream has no socket yet, is marked as Disconnected and gets
  // its own freshly allocated outgoing queue
77  SubStreamData(): socket( 0 ), status( Socket::Disconnected )
78  {
79  outQueue = new OutQueue();
80  }
82  {
  // The socket and the outgoing queue are both deleted here
83  delete socket;
84  delete outQueue;
85  }
91  };
92 
93  //----------------------------------------------------------------------------
94  // Constructor
95  //----------------------------------------------------------------------------
96  Stream::Stream( std::shared_ptr<URL> url, const URL &prefer ):
97  pUrl( url ),
98  pPrefer( prefer ),
99  pTransport( 0 ),
100  pPoller( 0 ),
101  pTaskManager( 0 ),
102  pJobManager( 0 ),
103  pIncomingQueue( 0 ),
104  pChannelData( 0 ),
105  pLastStreamError( 0 ),
106  pConnectionCount( 0 ),
107  pConnectionInitTime( 0 ),
108  pAddressType( Utils::IPAll ),
109  pSessionId( 0 ),
110  pTTLDiscJob( nullptr ),
111  pSubsWaitingClose( 0 ),
112  pDiscCV( 0 ),
113  pDiscAllCnt( 0 ),
114  pBytesSent( 0 ),
115  pBytesReceived( 0 )
116  {
  // Both connection timestamps start out zeroed
117  pConnectionStarted.tv_sec = 0; pConnectionStarted.tv_usec = 0;
118  pConnectionDone.tv_sec = 0; pConnectionDone.tv_usec = 0;
119 
  // The stream is named after the host id of the target URL; the name is
  // used as the prefix of every log line of this stream
120  std::ostringstream o;
121  o << pUrl->GetHostId();
122  pStreamName = o.str();
123 
  // Connection tunables, looked up by name for this URL
124  pConnectionWindow = Utils::GetIntParameter( *url, "ConnectionWindow",
126  pConnectionRetry = Utils::GetIntParameter( *url, "ConnectionRetry",
128  pStreamErrorWindow = Utils::GetIntParameter( *url, "StreamErrorWindow",
130 
131  std::string netStack = Utils::GetStringParameter( *url, "NetworkStack",
133 
  // Translate the configured network stack into an address type. For IPAuto
  // the network configuration is queried: unless both protocols are
  // reported (hasIP64) the address type is pinned to the one that is
  // reported, IPv4 being checked first. Otherwise IPAuto is kept.
134  pAddressType = Utils::String2AddressType( netStack );
135  if( pAddressType == Utils::AddressType::IPAuto )
136  {
137  XrdNetUtils::NetProt stacks = XrdNetUtils::NetConfig( XrdNetUtils::NetType::qryINIF );
138  if( !( stacks & XrdNetUtils::hasIP64 ) )
139  {
140  if( stacks & XrdNetUtils::hasIPv4 )
141  pAddressType = Utils::AddressType::IPv4;
142  else if( stacks & XrdNetUtils::hasIPv6 )
143  pAddressType = Utils::AddressType::IPv6;
144  }
145  }
146 
  // Log the effective parameters; note that the network stack is printed as
  // configured (netStack), not as narrowed down above
147  Log *log = DefaultEnv::GetLog();
148  log->Debug( PostMasterMsg, "[%s] Stream parameters: Network Stack: %s, "
149  "Connection Window: %d, ConnectionRetry: %d, Stream Error "
150  "Window: %d", pStreamName.c_str(), netStack.c_str(),
151  pConnectionWindow, pConnectionRetry, pStreamErrorWindow );
152  }
153 
154  //----------------------------------------------------------------------------
155  // Destructor
156  //----------------------------------------------------------------------------
158  {
  // Close the sockets of all the sub-streams before anything is destroyed
159  Disconnect( true );
160 
161  Log *log = DefaultEnv::GetLog();
162  log->Debug( PostMasterMsg, "[%s] Destroying stream",
163  pStreamName.c_str() );
164 
  // Report the disconnection with a default-constructed status
165  MonitorDisconnection( XRootDStatus() );
166 
  // The sub-stream objects are owned by the stream: delete each of them
167  SubStreamList::iterator it;
168  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
169  delete *it;
170  }
171 
172  //----------------------------------------------------------------------------
173  // Initializer
174  //----------------------------------------------------------------------------
176  {
  // The transport, the poller and the channel data must all have been set
  // before the stream can be initialized
177  if( !pTransport || !pPoller || !pChannelData )
179 
  // Create the socket handler of sub-stream 0 (the main stream) and store
  // it in a newly created sub-stream entry
180  AsyncSocketHandler *s = new AsyncSocketHandler( *pUrl, pPoller, pTransport,
181  pChannelData, 0, this );
182  pSubStreams.push_back( new SubStreamData() );
183  pSubStreams[0]->socket = s;
184  return XRootDStatus();
185  }
186 
187  //------------------------------------------------------------------------
188  // Make sure that the underlying socket handler gets write readiness
189  // events
190  //------------------------------------------------------------------------
192  {
  // Everything below runs with the stream mutex held
193  XrdSysMutexHelper scopedLock( pMutex );
194 
195  //--------------------------------------------------------------------------
196  // We are in the process of connecting the main stream, so we do nothing
197  // because when the main stream connection is established it will connect
198  // all the other streams
199  //--------------------------------------------------------------------------
200  if( pSubStreams[0]->status == Socket::Connecting )
201  return XRootDStatus();
202 
203  //--------------------------------------------------------------------------
204  // The main stream is connected, so we can verify whether we have
205  // the up and the down stream connected and ready to handle data.
206  // If anything is not right we fall back to stream 0.
207  //--------------------------------------------------------------------------
208  if( pSubStreams[0]->status == Socket::Connected )
209  {
  // path is an in/out argument: the caller sees the fallback to 0
210  if( pSubStreams[path.down]->status != Socket::Connected )
211  path.down = 0;
212 
213  if( pSubStreams[path.up]->status == Socket::Disconnected )
214  {
215  path.up = 0;
216  return pSubStreams[0]->socket->EnableUplink();
217  }
218 
219  if( pSubStreams[path.up]->status == Socket::Connected )
220  return pSubStreams[path.up]->socket->EnableUplink();
221 
  // The up-stream is neither Disconnected nor Connected: nothing is
  // enabled and success is reported
222  return XRootDStatus();
223  }
224 
225  //--------------------------------------------------------------------------
226  // The main stream is not connected, we need to check whether enough time
227  // has passed since we last encountered an error (if any) so that we could
228  // re-attempt the connection
229  //--------------------------------------------------------------------------
230  Log *log = DefaultEnv::GetLog();
231  time_t now = ::time(0);
232 
  // Still inside the error window: hand back the recorded fatal error
  // without making a new attempt
233  if( now-pLastStreamError < pStreamErrorWindow )
234  return pLastFatalError;
235 
  // A new connection attempt starts here: timestamp it and count it
236  gettimeofday( &pConnectionStarted, 0 );
237  ++pConnectionCount;
238 
239  //--------------------------------------------------------------------------
240  // Resolve all the addresses of the host we're supposed to connect to
241  //--------------------------------------------------------------------------
242  XRootDStatus st = Utils::GetHostAddresses( pAddresses, *pUrl, pAddressType );
243  if( !st.IsOK() )
244  {
245  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for "
246  "the host", pStreamName.c_str() );
  // Remember the failure as fatal; it is what gets returned while the
  // error window checked above is open
247  pLastStreamError = now;
248  st.status = stFatal;
249  pLastFatalError = st;
250  return st;
251  }
252 
  // If a preferred host was given, reorder the resolved addresses. A failure
  // to resolve the preferred host is only logged: the inner st shadows the
  // outer one, so the outer status is left untouched.
253  if( pPrefer.IsValid() )
254  {
255  std::vector<XrdNetAddr> addrresses;
256  XRootDStatus st = Utils::GetHostAddresses( addrresses, pPrefer, pAddressType );
257  if( !st.IsOK() )
258  {
259  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s",
260  pStreamName.c_str(), pPrefer.GetHostName().c_str() );
261  }
262  else
263  {
  // The connect loop below takes addresses from the back of pAddresses,
  // so placing the preferred addresses last makes them the first ones
  // to be tried
264  std::vector<XrdNetAddr> tmp;
265  tmp.reserve( pAddresses.size() );
266  // first add all remaining addresses
267  auto itr = pAddresses.begin();
268  for( ; itr != pAddresses.end() ; ++itr )
269  {
270  if( !HasNetAddr( *itr, addrresses ) )
271  tmp.push_back( *itr );
272  }
273  // then copy all 'preferred' addresses
274  std::copy( addrresses.begin(), addrresses.end(), std::back_inserter( tmp ) );
275  // and keep the result
276  pAddresses.swap( tmp );
277  }
278  }
279 
280  Utils::LogHostAddresses( log, PostMasterMsg, pUrl->GetHostId(),
281  pAddresses );
282 
  // Take addresses from the back until a connection attempt can be started;
  // the addresses that were not used stay in pAddresses. st ends up holding
  // the status of the last Connect() call, or the resolution status if the
  // list was empty.
283  while( !pAddresses.empty() )
284  {
285  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
286  pAddresses.pop_back();
287  pConnectionInitTime = ::time( 0 );
288  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
289  if( st.IsOK() )
290  {
291  pSubStreams[0]->status = Socket::Connecting;
292  break;
293  }
294  }
295  return st;
296  }
297 
298  //----------------------------------------------------------------------------
299  // Queue the message for sending
300  //----------------------------------------------------------------------------
302  MsgHandler *handler,
303  bool stateful,
304  time_t expires )
305  {
  // The whole operation runs with the stream mutex held
306  XrdSysMutexHelper scopedLock( pMutex );
307  Log *log = DefaultEnv::GetLog();
308 
309  //--------------------------------------------------------------------------
310  // Check the session ID and bounce if needed
311  //--------------------------------------------------------------------------
  // Only messages carrying a non-zero session id are checked: the main
  // stream has to be connected and the ids have to match
312  if( msg->GetSessionId() &&
313  (pSubStreams[0]->status != Socket::Connected ||
314  pSessionId != msg->GetSessionId()) )
316 
317  //--------------------------------------------------------------------------
318  // Decide on the path to send the message
319  //--------------------------------------------------------------------------
320  PathID path = pTransport->MultiplexSubStream( msg, *pChannelData );
  // The transport may name a sub-stream that does not exist (yet)
321  if( pSubStreams.size() <= path.up )
322  {
323  log->Warning( PostMasterMsg, "[%s] Unable to send message %s through "
324  "substream %d, using 0 instead", pStreamName.c_str(),
325  msg->GetObfuscatedDescription().c_str(), path.up );
326  path.up = 0;
327  }
328 
329  log->Dump( PostMasterMsg, "[%s] Sending message %s (%p) through "
330  "substream %d expecting answer at %d", pStreamName.c_str(),
331  msg->GetObfuscatedDescription().c_str(), (void*)msg, path.up, path.down );
332 
333  //--------------------------------------------------------------------------
334  // Enable *a* path and insert the message to the right queue
335  //--------------------------------------------------------------------------
  // EnableLink() may change path (fallback to sub-stream 0)
336  XRootDStatus st = EnableLink( path );
337  if( st.IsOK() )
338  {
  // NOTE(review): presumably this second call lets the transport take
  // note of the path that was finally selected - confirm
339  pTransport->MultiplexSubStream( msg, *pChannelData, &path );
340  pSubStreams[path.up]->outQueue->PushBack( msg, handler,
341  expires, stateful );
342  }
343  else
  // Any failure to enable the link is reported to the caller as fatal
344  st.status = stFatal;
345  return st;
346  }
347 
348  //----------------------------------------------------------------------------
349  // Force connection
350  //----------------------------------------------------------------------------
352  {
353  XrdSysMutexHelper scopedLock( pMutex );
  // Acts only while sub-stream 0 is in the Connecting state. EnableLink()
  // returns immediately for a Connecting main stream, so the status is
  // first set back to Disconnected to make it start a new attempt.
354  if( pSubStreams[0]->status == Socket::Connecting )
355  {
356  pSubStreams[0]->status = Socket::Disconnected;
357  XrdCl::PathID path( 0, 0 );
358  XrdCl::XRootDStatus st = EnableLink( path );
  // A failure to start the attempt is handled as a connection error of
  // sub-stream 0
359  if( !st.IsOK() )
360  OnConnectError( 0, st );
361  }
362  }
363 
364  //----------------------------------------------------------------------------
365  // Disconnect the stream
366  //----------------------------------------------------------------------------
367  void Stream::Disconnect( bool /*force*/ )
368  {
369  //--------------------------------------------------------------------------
370  // See comment about deadlocks in ForceError() method. We don't expect
371  // to be called from a callback thread.
372  //--------------------------------------------------------------------------
373  XrdSysCondVarHelper discLock( pDiscCV );
374  while ( pDiscAllCnt )
375  {
376  pDiscCV.Wait();
377  }
378  ++pDiscAllCnt;
379  discLock.UnLock();
380 
381  XrdSysMutexHelper scopedLock( pMutex );
382  SubStreamList::iterator it;
383  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
384  {
385  (*it)->socket->Close();
386  (*it)->status = Socket::Disconnected;
387  }
388  pSubsWaitingClose = 0;
389 
390  scopedLock.UnLock();
391  discLock.Lock( &pDiscCV );
392  --pDiscAllCnt;
393  pDiscCV.Signal();
394  }
395 
396  //----------------------------------------------------------------------------
397  // Handle a clock event
398  //----------------------------------------------------------------------------
399  void Stream::Tick( time_t now )
400  {
401  //--------------------------------------------------------------------------
402  // Check for timed-out requests and incoming handlers
403  //--------------------------------------------------------------------------
  // The expired items of every sub-stream's out-queue are moved into the
  // local queue q; the stream mutex is held only for this collection step
404  pMutex.Lock();
405  OutQueue q;
406  SubStreamList::iterator it;
407  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
408  q.GrabExpired( *(*it)->outQueue, now );
409  pMutex.UnLock();
410 
  // The incoming queue is asked to report timeouts with the mutex released
412  pIncomingQueue->ReportTimeout( now );
413  }
414 }
415 
416 //------------------------------------------------------------------------------
417 // Handle message timeouts and reconnection in the future
418 //------------------------------------------------------------------------------
419 namespace
420 {
421  class StreamConnectorTask: public XrdCl::Task
422  {
423  public:
424  //------------------------------------------------------------------------
425  // Constructor
426  //------------------------------------------------------------------------
  // Keeps a copy of the URL and names the task
  // "StreamConnectorTask for <n>"
427  StreamConnectorTask( const XrdCl::URL &url, const std::string &n ):
428  url( url )
429  {
430  std::string name = "StreamConnectorTask for ";
431  name += n;
432  SetName( name );
433  }
434 
435  //------------------------------------------------------------------------
436  // Run the task
437  //------------------------------------------------------------------------
  // The time argument is ignored.
  // NOTE(review): returning 0 presumably tells the task manager not to
  // run the task again - confirm against XrdCl::Task
438  time_t Run( time_t )
439  {
441  return 0;
442  }
443 
444  private:
  // Copy of the URL given to the constructor
445  XrdCl::URL url;
446  };
447 }
448 
449 namespace XrdCl
450 {
451  XRootDStatus Stream::RequestClose( Message &response )
452  {
  // Builds a kXR_close request for the file handle found at the start of
  // the body of the given response and sends it to the stream's URL
453  ServerResponse *rsp = reinterpret_cast<ServerResponse*>( response.GetBuffer() );
  // The 4 bytes copied below must be present in the response body
454  if( rsp->hdr.dlen < 4 ) return XRootDStatus( stError );
455  Message *msg;
456  ClientCloseRequest *req;
457  MessageUtils::CreateRequest( msg, req );
458  req->requestid = kXR_close;
459  memcpy( req->fhandle, reinterpret_cast<uint8_t*>( rsp->body.buffer.data ), 4 );
  // The request is tied to the current session of this stream
461  msg->SetSessionId( pSessionId );
  // NOTE(review): judging by its name the handler discards the response
  // to the close request - confirm
462  NullResponseHandler *handler = new NullResponseHandler();
463  MessageSendParams params;
464  params.timeout = 0;
465  params.followRedirects = false;
466  params.stateful = true;
468  return MessageUtils::SendMessage( *pUrl, msg, handler, params, 0 );
469  }
470 
471  //------------------------------------------------------------------------
472  // Check if message is a partial response
473  //------------------------------------------------------------------------
474  bool Stream::IsPartial( Message &msg )
475  {
  // The message buffer is interpreted as a server response header
476  ServerResponseHeader *rsphdr = (ServerResponseHeader*)msg.GetBuffer();
  // A kXR_oksofar status always marks a partial response
477  if( rsphdr->status == kXR_oksofar )
478  return true;
479 
  // For kXR_status the same buffer is looked at through the
  // ServerResponseStatus layout
480  if( rsphdr->status == kXR_status )
481  {
482  ServerResponseStatus *rspst = (ServerResponseStatus*)msg.GetBuffer();
484  return true;
485  }
486 
487  return false;
488  }
489 
490  //----------------------------------------------------------------------------
491  // Call back when a message has been reconstructed
492  //----------------------------------------------------------------------------
493  void Stream::OnIncoming( uint16_t subStream,
494  std::shared_ptr<Message> msg,
495  uint32_t bytesReceived )
496  {
  // Stamp the message with the current session id and account for the
  // received bytes
497  msg->SetSessionId( pSessionId );
498  pBytesReceived += bytesReceived;
499 
  // Take over the handler and the action recorded for this sub-stream and
  // clear the helper
500  MsgHandler *handler = nullptr;
501  uint16_t action = 0;
502  {
503  InMessageHelper &mh = pSubStreams[subStream]->inMsgHelper;
504  handler = mh.handler;
505  action = mh.action;
506  mh.Reset();
507  }
508 
  // Only non-partial messages are shown to the transport, which may digest
  // the message itself or request that a close be sent; in both cases the
  // handler taken above is not invoked
509  if( !IsPartial( *msg ) )
510  {
511  uint32_t streamAction = pTransport->MessageReceived( *msg, subStream,
512  *pChannelData );
513  if( streamAction & TransportHandler::DigestMsg )
514  return;
515 
516  if( streamAction & TransportHandler::RequestClose )
517  {
  // The status returned by RequestClose() is not checked
518  RequestClose( *msg );
519  return;
520  }
521  }
522 
523  Log *log = DefaultEnv::GetLog();
524 
525  //--------------------------------------------------------------------------
526  // No handler, we discard the message ...
527  //--------------------------------------------------------------------------
528  if( !handler )
529  {
530  ServerResponse *rsp = (ServerResponse*)msg->GetBuffer();
531  log->Warning( PostMasterMsg, "[%s] Discarding received message: %p "
532  "(status=%d, SID=[%d,%d]), no MsgHandler found.",
533  pStreamName.c_str(), (void*)msg.get(), rsp->hdr.status,
534  rsp->hdr.streamid[0], rsp->hdr.streamid[1] );
535  return;
536  }
537 
538  //--------------------------------------------------------------------------
539  // We have a handler, so we call the callback
540  //--------------------------------------------------------------------------
541  log->Dump( PostMasterMsg, "[%s] Handling received message: %p.",
542  pStreamName.c_str(), (void*)msg.get() );
543 
545  {
546  log->Dump( PostMasterMsg, "[%s] Ignoring the processing handler for: %s.",
547  pStreamName.c_str(), msg->GetObfuscatedDescription().c_str() );
548 
549  // if we are handling partial response we have to take down the timeout fence
550  if( IsPartial( *msg ) )
551  {
  // Only handlers that are XRootDMsgHandler instances are notified
552  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( handler );
553  if( xrdHandler ) xrdHandler->PartialReceived();
554  }
555 
556  return;
557  }
558 
  // The processing itself is handed over to the job manager
559  Job *job = new HandleIncMsgJob( handler );
560  pJobManager->QueueJob( job );
561  }
562 
563  //----------------------------------------------------------------------------
564  // Call when one of the sockets is ready to accept a new message
565  //----------------------------------------------------------------------------
566  std::pair<Message *, MsgHandler *>
567  Stream::OnReadyToWrite( uint16_t subStream )
568  {
569  XrdSysMutexHelper scopedLock( pMutex );
570  Log *log = DefaultEnv::GetLog();
571  if( pSubStreams[subStream]->outQueue->IsEmpty() )
572  {
573  log->Dump( PostMasterMsg, "[%s] Nothing to write, disable uplink",
574  pSubStreams[subStream]->socket->GetStreamName().c_str() );
575 
576  pSubStreams[subStream]->socket->DisableUplink();
577  return std::make_pair( (Message *)0, (MsgHandler *)0 );
578  }
579 
580  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
581  h.msg = pSubStreams[subStream]->outQueue->PopMessage( h.handler,
582  h.expires,
583  h.stateful );
584 
585  log->Debug( PostMasterMsg, "[%s] Duplicating MsgHandler: %p (message: %s) "
586  "from out-queue to in-queue, starting to send outgoing.",
587  pUrl->GetHostId().c_str(), (void*)h.handler,
588  h.msg->GetObfuscatedDescription().c_str() );
589 
590  scopedLock.UnLock();
591 
592  if( h.handler )
593  {
594  bool rmMsg = false;
595  pIncomingQueue->AddMessageHandler( h.handler, rmMsg );
596  if( rmMsg )
597  {
598  Log *log = DefaultEnv::GetLog();
599  log->Warning( PostMasterMsg, "[%s] Removed a leftover msg from the in-queue.",
600  pStreamName.c_str() );
601  }
602  h.handler->OnReadyToSend( h.msg );
603  }
604  return std::make_pair( h.msg, h.handler );
605  }
606 
607  void Stream::DisableIfEmpty( uint16_t subStream )
608  {
609  XrdSysMutexHelper scopedLock( pMutex );
610  Log *log = DefaultEnv::GetLog();
611 
612  if( pSubStreams[subStream]->outQueue->IsEmpty() )
613  {
614  log->Dump( PostMasterMsg, "[%s] All messages consumed, disable uplink",
615  pSubStreams[subStream]->socket->GetStreamName().c_str() );
616  pSubStreams[subStream]->socket->DisableUplink();
617  }
618  }
619 
620  //----------------------------------------------------------------------------
621  // Call when a message is written to the socket
622  //----------------------------------------------------------------------------
623  void Stream::OnMessageSent( uint16_t subStream,
624  Message *msg,
625  uint32_t bytesSent )
626  {
627  pTransport->MessageSent( msg, subStream, bytesSent,
628  *pChannelData );
629  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
630  pBytesSent += bytesSent;
631  if( h.handler )
632  {
633  // ensure expiration time is assigned if still in queue
634  pIncomingQueue->AssignTimeout( h.handler );
635  // OnStatusReady may cause the handler to delete itself, in
636  // which case the handler or the user callback may also delete msg
637  h.handler->OnStatusReady( msg, XRootDStatus() );
638  }
639  pSubStreams[subStream]->outMsgHelper.Reset();
640  }
641 
642  //----------------------------------------------------------------------------
643  // Call back when a sub-stream has been connected
644  //----------------------------------------------------------------------------
645  void Stream::OnConnect( uint16_t subStream )
646  {
647  XrdSysMutexHelper scopedLock( pMutex );
648  if( subStream == 0 )
649  {
  // Count the extra sub-streams that are not in the Disconnected state
650  int nsubconn = 0;
651  if( pSubStreams.size() > 1 )
652  {
653  for( size_t i = 1; i < pSubStreams.size(); ++i )
654  if( pSubStreams[i]->status != Socket::Disconnected ) nsubconn++;
655  }
  // If there are any, remember how many have to go away first and keep
  // the uplink of stream 0 disabled; this method is entered again for
  // stream 0 once the counter has dropped to zero (see below and
  // OnConnectError)
656  if( nsubconn )
657  {
658  pSubsWaitingClose = nsubconn;
659  pSubStreams[0]->socket->DisableUplink();
660  return;
661  }
662  else
663  pSubStreams[0]->socket->EnableUplink();
664  }
665  else
666  {
  // Stream 0 is waiting for the extra sub-streams to go away: close this
  // one instead of using it and, if it was the last one, redo the connect
  // handling of stream 0 with the mutex released
667  if( pSubsWaitingClose > 0 )
668  {
669  pSubStreams[subStream]->socket->Close();
670  pSubStreams[subStream]->status = Socket::Disconnected;
671  if( --pSubsWaitingClose == 0 )
672  {
673  scopedLock.UnLock();
674  OnConnect( 0 );
675  }
676  return;
677  }
678  }
679 
680  pSubStreams[subStream]->status = Socket::Connected;
681 
  // Note that the IP stack that gets logged is always the one of stream 0
682  std::string ipstack( pSubStreams[0]->socket->GetIpStack() );
683  Log *log = DefaultEnv::GetLog();
684  log->Debug( PostMasterMsg, "[%s] Stream %d connected (%s).", pStreamName.c_str(),
685  subStream, ipstack.c_str() );
686 
687  if( subStream == 0 )
688  {
  // A successful connection of the main stream clears the error state
  // and the retry counter and starts a new session
689  pLastStreamError = 0;
690  pLastFatalError = XRootDStatus();
691  pConnectionCount = 0;
692  uint16_t numSub = pTransport->SubStreamNumber( *pChannelData );
693  pSessionId = ++sSessCntGen;
694 
695  //------------------------------------------------------------------------
696  // Create the streams if they don't exist yet
697  //------------------------------------------------------------------------
698  if( pSubStreams.size() == 1 && numSub > 1 )
699  {
700  for( uint16_t i = 1; i < numSub; ++i )
701  {
702  URL url = pTransport->GetBindPreference( *pUrl, *pChannelData );
703  AsyncSocketHandler *s = new AsyncSocketHandler( url, pPoller, pTransport,
704  pChannelData, i, this );
705  pSubStreams.push_back( new SubStreamData() );
706  pSubStreams[i]->socket = s;
707  }
708  }
709 
710  //------------------------------------------------------------------------
711  // Connect the extra streams, if we fail we move all the outgoing items
712  // to stream 0, we don't need to enable the uplink here, because it
713  // should be already enabled after the handshaking process is completed.
714  //------------------------------------------------------------------------
715  if( pSubStreams.size() > 1 )
716  {
717  log->Debug( PostMasterMsg, "[%s] Attempting to connect %zu additional streams.",
718  pStreamName.c_str(), pSubStreams.size() - 1 );
719  for( size_t i = 1; i < pSubStreams.size(); ++i )
720  {
  // The extra streams connect to the address stream 0 is using
721  pSubStreams[i]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
722  XRootDStatus st = pSubStreams[i]->socket->Connect( pConnectionWindow );
723  if( !st.IsOK() )
724  {
725  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[i]->outQueue );
726  // mark as disconnected. We don't try to actively Close here as
727  // we're in a poller callback thread and the i'th substream here
728  // may be handled by a different poller callback thread, raising
729  // the possibility of deadlock.
730  pSubStreams[i]->status = Socket::Disconnected;
731  }
732  else
733  {
734  pSubStreams[i]->status = Socket::Connecting;
735  }
736  }
737  }
738 
739  //------------------------------------------------------------------------
740  // Inform monitoring
741  //------------------------------------------------------------------------
  // The byte counters restart with every new connection
742  pBytesSent = 0;
743  pBytesReceived = 0;
744  gettimeofday( &pConnectionDone, 0 );
746  if( mon )
747  {
749  i.server = pUrl->GetHostId();
750  i.sTOD = pConnectionStarted;
751  i.eTOD = pConnectionDone;
752  i.streams = pSubStreams.size();
753 
  // Ask the transport for the authentication info; qryResponse stays
  // null if the query result holds no string
754  AnyObject qryResult;
755  std::string *qryResponse = nullptr;
756  pTransport->Query( TransportQuery::Auth, qryResult, *pChannelData );
757  qryResult.Get( qryResponse );
758 
  // The string is copied and then deleted here
759  if (qryResponse) {
760  i.auth = *qryResponse;
761  delete qryResponse;
762  } else {
763  i.auth = "";
764  }
765 
766  mon->Event( Monitor::EvConnect, &i );
767  }
768 
769  //------------------------------------------------------------------------
770  // For every connected control-stream call the global on-connect handler
771  //------------------------------------------------------------------------
773  }
774  else if( pOnDataConnJob )
775  {
776  //------------------------------------------------------------------------
777  // For every connected data-stream call the on-connect handler
778  //------------------------------------------------------------------------
779  pJobManager->QueueJob( pOnDataConnJob.get(), 0 );
780  }
781  }
782 
783  //----------------------------------------------------------------------------
784  // On connect error
785  //----------------------------------------------------------------------------
786  void Stream::OnConnectError( uint16_t subStream, XRootDStatus status )
787  {
788  XrdSysMutexHelper scopedLock( pMutex );
789  Log *log = DefaultEnv::GetLog();
  // The socket that failed to connect is closed whatever happens next
790  pSubStreams[subStream]->socket->Close();
791  time_t now = ::time(0);
792 
793  //--------------------------------------------------------------------------
794  // For every connection error call the global connection error handler
795  //--------------------------------------------------------------------------
797 
798  //--------------------------------------------------------------------------
799  // If we connected subStream == 0 and cannot connect >0 then we just give
800  // up and move the outgoing messages to another queue
801  //--------------------------------------------------------------------------
802  if( subStream > 0 )
803  {
804  const Socket::SocketStatus oldstate = pSubStreams[subStream]->status;
805  pSubStreams[subStream]->status = Socket::Disconnected;
806 
  // Stream 0 is waiting for the extra sub-streams to go away (see
  // OnConnect): a sub-stream that was not yet Disconnected counts as gone
  // now, and the last one re-triggers the connect handling of stream 0
  // with the mutex released
807  if( pSubsWaitingClose > 0 && oldstate != Socket::Disconnected )
808  {
809  if( --pSubsWaitingClose == 0 )
810  {
811  scopedLock.UnLock();
812  OnConnect( 0 );
813  }
814  return;
815  }
816 
  // Whatever was queued for this sub-stream is moved to stream 0
817  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
818  if( pSubStreams[0]->status == Socket::Connected )
819  {
820  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
821  if( !st.IsOK() )
822  OnFatalError( 0, st, scopedLock );
823  return;
824  }
825 
  // Stream 0 is still connecting: nothing more to do for now
826  if( pSubStreams[0]->status == Socket::Connecting )
827  return;
828 
829  OnFatalError( subStream, status, scopedLock );
830  return;
831  }
832 
833  //--------------------------------------------------------------------------
834  // Check if we still have time to try and do something in the current window
835  //--------------------------------------------------------------------------
836  time_t elapsed = now-pConnectionInitTime;
837  log->Error( PostMasterMsg, "[%s] elapsed = %lld, pConnectionWindow = %d seconds.",
838  pStreamName.c_str(), (long long) elapsed, pConnectionWindow );
839 
840  //------------------------------------------------------------------------
841  // If we have some IP addresses left we try them
842  //------------------------------------------------------------------------
843  if( !pAddresses.empty() )
844  {
  // Addresses are taken from the back, one at a time, until a connection
  // attempt can be started or none is left; only a failure of the last
  // attempt made is fatal
845  XRootDStatus st;
846  do
847  {
848  pSubStreams[0]->socket->SetAddress( pAddresses.back() );
849  pAddresses.pop_back();
850  pConnectionInitTime = ::time( 0 );
851  st = pSubStreams[0]->socket->Connect( pConnectionWindow );
852  }
853  while( !pAddresses.empty() && !st.IsOK() );
854 
855  if( !st.IsOK() )
856  OnFatalError( subStream, st, scopedLock );
857 
858  return;
859  }
860  //------------------------------------------------------------------------
861  // If we still can retry with the same host name, we sleep until the end
862  // of the connection window and try
863  //------------------------------------------------------------------------
864  else if( elapsed < pConnectionWindow && pConnectionCount < pConnectionRetry
865  && !status.IsFatal() )
866  {
867  log->Info( PostMasterMsg, "[%s] Attempting reconnection in %lld seconds.",
868  pStreamName.c_str(), (long long) (pConnectionWindow - elapsed) );
869 
  // The task is registered for the moment the current window ends
870  Task *task = new ::StreamConnectorTask( *pUrl, pStreamName );
871  pTaskManager->RegisterTask( task, pConnectionInitTime+pConnectionWindow );
872  return;
873  }
874  //--------------------------------------------------------------------------
875  // We are out of the connection window, the only thing we can do here
876  // is re-resolving the host name and retrying if we still can
877  //--------------------------------------------------------------------------
878  else if( pConnectionCount < pConnectionRetry && !status.IsFatal() )
879  {
  // With the status set to Disconnected EnableLink() resolves the host
  // again and starts a new connection attempt
880  pAddresses.clear();
881  pSubStreams[0]->status = Socket::Disconnected;
882  PathID path( 0, 0 );
883  XRootDStatus st = EnableLink( path );
884  if( !st.IsOK() )
885  OnFatalError( subStream, st, scopedLock );
886  return;
887  }
888 
889  //--------------------------------------------------------------------------
890  // Else, we fail
891  //--------------------------------------------------------------------------
892  OnFatalError( subStream, status, scopedLock );
893  }
894 
895  //----------------------------------------------------------------------------
896  // Call back when an error has occurred
897  //----------------------------------------------------------------------------
898  void Stream::OnError( uint16_t subStream, XRootDStatus status )
899  {
900  //--------------------------------------------------------------------------
901  // See comment about deadlocks in ForceError() method. We expect to be
902  // called from a callback thread. However we take care to only potentially
903  // disconnect the socket for our own subStream. We require no ongoing
904  // disconnect of all substreams and ensure that remains true throughout
905  // our execution by releasing discLock only after acquiring pMutex.
906  //--------------------------------------------------------------------------
907 
908  XrdSysCondVarHelper discLock( pDiscCV );
909  if( pDiscAllCnt ) return;
910 
911  XrdSysMutexHelper scopedLock( pMutex );
912  discLock.UnLock();
913  Log *log = DefaultEnv::GetLog();
914  const Socket::SocketStatus oldstate = pSubStreams[subStream]->status;
915  pSubStreams[subStream]->socket->Close();
916  pSubStreams[subStream]->status = Socket::Disconnected;
917 
918  log->Debug( PostMasterMsg, "[%s] Recovering error for stream #%d: %s.",
919  pStreamName.c_str(), subStream, status.ToString().c_str() );
920 
921  //--------------------------------------------------------------------------
922  // Reinsert the stuff that we have failed to sent
923  //--------------------------------------------------------------------------
924  if( pSubStreams[subStream]->outMsgHelper.msg )
925  {
926  OutQueue::MsgHelper &h = pSubStreams[subStream]->outMsgHelper;
927  pSubStreams[subStream]->outQueue->PushFront( h.msg, h.handler, h.expires,
928  h.stateful );
929  pIncomingQueue->RemoveMessageHandler(h.handler);
930  pSubStreams[subStream]->outMsgHelper.Reset();
931  }
932 
933  //--------------------------------------------------------------------------
934  // Reinsert the receiving handler and reset any partially read partial
935  //--------------------------------------------------------------------------
936  if( pSubStreams[subStream]->inMsgHelper.handler )
937  {
938  InMessageHelper &h = pSubStreams[subStream]->inMsgHelper;
939  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
940  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
941  if( xrdHandler ) xrdHandler->PartialReceived();
942  h.Reset();
943  }
944 
945  //--------------------------------------------------------------------------
946  // We are dealing with an error of a peripheral stream. If we don't have
947  // anything to send don't bother recovering. Otherwise move the requests
948  // to stream 0 if possible.
949  //--------------------------------------------------------------------------
950  if( subStream > 0 )
951  {
952  if( pSubsWaitingClose > 0 && oldstate != Socket::Disconnected )
953  {
954  if( --pSubsWaitingClose == 0 )
955  {
956  scopedLock.UnLock();
957  OnConnect( 0 );
958  }
959  return;
960  }
961 
962  if( pSubStreams[0]->status != Socket::Disconnected )
963  {
964  pSubStreams[subStream]->socket->SetAddress( pSubStreams[0]->socket->GetAddress() );
965  XRootDStatus st = pSubStreams[subStream]->socket->Connect( pConnectionWindow );
966  if( !st.IsOK() )
967  {
968  pSubStreams[subStream]->socket->Close();
969  if( pSubStreams[subStream]->outQueue->IsEmpty() )
970  return;
971  pSubStreams[0]->outQueue->GrabItems( *pSubStreams[subStream]->outQueue );
972  if( pSubStreams[0]->status == Socket::Connected )
973  {
974  XRootDStatus st = pSubStreams[0]->socket->EnableUplink();
975  if( !st.IsOK() )
976  OnFatalError( 0, st, scopedLock );
977  return;
978  }
979  if( pSubStreams[0]->status == Socket::Connecting )
980  return;
981  }
982  else
983  {
984  pSubStreams[subStream]->status = Socket::Connecting;
985  return;
986  }
987  OnFatalError( subStream, status, scopedLock );
988  return;
989  }
990  if( pSubStreams[subStream]->outQueue->IsEmpty() )
991  return;
992  OnFatalError( subStream, status, scopedLock );
993  return;
994  }
995 
996  //--------------------------------------------------------------------------
997  // If we lost the stream 0 we have lost the session, we re-enable the
998  // stream if we still have things in one of the outgoing queues, otherwise
999  // there is not point to recover at this point.
1000  //--------------------------------------------------------------------------
1001  if( subStream == 0 )
1002  {
1003  MonitorDisconnection( status );
1004 
1005  SubStreamList::iterator it;
1006  size_t outstanding = 0;
1007  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1008  outstanding += (*it)->outQueue->GetSizeStateless();
1009 
1010  if( outstanding )
1011  {
1012  PathID path( 0, 0 );
1013  XRootDStatus st = EnableLink( path );
1014  if( !st.IsOK() )
1015  {
1016  OnFatalError( 0, st, scopedLock );
1017  return;
1018  }
1019  }
1020 
1021  //------------------------------------------------------------------------
1022  // We're done here, unlock the stream mutex to avoid deadlocks and
1023  // report the disconnection event to the handlers
1024  //------------------------------------------------------------------------
1025  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1026  "message handlers.", pStreamName.c_str() );
1027  OutQueue q;
1028  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1029  q.GrabStateful( *(*it)->outQueue );
1030  scopedLock.UnLock();
1031 
1032  q.Report( status );
1033  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1034  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1035  return;
1036  }
1037  }
1038 
1039  //------------------------------------------------------------------------
1040  // Force error
1041  //------------------------------------------------------------------------
1042  void Stream::ForceError( XRootDStatus status, bool hush )
1043  {
1044  //----------------------------------------------------------------------
1045  // We can be called in two ways: first is by by a non-poller thread,
1046  // with errOperationInterrupted as error as part of ForceDisconnect. In
1047  // which case the Stream will be destoryed shortly after we return. The
1048  // second way is call by a poller thread with another type of error.
1049  // Further, when we call socket handler Close() for a socket handled a
1050  // callback running we wait for that to complete (unless it is
1051  // ourselves). This raises the possibility of a deadlock. We avoid this
1052  // by returning quickly if we detect we're in a callback thread and
1053  // there's already a disconnect affecting multiple streams in progress.
1054  //----------------------------------------------------------------------
1055  XrdSysCondVarHelper discLock( pDiscCV );
1056  if( pDiscAllCnt &&
1057  !( status.IsError() && status.code == errOperationInterrupted ) )
1058  {
1059  return;
1060  }
1061  while( pDiscAllCnt )
1062  {
1063  pDiscCV.Wait();
1064  }
1065  ++pDiscAllCnt;
1066  discLock.UnLock();
1067 
1068  XrdSysMutexHelper scopedLock( pMutex );
1069  Log *log = DefaultEnv::GetLog();
1070  for( size_t substream = 0; substream < pSubStreams.size(); ++substream )
1071  {
1072  if( pSubStreams[substream]->status != Socket::Connected ) continue;
1073  pSubStreams[substream]->socket->Close();
1074  pSubStreams[substream]->status = Socket::Disconnected;
1075 
1076  if( !hush )
1077  log->Debug( PostMasterMsg, "[%s] Forcing error on disconnect: %s.",
1078  pStreamName.c_str(), status.ToString().c_str() );
1079 
1080  //--------------------------------------------------------------------
1081  // Reinsert the stuff that we have failed to sent
1082  //--------------------------------------------------------------------
1083  if( pSubStreams[substream]->outMsgHelper.msg )
1084  {
1085  OutQueue::MsgHelper &h = pSubStreams[substream]->outMsgHelper;
1086  pSubStreams[substream]->outQueue->PushFront( h.msg, h.handler, h.expires,
1087  h.stateful );
1088  pIncomingQueue->RemoveMessageHandler(h.handler);
1089  pSubStreams[substream]->outMsgHelper.Reset();
1090  }
1091 
1092  //--------------------------------------------------------------------
1093  // Reinsert the receiving handler and reset any partially read partial
1094  //--------------------------------------------------------------------
1095  if( pSubStreams[substream]->inMsgHelper.handler )
1096  {
1097  InMessageHelper &h = pSubStreams[substream]->inMsgHelper;
1098  pIncomingQueue->ReAddMessageHandler( h.handler, h.expires );
1099  XRootDMsgHandler *xrdHandler = dynamic_cast<XRootDMsgHandler*>( h.handler );
1100  if( xrdHandler ) xrdHandler->PartialReceived();
1101  h.Reset();
1102  }
1103  }
1104 
1105  pConnectionCount = 0;
1106  pSubsWaitingClose = 0;
1107 
1108  //------------------------------------------------------------------------
1109  // We're done here, unlock the stream mutex to avoid deadlocks and
1110  // report the disconnection event to the handlers
1111  //------------------------------------------------------------------------
1112  log->Debug( PostMasterMsg, "[%s] Reporting disconnection to queued "
1113  "message handlers.", pStreamName.c_str() );
1114 
1115  SubStreamList::iterator it;
1116  OutQueue q;
1117  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1118  q.GrabItems( *(*it)->outQueue );
1119  scopedLock.UnLock();
1120 
1121  q.Report( status );
1122 
1123  pIncomingQueue->ReportStreamEvent( MsgHandler::Broken, status );
1124  pChannelEvHandlers.ReportEvent( ChannelEventHandler::StreamBroken, status );
1125 
1126  discLock.Lock( &pDiscCV );
1127  --pDiscAllCnt;
1128  pDiscCV.Signal();
1129  }
1130 
1131  //----------------------------------------------------------------------------
1132  // On fatal error
1133  //----------------------------------------------------------------------------
1134  void Stream::OnFatalError( uint16_t subStream,
1135  XRootDStatus status,
1136  XrdSysMutexHelper &lock )
1137  {
1138  Log *log = DefaultEnv::GetLog();
1139  pSubStreams[subStream]->status = Socket::Disconnected;
1140  log->Error( PostMasterMsg, "[%s] Unable to recover: %s.",
1141  pStreamName.c_str(), status.ToString().c_str() );
1142 
1143  //--------------------------------------------------------------------------
1144  // Don't set the stream error windows for authentication errors as the user
1145  // may refresh his credential at any time
1146  //--------------------------------------------------------------------------
1147  if( status.code != errAuthFailed )
1148  {
1149  pConnectionCount = 0;
1150  pLastStreamError = ::time(0);
1151  pLastFatalError = status;
1152  }
1153 
1154  SubStreamList::iterator it;
1155  OutQueue q;
1156  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1157  q.GrabItems( *(*it)->outQueue );
1158  lock.UnLock();
1159 
1160  status.status = stFatal;
1161  q.Report( status );
1162  pIncomingQueue->ReportStreamEvent( MsgHandler::FatalError, status );
1163  pChannelEvHandlers.ReportEvent( ChannelEventHandler::FatalError, status );
1164 
1165  }
1166 
1167  //----------------------------------------------------------------------------
1168  // Inform monitoring about disconnection
1169  //----------------------------------------------------------------------------
1170  void Stream::MonitorDisconnection( XRootDStatus status )
1171  {
1172  Monitor *mon = DefaultEnv::GetMonitor();
1173  if( mon )
1174  {
1175  Monitor::DisconnectInfo i;
1176  i.server = pUrl->GetHostId();
1177  i.rBytes = pBytesReceived;
1178  i.sBytes = pBytesSent;
1179  i.cTime = ::time(0) - pConnectionDone.tv_sec;
1180  i.status = status;
1181  mon->Event( Monitor::EvDisconnect, &i );
1182  }
1183  }
1184 
1185  //----------------------------------------------------------------------------
1186  // Call back when a message has been reconstructed
1187  //----------------------------------------------------------------------------
1188  bool Stream::OnReadTimeout( uint16_t substream )
1189  {
1190  //--------------------------------------------------------------------------
1191  // We only take the main stream into account
1192  //--------------------------------------------------------------------------
1193  if( substream != 0 )
1194  {
1195  if( pSubsWaitingClose )
1196  {
1197  XrdSysMutexHelper scopedLock( pMutex );
1198  if( !pSubsWaitingClose ) return true;
1199  if( pSubStreams[substream]->status == Socket::Disconnected ) return true;
1200  pSubStreams[substream]->socket->Close();
1201  pSubStreams[substream]->status = Socket::Disconnected;
1202  if( --pSubsWaitingClose == 0 )
1203  {
1204  scopedLock.UnLock();
1205  OnConnect( 0 );
1206  }
1207  }
1208  return true;
1209  }
1210 
1211  //--------------------------------------------------------------------------
1212  // Check if there is no outgoing messages and if the stream TTL is elapesed.
1213  // It is assumed that the underlying transport makes sure that there is no
1214  // pending requests that are not answered, ie. all possible virtual streams
1215  // are de-allocated
1216  //--------------------------------------------------------------------------
1217  Log *log = DefaultEnv::GetLog();
1218  SubStreamList::iterator it;
1219  time_t now = time(0);
1220 
1221  XrdSysMutexHelper scopedLock( pMutex );
1222  uint32_t outgoingMessages = 0;
1223  time_t lastActivity = 0;
1224  for( it = pSubStreams.begin(); it != pSubStreams.end(); ++it )
1225  {
1226  outgoingMessages += (*it)->outQueue->GetSize();
1227  time_t sockLastActivity = (*it)->socket->GetLastActivity();
1228  if( lastActivity < sockLastActivity )
1229  lastActivity = sockLastActivity;
1230  }
1231 
1232  if( !outgoingMessages )
1233  {
1234  bool disconnect = pTransport->IsStreamTTLElapsed( now-lastActivity,
1235  *pChannelData );
1236  if( disconnect )
1237  {
1238  log->Debug( PostMasterMsg, "[%s] Stream TTL elapsed, disconnecting...",
1239  pStreamName.c_str() );
1240  //----------------------------------------------------------------------
1241  // Important note!
1242  //
1243  // This job destroys the Stream object itself, the underlying
1244  // AsyncSocketHandler object (that called this method) and the Channel
1245  // object that aggregates this Stream.
1246  //
1247  // Additionally &(*pUrl) is used by ForceDisconnect to check if we are
1248  // in a Channel that was previously collapsed in a redirect.
1249  //----------------------------------------------------------------------
1250  if( !pTTLDiscJob )
1251  {
1252  pTTLDiscJob = new ForceDisconnectJob( pUrl );
1253  pJobManager->QueueJob( pTTLDiscJob );
1254  }
1255  return false;
1256  }
1257  }
1258 
1259  //--------------------------------------------------------------------------
1260  // Check if the stream is broken
1261  //--------------------------------------------------------------------------
1262  XRootDStatus st = pTransport->IsStreamBroken( now-lastActivity,
1263  *pChannelData );
1264  if( !st.IsOK() )
1265  {
1266  scopedLock.UnLock();
1267  OnError( substream, st );
1268  return false;
1269  }
1270  return true;
1271  }
1272 
1273  //----------------------------------------------------------------------------
1274  // Call back when a message has been reconstru
1275  //----------------------------------------------------------------------------
1276  bool Stream::OnWriteTimeout( uint16_t /*substream*/ )
1277  {
1278  return true;
1279  }
1280 
1281  //----------------------------------------------------------------------------
1282  // Register channel event handler
1283  //----------------------------------------------------------------------------
1285  {
1286  pChannelEvHandlers.AddHandler( handler );
1287  }
1288 
1289  //----------------------------------------------------------------------------
1290  // Remove a channel event handler
1291  //----------------------------------------------------------------------------
1293  {
1294  pChannelEvHandlers.RemoveHandler( handler );
1295  }
1296 
1297  //----------------------------------------------------------------------------
1298  // Install a incoming message handler
1299  //----------------------------------------------------------------------------
1300  MsgHandler*
1301  Stream::InstallIncHandler( std::shared_ptr<Message> &msg, uint16_t stream )
1302  {
1303  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1304  if( !mh.handler )
1305  mh.handler = pIncomingQueue->GetHandlerForMessage( msg,
1306  mh.expires,
1307  mh.action );
1308 
1309  if( !mh.handler )
1310  return nullptr;
1311 
1312  if( mh.action & MsgHandler::Raw )
1313  return mh.handler;
1314  return nullptr;
1315  }
1316 
1317  //----------------------------------------------------------------------------
1321  //----------------------------------------------------------------------------
1322  uint16_t Stream::InspectStatusRsp( uint16_t stream,
1323  MsgHandler *&incHandler )
1324  {
1325  InMessageHelper &mh = pSubStreams[stream]->inMsgHelper;
1326  if( !mh.handler )
1328 
1329  uint16_t action = mh.handler->InspectStatusRsp();
1330  mh.action |= action;
1331 
1332  if( action & MsgHandler::RemoveHandler )
1333  pIncomingQueue->RemoveMessageHandler( mh.handler );
1334 
1335  if( action & MsgHandler::Raw )
1336  {
1337  incHandler = mh.handler;
1338  return MsgHandler::Raw;
1339  }
1340 
1341  if( action & MsgHandler::Corrupted )
1342  return MsgHandler::Corrupted;
1343 
1344  if( action & MsgHandler::More )
1345  return MsgHandler::More;
1346 
1347  return MsgHandler::None;
1348  }
1349 
1350  //----------------------------------------------------------------------------
1351  // Check if channel can be collapsed using given URL
1352  //----------------------------------------------------------------------------
1353  bool Stream::CanCollapse( const URL &url )
1354  {
1355  Log *log = DefaultEnv::GetLog();
1356 
1357  //--------------------------------------------------------------------------
1358  // Resolve all the addresses of the host we're supposed to connect to
1359  //--------------------------------------------------------------------------
1360  std::vector<XrdNetAddr> prefaddrs;
1361  XRootDStatus st = Utils::GetHostAddresses( prefaddrs, url, pAddressType );
1362  if( !st.IsOK() )
1363  {
1364  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1365  , pStreamName.c_str(), url.GetHostName().c_str() );
1366  return false;
1367  }
1368 
1369  //--------------------------------------------------------------------------
1370  // Resolve all the addresses of the alias
1371  //--------------------------------------------------------------------------
1372  std::vector<XrdNetAddr> aliasaddrs;
1373  st = Utils::GetHostAddresses( aliasaddrs, *pUrl, pAddressType );
1374  if( !st.IsOK() )
1375  {
1376  log->Error( PostMasterMsg, "[%s] Unable to resolve IP address for %s."
1377  , pStreamName.c_str(), pUrl->GetHostName().c_str() );
1378  return false;
1379  }
1380 
1381  //--------------------------------------------------------------------------
1382  // Now check if the preferred host is part of the alias
1383  //--------------------------------------------------------------------------
1384  auto itr = prefaddrs.begin();
1385  for( ; itr != prefaddrs.end() ; ++itr )
1386  {
1387  auto itr2 = aliasaddrs.begin();
1388  for( ; itr2 != aliasaddrs.end() ; ++itr2 )
1389  if( itr->Same( &*itr2 ) ) return true;
1390  }
1391 
1392  return false;
1393  }
1394 
1395  //------------------------------------------------------------------------
1396  // Query the stream
1397  //------------------------------------------------------------------------
1398  Status Stream::Query( uint16_t query, AnyObject &result )
1399  {
1400  switch( query )
1401  {
1402  case StreamQuery::IpAddr:
1403  {
1404  result.Set( new std::string( pSubStreams[0]->socket->GetIpAddr() ), false );
1405  return Status();
1406  }
1407 
1408  case StreamQuery::IpStack:
1409  {
1410  result.Set( new std::string( pSubStreams[0]->socket->GetIpStack() ), false );
1411  return Status();
1412  }
1413 
1414  case StreamQuery::HostName:
1415  {
1416  result.Set( new std::string( pSubStreams[0]->socket->GetHostName() ), false );
1417  return Status();
1418  }
1419 
1420  default:
1421  return Status( stError, errQueryNotSupported );
1422  }
1423  }
1424 
1425 }
union ServerResponse::@0 body
kXR_char streamid[2]
Definition: XProtocol.hh:914
kXR_unt16 requestid
Definition: XProtocol.hh:228
@ kXR_oksofar
Definition: XProtocol.hh:900
@ kXR_status
Definition: XProtocol.hh:907
struct ServerResponseBody_Status bdy
Definition: XProtocol.hh:1262
kXR_char fhandle[4]
Definition: XProtocol.hh:229
@ kXR_close
Definition: XProtocol.hh:115
ServerResponseHeader hdr
Definition: XProtocol.hh:1288
void Set(Type object, bool own=true)
void Get(Type &object)
Retrieve the object being held.
@ FatalError
Stream has been broken and won't be recovered.
void RemoveHandler(ChannelEventHandler *handler)
Remove the channel event handler.
void AddHandler(ChannelEventHandler *handler)
Add a channel event handler.
void ReportEvent(ChannelEventHandler::ChannelEvent event, Status status)
Report an event to the channel event handlers.
static Monitor * GetMonitor()
Get the monitor object.
static Log * GetLog()
Get default log.
static PostMaster * GetPostMaster()
Get default post master.
void ReportTimeout(time_t now=0)
Timeout handlers.
void RemoveMessageHandler(MsgHandler *handler)
Remove a listener.
void ReAddMessageHandler(MsgHandler *handler, time_t expires)
Re-insert the handler without scanning the cached messages.
void ReportStreamEvent(MsgHandler::StreamEvent event, XRootDStatus status)
Report an event to the handlers.
MsgHandler * GetHandlerForMessage(std::shared_ptr< Message > &msg, time_t &expires, uint16_t &action)
Definition: XrdClInQueue.cc:66
void AssignTimeout(MsgHandler *handler)
void AddMessageHandler(MsgHandler *handler, bool &rmMsg)
Definition: XrdClInQueue.cc:54
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
Interface for a job to be run by the job manager.
Handle diagnostics.
Definition: XrdClLog.hh:101
void Error(uint64_t topic, const char *format,...)
Report an error.
Definition: XrdClLog.cc:231
void Warning(uint64_t topic, const char *format,...)
Report a warning.
Definition: XrdClLog.cc:248
void Dump(uint64_t topic, const char *format,...)
Print a dump message.
Definition: XrdClLog.cc:299
void Info(uint64_t topic, const char *format,...)
Print an info.
Definition: XrdClLog.cc:265
void Debug(uint64_t topic, const char *format,...)
Print a debug message.
Definition: XrdClLog.cc:282
static void ProcessSendParams(MessageSendParams &sendParams)
Process sending params.
static XRootDStatus SendMessage(const URL &url, Message *msg, ResponseHandler *handler, MessageSendParams &sendParams, LocalFileHandler *lFileHandler)
Send message.
static void CreateRequest(Message *&msg, Request *&req, uint32_t payloadSize=0)
Create a message.
The message representation used throughout the system.
Definition: XrdClMessage.hh:32
const std::string & GetObfuscatedDescription() const
Get the description of the message with authz parameter obfuscated.
uint64_t GetSessionId() const
Get the session ID the message is meant for.
An abstract class to describe the client-side monitoring plugin interface.
Definition: XrdClMonitor.hh:56
@ EvDisconnect
DisconnectInfo: Logout from a server.
@ EvConnect
ConnectInfo: Login into a server.
virtual void Event(EventCode evCode, void *evData)=0
@ More
there are more (non-raw) data to be read
@ Ignore
Ignore the message.
virtual void OnReadyToSend(Message *msg)
@ FatalError
Stream has been broken and won't be recovered.
@ Broken
The stream is broken.
virtual uint16_t InspectStatusRsp()=0
virtual void OnStatusReady(const Message *message, XRootDStatus status)=0
The requested action has been performed and the status is available.
A synchronized queue for the outgoing data.
void GrabStateful(OutQueue &queue)
void GrabExpired(OutQueue &queue, time_t exp=0)
void GrabItems(OutQueue &queue)
void Report(XRootDStatus status)
Report status to all the handlers.
Status ForceReconnect(const URL &url)
Reconnect the channel.
void NotifyConnErrHandler(const URL &url, const XRootDStatus &status)
Notify the global error connection handler.
void NotifyConnectHandler(const URL &url)
Notify the global on-connect handler.
A network socket.
Definition: XrdClSocket.hh:43
SocketStatus
Status of the socket.
Definition: XrdClSocket.hh:49
@ Disconnected
The socket is disconnected.
Definition: XrdClSocket.hh:50
@ Connected
The socket is connected.
Definition: XrdClSocket.hh:51
@ Connecting
The connection process is in progress.
Definition: XrdClSocket.hh:52
XRootDStatus Send(Message *msg, MsgHandler *handler, bool stateful, time_t expires)
Queue the message for sending.
Definition: XrdClStream.cc:301
bool OnReadTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On read timeout.
bool CanCollapse(const URL &url)
void ForceConnect()
Force connection.
Definition: XrdClStream.cc:351
Stream(std::shared_ptr< URL > url, const URL &prefer=URL())
Constructor.
Definition: XrdClStream.cc:96
void ForceError(XRootDStatus status, bool hush=false)
Force error.
Status Query(uint16_t query, AnyObject &result)
Query the stream.
void Disconnect(bool force=false)
Disconnect the stream.
Definition: XrdClStream.cc:367
XRootDStatus EnableLink(PathID &path)
Definition: XrdClStream.cc:191
void OnConnect(uint16_t subStream)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:645
void Tick(time_t now)
Definition: XrdClStream.cc:399
void OnConnectError(uint16_t subStream, XRootDStatus status)
On connect error.
Definition: XrdClStream.cc:786
~Stream()
Destructor.
Definition: XrdClStream.cc:157
bool OnWriteTimeout(uint16_t subStream) XRD_WARN_UNUSED_RESULT
On write timeout.
void DisableIfEmpty(uint16_t subStream)
Disables respective uplink if empty.
Definition: XrdClStream.cc:607
void RemoveEventHandler(ChannelEventHandler *handler)
Remove a channel event handler.
void OnMessageSent(uint16_t subStream, Message *msg, uint32_t bytesSent)
Definition: XrdClStream.cc:623
MsgHandler * InstallIncHandler(std::shared_ptr< Message > &msg, uint16_t stream)
void OnIncoming(uint16_t subStream, std::shared_ptr< Message > msg, uint32_t bytesReceived)
Call back when a message has been reconstructed.
Definition: XrdClStream.cc:493
void OnError(uint16_t subStream, XRootDStatus status)
On error.
Definition: XrdClStream.cc:898
uint16_t InspectStatusRsp(uint16_t stream, MsgHandler *&incHandler)
std::pair< Message *, MsgHandler * > OnReadyToWrite(uint16_t subStream)
Definition: XrdClStream.cc:567
XRootDStatus Initialize()
Initializer.
Definition: XrdClStream.cc:175
void RegisterEventHandler(ChannelEventHandler *handler)
Register channel event handler.
void RegisterTask(Task *task, time_t time, bool own=true)
Interface for a task to be run by the TaskManager.
virtual time_t Run(time_t now)=0
void SetName(const std::string &name)
Set name of the task.
@ RequestClose
Send a close request.
virtual uint32_t MessageReceived(Message &msg, uint16_t subStream, AnyObject &channelData)=0
Check if the message invokes a stream action.
virtual uint16_t SubStreamNumber(AnyObject &channelData)=0
Return a number of substreams per stream that should be created.
virtual bool IsStreamTTLElapsed(time_t inactiveTime, AnyObject &channelData)=0
Check if the stream should be disconnected.
virtual Status Query(uint16_t query, AnyObject &result, AnyObject &channelData)=0
Query the channel.
virtual PathID MultiplexSubStream(Message *msg, AnyObject &channelData, PathID *hint=0)=0
virtual URL GetBindPreference(const URL &url, AnyObject &channelData)=0
Get bind preference for the next data stream.
virtual Status IsStreamBroken(time_t inactiveTime, AnyObject &channelData)=0
virtual void MessageSent(Message *msg, uint16_t subStream, uint32_t bytesSent, AnyObject &channelData)=0
Notify the transport about a message having been sent.
URL representation.
Definition: XrdClURL.hh:31
const std::string & GetHostName() const
Get the name of the target host.
Definition: XrdClURL.hh:170
bool IsValid() const
Is the url valid.
Definition: XrdClURL.cc:452
Random utilities.
Definition: XrdClUtils.hh:50
static void LogHostAddresses(Log *log, uint64_t type, const std::string &hostId, std::vector< XrdNetAddr > &addresses)
Log all the addresses on the list.
Definition: XrdClUtils.cc:234
static Status GetHostAddresses(std::vector< XrdNetAddr > &addresses, const URL &url, AddressType type)
Resolve IP addresses.
Definition: XrdClUtils.cc:140
static AddressType String2AddressType(const std::string &addressType)
Interpret a string as address type, default to IPAll.
Definition: XrdClUtils.cc:123
static int GetIntParameter(const URL &url, const std::string &name, int defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:81
static std::string GetStringParameter(const URL &url, const std::string &name, const std::string &defaultVal)
Get a parameter either from the environment or URL.
Definition: XrdClUtils.cc:104
Handle/Process/Forward XRootD messages.
static void SetDescription(Message *msg)
Get the description of a message.
static NetProt NetConfig(NetType netquery=qryINET, const char **eText=0)
Definition: XrdNetUtils.cc:716
void Lock(XrdSysCondVar *CndVar)
const uint16_t errQueryNotSupported
Definition: XrdClStatus.hh:89
const uint16_t errUninitialized
Definition: XrdClStatus.hh:60
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stFatal
Fatal error, it's still an error.
Definition: XrdClStatus.hh:33
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
const uint64_t PostMasterMsg
const int DefaultStreamErrorWindow
const int DefaultConnectionRetry
const int DefaultConnectionWindow
const char *const DefaultNetworkStack
const uint16_t errOperationInterrupted
Definition: XrdClStatus.hh:91
const uint16_t errInvalidSession
Definition: XrdClStatus.hh:79
const uint16_t errAuthFailed
Definition: XrdClStatus.hh:88
@ kXR_PartialResult
Definition: XProtocol.hh:1251
MsgHandler * handler
Definition: XrdClStream.cc:67
InMessageHelper(Message *message=0, MsgHandler *hndlr=0, time_t expir=0, uint16_t actio=0)
Definition: XrdClStream.cc:57
Describe a server login event.
Definition: XrdClMonitor.hh:72
std::string server
"user@host:port"
Definition: XrdClMonitor.hh:78
uint16_t streams
Number of streams.
Definition: XrdClMonitor.hh:82
timeval sTOD
gettimeofday() when login started
Definition: XrdClMonitor.hh:80
timeval eTOD
gettimeofday() when login ended
Definition: XrdClMonitor.hh:81
std::string auth
authentication protocol used or empty if none
Definition: XrdClMonitor.hh:79
Procedure execution status.
Definition: XrdClStatus.hh:115
uint16_t code
Error type, or additional hints on what to do.
Definition: XrdClStatus.hh:147
uint16_t status
Status of the execution.
Definition: XrdClStatus.hh:146
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124
bool IsFatal() const
Fatal error.
Definition: XrdClStatus.hh:123
std::string ToString() const
Create a string representation.
Definition: XrdClStatus.cc:97
bool IsError() const
Error.
Definition: XrdClStatus.hh:122
static const uint16_t IpAddr
static const uint16_t HostName
static const uint16_t IpStack
InMessageHelper inMsgHelper
Definition: XrdClStream.cc:89
AsyncSocketHandler * socket
Definition: XrdClStream.cc:86
OutQueue::MsgHelper outMsgHelper
Definition: XrdClStream.cc:88
Socket::SocketStatus status
Definition: XrdClStream.cc:90
static const uint16_t Auth
Transport name, returns std::string *.