LCOV - code coverage report
Current view: top level - capy/io - any_buffer_sink.hpp (source / functions) Coverage Total Hit
Test: coverage_filtered.info Lines: 84.3 % 115 97
Test Date: 2026-02-01 07:03:35 Functions: 90.0 % 30 27

            Line data    Source code
       1              : //
       2              : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
       3              : //
       4              : // Distributed under the Boost Software License, Version 1.0. (See accompanying
       5              : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
       6              : //
       7              : // Official repository: https://github.com/cppalliance/capy
       8              : //
       9              : 
      10              : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
      11              : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
      12              : 
      13              : #include <boost/capy/detail/config.hpp>
      14              : #include <boost/capy/detail/await_suspend_helper.hpp>
      15              : #include <boost/capy/buffers.hpp>
      16              : #include <boost/capy/buffers/buffer_copy.hpp>
      17              : #include <boost/capy/buffers/buffer_param.hpp>
      18              : #include <boost/capy/concept/buffer_sink.hpp>
      19              : #include <boost/capy/concept/io_awaitable.hpp>
      20              : #include <boost/capy/concept/write_sink.hpp>
      21              : #include <boost/capy/coro.hpp>
      22              : #include <boost/capy/ex/executor_ref.hpp>
      23              : #include <boost/capy/io_result.hpp>
      24              : #include <boost/capy/task.hpp>
      25              : 
      26              : #include <system_error>
      27              : 
      28              : #include <concepts>
      29              : #include <coroutine>
      30              : #include <cstddef>
      31              : #include <exception>
      32              : #include <new>
      33              : #include <stop_token>
      34              : #include <utility>
      35              : 
      36              : namespace boost {
      37              : namespace capy {
      38              : 
      39              : /** Type-erased wrapper for any BufferSink.
      40              : 
      41              :     This class provides type erasure for any type satisfying the
      42              :     @ref BufferSink concept, enabling runtime polymorphism for
      43              :     buffer sink operations. It uses cached awaitable storage to achieve
      44              :     zero steady-state allocation after construction.
      45              : 
      46              :     The wrapper also satisfies @ref WriteSink through templated
      47              :     @ref write methods. These methods copy data from the caller's
      48              :     buffers into the sink's internal storage, incurring one extra
      49              :     buffer copy compared to using @ref prepare and @ref commit
      50              :     directly.
      51              : 
      52              :     The wrapper supports two construction modes:
      53              :     - **Owning**: Pass by value to transfer ownership. The wrapper
      54              :       allocates storage and owns the sink.
      55              :     - **Reference**: Pass a pointer to wrap without ownership. The
      56              :       pointed-to sink must outlive this wrapper.
      57              : 
      58              :     @par Awaitable Preallocation
      59              :     The constructor preallocates storage for the type-erased awaitable.
      60              :     This reserves all virtual address space at server startup
      61              :     so memory usage can be measured up front, rather than
      62              :     allocating piecemeal as traffic arrives.
      63              : 
      64              :     @par Thread Safety
      65              :     Not thread-safe. Concurrent operations on the same wrapper
      66              :     are undefined behavior.
      67              : 
      68              :     @par Example
      69              :     @code
      70              :     // Owning - takes ownership of the sink
      71              :     any_buffer_sink abs(some_buffer_sink{args...});
      72              : 
      73              :     // Reference - wraps without ownership
      74              :     some_buffer_sink sink;
      75              :     any_buffer_sink abs(&sink);
      76              : 
      77              :     mutable_buffer arr[16];
      78              :     std::size_t count = abs.prepare(arr, 16);
      79              :     // Write data into arr[0..count)
      80              :     auto [ec] = co_await abs.commit(bytes_written);
      81              :     auto [ec2] = co_await abs.commit_eof();
      82              :     @endcode
      83              : 
      84              :     @see any_buffer_source, BufferSink, WriteSink
      85              : */
      86              : class any_buffer_sink
      87              : {
      88              :     struct vtable;
      89              :     struct awaitable_ops;
      90              : 
      91              :     template<BufferSink S>
      92              :     struct vtable_for_impl;
      93              : 
      94              :     void* sink_ = nullptr;
      95              :     vtable const* vt_ = nullptr;
      96              :     void* cached_awaitable_ = nullptr;
      97              :     void* storage_ = nullptr;
      98              :     awaitable_ops const* active_ops_ = nullptr;
      99              : 
     100              : public:
     101              :     /** Destructor.
     102              : 
     103              :         Destroys the owned sink (if any) and releases the cached
     104              :         awaitable storage.
     105              :     */
     106              :     ~any_buffer_sink();
     107              : 
     108              :     /** Default constructor.
     109              : 
     110              :         Constructs an empty wrapper. Operations on a default-constructed
     111              :         wrapper result in undefined behavior.
     112              :     */
     113              :     any_buffer_sink() = default;
     114              : 
     115              :     /** Non-copyable.
     116              : 
     117              :         The awaitable cache is per-instance and cannot be shared.
     118              :     */
     119              :     any_buffer_sink(any_buffer_sink const&) = delete;
     120              :     any_buffer_sink& operator=(any_buffer_sink const&) = delete;
     121              : 
     122              :     /** Move constructor.
     123              : 
     124              :         Transfers ownership of the wrapped sink (if owned) and
     125              :         cached awaitable storage from `other`. After the move, `other` is
     126              :         in a default-constructed state.
     127              : 
     128              :         @param other The wrapper to move from.
     129              :     */
     130            1 :     any_buffer_sink(any_buffer_sink&& other) noexcept
     131            1 :         : sink_(std::exchange(other.sink_, nullptr))
     132            1 :         , vt_(std::exchange(other.vt_, nullptr))
     133            1 :         , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
     134            1 :         , storage_(std::exchange(other.storage_, nullptr))
     135            1 :         , active_ops_(std::exchange(other.active_ops_, nullptr))
     136              :     {
     137            1 :     }
     138              : 
     139              :     /** Move assignment operator.
     140              : 
     141              :         Destroys any owned sink and releases existing resources,
     142              :         then transfers ownership from `other`.
     143              : 
     144              :         @param other The wrapper to move from.
     145              :         @return Reference to this wrapper.
     146              :     */
     147              :     any_buffer_sink&
     148              :     operator=(any_buffer_sink&& other) noexcept;
     149              : 
     150              :     /** Construct by taking ownership of a BufferSink.
     151              : 
     152              :         Allocates storage and moves the sink into this wrapper.
     153              :         The wrapper owns the sink and will destroy it.
     154              : 
     155              :         @param s The sink to take ownership of.
     156              :     */
     157              :     template<BufferSink S>
     158              :         requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
     159              :     any_buffer_sink(S s);
     160              : 
     161              :     /** Construct by wrapping a BufferSink without ownership.
     162              : 
     163              :         Wraps the given sink by pointer. The sink must remain
     164              :         valid for the lifetime of this wrapper.
     165              : 
     166              :         @param s Pointer to the sink to wrap.
     167              :     */
     168              :     template<BufferSink S>
     169              :     any_buffer_sink(S* s);
     170              : 
     171              :     /** Check if the wrapper contains a valid sink.
     172              : 
     173              :         @return `true` if wrapping a sink, `false` if default-constructed
     174              :             or moved-from.
     175              :     */
     176              :     bool
     177            9 :     has_value() const noexcept
     178              :     {
     179            9 :         return sink_ != nullptr;
     180              :     }
     181              : 
     182              :     /** Check if the wrapper contains a valid sink.
     183              : 
     184              :         @return `true` if wrapping a sink, `false` if default-constructed
     185              :             or moved-from.
     186              :     */
     187              :     explicit
     188            2 :     operator bool() const noexcept
     189              :     {
     190            2 :         return has_value();
     191              :     }
     192              : 
     193              :     /** Prepare writable buffers.
     194              : 
     195              :         Fills the provided array with mutable buffer descriptors
     196              :         pointing to the underlying sink's internal storage. This
     197              :         operation is synchronous.
     198              : 
     199              :         @param arr Pointer to array of mutable_buffer to fill.
     200              :         @param max_count Maximum number of buffers to fill.
     201              : 
     202              :         @return The number of buffers filled.
     203              : 
     204              :         @par Preconditions
     205              :         The wrapper must contain a valid sink (`has_value() == true`).
     206              :     */
     207              :     std::size_t
     208              :     prepare(mutable_buffer* arr, std::size_t max_count);
     209              : 
     210              :     /** Commit bytes written to the prepared buffers.
     211              : 
     212              :         Commits `n` bytes written to the buffers returned by the
     213              :         most recent call to @ref prepare. The operation may trigger
     214              :         underlying I/O.
     215              : 
     216              :         @param n The number of bytes to commit.
     217              : 
     218              :         @return An awaitable yielding `(error_code)`.
     219              : 
     220              :         @par Preconditions
     221              :         The wrapper must contain a valid sink (`has_value() == true`).
     222              :     */
     223              :     auto
     224              :     commit(std::size_t n);
     225              : 
     226              :     /** Commit bytes written with optional end-of-stream.
     227              : 
     228              :         Commits `n` bytes written to the buffers returned by the
     229              :         most recent call to @ref prepare. If `eof` is true, also
     230              :         signals end-of-stream.
     231              : 
     232              :         @param n The number of bytes to commit.
     233              :         @param eof If true, signals end-of-stream after committing.
     234              : 
     235              :         @return An awaitable yielding `(error_code)`.
     236              : 
     237              :         @par Preconditions
     238              :         The wrapper must contain a valid sink (`has_value() == true`).
     239              :     */
     240              :     auto
     241              :     commit(std::size_t n, bool eof);
     242              : 
     243              :     /** Signal end-of-stream.
     244              : 
     245              :         Indicates that no more data will be written to the sink.
     246              :         The operation completes when the sink is finalized, or
     247              :         an error occurs.
     248              : 
     249              :         @return An awaitable yielding `(error_code)`.
     250              : 
     251              :         @par Preconditions
     252              :         The wrapper must contain a valid sink (`has_value() == true`).
     253              :     */
     254              :     auto
     255              :     commit_eof();
     256              : 
     257              :     /** Write data from a buffer sequence.
     258              : 
     259              :         Writes all data from the buffer sequence to the underlying
     260              :         sink. This method satisfies the @ref WriteSink concept.
     261              : 
     262              :         @note This operation copies data from the caller's buffers
     263              :         into the sink's internal buffers. For zero-copy writes,
     264              :         use @ref prepare and @ref commit directly.
     265              : 
     266              :         @param buffers The buffer sequence to write.
     267              : 
     268              :         @return An awaitable yielding `(error_code,std::size_t)`.
     269              : 
     270              :         @par Preconditions
     271              :         The wrapper must contain a valid sink (`has_value() == true`).
     272              :     */
     273              :     template<ConstBufferSequence CB>
     274              :     task<io_result<std::size_t>>
     275              :     write(CB buffers);
     276              : 
     277              :     /** Write data with optional end-of-stream.
     278              : 
     279              :         Writes all data from the buffer sequence to the underlying
     280              :         sink, optionally finalizing it afterwards. This method
     281              :         satisfies the @ref WriteSink concept.
     282              : 
     283              :         @note This operation copies data from the caller's buffers
     284              :         into the sink's internal buffers. For zero-copy writes,
     285              :         use @ref prepare and @ref commit directly.
     286              : 
     287              :         @param buffers The buffer sequence to write.
     288              :         @param eof If true, finalize the sink after writing.
     289              : 
     290              :         @return An awaitable yielding `(error_code,std::size_t)`.
     291              : 
     292              :         @par Preconditions
     293              :         The wrapper must contain a valid sink (`has_value() == true`).
     294              :     */
     295              :     template<ConstBufferSequence CB>
     296              :     task<io_result<std::size_t>>
     297              :     write(CB buffers, bool eof);
     298              : 
     299              :     /** Signal end-of-stream.
     300              : 
     301              :         Indicates that no more data will be written to the sink.
     302              :         This method satisfies the @ref WriteSink concept.
     303              : 
     304              :         @return An awaitable yielding `(error_code)`.
     305              : 
     306              :         @par Preconditions
     307              :         The wrapper must contain a valid sink (`has_value() == true`).
     308              :     */
     309              :     auto
     310              :     write_eof();
     311              : 
     312              : protected:
     313              :     /** Rebind to a new sink after move.
     314              : 
     315              :         Updates the internal pointer to reference a new sink object.
     316              :         Used by owning wrappers after move assignment when the owned
     317              :         object has moved to a new location.
     318              : 
     319              :         @param new_sink The new sink to bind to. Must be the same
     320              :             type as the original sink.
     321              : 
     322              :         @note Terminates if called with a sink of different type
     323              :             than the original.
     324              :     */
     325              :     template<BufferSink S>
     326              :     void
     327              :     rebind(S& new_sink) noexcept
     328              :     {
     329              :         if(vt_ != &vtable_for_impl<S>::value)
     330              :             std::terminate();
     331              :         sink_ = &new_sink;
     332              :     }
     333              : };
     334              : 
     335              : //----------------------------------------------------------
     336              : 
     337              : struct any_buffer_sink::awaitable_ops
     338              : {
     339              :     bool (*await_ready)(void*);
     340              :     coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
     341              :     io_result<> (*await_resume)(void*);
     342              :     void (*destroy)(void*) noexcept;
     343              : };
     344              : 
     345              : struct any_buffer_sink::vtable
     346              : {
     347              :     void (*destroy)(void*) noexcept;
     348              :     std::size_t (*do_prepare)(
     349              :         void* sink,
     350              :         mutable_buffer* arr,
     351              :         std::size_t max_count);
     352              :     std::size_t awaitable_size;
     353              :     std::size_t awaitable_align;
     354              :     awaitable_ops const* (*construct_commit_awaitable)(
     355              :         void* sink,
     356              :         void* storage,
     357              :         std::size_t n,
     358              :         bool eof);
     359              :     awaitable_ops const* (*construct_eof_awaitable)(
     360              :         void* sink,
     361              :         void* storage);
     362              : };
     363              : 
     364              : template<BufferSink S>
     365              : struct any_buffer_sink::vtable_for_impl
     366              : {
     367              :     using CommitAwaitable = decltype(std::declval<S&>().commit(
     368              :         std::size_t{}, false));
     369              :     using EofAwaitable = decltype(std::declval<S&>().commit_eof());
     370              : 
     371              :     static void
     372            0 :     do_destroy_impl(void* sink) noexcept
     373              :     {
     374            0 :         static_cast<S*>(sink)->~S();
     375            0 :     }
     376              : 
     377              :     static std::size_t
     378           68 :     do_prepare_impl(
     379              :         void* sink,
     380              :         mutable_buffer* arr,
     381              :         std::size_t max_count)
     382              :     {
     383           68 :         auto& s = *static_cast<S*>(sink);
     384           68 :         return s.prepare(arr, max_count);
     385              :     }
     386              : 
     387              :     static awaitable_ops const*
     388           48 :     construct_commit_awaitable_impl(
     389              :         void* sink,
     390              :         void* storage,
     391              :         std::size_t n,
     392              :         bool eof)
     393              :     {
     394           48 :         auto& s = *static_cast<S*>(sink);
     395           48 :         ::new(storage) CommitAwaitable(s.commit(n, eof));
     396              : 
     397              :         static constexpr awaitable_ops ops = {
     398           48 :             +[](void* p) {
     399           48 :                 return static_cast<CommitAwaitable*>(p)->await_ready();
     400              :             },
     401            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     402            0 :                 return detail::call_await_suspend(
     403            0 :                     static_cast<CommitAwaitable*>(p), h, ex, token);
     404              :             },
     405           48 :             +[](void* p) {
     406           48 :                 return static_cast<CommitAwaitable*>(p)->await_resume();
     407              :             },
     408           48 :             +[](void* p) noexcept {
     409           48 :                 static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
     410              :             }
     411              :         };
     412           48 :         return &ops;
     413              :     }
     414              : 
     415              :     static awaitable_ops const*
     416           18 :     construct_eof_awaitable_impl(
     417              :         void* sink,
     418              :         void* storage)
     419              :     {
     420           18 :         auto& s = *static_cast<S*>(sink);
     421           18 :         ::new(storage) EofAwaitable(s.commit_eof());
     422              : 
     423              :         static constexpr awaitable_ops ops = {
     424           18 :             +[](void* p) {
     425           18 :                 return static_cast<EofAwaitable*>(p)->await_ready();
     426              :             },
     427            0 :             +[](void* p, coro h, executor_ref ex, std::stop_token token) {
     428            0 :                 return detail::call_await_suspend(
     429            0 :                     static_cast<EofAwaitable*>(p), h, ex, token);
     430              :             },
     431           18 :             +[](void* p) {
     432           18 :                 return static_cast<EofAwaitable*>(p)->await_resume();
     433              :             },
     434           18 :             +[](void* p) noexcept {
     435           18 :                 static_cast<EofAwaitable*>(p)->~EofAwaitable();
     436              :             }
     437              :         };
     438           18 :         return &ops;
     439              :     }
     440              : 
     441              :     static constexpr std::size_t max_awaitable_size =
     442              :         sizeof(CommitAwaitable) > sizeof(EofAwaitable)
     443              :             ? sizeof(CommitAwaitable)
     444              :             : sizeof(EofAwaitable);
     445              : 
     446              :     static constexpr std::size_t max_awaitable_align =
     447              :         alignof(CommitAwaitable) > alignof(EofAwaitable)
     448              :             ? alignof(CommitAwaitable)
     449              :             : alignof(EofAwaitable);
     450              : 
     451              :     static constexpr vtable value = {
     452              :         &do_destroy_impl,
     453              :         &do_prepare_impl,
     454              :         max_awaitable_size,
     455              :         max_awaitable_align,
     456              :         &construct_commit_awaitable_impl,
     457              :         &construct_eof_awaitable_impl
     458              :     };
     459              : };
     460              : 
     461              : //----------------------------------------------------------
     462              : 
     463              : inline
     464           63 : any_buffer_sink::~any_buffer_sink()
     465              : {
     466           63 :     if(storage_)
     467              :     {
     468            0 :         vt_->destroy(sink_);
     469            0 :         ::operator delete(storage_);
     470              :     }
     471           63 :     if(cached_awaitable_)
     472           60 :         ::operator delete(cached_awaitable_);
     473           63 : }
     474              : 
     475              : inline any_buffer_sink&
     476            1 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
     477              : {
     478            1 :     if(this != &other)
     479              :     {
     480            1 :         if(storage_)
     481              :         {
     482            0 :             vt_->destroy(sink_);
     483            0 :             ::operator delete(storage_);
     484              :         }
     485            1 :         if(cached_awaitable_)
     486            0 :             ::operator delete(cached_awaitable_);
     487            1 :         sink_ = std::exchange(other.sink_, nullptr);
     488            1 :         vt_ = std::exchange(other.vt_, nullptr);
     489            1 :         cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
     490            1 :         storage_ = std::exchange(other.storage_, nullptr);
     491            1 :         active_ops_ = std::exchange(other.active_ops_, nullptr);
     492              :     }
     493            1 :     return *this;
     494              : }
     495              : 
     496              : template<BufferSink S>
     497              :     requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
     498              : any_buffer_sink::any_buffer_sink(S s)
     499              :     : vt_(&vtable_for_impl<S>::value)
     500              : {
     501              :     struct guard {
     502              :         any_buffer_sink* self;
     503              :         bool committed = false;
     504              :         ~guard() {
     505              :             if(!committed && self->storage_) {
     506              :                 self->vt_->destroy(self->sink_);
     507              :                 ::operator delete(self->storage_);
     508              :                 self->storage_ = nullptr;
     509              :                 self->sink_ = nullptr;
     510              :             }
     511              :         }
     512              :     } g{this};
     513              : 
     514              :     storage_ = ::operator new(sizeof(S));
     515              :     sink_ = ::new(storage_) S(std::move(s));
     516              : 
     517              :     // Preallocate the awaitable storage (sized for max of commit/eof)
     518              :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     519              : 
     520              :     g.committed = true;
     521              : }
     522              : 
     523              : template<BufferSink S>
     524           60 : any_buffer_sink::any_buffer_sink(S* s)
     525           60 :     : sink_(s)
     526           60 :     , vt_(&vtable_for_impl<S>::value)
     527              : {
     528              :     // Preallocate the awaitable storage (sized for max of commit/eof)
     529           60 :     cached_awaitable_ = ::operator new(vt_->awaitable_size);
     530           60 : }
     531              : 
     532              : //----------------------------------------------------------
     533              : 
     534              : inline std::size_t
     535           68 : any_buffer_sink::prepare(
     536              :     mutable_buffer* arr,
     537              :     std::size_t max_count)
     538              : {
     539           68 :     return vt_->do_prepare(sink_, arr, max_count);
     540              : }
     541              : 
     542              : inline auto
     543           48 : any_buffer_sink::commit(std::size_t n, bool eof)
     544              : {
     545              :     struct awaitable
     546              :     {
     547              :         any_buffer_sink* self_;
     548              :         std::size_t n_;
     549              :         bool eof_;
     550              : 
     551              :         bool
     552           48 :         await_ready() const noexcept
     553              :         {
     554           48 :             return false;
     555              :         }
     556              : 
     557              :         coro
     558           48 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     559              :         {
     560              :             // Construct the underlying awaitable into cached storage
     561           96 :             self_->active_ops_ = self_->vt_->construct_commit_awaitable(
     562           48 :                 self_->sink_,
     563           48 :                 self_->cached_awaitable_,
     564              :                 n_,
     565           48 :                 eof_);
     566              : 
     567              :             // Check if underlying is immediately ready
     568           48 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     569           48 :                 return h;
     570              : 
     571              :             // Forward to underlying awaitable
     572            0 :             return self_->active_ops_->await_suspend(
     573            0 :                 self_->cached_awaitable_, h, ex, token);
     574              :         }
     575              : 
     576              :         io_result<>
     577           48 :         await_resume()
     578              :         {
     579              :             struct guard {
     580              :                 any_buffer_sink* self;
     581           48 :                 ~guard() {
     582           48 :                     self->active_ops_->destroy(self->cached_awaitable_);
     583           48 :                     self->active_ops_ = nullptr;
     584           48 :                 }
     585           48 :             } g{self_};
     586           48 :             return self_->active_ops_->await_resume(
     587           85 :                 self_->cached_awaitable_);
     588           48 :         }
     589              :     };
     590           48 :     return awaitable{this, n, eof};
     591              : }
     592              : 
     593              : inline auto
     594           38 : any_buffer_sink::commit(std::size_t n)
     595              : {
     596           38 :     return commit(n, false);
     597              : }
     598              : 
     599              : inline auto
     600           18 : any_buffer_sink::commit_eof()
     601              : {
     602              :     struct awaitable
     603              :     {
     604              :         any_buffer_sink* self_;
     605              : 
     606              :         bool
     607           18 :         await_ready() const noexcept
     608              :         {
     609           18 :             return false;
     610              :         }
     611              : 
     612              :         coro
     613           18 :         await_suspend(coro h, executor_ref ex, std::stop_token token)
     614              :         {
     615              :             // Construct the underlying awaitable into cached storage
     616           36 :             self_->active_ops_ = self_->vt_->construct_eof_awaitable(
     617           18 :                 self_->sink_,
     618           18 :                 self_->cached_awaitable_);
     619              : 
     620              :             // Check if underlying is immediately ready
     621           18 :             if(self_->active_ops_->await_ready(self_->cached_awaitable_))
     622           18 :                 return h;
     623              : 
     624              :             // Forward to underlying awaitable
     625            0 :             return self_->active_ops_->await_suspend(
     626            0 :                 self_->cached_awaitable_, h, ex, token);
     627              :         }
     628              : 
     629              :         io_result<>
     630           18 :         await_resume()
     631              :         {
     632              :             struct guard {
     633              :                 any_buffer_sink* self;
     634           18 :                 ~guard() {
     635           18 :                     self->active_ops_->destroy(self->cached_awaitable_);
     636           18 :                     self->active_ops_ = nullptr;
     637           18 :                 }
     638           18 :             } g{self_};
     639           18 :             return self_->active_ops_->await_resume(
     640           31 :                 self_->cached_awaitable_);
     641           18 :         }
     642              :     };
     643           18 :     return awaitable{this};
     644              : }
     645              : 
     646              : //----------------------------------------------------------
     647              : 
     648              : template<ConstBufferSequence CB>
     649              : task<io_result<std::size_t>>
     650              : any_buffer_sink::write(CB buffers)
     651              : {
     652              :     return write(buffers, false);
     653              : }
     654              : 
     655              : template<ConstBufferSequence CB>
     656              : task<io_result<std::size_t>>
     657              : any_buffer_sink::write(CB buffers, bool eof)
     658              : {
     659              :     buffer_param<CB> bp(buffers);
     660              :     std::size_t total = 0;
     661              : 
     662              :     for(;;)
     663              :     {
     664              :         auto src = bp.data();
     665              :         if(src.empty())
     666              :             break;
     667              : 
     668              :         mutable_buffer arr[detail::max_iovec_];
     669              :         std::size_t count = prepare(arr, detail::max_iovec_);
     670              :         if(count == 0)
     671              :         {
     672              :             auto [ec] = co_await commit(0);
     673              :             if(ec)
     674              :                 co_return {ec, total};
     675              :             continue;
     676              :         }
     677              : 
     678              :         auto n = buffer_copy(std::span(arr, count), src);
     679              :         auto [ec] = co_await commit(n);
     680              :         if(ec)
     681              :             co_return {ec, total};
     682              :         bp.consume(n);
     683              :         total += n;
     684              :     }
     685              : 
     686              :     if(eof)
     687              :     {
     688              :         auto [ec] = co_await commit_eof();
     689              :         if(ec)
     690              :             co_return {ec, total};
     691              :     }
     692              : 
     693              :     co_return {{}, total};
     694              : }
     695              : 
     696              : inline auto
     697              : any_buffer_sink::write_eof()
     698              : {
     699              :     return commit_eof();
     700              : }
     701              : 
     702              : //----------------------------------------------------------
     703              : 
     704              : static_assert(WriteSink<any_buffer_sink>);
     705              : 
     706              : } // namespace capy
     707              : } // namespace boost
     708              : 
     709              : #endif
        

Generated by: LCOV version 2.3