| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631 | // Copyright (C) 2018 Alain Miniussi <alain.miniussi@oca.eu>.// Use, modification and distribution is subject to the Boost Software// License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at// http://www.boost.org/LICENSE_1_0.txt)// Request implementation dtails// This header should be included only after the communicator and request // classes has been defined.#ifndef BOOST_MPI_REQUEST_HANDLERS_HPP#define BOOST_MPI_REQUEST_HANDLERS_HPP#include <boost/mpi/skeleton_and_content_types.hpp>namespace boost { namespace mpi {namespace detail {/** * Internal data structure that stores everything required to manage * the receipt of serialized data via a request object. */template<typename T>struct serialized_irecv_data {  serialized_irecv_data(const communicator& comm, T& value)    : m_ia(comm), m_value(value) {}  void deserialize(status& stat)   {     m_ia >> m_value;     stat.m_count = 1;  }  std::size_t     m_count;  packed_iarchive m_ia;  T&              m_value;};template<>struct serialized_irecv_data<packed_iarchive>{  serialized_irecv_data(communicator const&, packed_iarchive& ia) : m_ia(ia) { }  void deserialize(status&) { /* Do nothing. */ }  std::size_t      m_count;  packed_iarchive& m_ia;};/** * Internal data structure that stores everything required to manage * the receipt of an array of serialized data via a request object. */template<typename T>struct serialized_array_irecv_data{  serialized_array_irecv_data(const communicator& comm, T* values, int n)    : m_count(0), m_ia(comm), m_values(values), m_nb(n) {}  void deserialize(status& stat);  std::size_t     m_count;  packed_iarchive m_ia;  T*              m_values;  int             m_nb;};template<typename T>void serialized_array_irecv_data<T>::deserialize(status& stat){  T* v = m_values;  T* end =  m_values+m_nb;  while (v < end) {    m_ia >> *v++;  }  stat.m_count = m_nb;}/** * Internal data structure that stores everything required to manage * the receipt of an array of primitive data but unknown size. * Such an array can have been send with blocking operation and so must * be compatible with the (size_t,raw_data[]) format. */template<typename T, class A>struct dynamic_array_irecv_data{  BOOST_STATIC_ASSERT_MSG(is_mpi_datatype<T>::value, "Can only be specialized for MPI datatypes.");  dynamic_array_irecv_data(std::vector<T,A>& values)    : m_count(-1), m_values(values) {}  std::size_t       m_count;  std::vector<T,A>& m_values;};template<typename T>struct serialized_irecv_data<const skeleton_proxy<T> >{  serialized_irecv_data(const communicator& comm, skeleton_proxy<T> proxy)    : m_isa(comm), m_ia(m_isa.get_skeleton()), m_proxy(proxy) { }  void deserialize(status& stat)   {     m_isa >> m_proxy.object;    stat.m_count = 1;  }  std::size_t              m_count;  packed_skeleton_iarchive m_isa;  packed_iarchive&         m_ia;  skeleton_proxy<T>        m_proxy;};template<typename T>struct serialized_irecv_data<skeleton_proxy<T> >  : public serialized_irecv_data<const skeleton_proxy<T> >{  typedef serialized_irecv_data<const skeleton_proxy<T> > inherited;  serialized_irecv_data(const communicator& comm, const skeleton_proxy<T>& proxy)    : inherited(comm, proxy) { }};}#if BOOST_MPI_VERSION >= 3template<class Data>class request::probe_handler  : public request::handler,    protected Data {protected:  template<typename I1>  probe_handler(communicator const& comm, int source, int tag, I1& i1)    : Data(comm, i1),      m_comm(comm),      m_source(source),      m_tag(tag) {}  // no variadic template for now  template<typename I1, typename I2>  probe_handler(communicator const& comm, int source, int tag, I1& i1, I2& i2)    : Data(comm, i1, i2),      m_comm(comm),      m_source(source),      m_tag(tag) {}public:  bool active() const { return m_source != MPI_PROC_NULL; }  optional<MPI_Request&> trivial() { return boost::none; }  void cancel() { m_source = MPI_PROC_NULL; }  status wait() {    MPI_Message msg;    status stat;    BOOST_MPI_CHECK_RESULT(MPI_Mprobe, (m_source,m_tag,m_comm,&msg,&stat.m_status));    return unpack(msg, stat);  }    optional<status> test() {    status stat;    int flag = 0;    MPI_Message msg;    BOOST_MPI_CHECK_RESULT(MPI_Improbe, (m_source,m_tag,m_comm,&flag,&msg,&stat.m_status));    if (flag) {      return unpack(msg, stat);    } else {      return optional<status>();    }   }protected:  friend class request;  status unpack(MPI_Message& msg, status& stat) {    int count;    MPI_Datatype datatype = this->Data::datatype();    BOOST_MPI_CHECK_RESULT(MPI_Get_count, (&stat.m_status, datatype, &count));    this->Data::resize(count);    BOOST_MPI_CHECK_RESULT(MPI_Mrecv, (this->Data::buffer(), count, datatype, &msg, &stat.m_status));    this->Data::deserialize();    m_source = MPI_PROC_NULL;    stat.m_count = 1;    return stat;  }    communicator const& m_comm;  int m_source;  int m_tag;};#endif // BOOST_MPI_VERSION >= 3namespace detail {template<class A>struct dynamic_primitive_array_data {  dynamic_primitive_array_data(communicator const&, A& arr) : m_buffer(arr) {}    void* buffer() { return m_buffer.data(); }  void  resize(std::size_t sz) { m_buffer.resize(sz); }  void  deserialize() {}  MPI_Datatype datatype() { return get_mpi_datatype<typename A::value_type>(); }    A& m_buffer;};template<typename T>struct serialized_data {  serialized_data(communicator const& comm, T& value) : m_archive(comm), m_value(value) {}  void* buffer() { return m_archive.address(); }  void  resize(std::size_t sz) { m_archive.resize(sz); }  void  deserialize() { m_archive >> m_value; }  MPI_Datatype datatype() { return MPI_PACKED; }  packed_iarchive m_archive;  T& m_value;};template<>struct serialized_data<packed_iarchive> {  serialized_data(communicator const& comm, packed_iarchive& ar) : m_archive(ar) {}    void* buffer() { return m_archive.address(); }  void  resize(std::size_t sz) { m_archive.resize(sz); }  void  deserialize() {}  MPI_Datatype datatype() { return MPI_PACKED; }  packed_iarchive& m_archive;};template<typename T>struct serialized_data<const skeleton_proxy<T> > {  serialized_data(communicator const& comm, skeleton_proxy<T> skel)    : m_proxy(skel),      m_archive(comm) {}    void* buffer() { return m_archive.get_skeleton().address(); }  void  resize(std::size_t sz) { m_archive.get_skeleton().resize(sz); }  void  deserialize() { m_archive >> m_proxy.object; }  MPI_Datatype datatype() { return MPI_PACKED; }  skeleton_proxy<T> m_proxy;  packed_skeleton_iarchive m_archive;};template<typename T>struct serialized_data<skeleton_proxy<T> >  : public serialized_data<const skeleton_proxy<T> > {  typedef serialized_data<const skeleton_proxy<T> > super;  serialized_data(communicator const& comm, skeleton_proxy<T> skel)    : super(comm, skel) {}};template<typename T>struct serialized_array_data {  serialized_array_data(communicator const& comm, T* values, int nb)    : m_archive(comm), m_values(values), m_nb(nb) {}  void* buffer() { return m_archive.address(); }  void  resize(std::size_t sz) { m_archive.resize(sz); }  void  deserialize() {    T* end = m_values + m_nb;    T* v = m_values;    while (v != end) {      m_archive >> *v++;    }  }  MPI_Datatype datatype() { return MPI_PACKED; }  packed_iarchive m_archive;  T*  m_values;  int m_nb;};}class BOOST_MPI_DECL request::legacy_handler : public request::handler {public:  legacy_handler(communicator const& comm, int source, int tag);    void cancel() {    for (int i = 0; i < 2; ++i) {      if (m_requests[i] != MPI_REQUEST_NULL) {        BOOST_MPI_CHECK_RESULT(MPI_Cancel, (m_requests+i));      }    }  }    bool active() const;  optional<MPI_Request&> trivial();    MPI_Request      m_requests[2];  communicator     m_comm;  int              m_source;  int              m_tag;};template<typename T>class request::legacy_serialized_handler   : public request::legacy_handler,     protected detail::serialized_irecv_data<T> {public:  typedef detail::serialized_irecv_data<T> extra;  legacy_serialized_handler(communicator const& comm, int source, int tag, T& value)    : legacy_handler(comm, source, tag),      extra(comm, value)  {    BOOST_MPI_CHECK_RESULT(MPI_Irecv,			   (&this->extra::m_count, 1, 			    get_mpi_datatype(this->extra::m_count),			    source, tag, comm, m_requests+0));      }  status wait() {    status stat;    if (m_requests[1] == MPI_REQUEST_NULL) {      // Wait for the count message to complete      BOOST_MPI_CHECK_RESULT(MPI_Wait,                             (m_requests, &stat.m_status));      // Resize our buffer and get ready to receive its data      this->extra::m_ia.resize(this->extra::m_count);      BOOST_MPI_CHECK_RESULT(MPI_Irecv,                             (this->extra::m_ia.address(), this->extra::m_ia.size(), MPI_PACKED,                              stat.source(), stat.tag(),                               MPI_Comm(m_comm), m_requests + 1));    }    // Wait until we have received the entire message    BOOST_MPI_CHECK_RESULT(MPI_Wait,                           (m_requests + 1, &stat.m_status));    this->deserialize(stat);    return stat;      }    optional<status> test() {    status stat;    int flag = 0;        if (m_requests[1] == MPI_REQUEST_NULL) {      // Check if the count message has completed      BOOST_MPI_CHECK_RESULT(MPI_Test,                             (m_requests, &flag, &stat.m_status));      if (flag) {        // Resize our buffer and get ready to receive its data        this->extra::m_ia.resize(this->extra::m_count);        BOOST_MPI_CHECK_RESULT(MPI_Irecv,                               (this->extra::m_ia.address(), this->extra::m_ia.size(),MPI_PACKED,                                stat.source(), stat.tag(),                                 MPI_Comm(m_comm), m_requests + 1));      } else        return optional<status>(); // We have not finished yet    }     // Check if we have received the message data    BOOST_MPI_CHECK_RESULT(MPI_Test,                           (m_requests + 1, &flag, &stat.m_status));    if (flag) {      this->deserialize(stat);      return stat;    } else       return optional<status>();  }};template<typename T>class request::legacy_serialized_array_handler   : public    request::legacy_handler,    protected detail::serialized_array_irecv_data<T> {  typedef detail::serialized_array_irecv_data<T> extra;public:  legacy_serialized_array_handler(communicator const& comm, int source, int tag, T* values, int n)    : legacy_handler(comm, source, tag),      extra(comm, values, n) {    BOOST_MPI_CHECK_RESULT(MPI_Irecv,                           (&this->extra::m_count, 1,                             get_mpi_datatype(this->extra::m_count),                            source, tag, comm, m_requests+0));  }  status wait() {    status stat;    if (m_requests[1] == MPI_REQUEST_NULL) {      // Wait for the count message to complete      BOOST_MPI_CHECK_RESULT(MPI_Wait,                             (m_requests, &stat.m_status));      // Resize our buffer and get ready to receive its data      this->extra::m_ia.resize(this->extra::m_count);      BOOST_MPI_CHECK_RESULT(MPI_Irecv,                             (this->extra::m_ia.address(), this->extra::m_ia.size(), MPI_PACKED,                              stat.source(), stat.tag(),                               MPI_Comm(m_comm), m_requests + 1));    }    // Wait until we have received the entire message    BOOST_MPI_CHECK_RESULT(MPI_Wait,                           (m_requests + 1, &stat.m_status));    this->deserialize(stat);    return stat;  }    optional<status> test() {    status stat;    int flag = 0;        if (m_requests[1] == MPI_REQUEST_NULL) {      // Check if the count message has completed      BOOST_MPI_CHECK_RESULT(MPI_Test,                             (m_requests, &flag, &stat.m_status));      if (flag) {        // Resize our buffer and get ready to receive its data        this->extra::m_ia.resize(this->extra::m_count);        BOOST_MPI_CHECK_RESULT(MPI_Irecv,                               (this->extra::m_ia.address(), this->extra::m_ia.size(),MPI_PACKED,                                stat.source(), stat.tag(),                                 MPI_Comm(m_comm), m_requests + 1));      } else        return optional<status>(); // We have not finished yet    }     // Check if we have received the message data    BOOST_MPI_CHECK_RESULT(MPI_Test,                           (m_requests + 1, &flag, &stat.m_status));    if (flag) {      this->deserialize(stat);      return stat;    } else       return optional<status>();  }};template<typename T, class A>class request::legacy_dynamic_primitive_array_handler   : public request::legacy_handler,    protected detail::dynamic_array_irecv_data<T,A>{  typedef detail::dynamic_array_irecv_data<T,A> extra;public:  legacy_dynamic_primitive_array_handler(communicator const& comm, int source, int tag, std::vector<T,A>& values)    : legacy_handler(comm, source, tag),      extra(values) {    BOOST_MPI_CHECK_RESULT(MPI_Irecv,                           (&this->extra::m_count, 1,                             get_mpi_datatype(this->extra::m_count),                            source, tag, comm, m_requests+0));  }  status wait() {    status stat;    if (m_requests[1] == MPI_REQUEST_NULL) {      // Wait for the count message to complete      BOOST_MPI_CHECK_RESULT(MPI_Wait,                             (m_requests, &stat.m_status));      // Resize our buffer and get ready to receive its data      this->extra::m_values.resize(this->extra::m_count);      BOOST_MPI_CHECK_RESULT(MPI_Irecv,                             (&(this->extra::m_values[0]), this->extra::m_values.size(), get_mpi_datatype<T>(),                              stat.source(), stat.tag(),                               MPI_Comm(m_comm), m_requests + 1));    }    // Wait until we have received the entire message    BOOST_MPI_CHECK_RESULT(MPI_Wait,                           (m_requests + 1, &stat.m_status));    return stat;      }  optional<status> test() {    status stat;    int flag = 0;        if (m_requests[1] == MPI_REQUEST_NULL) {      // Check if the count message has completed      BOOST_MPI_CHECK_RESULT(MPI_Test,                             (m_requests, &flag, &stat.m_status));      if (flag) {        // Resize our buffer and get ready to receive its data        this->extra::m_values.resize(this->extra::m_count);        BOOST_MPI_CHECK_RESULT(MPI_Irecv,                               (&(this->extra::m_values[0]), this->extra::m_values.size(), get_mpi_datatype<T>(),                                stat.source(), stat.tag(),                                 MPI_Comm(m_comm), m_requests + 1));      } else        return optional<status>(); // We have not finished yet    }     // Check if we have received the message data    BOOST_MPI_CHECK_RESULT(MPI_Test,                           (m_requests + 1, &flag, &stat.m_status));    if (flag) {      return stat;    } else       return optional<status>();  }};class BOOST_MPI_DECL request::trivial_handler : public request::handler {public:  trivial_handler();    status wait();  optional<status> test();  void cancel();    bool active() const;  optional<MPI_Request&> trivial();private:  friend class request;  MPI_Request      m_request;};class request::dynamic_handler : public request::handler {  dynamic_handler();    status wait();  optional<status> test();  void cancel();    bool active() const;  optional<MPI_Request&> trivial();private:  friend class request;  MPI_Request      m_requests[2];};template<typename T> request request::make_serialized(communicator const& comm, int source, int tag, T& value) {#if defined(BOOST_MPI_USE_IMPROBE)  return request(new probe_handler<detail::serialized_data<T> >(comm, source, tag, value));#else  return request(new legacy_serialized_handler<T>(comm, source, tag, value));#endif}template<typename T>request request::make_serialized_array(communicator const& comm, int source, int tag, T* values, int n) {#if defined(BOOST_MPI_USE_IMPROBE)  return request(new probe_handler<detail::serialized_array_data<T> >(comm, source, tag, values, n));#else  return request(new legacy_serialized_array_handler<T>(comm, source, tag, values, n));#endif}template<typename T, class A>request request::make_dynamic_primitive_array_recv(communicator const& comm, int source, int tag,                                                    std::vector<T,A>& values) {#if defined(BOOST_MPI_USE_IMPROBE)  return request(new probe_handler<detail::dynamic_primitive_array_data<std::vector<T,A> > >(comm,source,tag,values));#else  return request(new legacy_dynamic_primitive_array_handler<T,A>(comm, source, tag, values));#endif}template<typename T>requestrequest::make_trivial_send(communicator const& comm, int dest, int tag, T const* values, int n) {  trivial_handler* handler = new trivial_handler;  BOOST_MPI_CHECK_RESULT(MPI_Isend,                         (const_cast<T*>(values), n,                           get_mpi_datatype<T>(),                          dest, tag, comm, &handler->m_request));  return request(handler);}template<typename T>requestrequest::make_trivial_send(communicator const& comm, int dest, int tag, T const& value) {  return make_trivial_send(comm, dest, tag, &value, 1);}template<typename T>requestrequest::make_trivial_recv(communicator const& comm, int dest, int tag, T* values, int n) {  trivial_handler* handler = new trivial_handler;  BOOST_MPI_CHECK_RESULT(MPI_Irecv,                         (values, n,                           get_mpi_datatype<T>(),                          dest, tag, comm, &handler->m_request));  return request(handler);}template<typename T>requestrequest::make_trivial_recv(communicator const& comm, int dest, int tag, T& value) {  return make_trivial_recv(comm, dest, tag, &value, 1);}template<typename T, class A>request request::make_dynamic_primitive_array_send(communicator const& comm, int dest, int tag,                                                    std::vector<T,A> const& values) {#if defined(BOOST_MPI_USE_IMPROBE)  return make_trivial_send(comm, dest, tag, values.data(), values.size());#else  {    // non blocking recv by legacy_dynamic_primitive_array_handler    // blocking recv by status recv_vector(source,tag,value,primitive)    boost::shared_ptr<std::size_t> size(new std::size_t(values.size()));    dynamic_handler* handler = new dynamic_handler;    request req(handler);    req.preserve(size);        BOOST_MPI_CHECK_RESULT(MPI_Isend,                           (size.get(), 1,                            get_mpi_datatype(*size),                            dest, tag, comm, handler->m_requests+0));    BOOST_MPI_CHECK_RESULT(MPI_Isend,                           (const_cast<T*>(values.data()), *size,                             get_mpi_datatype<T>(),                            dest, tag, comm, handler->m_requests+1));    return req;  }#endif}inlinerequest::legacy_handler::legacy_handler(communicator const& comm, int source, int tag)  : m_comm(comm),    m_source(source),    m_tag(tag){  m_requests[0] = MPI_REQUEST_NULL;  m_requests[1] = MPI_REQUEST_NULL;}    }}#endif // BOOST_MPI_REQUEST_HANDLERS_HPP
 |