| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469 | // Copyright (C) 2014 Ian Forbed// Copyright (C) 2014-2017 Vicente J. Botet Escriba////  Distributed under the Boost Software License, Version 1.0. (See accompanying//  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)//#ifndef BOOST_THREAD_SYNC_TIMED_QUEUE_HPP#define BOOST_THREAD_SYNC_TIMED_QUEUE_HPP#include <boost/thread/detail/config.hpp>#include <boost/thread/concurrent_queues/sync_priority_queue.hpp>#include <boost/chrono/duration.hpp>#include <boost/chrono/time_point.hpp>#include <boost/chrono/system_clocks.hpp>#include <boost/chrono/chrono_io.hpp>#include <algorithm> // std::min#include <boost/config/abi_prefix.hpp>namespace boost{namespace concurrent{namespace detail{  // fixme: shouldn't the timepoint be configurable  template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>  struct scheduled_type  {    typedef T value_type;    typedef Clock clock;    typedef TimePoint time_point;    T data;    time_point time;    BOOST_THREAD_COPYABLE_AND_MOVABLE(scheduled_type)    scheduled_type(T const& pdata, time_point tp) : data(pdata), time(tp) {}    scheduled_type(BOOST_THREAD_RV_REF(T) pdata, time_point tp) : data(boost::move(pdata)), time(tp) {}    scheduled_type(scheduled_type const& other) : data(other.data), time(other.time) {}    scheduled_type& operator=(BOOST_THREAD_COPY_ASSIGN_REF(scheduled_type) other) {      data = other.data;      time = other.time;      return *this;    }    scheduled_type(BOOST_THREAD_RV_REF(scheduled_type) other) : data(boost::move(other.data)), time(other.time) {}    scheduled_type& operator=(BOOST_THREAD_RV_REF(scheduled_type) other) {      data = boost::move(other.data);      time = other.time;      return *this;    }    bool operator <(const scheduled_type & other) const    {      return this->time > other.time;    }  }; //end struct  template <class Duration>  chrono::time_point<chrono::steady_clock,Duration>  limit_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)  {    // Clock == chrono::steady_clock    return tp;  }  template <class Clock, class Duration>  chrono::time_point<Clock,Duration>  limit_timepoint(chrono::time_point<Clock,Duration> const& tp)  {    // Clock != chrono::steady_clock    // The system time may jump while wait_until() is waiting. To compensate for this and time out near    // the correct time, we limit how long wait_until() can wait before going around the loop again.    const chrono::time_point<Clock,Duration> tpmax(chrono::time_point_cast<Duration>(Clock::now() + chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS)));    return (std::min)(tp, tpmax);  }  template <class Duration>  chrono::steady_clock::time_point  convert_to_steady_clock_timepoint(chrono::time_point<chrono::steady_clock,Duration> const& tp)  {    // Clock == chrono::steady_clock    return chrono::time_point_cast<chrono::steady_clock::duration>(tp);  }  template <class Clock, class Duration>  chrono::steady_clock::time_point  convert_to_steady_clock_timepoint(chrono::time_point<Clock,Duration> const& tp)  {    // Clock != chrono::steady_clock    // The system time may jump while wait_until() is waiting. To compensate for this and time out near    // the correct time, we limit how long wait_until() can wait before going around the loop again.    const chrono::steady_clock::duration dura(chrono::duration_cast<chrono::steady_clock::duration>(tp - Clock::now()));    const chrono::steady_clock::duration duramax(chrono::milliseconds(BOOST_THREAD_POLL_INTERVAL_MILLISECONDS));    return chrono::steady_clock::now() + (std::min)(dura, duramax);  }} //end detail namespace  template <class T, class Clock = chrono::steady_clock, class TimePoint=typename Clock::time_point>  class sync_timed_queue    : private sync_priority_queue<detail::scheduled_type<T, Clock, TimePoint> >  {    typedef detail::scheduled_type<T, Clock, TimePoint> stype;    typedef sync_priority_queue<stype> super;  public:    typedef T value_type;    typedef Clock clock;    typedef typename clock::duration duration;    typedef typename clock::time_point time_point;    typedef typename super::underlying_queue_type underlying_queue_type;    typedef typename super::size_type size_type;    typedef typename super::op_status op_status;    sync_timed_queue() : super() {};    ~sync_timed_queue() {}    using super::size;    using super::empty;    using super::full;    using super::close;    using super::closed;    T pull();    void pull(T& elem);    template <class Duration>    queue_op_status pull_until(chrono::time_point<clock,Duration> const& tp, T& elem);    template <class Rep, class Period>    queue_op_status pull_for(chrono::duration<Rep,Period> const& dura, T& elem);    queue_op_status try_pull(T& elem);    queue_op_status wait_pull(T& elem);    queue_op_status nonblocking_pull(T& elem);    template <class Duration>    void push(const T& elem, chrono::time_point<clock,Duration> const& tp);    template <class Rep, class Period>    void push(const T& elem, chrono::duration<Rep,Period> const& dura);    template <class Duration>    void push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);    template <class Rep, class Period>    void push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);    template <class Duration>    queue_op_status try_push(const T& elem, chrono::time_point<clock,Duration> const& tp);    template <class Rep, class Period>    queue_op_status try_push(const T& elem, chrono::duration<Rep,Period> const& dura);    template <class Duration>    queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp);    template <class Rep, class Period>    queue_op_status try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura);  private:    inline bool not_empty_and_time_reached(unique_lock<mutex>& lk) const;    inline bool not_empty_and_time_reached(lock_guard<mutex>& lk) const;    bool wait_to_pull(unique_lock<mutex>&);    queue_op_status wait_to_pull_until(unique_lock<mutex>&, TimePoint const& tp);    template <class Rep, class Period>    queue_op_status wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura);    T pull(unique_lock<mutex>&);    T pull(lock_guard<mutex>&);    void pull(unique_lock<mutex>&, T& elem);    void pull(lock_guard<mutex>&, T& elem);    queue_op_status try_pull(unique_lock<mutex>&, T& elem);    queue_op_status try_pull(lock_guard<mutex>&, T& elem);    queue_op_status wait_pull(unique_lock<mutex>& lk, T& elem);    sync_timed_queue(const sync_timed_queue&);    sync_timed_queue& operator=(const sync_timed_queue&);    sync_timed_queue(BOOST_THREAD_RV_REF(sync_timed_queue));    sync_timed_queue& operator=(BOOST_THREAD_RV_REF(sync_timed_queue));  }; //end class  template <class T, class Clock, class TimePoint>  template <class Duration>  void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::time_point<clock,Duration> const& tp)  {    super::push(stype(elem,tp));  }  template <class T, class Clock, class TimePoint>  template <class Rep, class Period>  void sync_timed_queue<T, Clock, TimePoint>::push(const T& elem, chrono::duration<Rep,Period> const& dura)  {    push(elem, clock::now() + dura);  }  template <class T, class Clock, class TimePoint>  template <class Duration>  void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)  {    super::push(stype(boost::move(elem),tp));  }  template <class T, class Clock, class TimePoint>  template <class Rep, class Period>  void sync_timed_queue<T, Clock, TimePoint>::push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)  {    push(boost::move(elem), clock::now() + dura);  }  template <class T, class Clock, class TimePoint>  template <class Duration>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::time_point<clock,Duration> const& tp)  {    return super::try_push(stype(elem,tp));  }  template <class T, class Clock, class TimePoint>  template <class Rep, class Period>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(const T& elem, chrono::duration<Rep,Period> const& dura)  {    return try_push(elem,clock::now() + dura);  }  template <class T, class Clock, class TimePoint>  template <class Duration>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::time_point<clock,Duration> const& tp)  {    return super::try_push(stype(boost::move(elem), tp));  }  template <class T, class Clock, class TimePoint>  template <class Rep, class Period>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_push(BOOST_THREAD_RV_REF(T) elem, chrono::duration<Rep,Period> const& dura)  {    return try_push(boost::move(elem), clock::now() + dura);  }  ///////////////////////////  template <class T, class Clock, class TimePoint>  bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(unique_lock<mutex>& lk) const  {    return ! super::empty(lk) && clock::now() >= super::data_.top().time;  }  template <class T, class Clock, class TimePoint>  bool sync_timed_queue<T, Clock, TimePoint>::not_empty_and_time_reached(lock_guard<mutex>& lk) const  {    return ! super::empty(lk) && clock::now() >= super::data_.top().time;  }  ///////////////////////////  template <class T, class Clock, class TimePoint>  bool sync_timed_queue<T, Clock, TimePoint>::wait_to_pull(unique_lock<mutex>& lk)  {    for (;;)    {      if (not_empty_and_time_reached(lk)) return false; // success      if (super::closed(lk)) return true; // closed      super::wait_until_not_empty_or_closed(lk);      if (not_empty_and_time_reached(lk)) return false; // success      if (super::closed(lk)) return true; // closed      const time_point tpmin(detail::limit_timepoint(super::data_.top().time));      super::cond_.wait_until(lk, tpmin);    }  }  template <class T, class Clock, class TimePoint>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_until(unique_lock<mutex>& lk, TimePoint const& tp)  {    for (;;)    {      if (not_empty_and_time_reached(lk)) return queue_op_status::success;      if (super::closed(lk)) return queue_op_status::closed;      if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;      super::wait_until_not_empty_or_closed_until(lk, tp);      if (not_empty_and_time_reached(lk)) return queue_op_status::success;      if (super::closed(lk)) return queue_op_status::closed;      if (clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;      const time_point tpmin((std::min)(tp, detail::limit_timepoint(super::data_.top().time)));      super::cond_.wait_until(lk, tpmin);    }  }  template <class T, class Clock, class TimePoint>  template <class Rep, class Period>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_to_pull_for(unique_lock<mutex>& lk, chrono::duration<Rep,Period> const& dura)  {    const chrono::steady_clock::time_point tp(chrono::steady_clock::now() + chrono::duration_cast<chrono::steady_clock::duration>(dura));    for (;;)    {      if (not_empty_and_time_reached(lk)) return queue_op_status::success;      if (super::closed(lk)) return queue_op_status::closed;      if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;      super::wait_until_not_empty_or_closed_until(lk, tp);      if (not_empty_and_time_reached(lk)) return queue_op_status::success;      if (super::closed(lk)) return queue_op_status::closed;      if (chrono::steady_clock::now() >= tp) return super::empty(lk) ? queue_op_status::timeout : queue_op_status::not_ready;      const chrono::steady_clock::time_point tpmin((std::min)(tp, detail::convert_to_steady_clock_timepoint(super::data_.top().time)));      super::cond_.wait_until(lk, tpmin);    }  }  ///////////////////////////  template <class T, class Clock, class TimePoint>  T sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&)  {#if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES    return boost::move(super::data_.pull().data);#else    return super::data_.pull().data;#endif  }  template <class T, class Clock, class TimePoint>  T sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&)  {#if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES    return boost::move(super::data_.pull().data);#else    return super::data_.pull().data;#endif  }  template <class T, class Clock, class TimePoint>  T sync_timed_queue<T, Clock, TimePoint>::pull()  {    unique_lock<mutex> lk(super::mtx_);    const bool has_been_closed = wait_to_pull(lk);    if (has_been_closed) super::throw_if_closed(lk);    return pull(lk);  }  ///////////////////////////  template <class T, class Clock, class TimePoint>  void sync_timed_queue<T, Clock, TimePoint>::pull(unique_lock<mutex>&, T& elem)  {#if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES    elem = boost::move(super::data_.pull().data);#else    elem = super::data_.pull().data;#endif  }  template <class T, class Clock, class TimePoint>  void sync_timed_queue<T, Clock, TimePoint>::pull(lock_guard<mutex>&, T& elem)  {#if ! defined  BOOST_NO_CXX11_RVALUE_REFERENCES    elem = boost::move(super::data_.pull().data);#else    elem = super::data_.pull().data;#endif  }  template <class T, class Clock, class TimePoint>  void sync_timed_queue<T, Clock, TimePoint>::pull(T& elem)  {    unique_lock<mutex> lk(super::mtx_);    const bool has_been_closed = wait_to_pull(lk);    if (has_been_closed) super::throw_if_closed(lk);    pull(lk, elem);  }  //////////////////////  template <class T, class Clock, class TimePoint>  template <class Duration>  queue_op_status  sync_timed_queue<T, Clock, TimePoint>::pull_until(chrono::time_point<clock,Duration> const& tp, T& elem)  {    unique_lock<mutex> lk(super::mtx_);    const queue_op_status rc = wait_to_pull_until(lk, chrono::time_point_cast<typename time_point::duration>(tp));    if (rc == queue_op_status::success) pull(lk, elem);    return rc;  }  //////////////////////  template <class T, class Clock, class TimePoint>  template <class Rep, class Period>  queue_op_status  sync_timed_queue<T, Clock, TimePoint>::pull_for(chrono::duration<Rep,Period> const& dura, T& elem)  {    unique_lock<mutex> lk(super::mtx_);    const queue_op_status rc = wait_to_pull_for(lk, dura);    if (rc == queue_op_status::success) pull(lk, elem);    return rc;  }  ///////////////////////////  template <class T, class Clock, class TimePoint>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(unique_lock<mutex>& lk, T& elem)  {    if (not_empty_and_time_reached(lk))    {      pull(lk, elem);      return queue_op_status::success;    }    if (super::closed(lk)) return queue_op_status::closed;    if (super::empty(lk)) return queue_op_status::empty;    return queue_op_status::not_ready;  }  template <class T, class Clock, class TimePoint>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(lock_guard<mutex>& lk, T& elem)  {    if (not_empty_and_time_reached(lk))    {      pull(lk, elem);      return queue_op_status::success;    }    if (super::closed(lk)) return queue_op_status::closed;    if (super::empty(lk)) return queue_op_status::empty;    return queue_op_status::not_ready;  }  template <class T, class Clock, class TimePoint>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::try_pull(T& elem)  {    lock_guard<mutex> lk(super::mtx_);    return try_pull(lk, elem);  }  ///////////////////////////  template <class T, class Clock, class TimePoint>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(unique_lock<mutex>& lk, T& elem)  {    const bool has_been_closed = wait_to_pull(lk);    if (has_been_closed) return queue_op_status::closed;    pull(lk, elem);    return queue_op_status::success;  }  template <class T, class Clock, class TimePoint>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::wait_pull(T& elem)  {    unique_lock<mutex> lk(super::mtx_);    return wait_pull(lk, elem);  }  ///////////////////////////  template <class T, class Clock, class TimePoint>  queue_op_status sync_timed_queue<T, Clock, TimePoint>::nonblocking_pull(T& elem)  {    unique_lock<mutex> lk(super::mtx_, try_to_lock);    if (! lk.owns_lock()) return queue_op_status::busy;    return try_pull(lk, elem);  }} //end concurrent namespaceusing concurrent::sync_timed_queue;} //end boost namespace#include <boost/config/abi_suffix.hpp>#endif
 |