| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661 | 
//          Copyright Oliver Kowalke 2016.// Distributed under the Boost Software License, Version 1.0.//    (See accompanying file LICENSE_1_0.txt or copy at//          http://www.boost.org/LICENSE_1_0.txt)#ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H#define BOOST_FIBERS_UNBUFFERED_CHANNEL_H#include <atomic>#include <chrono>#include <cstddef>#include <cstdint>#include <memory>#include <vector>#include <boost/config.hpp>#include <boost/fiber/channel_op_status.hpp>#include <boost/fiber/context.hpp>#include <boost/fiber/detail/config.hpp>#include <boost/fiber/detail/convert.hpp>#if defined(BOOST_NO_CXX14_STD_EXCHANGE)#include <boost/fiber/detail/exchange.hpp>#endif#include <boost/fiber/detail/spinlock.hpp>#include <boost/fiber/exceptions.hpp>#ifdef BOOST_HAS_ABI_HEADERS#  include BOOST_ABI_PREFIX#endifnamespace boost {namespace fibers {template< typename T >class unbuffered_channel {public:    using value_type = typename std::remove_reference<T>::type;private:    using wait_queue_type = context::wait_queue_t;    struct slot {        value_type  value;        context *   ctx;        slot( value_type const& value_, context * ctx_) :            value{ value_ },            ctx{ ctx_ } {        }        slot( value_type && value_, context * ctx_) :            value{ std::move( value_) },            ctx{ ctx_ } {        }    };    // shared cacheline    std::atomic< slot * >       slot_{ nullptr };    // shared cacheline    std::atomic_bool            closed_{ false };    mutable detail::spinlock    splk_producers_{};    wait_queue_type             waiting_producers_{};    mutable detail::spinlock    splk_consumers_{};    wait_queue_type             waiting_consumers_{};    char                        pad_[cacheline_length];    bool is_empty_() {        return nullptr == slot_.load( std::memory_order_acquire);    }    bool try_push_( slot * own_slot) {        for (;;) {            slot * s = slot_.load( std::memory_order_acquire);            if ( nullptr == s) {                if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {                    continue;                }                return true;            }            return false;        }    }    slot * try_pop_() {        slot * nil_slot = nullptr;        for (;;) {            slot * s = slot_.load( std::memory_order_acquire);            if ( nullptr != s) {                if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {                    continue;}            }            return s;        }    }public:    unbuffered_channel() = default;    ~unbuffered_channel() {        close();    }    unbuffered_channel( unbuffered_channel const&) = delete;    unbuffered_channel & operator=( unbuffered_channel const&) = delete;    bool is_closed() const noexcept {        return closed_.load( std::memory_order_acquire);    }    void close() noexcept {        context * active_ctx = context::active();        // set flag        if ( ! closed_.exchange( true, std::memory_order_acquire) ) {            // notify current waiting              slot * s = slot_.load( std::memory_order_acquire);            if ( nullptr != s) {                // notify context                active_ctx->schedule( s->ctx);            }            // notify all waiting producers            detail::spinlock_lock lk1{ splk_producers_ };            while ( ! waiting_producers_.empty() ) {                context * producer_ctx = & waiting_producers_.front();                waiting_producers_.pop_front();                auto expected = reinterpret_cast< std::intptr_t >( this);                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                    // notify context                    active_ctx->schedule( producer_ctx);                } else if ( static_cast< std::intptr_t >( 0) == expected) {                    // no timed-wait op.                    // notify context                    active_ctx->schedule( producer_ctx);                }            }            // notify all waiting consumers            detail::spinlock_lock lk2{ splk_consumers_ };            while ( ! waiting_consumers_.empty() ) {                context * consumer_ctx = & waiting_consumers_.front();                waiting_consumers_.pop_front();                auto expected = reinterpret_cast< std::intptr_t >( this);                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                    // notify context                    active_ctx->schedule( consumer_ctx);                } else if ( static_cast< std::intptr_t >( 0) == expected) {                    // no timed-wait op.                    // notify context                    active_ctx->schedule( consumer_ctx);                }            }        }    }    channel_op_status push( value_type const& value) {        context * active_ctx = context::active();        slot s{ value, active_ctx };        for (;;) {            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( try_push_( & s) ) {                detail::spinlock_lock lk{ splk_consumers_ };                // notify one waiting consumer                while ( ! waiting_consumers_.empty() ) {                    context * consumer_ctx = & waiting_consumers_.front();                    waiting_consumers_.pop_front();                    auto expected = reinterpret_cast< std::intptr_t >( this);                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                        // notify context                        active_ctx->schedule( consumer_ctx);                        break;                    }                    if ( static_cast< std::intptr_t >( 0) == expected) {                        // no timed-wait op.                        // notify context                        active_ctx->schedule( consumer_ctx);                        break;                    }                }                // suspend till value has been consumed                active_ctx->suspend( lk);                // resumed                if ( nullptr == s.ctx) {                    // value has been consumed                    return channel_op_status::success;                }                // channel was closed before value was consumed                return channel_op_status::closed;            }            detail::spinlock_lock lk{ splk_producers_ };            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( is_empty_() ) {                continue;            }            active_ctx->wait_link( waiting_producers_);            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);            // suspend this producer            active_ctx->suspend( lk);            // resumed, slot mabye free        }    }    channel_op_status push( value_type && value) {        context * active_ctx = context::active();        slot s{ std::move( value), active_ctx };        for (;;) {            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( try_push_( & s) ) {                detail::spinlock_lock lk{ splk_consumers_ };                // notify one waiting consumer                while ( ! waiting_consumers_.empty() ) {                    context * consumer_ctx = & waiting_consumers_.front();                    waiting_consumers_.pop_front();                    auto expected = reinterpret_cast< std::intptr_t >( this);                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                        // notify context                        active_ctx->schedule( consumer_ctx);                        break;                    } if ( static_cast< std::intptr_t >( 0) == expected) {                        // no timed-wait op.                        // notify context                        active_ctx->schedule( consumer_ctx);                        break;                    }                }                // suspend till value has been consumed                active_ctx->suspend( lk);                // resumed                if ( nullptr == s.ctx) {                    // value has been consumed                    return channel_op_status::success;                }                // channel was closed before value was consumed                return channel_op_status::closed;            }            detail::spinlock_lock lk{ splk_producers_ };            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( is_empty_() ) {                continue;            }            active_ctx->wait_link( waiting_producers_);            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);            // suspend this producer            active_ctx->suspend( lk);            // resumed, slot mabye free        }    }    template< typename Rep, typename Period >    channel_op_status push_wait_for( value_type const& value,                                     std::chrono::duration< Rep, Period > const& timeout_duration) {        return push_wait_until( value,                                std::chrono::steady_clock::now() + timeout_duration);    }    template< typename Rep, typename Period >    channel_op_status push_wait_for( value_type && value,                                     std::chrono::duration< Rep, Period > const& timeout_duration) {        return push_wait_until( std::forward< value_type >( value),                                std::chrono::steady_clock::now() + timeout_duration);    }    template< typename Clock, typename Duration >    channel_op_status push_wait_until( value_type const& value,                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {        context * active_ctx = context::active();        slot s{ value, active_ctx };        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);        for (;;) {            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( try_push_( & s) ) {                detail::spinlock_lock lk{ splk_consumers_ };                // notify one waiting consumer                while ( ! waiting_consumers_.empty() ) {                    context * consumer_ctx = & waiting_consumers_.front();                    waiting_consumers_.pop_front();                    auto expected = reinterpret_cast< std::intptr_t >( this);                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                        // notify context                        active_ctx->schedule( consumer_ctx);                        break;                    }                    if ( static_cast< std::intptr_t >( 0) == expected) {                        // no timed-wait op.                        // notify context                        active_ctx->schedule( consumer_ctx);                        break;                    }                }                // suspend this producer                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);                if ( ! active_ctx->wait_until( timeout_time, lk) ) {                    // clear slot                    slot * nil_slot = nullptr, * own_slot = & s;                    slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);                    // resumed, value has not been consumed                    return channel_op_status::timeout;                }                // resumed                if ( nullptr == s.ctx) {                    // value has been consumed                    return channel_op_status::success;                }                // channel was closed before value was consumed                return channel_op_status::closed;            }            detail::spinlock_lock lk{ splk_producers_ };            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( is_empty_() ) {                continue;            }            active_ctx->wait_link( waiting_producers_);            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);            // suspend this producer            if ( ! active_ctx->wait_until( timeout_time, lk) ) {                // relock local lk                lk.lock();                // remove from waiting-queue                waiting_producers_.remove( * active_ctx);                return channel_op_status::timeout;            }            // resumed, slot maybe free        }    }    template< typename Clock, typename Duration >    channel_op_status push_wait_until( value_type && value,                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {        context * active_ctx = context::active();        slot s{ std::move( value), active_ctx };        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);        for (;;) {            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( try_push_( & s) ) {                detail::spinlock_lock lk{ splk_consumers_ };                // notify one waiting consumer                while ( ! waiting_consumers_.empty() ) {                    context * consumer_ctx = & waiting_consumers_.front();                    waiting_consumers_.pop_front();                    auto expected = reinterpret_cast< std::intptr_t >( this);                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                        // notify context                        active_ctx->schedule( consumer_ctx);                        break;                    } if ( static_cast< std::intptr_t >( 0) == expected) {                        // no timed-wait op.                        // notify context                        active_ctx->schedule( consumer_ctx);                        break;                    }                }                // suspend this producer                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);                if ( ! active_ctx->wait_until( timeout_time, lk) ) {                    // clear slot                    slot * nil_slot = nullptr, * own_slot = & s;                    slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);                    // resumed, value has not been consumed                    return channel_op_status::timeout;                }                // resumed                if ( nullptr == s.ctx) {                    // value has been consumed                    return channel_op_status::success;                }                // channel was closed before value was consumed                return channel_op_status::closed;            }            detail::spinlock_lock lk{ splk_producers_ };            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( is_empty_() ) {                continue;            }            active_ctx->wait_link( waiting_producers_);            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);            // suspend this producer            if ( ! active_ctx->wait_until( timeout_time, lk) ) {                // relock local lk                lk.lock();                // remove from waiting-queue                waiting_producers_.remove( * active_ctx);                return channel_op_status::timeout;            }            // resumed, slot maybe free        }    }    channel_op_status pop( value_type & value) {        context * active_ctx = context::active();        slot * s = nullptr;        for (;;) {            if ( nullptr != ( s = try_pop_() ) ) {                {                    detail::spinlock_lock lk{ splk_producers_ };                    // notify one waiting producer                    while ( ! waiting_producers_.empty() ) {                        context * producer_ctx = & waiting_producers_.front();                        waiting_producers_.pop_front();                        auto expected = reinterpret_cast< std::intptr_t >( this);                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                            lk.unlock();                            // notify context                            active_ctx->schedule( producer_ctx);                            break;                        } if ( static_cast< std::intptr_t >( 0) == expected) {                            lk.unlock();                            // no timed-wait op.                            // notify context                            active_ctx->schedule( producer_ctx);                            break;                        }                    }                }                value = std::move( s->value);                // notify context#if defined(BOOST_NO_CXX14_STD_EXCHANGE)                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );#else                active_ctx->schedule( std::exchange( s->ctx, nullptr) );#endif                return channel_op_status::success;            }            detail::spinlock_lock lk{ splk_consumers_ };            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( ! is_empty_() ) {                continue;            }            active_ctx->wait_link( waiting_consumers_);            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);            // suspend this consumer            active_ctx->suspend( lk);            // resumed, slot mabye set        }    }    value_type value_pop() {        context * active_ctx = context::active();        slot * s = nullptr;        for (;;) {            if ( nullptr != ( s = try_pop_() ) ) {                {                    detail::spinlock_lock lk{ splk_producers_ };                    // notify one waiting producer                    while ( ! waiting_producers_.empty() ) {                        context * producer_ctx = & waiting_producers_.front();                        waiting_producers_.pop_front();                        auto expected = reinterpret_cast< std::intptr_t >( this);                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                            lk.unlock();                            // notify context                            active_ctx->schedule( producer_ctx);                            break;                        } if ( static_cast< std::intptr_t >( 0) == expected) {                            lk.unlock();                            // no timed-wait op.                            // notify context                            active_ctx->schedule( producer_ctx);                            break;                        }                    }                }                // consume value                value_type value = std::move( s->value);                // notify context#if defined(BOOST_NO_CXX14_STD_EXCHANGE)                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );#else                active_ctx->schedule( std::exchange( s->ctx, nullptr) );#endif                return std::move( value);            }            detail::spinlock_lock lk{ splk_consumers_ };            if ( BOOST_UNLIKELY( is_closed() ) ) {                throw fiber_error{                        std::make_error_code( std::errc::operation_not_permitted),                        "boost fiber: channel is closed" };            }            if ( ! is_empty_() ) {                continue;            }            active_ctx->wait_link( waiting_consumers_);            active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);            // suspend this consumer            active_ctx->suspend( lk);            // resumed, slot mabye set        }    }    template< typename Rep, typename Period >    channel_op_status pop_wait_for( value_type & value,                                    std::chrono::duration< Rep, Period > const& timeout_duration) {        return pop_wait_until( value,                               std::chrono::steady_clock::now() + timeout_duration);    }    template< typename Clock, typename Duration >    channel_op_status pop_wait_until( value_type & value,                                      std::chrono::time_point< Clock, Duration > const& timeout_time_) {        context * active_ctx = context::active();        slot * s = nullptr;        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);        for (;;) {            if ( nullptr != ( s = try_pop_() ) ) {                {                    detail::spinlock_lock lk{ splk_producers_ };                    // notify one waiting producer                    while ( ! waiting_producers_.empty() ) {                        context * producer_ctx = & waiting_producers_.front();                        waiting_producers_.pop_front();                        auto expected = reinterpret_cast< std::intptr_t >( this);                        if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {                            lk.unlock();                            // notify context                            active_ctx->schedule( producer_ctx);                            break;                        }                        if ( static_cast< std::intptr_t >( 0) == expected) {                            lk.unlock();                            // no timed-wait op.                            // notify context                            active_ctx->schedule( producer_ctx);                            break;                        }                    }                }                // consume value                value = std::move( s->value);                // notify context#if defined(BOOST_NO_CXX14_STD_EXCHANGE)                active_ctx->schedule( detail::exchange( s->ctx, nullptr) );#else                active_ctx->schedule( std::exchange( s->ctx, nullptr) );#endif                return channel_op_status::success;            }            detail::spinlock_lock lk{ splk_consumers_ };            if ( BOOST_UNLIKELY( is_closed() ) ) {                return channel_op_status::closed;            }            if ( ! is_empty_() ) {                continue;            }            active_ctx->wait_link( waiting_consumers_);            active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);            // suspend this consumer            if ( ! active_ctx->wait_until( timeout_time, lk) ) {                // relock local lk                lk.lock();                // remove from waiting-queue                waiting_consumers_.remove( * active_ctx);                return channel_op_status::timeout;            }        }    }    class iterator {    private:        typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type  storage_type;        unbuffered_channel  *   chan_{ nullptr };        storage_type            storage_;        void increment_() {            BOOST_ASSERT( nullptr != chan_);            try {                ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };            } catch ( fiber_error const&) {                chan_ = nullptr;            }        }    public:        using iterator_category = std::input_iterator_tag;        using difference_type = std::ptrdiff_t;        using pointer = value_type *;        using reference = value_type &;        using pointer_t = pointer;        using reference_t = reference;        iterator() noexcept = default;        explicit iterator( unbuffered_channel< T > * chan) noexcept :            chan_{ chan } {            increment_();        }        iterator( iterator const& other) noexcept :            chan_{ other.chan_ } {        }        iterator & operator=( iterator const& other) noexcept {            if ( this == & other) return * this;            chan_ = other.chan_;            return * this;        }        bool operator==( iterator const& other) const noexcept {            return other.chan_ == chan_;        }        bool operator!=( iterator const& other) const noexcept {            return other.chan_ != chan_;        }        iterator & operator++() {            reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();            increment_();            return * this;        }        const iterator operator++( int) = delete;        reference_t operator*() noexcept {            return * reinterpret_cast< value_type * >( std::addressof( storage_) );        }        pointer_t operator->() noexcept {            return reinterpret_cast< value_type * >( std::addressof( storage_) );        }    };    friend class iterator;};template< typename T >typename unbuffered_channel< T >::iteratorbegin( unbuffered_channel< T > & chan) {    return typename unbuffered_channel< T >::iterator( & chan);}template< typename T >typename unbuffered_channel< T >::iteratorend( unbuffered_channel< T > &) {    return typename unbuffered_channel< T >::iterator();}}}#ifdef BOOST_HAS_ABI_HEADERS#  include BOOST_ABI_SUFFIX#endif#endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H
 |