LCOV - code coverage report
Current view: top level - capy/io - any_write_stream.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 82.7 % 75 62
Test Date: 2026-02-01 07:03:35 Functions: 90.0 % 40 36

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      11              : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_param.hpp>
      17              : #include <boost/capy/concept/io_awaitable.hpp>
      18              : #include <boost/capy/concept/write_stream.hpp>
      19              : #include <boost/capy/coro.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/io_result.hpp>
      22              : 
      23              : #include <system_error>
      24              : 
      25              : #include <concepts>
      26              : #include <coroutine>
      27              : #include <cstddef>
      28              : #include <new>
      29              : #include <span>
      30              : #include <stop_token>
      31              : #include <utility>
      32              : 
      33              : namespace boost {
      34              : namespace capy {
      35              : 
      36              : /** Type-erased wrapper for any WriteStream.
      37              : 
      38              :     This class provides type erasure for any type satisfying the
      39              :     @ref WriteStream concept, enabling runtime polymorphism for
      40              :     write operations. It uses cached awaitable storage to achieve
      41              :     zero steady-state allocation after construction.
      42              : 
      43              :     The wrapper supports two construction modes:
      44              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      45              :       allocates storage and owns the stream.
      46              :     - **Reference**: Pass a pointer to wrap without ownership. The
      47              :       pointed-to stream must outlive this wrapper.
      48              : 
      49              :     @par Awaitable Preallocation
      50              :     The constructor preallocates storage for the type-erased awaitable.
      51              :     This reserves all virtual address space at server startup
      52              :     so memory usage can be measured up front, rather than
      53              :     allocating piecemeal as traffic arrives.
      54              : 
      55              :     @par Thread Safety
      56              :     Not thread-safe. Concurrent operations on the same wrapper
      57              :     are undefined behavior.
      58              : 
      59              :     @par Example
      60              :     @code
      61              :     // Owning - takes ownership of the stream
      62              :     any_write_stream stream(socket{ioc});
      63              : 
      64              :     // Reference - wraps without ownership
      65              :     socket sock(ioc);
      66              :     any_write_stream stream(&sock);
      67              : 
      68              :     const_buffer buf(data, size);
      69              :     auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
      70              :     @endcode
      71              : 
      72              :     @see any_read_stream, any_stream, WriteStream
      73              : */
      74              : class any_write_stream
      75              : {
      76              :     struct vtable;
      77              :     struct awaitable_ops;
      78              : 
      79              :     template<WriteStream S>
      80              :     struct vtable_for_impl;
      81              : 
      82              :     void* stream_ = nullptr;
      83              :     vtable const* vt_ = nullptr;
      84              :     void* cached_awaitable_ = nullptr;
      85              :     void* storage_ = nullptr;
      86              :     awaitable_ops const* active_ops_ = nullptr;
      87              : 
      88              : public:
      89              :     /** Destructor.
      90              : 
      91              :         Destroys the owned stream (if any) and releases the cached
      92              :         awaitable storage.
      93              :     */
      94              :     ~any_write_stream();
      95              : 
      96              :     /** Default constructor.
      97              : 
      98              :         Constructs an empty wrapper. Operations on a default-constructed
      99              :         wrapper result in undefined behavior.
     100              :     */
     101            1 :     any_write_stream() = default;
     102              : 
     103              :     /** Non-copyable.
     104              : 
     105              :         The awaitable cache is per-instance and cannot be shared.
     106              :     */
     107              :     any_write_stream(any_write_stream const&) = delete;
     108              :     any_write_stream& operator=(any_write_stream const&) = delete;
     109              : 
     110              :     /** Move constructor.
     111              : 
     112              :         Transfers ownership of the wrapped stream (if owned) and
     113              :         cached awaitable storage from `other`. After the move, `other` is
     114              :         in a default-constructed state.
     115              : 
     116              :         @param other The wrapper to move from.
     117              :     */
     118            2 :     any_write_stream(any_write_stream&& other) noexcept
     119            2 :         : stream_(std::exchange(other.stream_, nullptr))
     120            2 :         , vt_(std::exchange(other.vt_, nullptr))
     121            2 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     122            2 :         , storage_(std::exchange(other.storage_, nullptr))
     123            2 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     124              :     {
     125            2 :     }
     126              : 
     127              :     /** Move assignment operator.
     128              : 
     129              :         Destroys any owned stream and releases existing resources,
     130              :         then transfers ownership from `other`.
     131              : 
     132              :         @param other The wrapper to move from.
     133              :         @return Reference to this wrapper.
     134              :     */
     135              :     any_write_stream&
     136              :     operator=(any_write_stream&& other) noexcept;
     137              : 
     138              :     /** Construct by taking ownership of a WriteStream.
     139              : 
     140              :         Allocates storage and moves the stream into this wrapper.
     141              :         The wrapper owns the stream and will destroy it.
     142              : 
     143              :         @param s The stream to take ownership of.
     144              :     */
     145              :     template<WriteStream S>
     146              :         requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     147              :     any_write_stream(S s);
     148              : 
     149              :     /** Construct by wrapping a WriteStream without ownership.
     150              : 
     151              :         Wraps the given stream by pointer. The stream must remain
     152              :         valid for the lifetime of this wrapper.
     153              : 
     154              :         @param s Pointer to the stream to wrap.
     155              :     */
     156              :     template<WriteStream S>
     157              :     any_write_stream(S* s);
     158              : 
     159              :     /** Check if the wrapper contains a valid stream.
     160              : 
     161              :         @return `true` if wrapping a stream, `false` if default-constructed
     162              :             or moved-from.
     163              :     */
     164              :     bool
     165           15 :     has_value() const noexcept
     166              :     {
     167           15 :         return stream_ != nullptr;
     168              :     }
     169              : 
     170              :     /** Check if the wrapper contains a valid stream.
     171              : 
     172              :         @return `true` if wrapping a stream, `false` if default-constructed
     173              :             or moved-from.
     174              :     */
     175              :     explicit
     176            2 :     operator bool() const noexcept
     177              :     {
     178            2 :         return has_value();
     179              :     }
     180              : 
     181              :     /** Initiate an asynchronous write operation.
     182              : 
     183              :         Writes data from the provided buffer sequence. The operation
     184              :         completes when at least one byte has been written, or an error
     185              :         occurs.
     186              : 
     187              :         @param buffers The buffer sequence containing data to write.
     188              :             Passed by value to ensure the sequence lives in the
     189              :             coroutine frame across suspension points.
     190              : 
     191              :         @return An awaitable yielding `(error_code,std::size_t)`.
     192              : 
     193              :         @par Preconditions
     194              :         The wrapper must contain a valid stream (`has_value() == true`).
     195              :     */
     196              :     template<ConstBufferSequence CB>
     197              :     auto
     198              :     write_some(CB buffers);
     199              : 
     200              : protected:
     201              :     /** Rebind to a new stream after move.
     202              : 
     203              :         Updates the internal pointer to reference a new stream object.
     204              :         Used by owning wrappers after move assignment when the owned
     205              :         object has moved to a new location.
     206              : 
     207              :         @param new_stream The new stream to bind to. Must be the same
     208              :             type as the original stream.
     209              : 
     210              :         @note Terminates if called with a stream of different type
     211              :             than the original.
     212              :     */
     213              :     template<WriteStream S>
     214              :     void
     215              :     rebind(S& new_stream) noexcept
     216              :     {
     217              :         if(vt_ != &vtable_for_impl<S>::value)
     218              :             std::terminate();
     219              :         stream_ = &new_stream;
     220              :     }
     221              : };
     222              : 
     223              : //----------------------------------------------------------
     224              : 
     225              : struct any_write_stream::awaitable_ops
     226              : {
     227              :     bool (*await_ready)(void*);
     228              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     229              :     io_result<std::size_t> (*await_resume)(void*);
     230              :     void (*destroy)(void*) noexcept;
     231              : };
     232              : 
     233              : struct any_write_stream::vtable
     234              : {
     235              :     void (*destroy)(void*) noexcept;
     236              :     std::size_t awaitable_size;
     237              :     std::size_t awaitable_align;
     238              :     awaitable_ops const* (*construct_awaitable)(
     239              :         void* stream,
     240              :         void* storage,
     241              :         std::span<const_buffer const> buffers);
     242              : };
     243              : 
     244              : template<WriteStream S>
     245              : struct any_write_stream::vtable_for_impl
     246              : {
     247              :     using Awaitable = decltype(std::declval<S&>().write_some(
     248              :         std::span<const_buffer const>{}));
     249              : 
     250              :     static void
     251            0 :     do_destroy_impl(void* stream) noexcept
     252              :     {
     253            0 :         static_cast<S*>(stream)->~S();
     254            0 :     }
     255              : 
     256              :     static awaitable_ops const*
     257           60 :     construct_awaitable_impl(
     258              :         void* stream,
     259              :         void* storage,
     260              :         std::span<const_buffer const> buffers)
     261              :     {
     262           60 :         auto& s = *static_cast<S*>(stream);
     263           60 :         ::new(storage) Awaitable(s.write_some(buffers));
     264              : 
     265              :         static constexpr awaitable_ops ops = {
     266           60 :             +[](void* p) {
     267           60 :                 return static_cast<Awaitable*>(p)->await_ready();
     268              :             },
     269            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     270            0 :                 return detail::call_await_suspend(
     271            0 :                     static_cast<Awaitable*>(p), h, ex, token);
     272              :             },
     273           60 :             +[](void* p) {
     274           60 :                 return static_cast<Awaitable*>(p)->await_resume();
     275              :             },
     276           60 :             +[](void* p) noexcept {
     277           60 :                 static_cast<Awaitable*>(p)->~Awaitable();
     278              :             }
     279              :         };
     280           60 :         return &ops;
     281              :     }
     282              : 
     283              :     static constexpr vtable value = {
     284              :         &do_destroy_impl,
     285              :         sizeof(Awaitable),
     286              :         alignof(Awaitable),
     287              :         &construct_awaitable_impl
     288              :     };
     289              : };
     290              : 
     291              : //----------------------------------------------------------
     292              : 
     293              : inline
     294           72 : any_write_stream::~any_write_stream()
     295              : {
     296           72 :     if(storage_)
     297              :     {
     298            0 :         vt_->destroy(stream_);
     299            0 :         ::operator delete(storage_);
     300              :     }
     301           72 :     if(cached_awaitable_)
     302           65 :         ::operator delete(cached_awaitable_);
     303           72 : }
     304              : 
     305              : inline any_write_stream&
     306            3 : any_write_stream::operator=(any_write_stream&& other) noexcept
     307              : {
     308            3 :     if(this != &other)
     309              :     {
     310            3 :         if(storage_)
     311              :         {
     312            0 :             vt_->destroy(stream_);
     313            0 :             ::operator delete(storage_);
     314              :         }
     315            3 :         if(cached_awaitable_)
     316            0 :             ::operator delete(cached_awaitable_);
     317            3 :         stream_ = std::exchange(other.stream_, nullptr);
     318            3 :         vt_ = std::exchange(other.vt_, nullptr);
     319            3 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     320            3 :         storage_ = std::exchange(other.storage_, nullptr);
     321            3 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     322              :     }
     323            3 :     return *this;
     324              : }
     325              : 
     326              : template<WriteStream S>
     327              :     requires (!std::same_as<std::decay_t<S>, any_write_stream>)
     328              : any_write_stream::any_write_stream(S s)
     329              :     : vt_(&vtable_for_impl<S>::value)
     330              : {
     331              :     struct guard {
     332              :         any_write_stream* self;
     333              :         bool committed = false;
     334              :         ~guard() {
     335              :             if(!committed && self->storage_) {
     336              :                 self->vt_->destroy(self->stream_);
     337              :                 ::operator delete(self->storage_);
     338              :                 self->storage_ = nullptr;
     339              :                 self->stream_ = nullptr;
     340              :             }
     341              :         }
     342              :     } g{this};
     343              : 
     344              :     storage_ = ::operator new(sizeof(S));
     345              :     stream_ = ::new(storage_) S(std::move(s));
     346              : 
     347              :     // Preallocate the awaitable storage
     348              :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     349              : 
     350              :     g.committed = true;
     351              : }
     352              : 
     353              : template<WriteStream S>
     354           65 : any_write_stream::any_write_stream(S* s)
     355           65 :     : stream_(s)
     356           65 :     , vt_(&vtable_for_impl<S>::value)
     357              : {
     358              :     // Preallocate the awaitable storage
     359           65 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     360           65 : }
     361              : 
     362              : //----------------------------------------------------------
     363              : 
     364              : template<ConstBufferSequence CB>
     365              : auto
     366           60 : any_write_stream::write_some(CB buffers)
     367              : {
     368              :     struct awaitable
     369              :     {
     370              :         any_write_stream* self_;
     371              :         buffer_param<CB> bp_;
     372              : 
     373              :         bool
     374           60 :         await_ready() const noexcept
     375              :         {
     376           60 :             return false;
     377              :         }
     378              : 
     379              :         coro
     380           60 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     381              :         {
     382              :             // Construct the underlying awaitable into cached storage
     383           60 :             self_->active_ops_ = self_->vt_->construct_awaitable(
     384           60 :                 self_->stream_,
     385           60 :                 self_->cached_awaitable_,
     386           60 :                 bp_.data());
     387              : 
     388              :             // Check if underlying is immediately ready
     389           60 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     390           60 :                 return h;
     391              : 
     392              :             // Forward to underlying awaitable
     393            0 :             return self_->active_ops_->await_suspend(
     394            0 :                 self_->cached_awaitable_, h, ex, token);
     395              :         }
     396              : 
     397              :         io_result<std::size_t>
     398           60 :         await_resume()
     399              :         {
     400              :             struct guard {
     401              :                 any_write_stream* self;
     402           60 :                 ~guard() {
     403           60 :                     self->active_ops_->destroy(self->cached_awaitable_);
     404           60 :                     self->active_ops_ = nullptr;
     405           60 :                 }
     406           60 :             } g{self_};
     407           60 :             return self_->active_ops_->await_resume(
     408          103 :                 self_->cached_awaitable_);
     409           60 :         }
     410              :     };
     411           60 :     return awaitable{this, buffer_param<CB>(buffers)};
     412              : }
     413              : 
     414              : } // namespace capy
     415              : } // namespace boost
     416              : 
     417              : #endif
        

Generated by: LCOV version 2.3