LCOV - code coverage report
Current view: top level - capy/io - any_buffer_source.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 83.5 % 79 66
Test Date: 2026-02-01 07:03:35 Functions: 89.5 % 19 17

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      11              : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_copy.hpp>
      17              : #include <boost/capy/buffers/slice.hpp>
      18              : #include <boost/capy/concept/buffer_source.hpp>
      19              : #include <boost/capy/concept/io_awaitable.hpp>
      20              : #include <boost/capy/concept/read_source.hpp>
      21              : #include <boost/capy/coro.hpp>
      22              : #include <boost/capy/error.hpp>
      23              : #include <boost/capy/ex/executor_ref.hpp>
      24              : #include <boost/capy/io_result.hpp>
      25              : #include <boost/capy/task.hpp>
      26              : 
      27              : #include <system_error>
      28              : 
      29              : #include <concepts>
      30              : #include <coroutine>
      31              : #include <cstddef>
      32              : #include <exception>
      33              : #include <new>
      34              : #include <span>
      35              : #include <stop_token>
      36              : #include <utility>
      37              : 
      38              : namespace boost {
      39              : namespace capy {
      40              : 
      41              : /** Type-erased wrapper for any BufferSource.
      42              : 
      43              :     This class provides type erasure for any type satisfying the
      44              :     @ref BufferSource concept, enabling runtime polymorphism for
      45              :     buffer pull operations. The wrapper also satisfies @ref ReadSource,
      46              :     allowing it to be used with code expecting either interface.
      47              :     It uses cached awaitable storage to achieve zero steady-state
      48              :     allocation after construction.
      49              : 
      50              :     The @ref ReadSource interface is provided by the templated
      51              :     @ref read method. This method copies data from the source's
      52              :     internal buffers into the caller's buffers, incurring one extra
      53              :     buffer copy compared to using @ref pull and @ref consume directly.
      54              : 
      55              :     The wrapper supports two construction modes:
      56              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      57              :       allocates storage and owns the source.
      58              :     - **Reference**: Pass a pointer to wrap without ownership. The
      59              :       pointed-to source must outlive this wrapper.
      60              : 
      61              :     @par Awaitable Preallocation
      62              :     The constructor preallocates storage for the type-erased awaitable.
      63              :     This reserves all virtual address space at server startup
      64              :     so memory usage can be measured up front, rather than
      65              :     allocating piecemeal as traffic arrives.
      66              : 
      67              :     @par Thread Safety
      68              :     Not thread-safe. Concurrent operations on the same wrapper
      69              :     are undefined behavior.
      70              : 
      71              :     @par Example
      72              :     @code
      73              :     // Owning - takes ownership of the source
      74              :     any_buffer_source abs(some_buffer_source{args...});
      75              : 
      76              :     // Reference - wraps without ownership
      77              :     some_buffer_source src;
      78              :     any_buffer_source abs(&src);
      79              : 
      80              :     const_buffer arr[16];
      81              :     auto [ec, count] = co_await abs.pull(arr, 16);
      82              :     @endcode
      83              : 
      84              :     @see any_write_sink, BufferSource, ReadSource
      85              : */
      86              : class any_buffer_source
      87              : {
      88              :     struct vtable; // per-source-type operations table
      89              :     struct awaitable_ops; // operations on the type-erased pull awaitable
      90              : 
      91              :     template<BufferSource S>
      92              :     struct vtable_for_impl; // supplies the vtable for a concrete source type S
      93              : 
      94              :     void* source_ = nullptr; // wrapped source object; null when empty
      95              :     vtable const* vt_ = nullptr; // operations matching the concrete type behind source_
      96              :     void* cached_awaitable_ = nullptr; // raw storage of vt_->awaitable_size bytes, allocated by the wrapping constructors
      97              :     void* storage_ = nullptr; // heap block holding an owned source; stays null in reference mode
      98              :     awaitable_ops const* active_ops_ = nullptr; // non-null while an awaitable is constructed in cached_awaitable_
      99              : 
     100              : public:
     101              :     /** Destructor.
     102              : 
     103              :         Destroys the owned source (if any) and releases the cached
     104              :         awaitable storage.
     105              :     */
     106              :     ~any_buffer_source();
     107              : 
     108              :     /** Default constructor.
     109              : 
     110              :         Constructs an empty wrapper. Operations on a default-constructed
     111              :         wrapper result in undefined behavior.
     112              :     */
     113              :     any_buffer_source() = default;
     114              : 
     115              :     /** Non-copyable.
     116              : 
     117              :         The awaitable cache is per-instance and cannot be shared.
     118              :     */
     119              :     any_buffer_source(any_buffer_source const&) = delete;
     120              :     any_buffer_source& operator=(any_buffer_source const&) = delete;
     121              : 
     122              :     /** Move constructor.
     123              : 
     124              :         Transfers ownership of the wrapped source (if owned) and
     125              :         cached awaitable storage from `other`. After the move, `other` is
     126              :         in a default-constructed state.
     127              : 
     128              :         @param other The wrapper to move from.
     129              :     */
     130            1 :     any_buffer_source(any_buffer_source&& other) noexcept
     131            1 :         : source_(std::exchange(other.source_, nullptr))
     132            1 :         , vt_(std::exchange(other.vt_, nullptr))
     133            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     134            1 :         , storage_(std::exchange(other.storage_, nullptr))
     135            1 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     136              :     { // every member is taken in the initializer list; nothing left to do
     137            1 :     }
     138              : 
     139              :     /** Move assignment operator.
     140              : 
     141              :         Destroys any owned source and releases existing resources,
     142              :         then transfers ownership from `other`.
     143              : 
     144              :         @param other The wrapper to move from.
     145              :         @return Reference to this wrapper.
     146              :     */
     147              :     any_buffer_source&
     148              :     operator=(any_buffer_source&& other) noexcept;
     149              : 
     150              :     /** Construct by taking ownership of a BufferSource.
     151              : 
     152              :         Allocates storage and moves the source into this wrapper.
     153              :         The wrapper owns the source and will destroy it.
     154              : 
     155              :         @param s The source to take ownership of.
     156              :     */
     157              :     template<BufferSource S>
     158              :         requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     159              :     any_buffer_source(S s);
     160              : 
     161              :     /** Construct by wrapping a BufferSource without ownership.
     162              : 
     163              :         Wraps the given source by pointer. The source must remain
     164              :         valid for the lifetime of this wrapper.
     165              : 
     166              :         @param s Pointer to the source to wrap. Not checked for null.
     167              :     */
     168              :     template<BufferSource S>
     169              :     any_buffer_source(S* s);
     170              : 
     171              :     /** Check if the wrapper contains a valid source.
     172              : 
     173              :         @return `true` if wrapping a source, `false` if default-constructed
     174              :             or moved-from.
     175              :     */
     176              :     bool
     177            9 :     has_value() const noexcept
     178              :     {
     179            9 :         return source_ != nullptr; // source_ is null after default construction or move-from
     180              :     }
     181              : 
     182              :     /** Check if the wrapper contains a valid source.
     183              : 
     184              :         Equivalent to @ref has_value.
                      : 
     185              :         @return `true` if wrapping a source, `false` if default-constructed
     186              :             or moved-from.
     187              :     */
     187              :     explicit
     188            2 :     operator bool() const noexcept
     189              :     {
     190            2 :         return has_value();
     191              :     }
     192              : 
     193              :     /** Consume bytes from the source.
     194              : 
     195              :         Advances the internal read position of the underlying source
     196              :         by the specified number of bytes. The next call to @ref pull
     197              :         returns data starting after the consumed bytes.
     198              : 
     199              :         @param n The number of bytes to consume. Must not exceed the
     200              :         total size of buffers returned by the previous @ref pull.
     201              : 
     202              :         @par Preconditions
     203              :         The wrapper must contain a valid source (`has_value() == true`).
     204              :     */
     205              :     void
     206              :     consume(std::size_t n) noexcept;
     207              : 
     208              :     /** Pull buffer data from the source.
     209              : 
     210              :         Fills the provided array with buffer descriptors from the
     211              :         underlying source. The operation completes when data is
     212              :         available, the source is exhausted, or an error occurs.
     213              : 
     214              :         @param arr Pointer to array of const_buffer to fill.
     215              :         @param max_count Maximum number of buffers to fill.
     216              : 
     217              :         @return An awaitable yielding `(error_code,std::size_t)`.
     218              :             On success with data, `count > 0` indicates buffers filled.
     219              :             On success with `count == 0`, source is exhausted.
     220              : 
     221              :         @par Preconditions
     222              :         The wrapper must contain a valid source (`has_value() == true`).
                      : 
                      :         @note The returned awaitable stores a pointer to this wrapper,
                      :         so the wrapper must not be moved or destroyed before the
                      :         awaitable has completed.
     223              :     */
     224              :     auto
     225              :     pull(const_buffer* arr, std::size_t max_count);
     226              : 
     227              :     /** Read data into a mutable buffer sequence.
     228              : 
     229              :         Fills the provided buffer sequence by pulling data from the
     230              :         underlying source and copying it into the caller's buffers.
     231              :         This satisfies @ref ReadSource but incurs a copy; for zero-copy
     232              :         access, use @ref pull and @ref consume instead.
     233              : 
     234              :         @note This operation copies data from the source's internal
     235              :         buffers into the caller's buffers. For zero-copy reads,
     236              :         use @ref pull and @ref consume directly.
     237              : 
     238              :         @param buffers The buffer sequence to fill.
     239              : 
     240              :         @return An awaitable yielding `(error_code,std::size_t)`.
     241              :             On success, `n == buffer_size(buffers)`.
     242              :             On EOF, `ec == error::eof` and `n` is bytes transferred.
     243              : 
     244              :         @par Preconditions
     245              :         The wrapper must contain a valid source (`has_value() == true`).
     246              : 
     247              :         @see pull, consume
     248              :     */
     249              :     template<MutableBufferSequence MB>
     250              :     task<io_result<std::size_t>>
     251              :     read(MB buffers);
     252              : 
     253              : protected:
     254              :     /** Rebind to a new source after move.
     255              : 
     256              :         Updates the internal pointer to reference a new source object.
     257              :         Used by owning wrappers after move assignment when the owned
     258              :         object has moved to a new location.
     259              : 
     260              :         @param new_source The new source to bind to. Must be the same
     261              :             type as the original source.
     262              : 
     263              :         @note Terminates if called with a source of different type
     264              :             than the original.
     265              :     */
     266              :     template<BufferSource S>
     267              :     void
     268              :     rebind(S& new_source) noexcept
     269              :     {
     270              :         if(vt_ != &vtable_for_impl<S>::value) // the vtable address identifies the concrete source type
     271              :             std::terminate();
     272              :         source_ = &new_source; // only the pointer changes; storage_ and the awaitable cache are untouched
     273              :     }
     274              : };
     275              : 
     276              : //----------------------------------------------------------
     277              : 
     278              : struct any_buffer_source::awaitable_ops // function table driving an awaitable through a void*
     279              : {
     280              :     auto (*await_ready)(void*) -> bool; // forwards to the awaitable's await_ready()
     281              :     auto (*await_suspend)(void*, coro, executor_ref, std::stop_token) -> coro; // forwards the suspend call
     282              :     auto (*await_resume)(void*) -> io_result<std::size_t>; // produces the operation's result
     283              :     auto (*destroy)(void*) noexcept -> void; // runs the awaitable's destructor; storage is not freed
     284              : };
     285              : 
     286              : struct any_buffer_source::vtable // per-source-type operations, filled in by vtable_for_impl<S>
     287              : {
     288              :     auto (*destroy)(void*) noexcept -> void; // runs the source's destructor in place
     289              :     auto (*do_consume)(void* source, std::size_t n) noexcept -> void; // forwards to the source's consume(n)
     290              :     std::size_t awaitable_size; // sizeof the awaitable returned by the source's pull()
     291              :     std::size_t awaitable_align; // alignof the awaitable returned by the source's pull()
     292              :     auto (*construct_awaitable)( // builds the pull awaitable inside `storage`, returns its ops
     293              :         void* source,
     294              :         void* storage,
     295              :         const_buffer* arr,
     296              :         std::size_t max_count) -> awaitable_ops const*;
     297              : };
     298              : 
     299              : template<BufferSource S> // builds the vtable and awaitable_ops for one concrete source type S
     300              : struct any_buffer_source::vtable_for_impl
     301              : {
     302              :     using Awaitable = decltype(std::declval<S&>().pull(
     303              :         std::declval<const_buffer*>(),
     304              :         std::declval<std::size_t>())); // the type S::pull(const_buffer*, std::size_t) returns
     305              : 
     306              :     static void
     307            0 :     do_destroy_impl(void* source) noexcept
     308              :     {
     309            0 :         static_cast<S*>(source)->~S(); // destructor only; the caller frees the memory
     310            0 :     }
     311              : 
     312              :     static void
     313           39 :     do_consume_impl(void* source, std::size_t n) noexcept
     314              :     {
     315           39 :         static_cast<S*>(source)->consume(n);
     316           39 :     }
     317              : 
     318              :     static awaitable_ops const*
     319           92 :     construct_awaitable_impl(
     320              :         void* source,
     321              :         void* storage,
     322              :         const_buffer* arr,
     323              :         std::size_t max_count)
     324              :     {
     325           92 :         auto& s = *static_cast<S*>(source);
     326           92 :         ::new(storage) Awaitable(s.pull(arr, max_count)); // placement-construct into the caller-provided storage
     327              : 
     328              :         static constexpr awaitable_ops ops = { // initializer order follows the awaitable_ops members
     329           92 :             +[](void* p) { // await_ready
     330           92 :                 return static_cast<Awaitable*>(p)->await_ready();
     331              :             },
     332            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) { // await_suspend
     333            0 :                 return detail::call_await_suspend(
     334            0 :                     static_cast<Awaitable*>(p), h, ex, token);
     335              :             },
     336           92 :             +[](void* p) { // await_resume
     337           92 :                 return static_cast<Awaitable*>(p)->await_resume();
     338              :             },
     339           92 :             +[](void* p) noexcept { // destroy: destructor only, the storage is reused
     340           92 :                 static_cast<Awaitable*>(p)->~Awaitable();
     341              :             }
     342              :         };
     343           92 :         return &ops; // one static table per S, so the pointer stays valid
     344              :     }
     345              : 
     346              :     static constexpr vtable value = { // initializer order follows the vtable members
     347              :         &do_destroy_impl,
     348              :         &do_consume_impl,
     349              :         sizeof(Awaitable),
     350              :         alignof(Awaitable),
     351              :         &construct_awaitable_impl
     352              :     };
     353              : };
     354              : 
     355              : //----------------------------------------------------------
     356              : 
     357              : inline // Destructor: destroys an owned source, then frees the awaitable cache.
     358           59 : any_buffer_source::~any_buffer_source()
     359              : {
     360           59 :     if(storage_) // only the owning constructor sets storage_
     361              :     {
     362            0 :         vt_->destroy(source_); // run the source's destructor first
     363            0 :         ::operator delete(storage_); // then release the block that held it
     364              :     }
     365           59 :     if(cached_awaitable_) // NOTE(review): an awaitable still constructed here (active_ops_ set) is not destroyed
     366           56 :         ::operator delete(cached_awaitable_);
     367           59 : }
     368              : 
     369              : inline any_buffer_source& // Move assignment: release current resources, then take over other's.
     370            1 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
     371              : {
     372            1 :     if(this != &other) // self-assignment leaves the wrapper untouched
     373              :     {
     374            1 :         if(storage_) // currently owning: destroy the source, then free its block
     375              :         {
     376            0 :             vt_->destroy(source_);
     377            0 :             ::operator delete(storage_);
     378              :         }
     379            1 :         if(cached_awaitable_) // NOTE(review): an awaitable still constructed here is not destroyed first
     380            0 :             ::operator delete(cached_awaitable_);
     381            1 :         source_ = std::exchange(other.source_, nullptr); // take each member and null it in other
     382            1 :         vt_ = std::exchange(other.vt_, nullptr);
     383            1 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     384            1 :         storage_ = std::exchange(other.storage_, nullptr);
     385            1 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     386              :     }
     387            1 :     return *this;
     388              : }
     389              : 
     390              : // Owning constructor: heap-allocates a block for S, move-constructs the
     391              : // source into it, then preallocates the cached awaitable storage.
     392              : template<BufferSource S>
     393              :     requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
     394              : any_buffer_source::any_buffer_source(S s)
     395              :     : vt_(&vtable_for_impl<S>::value)
     396              : {
     397              :     // Rollback if a later step throws; ~any_buffer_source() does not run then.
     398              :     struct guard {
     399              :         any_buffer_source* self;
     400              :         bool committed = false;
     401              :         ~guard() {
     402              :             if(committed || !self->storage_) return;
     403              :             // source_ is still null if S's move constructor threw.
     404              :             if(self->source_)
     405              :                 self->vt_->destroy(self->source_);
     406              :             ::operator delete(self->storage_);
     407              :             self->storage_ = self->source_ = nullptr;
     408              :         }
     409              :     } g{this};
     410              :     // NOTE(review): plain operator new ignores alignof(S) and awaitable_align.
     411              :     storage_ = ::operator new(sizeof(S));
     412              :     source_ = ::new(storage_) S(std::move(s));
     413              :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     414              :     g.committed = true;
     415              : }
     416              : 
     417              : template<BufferSource S> // Reference constructor: wraps *s without taking ownership.
     418           56 : any_buffer_source::any_buffer_source(S* s)
     419           56 :     : source_(s) // storage_ stays null, so the destructor never destroys *s
     420           56 :     , vt_(&vtable_for_impl<S>::value)
     421              : {
     422              :     // Preallocate the awaitable storage
     423           56 :     cached_awaitable_ = ::operator new(vt_->awaitable_size); // NOTE(review): vt_->awaitable_align is not used here
     424           56 : }
     425              : 
     426              : //----------------------------------------------------------
     427              : 
     428              : inline void // Forwards consume(n) to the wrapped source through the vtable.
     429           39 : any_buffer_source::consume(std::size_t n) noexcept
     430              : {
     431           39 :     vt_->do_consume(source_, n); // no null check: vt_ and source_ must already be set
     432           39 : }
     433              : 
     434              : inline auto // Returns an awaitable that runs the wrapped source's pull() out of the cached storage.
     435           92 : any_buffer_source::pull(
     436              :     const_buffer* arr,
     437              :     std::size_t max_count)
     438              : {
     439              :     struct awaitable
     440              :     {
     441              :         any_buffer_source* self_; // the wrapper that created this awaitable
     442              :         const_buffer* arr_; // caller's output array, passed through to the source
     443              :         std::size_t max_count_;
     444              : 
     445              :         bool
     446           92 :         await_ready() const noexcept
     447              :         {
     448           92 :             return false; // always go through await_suspend, where the underlying awaitable is created
     449              :         }
     450              : 
     451              :         coro
     452           92 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     453              :         {
     454              :             // Construct the underlying awaitable into cached storage
     455          184 :             self_->active_ops_ = self_->vt_->construct_awaitable(
     456           92 :                 self_->source_,
     457           92 :                 self_->cached_awaitable_,
     458              :                 arr_,
     459              :                 max_count_);
     460              : 
     461              :             // Check if underlying is immediately ready
     462           92 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     463           92 :                 return h; // hand back the caller's own handle; presumably this resumes it at once (assumes coro is a coroutine handle)
     464              : 
     465              :             // Forward to underlying awaitable
     466            0 :             return self_->active_ops_->await_suspend(
     467            0 :                 self_->cached_awaitable_, h, ex, token);
     468              :         }
     469              : 
     470              :         io_result<std::size_t>
     471           92 :         await_resume()
     472              :         {
     473              :             struct guard { // destroys the cached awaitable once the result is produced, also if await_resume throws
     474              :                 any_buffer_source* self;
     475           92 :                 ~guard() {
     476           92 :                     self->active_ops_->destroy(self->cached_awaitable_);
     477           92 :                     self->active_ops_ = nullptr; // marks the cached storage as free again
     478           92 :                 }
     479           92 :             } g{self_};
     480           92 :             return self_->active_ops_->await_resume(
     481          165 :                 self_->cached_awaitable_);
     482           92 :         }
     483              :     };
     484           92 :     return awaitable{this, arr, max_count}; // nothing is started here; the work begins in await_suspend
     485              : }
     486              : 
     487              : // Copying read built on pull()/consume(): loops until `buffers` is full,
     488              : // pull reports an error, or pull yields zero buffers (error::eof).
     489              : template<MutableBufferSequence MB>
     490              : task<io_result<std::size_t>>
     491              : any_buffer_source::read(MB buffers)
     492              : {
     493              :     std::size_t copied = 0;
     494              :     auto remaining = sans_prefix(buffers, 0); // unfilled tail of the destination
     495              :     for(;;)
     496              :     {
     497              :         if(buffer_empty(remaining))
     498              :             break;
     499              :         const_buffer pulled[detail::max_iovec_];
     500              :         auto [err, filled] = co_await pull(pulled, detail::max_iovec_);
     501              :         if(err)
     502              :             co_return {err, copied};
     503              :         if(filled == 0)
     504              :             co_return {error::eof, copied};
     505              :         // Copy what fits, then release exactly that many bytes from the source.
     506              :         auto const moved = buffer_copy(remaining, std::span(pulled, filled));
     507              :         consume(moved);
     508              :         copied += moved;
     509              :         remaining = sans_prefix(remaining, moved);
     510              :     }
     511              :     co_return {{}, copied};
     512              : }
     513              : 
     514              : //----------------------------------------------------------
     515              : 
     516              : static_assert(BufferSource<any_buffer_source>); // compile-time check: the wrapper itself models BufferSource
     517              : static_assert(ReadSource<any_buffer_source>); // compile-time check: the wrapper also models ReadSource
     518              : 
     519              : } // namespace capy
     520              : } // namespace boost
     521              : 
     522              : #endif
        

Generated by: LCOV version 2.3