xrootd
XrdClParallelOperation.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_PARALLELOPERATION_HH__
27 #define __XRD_CL_PARALLELOPERATION_HH__
28 
29 #include "XrdCl/XrdClOperations.hh"
31 
32 #include <atomic>
33 
34 namespace XrdCl
35 {
36 
37  //----------------------------------------------------------------------------
38  // Interface for different execution policies:
39  // - all : all operations need to succeed in order for the parallel
40  // operation to be successful
41  // - any : just one of the operations needs to succeed in order for
42  // the parallel operation to be successful
43  // - some : n (user defined) operations need to succeed in order for
44  // the parallel operation to be successful
45  // - at least : at least n (user defined) operations need to succeed in
46  // order for the parallel operation to be successful (the
47  // user handler will be called only when all operations are
48  // resolved)
49  //
50  // @param status : status returned by one of the aggregated operations
51  //
52  // @return : true if the status should be passed to the user handler,
53  // false otherwise.
54  //----------------------------------------------------------------------------
56  {
57  virtual ~PolicyExecutor()
58  {
59  }
60 
61  virtual bool Examine( const XrdCl::XRootDStatus &status ) = 0;
62  };
63 
64  //----------------------------------------------------------------------------
70  //----------------------------------------------------------------------------
71  template<bool HasHndl>
72  class ParallelOperation: public ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>
73  {
74  template<bool> friend class ParallelOperation;
75 
76  public:
77 
78  //------------------------------------------------------------------------
80  //------------------------------------------------------------------------
81  template<bool from>
83  ConcreteOperation<ParallelOperation, HasHndl, Resp<void>>( std::move( obj ) ),
84  pipelines( std::move( obj.pipelines ) ),
85  policy( std::move( obj.policy ) )
86  {
87  }
88 
89  //------------------------------------------------------------------------
95  //------------------------------------------------------------------------
96  template<class Container>
97  ParallelOperation( Container &&container )
98  {
99  static_assert( !HasHndl, "Constructor is available only operation without handler");
100 
101  pipelines.reserve( container.size() );
102  auto begin = std::make_move_iterator( container.begin() );
103  auto end = std::make_move_iterator( container.end() );
104  std::copy( begin, end, std::back_inserter( pipelines ) );
105  container.clear(); // there's junk inside so we clear it
106  }
107 
108  //------------------------------------------------------------------------
110  //------------------------------------------------------------------------
111  std::string ToString()
112  {
113  std::ostringstream oss;
114  oss << "Parallel(";
115  for( size_t i = 0; i < pipelines.size(); i++ )
116  {
117  oss << pipelines[i]->ToString();
118  if( i + 1 != pipelines.size() )
119  {
120  oss << " && ";
121  }
122  }
123  oss << ")";
124  return oss.str();
125  }
126 
127  //------------------------------------------------------------------------
132  //------------------------------------------------------------------------
134  {
135  policy.reset( new AllPolicy() );
136  return std::move( *this );
137  }
138 
139  //------------------------------------------------------------------------
144  //------------------------------------------------------------------------
146  {
147  policy.reset( new AnyPolicy( pipelines.size() ) );
148  return std::move( *this );
149  }
150 
151  //------------------------------------------------------------------------
152  // Set policy to `Some`
156  //------------------------------------------------------------------------
157  ParallelOperation<HasHndl> Some( size_t threshold )
158  {
159  policy.reset( new SomePolicy( pipelines.size(), threshold ) );
160  return std::move( *this );
161  }
162 
163  //------------------------------------------------------------------------
169  //------------------------------------------------------------------------
171  {
172  policy.reset( new AtLeastPolicy( pipelines.size(), threshold ) );
173  return std::move( *this );
174  }
175 
176  private:
177 
178  //------------------------------------------------------------------------
183  //------------------------------------------------------------------------
184  struct AllPolicy : public PolicyExecutor
185  {
186  bool Examine( const XrdCl::XRootDStatus &status )
187  {
188  if( status.IsOK() ) return false;
189  // we require all request to succeed
190  return true;
191  }
192  };
193 
194  //------------------------------------------------------------------------
199  //------------------------------------------------------------------------
200  struct AnyPolicy : public PolicyExecutor
201  {
202  AnyPolicy( size_t size) : cnt( size )
203  {
204  }
205 
206  bool Examine( const XrdCl::XRootDStatus &status )
207  {
208  // decrement the counter
209  size_t nb = cnt.fetch_sub( 1 );
210  // we require just one operation to be successful
211  if( status.IsOK() ) return true;
212  // lets see if this is the last one?
213  if( nb == 1 ) return true;
214  // we still have a chance there will be one that is successful
215  return false;
216  }
217 
218  private:
219  std::atomic<size_t> cnt;
220  };
221 
222  //------------------------------------------------------------------------
227  //------------------------------------------------------------------------
229  {
230  SomePolicy( size_t size, size_t threshold ) : cnt( size ), succeeded( 0 ), threshold( threshold )
231  {
232  }
233 
234  bool Examine( const XrdCl::XRootDStatus &status )
235  {
236  // decrement the counter
237  size_t nb = cnt.fetch_sub( 1 );
238  if( status.IsOK() )
239  {
240  size_t s = succeeded.fetch_add( 1 );
241  if( s + 1 == threshold ) return true; // we reached the threshold
242  // we are not yet there
243  return false;
244  }
245  // did we dropped bellow the threshold
246  if( nb == threshold ) return true;
247  // we still have a chance there will be enough of successful operations
248  return false;
249  }
250 
251  private:
252  std::atomic<size_t> cnt;
253  std::atomic<size_t> succeeded;
254  const size_t threshold;
255  };
256 
257  //------------------------------------------------------------------------
263  //------------------------------------------------------------------------
265  {
266  AtLeastPolicy( size_t size, size_t threshold ) : cnt( size ), threshold( threshold )
267  {
268  }
269 
270  bool Examine( const XrdCl::XRootDStatus &status )
271  {
272  // decrement the counter
273  size_t nb = cnt.fetch_sub( 1 );
274  // although we might have the minimum to succeed we wait for the rest
275  if( status.IsOK() ) return false;
276  if( nb == threshold ) return true; // we dropped bellow the threshold
277  // we still have a chance there will be enough of successful operations
278  return false;
279  }
280 
281  private:
282  std::atomic<size_t> cnt;
283  const size_t threshold;
284  };
285 
286  //------------------------------------------------------------------------
291  //------------------------------------------------------------------------
292  struct Ctx
293  {
294  //----------------------------------------------------------------------
298  //----------------------------------------------------------------------
300  policy( policy )
301  {
302  }
303 
304  //----------------------------------------------------------------------
306  //----------------------------------------------------------------------
308  {
309  Handle( XRootDStatus() );
310  }
311 
312  //----------------------------------------------------------------------
317  //----------------------------------------------------------------------
318  inline void Examine( const XRootDStatus &st )
319  {
320  if( policy->Examine( st ) )
321  Handle( st );
322  }
323 
324  //----------------------------------------------------------------------
329  //---------------------------------------------------------------------
330  inline void Handle( const XRootDStatus &st )
331  {
332  PipelineHandler* hdlr = handler.exchange( nullptr );
333  if( hdlr )
334  hdlr->HandleResponse( new XRootDStatus( st ), nullptr );
335  }
336 
337  //----------------------------------------------------------------------
339  //----------------------------------------------------------------------
340  std::atomic<PipelineHandler*> handler;
341 
342  //----------------------------------------------------------------------
344  //----------------------------------------------------------------------
345  std::unique_ptr<PolicyExecutor> policy;
346  };
347 
348  //------------------------------------------------------------------------
354  //------------------------------------------------------------------------
356  {
357  // make sure we have a valid policy for the parallel operation
358  if( !policy ) policy.reset( new AllPolicy() );
359 
360  std::shared_ptr<Ctx> ctx =
361  std::make_shared<Ctx>( this->handler.release(), policy.release() );
362 
363  try
364  {
365  for( size_t i = 0; i < pipelines.size(); ++i )
366  {
367  pipelines[i].Run( [ctx]( const XRootDStatus &st ){ ctx->Examine( st ); } );
368  }
369  }
370  catch( const PipelineException& ex )
371  {
372  return ex.GetError();
373  }
374  catch( const std::exception& ex )
375  {
376  return XRootDStatus( stError, ex.what() );
377  }
378 
379  return XRootDStatus();
380  }
381 
382  std::vector<Pipeline> pipelines;
383  std::unique_ptr<PolicyExecutor> policy;
384  };
385 
386  //----------------------------------------------------------------------------
388  //----------------------------------------------------------------------------
389  template<class Container>
390  inline ParallelOperation<false> Parallel( Container &container )
391  {
392  return ParallelOperation<false>( container );
393  }
394 
395  //----------------------------------------------------------------------------
397  //----------------------------------------------------------------------------
398  inline void PipesToVec( std::vector<Pipeline>& )
399  {
400  // base case
401  }
402 
403  //----------------------------------------------------------------------------
404  // Declare PipesToVec (we need to do declare those functions ahead of
405  // definitions, as they may call each other.
406  //----------------------------------------------------------------------------
407  template<typename ... Others>
408  inline void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
409  Others&... others );
410 
411  template<typename ... Others>
412  inline void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
413  Others&... others );
414 
415  template<typename ... Others>
416  inline void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
417  Others&... others );
418 
419  //----------------------------------------------------------------------------
420  // Define PipesToVec
421  //----------------------------------------------------------------------------
422  template<typename ... Others>
423  void PipesToVec( std::vector<Pipeline> &v, Operation<false> &operation,
424  Others&... others )
425  {
426  v.emplace_back( operation );
427  PipesToVec( v, others... );
428  }
429 
430  template<typename ... Others>
431  void PipesToVec( std::vector<Pipeline> &v, Operation<true> &operation,
432  Others&... others )
433  {
434  v.emplace_back( operation );
435  PipesToVec( v, others... );
436  }
437 
438  template<typename ... Others>
439  void PipesToVec( std::vector<Pipeline> &v, Pipeline &pipeline,
440  Others&... others )
441  {
442  v.emplace_back( std::move( pipeline ) );
443  PipesToVec( v, others... );
444  }
445 
446  //----------------------------------------------------------------------------
451  //----------------------------------------------------------------------------
452  template<typename ... Operations>
453  inline ParallelOperation<false> Parallel( Operations&& ... operations )
454  {
455  constexpr size_t size = sizeof...( operations );
456  std::vector<Pipeline> v;
457  v.reserve( size );
458  PipesToVec( v, operations... );
459  return Parallel( v );
460  }
461 }
462 
463 #endif // __XRD_CL_OPERATIONS_HH__
XrdClOperationHandlers.hh
XrdCl::Resp
Definition: XrdClOperationHandlers.hh:648
XrdCl::ParallelOperation::ToString
std::string ToString()
Definition: XrdClParallelOperation.hh:111
XrdCl::ParallelOperation::SomePolicy
Definition: XrdClParallelOperation.hh:229
XrdCl::ParallelOperation::AtLeastPolicy::AtLeastPolicy
AtLeastPolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:266
XrdCl::ParallelOperation::SomePolicy::succeeded
std::atomic< size_t > succeeded
Definition: XrdClParallelOperation.hh:253
XrdCl::PipelineException::GetError
const XRootDStatus & GetError() const
Definition: XrdClOperationHandlers.hh:399
XrdCl::ParallelOperation::Ctx::Handle
void Handle(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:330
XrdCl::ParallelOperation::pipelines
std::vector< Pipeline > pipelines
Definition: XrdClParallelOperation.hh:382
XrdCl::PolicyExecutor::Examine
virtual bool Examine(const XrdCl::XRootDStatus &status)=0
XrdCl::ParallelOperation::Any
ParallelOperation< HasHndl > Any()
Definition: XrdClParallelOperation.hh:145
XrdCl::ParallelOperation::AnyPolicy::cnt
std::atomic< size_t > cnt
Definition: XrdClParallelOperation.hh:219
XrdCl::ParallelOperation::AtLeastPolicy::threshold
const size_t threshold
Definition: XrdClParallelOperation.hh:283
XrdCl::PipelineHandler
Definition: XrdClOperations.hh:57
XrdCl::ParallelOperation::Ctx::policy
std::unique_ptr< PolicyExecutor > policy
Policy defining when the user handler should be called.
Definition: XrdClParallelOperation.hh:345
XrdCl::ParallelOperation::AtLeastPolicy::Examine
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:270
XrdCl::ParallelOperation::AnyPolicy::AnyPolicy
AnyPolicy(size_t size)
Definition: XrdClParallelOperation.hh:202
XrdCl::Operation::handler
std::unique_ptr< PipelineHandler > handler
Operation handler.
Definition: XrdClOperations.hh:288
XrdCl::ParallelOperation::AnyPolicy
Definition: XrdClParallelOperation.hh:201
XrdCl::PipelineException
Pipeline exception, wrapps an XRootDStatus.
Definition: XrdClOperationHandlers.hh:360
XrdCl::PipesToVec
void PipesToVec(std::vector< Pipeline > &)
Helper function for converting parameter pack into a vector.
Definition: XrdClParallelOperation.hh:398
XrdCl::ConcreteOperation
Definition: XrdClOperations.hh:476
XrdCl::ParallelOperation::SomePolicy::threshold
const size_t threshold
Definition: XrdClParallelOperation.hh:254
XrdCl::ParallelOperation
Definition: XrdClParallelOperation.hh:73
XrdCl::ParallelOperation::Ctx
Definition: XrdClParallelOperation.hh:293
XrdCl::PipelineHandler::HandleResponse
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
XrdCl::ParallelOperation::RunImpl
XRootDStatus RunImpl()
Definition: XrdClParallelOperation.hh:355
XrdClOperations.hh
XrdCl::XRootDStatus
Request status.
Definition: XrdClXRootDResponses.hh:215
XrdCl::ParallelOperation::Ctx::handler
std::atomic< PipelineHandler * > handler
PipelineHandler of the ParallelOperation.
Definition: XrdClParallelOperation.hh:340
XrdCl::ParallelOperation::policy
std::unique_ptr< PolicyExecutor > policy
Definition: XrdClParallelOperation.hh:383
XrdCl::ParallelOperation::All
ParallelOperation< HasHndl > All()
Definition: XrdClParallelOperation.hh:133
XrdCl::ParallelOperation::Ctx::Ctx
Ctx(PipelineHandler *handler, PolicyExecutor *policy)
Definition: XrdClParallelOperation.hh:299
XrdCl::ParallelOperation::AllPolicy
Definition: XrdClParallelOperation.hh:185
XrdCl::ParallelOperation::Some
ParallelOperation< HasHndl > Some(size_t threshold)
Definition: XrdClParallelOperation.hh:157
XrdCl::Pipeline
Definition: XrdClOperations.hh:304
XrdCl::PolicyExecutor
Definition: XrdClParallelOperation.hh:56
XrdCl::Status::IsOK
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:120
XrdCl::ParallelOperation::AtLeastPolicy
Definition: XrdClParallelOperation.hh:265
XrdCl
Definition: XrdClAnyObject.hh:26
XrdCl::Operation
Definition: XrdClOperations.hh:166
XrdCl::ParallelOperation::SomePolicy::cnt
std::atomic< size_t > cnt
Definition: XrdClParallelOperation.hh:252
XrdCl::PolicyExecutor::~PolicyExecutor
virtual ~PolicyExecutor()
Definition: XrdClParallelOperation.hh:57
XrdCl::ParallelOperation::AllPolicy::Examine
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:186
XrdCl::ParallelOperation::SomePolicy::Examine
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:234
XrdCl::ParallelOperation::AtLeastPolicy::cnt
std::atomic< size_t > cnt
Definition: XrdClParallelOperation.hh:282
XrdCl::stError
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
XrdCl::ParallelOperation::ParallelOperation
ParallelOperation(Container &&container)
Definition: XrdClParallelOperation.hh:97
XrdCl::ParallelOperation::AtLeast
ParallelOperation< HasHndl > AtLeast(size_t threshold)
Definition: XrdClParallelOperation.hh:170
XrdCl::ParallelOperation::ParallelOperation
ParallelOperation(ParallelOperation< from > &&obj)
Constructor: copy-move a ParallelOperation in different state.
Definition: XrdClParallelOperation.hh:82
XrdCl::ParallelOperation::Ctx::Examine
void Examine(const XRootDStatus &st)
Definition: XrdClParallelOperation.hh:318
XrdCl::Parallel
ParallelOperation< false > Parallel(Container &container)
Factory function for creating parallel operation from a vector.
Definition: XrdClParallelOperation.hh:390
XrdCl::ParallelOperation::AnyPolicy::Examine
bool Examine(const XrdCl::XRootDStatus &status)
Definition: XrdClParallelOperation.hh:206
XrdCl::ParallelOperation::SomePolicy::SomePolicy
SomePolicy(size_t size, size_t threshold)
Definition: XrdClParallelOperation.hh:230
XrdCl::ParallelOperation::Ctx::~Ctx
~Ctx()
Destructor.
Definition: XrdClParallelOperation.hh:307