LCOV - code coverage report
Current view: top level - capy/io - any_read_source.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 82.7 % 75 62
Test Date: 2026-02-01 07:03:35 Functions: 89.5 % 19 17

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_param.hpp>
      17              : #include <boost/capy/concept/io_awaitable.hpp>
      18              : #include <boost/capy/concept/read_source.hpp>
      19              : #include <boost/capy/coro.hpp>
      20              : #include <boost/capy/ex/executor_ref.hpp>
      21              : #include <boost/capy/io_result.hpp>
      22              : #include <boost/capy/task.hpp>
      23              : 
      24              : #include <system_error>
      25              : 
      26              : #include <concepts>
      27              : #include <coroutine>
      28              : #include <cstddef>
      29              : #include <new>
      30              : #include <span>
      31              : #include <stop_token>
      32              : #include <utility>
      33              : 
      34              : namespace boost {
      35              : namespace capy {
      36              : 
      37              : /** Type-erased wrapper for any ReadSource.
      38              : 
      39              :     This class provides type erasure for any type satisfying the
      40              :     @ref ReadSource concept, enabling runtime polymorphism for
      41              :     source read operations. It uses cached awaitable storage to achieve
      42              :     zero steady-state allocation after construction.
      43              : 
      44              :     The wrapper supports two construction modes:
      45              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      46              :       allocates storage and owns the source.
      47              :     - **Reference**: Pass a pointer to wrap without ownership. The
      48              :       pointed-to source must outlive this wrapper.
      49              : 
      50              :     @par Awaitable Preallocation
      51              :     The constructor preallocates storage for the type-erased awaitable.
      52              :     This reserves all virtual address space at server startup
      53              :     so memory usage can be measured up front, rather than
      54              :     allocating piecemeal as traffic arrives.
      55              : 
      56              :     @par Thread Safety
      57              :     Not thread-safe. Concurrent operations on the same wrapper
      58              :     are undefined behavior.
      59              : 
      60              :     @par Example
      61              :     @code
      62              :     // Owning - takes ownership of the source
      63              :     any_read_source rs(some_source{args...});
      64              : 
      65              :     // Reference - wraps without ownership
      66              :     some_source source;
      67              :     any_read_source rs(&source);
      68              : 
      69              :     mutable_buffer buf(data, size);
      70              :     auto [ec, n] = co_await rs.read(std::span(&buf, 1));
      71              :     @endcode
      72              : 
      73              :     @see any_read_stream, ReadSource
      74              : */
      75              : class any_read_source
      76              : {
      77              :     struct vtable;
      78              :     struct awaitable_ops;
      79              : 
      80              :     template<ReadSource S>
      81              :     struct vtable_for_impl;
      82              : 
      83              :     void* source_ = nullptr;
      84              :     vtable const* vt_ = nullptr;
      85              :     void* cached_awaitable_ = nullptr;
      86              :     void* storage_ = nullptr;
      87              :     awaitable_ops const* active_ops_ = nullptr;
      88              : 
      89              : public:
      90              :     /** Destructor.
      91              : 
      92              :         Destroys the owned source (if any) and releases the cached
      93              :         awaitable storage.
      94              :     */
      95              :     ~any_read_source();
      96              : 
      97              :     /** Default constructor.
      98              : 
      99              :         Constructs an empty wrapper. Operations on a default-constructed
     100              :         wrapper result in undefined behavior.
     101              :     */
     102              :     any_read_source() = default;
     103              : 
     104              :     /** Non-copyable.
     105              : 
     106              :         The awaitable cache is per-instance and cannot be shared.
     107              :     */
     108              :     any_read_source(any_read_source const&) = delete;
     109              :     any_read_source& operator=(any_read_source const&) = delete;
     110              : 
     111              :     /** Move constructor.
     112              : 
     113              :         Transfers ownership of the wrapped source (if owned) and
     114              :         cached awaitable storage from `other`. After the move, `other` is
     115              :         in a default-constructed state.
     116              : 
     117              :         @param other The wrapper to move from.
     118              :     */
     119            1 :     any_read_source(any_read_source&& other) noexcept
     120            1 :         : source_(std::exchange(other.source_, nullptr))
     121            1 :         , vt_(std::exchange(other.vt_, nullptr))
     122            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     123            1 :         , storage_(std::exchange(other.storage_, nullptr))
     124            1 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     125              :     {
     126            1 :     }
     127              : 
     128              :     /** Move assignment operator.
     129              : 
     130              :         Destroys any owned source and releases existing resources,
     131              :         then transfers ownership from `other`.
     132              : 
     133              :         @param other The wrapper to move from.
     134              :         @return Reference to this wrapper.
     135              :     */
     136              :     any_read_source&
     137              :     operator=(any_read_source&& other) noexcept;
     138              : 
     139              :     /** Construct by taking ownership of a ReadSource.
     140              : 
     141              :         Allocates storage and moves the source into this wrapper.
     142              :         The wrapper owns the source and will destroy it.
     143              : 
     144              :         @param s The source to take ownership of.
     145              :     */
     146              :     template<ReadSource S>
     147              :         requires (!std::same_as<std::decay_t<S>, any_read_source>)
     148              :     any_read_source(S s);
     149              : 
     150              :     /** Construct by wrapping a ReadSource without ownership.
     151              : 
     152              :         Wraps the given source by pointer. The source must remain
     153              :         valid for the lifetime of this wrapper.
     154              : 
     155              :         @param s Pointer to the source to wrap.
     156              :     */
     157              :     template<ReadSource S>
     158              :     any_read_source(S* s);
     159              : 
     160              :     /** Check if the wrapper contains a valid source.
     161              : 
     162              :         @return `true` if wrapping a source, `false` if default-constructed
     163              :             or moved-from.
     164              :     */
     165              :     bool
     166            9 :     has_value() const noexcept
     167              :     {
     168            9 :         return source_ != nullptr;
     169              :     }
     170              : 
     171              :     /** Check if the wrapper contains a valid source.
     172              : 
     173              :         @return `true` if wrapping a source, `false` if default-constructed
     174              :             or moved-from.
     175              :     */
     176              :     explicit
     177            2 :     operator bool() const noexcept
     178              :     {
     179            2 :         return has_value();
     180              :     }
     181              : 
     182              :     /** Initiate an asynchronous read operation.
     183              : 
     184              :         Reads data into the provided buffer sequence. The operation
     185              :         completes when the entire buffer sequence is filled, end-of-file
     186              :         is reached, or an error occurs.
     187              : 
     188              :         @param buffers The buffer sequence to read into. Passed by
     189              :             value to ensure the sequence lives in the coroutine frame
     190              :             across suspension points.
     191              : 
     192              :         @return An awaitable yielding `(error_code,std::size_t)`.
     193              : 
     194              :         @par Postconditions
     195              :         Exactly one of the following is true on return:
     196              :         @li **Success**: `!ec` and `n == buffer_size(buffers)`.
     197              :             The entire buffer was filled.
     198              :         @li **End-of-stream or Error**: `ec` and `n` indicates
     199              :             the number of bytes transferred before the failure.
     200              : 
     201              :         @par Preconditions
     202              :         The wrapper must contain a valid source (`has_value() == true`).
     203              :     */
     204              :     template<MutableBufferSequence MB>
     205              :     task<io_result<std::size_t>>
     206              :     read(MB buffers);
     207              : 
     208              : protected:
     209              :     /** Rebind to a new source after move.
     210              : 
     211              :         Updates the internal pointer to reference a new source object.
     212              :         Used by owning wrappers after move assignment when the owned
     213              :         object has moved to a new location.
     214              : 
     215              :         @param new_source The new source to bind to. Must be the same
     216              :             type as the original source.
     217              : 
     218              :         @note Terminates if called with a source of different type
     219              :             than the original.
     220              :     */
     221              :     template<ReadSource S>
     222              :     void
     223              :     rebind(S& new_source) noexcept
     224              :     {
     225              :         if(vt_ != &vtable_for_impl<S>::value)
     226              :             std::terminate();
     227              :         source_ = &new_source;
     228              :     }
     229              : 
     230              : private:
     231              :     auto
     232              :     read_some_(std::span<mutable_buffer const> buffers);
     233              : };
     234              : 
     235              : //----------------------------------------------------------
     236              : 
     237              : struct any_read_source::awaitable_ops
     238              : {
     239              :     bool (*await_ready)(void*);
     240              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     241              :     io_result<std::size_t> (*await_resume)(void*);
     242              :     void (*destroy)(void*) noexcept;
     243              : };
     244              : 
     245              : struct any_read_source::vtable
     246              : {
     247              :     void (*destroy)(void*) noexcept;
     248              :     std::size_t awaitable_size;
     249              :     std::size_t awaitable_align;
     250              :     awaitable_ops const* (*construct_awaitable)(
     251              :         void* source,
     252              :         void* storage,
     253              :         std::span<mutable_buffer const> buffers);
     254              : };
     255              : 
     256              : template<ReadSource S>
     257              : struct any_read_source::vtable_for_impl
     258              : {
     259              :     using Awaitable = decltype(std::declval<S&>().read(
     260              :         std::span<mutable_buffer const>{}));
     261              : 
     262              :     static void
     263            0 :     do_destroy_impl(void* source) noexcept
     264              :     {
     265            0 :         static_cast<S*>(source)->~S();
     266            0 :     }
     267              : 
     268              :     static awaitable_ops const*
     269          130 :     construct_awaitable_impl(
     270              :         void* source,
     271              :         void* storage,
     272              :         std::span<mutable_buffer const> buffers)
     273              :     {
     274          130 :         auto& s = *static_cast<S*>(source);
     275          130 :         ::new(storage) Awaitable(s.read(buffers));
     276              : 
     277              :         static constexpr awaitable_ops ops = {
     278          130 :             +[](void* p) {
     279          130 :                 return static_cast<Awaitable*>(p)->await_ready();
     280              :             },
     281            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     282            0 :                 return detail::call_await_suspend(
     283            0 :                     static_cast<Awaitable*>(p), h, ex, token);
     284              :             },
     285          130 :             +[](void* p) {
     286          130 :                 return static_cast<Awaitable*>(p)->await_resume();
     287              :             },
     288          130 :             +[](void* p) noexcept {
     289          130 :                 static_cast<Awaitable*>(p)->~Awaitable();
     290              :             }
     291              :         };
     292          130 :         return &ops;
     293              :     }
     294              : 
     295              :     static constexpr vtable value = {
     296              :         &do_destroy_impl,
     297              :         sizeof(Awaitable),
     298              :         alignof(Awaitable),
     299              :         &construct_awaitable_impl
     300              :     };
     301              : };
     302              : 
     303              : //----------------------------------------------------------
     304              : 
     305              : inline
     306           91 : any_read_source::~any_read_source()
     307              : {
     308           91 :     if(storage_)
     309              :     {
     310            0 :         vt_->destroy(source_);
     311            0 :         ::operator delete(storage_);
     312              :     }
     313           91 :     if(cached_awaitable_)
     314           88 :         ::operator delete(cached_awaitable_);
     315           91 : }
     316              : 
     317              : inline any_read_source&
     318            1 : any_read_source::operator=(any_read_source&& other) noexcept
     319              : {
     320            1 :     if(this != &other)
     321              :     {
     322            1 :         if(storage_)
     323              :         {
     324            0 :             vt_->destroy(source_);
     325            0 :             ::operator delete(storage_);
     326              :         }
     327            1 :         if(cached_awaitable_)
     328            0 :             ::operator delete(cached_awaitable_);
     329            1 :         source_ = std::exchange(other.source_, nullptr);
     330            1 :         vt_ = std::exchange(other.vt_, nullptr);
     331            1 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     332            1 :         storage_ = std::exchange(other.storage_, nullptr);
     333            1 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     334              :     }
     335            1 :     return *this;
     336              : }
     337              : 
     338              : template<ReadSource S>
     339              :     requires (!std::same_as<std::decay_t<S>, any_read_source>)
     340              : any_read_source::any_read_source(S s)
     341              :     : vt_(&vtable_for_impl<S>::value)
     342              : {
     343              :     struct guard {
     344              :         any_read_source* self;
     345              :         bool committed = false;
     346              :         ~guard() {
     347              :             if(!committed && self->storage_) {
     348              :                 self->vt_->destroy(self->source_);
     349              :                 ::operator delete(self->storage_);
     350              :                 self->storage_ = nullptr;
     351              :                 self->source_ = nullptr;
     352              :             }
     353              :         }
     354              :     } g{this};
     355              : 
     356              :     storage_ = ::operator new(sizeof(S));
     357              :     source_ = ::new(storage_) S(std::move(s));
     358              : 
     359              :     // Preallocate the awaitable storage
     360              :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     361              : 
     362              :     g.committed = true;
     363              : }
     364              : 
     365              : template<ReadSource S>
     366           88 : any_read_source::any_read_source(S* s)
     367           88 :     : source_(s)
     368           88 :     , vt_(&vtable_for_impl<S>::value)
     369              : {
     370              :     // Preallocate the awaitable storage
     371           88 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     372           88 : }
     373              : 
     374              : //----------------------------------------------------------
     375              : 
     376              : inline auto
     377          130 : any_read_source::read_some_(std::span<mutable_buffer const> buffers)
     378              : {
     379              :     struct awaitable
     380              :     {
     381              :         any_read_source* self_;
     382              :         std::span<mutable_buffer const> buffers_;
     383              : 
     384              :         bool
     385          130 :         await_ready() const noexcept
     386              :         {
     387          130 :             return false;
     388              :         }
     389              : 
     390              :         coro
     391          130 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     392              :         {
     393              :             // Construct the underlying awaitable into cached storage
     394          260 :             self_->active_ops_ = self_->vt_->construct_awaitable(
     395          130 :                 self_->source_,
     396          130 :                 self_->cached_awaitable_,
     397              :                 buffers_);
     398              : 
     399              :             // Check if underlying is immediately ready
     400          130 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     401          130 :                 return h;
     402              : 
     403              :             // Forward to underlying awaitable
     404            0 :             return self_->active_ops_->await_suspend(
     405            0 :                 self_->cached_awaitable_, h, ex, token);
     406              :         }
     407              : 
     408              :         io_result<std::size_t>
     409          130 :         await_resume()
     410              :         {
     411              :             struct guard {
     412              :                 any_read_source* self;
     413          130 :                 ~guard() {
     414          130 :                     self->active_ops_->destroy(self->cached_awaitable_);
     415          130 :                     self->active_ops_ = nullptr;
     416          130 :                 }
     417          130 :             } g{self_};
     418          130 :             return self_->active_ops_->await_resume(
     419          229 :                 self_->cached_awaitable_);
     420          130 :         }
     421              :     };
     422          130 :     return awaitable{this, buffers};
     423              : }
     424              : 
     425              : template<MutableBufferSequence MB>
     426              : task<io_result<std::size_t>>
     427          106 : any_read_source::read(MB buffers)
     428              : {
     429              :     buffer_param<MB> bp(std::move(buffers));
     430              :     std::size_t total = 0;
     431              : 
     432              :     for(;;)
     433              :     {
     434              :         auto bufs = bp.data();
     435              :         if(bufs.empty())
     436              :             break;
     437              : 
     438              :         auto [ec, n] = co_await read_some_(bufs);
     439              :         total += n;
     440              :         if(ec)
     441              :             co_return {ec, total};
     442              :         bp.consume(n);
     443              :     }
     444              : 
     445              :     co_return {{}, total};
     446          212 : }
     447              : 
     448              : } // namespace capy
     449              : } // namespace boost
     450              : 
     451              : #endif
        

Generated by: LCOV version 2.3