Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/buffer_param.hpp>
18 : #include <boost/capy/concept/buffer_sink.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/write_sink.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/ex/executor_ref.hpp>
23 : #include <boost/capy/io_result.hpp>
24 : #include <boost/capy/task.hpp>
25 :
26 : #include <system_error>
27 :
28 : #include <concepts>
29 : #include <coroutine>
30 : #include <cstddef>
31 : #include <exception>
32 : #include <new>
33 : #include <stop_token>
34 : #include <utility>
35 :
36 : namespace boost {
37 : namespace capy {
38 :
39 : /** Type-erased wrapper for any BufferSink.
40 :
41 : This class provides type erasure for any type satisfying the
42 : @ref BufferSink concept, enabling runtime polymorphism for
43 : buffer sink operations. It uses cached awaitable storage to achieve
44 : zero steady-state allocation after construction.
45 :
46 : The wrapper also satisfies @ref WriteSink through templated
47 : @ref write methods. These methods copy data from the caller's
48 : buffers into the sink's internal storage, incurring one extra
49 : buffer copy compared to using @ref prepare and @ref commit
50 : directly.
51 :
52 : The wrapper supports two construction modes:
53 : - **Owning**: Pass by value to transfer ownership. The wrapper
54 : allocates storage and owns the sink.
55 : - **Reference**: Pass a pointer to wrap without ownership. The
56 : pointed-to sink must outlive this wrapper.
57 :
58 : @par Awaitable Preallocation
59 : The constructor preallocates storage for the type-erased awaitable.
60 : This reserves all virtual address space at server startup
61 : so memory usage can be measured up front, rather than
62 : allocating piecemeal as traffic arrives.
63 :
64 : @par Thread Safety
65 : Not thread-safe. Concurrent operations on the same wrapper
66 : are undefined behavior.
67 :
68 : @par Example
69 : @code
70 : // Owning - takes ownership of the sink
71 : any_buffer_sink abs(some_buffer_sink{args...});
72 :
73 : // Reference - wraps without ownership
74 : some_buffer_sink sink;
75 : any_buffer_sink abs(&sink);
76 :
77 : mutable_buffer arr[16];
78 : std::size_t count = abs.prepare(arr, 16);
79 : // Write data into arr[0..count)
80 : auto [ec] = co_await abs.commit(bytes_written);
81 : auto [ec2] = co_await abs.commit_eof();
82 : @endcode
83 :
84 : @see any_buffer_source, BufferSink, WriteSink
85 : */
86 : class any_buffer_sink
87 : {
88 : struct vtable;
89 : struct awaitable_ops;
90 :
91 : template<BufferSink S>
92 : struct vtable_for_impl;
93 :
94 : void* sink_ = nullptr;
95 : vtable const* vt_ = nullptr;
96 : void* cached_awaitable_ = nullptr;
97 : void* storage_ = nullptr;
98 : awaitable_ops const* active_ops_ = nullptr;
99 :
100 : public:
101 : /** Destructor.
102 :
103 : Destroys the owned sink (if any) and releases the cached
104 : awaitable storage.
105 : */
106 : ~any_buffer_sink();
107 :
108 : /** Default constructor.
109 :
110 : Constructs an empty wrapper. Operations on a default-constructed
111 : wrapper result in undefined behavior.
112 : */
113 : any_buffer_sink() = default;
114 :
115 : /** Non-copyable.
116 :
117 : The awaitable cache is per-instance and cannot be shared.
118 : */
119 : any_buffer_sink(any_buffer_sink const&) = delete;
120 : any_buffer_sink& operator=(any_buffer_sink const&) = delete;
121 :
122 : /** Move constructor.
123 :
124 : Transfers ownership of the wrapped sink (if owned) and
125 : cached awaitable storage from `other`. After the move, `other` is
126 : in a default-constructed state.
127 :
128 : @param other The wrapper to move from.
129 : */
130 1 : any_buffer_sink(any_buffer_sink&& other) noexcept
131 1 : : sink_(std::exchange(other.sink_, nullptr))
132 1 : , vt_(std::exchange(other.vt_, nullptr))
133 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
134 1 : , storage_(std::exchange(other.storage_, nullptr))
135 1 : , active_ops_(std::exchange(other.active_ops_, nullptr))
136 : {
137 1 : }
138 :
139 : /** Move assignment operator.
140 :
141 : Destroys any owned sink and releases existing resources,
142 : then transfers ownership from `other`.
143 :
144 : @param other The wrapper to move from.
145 : @return Reference to this wrapper.
146 : */
147 : any_buffer_sink&
148 : operator=(any_buffer_sink&& other) noexcept;
149 :
150 : /** Construct by taking ownership of a BufferSink.
151 :
152 : Allocates storage and moves the sink into this wrapper.
153 : The wrapper owns the sink and will destroy it.
154 :
155 : @param s The sink to take ownership of.
156 : */
157 : template<BufferSink S>
158 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
159 : any_buffer_sink(S s);
160 :
161 : /** Construct by wrapping a BufferSink without ownership.
162 :
163 : Wraps the given sink by pointer. The sink must remain
164 : valid for the lifetime of this wrapper.
165 :
166 : @param s Pointer to the sink to wrap.
167 : */
168 : template<BufferSink S>
169 : any_buffer_sink(S* s);
170 :
171 : /** Check if the wrapper contains a valid sink.
172 :
173 : @return `true` if wrapping a sink, `false` if default-constructed
174 : or moved-from.
175 : */
176 : bool
177 9 : has_value() const noexcept
178 : {
179 9 : return sink_ != nullptr;
180 : }
181 :
182 : /** Check if the wrapper contains a valid sink.
183 :
184 : @return `true` if wrapping a sink, `false` if default-constructed
185 : or moved-from.
186 : */
187 : explicit
188 2 : operator bool() const noexcept
189 : {
190 2 : return has_value();
191 : }
192 :
193 : /** Prepare writable buffers.
194 :
195 : Fills the provided array with mutable buffer descriptors
196 : pointing to the underlying sink's internal storage. This
197 : operation is synchronous.
198 :
199 : @param arr Pointer to array of mutable_buffer to fill.
200 : @param max_count Maximum number of buffers to fill.
201 :
202 : @return The number of buffers filled.
203 :
204 : @par Preconditions
205 : The wrapper must contain a valid sink (`has_value() == true`).
206 : */
207 : std::size_t
208 : prepare(mutable_buffer* arr, std::size_t max_count);
209 :
210 : /** Commit bytes written to the prepared buffers.
211 :
212 : Commits `n` bytes written to the buffers returned by the
213 : most recent call to @ref prepare. The operation may trigger
214 : underlying I/O.
215 :
216 : @param n The number of bytes to commit.
217 :
218 : @return An awaitable yielding `(error_code)`.
219 :
220 : @par Preconditions
221 : The wrapper must contain a valid sink (`has_value() == true`).
222 : */
223 : auto
224 : commit(std::size_t n);
225 :
226 : /** Commit bytes written with optional end-of-stream.
227 :
228 : Commits `n` bytes written to the buffers returned by the
229 : most recent call to @ref prepare. If `eof` is true, also
230 : signals end-of-stream.
231 :
232 : @param n The number of bytes to commit.
233 : @param eof If true, signals end-of-stream after committing.
234 :
235 : @return An awaitable yielding `(error_code)`.
236 :
237 : @par Preconditions
238 : The wrapper must contain a valid sink (`has_value() == true`).
239 : */
240 : auto
241 : commit(std::size_t n, bool eof);
242 :
243 : /** Signal end-of-stream.
244 :
245 : Indicates that no more data will be written to the sink.
246 : The operation completes when the sink is finalized, or
247 : an error occurs.
248 :
249 : @return An awaitable yielding `(error_code)`.
250 :
251 : @par Preconditions
252 : The wrapper must contain a valid sink (`has_value() == true`).
253 : */
254 : auto
255 : commit_eof();
256 :
257 : /** Write data from a buffer sequence.
258 :
259 : Writes all data from the buffer sequence to the underlying
260 : sink. This method satisfies the @ref WriteSink concept.
261 :
262 : @note This operation copies data from the caller's buffers
263 : into the sink's internal buffers. For zero-copy writes,
264 : use @ref prepare and @ref commit directly.
265 :
266 : @param buffers The buffer sequence to write.
267 :
268 : @return An awaitable yielding `(error_code,std::size_t)`.
269 :
270 : @par Preconditions
271 : The wrapper must contain a valid sink (`has_value() == true`).
272 : */
273 : template<ConstBufferSequence CB>
274 : task<io_result<std::size_t>>
275 : write(CB buffers);
276 :
277 : /** Write data with optional end-of-stream.
278 :
279 : Writes all data from the buffer sequence to the underlying
280 : sink, optionally finalizing it afterwards. This method
281 : satisfies the @ref WriteSink concept.
282 :
283 : @note This operation copies data from the caller's buffers
284 : into the sink's internal buffers. For zero-copy writes,
285 : use @ref prepare and @ref commit directly.
286 :
287 : @param buffers The buffer sequence to write.
288 : @param eof If true, finalize the sink after writing.
289 :
290 : @return An awaitable yielding `(error_code,std::size_t)`.
291 :
292 : @par Preconditions
293 : The wrapper must contain a valid sink (`has_value() == true`).
294 : */
295 : template<ConstBufferSequence CB>
296 : task<io_result<std::size_t>>
297 : write(CB buffers, bool eof);
298 :
299 : /** Signal end-of-stream.
300 :
301 : Indicates that no more data will be written to the sink.
302 : This method satisfies the @ref WriteSink concept.
303 :
304 : @return An awaitable yielding `(error_code)`.
305 :
306 : @par Preconditions
307 : The wrapper must contain a valid sink (`has_value() == true`).
308 : */
309 : auto
310 : write_eof();
311 :
312 : protected:
313 : /** Rebind to a new sink after move.
314 :
315 : Updates the internal pointer to reference a new sink object.
316 : Used by owning wrappers after move assignment when the owned
317 : object has moved to a new location.
318 :
319 : @param new_sink The new sink to bind to. Must be the same
320 : type as the original sink.
321 :
322 : @note Terminates if called with a sink of different type
323 : than the original.
324 : */
325 : template<BufferSink S>
326 : void
327 : rebind(S& new_sink) noexcept
328 : {
329 : if(vt_ != &vtable_for_impl<S>::value)
330 : std::terminate();
331 : sink_ = &new_sink;
332 : }
333 : };
334 :
335 : //----------------------------------------------------------
336 :
337 : struct any_buffer_sink::awaitable_ops
338 : {
339 : bool (*await_ready)(void*);
340 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
341 : io_result<> (*await_resume)(void*);
342 : void (*destroy)(void*) noexcept;
343 : };
344 :
345 : struct any_buffer_sink::vtable
346 : {
347 : void (*destroy)(void*) noexcept;
348 : std::size_t (*do_prepare)(
349 : void* sink,
350 : mutable_buffer* arr,
351 : std::size_t max_count);
352 : std::size_t awaitable_size;
353 : std::size_t awaitable_align;
354 : awaitable_ops const* (*construct_commit_awaitable)(
355 : void* sink,
356 : void* storage,
357 : std::size_t n,
358 : bool eof);
359 : awaitable_ops const* (*construct_eof_awaitable)(
360 : void* sink,
361 : void* storage);
362 : };
363 :
364 : template<BufferSink S>
365 : struct any_buffer_sink::vtable_for_impl
366 : {
367 : using CommitAwaitable = decltype(std::declval<S&>().commit(
368 : std::size_t{}, false));
369 : using EofAwaitable = decltype(std::declval<S&>().commit_eof());
370 :
371 : static void
372 0 : do_destroy_impl(void* sink) noexcept
373 : {
374 0 : static_cast<S*>(sink)->~S();
375 0 : }
376 :
377 : static std::size_t
378 68 : do_prepare_impl(
379 : void* sink,
380 : mutable_buffer* arr,
381 : std::size_t max_count)
382 : {
383 68 : auto& s = *static_cast<S*>(sink);
384 68 : return s.prepare(arr, max_count);
385 : }
386 :
387 : static awaitable_ops const*
388 48 : construct_commit_awaitable_impl(
389 : void* sink,
390 : void* storage,
391 : std::size_t n,
392 : bool eof)
393 : {
394 48 : auto& s = *static_cast<S*>(sink);
395 48 : ::new(storage) CommitAwaitable(s.commit(n, eof));
396 :
397 : static constexpr awaitable_ops ops = {
398 48 : +[](void* p) {
399 48 : return static_cast<CommitAwaitable*>(p)->await_ready();
400 : },
401 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
402 0 : return detail::call_await_suspend(
403 0 : static_cast<CommitAwaitable*>(p), h, ex, token);
404 : },
405 48 : +[](void* p) {
406 48 : return static_cast<CommitAwaitable*>(p)->await_resume();
407 : },
408 48 : +[](void* p) noexcept {
409 48 : static_cast<CommitAwaitable*>(p)->~CommitAwaitable();
410 : }
411 : };
412 48 : return &ops;
413 : }
414 :
415 : static awaitable_ops const*
416 18 : construct_eof_awaitable_impl(
417 : void* sink,
418 : void* storage)
419 : {
420 18 : auto& s = *static_cast<S*>(sink);
421 18 : ::new(storage) EofAwaitable(s.commit_eof());
422 :
423 : static constexpr awaitable_ops ops = {
424 18 : +[](void* p) {
425 18 : return static_cast<EofAwaitable*>(p)->await_ready();
426 : },
427 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
428 0 : return detail::call_await_suspend(
429 0 : static_cast<EofAwaitable*>(p), h, ex, token);
430 : },
431 18 : +[](void* p) {
432 18 : return static_cast<EofAwaitable*>(p)->await_resume();
433 : },
434 18 : +[](void* p) noexcept {
435 18 : static_cast<EofAwaitable*>(p)->~EofAwaitable();
436 : }
437 : };
438 18 : return &ops;
439 : }
440 :
441 : static constexpr std::size_t max_awaitable_size =
442 : sizeof(CommitAwaitable) > sizeof(EofAwaitable)
443 : ? sizeof(CommitAwaitable)
444 : : sizeof(EofAwaitable);
445 :
446 : static constexpr std::size_t max_awaitable_align =
447 : alignof(CommitAwaitable) > alignof(EofAwaitable)
448 : ? alignof(CommitAwaitable)
449 : : alignof(EofAwaitable);
450 :
451 : static constexpr vtable value = {
452 : &do_destroy_impl,
453 : &do_prepare_impl,
454 : max_awaitable_size,
455 : max_awaitable_align,
456 : &construct_commit_awaitable_impl,
457 : &construct_eof_awaitable_impl
458 : };
459 : };
460 :
461 : //----------------------------------------------------------
462 :
463 : inline
464 63 : any_buffer_sink::~any_buffer_sink()
465 : {
466 63 : if(storage_)
467 : {
468 0 : vt_->destroy(sink_);
469 0 : ::operator delete(storage_);
470 : }
471 63 : if(cached_awaitable_)
472 60 : ::operator delete(cached_awaitable_);
473 63 : }
474 :
475 : inline any_buffer_sink&
476 1 : any_buffer_sink::operator=(any_buffer_sink&& other) noexcept
477 : {
478 1 : if(this != &other)
479 : {
480 1 : if(storage_)
481 : {
482 0 : vt_->destroy(sink_);
483 0 : ::operator delete(storage_);
484 : }
485 1 : if(cached_awaitable_)
486 0 : ::operator delete(cached_awaitable_);
487 1 : sink_ = std::exchange(other.sink_, nullptr);
488 1 : vt_ = std::exchange(other.vt_, nullptr);
489 1 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
490 1 : storage_ = std::exchange(other.storage_, nullptr);
491 1 : active_ops_ = std::exchange(other.active_ops_, nullptr);
492 : }
493 1 : return *this;
494 : }
495 :
496 : template<BufferSink S>
497 : requires (!std::same_as<std::decay_t<S>, any_buffer_sink>)
498 : any_buffer_sink::any_buffer_sink(S s)
499 : : vt_(&vtable_for_impl<S>::value)
500 : {
501 : struct guard {
502 : any_buffer_sink* self;
503 : bool committed = false;
504 : ~guard() {
505 : if(!committed && self->storage_) {
506 : self->vt_->destroy(self->sink_);
507 : ::operator delete(self->storage_);
508 : self->storage_ = nullptr;
509 : self->sink_ = nullptr;
510 : }
511 : }
512 : } g{this};
513 :
514 : storage_ = ::operator new(sizeof(S));
515 : sink_ = ::new(storage_) S(std::move(s));
516 :
517 : // Preallocate the awaitable storage (sized for max of commit/eof)
518 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
519 :
520 : g.committed = true;
521 : }
522 :
523 : template<BufferSink S>
524 60 : any_buffer_sink::any_buffer_sink(S* s)
525 60 : : sink_(s)
526 60 : , vt_(&vtable_for_impl<S>::value)
527 : {
528 : // Preallocate the awaitable storage (sized for max of commit/eof)
529 60 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
530 60 : }
531 :
532 : //----------------------------------------------------------
533 :
534 : inline std::size_t
535 68 : any_buffer_sink::prepare(
536 : mutable_buffer* arr,
537 : std::size_t max_count)
538 : {
539 68 : return vt_->do_prepare(sink_, arr, max_count);
540 : }
541 :
542 : inline auto
543 48 : any_buffer_sink::commit(std::size_t n, bool eof)
544 : {
545 : struct awaitable
546 : {
547 : any_buffer_sink* self_;
548 : std::size_t n_;
549 : bool eof_;
550 :
551 : bool
552 48 : await_ready() const noexcept
553 : {
554 48 : return false;
555 : }
556 :
557 : coro
558 48 : await_suspend(coro h, executor_ref ex, std::stop_token token)
559 : {
560 : // Construct the underlying awaitable into cached storage
561 96 : self_->active_ops_ = self_->vt_->construct_commit_awaitable(
562 48 : self_->sink_,
563 48 : self_->cached_awaitable_,
564 : n_,
565 48 : eof_);
566 :
567 : // Check if underlying is immediately ready
568 48 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
569 48 : return h;
570 :
571 : // Forward to underlying awaitable
572 0 : return self_->active_ops_->await_suspend(
573 0 : self_->cached_awaitable_, h, ex, token);
574 : }
575 :
576 : io_result<>
577 48 : await_resume()
578 : {
579 : struct guard {
580 : any_buffer_sink* self;
581 48 : ~guard() {
582 48 : self->active_ops_->destroy(self->cached_awaitable_);
583 48 : self->active_ops_ = nullptr;
584 48 : }
585 48 : } g{self_};
586 48 : return self_->active_ops_->await_resume(
587 85 : self_->cached_awaitable_);
588 48 : }
589 : };
590 48 : return awaitable{this, n, eof};
591 : }
592 :
593 : inline auto
594 38 : any_buffer_sink::commit(std::size_t n)
595 : {
596 38 : return commit(n, false);
597 : }
598 :
599 : inline auto
600 18 : any_buffer_sink::commit_eof()
601 : {
602 : struct awaitable
603 : {
604 : any_buffer_sink* self_;
605 :
606 : bool
607 18 : await_ready() const noexcept
608 : {
609 18 : return false;
610 : }
611 :
612 : coro
613 18 : await_suspend(coro h, executor_ref ex, std::stop_token token)
614 : {
615 : // Construct the underlying awaitable into cached storage
616 36 : self_->active_ops_ = self_->vt_->construct_eof_awaitable(
617 18 : self_->sink_,
618 18 : self_->cached_awaitable_);
619 :
620 : // Check if underlying is immediately ready
621 18 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
622 18 : return h;
623 :
624 : // Forward to underlying awaitable
625 0 : return self_->active_ops_->await_suspend(
626 0 : self_->cached_awaitable_, h, ex, token);
627 : }
628 :
629 : io_result<>
630 18 : await_resume()
631 : {
632 : struct guard {
633 : any_buffer_sink* self;
634 18 : ~guard() {
635 18 : self->active_ops_->destroy(self->cached_awaitable_);
636 18 : self->active_ops_ = nullptr;
637 18 : }
638 18 : } g{self_};
639 18 : return self_->active_ops_->await_resume(
640 31 : self_->cached_awaitable_);
641 18 : }
642 : };
643 18 : return awaitable{this};
644 : }
645 :
646 : //----------------------------------------------------------
647 :
648 : template<ConstBufferSequence CB>
649 : task<io_result<std::size_t>>
650 : any_buffer_sink::write(CB buffers)
651 : {
652 : return write(buffers, false);
653 : }
654 :
655 : template<ConstBufferSequence CB>
656 : task<io_result<std::size_t>>
657 : any_buffer_sink::write(CB buffers, bool eof)
658 : {
659 : buffer_param<CB> bp(buffers);
660 : std::size_t total = 0;
661 :
662 : for(;;)
663 : {
664 : auto src = bp.data();
665 : if(src.empty())
666 : break;
667 :
668 : mutable_buffer arr[detail::max_iovec_];
669 : std::size_t count = prepare(arr, detail::max_iovec_);
670 : if(count == 0)
671 : {
672 : auto [ec] = co_await commit(0);
673 : if(ec)
674 : co_return {ec, total};
675 : continue;
676 : }
677 :
678 : auto n = buffer_copy(std::span(arr, count), src);
679 : auto [ec] = co_await commit(n);
680 : if(ec)
681 : co_return {ec, total};
682 : bp.consume(n);
683 : total += n;
684 : }
685 :
686 : if(eof)
687 : {
688 : auto [ec] = co_await commit_eof();
689 : if(ec)
690 : co_return {ec, total};
691 : }
692 :
693 : co_return {{}, total};
694 : }
695 :
696 : inline auto
697 : any_buffer_sink::write_eof()
698 : {
699 : return commit_eof();
700 : }
701 :
702 : //----------------------------------------------------------
703 :
704 : static_assert(WriteSink<any_buffer_sink>);
705 :
706 : } // namespace capy
707 : } // namespace boost
708 :
709 : #endif
|