libs/capy/include/boost/capy/io/any_buffer_sink.hpp

84.3% Lines (97/115) 90.0% Functions (27/30) 55.6% Branches (10/18)
libs/capy/include/boost/capy/io/any_buffer_sink.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/buffer_param.hpp>
18 #include <boost/capy/concept/buffer_sink.hpp>
19 #include <boost/capy/concept/io_awaitable.hpp>
20 #include <boost/capy/concept/write_sink.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/ex/executor_ref.hpp>
23 #include <boost/capy/io_result.hpp>
24 #include <boost/capy/task.hpp>
25
26 #include <system_error>
27
28 #include <concepts>
29 #include <coroutine>
30 #include <cstddef>
31 #include <exception>
32 #include <new>
33 #include <stop_token>
34 #include <utility>
35
36 namespace boost {
37 namespace capy {
38
39 /** Type-erased wrapper for any BufferSink.
40
41 This class provides type erasure for any type satisfying the
42 @ref BufferSink concept, enabling runtime polymorphism for
43 buffer sink operations. It uses cached awaitable storage to achieve
44 zero steady-state allocation after construction.
45
46 The wrapper also satisfies @ref WriteSink through templated
47 @ref write methods. These methods copy data from the caller's
48 buffers into the sink's internal storage, incurring one extra
49 buffer copy compared to using @ref prepare and @ref commit
50 directly.
51
52 The wrapper supports two construction modes:
53 - **Owning**: Pass by value to transfer ownership. The wrapper
54 allocates storage and owns the sink.
55 - **Reference**: Pass a pointer to wrap without ownership. The
56 pointed-to sink must outlive this wrapper.
57
58 @par Awaitable Preallocation
59 The constructor preallocates storage for the type-erased awaitable.
60 This reserves all virtual address space at server startup
61 so memory usage can be measured up front, rather than
62 allocating piecemeal as traffic arrives.
63
64 @par Thread Safety
65 Not thread-safe. Concurrent operations on the same wrapper
66 are undefined behavior.
67
68 @par Example
69 @code
70 // Owning - takes ownership of the sink
71 any_buffer_sink abs(some_buffer_sink{args...});
72
73 // Reference - wraps without ownership
74 some_buffer_sink sink;
75 any_buffer_sink abs(&sink);
76
77 mutable_buffer arr[16];
78 std::size_t count = abs.prepare(arr, 16);
79 // Write data into arr[0..count)
80 auto [ec] = co_await abs.commit(bytes_written);
81 auto [ec2] = co_await abs.commit_eof();
82 @endcode
83
84 @see any_buffer_source, BufferSink, WriteSink
85 */
86 class any_buffer_sink
87 {
88 struct vtable;
89 struct awaitable_ops;
90
91 template<BufferSink S>
92 struct vtable_for_impl;
93
94 void* sink_ = nullptr;
95 vtable const* vt_ = nullptr;
96 void* cached_awaitable_ = nullptr;
97 void* storage_ = nullptr;
98 awaitable_ops const* active_ops_ = nullptr;
99
100 public:
101 /** Destructor.
102
103 Destroys the owned sink (if any) and releases the cached
104 awaitable storage.
105 */
106 ~any_buffer_sink();
107
108 /** Default constructor.
109
110 Constructs an empty wrapper. Operations on a default-constructed
111 wrapper result in undefined behavior.
112 */
113 any_buffer_sink() = default;
114
115 /** Non-copyable.
116
117 The awaitable cache is per-instance and cannot be shared.
118 */
119 any_buffer_sink(any_buffer_sink const&) = delete;
120 any_buffer_sink& operator=(any_buffer_sink const&) = delete;
121
122 /** Move constructor.
123
124 Transfers ownership of the wrapped sink (if owned) and
125 cached awaitable storage from `other`. After the move, `other` is
126 in a default-constructed state.
127
128 @param other The wrapper to move from.
129 */
130 1 any_buffer_sink(any_buffer_sink&& other) noexcept
131 1 : sink_(std::exchange(other.sink_, nullptr))
132 1 , vt_(std::exchange(other.vt_, nullptr))
133 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
134 1 , storage_(std::exchange(other.storage_, nullptr))
135 1 , active_ops_(std::exchange(other.active_ops_, nullptr))
136 {
137 1 }
138
139 /** Move assignment operator.
140
141 Destroys any owned sink and releases existing resources,
142 then transfers ownership from `other`.
143
144 @param other The wrapper to move from.
145 @return Reference to this wrapper.
146 */
147 any_buffer_sink&
148 operator=(any_buffer_sink&& other) noexcept;
149
150 /** Construct by taking ownership of a BufferSink.
151
152 Allocates storage and moves the sink into this wrapper.
153 The wrapper owns the sink and will destroy it.
154
155 @param s The sink to take ownership of.
156 */
157 template<BufferSink S>
158 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
159 any_buffer_sink(S s);
160
161 /** Construct by wrapping a BufferSink without ownership.
162
163 Wraps the given sink by pointer. The sink must remain
164 valid for the lifetime of this wrapper.
165
166 @param s Pointer to the sink to wrap.
167 */
168 template<BufferSink S>
169 any_buffer_sink(S* s);
170
171 /** Check if the wrapper contains a valid sink.
172
173 @return `true` if wrapping a sink, `false` if default-constructed
174 or moved-from.
175 */
176 bool
177 9 has_value() const noexcept
178 {
179 9 return sink_ != nullptr;
180 }
181
182 /** Check if the wrapper contains a valid sink.
183
184 @return `true` if wrapping a sink, `false` if default-constructed
185 or moved-from.
186 */
187 explicit
188 2 operator bool() const noexcept
189 {
190 2 return has_value();
191 }
192
193 /** Prepare writable buffers.
194
195 Fills the provided array with mutable buffer descriptors
196 pointing to the underlying sink's internal storage. This
197 operation is synchronous.
198
199 @param arr Pointer to array of mutable_buffer to fill.
200 @param max_count Maximum number of buffers to fill.
201
202 @return The number of buffers filled.
203
204 @par Preconditions
205 The wrapper must contain a valid sink (`has_value() == true`).
206 */
207 std::size_t
208 prepare(mutable_buffer* arr, std::size_t max_count);
209
210 /** Commit bytes written to the prepared buffers.
211
212 Commits `n` bytes written to the buffers returned by the
213 most recent call to @ref prepare. The operation may trigger
214 underlying I/O.
215
216 @param n The number of bytes to commit.
217
218 @return An awaitable yielding `(error_code)`.
219
220 @par Preconditions
221 The wrapper must contain a valid sink (`has_value() == true`).
222 */
223 auto
224 commit(std::size_t n);
225
226 /** Commit bytes written with optional end-of-stream.
227
228 Commits `n` bytes written to the buffers returned by the
229 most recent call to @ref prepare. If `eof` is true, also
230 signals end-of-stream.
231
232 @param n The number of bytes to commit.
233 @param eof If true, signals end-of-stream after committing.
234
235 @return An awaitable yielding `(error_code)`.
236
237 @par Preconditions
238 The wrapper must contain a valid sink (`has_value() == true`).
239 */
240 auto
241 commit(std::size_t n, bool eof);
242
243 /** Signal end-of-stream.
244
245 Indicates that no more data will be written to the sink.
246 The operation completes when the sink is finalized, or
247 an error occurs.
248
249 @return An awaitable yielding `(error_code)`.
250
251 @par Preconditions
252 The wrapper must contain a valid sink (`has_value() == true`).
253 */
254 auto
255 commit_eof();
256
257 /** Write data from a buffer sequence.
258
259 Writes all data from the buffer sequence to the underlying
260 sink. This method satisfies the @ref WriteSink concept.
261
262 @note This operation copies data from the caller's buffers
263 into the sink's internal buffers. For zero-copy writes,
264 use @ref prepare and @ref commit directly.
265
266 @param buffers The buffer sequence to write.
267
268 @return An awaitable yielding `(error_code,std::size_t)`.
269
270 @par Preconditions
271 The wrapper must contain a valid sink (`has_value() == true`).
272 */
273 template<ConstBufferSequence CB>
274 task<io_result<std::size_t>>
275 write(CB buffers);
276
277 /** Write data with optional end-of-stream.
278
279 Writes all data from the buffer sequence to the underlying
280 sink, optionally finalizing it afterwards. This method
281 satisfies the @ref WriteSink concept.
282
283 @note This operation copies data from the caller's buffers
284 into the sink's internal buffers. For zero-copy writes,
285 use @ref prepare and @ref commit directly.
286
287 @param buffers The buffer sequence to write.
288 @param eof If true, finalize the sink after writing.
289
290 @return An awaitable yielding `(error_code,std::size_t)`.
291
292 @par Preconditions
293 The wrapper must contain a valid sink (`has_value() == true`).
294 */
295 template<ConstBufferSequence CB>
296 task<io_result<std::size_t>>
297 write(CB buffers, bool eof);
298
299 /** Signal end-of-stream.
300
301 Indicates that no more data will be written to the sink.
302 This method satisfies the @ref WriteSink concept.
303
304 @return An awaitable yielding `(error_code)`.
305
306 @par Preconditions
307 The wrapper must contain a valid sink (`has_value() == true`).
308 */
309 auto
310 write_eof();
311
312 protected:
313 /** Rebind to a new sink after move.
314
315 Updates the internal pointer to reference a new sink object.
316 Used by owning wrappers after move assignment when the owned
317 object has moved to a new location.
318
319 @param new_sink The new sink to bind to. Must be the same
320 type as the original sink.
321
322 @note Terminates if called with a sink of different type
323 than the original.
324 */
325 template<BufferSink S>
326 void
327 rebind(S& new_sink) noexcept
328 {
329 if(vt_ != &vtable_for_impl<S>::value)
330 std::terminate();
331 sink_ = &new_sink;
332 }
333 };
334
335 //----------------------------------------------------------
336
337 struct any_buffer_sink::awaitable_ops
338 {
339 bool (*await_ready)(void*);
340 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
341 io_result<> (*await_resume)(void*);
342 void (*destroy)(void*) noexcept;
343 };
344
345 struct any_buffer_sink::vtable
346 {
347 void (*destroy)(void*) noexcept;
348 std::size_t (*do_prepare)(
349 void* sink,
350 mutable_buffer* arr,
351 std::size_t max_count);
352 std::size_t awaitable_size;
353 std::size_t awaitable_align;
354 awaitable_ops const* (*construct_commit_awaitable)(
355 void* sink,
356 void* storage,
357 std::size_t n,
358 bool eof);
359 awaitable_ops const* (*construct_eof_awaitable)(
360 void* sink,
361 void* storage);
362 };
363
364 template<BufferSink S>
365 struct any_buffer_sink::vtable_for_impl
366 {
367 using CommitAwaitable = decltype(std::declval<S&>().commit(
368 std::size_t{}, false));
369 using EofAwaitable = decltype(std::declval<S&>().commit_eof());
370
371 static void
372 do_destroy_impl(void* sink) noexcept
373 {
374 static_cast<S*>(sink)->~S();
375 }
376
377 static std::size_t
378 68 do_prepare_impl(
379 void* sink,
380 mutable_buffer* arr,
381 std::size_t max_count)
382 {
383 68 auto& s = *static_cast<S*>(sink);
384 68 return s.prepare(arr, max_count);
385 }
386
387 static awaitable_ops const*
388 48 construct_commit_awaitable_impl(
389 void* sink,
390 void* storage,
391 std::size_t n,
392 bool eof)
393 {
394 48 auto& s = *static_cast<S*>(sink);
395 48 ::new(storage) CommitAwaitable(s.commit(n, eof));
396
397 static constexpr awaitable_ops ops = {
398 48 +[](void* p) {
399 48 return static_cast<CommitAwaitable*>(p)->await_ready();
400 },
401 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
402 return detail::call_await_suspend(
403 static_cast<CommitAwaitable*>(p), h, ex, token);
404 },
405 48 +[](void* p) {
406 48 return static_cast<CommitAwaitable*>(p)->await_resume();
407 },
408 48 +[](void* p) noexcept {
409 48 static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
410 }
411 };
412 48 return &ops;
413 }
414
415 static awaitable_ops const*
416 18 construct_eof_awaitable_impl(
417 void* sink,
418 void* storage)
419 {
420 18 auto& s = *static_cast<S*>(sink);
421 18 ::new(storage) EofAwaitable(s.commit_eof());
422
423 static constexpr awaitable_ops ops = {
424 18 +[](void* p) {
425 18 return static_cast<EofAwaitable*>(p)->await_ready();
426 },
427 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
428 return detail::call_await_suspend(
429 static_cast<EofAwaitable*>(p), h, ex, token);
430 },
431 18 +[](void* p) {
432 18 return static_cast<EofAwaitable*>(p)->await_resume();
433 },
434 18 +[](void* p) noexcept {
435 18 static_cast<EofAwaitable*>(p)->~EofAwaitable();
436 }
437 };
438 18 return &ops;
439 }
440
441 static constexpr std::size_t max_awaitable_size =
442 sizeof(CommitAwaitable) > sizeof(EofAwaitable)
443 ? sizeof(CommitAwaitable)
444 : sizeof(EofAwaitable);
445
446 static constexpr std::size_t max_awaitable_align =
447 alignof(CommitAwaitable) > alignof(EofAwaitable)
448 ? alignof(CommitAwaitable)
449 : alignof(EofAwaitable);
450
451 static constexpr vtable value = {
452 &do_destroy_impl,
453 &do_prepare_impl,
454 max_awaitable_size,
455 max_awaitable_align,
456 &construct_commit_awaitable_impl,
457 &construct_eof_awaitable_impl
458 };
459 };
460
461 //----------------------------------------------------------
462
463 inline
464 63 any_buffer_sink::~any_buffer_sink()
465 {
466
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 63 times.
63 if(storage_)
467 {
468 vt_->destroy(sink_);
469 ::operator delete(storage_);
470 }
471
2/2
✓ Branch 0 taken 60 times.
✓ Branch 1 taken 3 times.
63 if(cached_awaitable_)
472 60 ::operator delete(cached_awaitable_);
473 63 }
474
475 inline any_buffer_sink&
476 1 any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
477 {
478
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
479 {
480
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
481 {
482 vt_->destroy(sink_);
483 ::operator delete(storage_);
484 }
485
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_awaitable_)
486 ::operator delete(cached_awaitable_);
487 1 sink_ = std::exchange(other.sink_, nullptr);
488 1 vt_ = std::exchange(other.vt_, nullptr);
489 1 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
490 1 storage_ = std::exchange(other.storage_, nullptr);
491 1 active_ops_ = std::exchange(other.active_ops_, nullptr);
492 }
493 1 return *this;
494 }
495
496 template<BufferSink S>
497 requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
498 any_buffer_sink::any_buffer_sink(S s)
499 : vt_(&vtable_for_impl<S>::value)
500 {
501 struct guard {
502 any_buffer_sink* self;
503 bool committed = false;
504 ~guard() {
505 if(!committed && self->storage_) {
506 self->vt_->destroy(self->sink_);
507 ::operator delete(self->storage_);
508 self->storage_ = nullptr;
509 self->sink_ = nullptr;
510 }
511 }
512 } g{this};
513
514 storage_ = ::operator new(sizeof(S));
515 sink_ = ::new(storage_) S(std::move(s));
516
517 // Preallocate the awaitable storage (sized for max of commit/eof)
518 cached_awaitable_ = ::operator new(vt_->awaitable_size);
519
520 g.committed = true;
521 }
522
523 template<BufferSink S>
524 60 any_buffer_sink::any_buffer_sink(S* s)
525 60 : sink_(s)
526 60 , vt_(&vtable_for_impl<S>::value)
527 {
528 // Preallocate the awaitable storage (sized for max of commit/eof)
529 60 cached_awaitable_ = ::operator new(vt_->awaitable_size);
530 60 }
531
532 //----------------------------------------------------------
533
534 inline std::size_t
535 68 any_buffer_sink::prepare(
536 mutable_buffer* arr,
537 std::size_t max_count)
538 {
539 68 return vt_->do_prepare(sink_, arr, max_count);
540 }
541
542 inline auto
543 48 any_buffer_sink::commit(std::size_t n, bool eof)
544 {
545 struct awaitable
546 {
547 any_buffer_sink* self_;
548 std::size_t n_;
549 bool eof_;
550
551 bool
552 48 await_ready() const noexcept
553 {
554 48 return false;
555 }
556
557 coro
558 48 await_suspend(coro h, executor_ref ex, std::stop_token token)
559 {
560 // Construct the underlying awaitable into cached storage
561 96 self_->active_ops_ = self_->vt_->construct_commit_awaitable(
562 48 self_->sink_,
563 48 self_->cached_awaitable_,
564 n_,
565 48 eof_);
566
567 // Check if underlying is immediately ready
568
1/2
✓ Branch 1 taken 48 times.
✗ Branch 2 not taken.
48 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
569 48 return h;
570
571 // Forward to underlying awaitable
572 return self_->active_ops_->await_suspend(
573 self_->cached_awaitable_, h, ex, token);
574 }
575
576 io_result<>
577 48 await_resume()
578 {
579 struct guard {
580 any_buffer_sink* self;
581 48 ~guard() {
582 48 self->active_ops_->destroy(self->cached_awaitable_);
583 48 self->active_ops_ = nullptr;
584 48 }
585 48 } g{self_};
586 48 return self_->active_ops_->await_resume(
587
1/1
✓ Branch 1 taken 37 times.
85 self_->cached_awaitable_);
588 48 }
589 };
590 48 return awaitable{this, n, eof};
591 }
592
593 inline auto
594 38 any_buffer_sink::commit(std::size_t n)
595 {
596 38 return commit(n, false);
597 }
598
599 inline auto
600 18 any_buffer_sink::commit_eof()
601 {
602 struct awaitable
603 {
604 any_buffer_sink* self_;
605
606 bool
607 18 await_ready() const noexcept
608 {
609 18 return false;
610 }
611
612 coro
613 18 await_suspend(coro h, executor_ref ex, std::stop_token token)
614 {
615 // Construct the underlying awaitable into cached storage
616 36 self_->active_ops_ = self_->vt_->construct_eof_awaitable(
617 18 self_->sink_,
618 18 self_->cached_awaitable_);
619
620 // Check if underlying is immediately ready
621
1/2
✓ Branch 1 taken 18 times.
✗ Branch 2 not taken.
18 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
622 18 return h;
623
624 // Forward to underlying awaitable
625 return self_->active_ops_->await_suspend(
626 self_->cached_awaitable_, h, ex, token);
627 }
628
629 io_result<>
630 18 await_resume()
631 {
632 struct guard {
633 any_buffer_sink* self;
634 18 ~guard() {
635 18 self->active_ops_->destroy(self->cached_awaitable_);
636 18 self->active_ops_ = nullptr;
637 18 }
638 18 } g{self_};
639 18 return self_->active_ops_->await_resume(
640
1/1
✓ Branch 1 taken 13 times.
31 self_->cached_awaitable_);
641 18 }
642 };
643 18 return awaitable{this};
644 }
645
646 //----------------------------------------------------------
647
648 template<ConstBufferSequence CB>
649 task<io_result<std::size_t>>
650 any_buffer_sink::write(CB buffers)
651 {
652 return write(buffers, false);
653 }
654
655 template<ConstBufferSequence CB>
656 task<io_result<std::size_t>>
657 any_buffer_sink::write(CB buffers, bool eof)
658 {
659 buffer_param<CB> bp(buffers);
660 std::size_t total = 0;
661
662 for(;;)
663 {
664 auto src = bp.data();
665 if(src.empty())
666 break;
667
668 mutable_buffer arr[detail::max_iovec_];
669 std::size_t count = prepare(arr, detail::max_iovec_);
670 if(count == 0)
671 {
672 auto [ec] = co_await commit(0);
673 if(ec)
674 co_return {ec, total};
675 continue;
676 }
677
678 auto n = buffer_copy(std::span(arr, count), src);
679 auto [ec] = co_await commit(n);
680 if(ec)
681 co_return {ec, total};
682 bp.consume(n);
683 total += n;
684 }
685
686 if(eof)
687 {
688 auto [ec] = co_await commit_eof();
689 if(ec)
690 co_return {ec, total};
691 }
692
693 co_return {{}, total};
694 }
695
696 inline auto
697 any_buffer_sink::write_eof()
698 {
699 return commit_eof();
700 }
701
702 //----------------------------------------------------------
703
704 static_assert(WriteSink<any_buffer_sink>);
705
706 } // namespace capy
707 } // namespace boost
708
709 #endif
710