// unbuffered_channel.hpp (26 KB)
  1. // Copyright Oliver Kowalke 2016.
  2. // Distributed under the Boost Software License, Version 1.0.
  3. // (See accompanying file LICENSE_1_0.txt or copy at
  4. // http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef BOOST_FIBERS_UNBUFFERED_CHANNEL_H
  6. #define BOOST_FIBERS_UNBUFFERED_CHANNEL_H
  7. #include <atomic>
  8. #include <chrono>
  9. #include <cstddef>
  10. #include <cstdint>
  11. #include <memory>
  12. #include <vector>
  13. #include <boost/config.hpp>
  14. #include <boost/fiber/channel_op_status.hpp>
  15. #include <boost/fiber/context.hpp>
  16. #include <boost/fiber/detail/config.hpp>
  17. #include <boost/fiber/detail/convert.hpp>
  18. #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
  19. #include <boost/fiber/detail/exchange.hpp>
  20. #endif
  21. #include <boost/fiber/detail/spinlock.hpp>
  22. #include <boost/fiber/exceptions.hpp>
  23. #ifdef BOOST_HAS_ABI_HEADERS
  24. # include BOOST_ABI_PREFIX
  25. #endif
  26. namespace boost {
  27. namespace fibers {
  28. template< typename T >
  29. class unbuffered_channel {
  30. public:
  31. using value_type = typename std::remove_reference<T>::type;
  32. private:
  33. using wait_queue_type = context::wait_queue_t;
  34. struct slot {
  35. value_type value;
  36. context * ctx;
  37. slot( value_type const& value_, context * ctx_) :
  38. value{ value_ },
  39. ctx{ ctx_ } {
  40. }
  41. slot( value_type && value_, context * ctx_) :
  42. value{ std::move( value_) },
  43. ctx{ ctx_ } {
  44. }
  45. };
  46. // shared cacheline
  47. std::atomic< slot * > slot_{ nullptr };
  48. // shared cacheline
  49. std::atomic_bool closed_{ false };
  50. mutable detail::spinlock splk_producers_{};
  51. wait_queue_type waiting_producers_{};
  52. mutable detail::spinlock splk_consumers_{};
  53. wait_queue_type waiting_consumers_{};
  54. char pad_[cacheline_length];
  55. bool is_empty_() {
  56. return nullptr == slot_.load( std::memory_order_acquire);
  57. }
  58. bool try_push_( slot * own_slot) {
  59. for (;;) {
  60. slot * s = slot_.load( std::memory_order_acquire);
  61. if ( nullptr == s) {
  62. if ( ! slot_.compare_exchange_strong( s, own_slot, std::memory_order_acq_rel) ) {
  63. continue;
  64. }
  65. return true;
  66. }
  67. return false;
  68. }
  69. }
  70. slot * try_pop_() {
  71. slot * nil_slot = nullptr;
  72. for (;;) {
  73. slot * s = slot_.load( std::memory_order_acquire);
  74. if ( nullptr != s) {
  75. if ( ! slot_.compare_exchange_strong( s, nil_slot, std::memory_order_acq_rel) ) {
  76. continue;}
  77. }
  78. return s;
  79. }
  80. }
  81. public:
  82. unbuffered_channel() = default;
  83. ~unbuffered_channel() {
  84. close();
  85. }
  86. unbuffered_channel( unbuffered_channel const&) = delete;
  87. unbuffered_channel & operator=( unbuffered_channel const&) = delete;
  88. bool is_closed() const noexcept {
  89. return closed_.load( std::memory_order_acquire);
  90. }
  91. void close() noexcept {
  92. context * active_ctx = context::active();
  93. // set flag
  94. if ( ! closed_.exchange( true, std::memory_order_acquire) ) {
  95. // notify current waiting
  96. slot * s = slot_.load( std::memory_order_acquire);
  97. if ( nullptr != s) {
  98. // notify context
  99. active_ctx->schedule( s->ctx);
  100. }
  101. // notify all waiting producers
  102. detail::spinlock_lock lk1{ splk_producers_ };
  103. while ( ! waiting_producers_.empty() ) {
  104. context * producer_ctx = & waiting_producers_.front();
  105. waiting_producers_.pop_front();
  106. auto expected = reinterpret_cast< std::intptr_t >( this);
  107. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  108. // notify context
  109. active_ctx->schedule( producer_ctx);
  110. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  111. // no timed-wait op.
  112. // notify context
  113. active_ctx->schedule( producer_ctx);
  114. }
  115. }
  116. // notify all waiting consumers
  117. detail::spinlock_lock lk2{ splk_consumers_ };
  118. while ( ! waiting_consumers_.empty() ) {
  119. context * consumer_ctx = & waiting_consumers_.front();
  120. waiting_consumers_.pop_front();
  121. auto expected = reinterpret_cast< std::intptr_t >( this);
  122. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  123. // notify context
  124. active_ctx->schedule( consumer_ctx);
  125. } else if ( static_cast< std::intptr_t >( 0) == expected) {
  126. // no timed-wait op.
  127. // notify context
  128. active_ctx->schedule( consumer_ctx);
  129. }
  130. }
  131. }
  132. }
  133. channel_op_status push( value_type const& value) {
  134. context * active_ctx = context::active();
  135. slot s{ value, active_ctx };
  136. for (;;) {
  137. if ( BOOST_UNLIKELY( is_closed() ) ) {
  138. return channel_op_status::closed;
  139. }
  140. if ( try_push_( & s) ) {
  141. detail::spinlock_lock lk{ splk_consumers_ };
  142. // notify one waiting consumer
  143. while ( ! waiting_consumers_.empty() ) {
  144. context * consumer_ctx = & waiting_consumers_.front();
  145. waiting_consumers_.pop_front();
  146. auto expected = reinterpret_cast< std::intptr_t >( this);
  147. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  148. // notify context
  149. active_ctx->schedule( consumer_ctx);
  150. break;
  151. }
  152. if ( static_cast< std::intptr_t >( 0) == expected) {
  153. // no timed-wait op.
  154. // notify context
  155. active_ctx->schedule( consumer_ctx);
  156. break;
  157. }
  158. }
  159. // suspend till value has been consumed
  160. active_ctx->suspend( lk);
  161. // resumed
  162. if ( nullptr == s.ctx) {
  163. // value has been consumed
  164. return channel_op_status::success;
  165. }
  166. // channel was closed before value was consumed
  167. return channel_op_status::closed;
  168. }
  169. detail::spinlock_lock lk{ splk_producers_ };
  170. if ( BOOST_UNLIKELY( is_closed() ) ) {
  171. return channel_op_status::closed;
  172. }
  173. if ( is_empty_() ) {
  174. continue;
  175. }
  176. active_ctx->wait_link( waiting_producers_);
  177. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  178. // suspend this producer
  179. active_ctx->suspend( lk);
  180. // resumed, slot mabye free
  181. }
  182. }
  183. channel_op_status push( value_type && value) {
  184. context * active_ctx = context::active();
  185. slot s{ std::move( value), active_ctx };
  186. for (;;) {
  187. if ( BOOST_UNLIKELY( is_closed() ) ) {
  188. return channel_op_status::closed;
  189. }
  190. if ( try_push_( & s) ) {
  191. detail::spinlock_lock lk{ splk_consumers_ };
  192. // notify one waiting consumer
  193. while ( ! waiting_consumers_.empty() ) {
  194. context * consumer_ctx = & waiting_consumers_.front();
  195. waiting_consumers_.pop_front();
  196. auto expected = reinterpret_cast< std::intptr_t >( this);
  197. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  198. // notify context
  199. active_ctx->schedule( consumer_ctx);
  200. break;
  201. } if ( static_cast< std::intptr_t >( 0) == expected) {
  202. // no timed-wait op.
  203. // notify context
  204. active_ctx->schedule( consumer_ctx);
  205. break;
  206. }
  207. }
  208. // suspend till value has been consumed
  209. active_ctx->suspend( lk);
  210. // resumed
  211. if ( nullptr == s.ctx) {
  212. // value has been consumed
  213. return channel_op_status::success;
  214. }
  215. // channel was closed before value was consumed
  216. return channel_op_status::closed;
  217. }
  218. detail::spinlock_lock lk{ splk_producers_ };
  219. if ( BOOST_UNLIKELY( is_closed() ) ) {
  220. return channel_op_status::closed;
  221. }
  222. if ( is_empty_() ) {
  223. continue;
  224. }
  225. active_ctx->wait_link( waiting_producers_);
  226. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  227. // suspend this producer
  228. active_ctx->suspend( lk);
  229. // resumed, slot mabye free
  230. }
  231. }
  232. template< typename Rep, typename Period >
  233. channel_op_status push_wait_for( value_type const& value,
  234. std::chrono::duration< Rep, Period > const& timeout_duration) {
  235. return push_wait_until( value,
  236. std::chrono::steady_clock::now() + timeout_duration);
  237. }
  238. template< typename Rep, typename Period >
  239. channel_op_status push_wait_for( value_type && value,
  240. std::chrono::duration< Rep, Period > const& timeout_duration) {
  241. return push_wait_until( std::forward< value_type >( value),
  242. std::chrono::steady_clock::now() + timeout_duration);
  243. }
  244. template< typename Clock, typename Duration >
  245. channel_op_status push_wait_until( value_type const& value,
  246. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  247. context * active_ctx = context::active();
  248. slot s{ value, active_ctx };
  249. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  250. for (;;) {
  251. if ( BOOST_UNLIKELY( is_closed() ) ) {
  252. return channel_op_status::closed;
  253. }
  254. if ( try_push_( & s) ) {
  255. detail::spinlock_lock lk{ splk_consumers_ };
  256. // notify one waiting consumer
  257. while ( ! waiting_consumers_.empty() ) {
  258. context * consumer_ctx = & waiting_consumers_.front();
  259. waiting_consumers_.pop_front();
  260. auto expected = reinterpret_cast< std::intptr_t >( this);
  261. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  262. // notify context
  263. active_ctx->schedule( consumer_ctx);
  264. break;
  265. }
  266. if ( static_cast< std::intptr_t >( 0) == expected) {
  267. // no timed-wait op.
  268. // notify context
  269. active_ctx->schedule( consumer_ctx);
  270. break;
  271. }
  272. }
  273. // suspend this producer
  274. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  275. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  276. // clear slot
  277. slot * nil_slot = nullptr, * own_slot = & s;
  278. slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
  279. // resumed, value has not been consumed
  280. return channel_op_status::timeout;
  281. }
  282. // resumed
  283. if ( nullptr == s.ctx) {
  284. // value has been consumed
  285. return channel_op_status::success;
  286. }
  287. // channel was closed before value was consumed
  288. return channel_op_status::closed;
  289. }
  290. detail::spinlock_lock lk{ splk_producers_ };
  291. if ( BOOST_UNLIKELY( is_closed() ) ) {
  292. return channel_op_status::closed;
  293. }
  294. if ( is_empty_() ) {
  295. continue;
  296. }
  297. active_ctx->wait_link( waiting_producers_);
  298. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  299. // suspend this producer
  300. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  301. // relock local lk
  302. lk.lock();
  303. // remove from waiting-queue
  304. waiting_producers_.remove( * active_ctx);
  305. return channel_op_status::timeout;
  306. }
  307. // resumed, slot maybe free
  308. }
  309. }
  310. template< typename Clock, typename Duration >
  311. channel_op_status push_wait_until( value_type && value,
  312. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  313. context * active_ctx = context::active();
  314. slot s{ std::move( value), active_ctx };
  315. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  316. for (;;) {
  317. if ( BOOST_UNLIKELY( is_closed() ) ) {
  318. return channel_op_status::closed;
  319. }
  320. if ( try_push_( & s) ) {
  321. detail::spinlock_lock lk{ splk_consumers_ };
  322. // notify one waiting consumer
  323. while ( ! waiting_consumers_.empty() ) {
  324. context * consumer_ctx = & waiting_consumers_.front();
  325. waiting_consumers_.pop_front();
  326. auto expected = reinterpret_cast< std::intptr_t >( this);
  327. if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  328. // notify context
  329. active_ctx->schedule( consumer_ctx);
  330. break;
  331. } if ( static_cast< std::intptr_t >( 0) == expected) {
  332. // no timed-wait op.
  333. // notify context
  334. active_ctx->schedule( consumer_ctx);
  335. break;
  336. }
  337. }
  338. // suspend this producer
  339. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  340. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  341. // clear slot
  342. slot * nil_slot = nullptr, * own_slot = & s;
  343. slot_.compare_exchange_strong( own_slot, nil_slot, std::memory_order_acq_rel);
  344. // resumed, value has not been consumed
  345. return channel_op_status::timeout;
  346. }
  347. // resumed
  348. if ( nullptr == s.ctx) {
  349. // value has been consumed
  350. return channel_op_status::success;
  351. }
  352. // channel was closed before value was consumed
  353. return channel_op_status::closed;
  354. }
  355. detail::spinlock_lock lk{ splk_producers_ };
  356. if ( BOOST_UNLIKELY( is_closed() ) ) {
  357. return channel_op_status::closed;
  358. }
  359. if ( is_empty_() ) {
  360. continue;
  361. }
  362. active_ctx->wait_link( waiting_producers_);
  363. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  364. // suspend this producer
  365. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  366. // relock local lk
  367. lk.lock();
  368. // remove from waiting-queue
  369. waiting_producers_.remove( * active_ctx);
  370. return channel_op_status::timeout;
  371. }
  372. // resumed, slot maybe free
  373. }
  374. }
  375. channel_op_status pop( value_type & value) {
  376. context * active_ctx = context::active();
  377. slot * s = nullptr;
  378. for (;;) {
  379. if ( nullptr != ( s = try_pop_() ) ) {
  380. {
  381. detail::spinlock_lock lk{ splk_producers_ };
  382. // notify one waiting producer
  383. while ( ! waiting_producers_.empty() ) {
  384. context * producer_ctx = & waiting_producers_.front();
  385. waiting_producers_.pop_front();
  386. auto expected = reinterpret_cast< std::intptr_t >( this);
  387. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  388. lk.unlock();
  389. // notify context
  390. active_ctx->schedule( producer_ctx);
  391. break;
  392. } if ( static_cast< std::intptr_t >( 0) == expected) {
  393. lk.unlock();
  394. // no timed-wait op.
  395. // notify context
  396. active_ctx->schedule( producer_ctx);
  397. break;
  398. }
  399. }
  400. }
  401. value = std::move( s->value);
  402. // notify context
  403. #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
  404. active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
  405. #else
  406. active_ctx->schedule( std::exchange( s->ctx, nullptr) );
  407. #endif
  408. return channel_op_status::success;
  409. }
  410. detail::spinlock_lock lk{ splk_consumers_ };
  411. if ( BOOST_UNLIKELY( is_closed() ) ) {
  412. return channel_op_status::closed;
  413. }
  414. if ( ! is_empty_() ) {
  415. continue;
  416. }
  417. active_ctx->wait_link( waiting_consumers_);
  418. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  419. // suspend this consumer
  420. active_ctx->suspend( lk);
  421. // resumed, slot mabye set
  422. }
  423. }
  424. value_type value_pop() {
  425. context * active_ctx = context::active();
  426. slot * s = nullptr;
  427. for (;;) {
  428. if ( nullptr != ( s = try_pop_() ) ) {
  429. {
  430. detail::spinlock_lock lk{ splk_producers_ };
  431. // notify one waiting producer
  432. while ( ! waiting_producers_.empty() ) {
  433. context * producer_ctx = & waiting_producers_.front();
  434. waiting_producers_.pop_front();
  435. auto expected = reinterpret_cast< std::intptr_t >( this);
  436. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  437. lk.unlock();
  438. // notify context
  439. active_ctx->schedule( producer_ctx);
  440. break;
  441. } if ( static_cast< std::intptr_t >( 0) == expected) {
  442. lk.unlock();
  443. // no timed-wait op.
  444. // notify context
  445. active_ctx->schedule( producer_ctx);
  446. break;
  447. }
  448. }
  449. }
  450. // consume value
  451. value_type value = std::move( s->value);
  452. // notify context
  453. #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
  454. active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
  455. #else
  456. active_ctx->schedule( std::exchange( s->ctx, nullptr) );
  457. #endif
  458. return std::move( value);
  459. }
  460. detail::spinlock_lock lk{ splk_consumers_ };
  461. if ( BOOST_UNLIKELY( is_closed() ) ) {
  462. throw fiber_error{
  463. std::make_error_code( std::errc::operation_not_permitted),
  464. "boost fiber: channel is closed" };
  465. }
  466. if ( ! is_empty_() ) {
  467. continue;
  468. }
  469. active_ctx->wait_link( waiting_consumers_);
  470. active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
  471. // suspend this consumer
  472. active_ctx->suspend( lk);
  473. // resumed, slot mabye set
  474. }
  475. }
  476. template< typename Rep, typename Period >
  477. channel_op_status pop_wait_for( value_type & value,
  478. std::chrono::duration< Rep, Period > const& timeout_duration) {
  479. return pop_wait_until( value,
  480. std::chrono::steady_clock::now() + timeout_duration);
  481. }
  482. template< typename Clock, typename Duration >
  483. channel_op_status pop_wait_until( value_type & value,
  484. std::chrono::time_point< Clock, Duration > const& timeout_time_) {
  485. context * active_ctx = context::active();
  486. slot * s = nullptr;
  487. std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
  488. for (;;) {
  489. if ( nullptr != ( s = try_pop_() ) ) {
  490. {
  491. detail::spinlock_lock lk{ splk_producers_ };
  492. // notify one waiting producer
  493. while ( ! waiting_producers_.empty() ) {
  494. context * producer_ctx = & waiting_producers_.front();
  495. waiting_producers_.pop_front();
  496. auto expected = reinterpret_cast< std::intptr_t >( this);
  497. if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
  498. lk.unlock();
  499. // notify context
  500. active_ctx->schedule( producer_ctx);
  501. break;
  502. }
  503. if ( static_cast< std::intptr_t >( 0) == expected) {
  504. lk.unlock();
  505. // no timed-wait op.
  506. // notify context
  507. active_ctx->schedule( producer_ctx);
  508. break;
  509. }
  510. }
  511. }
  512. // consume value
  513. value = std::move( s->value);
  514. // notify context
  515. #if defined(BOOST_NO_CXX14_STD_EXCHANGE)
  516. active_ctx->schedule( detail::exchange( s->ctx, nullptr) );
  517. #else
  518. active_ctx->schedule( std::exchange( s->ctx, nullptr) );
  519. #endif
  520. return channel_op_status::success;
  521. }
  522. detail::spinlock_lock lk{ splk_consumers_ };
  523. if ( BOOST_UNLIKELY( is_closed() ) ) {
  524. return channel_op_status::closed;
  525. }
  526. if ( ! is_empty_() ) {
  527. continue;
  528. }
  529. active_ctx->wait_link( waiting_consumers_);
  530. active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
  531. // suspend this consumer
  532. if ( ! active_ctx->wait_until( timeout_time, lk) ) {
  533. // relock local lk
  534. lk.lock();
  535. // remove from waiting-queue
  536. waiting_consumers_.remove( * active_ctx);
  537. return channel_op_status::timeout;
  538. }
  539. }
  540. }
  541. class iterator {
  542. private:
  543. typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;
  544. unbuffered_channel * chan_{ nullptr };
  545. storage_type storage_;
  546. void increment_() {
  547. BOOST_ASSERT( nullptr != chan_);
  548. try {
  549. ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
  550. } catch ( fiber_error const&) {
  551. chan_ = nullptr;
  552. }
  553. }
  554. public:
  555. using iterator_category = std::input_iterator_tag;
  556. using difference_type = std::ptrdiff_t;
  557. using pointer = value_type *;
  558. using reference = value_type &;
  559. using pointer_t = pointer;
  560. using reference_t = reference;
  561. iterator() noexcept = default;
  562. explicit iterator( unbuffered_channel< T > * chan) noexcept :
  563. chan_{ chan } {
  564. increment_();
  565. }
  566. iterator( iterator const& other) noexcept :
  567. chan_{ other.chan_ } {
  568. }
  569. iterator & operator=( iterator const& other) noexcept {
  570. if ( this == & other) return * this;
  571. chan_ = other.chan_;
  572. return * this;
  573. }
  574. bool operator==( iterator const& other) const noexcept {
  575. return other.chan_ == chan_;
  576. }
  577. bool operator!=( iterator const& other) const noexcept {
  578. return other.chan_ != chan_;
  579. }
  580. iterator & operator++() {
  581. reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
  582. increment_();
  583. return * this;
  584. }
  585. const iterator operator++( int) = delete;
  586. reference_t operator*() noexcept {
  587. return * reinterpret_cast< value_type * >( std::addressof( storage_) );
  588. }
  589. pointer_t operator->() noexcept {
  590. return reinterpret_cast< value_type * >( std::addressof( storage_) );
  591. }
  592. };
  593. friend class iterator;
  594. };
  595. template< typename T >
  596. typename unbuffered_channel< T >::iterator
  597. begin( unbuffered_channel< T > & chan) {
  598. return typename unbuffered_channel< T >::iterator( & chan);
  599. }
  600. template< typename T >
  601. typename unbuffered_channel< T >::iterator
  602. end( unbuffered_channel< T > &) {
  603. return typename unbuffered_channel< T >::iterator();
  604. }
  605. }}
  606. #ifdef BOOST_HAS_ABI_HEADERS
  607. # include BOOST_ABI_SUFFIX
  608. #endif
  609. #endif // BOOST_FIBERS_UNBUFFERED_CHANNEL_H