| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 | // Copyright (C) 2004-2006 The Trustees of Indiana University.// Use, modification and distribution is subject to the Boost Software// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)//  Authors: Douglas Gregor//           Andrew Lumsdaine#include <boost/optional.hpp>#include <cassert>#include <boost/graph/parallel/algorithm.hpp>#include <boost/graph/parallel/process_group.hpp>#include <functional>#include <algorithm>#include <boost/graph/parallel/simple_trigger.hpp>#ifndef BOOST_GRAPH_USE_MPI#error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"#endifnamespace boost { namespace graph { namespace distributed {template<BOOST_DISTRIBUTED_QUEUE_PARMS>BOOST_DISTRIBUTED_QUEUE_TYPE::distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,                  const Buffer& buffer, bool polling)  : process_group(process_group, attach_distributed_object()),    owner(owner),    buffer(buffer),    polling(polling){  if (!polling)    outgoing_buffers.reset(      new outgoing_buffers_t(num_processes(process_group)));  setup_triggers();}template<BOOST_DISTRIBUTED_QUEUE_PARMS>BOOST_DISTRIBUTED_QUEUE_TYPE::distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,                  const Buffer& buffer, const UnaryPredicate& pred,                  bool polling)  : process_group(process_group, attach_distributed_object()),    owner(owner),    buffer(buffer),    pred(pred),    polling(polling){  if (!polling)    outgoing_buffers.reset(      new outgoing_buffers_t(num_processes(process_group)));  setup_triggers();}template<BOOST_DISTRIBUTED_QUEUE_PARMS>BOOST_DISTRIBUTED_QUEUE_TYPE::distributed_queue(const ProcessGroup& process_group, const OwnerMap& owner,                  const UnaryPredicate& pred, bool polling)  : process_group(process_group, attach_distributed_object()),    owner(owner),    pred(pred),    polling(polling){  if (!polling)    outgoing_buffers.reset(      new outgoing_buffers_t(num_processes(process_group)));  setup_triggers();}template<BOOST_DISTRIBUTED_QUEUE_PARMS>voidBOOST_DISTRIBUTED_QUEUE_TYPE::push(const value_type& x){  typename ProcessGroup::process_id_type dest = get(owner, x);  if (outgoing_buffers)    outgoing_buffers->at(dest).push_back(x);  else if (dest == process_id(process_group))    buffer.push(x);  else    send(process_group, get(owner, x), msg_push, x);}template<BOOST_DISTRIBUTED_QUEUE_PARMS>boolBOOST_DISTRIBUTED_QUEUE_TYPE::empty() const{  /* Processes will stay here until the buffer is nonempty or     synchronization with the other processes indicates that all local     buffers are empty (and no messages are in transit).   */  while (buffer.empty() && !do_synchronize()) ;  return buffer.empty();}template<BOOST_DISTRIBUTED_QUEUE_PARMS>typename BOOST_DISTRIBUTED_QUEUE_TYPE::size_typeBOOST_DISTRIBUTED_QUEUE_TYPE::size() const{  empty();  return buffer.size();}template<BOOST_DISTRIBUTED_QUEUE_PARMS>void BOOST_DISTRIBUTED_QUEUE_TYPE::setup_triggers(){  using boost::graph::parallel::simple_trigger;  simple_trigger(process_group, msg_push, this,                  &distributed_queue::handle_push);  simple_trigger(process_group, msg_multipush, this,                  &distributed_queue::handle_multipush);}template<BOOST_DISTRIBUTED_QUEUE_PARMS>void BOOST_DISTRIBUTED_QUEUE_TYPE::handle_push(int /*source*/, int /*tag*/, const value_type& value,             trigger_receive_context){  if (pred(value)) buffer.push(value);}template<BOOST_DISTRIBUTED_QUEUE_PARMS>void BOOST_DISTRIBUTED_QUEUE_TYPE::handle_multipush(int /*source*/, int /*tag*/,                  const std::vector<value_type>& values,                  trigger_receive_context){  for (std::size_t i = 0; i < values.size(); ++i)    if (pred(values[i])) buffer.push(values[i]);}template<BOOST_DISTRIBUTED_QUEUE_PARMS>boolBOOST_DISTRIBUTED_QUEUE_TYPE::do_synchronize() const{#ifdef PBGL_ACCOUNTING  ++num_synchronizations;#endif  using boost::parallel::all_reduce;  using std::swap;  typedef typename ProcessGroup::process_id_type process_id_type;  if (outgoing_buffers) {    // Transfer all of the push requests    process_id_type id = process_id(process_group);    process_id_type np = num_processes(process_group);    for (process_id_type dest = 0; dest < np; ++dest) {      outgoing_buffer_t& outgoing = outgoing_buffers->at(dest);      std::size_t size = outgoing.size();      if (size != 0) {        if (dest != id) {          send(process_group, dest, msg_multipush, outgoing);        } else {          for (std::size_t i = 0; i < size; ++i)            buffer.push(outgoing[i]);        }        outgoing.clear();      }    }  }  synchronize(process_group);  unsigned local_size = buffer.size();  unsigned global_size =    all_reduce(process_group, local_size, std::plus<unsigned>());  return global_size == 0;}} } } // end namespace boost::graph::distributed
 |