| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784 | //// Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)//// Distributed under the Boost Software License, Version 1.0. (See accompanying// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)//// Official repository: https://github.com/boostorg/beast//#ifndef BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP#define BOOST_BEAST_WEBSOCKET_IMPL_WRITE_HPP#include <boost/beast/websocket/detail/mask.hpp>#include <boost/beast/core/async_base.hpp>#include <boost/beast/core/bind_handler.hpp>#include <boost/beast/core/buffer_traits.hpp>#include <boost/beast/core/buffers_cat.hpp>#include <boost/beast/core/buffers_prefix.hpp>#include <boost/beast/core/buffers_range.hpp>#include <boost/beast/core/buffers_suffix.hpp>#include <boost/beast/core/flat_static_buffer.hpp>#include <boost/beast/core/stream_traits.hpp>#include <boost/beast/core/detail/bind_continuation.hpp>#include <boost/beast/core/detail/clamp.hpp>#include <boost/beast/core/detail/config.hpp>#include <boost/beast/websocket/detail/frame.hpp>#include <boost/beast/websocket/impl/stream_impl.hpp>#include <boost/asio/coroutine.hpp>#include <boost/assert.hpp>#include <boost/config.hpp>#include <boost/throw_exception.hpp>#include <algorithm>#include <memory>namespace boost {namespace beast {namespace websocket {template<class NextLayer, bool deflateSupported>template<class Handler, class Buffers>class stream<NextLayer, deflateSupported>::write_some_op    : public beast::async_base<        Handler, beast::executor_type<stream>>    , public asio::coroutine{    enum    {        do_nomask_nofrag,        do_nomask_frag,        do_mask_nofrag,        do_mask_frag,        do_deflate    };    boost::weak_ptr<impl_type> wp_;    buffers_suffix<Buffers> cb_;    detail::frame_header fh_;    detail::prepared_key key_;    std::size_t bytes_transferred_ = 0;    std::size_t remain_;    std::size_t in_;    int how_;    bool fin_;    bool more_ = false; // for ubsan    bool cont_ = false;public:    static constexpr int id = 2; // for soft_mutex    template<class Handler_>    write_some_op(        Handler_&& h,        boost::shared_ptr<impl_type> const& sp,        bool fin,        Buffers const& bs)        : beast::async_base<Handler,            beast::executor_type<stream>>(                std::forward<Handler_>(h),                    sp->stream().get_executor())        , wp_(sp)        , cb_(bs)        , fin_(fin)    {        auto& impl = *sp;        // Set up the outgoing frame header        if(! impl.wr_cont)        {            impl.begin_msg();            fh_.rsv1 = impl.wr_compress;        }        else        {            fh_.rsv1 = false;        }        fh_.rsv2 = false;        fh_.rsv3 = false;        fh_.op = impl.wr_cont ?            detail::opcode::cont : impl.wr_opcode;        fh_.mask =            impl.role == role_type::client;        // Choose a write algorithm        if(impl.wr_compress)        {            how_ = do_deflate;        }        else if(! fh_.mask)        {            if(! impl.wr_frag)            {                how_ = do_nomask_nofrag;            }            else            {                BOOST_ASSERT(impl.wr_buf_size != 0);                remain_ = buffer_bytes(cb_);                if(remain_ > impl.wr_buf_size)                    how_ = do_nomask_frag;                else                    how_ = do_nomask_nofrag;            }        }        else        {            if(! impl.wr_frag)            {                how_ = do_mask_nofrag;            }            else            {                BOOST_ASSERT(impl.wr_buf_size != 0);                remain_ = buffer_bytes(cb_);                if(remain_ > impl.wr_buf_size)                    how_ = do_mask_frag;                else                    how_ = do_mask_nofrag;            }        }        (*this)({}, 0, false);    }    void operator()(        error_code ec = {},        std::size_t bytes_transferred = 0,        bool cont = true);};template<class NextLayer, bool deflateSupported>template<class Buffers, class Handler>voidstream<NextLayer, deflateSupported>::write_some_op<Buffers, Handler>::operator()(    error_code ec,    std::size_t bytes_transferred,    bool cont){    using beast::detail::clamp;    std::size_t n;    net::mutable_buffer b;    auto sp = wp_.lock();    if(! sp)    {        ec = net::error::operation_aborted;        bytes_transferred_ = 0;        return this->complete(cont, ec, bytes_transferred_);    }    auto& impl = *sp;    BOOST_ASIO_CORO_REENTER(*this)    {        // Acquire the write lock        if(! impl.wr_block.try_lock(this))        {        do_suspend:            BOOST_ASIO_CORO_YIELD            impl.op_wr.emplace(std::move(*this));            impl.wr_block.lock(this);            BOOST_ASIO_CORO_YIELD            net::post(std::move(*this));            BOOST_ASSERT(impl.wr_block.is_locked(this));        }        if(impl.check_stop_now(ec))            goto upcall;        //------------------------------------------------------------------        if(how_ == do_nomask_nofrag)        {            // send a single frame            fh_.fin = fin_;            fh_.len = buffer_bytes(cb_);            impl.wr_fb.clear();            detail::write<flat_static_buffer_base>(                impl.wr_fb, fh_);            impl.wr_cont = ! fin_;            BOOST_ASIO_CORO_YIELD            net::async_write(impl.stream(),                buffers_cat(impl.wr_fb.data(), cb_),                    beast::detail::bind_continuation(std::move(*this)));            bytes_transferred_ += clamp(fh_.len);            if(impl.check_stop_now(ec))                goto upcall;            goto upcall;        }        //------------------------------------------------------------------        if(how_ == do_nomask_frag)        {            // send multiple frames            for(;;)            {                n = clamp(remain_, impl.wr_buf_size);                fh_.len = n;                remain_ -= n;                fh_.fin = fin_ ? remain_ == 0 : false;                impl.wr_fb.clear();                detail::write<flat_static_buffer_base>(                    impl.wr_fb, fh_);                impl.wr_cont = ! fin_;                // Send frame                BOOST_ASIO_CORO_YIELD                net::async_write(impl.stream(), buffers_cat(                    impl.wr_fb.data(),                    buffers_prefix(clamp(fh_.len), cb_)),                        beast::detail::bind_continuation(std::move(*this)));                n = clamp(fh_.len); // restore `n` on yield                bytes_transferred_ += n;                if(impl.check_stop_now(ec))                    goto upcall;                if(remain_ == 0)                    break;                cb_.consume(n);                fh_.op = detail::opcode::cont;                // Give up the write lock in between each frame                // so that outgoing control frames might be sent.                impl.wr_block.unlock(this);                if( impl.op_close.maybe_invoke()                    || impl.op_idle_ping.maybe_invoke()                    || impl.op_rd.maybe_invoke()                    || impl.op_ping.maybe_invoke())                {                    BOOST_ASSERT(impl.wr_block.is_locked());                    goto do_suspend;                }                impl.wr_block.lock(this);            }            goto upcall;        }        //------------------------------------------------------------------        if(how_ == do_mask_nofrag)        {            // send a single frame using multiple writes            remain_ = beast::buffer_bytes(cb_);            fh_.fin = fin_;            fh_.len = remain_;            fh_.key = impl.create_mask();            detail::prepare_key(key_, fh_.key);            impl.wr_fb.clear();            detail::write<flat_static_buffer_base>(                impl.wr_fb, fh_);            n = clamp(remain_, impl.wr_buf_size);            net::buffer_copy(net::buffer(                impl.wr_buf.get(), n), cb_);            detail::mask_inplace(net::buffer(                impl.wr_buf.get(), n), key_);            remain_ -= n;            impl.wr_cont = ! fin_;            // write frame header and some payload            BOOST_ASIO_CORO_YIELD            net::async_write(impl.stream(), buffers_cat(                impl.wr_fb.data(),                net::buffer(impl.wr_buf.get(), n)),                    beast::detail::bind_continuation(std::move(*this)));            // VFALCO What about consuming the buffer on error?            bytes_transferred_ +=                bytes_transferred - impl.wr_fb.size();            if(impl.check_stop_now(ec))                goto upcall;            while(remain_ > 0)            {                cb_.consume(impl.wr_buf_size);                n = clamp(remain_, impl.wr_buf_size);                net::buffer_copy(net::buffer(                    impl.wr_buf.get(), n), cb_);                detail::mask_inplace(net::buffer(                    impl.wr_buf.get(), n), key_);                remain_ -= n;                // write more payload                BOOST_ASIO_CORO_YIELD                net::async_write(impl.stream(),                    net::buffer(impl.wr_buf.get(), n),                        beast::detail::bind_continuation(std::move(*this)));                bytes_transferred_ += bytes_transferred;                if(impl.check_stop_now(ec))                    goto upcall;            }            goto upcall;        }        //------------------------------------------------------------------        if(how_ == do_mask_frag)        {            // send multiple frames            for(;;)            {                n = clamp(remain_, impl.wr_buf_size);                remain_ -= n;                fh_.len = n;                fh_.key = impl.create_mask();                fh_.fin = fin_ ? remain_ == 0 : false;                detail::prepare_key(key_, fh_.key);                net::buffer_copy(net::buffer(                    impl.wr_buf.get(), n), cb_);                detail::mask_inplace(net::buffer(                    impl.wr_buf.get(), n), key_);                impl.wr_fb.clear();                detail::write<flat_static_buffer_base>(                    impl.wr_fb, fh_);                impl.wr_cont = ! fin_;                // Send frame                BOOST_ASIO_CORO_YIELD                net::async_write(impl.stream(), buffers_cat(                    impl.wr_fb.data(),                    net::buffer(impl.wr_buf.get(), n)),                        beast::detail::bind_continuation(std::move(*this)));                n = bytes_transferred - impl.wr_fb.size();                bytes_transferred_ += n;                if(impl.check_stop_now(ec))                    goto upcall;                if(remain_ == 0)                    break;                cb_.consume(n);                fh_.op = detail::opcode::cont;                // Give up the write lock in between each frame                // so that outgoing control frames might be sent.                impl.wr_block.unlock(this);                if( impl.op_close.maybe_invoke()                    || impl.op_idle_ping.maybe_invoke()                    || impl.op_rd.maybe_invoke()                    || impl.op_ping.maybe_invoke())                {                    BOOST_ASSERT(impl.wr_block.is_locked());                    goto do_suspend;                }                impl.wr_block.lock(this);            }            goto upcall;        }        //------------------------------------------------------------------        if(how_ == do_deflate)        {            // send compressed frames            for(;;)            {                b = net::buffer(impl.wr_buf.get(),                    impl.wr_buf_size);                more_ = impl.deflate(b, cb_, fin_, in_, ec);                if(impl.check_stop_now(ec))                    goto upcall;                n = buffer_bytes(b);                if(n == 0)                {                    // The input was consumed, but there is                    // no output due to compression latency.                    BOOST_ASSERT(! fin_);                    BOOST_ASSERT(buffer_bytes(cb_) == 0);                    goto upcall;                }                if(fh_.mask)                {                    fh_.key = impl.create_mask();                    detail::prepared_key key;                    detail::prepare_key(key, fh_.key);                    detail::mask_inplace(b, key);                }                fh_.fin = ! more_;                fh_.len = n;                impl.wr_fb.clear();                detail::write<                    flat_static_buffer_base>(impl.wr_fb, fh_);                impl.wr_cont = ! fin_;                // Send frame                BOOST_ASIO_CORO_YIELD                net::async_write(impl.stream(), buffers_cat(                    impl.wr_fb.data(), b),                        beast::detail::bind_continuation(std::move(*this)));                bytes_transferred_ += in_;                if(impl.check_stop_now(ec))                    goto upcall;                if(more_)                {                    fh_.op = detail::opcode::cont;                    fh_.rsv1 = false;                    // Give up the write lock in between each frame                    // so that outgoing control frames might be sent.                    impl.wr_block.unlock(this);                    if( impl.op_close.maybe_invoke()                        || impl.op_idle_ping.maybe_invoke()                        || impl.op_rd.maybe_invoke()                        || impl.op_ping.maybe_invoke())                    {                        BOOST_ASSERT(impl.wr_block.is_locked());                        goto do_suspend;                    }                    impl.wr_block.lock(this);                }                else                {                    if(fh_.fin)                        impl.do_context_takeover_write(impl.role);                    goto upcall;                }            }        }    //--------------------------------------------------------------------------    upcall:        impl.wr_block.unlock(this);        impl.op_close.maybe_invoke()            || impl.op_idle_ping.maybe_invoke()            || impl.op_rd.maybe_invoke()            || impl.op_ping.maybe_invoke();        this->complete(cont, ec, bytes_transferred_);    }}template<class NextLayer, bool deflateSupported>struct stream<NextLayer, deflateSupported>::    run_write_some_op{    template<        class WriteHandler,        class ConstBufferSequence>    void    operator()(        WriteHandler&& h,        boost::shared_ptr<impl_type> const& sp,        bool fin,        ConstBufferSequence const& b)    {        // If you get an error on the following line it means        // that your handler does not meet the documented type        // requirements for the handler.        static_assert(            beast::detail::is_invocable<WriteHandler,                void(error_code, std::size_t)>::value,            "WriteHandler type requirements not met");        write_some_op<            typename std::decay<WriteHandler>::type,            ConstBufferSequence>(                std::forward<WriteHandler>(h),                sp,                fin,                b);    }};//------------------------------------------------------------------------------template<class NextLayer, bool deflateSupported>template<class ConstBufferSequence>std::size_tstream<NextLayer, deflateSupported>::write_some(bool fin, ConstBufferSequence const& buffers){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(net::is_const_buffer_sequence<        ConstBufferSequence>::value,            "ConstBufferSequence type requirements not met");    error_code ec;    auto const bytes_transferred =        write_some(fin, buffers, ec);    if(ec)        BOOST_THROW_EXCEPTION(system_error{ec});    return bytes_transferred;}template<class NextLayer, bool deflateSupported>template<class ConstBufferSequence>std::size_tstream<NextLayer, deflateSupported>::write_some(bool fin,    ConstBufferSequence const& buffers, error_code& ec){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(net::is_const_buffer_sequence<        ConstBufferSequence>::value,            "ConstBufferSequence type requirements not met");    using beast::detail::clamp;    auto& impl = *impl_;    std::size_t bytes_transferred = 0;    ec = {};    if(impl.check_stop_now(ec))        return bytes_transferred;    detail::frame_header fh;    if(! impl.wr_cont)    {        impl.begin_msg();        fh.rsv1 = impl.wr_compress;    }    else    {        fh.rsv1 = false;    }    fh.rsv2 = false;    fh.rsv3 = false;    fh.op = impl.wr_cont ?        detail::opcode::cont : impl.wr_opcode;    fh.mask = impl.role == role_type::client;    auto remain = buffer_bytes(buffers);    if(impl.wr_compress)    {        buffers_suffix<            ConstBufferSequence> cb(buffers);        for(;;)        {            auto b = net::buffer(                impl.wr_buf.get(), impl.wr_buf_size);            auto const more = impl.deflate(                b, cb, fin, bytes_transferred, ec);            if(impl.check_stop_now(ec))                return bytes_transferred;            auto const n = buffer_bytes(b);            if(n == 0)            {                // The input was consumed, but there                // is no output due to compression                // latency.                BOOST_ASSERT(! fin);                BOOST_ASSERT(buffer_bytes(cb) == 0);                fh.fin = false;                break;            }            if(fh.mask)            {                fh.key = this->impl_->create_mask();                detail::prepared_key key;                detail::prepare_key(key, fh.key);                detail::mask_inplace(b, key);            }            fh.fin = ! more;            fh.len = n;            detail::fh_buffer fh_buf;            detail::write<                flat_static_buffer_base>(fh_buf, fh);            impl.wr_cont = ! fin;            net::write(impl.stream(),                buffers_cat(fh_buf.data(), b), ec);            if(impl.check_stop_now(ec))                return bytes_transferred;            if(! more)                break;            fh.op = detail::opcode::cont;            fh.rsv1 = false;        }        if(fh.fin)            impl.do_context_takeover_write(impl.role);    }    else if(! fh.mask)    {        if(! impl.wr_frag)        {            // no mask, no autofrag            fh.fin = fin;            fh.len = remain;            detail::fh_buffer fh_buf;            detail::write<                flat_static_buffer_base>(fh_buf, fh);            impl.wr_cont = ! fin;            net::write(impl.stream(),                buffers_cat(fh_buf.data(), buffers), ec);            if(impl.check_stop_now(ec))                return bytes_transferred;            bytes_transferred += remain;        }        else        {            // no mask, autofrag            BOOST_ASSERT(impl.wr_buf_size != 0);            buffers_suffix<                ConstBufferSequence> cb{buffers};            for(;;)            {                auto const n = clamp(remain, impl.wr_buf_size);                remain -= n;                fh.len = n;                fh.fin = fin ? remain == 0 : false;                detail::fh_buffer fh_buf;                detail::write<                    flat_static_buffer_base>(fh_buf, fh);                impl.wr_cont = ! fin;                net::write(impl.stream(),                    beast::buffers_cat(fh_buf.data(),                        beast::buffers_prefix(n, cb)), ec);                bytes_transferred += n;                if(impl.check_stop_now(ec))                    return bytes_transferred;                if(remain == 0)                    break;                fh.op = detail::opcode::cont;                cb.consume(n);            }        }    }    else if(! impl.wr_frag)    {        // mask, no autofrag        fh.fin = fin;        fh.len = remain;        fh.key = this->impl_->create_mask();        detail::prepared_key key;        detail::prepare_key(key, fh.key);        detail::fh_buffer fh_buf;        detail::write<            flat_static_buffer_base>(fh_buf, fh);        buffers_suffix<            ConstBufferSequence> cb{buffers};        {            auto const n =                clamp(remain, impl.wr_buf_size);            auto const b =                net::buffer(impl.wr_buf.get(), n);            net::buffer_copy(b, cb);            cb.consume(n);            remain -= n;            detail::mask_inplace(b, key);            impl.wr_cont = ! fin;            net::write(impl.stream(),                buffers_cat(fh_buf.data(), b), ec);            bytes_transferred += n;            if(impl.check_stop_now(ec))                return bytes_transferred;        }        while(remain > 0)        {            auto const n =                clamp(remain, impl.wr_buf_size);            auto const b =                net::buffer(impl.wr_buf.get(), n);            net::buffer_copy(b, cb);            cb.consume(n);            remain -= n;            detail::mask_inplace(b, key);            net::write(impl.stream(), b, ec);            bytes_transferred += n;            if(impl.check_stop_now(ec))                return bytes_transferred;        }    }    else    {        // mask, autofrag        BOOST_ASSERT(impl.wr_buf_size != 0);        buffers_suffix<            ConstBufferSequence> cb(buffers);        for(;;)        {            fh.key = this->impl_->create_mask();            detail::prepared_key key;            detail::prepare_key(key, fh.key);            auto const n =                clamp(remain, impl.wr_buf_size);            auto const b =                net::buffer(impl.wr_buf.get(), n);            net::buffer_copy(b, cb);            detail::mask_inplace(b, key);            fh.len = n;            remain -= n;            fh.fin = fin ? remain == 0 : false;            impl.wr_cont = ! fh.fin;            detail::fh_buffer fh_buf;            detail::write<                flat_static_buffer_base>(fh_buf, fh);            net::write(impl.stream(),                buffers_cat(fh_buf.data(), b), ec);            bytes_transferred += n;            if(impl.check_stop_now(ec))                return bytes_transferred;            if(remain == 0)                break;            fh.op = detail::opcode::cont;            cb.consume(n);        }    }    return bytes_transferred;}template<class NextLayer, bool deflateSupported>template<class ConstBufferSequence, class WriteHandler>BOOST_BEAST_ASYNC_RESULT2(WriteHandler)stream<NextLayer, deflateSupported>::async_write_some(bool fin,    ConstBufferSequence const& bs, WriteHandler&& handler){    static_assert(is_async_stream<next_layer_type>::value,        "AsyncStream type requirements not met");    static_assert(net::is_const_buffer_sequence<        ConstBufferSequence>::value,            "ConstBufferSequence type requirements not met");    return net::async_initiate<        WriteHandler,        void(error_code, std::size_t)>(            run_write_some_op{},            handler,            impl_,            fin,            bs);}//------------------------------------------------------------------------------template<class NextLayer, bool deflateSupported>template<class ConstBufferSequence>std::size_tstream<NextLayer, deflateSupported>::write(ConstBufferSequence const& buffers){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(net::is_const_buffer_sequence<        ConstBufferSequence>::value,            "ConstBufferSequence type requirements not met");    error_code ec;    auto const bytes_transferred = write(buffers, ec);    if(ec)        BOOST_THROW_EXCEPTION(system_error{ec});    return bytes_transferred;}template<class NextLayer, bool deflateSupported>template<class ConstBufferSequence>std::size_tstream<NextLayer, deflateSupported>::write(ConstBufferSequence const& buffers, error_code& ec){    static_assert(is_sync_stream<next_layer_type>::value,        "SyncStream type requirements not met");    static_assert(net::is_const_buffer_sequence<        ConstBufferSequence>::value,            "ConstBufferSequence type requirements not met");    return write_some(true, buffers, ec);}template<class NextLayer, bool deflateSupported>template<class ConstBufferSequence, class WriteHandler>BOOST_BEAST_ASYNC_RESULT2(WriteHandler)stream<NextLayer, deflateSupported>::async_write(    ConstBufferSequence const& bs, WriteHandler&& handler){    static_assert(is_async_stream<next_layer_type>::value,        "AsyncStream type requirements not met");    static_assert(net::is_const_buffer_sequence<        ConstBufferSequence>::value,            "ConstBufferSequence type requirements not met");    return net::async_initiate<        WriteHandler,        void(error_code, std::size_t)>(            run_write_some_op{},            handler,            impl_,            true,            bs);}} // websocket} // beast} // boost#endif
 |