buffered_channel.hpp

//          Copyright Oliver Kowalke 2016.
// Distributed under the Boost Software License, Version 1.0.
//    (See accompanying file LICENSE_1_0.txt or copy at
//          http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BOOST_FIBERS_BUFFERED_CHANNEL_H
#define BOOST_FIBERS_BUFFERED_CHANNEL_H

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <type_traits>

#include <boost/config.hpp>

#include <boost/fiber/channel_op_status.hpp>
#include <boost/fiber/context.hpp>
#include <boost/fiber/detail/config.hpp>
#include <boost/fiber/detail/convert.hpp>
#include <boost/fiber/detail/spinlock.hpp>
#include <boost/fiber/exceptions.hpp>

#ifdef BOOST_HAS_ABI_HEADERS
#  include BOOST_ABI_PREFIX
#endif

namespace boost {
namespace fibers {
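
// A bounded channel for fiber synchronization. Values are kept in a
// fixed-size ring buffer guarded by a spinlock; fibers that cannot make
// progress suspend themselves on a wait queue and are rescheduled by
// their counterparts (or by close()).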
template< typename T >
class buffered_channel {
public:
    using value_type = typename std::remove_reference< T >::type;

private:
    using wait_queue_type = context::wait_queue_t;
    using slot_type = value_type;

    mutable detail::spinlock    splk_{};
    wait_queue_type             waiting_producers_{};
    wait_queue_type             waiting_consumers_{};
    slot_type               *   slots_;
    std::size_t                 pidx_{ 0 };
    std::size_t                 cidx_{ 0 };
    std::size_t                 capacity_;
    bool                        closed_{ false };
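
    // Ring-buffer invariants: the buffer is empty when both indices are
    // equal, and full when the producer index is one slot behind the
    // consumer index; one slot therefore always stays unused, so the
    // effective capacity is capacity_ - 1.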
    bool is_full_() const noexcept {
        return cidx_ == ((pidx_ + 1) % capacity_);
    }

    bool is_empty_() const noexcept {
        return cidx_ == pidx_;
    }

    bool is_closed_() const noexcept {
        return closed_;
    }
public:
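    // The capacity must be a power of two and at least 2; the
    // power-of-two requirement is checked with the usual
    // (n & (n - 1)) == 0 bit trick.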
    explicit buffered_channel( std::size_t capacity) :
            capacity_{ capacity } {
        if ( BOOST_UNLIKELY( 2 > capacity_ || 0 != ( capacity_ & ( capacity_ - 1) ) ) ) {
            throw fiber_error{ std::make_error_code( std::errc::invalid_argument),
                               "boost fiber: buffer capacity is invalid" };
        }
        slots_ = new slot_type[capacity_];
    }

    ~buffered_channel() {
        close();
        delete [] slots_;
    }

    buffered_channel( buffered_channel const&) = delete;
    buffered_channel & operator=( buffered_channel const&) = delete;

    bool is_closed() const noexcept {
        detail::spinlock_lock lk{ splk_ };
        return is_closed_();
    }
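
    // Wake-up protocol used throughout this class: a timed waiter stores
    // `this` in its context's twstatus field, an untimed waiter stores 0.
    // A notifier claims a timed waiter by CAS-ing twstatus from `this` to
    // -1; if the CAS fails with an observed value of 0, the waiter is
    // untimed and can be scheduled directly. If the CAS fails with any
    // other value, the waiter's timed wait has already been resolved
    // elsewhere and it is skipped.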
    void close() noexcept {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( ! closed_) {
            closed_ = true;
            // notify all waiting producers
            while ( ! waiting_producers_.empty() ) {
                context * producer_ctx = & waiting_producers_.front();
                waiting_producers_.pop_front();
                auto expected = reinterpret_cast< std::intptr_t >( this);
                if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify context
                    active_ctx->schedule( producer_ctx);
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( producer_ctx);
                }
            }
            // notify all waiting consumers
            while ( ! waiting_consumers_.empty() ) {
                context * consumer_ctx = & waiting_consumers_.front();
                waiting_consumers_.pop_front();
                auto expected = reinterpret_cast< std::intptr_t >( this);
                if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                } else if ( static_cast< std::intptr_t >( 0) == expected) {
                    // no timed-wait op.
                    // notify context
                    active_ctx->schedule( consumer_ctx);
                }
            }
        }
    }
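
    // Non-blocking push: returns `full` instead of suspending when no
    // slot is free.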
    channel_op_status try_push( value_type const& value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( BOOST_UNLIKELY( is_closed_() ) ) {
            return channel_op_status::closed;
        }
        if ( is_full_() ) {
            return channel_op_status::full;
        }
        slots_[pidx_] = value;
        pidx_ = (pidx_ + 1) % capacity_;
        // notify one waiting consumer
        while ( ! waiting_consumers_.empty() ) {
            context * consumer_ctx = & waiting_consumers_.front();
            waiting_consumers_.pop_front();
            auto expected = reinterpret_cast< std::intptr_t >( this);
            if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                lk.unlock();
                // notify context
                active_ctx->schedule( consumer_ctx);
                break;
            }
            if ( static_cast< std::intptr_t >( 0) == expected) {
                lk.unlock();
                // no timed-wait op.
                // notify context
                active_ctx->schedule( consumer_ctx);
                break;
            }
        }
        return channel_op_status::success;
    }
    channel_op_status try_push( value_type && value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( BOOST_UNLIKELY( is_closed_() ) ) {
            return channel_op_status::closed;
        }
        if ( is_full_() ) {
            return channel_op_status::full;
        }
        slots_[pidx_] = std::move( value);
        pidx_ = (pidx_ + 1) % capacity_;
        // notify one waiting consumer
        while ( ! waiting_consumers_.empty() ) {
            context * consumer_ctx = & waiting_consumers_.front();
            waiting_consumers_.pop_front();
            auto expected = reinterpret_cast< std::intptr_t >( this);
            if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                lk.unlock();
                // notify context
                active_ctx->schedule( consumer_ctx);
                break;
            }
            if ( static_cast< std::intptr_t >( 0) == expected) {
                lk.unlock();
                // no timed-wait op.
                // notify context
                active_ctx->schedule( consumer_ctx);
                break;
            }
        }
        return channel_op_status::success;
    }
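
    // Blocking push: suspends the calling fiber on waiting_producers_
    // while the buffer is full and retries after being rescheduled.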
    channel_op_status push( value_type const& value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            }
            if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                active_ctx->suspend( lk);
            } else {
                slots_[pidx_] = value;
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }
    channel_op_status push( value_type && value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            }
            if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this producer
                active_ctx->suspend( lk);
            } else {
                slots_[pidx_] = std::move( value);
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }
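
    // Timed push: the *_wait_for overloads convert the relative timeout
    // into an absolute deadline and forward to *_wait_until.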
    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type const& value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( value,
                                std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Rep, typename Period >
    channel_op_status push_wait_for( value_type && value,
                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
        return push_wait_until( std::forward< value_type >( value),
                                std::chrono::steady_clock::now() + timeout_duration);
    }
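
    // Deadline push: stores `this` in twstatus so a notifier must claim
    // this fiber via CAS; on timeout the fiber relocks the lock and
    // removes itself from the wait queue before reporting `timeout`.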
    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type const& value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            }
            if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this producer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_producers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                slots_[pidx_] = value;
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }
    template< typename Clock, typename Duration >
    channel_op_status push_wait_until( value_type && value,
                                       std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( BOOST_UNLIKELY( is_closed_() ) ) {
                return channel_op_status::closed;
            }
            if ( is_full_() ) {
                active_ctx->wait_link( waiting_producers_);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this producer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_producers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                slots_[pidx_] = std::move( value);
                pidx_ = (pidx_ + 1) % capacity_;
                // notify one waiting consumer
                while ( ! waiting_consumers_.empty() ) {
                    context * consumer_ctx = & waiting_consumers_.front();
                    waiting_consumers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( consumer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( consumer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }
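
    // Non-blocking pop: returns `empty` (or `closed` once the channel is
    // closed and drained) instead of suspending.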
    channel_op_status try_pop( value_type & value) {
        context * active_ctx = context::active();
        detail::spinlock_lock lk{ splk_ };
        if ( is_empty_() ) {
            return is_closed_()
                ? channel_op_status::closed
                : channel_op_status::empty;
        }
        value = std::move( slots_[cidx_]);
        cidx_ = (cidx_ + 1) % capacity_;
        // notify one waiting producer
        while ( ! waiting_producers_.empty() ) {
            context * producer_ctx = & waiting_producers_.front();
            waiting_producers_.pop_front();
            auto expected = reinterpret_cast< std::intptr_t >( this);
            if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                lk.unlock();
                // notify context
                active_ctx->schedule( producer_ctx);
                break;
            }
            if ( static_cast< std::intptr_t >( 0) == expected) {
                lk.unlock();
                // no timed-wait op.
                // notify context
                active_ctx->schedule( producer_ctx);
                break;
            }
        }
        return channel_op_status::success;
    }
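
    // Blocking pop: suspends on waiting_consumers_ while the buffer is
    // empty; a closed channel reports `closed` only once it is drained.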
    channel_op_status pop( value_type & value) {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    return channel_op_status::closed;
                }
                active_ctx->wait_link( waiting_consumers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this consumer
                active_ctx->suspend( lk);
            } else {
                value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }
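
    // Like pop(), but returns the value directly and throws fiber_error
    // once the channel is closed and drained.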
    value_type value_pop() {
        context * active_ctx = context::active();
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    throw fiber_error{
                        std::make_error_code( std::errc::operation_not_permitted),
                        "boost fiber: channel is closed" };
                }
                active_ctx->wait_link( waiting_consumers_);
                active_ctx->twstatus.store( static_cast< std::intptr_t >( 0), std::memory_order_release);
                // suspend this consumer
                active_ctx->suspend( lk);
            } else {
                value_type value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return value;
            }
        }
    }
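
    // Timed pop: the relative timeout is converted into an absolute
    // deadline; on timeout the fiber removes itself from the wait queue.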
    template< typename Rep, typename Period >
    channel_op_status pop_wait_for( value_type & value,
                                    std::chrono::duration< Rep, Period > const& timeout_duration) {
        return pop_wait_until( value,
                               std::chrono::steady_clock::now() + timeout_duration);
    }

    template< typename Clock, typename Duration >
    channel_op_status pop_wait_until( value_type & value,
                                      std::chrono::time_point< Clock, Duration > const& timeout_time_) {
        context * active_ctx = context::active();
        std::chrono::steady_clock::time_point timeout_time = detail::convert( timeout_time_);
        for (;;) {
            detail::spinlock_lock lk{ splk_ };
            if ( is_empty_() ) {
                if ( BOOST_UNLIKELY( is_closed_() ) ) {
                    return channel_op_status::closed;
                }
                active_ctx->wait_link( waiting_consumers_);
                active_ctx->twstatus.store( reinterpret_cast< std::intptr_t >( this), std::memory_order_release);
                // suspend this consumer
                if ( ! active_ctx->wait_until( timeout_time, lk) ) {
                    // relock local lk
                    lk.lock();
                    // remove from waiting-queue
                    waiting_consumers_.remove( * active_ctx);
                    return channel_op_status::timeout;
                }
            } else {
                value = std::move( slots_[cidx_]);
                cidx_ = (cidx_ + 1) % capacity_;
                // notify one waiting producer
                while ( ! waiting_producers_.empty() ) {
                    context * producer_ctx = & waiting_producers_.front();
                    waiting_producers_.pop_front();
                    auto expected = reinterpret_cast< std::intptr_t >( this);
                    if ( producer_ctx->twstatus.compare_exchange_strong( expected, static_cast< std::intptr_t >( -1), std::memory_order_acq_rel) ) {
                        lk.unlock();
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                    if ( static_cast< std::intptr_t >( 0) == expected) {
                        lk.unlock();
                        // no timed-wait op.
                        // notify context
                        active_ctx->schedule( producer_ctx);
                        break;
                    }
                }
                return channel_op_status::success;
            }
        }
    }
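
    // Input iterator over the channel: each increment pops the next
    // value; the iterator becomes equal to the end iterator when
    // value_pop() throws because the channel was closed.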
    class iterator {
    private:
        typedef typename std::aligned_storage< sizeof( value_type), alignof( value_type) >::type storage_type;

        buffered_channel    *   chan_{ nullptr };
        storage_type            storage_;

        void increment_() {
            BOOST_ASSERT( nullptr != chan_);
            try {
                ::new ( static_cast< void * >( std::addressof( storage_) ) ) value_type{ chan_->value_pop() };
            } catch ( fiber_error const&) {
                chan_ = nullptr;
            }
        }

    public:
        using iterator_category = std::input_iterator_tag;
        using difference_type = std::ptrdiff_t;
        using pointer = value_type *;
        using reference = value_type &;

        using pointer_t = pointer;
        using reference_t = reference;

        iterator() noexcept = default;

        explicit iterator( buffered_channel< T > * chan) noexcept :
                chan_{ chan } {
            increment_();
        }

        iterator( iterator const& other) noexcept :
                chan_{ other.chan_ } {
        }

        iterator & operator=( iterator const& other) noexcept {
            if ( BOOST_LIKELY( this != & other) ) {
                chan_ = other.chan_;
            }
            return * this;
        }

        bool operator==( iterator const& other) const noexcept {
            return other.chan_ == chan_;
        }

        bool operator!=( iterator const& other) const noexcept {
            return other.chan_ != chan_;
        }

        iterator & operator++() {
            reinterpret_cast< value_type * >( std::addressof( storage_) )->~value_type();
            increment_();
            return * this;
        }

        const iterator operator++( int) = delete;

        reference_t operator*() noexcept {
            return * reinterpret_cast< value_type * >( std::addressof( storage_) );
        }

        pointer_t operator->() noexcept {
            return reinterpret_cast< value_type * >( std::addressof( storage_) );
        }
    };

    friend class iterator;
};
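
// Free begin()/end() functions, found via argument-dependent lookup;
// they enable range-based for loops over a buffered_channel.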
template< typename T >
typename buffered_channel< T >::iterator
begin( buffered_channel< T > & chan) {
    return typename buffered_channel< T >::iterator( & chan);
}

template< typename T >
typename buffered_channel< T >::iterator
end( buffered_channel< T > &) {
    return typename buffered_channel< T >::iterator();
}
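
// Minimal usage sketch (illustrative only, not part of this header;
// assumes boost::fibers::fiber from <boost/fiber/fiber.hpp> and
// <iostream> for output):
//
//   boost::fibers::buffered_channel< int > chan{ 8 };   // power of two >= 2
//   boost::fibers::fiber producer{ [&chan]{
//       for ( int i = 0; i < 4; ++i) {
//           chan.push( i);
//       }
//       chan.close();                  // unblocks the consuming loop below
//   } };
//   for ( int v : chan) {              // uses begin()/end() above
//       std::cout << v << '\n';
//   }
//   producer.join();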
}}

#ifdef BOOST_HAS_ABI_HEADERS
#  include BOOST_ABI_SUFFIX
#endif

#endif // BOOST_FIBERS_BUFFERED_CHANNEL_H