LCOV - code coverage report
Current view: top level - capy/io - any_write_sink.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 84.2 % 114 96
Test Date: 2026-02-01 07:03:35 Functions: 90.3 % 31 28

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      11              : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_param.hpp>
      17              : #include <boost/capy/concept/io_awaitable.hpp>
      18              : #include <boost/capy/concept/write_sink.hpp>
      19              : #include <boost/capy/coro.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/io_result.hpp>
      22              : #include <boost/capy/task.hpp>
      23              : 
      24              : #include <system_error>
      25              : 
      26              : #include <concepts>
      27              : #include <coroutine>
      28              : #include <cstddef>
      29              : #include <exception>
      30              : #include <new>
      31              : #include <span>
      32              : #include <stop_token>
      33              : #include <utility>
      34              : 
      35              : namespace boost {
      36              : namespace capy {
      37              : 
      38              : /** Type-erased wrapper for any WriteSink.
      39              : 
      40              :     This class provides type erasure for any type satisfying the
      41              :     @ref WriteSink concept, enabling runtime polymorphism for
      42              :     sink write operations. It uses cached awaitable storage to achieve
      43              :     zero steady-state allocation after construction.
      44              : 
      45              :     The wrapper supports two construction modes:
      46              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      47              :       allocates storage and owns the sink.
      48              :     - **Reference**: Pass a pointer to wrap without ownership. The
      49              :       pointed-to sink must outlive this wrapper.
      50              : 
      51              :     @par Awaitable Preallocation
      52              :     The constructor preallocates storage for the type-erased awaitable.
      53              :     This reserves all virtual address space at server startup
      54              :     so memory usage can be measured up front, rather than
      55              :     allocating piecemeal as traffic arrives.
      56              : 
      57              :     @par Thread Safety
      58              :     Not thread-safe. Concurrent operations on the same wrapper
      59              :     are undefined behavior.
      60              : 
      61              :     @par Example
      62              :     @code
      63              :     // Owning - takes ownership of the sink
      64              :     any_write_sink ws(some_sink{args...});
      65              : 
      66              :     // Reference - wraps without ownership
      67              :     some_sink sink;
      68              :     any_write_sink ws(&sink);
      69              : 
      70              :     const_buffer buf(data, size);
      71              :     auto [ec, n] = co_await ws.write(std::span(&buf, 1));
      72              :     auto [ec2] = co_await ws.write_eof();
      73              :     @endcode
      74              : 
      75              :     @see any_write_stream, WriteSink
      76              : */
      77              : class any_write_sink
      78              : {
      79              :     struct vtable;              // operation table for one concrete sink type
      80              :     struct write_awaitable_ops; // function table for an erased write awaitable
      81              :     struct eof_awaitable_ops;   // function table for an erased write_eof awaitable
      82              : 
      83              :     template<WriteSink S>
      84              :     struct vtable_for_impl;     // supplies the vtable instance for sink type S
      85              : 
      86              :     void* sink_ = nullptr;             // wrapped sink, owned or borrowed; null when empty
      87              :     vtable const* vt_ = nullptr;       // operations for the wrapped sink's concrete type
      88              :     void* cached_awaitable_ = nullptr; // preallocated buffer the erased awaitables are built in
      89              :     void* storage_ = nullptr;          // allocation holding an owned sink; stays null when wrapping by pointer
      90              :     write_awaitable_ops const* active_write_ops_ = nullptr; // ops of the write awaitable currently constructed, else null
      91              :     eof_awaitable_ops const* active_eof_ops_ = nullptr;     // ops of the eof awaitable currently constructed, else null
      92              : 
      93              : public:
      94              :     /** Destructor.
      95              : 
      96              :         Destroys the owned sink (if any) and releases the cached
      97              :         awaitable storage.
      98              :     */
      99              :     ~any_write_sink();
     100              : 
     101              :     /** Default constructor.
     102              : 
     103              :         Constructs an empty wrapper. Operations on a default-constructed
     104              :         wrapper result in undefined behavior.
     105              :     */
     106              :     any_write_sink() = default;
     107              : 
     108              :     /** Non-copyable.
     109              : 
     110              :         The awaitable cache is per-instance and cannot be shared.
     111              :     */
     112              :     any_write_sink(any_write_sink const&) = delete;
     113              :     any_write_sink& operator=(any_write_sink const&) = delete;
     114              : 
     115              :     /** Move constructor.
     116              : 
     117              :         Transfers ownership of the wrapped sink (if owned) and
     118              :         cached awaitable storage from `other`. After the move, `other` is
     119              :         in a default-constructed state.
     120              : 
     121              :         @param other The wrapper to move from.
     122              :     */
     123            1 :     any_write_sink(any_write_sink&& other) noexcept
     124            1 :         : sink_(std::exchange(other.sink_, nullptr))
     125            1 :         , vt_(std::exchange(other.vt_, nullptr))
     126            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     127            1 :         , storage_(std::exchange(other.storage_, nullptr))
     128            1 :         , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
     129            1 :         , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
     130              :     {
     131            1 :     }
     132              : 
     133              :     /** Move assignment operator.
     134              : 
     135              :         Destroys any owned sink and releases existing resources,
     136              :         then transfers ownership from `other`.
     137              : 
     138              :         @param other The wrapper to move from.
     139              :         @return Reference to this wrapper.
     140              :     */
     141              :     any_write_sink&
     142              :     operator=(any_write_sink&& other) noexcept;
     143              : 
     144              :     /** Construct by taking ownership of a WriteSink.
     145              : 
     146              :         Allocates storage and moves the sink into this wrapper.
     147              :         The wrapper owns the sink and will destroy it.
     148              : 
     149              :         @param s The sink to take ownership of.
     150              :     */
     151              :     template<WriteSink S>
     152              :         requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     153              :     any_write_sink(S s);
     154              : 
     155              :     /** Construct by wrapping a WriteSink without ownership.
     156              : 
     157              :         Wraps the given sink by pointer. The sink must remain
     158              :         valid for the lifetime of this wrapper.
     159              : 
     160              :         @param s Pointer to the sink to wrap.
     161              :     */
     162              :     template<WriteSink S>
     163              :     any_write_sink(S* s);
     164              : 
     165              :     /** Check if the wrapper contains a valid sink.
     166              : 
     167              :         @return `true` if wrapping a sink, `false` if default-constructed
     168              :             or moved-from.
     169              :     */
     170              :     bool
     171            9 :     has_value() const noexcept
     172              :     {
     173            9 :         return sink_ != nullptr;
     174              :     }
     175              : 
     176              :     /** Explicit conversion to `bool`.
     177              : 
     178              :         @return The result of `has_value()`: `true` if wrapping a sink,
     179              :             `false` if default-constructed or moved-from.
     180              :     */
     181              :     explicit
     182            2 :     operator bool() const noexcept
     183              :     {
     184            2 :         return has_value();
     185              :     }
     186              : 
     187              :     /** Initiate an asynchronous write operation.
     188              : 
     189              :         Writes data from the provided buffer sequence. The operation
     190              :         completes when all bytes have been consumed, or an error
     191              :         occurs.
     192              : 
     193              :         @param buffers The buffer sequence containing data to write.
     194              :             Passed by value to ensure the sequence lives in the
     195              :             coroutine frame across suspension points.
     196              : 
     197              :         @return A task yielding `(error_code,std::size_t)`, where the
     198              :             size is the total byte count reported by the sink.
     199              :         @par Preconditions
     200              :         The wrapper must contain a valid sink (`has_value() == true`).
     201              :     */
     202              :     template<ConstBufferSequence CB>
     203              :     task<io_result<std::size_t>>
     204              :     write(CB buffers);
     205              : 
     206              :     /** Initiate an asynchronous write operation with optional EOF.
     207              : 
     208              :         Writes data from the provided buffer sequence, optionally
     209              :         finalizing the sink afterwards. The operation completes when
     210              :         all bytes have been consumed and (if eof is true) the sink
     211              :         is finalized, or an error occurs.
     212              : 
     213              :         @param buffers The buffer sequence containing data to write.
     214              :             Passed by value to ensure the sequence lives in the
     215              :             coroutine frame across suspension points.
     216              : 
     217              :         @param eof If `true`, the sink is finalized after writing
     218              :             the data.
     219              : 
     220              :         @return A task yielding `(error_code,std::size_t)`, where the
     221              :             size is the total byte count reported by the sink.
     222              :         @par Preconditions
     223              :         The wrapper must contain a valid sink (`has_value() == true`).
     224              :     */
     225              :     template<ConstBufferSequence CB>
     226              :     task<io_result<std::size_t>>
     227              :     write(CB buffers, bool eof);
     228              : 
     229              :     /** Signal end of data.
     230              : 
     231              :         Indicates that no more data will be written to the sink.
     232              :         The operation completes when the sink is finalized, or
     233              :         an error occurs.
     234              : 
     235              :         @return An awaitable yielding `(error_code)`.
     236              : 
     237              :         @par Preconditions
     238              :         The wrapper must contain a valid sink (`has_value() == true`).
     239              :     */
     240              :     auto
     241              :     write_eof();
     242              : 
     243              : protected:
     244              :     /** Rebind to a new sink after move.
     245              : 
     246              :         Updates the internal pointer to reference a new sink object.
     247              :         Used by owning wrappers after move assignment when the owned
     248              :         object has moved to a new location.
     249              : 
     250              :         @param new_sink The new sink to bind to. Must be the same
     251              :             type as the original sink.
     252              : 
     253              :         @note Terminates if called with a sink of different type
     254              :             than the original.
     255              :     */
     256              :     template<WriteSink S>
     257              :     void
     258              :     rebind(S& new_sink) noexcept
     259              :     {
     260              :         if(vt_ != &vtable_for_impl<S>::value) // the vtable address identifies the wrapped type
     261              :             std::terminate();
     262              :         sink_ = &new_sink; // only the pointer changes; storage_ and the cache are untouched
     263              :     }
     264              : 
     265              : private: // helper behind write(): one type-erased write call through the vtable
     266              :     auto
     267              :     write_some_(std::span<const_buffer const> buffers, bool eof);
     268              : };
     269              : 
     270              : //----------------------------------------------------------
     271              : 
     272              : struct any_write_sink::write_awaitable_ops // function table for a type-erased write awaitable
     273              : {
     274              :     bool (*await_ready)(void*); // void* is the address of the constructed awaitable
     275              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token); // returns the coroutine to resume next
     276              :     io_result<std::size_t> (*await_resume)(void*); // yields the write result
     277              :     void (*destroy)(void*) noexcept; // ends the awaitable's lifetime; does not free its storage
     278              : };
     279              : 
     280              : struct any_write_sink::eof_awaitable_ops // function table for a type-erased write_eof awaitable
     281              : {
     282              :     bool (*await_ready)(void*); // void* is the address of the constructed awaitable
     283              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token); // returns the coroutine to resume next
     284              :     io_result<> (*await_resume)(void*); // yields the error-only result
     285              :     void (*destroy)(void*) noexcept; // ends the awaitable's lifetime; does not free its storage
     286              : };
     287              : 
     288              : struct any_write_sink::vtable // per-sink-type operations and awaitable layout
     289              : {
     290              :     void (*destroy)(void*) noexcept; // runs the sink's destructor; called only for owned sinks
     291              :     std::size_t awaitable_size; // bytes needed to hold either the write or the eof awaitable
     292              :     std::size_t awaitable_align; // NOTE(review): recorded, but no allocation visible in this file consults it
     293              :     write_awaitable_ops const* (*construct_write_awaitable)( // builds the write awaitable in `storage`
     294              :         void* sink,
     295              :         void* storage,
     296              :         std::span<const_buffer const> buffers,
     297              :         bool eof);
     298              :     eof_awaitable_ops const* (*construct_eof_awaitable)( // builds the write_eof awaitable in `storage`
     299              :         void* sink,
     300              :         void* storage);
     301              : };
     302              : 
     303              : template<WriteSink S>
     304              : struct any_write_sink::vtable_for_impl // generates the vtable for concrete sink type S
     305              : {
     306              :     using WriteAwaitable = decltype(std::declval<S&>().write(
     307              :         std::span<const_buffer const>{}, false)); // type returned by S::write(span, bool)
     308              :     using EofAwaitable = decltype(std::declval<S&>().write_eof()); // type returned by S::write_eof()
     309              : 
     310              :     static void
     311            0 :     do_destroy_impl(void* sink) noexcept // vtable::destroy entry
     312              :     {
     313            0 :         static_cast<S*>(sink)->~S(); // destructor only; the caller frees the memory
     314            0 :     }
     315              : 
     316              :     static write_awaitable_ops const*
     317          126 :     construct_write_awaitable_impl( // vtable::construct_write_awaitable entry
     318              :         void* sink,
     319              :         void* storage,
     320              :         std::span<const_buffer const> buffers,
     321              :         bool eof)
     322              :     {
     323          126 :         auto& s = *static_cast<S*>(sink);
     324          126 :         ::new(storage) WriteAwaitable(s.write(buffers, eof)); // start the write; keep its awaitable in `storage`
     325              : 
     326              :         static constexpr write_awaitable_ops ops = { // entries follow the member order of write_awaitable_ops
     327          126 :             +[](void* p) {
     328          126 :                 return static_cast<WriteAwaitable*>(p)->await_ready();
     329              :             },
     330            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     331            0 :                 return detail::call_await_suspend(
     332            0 :                     static_cast<WriteAwaitable*>(p), h, ex, token);
     333              :             },
     334          126 :             +[](void* p) {
     335          126 :                 return static_cast<WriteAwaitable*>(p)->await_resume();
     336              :             },
     337          126 :             +[](void* p) noexcept {
     338          126 :                 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
     339              :             }
     340              :         };
     341          126 :         return &ops; // static storage, so the pointer stays valid after return
     342              :     }
     343              : 
     344              :     static eof_awaitable_ops const*
     345           24 :     construct_eof_awaitable_impl( // vtable::construct_eof_awaitable entry
     346              :         void* sink,
     347              :         void* storage)
     348              :     {
     349           24 :         auto& s = *static_cast<S*>(sink);
     350           24 :         ::new(storage) EofAwaitable(s.write_eof()); // start finalization; keep its awaitable in `storage`
     351              : 
     352              :         static constexpr eof_awaitable_ops ops = { // entries follow the member order of eof_awaitable_ops
     353           24 :             +[](void* p) {
     354           24 :                 return static_cast<EofAwaitable*>(p)->await_ready();
     355              :             },
     356            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     357            0 :                 return detail::call_await_suspend(
     358            0 :                     static_cast<EofAwaitable*>(p), h, ex, token);
     359              :             },
     360           24 :             +[](void* p) {
     361           24 :                 return static_cast<EofAwaitable*>(p)->await_resume();
     362              :             },
     363           24 :             +[](void* p) noexcept {
     364           24 :                 static_cast<EofAwaitable*>(p)->~EofAwaitable();
     365              :             }
     366              :         };
     367           24 :         return &ops; // static storage, so the pointer stays valid after return
     368              :     }
     369              : 
     370              :     static constexpr std::size_t max_awaitable_size = // larger of the two awaitable sizes
     371              :         sizeof(WriteAwaitable) > sizeof(EofAwaitable)
     372              :             ? sizeof(WriteAwaitable)
     373              :             : sizeof(EofAwaitable);
     374              : 
     375              :     static constexpr std::size_t max_awaitable_align = // stricter of the two awaitable alignments
     376              :         alignof(WriteAwaitable) > alignof(EofAwaitable)
     377              :             ? alignof(WriteAwaitable)
     378              :             : alignof(EofAwaitable);
     379              : 
     380              :     static constexpr vtable value = { // entries follow the member order of vtable
     381              :         &do_destroy_impl,
     382              :         max_awaitable_size,
     383              :         max_awaitable_align,
     384              :         &construct_write_awaitable_impl,
     385              :         &construct_eof_awaitable_impl
     386              :     };
     387              : };
     388              : 
     389              : //----------------------------------------------------------
     390              : 
     391              : inline
     392           99 : any_write_sink::~any_write_sink() // NOTE(review): an awaitable still constructed in the cache is not destroyed here
     393              : {
     394           99 :     if(storage_) // non-null only when the sink is owned
     395              :     {
     396            0 :         vt_->destroy(sink_); // run the sink's destructor, then free its memory
     397            0 :         ::operator delete(storage_);
     398              :     }
     399           99 :     if(cached_awaitable_) // null for default-constructed and moved-from wrappers
     400           96 :         ::operator delete(cached_awaitable_);
     401           99 : }
     402              : 
     403              : inline any_write_sink&
     404            1 : any_write_sink::operator=(any_write_sink&& other) noexcept
     405              : {
     406            1 :     if(this != &other) // self-assignment leaves the wrapper untouched
     407              :     {
     408            1 :         if(storage_) // release what *this currently owns, same steps as the destructor
     409              :         {
     410            0 :             vt_->destroy(sink_);
     411            0 :             ::operator delete(storage_);
     412              :         }
     413            1 :         if(cached_awaitable_)
     414            0 :             ::operator delete(cached_awaitable_);
     415            1 :         sink_ = std::exchange(other.sink_, nullptr); // take every field and null it in `other`
     416            1 :         vt_ = std::exchange(other.vt_, nullptr);
     417            1 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     418            1 :         storage_ = std::exchange(other.storage_, nullptr);
     419            1 :         active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
     420            1 :         active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
     421              :     }
     422            1 :     return *this;
     423              : }
     424              : 
     425              : template<WriteSink S>
     426              :     requires (!std::same_as<std::decay_t<S>, any_write_sink>)
     427              : any_write_sink::any_write_sink(S s) // owning mode: moves `s` into heap storage
     428              :     : vt_(&vtable_for_impl<S>::value)
     429              : {
     430              :     struct guard { // undoes partial construction if a later step throws
     431              :         any_write_sink* self;
     432              :         bool committed = false;
     433              :         ~guard() {
     434              :             if(!committed && self->storage_) {
     435              :                 // sink_ is still null if S's move constructor threw;
     436              :                 // only destroy a sink that was actually constructed.
     437              :                 if(self->sink_)
     438              :                     self->vt_->destroy(self->sink_);
     439              :                 ::operator delete(self->storage_);
     440              :                 self->storage_ = nullptr;
     441              :                 self->sink_ = nullptr;
     442              :             }
     443              :         }
     444              :     } g{this};
     445              :     storage_ = ::operator new(sizeof(S)); // raw memory for the owned sink
     446              :     sink_ = ::new(storage_) S(std::move(s)); // may throw; the guard then frees storage_
     447              :     cached_awaitable_ = ::operator new(vt_->awaitable_size); // preallocate, sized for max of write/eof
     448              : 
     449              :     g.committed = true; // every step succeeded; keep the resources
     450              : }
     451              : 
     452              : template<WriteSink S>
     453           96 : any_write_sink::any_write_sink(S* s) // reference mode: storage_ stays null, so the sink is never destroyed here
     454           96 :     : sink_(s) // NOTE(review): a null `s` is not rejected; has_value() would then report false
     455           96 :     , vt_(&vtable_for_impl<S>::value)
     456              : {
     457              :     // Preallocate the awaitable storage (sized for max of write/eof)
     458           96 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     459           96 : }
     460              : 
     461              : //----------------------------------------------------------
     462              : 
     463              : inline auto
     464          126 : any_write_sink::write_some_( // returns an awaitable that performs one sink write through the vtable
     465              :     std::span<const_buffer const> buffers,
     466              :     bool eof)
     467              : {
     468              :     struct awaitable
     469              :     {
     470              :         any_write_sink* self_; // wrapper whose cache and vtable are used
     471              :         std::span<const_buffer const> buffers_; // non-owning view, forwarded to the sink unchanged
     472              :         bool eof_;
     473              : 
     474              :         bool
     475          126 :         await_ready() const noexcept
     476              :         {
     477          126 :             return false; // always suspend: the sink's awaitable is created in await_suspend
     478              :         }
     479              : 
     480              :         coro
     481          126 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     482              :         {
     483              :             // Construct the underlying awaitable into cached storage
     484          252 :             self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
     485          126 :                 self_->sink_,
     486          126 :                 self_->cached_awaitable_,
     487              :                 buffers_,
     488          126 :                 eof_);
     489              : 
     490              :             // Check if underlying is immediately ready
     491          126 :             if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
     492          126 :                 return h; // hand back the caller's own handle; presumably resumes it at once (TODO confirm `coro`)
     493              : 
     494              :             // Forward to underlying awaitable
     495            0 :             return self_->active_write_ops_->await_suspend(
     496            0 :                 self_->cached_awaitable_, h, ex, token);
     497              :         }
     498              : 
     499              :         io_result<std::size_t>
     500          126 :         await_resume()
     501              :         {
     502              :             struct guard { // destroys the cached awaitable on scope exit, including when await_resume throws
     503              :                 any_write_sink* self;
     504          126 :                 ~guard() {
     505          126 :                     self->active_write_ops_->destroy(self->cached_awaitable_);
     506          126 :                     self->active_write_ops_ = nullptr; // the cache no longer holds a live awaitable
     507          126 :                 }
     508          126 :             } g{self_};
     509          126 :             return self_->active_write_ops_->await_resume(
     510          224 :                 self_->cached_awaitable_);
     511          126 :         }
     512              :     };
     513          126 :     return awaitable{this, buffers, eof};
     514              : }
     515              : 
     516              : inline auto
     517           24 : any_write_sink::write_eof() // returns an awaitable that runs the sink's write_eof() through the vtable
     518              : {
     519              :     struct awaitable
     520              :     {
     521              :         any_write_sink* self_; // wrapper whose cache and vtable are used
     522              : 
     523              :         bool
     524           24 :         await_ready() const noexcept
     525              :         {
     526           24 :             return false; // always suspend: the sink's awaitable is created in await_suspend
     527              :         }
     528              : 
     529              :         coro
     530           24 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     531              :         {
     532              :             // Construct the underlying awaitable into cached storage
     533           48 :             self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
     534           24 :                 self_->sink_,
     535           24 :                 self_->cached_awaitable_);
     536              : 
     537              :             // Check if underlying is immediately ready
     538           24 :             if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
     539           24 :                 return h; // hand back the caller's own handle; presumably resumes it at once (TODO confirm `coro`)
     540              : 
     541              :             // Forward to underlying awaitable
     542            0 :             return self_->active_eof_ops_->await_suspend(
     543            0 :                 self_->cached_awaitable_, h, ex, token);
     544              :         }
     545              : 
     546              :         io_result<>
     547           24 :         await_resume()
     548              :         {
     549              :             struct guard { // destroys the cached awaitable on scope exit, including when await_resume throws
     550              :                 any_write_sink* self;
     551           24 :                 ~guard() {
     552           24 :                     self->active_eof_ops_->destroy(self->cached_awaitable_);
     553           24 :                     self->active_eof_ops_ = nullptr; // the cache no longer holds a live awaitable
     554           24 :                 }
     555           24 :             } g{self_};
     556           24 :             return self_->active_eof_ops_->await_resume(
     557           41 :                 self_->cached_awaitable_);
     558           24 :         }
     559              :     };
     560           24 :     return awaitable{this};
     561              : }
     562              : 
     563              : template<ConstBufferSequence CB>
     564              : task<io_result<std::size_t>>
     565           66 : any_write_sink::write(CB buffers) // convenience overload: write without finalizing the sink
     566              : {
     567           66 :     return write(buffers, false); // eof = false; `buffers` is copied into the callee's by-value parameter
     568              : }
     569              : 
     570              : template<ConstBufferSequence CB>
     571              : task<io_result<std::size_t>>
     572           98 : any_write_sink::write(CB buffers, bool eof) // coroutine: writes every buffer, then optionally finalizes
     573              : {
     574              :     buffer_param<CB> bp(buffers); // presumably exposes the sequence as successive const_buffer spans (TODO confirm)
     575              :     std::size_t total = 0; // bytes reported written so far
     576              : 
     577              :     for(;;)
     578              :     {
     579              :         auto bufs = bp.data();
     580              :         if(bufs.empty()) // nothing left to write
     581              :             break;
     582              : 
     583              :         auto [ec, n] = co_await write_some_(bufs, false); // eof is never forwarded; it is handled below
     584              :         if(ec)
     585              :             co_return {ec, total + n}; // include bytes reported by the failing call
     586              :         bp.consume(n);
     587              :         total += n;
     588              :     }
     589              : 
     590              :     if(eof)
     591              :     {
     592              :         auto [ec] = co_await write_eof(); // finalize as a separate operation after all data
     593              :         if(ec)
     594              :             co_return {ec, total};
     595              :     }
     596              : 
     597              :     co_return {{}, total}; // success: default-constructed error code
     598          196 : }
     599              : 
     600              : } // namespace capy
     601              : } // namespace boost
     602              : 
     603              : #endif
        

Generated by: LCOV version 2.3