read.hpp 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291
  1. //
  2. // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
  3. //
  4. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  5. // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
  6. //
  7. // Official repository: https://github.com/boostorg/beast
  8. //
  9. #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
  10. #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP
  11. #include <boost/beast/core/buffer_traits.hpp>
  12. #include <boost/beast/websocket/teardown.hpp>
  13. #include <boost/beast/websocket/detail/mask.hpp>
  14. #include <boost/beast/websocket/impl/stream_impl.hpp>
  15. #include <boost/beast/core/async_base.hpp>
  16. #include <boost/beast/core/bind_handler.hpp>
  17. #include <boost/beast/core/buffers_prefix.hpp>
  18. #include <boost/beast/core/buffers_suffix.hpp>
  19. #include <boost/beast/core/flat_static_buffer.hpp>
  20. #include <boost/beast/core/read_size.hpp>
  21. #include <boost/beast/core/stream_traits.hpp>
  22. #include <boost/beast/core/detail/bind_continuation.hpp>
  23. #include <boost/beast/core/detail/buffer.hpp>
  24. #include <boost/beast/core/detail/clamp.hpp>
  25. #include <boost/beast/core/detail/config.hpp>
  26. #include <boost/asio/coroutine.hpp>
  27. #include <boost/asio/post.hpp>
  28. #include <boost/assert.hpp>
  29. #include <boost/config.hpp>
  30. #include <boost/optional.hpp>
  31. #include <boost/throw_exception.hpp>
  32. #include <algorithm>
  33. #include <limits>
  34. #include <memory>
  35. namespace boost {
  36. namespace beast {
  37. namespace websocket {
  38. /* Read some message data into a buffer sequence.
  39. Also reads and handles control frames.
  40. */
  41. template<class NextLayer, bool deflateSupported>
  42. template<class Handler, class MutableBufferSequence>
  43. class stream<NextLayer, deflateSupported>::read_some_op
  44. : public beast::async_base<
  45. Handler, beast::executor_type<stream>>
  46. , public asio::coroutine
  47. {
  48. boost::weak_ptr<impl_type> wp_;
  49. MutableBufferSequence bs_;
  50. buffers_suffix<MutableBufferSequence> cb_;
  51. std::size_t bytes_written_ = 0;
  52. error_code result_;
  53. close_code code_;
  54. bool did_read_ = false;
  55. public:
  56. static constexpr int id = 1; // for soft_mutex
  57. template<class Handler_>
  58. read_some_op(
  59. Handler_&& h,
  60. boost::shared_ptr<impl_type> const& sp,
  61. MutableBufferSequence const& bs)
  62. : async_base<
  63. Handler, beast::executor_type<stream>>(
  64. std::forward<Handler_>(h),
  65. sp->stream().get_executor())
  66. , wp_(sp)
  67. , bs_(bs)
  68. , cb_(bs)
  69. , code_(close_code::none)
  70. {
  71. (*this)({}, 0, false);
  72. }
  73. void operator()(
  74. error_code ec = {},
  75. std::size_t bytes_transferred = 0,
  76. bool cont = true)
  77. {
  78. using beast::detail::clamp;
  79. auto sp = wp_.lock();
  80. if(! sp)
  81. {
  82. ec = net::error::operation_aborted;
  83. bytes_written_ = 0;
  84. return this->complete(cont, ec, bytes_written_);
  85. }
  86. auto& impl = *sp;
  87. BOOST_ASIO_CORO_REENTER(*this)
  88. {
  89. impl.update_timer(this->get_executor());
  90. acquire_read_lock:
  91. // Acquire the read lock
  92. if(! impl.rd_block.try_lock(this))
  93. {
  94. do_suspend:
  95. BOOST_ASIO_CORO_YIELD
  96. impl.op_r_rd.emplace(std::move(*this));
  97. impl.rd_block.lock(this);
  98. BOOST_ASIO_CORO_YIELD
  99. net::post(std::move(*this));
  100. BOOST_ASSERT(impl.rd_block.is_locked(this));
  101. // VFALCO Is this check correct here?
  102. BOOST_ASSERT(! ec && impl.check_stop_now(ec));
  103. if(impl.check_stop_now(ec))
  104. {
  105. BOOST_ASSERT(ec == net::error::operation_aborted);
  106. goto upcall;
  107. }
  108. // VFALCO Should never get here
  109. // The only way to get read blocked is if
  110. // a `close_op` wrote a close frame
  111. BOOST_ASSERT(impl.wr_close);
  112. BOOST_ASSERT(impl.status_ != status::open);
  113. ec = net::error::operation_aborted;
  114. goto upcall;
  115. }
  116. else
  117. {
  118. // Make sure the stream is not closed
  119. if( impl.status_ == status::closed ||
  120. impl.status_ == status::failed)
  121. {
  122. ec = net::error::operation_aborted;
  123. goto upcall;
  124. }
  125. }
  126. // if status_ == status::closing, we want to suspend
  127. // the read operation until the close completes,
  128. // then finish the read with operation_aborted.
  129. loop:
  130. BOOST_ASSERT(impl.rd_block.is_locked(this));
  131. // See if we need to read a frame header. This
  132. // condition is structured to give the decompressor
  133. // a chance to emit the final empty deflate block
  134. //
  135. if(impl.rd_remain == 0 &&
  136. (! impl.rd_fh.fin || impl.rd_done))
  137. {
  138. // Read frame header
  139. while(! impl.parse_fh(
  140. impl.rd_fh, impl.rd_buf, result_))
  141. {
  142. if(result_)
  143. {
  144. // _Fail the WebSocket Connection_
  145. if(result_ == error::message_too_big)
  146. code_ = close_code::too_big;
  147. else
  148. code_ = close_code::protocol_error;
  149. goto close;
  150. }
  151. BOOST_ASSERT(impl.rd_block.is_locked(this));
  152. BOOST_ASIO_CORO_YIELD
  153. impl.stream().async_read_some(
  154. impl.rd_buf.prepare(read_size(
  155. impl.rd_buf, impl.rd_buf.max_size())),
  156. std::move(*this));
  157. BOOST_ASSERT(impl.rd_block.is_locked(this));
  158. impl.rd_buf.commit(bytes_transferred);
  159. if(impl.check_stop_now(ec))
  160. goto upcall;
  161. impl.reset_idle();
  162. // Allow a close operation
  163. // to acquire the read block
  164. impl.rd_block.unlock(this);
  165. if( impl.op_r_close.maybe_invoke())
  166. {
  167. // Suspend
  168. BOOST_ASSERT(impl.rd_block.is_locked());
  169. goto do_suspend;
  170. }
  171. // Acquire read block
  172. impl.rd_block.lock(this);
  173. }
  174. // Immediately apply the mask to the portion
  175. // of the buffer holding payload data.
  176. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  177. detail::mask_inplace(buffers_prefix(
  178. clamp(impl.rd_fh.len),
  179. impl.rd_buf.data()),
  180. impl.rd_key);
  181. if(detail::is_control(impl.rd_fh.op))
  182. {
  183. // Clear this otherwise the next
  184. // frame will be considered final.
  185. impl.rd_fh.fin = false;
  186. // Handle ping frame
  187. if(impl.rd_fh.op == detail::opcode::ping)
  188. {
  189. if(impl.ctrl_cb)
  190. {
  191. if(! cont)
  192. {
  193. BOOST_ASIO_CORO_YIELD
  194. net::post(std::move(*this));
  195. BOOST_ASSERT(cont);
  196. // VFALCO call check_stop_now() here?
  197. }
  198. }
  199. {
  200. auto const b = buffers_prefix(
  201. clamp(impl.rd_fh.len),
  202. impl.rd_buf.data());
  203. auto const len = buffer_bytes(b);
  204. BOOST_ASSERT(len == impl.rd_fh.len);
  205. ping_data payload;
  206. detail::read_ping(payload, b);
  207. impl.rd_buf.consume(len);
  208. // Ignore ping when closing
  209. if(impl.status_ == status::closing)
  210. goto loop;
  211. if(impl.ctrl_cb)
  212. impl.ctrl_cb(
  213. frame_type::ping, payload);
  214. impl.rd_fb.clear();
  215. impl.template write_ping<
  216. flat_static_buffer_base>(impl.rd_fb,
  217. detail::opcode::pong, payload);
  218. }
  219. // Allow a close operation
  220. // to acquire the read block
  221. impl.rd_block.unlock(this);
  222. impl.op_r_close.maybe_invoke();
  223. // Acquire the write lock
  224. if(! impl.wr_block.try_lock(this))
  225. {
  226. BOOST_ASIO_CORO_YIELD
  227. impl.op_rd.emplace(std::move(*this));
  228. impl.wr_block.lock(this);
  229. BOOST_ASIO_CORO_YIELD
  230. net::post(std::move(*this));
  231. BOOST_ASSERT(impl.wr_block.is_locked(this));
  232. if(impl.check_stop_now(ec))
  233. goto upcall;
  234. }
  235. // Send pong
  236. BOOST_ASSERT(impl.wr_block.is_locked(this));
  237. BOOST_ASIO_CORO_YIELD
  238. net::async_write(
  239. impl.stream(), impl.rd_fb.data(),
  240. beast::detail::bind_continuation(std::move(*this)));
  241. BOOST_ASSERT(impl.wr_block.is_locked(this));
  242. if(impl.check_stop_now(ec))
  243. goto upcall;
  244. impl.wr_block.unlock(this);
  245. impl.op_close.maybe_invoke()
  246. || impl.op_idle_ping.maybe_invoke()
  247. || impl.op_ping.maybe_invoke()
  248. || impl.op_wr.maybe_invoke();
  249. goto acquire_read_lock;
  250. }
  251. // Handle pong frame
  252. if(impl.rd_fh.op == detail::opcode::pong)
  253. {
  254. // Ignore pong when closing
  255. if(! impl.wr_close && impl.ctrl_cb)
  256. {
  257. if(! cont)
  258. {
  259. BOOST_ASIO_CORO_YIELD
  260. net::post(std::move(*this));
  261. BOOST_ASSERT(cont);
  262. }
  263. }
  264. auto const cb = buffers_prefix(clamp(
  265. impl.rd_fh.len), impl.rd_buf.data());
  266. auto const len = buffer_bytes(cb);
  267. BOOST_ASSERT(len == impl.rd_fh.len);
  268. ping_data payload;
  269. detail::read_ping(payload, cb);
  270. impl.rd_buf.consume(len);
  271. // Ignore pong when closing
  272. if(! impl.wr_close && impl.ctrl_cb)
  273. impl.ctrl_cb(frame_type::pong, payload);
  274. goto loop;
  275. }
  276. // Handle close frame
  277. BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
  278. {
  279. if(impl.ctrl_cb)
  280. {
  281. if(! cont)
  282. {
  283. BOOST_ASIO_CORO_YIELD
  284. net::post(std::move(*this));
  285. BOOST_ASSERT(cont);
  286. }
  287. }
  288. auto const cb = buffers_prefix(clamp(
  289. impl.rd_fh.len), impl.rd_buf.data());
  290. auto const len = buffer_bytes(cb);
  291. BOOST_ASSERT(len == impl.rd_fh.len);
  292. BOOST_ASSERT(! impl.rd_close);
  293. impl.rd_close = true;
  294. close_reason cr;
  295. detail::read_close(cr, cb, result_);
  296. if(result_)
  297. {
  298. // _Fail the WebSocket Connection_
  299. code_ = close_code::protocol_error;
  300. goto close;
  301. }
  302. impl.cr = cr;
  303. impl.rd_buf.consume(len);
  304. if(impl.ctrl_cb)
  305. impl.ctrl_cb(frame_type::close,
  306. impl.cr.reason);
  307. // See if we are already closing
  308. if(impl.status_ == status::closing)
  309. {
  310. // _Close the WebSocket Connection_
  311. BOOST_ASSERT(impl.wr_close);
  312. code_ = close_code::none;
  313. result_ = error::closed;
  314. goto close;
  315. }
  316. // _Start the WebSocket Closing Handshake_
  317. code_ = cr.code == close_code::none ?
  318. close_code::normal :
  319. static_cast<close_code>(cr.code);
  320. result_ = error::closed;
  321. goto close;
  322. }
  323. }
  324. if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
  325. {
  326. // Empty non-final frame
  327. goto loop;
  328. }
  329. impl.rd_done = false;
  330. }
  331. if(! impl.rd_deflated())
  332. {
  333. if(impl.rd_remain > 0)
  334. {
  335. if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
  336. (std::min)(clamp(impl.rd_remain),
  337. buffer_bytes(cb_)))
  338. {
  339. // Fill the read buffer first, otherwise we
  340. // get fewer bytes at the cost of one I/O.
  341. BOOST_ASIO_CORO_YIELD
  342. impl.stream().async_read_some(
  343. impl.rd_buf.prepare(read_size(
  344. impl.rd_buf, impl.rd_buf.max_size())),
  345. std::move(*this));
  346. impl.rd_buf.commit(bytes_transferred);
  347. if(impl.check_stop_now(ec))
  348. goto upcall;
  349. impl.reset_idle();
  350. if(impl.rd_fh.mask)
  351. detail::mask_inplace(buffers_prefix(clamp(
  352. impl.rd_remain), impl.rd_buf.data()),
  353. impl.rd_key);
  354. }
  355. if(impl.rd_buf.size() > 0)
  356. {
  357. // Copy from the read buffer.
  358. // The mask was already applied.
  359. bytes_transferred = net::buffer_copy(cb_,
  360. impl.rd_buf.data(), clamp(impl.rd_remain));
  361. auto const mb = buffers_prefix(
  362. bytes_transferred, cb_);
  363. impl.rd_remain -= bytes_transferred;
  364. if(impl.rd_op == detail::opcode::text)
  365. {
  366. if(! impl.rd_utf8.write(mb) ||
  367. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  368. ! impl.rd_utf8.finish()))
  369. {
  370. // _Fail the WebSocket Connection_
  371. code_ = close_code::bad_payload;
  372. result_ = error::bad_frame_payload;
  373. goto close;
  374. }
  375. }
  376. bytes_written_ += bytes_transferred;
  377. impl.rd_size += bytes_transferred;
  378. impl.rd_buf.consume(bytes_transferred);
  379. }
  380. else
  381. {
  382. // Read into caller's buffer
  383. BOOST_ASSERT(impl.rd_remain > 0);
  384. BOOST_ASSERT(buffer_bytes(cb_) > 0);
  385. BOOST_ASSERT(buffer_bytes(buffers_prefix(
  386. clamp(impl.rd_remain), cb_)) > 0);
  387. BOOST_ASIO_CORO_YIELD
  388. impl.stream().async_read_some(buffers_prefix(
  389. clamp(impl.rd_remain), cb_), std::move(*this));
  390. if(impl.check_stop_now(ec))
  391. goto upcall;
  392. impl.reset_idle();
  393. BOOST_ASSERT(bytes_transferred > 0);
  394. auto const mb = buffers_prefix(
  395. bytes_transferred, cb_);
  396. impl.rd_remain -= bytes_transferred;
  397. if(impl.rd_fh.mask)
  398. detail::mask_inplace(mb, impl.rd_key);
  399. if(impl.rd_op == detail::opcode::text)
  400. {
  401. if(! impl.rd_utf8.write(mb) ||
  402. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  403. ! impl.rd_utf8.finish()))
  404. {
  405. // _Fail the WebSocket Connection_
  406. code_ = close_code::bad_payload;
  407. result_ = error::bad_frame_payload;
  408. goto close;
  409. }
  410. }
  411. bytes_written_ += bytes_transferred;
  412. impl.rd_size += bytes_transferred;
  413. }
  414. }
  415. BOOST_ASSERT( ! impl.rd_done );
  416. if( impl.rd_remain == 0 && impl.rd_fh.fin )
  417. impl.rd_done = true;
  418. }
  419. else
  420. {
  421. // Read compressed message frame payload:
  422. // inflate even if rd_fh_.len == 0, otherwise we
  423. // never emit the end-of-stream deflate block.
  424. while(buffer_bytes(cb_) > 0)
  425. {
  426. if( impl.rd_remain > 0 &&
  427. impl.rd_buf.size() == 0 &&
  428. ! did_read_)
  429. {
  430. // read new
  431. BOOST_ASIO_CORO_YIELD
  432. impl.stream().async_read_some(
  433. impl.rd_buf.prepare(read_size(
  434. impl.rd_buf, impl.rd_buf.max_size())),
  435. std::move(*this));
  436. if(impl.check_stop_now(ec))
  437. goto upcall;
  438. impl.reset_idle();
  439. BOOST_ASSERT(bytes_transferred > 0);
  440. impl.rd_buf.commit(bytes_transferred);
  441. if(impl.rd_fh.mask)
  442. detail::mask_inplace(
  443. buffers_prefix(clamp(impl.rd_remain),
  444. impl.rd_buf.data()), impl.rd_key);
  445. did_read_ = true;
  446. }
  447. zlib::z_params zs;
  448. {
  449. auto const out = buffers_front(cb_);
  450. zs.next_out = out.data();
  451. zs.avail_out = out.size();
  452. BOOST_ASSERT(zs.avail_out > 0);
  453. }
  454. if(impl.rd_remain > 0)
  455. {
  456. if(impl.rd_buf.size() > 0)
  457. {
  458. // use what's there
  459. auto const in = buffers_prefix(
  460. clamp(impl.rd_remain), buffers_front(
  461. impl.rd_buf.data()));
  462. zs.avail_in = in.size();
  463. zs.next_in = in.data();
  464. }
  465. else
  466. {
  467. break;
  468. }
  469. }
  470. else if(impl.rd_fh.fin)
  471. {
  472. // append the empty block codes
  473. std::uint8_t constexpr
  474. empty_block[4] = { 0x00, 0x00, 0xff, 0xff };
  475. zs.next_in = empty_block;
  476. zs.avail_in = sizeof(empty_block);
  477. impl.inflate(zs, zlib::Flush::sync, ec);
  478. if(! ec)
  479. {
  480. // https://github.com/madler/zlib/issues/280
  481. if(zs.total_out > 0)
  482. ec = error::partial_deflate_block;
  483. }
  484. if(impl.check_stop_now(ec))
  485. goto upcall;
  486. impl.do_context_takeover_read(impl.role);
  487. impl.rd_done = true;
  488. break;
  489. }
  490. else
  491. {
  492. break;
  493. }
  494. impl.inflate(zs, zlib::Flush::sync, ec);
  495. if(impl.check_stop_now(ec))
  496. goto upcall;
  497. if(impl.rd_msg_max && beast::detail::sum_exceeds(
  498. impl.rd_size, zs.total_out, impl.rd_msg_max))
  499. {
  500. // _Fail the WebSocket Connection_
  501. code_ = close_code::too_big;
  502. result_ = error::message_too_big;
  503. goto close;
  504. }
  505. cb_.consume(zs.total_out);
  506. impl.rd_size += zs.total_out;
  507. impl.rd_remain -= zs.total_in;
  508. impl.rd_buf.consume(zs.total_in);
  509. bytes_written_ += zs.total_out;
  510. }
  511. if(impl.rd_op == detail::opcode::text)
  512. {
  513. // check utf8
  514. if(! impl.rd_utf8.write(
  515. buffers_prefix(bytes_written_, bs_)) || (
  516. impl.rd_done && ! impl.rd_utf8.finish()))
  517. {
  518. // _Fail the WebSocket Connection_
  519. code_ = close_code::bad_payload;
  520. result_ = error::bad_frame_payload;
  521. goto close;
  522. }
  523. }
  524. }
  525. goto upcall;
  526. close:
  527. // Acquire the write lock
  528. if(! impl.wr_block.try_lock(this))
  529. {
  530. BOOST_ASIO_CORO_YIELD
  531. impl.op_rd.emplace(std::move(*this));
  532. impl.wr_block.lock(this);
  533. BOOST_ASIO_CORO_YIELD
  534. net::post(std::move(*this));
  535. BOOST_ASSERT(impl.wr_block.is_locked(this));
  536. if(impl.check_stop_now(ec))
  537. goto upcall;
  538. }
  539. impl.change_status(status::closing);
  540. if(! impl.wr_close)
  541. {
  542. impl.wr_close = true;
  543. // Serialize close frame
  544. impl.rd_fb.clear();
  545. impl.template write_close<
  546. flat_static_buffer_base>(
  547. impl.rd_fb, code_);
  548. // Send close frame
  549. BOOST_ASSERT(impl.wr_block.is_locked(this));
  550. BOOST_ASIO_CORO_YIELD
  551. net::async_write(impl.stream(), impl.rd_fb.data(),
  552. beast::detail::bind_continuation(std::move(*this)));
  553. BOOST_ASSERT(impl.wr_block.is_locked(this));
  554. if(impl.check_stop_now(ec))
  555. goto upcall;
  556. }
  557. // Teardown
  558. using beast::websocket::async_teardown;
  559. BOOST_ASSERT(impl.wr_block.is_locked(this));
  560. BOOST_ASIO_CORO_YIELD
  561. async_teardown(impl.role, impl.stream(),
  562. beast::detail::bind_continuation(std::move(*this)));
  563. BOOST_ASSERT(impl.wr_block.is_locked(this));
  564. if(ec == net::error::eof)
  565. {
  566. // Rationale:
  567. // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
  568. ec = {};
  569. }
  570. if(! ec)
  571. ec = result_;
  572. if(ec && ec != error::closed)
  573. impl.change_status(status::failed);
  574. else
  575. impl.change_status(status::closed);
  576. impl.close();
  577. upcall:
  578. impl.rd_block.try_unlock(this);
  579. impl.op_r_close.maybe_invoke();
  580. if(impl.wr_block.try_unlock(this))
  581. impl.op_close.maybe_invoke()
  582. || impl.op_idle_ping.maybe_invoke()
  583. || impl.op_ping.maybe_invoke()
  584. || impl.op_wr.maybe_invoke();
  585. this->complete(cont, ec, bytes_written_);
  586. }
  587. }
  588. };
  589. //------------------------------------------------------------------------------
  590. template<class NextLayer, bool deflateSupported>
  591. template<class Handler, class DynamicBuffer>
  592. class stream<NextLayer, deflateSupported>::read_op
  593. : public beast::async_base<
  594. Handler, beast::executor_type<stream>>
  595. , public asio::coroutine
  596. {
  597. boost::weak_ptr<impl_type> wp_;
  598. DynamicBuffer& b_;
  599. std::size_t limit_;
  600. std::size_t bytes_written_ = 0;
  601. bool some_;
  602. public:
  603. template<class Handler_>
  604. read_op(
  605. Handler_&& h,
  606. boost::shared_ptr<impl_type> const& sp,
  607. DynamicBuffer& b,
  608. std::size_t limit,
  609. bool some)
  610. : async_base<Handler,
  611. beast::executor_type<stream>>(
  612. std::forward<Handler_>(h),
  613. sp->stream().get_executor())
  614. , wp_(sp)
  615. , b_(b)
  616. , limit_(limit ? limit : (
  617. std::numeric_limits<std::size_t>::max)())
  618. , some_(some)
  619. {
  620. (*this)({}, 0, false);
  621. }
  622. void operator()(
  623. error_code ec = {},
  624. std::size_t bytes_transferred = 0,
  625. bool cont = true)
  626. {
  627. using beast::detail::clamp;
  628. auto sp = wp_.lock();
  629. if(! sp)
  630. {
  631. ec = net::error::operation_aborted;
  632. bytes_written_ = 0;
  633. return this->complete(cont, ec, bytes_written_);
  634. }
  635. auto& impl = *sp;
  636. using mutable_buffers_type = typename
  637. DynamicBuffer::mutable_buffers_type;
  638. BOOST_ASIO_CORO_REENTER(*this)
  639. {
  640. do
  641. {
  642. // VFALCO TODO use boost::beast::bind_continuation
  643. BOOST_ASIO_CORO_YIELD
  644. {
  645. auto mb = beast::detail::dynamic_buffer_prepare(b_,
  646. clamp(impl.read_size_hint_db(b_), limit_),
  647. ec, error::buffer_overflow);
  648. if(impl.check_stop_now(ec))
  649. goto upcall;
  650. read_some_op<read_op, mutable_buffers_type>(
  651. std::move(*this), sp, *mb);
  652. }
  653. b_.commit(bytes_transferred);
  654. bytes_written_ += bytes_transferred;
  655. if(ec)
  656. goto upcall;
  657. }
  658. while(! some_ && ! impl.rd_done);
  659. upcall:
  660. this->complete(cont, ec, bytes_written_);
  661. }
  662. }
  663. };
  664. template<class NextLayer, bool deflateSupported>
  665. struct stream<NextLayer, deflateSupported>::
  666. run_read_some_op
  667. {
  668. template<
  669. class ReadHandler,
  670. class MutableBufferSequence>
  671. void
  672. operator()(
  673. ReadHandler&& h,
  674. boost::shared_ptr<impl_type> const& sp,
  675. MutableBufferSequence const& b)
  676. {
  677. // If you get an error on the following line it means
  678. // that your handler does not meet the documented type
  679. // requirements for the handler.
  680. static_assert(
  681. beast::detail::is_invocable<ReadHandler,
  682. void(error_code, std::size_t)>::value,
  683. "ReadHandler type requirements not met");
  684. read_some_op<
  685. typename std::decay<ReadHandler>::type,
  686. MutableBufferSequence>(
  687. std::forward<ReadHandler>(h),
  688. sp,
  689. b);
  690. }
  691. };
  692. template<class NextLayer, bool deflateSupported>
  693. struct stream<NextLayer, deflateSupported>::
  694. run_read_op
  695. {
  696. template<
  697. class ReadHandler,
  698. class DynamicBuffer>
  699. void
  700. operator()(
  701. ReadHandler&& h,
  702. boost::shared_ptr<impl_type> const& sp,
  703. DynamicBuffer* b,
  704. std::size_t limit,
  705. bool some)
  706. {
  707. // If you get an error on the following line it means
  708. // that your handler does not meet the documented type
  709. // requirements for the handler.
  710. static_assert(
  711. beast::detail::is_invocable<ReadHandler,
  712. void(error_code, std::size_t)>::value,
  713. "ReadHandler type requirements not met");
  714. read_op<
  715. typename std::decay<ReadHandler>::type,
  716. DynamicBuffer>(
  717. std::forward<ReadHandler>(h),
  718. sp,
  719. *b,
  720. limit,
  721. some);
  722. }
  723. };
  724. //------------------------------------------------------------------------------
  725. template<class NextLayer, bool deflateSupported>
  726. template<class DynamicBuffer>
  727. std::size_t
  728. stream<NextLayer, deflateSupported>::
  729. read(DynamicBuffer& buffer)
  730. {
  731. static_assert(is_sync_stream<next_layer_type>::value,
  732. "SyncStream type requirements not met");
  733. static_assert(
  734. net::is_dynamic_buffer<DynamicBuffer>::value,
  735. "DynamicBuffer type requirements not met");
  736. error_code ec;
  737. auto const bytes_written = read(buffer, ec);
  738. if(ec)
  739. BOOST_THROW_EXCEPTION(system_error{ec});
  740. return bytes_written;
  741. }
  742. template<class NextLayer, bool deflateSupported>
  743. template<class DynamicBuffer>
  744. std::size_t
  745. stream<NextLayer, deflateSupported>::
  746. read(DynamicBuffer& buffer, error_code& ec)
  747. {
  748. static_assert(is_sync_stream<next_layer_type>::value,
  749. "SyncStream type requirements not met");
  750. static_assert(
  751. net::is_dynamic_buffer<DynamicBuffer>::value,
  752. "DynamicBuffer type requirements not met");
  753. std::size_t bytes_written = 0;
  754. do
  755. {
  756. bytes_written += read_some(buffer, 0, ec);
  757. if(ec)
  758. return bytes_written;
  759. }
  760. while(! is_message_done());
  761. return bytes_written;
  762. }
  763. template<class NextLayer, bool deflateSupported>
  764. template<class DynamicBuffer, class ReadHandler>
  765. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  766. stream<NextLayer, deflateSupported>::
  767. async_read(DynamicBuffer& buffer, ReadHandler&& handler)
  768. {
  769. static_assert(is_async_stream<next_layer_type>::value,
  770. "AsyncStream type requirements not met");
  771. static_assert(
  772. net::is_dynamic_buffer<DynamicBuffer>::value,
  773. "DynamicBuffer type requirements not met");
  774. return net::async_initiate<
  775. ReadHandler,
  776. void(error_code, std::size_t)>(
  777. run_read_op{},
  778. handler,
  779. impl_,
  780. &buffer,
  781. 0,
  782. false);
  783. }
  784. //------------------------------------------------------------------------------
  785. template<class NextLayer, bool deflateSupported>
  786. template<class DynamicBuffer>
  787. std::size_t
  788. stream<NextLayer, deflateSupported>::
  789. read_some(
  790. DynamicBuffer& buffer,
  791. std::size_t limit)
  792. {
  793. static_assert(is_sync_stream<next_layer_type>::value,
  794. "SyncStream type requirements not met");
  795. static_assert(
  796. net::is_dynamic_buffer<DynamicBuffer>::value,
  797. "DynamicBuffer type requirements not met");
  798. error_code ec;
  799. auto const bytes_written =
  800. read_some(buffer, limit, ec);
  801. if(ec)
  802. BOOST_THROW_EXCEPTION(system_error{ec});
  803. return bytes_written;
  804. }
  805. template<class NextLayer, bool deflateSupported>
  806. template<class DynamicBuffer>
  807. std::size_t
  808. stream<NextLayer, deflateSupported>::
  809. read_some(
  810. DynamicBuffer& buffer,
  811. std::size_t limit,
  812. error_code& ec)
  813. {
  814. static_assert(is_sync_stream<next_layer_type>::value,
  815. "SyncStream type requirements not met");
  816. static_assert(
  817. net::is_dynamic_buffer<DynamicBuffer>::value,
  818. "DynamicBuffer type requirements not met");
  819. using beast::detail::clamp;
  820. if(! limit)
  821. limit = (std::numeric_limits<std::size_t>::max)();
  822. auto const size =
  823. clamp(read_size_hint(buffer), limit);
  824. BOOST_ASSERT(size > 0);
  825. auto mb = beast::detail::dynamic_buffer_prepare(
  826. buffer, size, ec, error::buffer_overflow);
  827. if(impl_->check_stop_now(ec))
  828. return 0;
  829. auto const bytes_written = read_some(*mb, ec);
  830. buffer.commit(bytes_written);
  831. return bytes_written;
  832. }
  833. template<class NextLayer, bool deflateSupported>
  834. template<class DynamicBuffer, class ReadHandler>
  835. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  836. stream<NextLayer, deflateSupported>::
  837. async_read_some(
  838. DynamicBuffer& buffer,
  839. std::size_t limit,
  840. ReadHandler&& handler)
  841. {
  842. static_assert(is_async_stream<next_layer_type>::value,
  843. "AsyncStream type requirements not met");
  844. static_assert(
  845. net::is_dynamic_buffer<DynamicBuffer>::value,
  846. "DynamicBuffer type requirements not met");
  847. return net::async_initiate<
  848. ReadHandler,
  849. void(error_code, std::size_t)>(
  850. run_read_op{},
  851. handler,
  852. impl_,
  853. &buffer,
  854. limit,
  855. true);
  856. }
  857. //------------------------------------------------------------------------------
  858. template<class NextLayer, bool deflateSupported>
  859. template<class MutableBufferSequence>
  860. std::size_t
  861. stream<NextLayer, deflateSupported>::
  862. read_some(
  863. MutableBufferSequence const& buffers)
  864. {
  865. static_assert(is_sync_stream<next_layer_type>::value,
  866. "SyncStream type requirements not met");
  867. static_assert(net::is_mutable_buffer_sequence<
  868. MutableBufferSequence>::value,
  869. "MutableBufferSequence type requirements not met");
  870. error_code ec;
  871. auto const bytes_written = read_some(buffers, ec);
  872. if(ec)
  873. BOOST_THROW_EXCEPTION(system_error{ec});
  874. return bytes_written;
  875. }
  // Synchronous "read some" into a caller-supplied mutable buffer sequence,
  // reporting failure through `ec` instead of throwing.
  //
  // What the code below does, in order:
  //  - returns 0 at once if impl.check_stop_now(ec) is true on entry;
  //  - when no payload remains (rd_remain == 0) and either the last frame was
  //    not final or the message is done, parses the next frame header,
  //    reading from the next layer into rd_buf as needed;
  //  - handles control frames inline: a ping is answered with a pong unless
  //    wr_close is set, a pong only invokes ctrl_cb, and a close frame ends
  //    the call through do_fail with error::closed;
  //  - for an uncompressed message, copies payload from rd_buf or reads it
  //    straight into `buffers`;
  //  - for a compressed message (impl.rd_deflated()), inflates payload into
  //    `buffers`;
  //  - for text messages (rd_op == opcode::text), feeds the produced bytes to
  //    rd_utf8 and fails with error::bad_frame_payload when validation fails.
  //
  // buffers  Destination for message payload.
  // ec       Cleared on entry; set by check_stop_now / do_fail / the I/O and
  //          inflate calls on failure.
  // Returns  bytes_written, the running count of payload bytes accounted for
  //          in `buffers` at the point of return.
  876. template<class NextLayer, bool deflateSupported>
  877. template<class MutableBufferSequence>
  878. std::size_t
  879. stream<NextLayer, deflateSupported>::
  880. read_some(
  881. MutableBufferSequence const& buffers,
  882. error_code& ec)
  883. {
  884. static_assert(is_sync_stream<next_layer_type>::value,
  885. "SyncStream type requirements not met");
  886. static_assert(net::is_mutable_buffer_sequence<
  887. MutableBufferSequence>::value,
  888. "MutableBufferSequence type requirements not met");
  889. using beast::detail::clamp;
  890. auto& impl = *impl_;
  891. close_code code{};
  892. std::size_t bytes_written = 0;
  893. ec = {};
  894. // Make sure the stream is open
  895. if(impl.check_stop_now(ec))
  896. return bytes_written;
  // Re-entry point: `goto loop` is taken after a ping/pong control frame or
  // an empty non-final frame has been consumed, to look for the next frame.
  897. loop:
  898. // See if we need to read a frame header. This
  899. // condition is structured to give the decompressor
  900. // a chance to emit the final empty deflate block
  901. //
  902. if(impl.rd_remain == 0 && (
  903. ! impl.rd_fh.fin || impl.rd_done))
  904. {
  905. // Read frame header
  // `result` carries header-parse errors separately from `ec`, which is
  // used for the stream I/O below.
  906. error_code result;
  // While parse_fh returns false: fail if it set `result`, otherwise read
  // more bytes from the next layer into rd_buf and try again.
  907. while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result))
  908. {
  909. if(result)
  910. {
  911. // _Fail the WebSocket Connection_
  912. if(result == error::message_too_big)
  913. code = close_code::too_big;
  914. else
  915. code = close_code::protocol_error;
  916. do_fail(code, result, ec);
  917. return bytes_written;
  918. }
  919. auto const bytes_transferred =
  920. impl.stream().read_some(
  921. impl.rd_buf.prepare(read_size(
  922. impl.rd_buf, impl.rd_buf.max_size())),
  923. ec);
  // The commit happens before the stop check, so bytes returned by the read
  // are committed to rd_buf even when `ec` is set.
  924. impl.rd_buf.commit(bytes_transferred);
  925. if(impl.check_stop_now(ec))
  926. return bytes_written;
  927. }
  928. // Immediately apply the mask to the portion
  929. // of the buffer holding payload data.
  930. if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
  931. detail::mask_inplace(buffers_prefix(
  932. clamp(impl.rd_fh.len), impl.rd_buf.data()),
  933. impl.rd_key);
  934. if(detail::is_control(impl.rd_fh.op))
  935. {
  936. // Get control frame payload
  // The assertion below requires the whole control payload (rd_fh.len
  // bytes) to be present in rd_buf at this point.
  937. auto const b = buffers_prefix(
  938. clamp(impl.rd_fh.len), impl.rd_buf.data());
  939. auto const len = buffer_bytes(b);
  940. BOOST_ASSERT(len == impl.rd_fh.len);
  941. // Clear this otherwise the next
  942. // frame will be considered final.
  943. impl.rd_fh.fin = false;
  944. // Handle ping frame
  945. if(impl.rd_fh.op == detail::opcode::ping)
  946. {
  947. ping_data payload;
  948. detail::read_ping(payload, b);
  949. impl.rd_buf.consume(len);
  950. if(impl.wr_close)
  951. {
  952. // Ignore ping when closing
  953. goto loop;
  954. }
  // Notify the control callback (if any), then write a pong carrying the
  // same payload directly to the next layer.
  955. if(impl.ctrl_cb)
  956. impl.ctrl_cb(frame_type::ping, payload);
  957. detail::frame_buffer fb;
  958. impl.template write_ping<flat_static_buffer_base>(fb,
  959. detail::opcode::pong, payload);
  960. net::write(impl.stream(), fb.data(), ec);
  961. if(impl.check_stop_now(ec))
  962. return bytes_written;
  963. goto loop;
  964. }
  965. // Handle pong frame
  966. if(impl.rd_fh.op == detail::opcode::pong)
  967. {
  968. ping_data payload;
  969. detail::read_ping(payload, b);
  970. impl.rd_buf.consume(len);
  971. if(impl.ctrl_cb)
  972. impl.ctrl_cb(frame_type::pong, payload);
  973. goto loop;
  974. }
  975. // Handle close frame
  976. BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close);
  977. {
  978. BOOST_ASSERT(! impl.rd_close);
  979. impl.rd_close = true;
  980. close_reason cr;
  981. detail::read_close(cr, b, result);
  982. if(result)
  983. {
  984. // _Fail the WebSocket Connection_
  985. do_fail(close_code::protocol_error,
  986. result, ec);
  987. return bytes_written;
  988. }
  // Record the peer's close reason, drop the frame from rd_buf, and notify
  // the control callback before starting the closing handshake.
  989. impl.cr = cr;
  990. impl.rd_buf.consume(len);
  991. if(impl.ctrl_cb)
  992. impl.ctrl_cb(frame_type::close, impl.cr.reason);
  993. BOOST_ASSERT(! impl.wr_close);
  994. // _Start the WebSocket Closing Handshake_
  // The peer's close code is passed on, with close_code::none replaced by
  // close_code::normal; the call always returns with error::closed
  // handed to do_fail.
  995. do_fail(
  996. cr.code == close_code::none ?
  997. close_code::normal :
  998. static_cast<close_code>(cr.code),
  999. error::closed, ec);
  1000. return bytes_written;
  1001. }
  1002. }
  1003. if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin)
  1004. {
  1005. // Empty non-final frame
  1006. goto loop;
  1007. }
  // A data frame header is now in place; the message is not done yet.
  1008. impl.rd_done = false;
  1009. }
  1010. else
  1011. {
  // No header had to be read on this pass; reset `ec` before the payload
  // handling below.
  1012. ec = {};
  1013. }
  1014. if(! impl.rd_deflated())
  1015. {
  // Uncompressed message payload.
  1016. if(impl.rd_remain > 0)
  1017. {
  // Pre-fill rd_buf only when it is empty and its max_size is larger than
  // min(remaining payload, caller buffer size).
  1018. if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() >
  1019. (std::min)(clamp(impl.rd_remain),
  1020. buffer_bytes(buffers)))
  1021. {
  1022. // Fill the read buffer first, otherwise we
  1023. // get fewer bytes at the cost of one I/O.
  1024. impl.rd_buf.commit(impl.stream().read_some(
  1025. impl.rd_buf.prepare(read_size(impl.rd_buf,
  1026. impl.rd_buf.max_size())), ec));
  1027. if(impl.check_stop_now(ec))
  1028. return bytes_written;
  // Unmask only the part of rd_buf that belongs to the current frame
  // (at most rd_remain bytes).
  1029. if(impl.rd_fh.mask)
  1030. detail::mask_inplace(
  1031. buffers_prefix(clamp(impl.rd_remain),
  1032. impl.rd_buf.data()), impl.rd_key);
  1033. }
  1034. if(impl.rd_buf.size() > 0)
  1035. {
  1036. // Copy from the read buffer.
  1037. // The mask was already applied.
  1038. auto const bytes_transferred = net::buffer_copy(
  1039. buffers, impl.rd_buf.data(),
  1040. clamp(impl.rd_remain));
  1041. auto const mb = buffers_prefix(
  1042. bytes_transferred, buffers);
  1043. impl.rd_remain -= bytes_transferred;
  1044. if(impl.rd_op == detail::opcode::text)
  1045. {
  // Validate the copied bytes; finish() is only consulted once the final
  // frame's payload has been fully consumed.
  1046. if(! impl.rd_utf8.write(mb) ||
  1047. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  1048. ! impl.rd_utf8.finish()))
  1049. {
  1050. // _Fail the WebSocket Connection_
  // Note: this return happens before bytes_written is increased for the
  // bytes just copied.
  1051. do_fail(close_code::bad_payload,
  1052. error::bad_frame_payload, ec);
  1053. return bytes_written;
  1054. }
  1055. }
  1056. bytes_written += bytes_transferred;
  1057. impl.rd_size += bytes_transferred;
  1058. impl.rd_buf.consume(bytes_transferred);
  1059. }
  1060. else
  1061. {
  1062. // Read into caller's buffer
  1063. BOOST_ASSERT(impl.rd_remain > 0);
  1064. BOOST_ASSERT(buffer_bytes(buffers) > 0);
  1065. BOOST_ASSERT(buffer_bytes(buffers_prefix(
  1066. clamp(impl.rd_remain), buffers)) > 0);
  // The read is limited to rd_remain bytes so it cannot run past the
  // current frame's payload.
  1067. auto const bytes_transferred =
  1068. impl.stream().read_some(buffers_prefix(
  1069. clamp(impl.rd_remain), buffers), ec);
  1070. // VFALCO What if some bytes were written?
  1071. if(impl.check_stop_now(ec))
  1072. return bytes_written;
  1073. BOOST_ASSERT(bytes_transferred > 0);
  1074. auto const mb = buffers_prefix(
  1075. bytes_transferred, buffers);
  1076. impl.rd_remain -= bytes_transferred;
  // Data went straight into the caller's buffer, so unmask it there.
  1077. if(impl.rd_fh.mask)
  1078. detail::mask_inplace(mb, impl.rd_key);
  1079. if(impl.rd_op == detail::opcode::text)
  1080. {
  1081. if(! impl.rd_utf8.write(mb) ||
  1082. (impl.rd_remain == 0 && impl.rd_fh.fin &&
  1083. ! impl.rd_utf8.finish()))
  1084. {
  1085. // _Fail the WebSocket Connection_
  1086. do_fail(close_code::bad_payload,
  1087. error::bad_frame_payload, ec);
  1088. return bytes_written;
  1089. }
  1090. }
  1091. bytes_written += bytes_transferred;
  1092. impl.rd_size += bytes_transferred;
  1093. }
  1094. }
  // The message is done once the final frame has no payload left.
  1095. BOOST_ASSERT( ! impl.rd_done );
  1096. if( impl.rd_remain == 0 && impl.rd_fh.fin )
  1097. impl.rd_done = true;
  1098. }
  1099. else
  1100. {
  1101. // Read compressed message frame payload:
  1102. // inflate even if rd_fh_.len == 0, otherwise we
  1103. // never emit the end-of-stream deflate block.
  1104. //
  // did_read limits this call to at most one read from the next layer in
  // the loop below; `cb` tracks the not-yet-filled part of `buffers`.
  1105. bool did_read = false;
  1106. buffers_suffix<MutableBufferSequence> cb(buffers);
  1107. while(buffer_bytes(cb) > 0)
  1108. {
  1109. zlib::z_params zs;
  1110. {
  // Output goes to the first buffer of the remaining caller space.
  1111. auto const out = beast::buffers_front(cb);
  1112. zs.next_out = out.data();
  1113. zs.avail_out = out.size();
  1114. BOOST_ASSERT(zs.avail_out > 0);
  1115. }
  1116. if(impl.rd_remain > 0)
  1117. {
  1118. if(impl.rd_buf.size() > 0)
  1119. {
  1120. // use what's there
  1121. auto const in = buffers_prefix(
  1122. clamp(impl.rd_remain), beast::buffers_front(
  1123. impl.rd_buf.data()));
  1124. zs.avail_in = in.size();
  1125. zs.next_in = in.data();
  1126. }
  1127. else if(! did_read)
  1128. {
  1129. // read new
  1130. auto const bytes_transferred =
  1131. impl.stream().read_some(
  1132. impl.rd_buf.prepare(read_size(
  1133. impl.rd_buf, impl.rd_buf.max_size())),
  1134. ec);
  1135. if(impl.check_stop_now(ec))
  1136. return bytes_written;
  1137. BOOST_ASSERT(bytes_transferred > 0);
  1138. impl.rd_buf.commit(bytes_transferred);
  // Unmask the freshly read bytes (bounded by rd_remain) before handing
  // them to the inflater.
  1139. if(impl.rd_fh.mask)
  1140. detail::mask_inplace(
  1141. buffers_prefix(clamp(impl.rd_remain),
  1142. impl.rd_buf.data()), impl.rd_key);
  1143. auto const in = buffers_prefix(
  1144. clamp(impl.rd_remain), buffers_front(
  1145. impl.rd_buf.data()));
  1146. zs.avail_in = in.size();
  1147. zs.next_in = in.data();
  1148. did_read = true;
  1149. }
  1150. else
  1151. {
  // rd_buf is empty and one read was already done in this call: stop and
  // return what has been produced so far.
  1152. break;
  1153. }
  1154. }
  1155. else if(impl.rd_fh.fin)
  1156. {
  1157. // append the empty block codes
  1158. static std::uint8_t constexpr
  1159. empty_block[4] = {
  1160. 0x00, 0x00, 0xff, 0xff };
  1161. zs.next_in = empty_block;
  1162. zs.avail_in = sizeof(empty_block);
  1163. impl.inflate(zs, zlib::Flush::sync, ec);
  1164. if(! ec)
  1165. {
  1166. // https://github.com/madler/zlib/issues/280
  // Any output produced while inflating the trailer is reported as
  // error::partial_deflate_block.
  1167. if(zs.total_out > 0)
  1168. ec = error::partial_deflate_block;
  1169. }
  1170. if(impl.check_stop_now(ec))
  1171. return bytes_written;
  // Final frame fully inflated: apply context takeover handling for the
  // read side and mark the message as done.
  1172. impl.do_context_takeover_read(impl.role);
  1173. impl.rd_done = true;
  1174. break;
  1175. }
  1176. else
  1177. {
  // No payload left in a non-final frame: nothing more to inflate now.
  1178. break;
  1179. }
  1180. impl.inflate(zs, zlib::Flush::sync, ec);
  1181. if(impl.check_stop_now(ec))
  1182. return bytes_written;
  // Enforce the message size limit (when rd_msg_max is non-zero) on the
  // inflated size: rd_size so far plus what this pass produced.
  1183. if(impl.rd_msg_max && beast::detail::sum_exceeds(
  1184. impl.rd_size, zs.total_out, impl.rd_msg_max))
  1185. {
  1186. do_fail(close_code::too_big,
  1187. error::message_too_big, ec);
  1188. return bytes_written;
  1189. }
  // Advance the output window, account for produced and consumed bytes,
  // and drop the consumed input from rd_buf.
  1190. cb.consume(zs.total_out);
  1191. impl.rd_size += zs.total_out;
  1192. impl.rd_remain -= zs.total_in;
  1193. impl.rd_buf.consume(zs.total_in);
  1194. bytes_written += zs.total_out;
  1195. }
  1196. if(impl.rd_op == detail::opcode::text)
  1197. {
  1198. // check utf8
  // Validation covers everything inflated during this call (the first
  // bytes_written bytes of `buffers`); finish() is checked only when the
  // message is done.
  1199. if(! impl.rd_utf8.write(beast::buffers_prefix(
  1200. bytes_written, buffers)) || (
  1201. impl.rd_done && ! impl.rd_utf8.finish()))
  1202. {
  1203. // _Fail the WebSocket Connection_
  1204. do_fail(close_code::bad_payload,
  1205. error::bad_frame_payload, ec);
  1206. return bytes_written;
  1207. }
  1208. }
  1209. }
  1210. return bytes_written;
  1211. }
  1212. template<class NextLayer, bool deflateSupported>
  1213. template<class MutableBufferSequence, class ReadHandler>
  1214. BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
  1215. stream<NextLayer, deflateSupported>::
  1216. async_read_some(
  1217. MutableBufferSequence const& buffers,
  1218. ReadHandler&& handler)
  1219. {
  1220. static_assert(is_async_stream<next_layer_type>::value,
  1221. "AsyncStream type requirements not met");
  1222. static_assert(net::is_mutable_buffer_sequence<
  1223. MutableBufferSequence>::value,
  1224. "MutableBufferSequence type requirements not met");
  1225. return net::async_initiate<
  1226. ReadHandler,
  1227. void(error_code, std::size_t)>(
  1228. run_read_some_op{},
  1229. handler,
  1230. impl_,
  1231. buffers);
  1232. }
  1233. } // websocket
  1234. } // beast
  1235. } // boost
  1236. #endif