libs/capy/include/boost/capy/io/any_write_sink.hpp

84.2% Lines (96/114) 90.3% Functions (28/31) 57.9% Branches (11/19)
libs/capy/include/boost/capy/io/any_write_sink.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_sink.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22 #include <boost/capy/task.hpp>
23
24 #include <system_error>
25
26 #include <concepts>
27 #include <coroutine>
28 #include <cstddef>
29 #include <exception>
30 #include <new>
31 #include <span>
32 #include <stop_token>
33 #include <utility>
34
35 namespace boost {
36 namespace capy {
37
38 /** Type-erased wrapper for any WriteSink.
39
40 This class provides type erasure for any type satisfying the
41 @ref WriteSink concept, enabling runtime polymorphism for
42 sink write operations. It uses cached awaitable storage to achieve
43 zero steady-state allocation after construction.
44
45 The wrapper supports two construction modes:
46 - **Owning**: Pass by value to transfer ownership. The wrapper
47 allocates storage and owns the sink.
48 - **Reference**: Pass a pointer to wrap without ownership. The
49 pointed-to sink must outlive this wrapper.
50
51 @par Awaitable Preallocation
52 The constructor preallocates storage for the type-erased awaitable.
53 This reserves all virtual address space at server startup
54 so memory usage can be measured up front, rather than
55 allocating piecemeal as traffic arrives.
56
57 @par Thread Safety
58 Not thread-safe. Concurrent operations on the same wrapper
59 are undefined behavior.
60
61 @par Example
62 @code
63 // Owning - takes ownership of the sink
64 any_write_sink ws(some_sink{args...});
65
66 // Reference - wraps without ownership
67 some_sink sink;
68 any_write_sink ws(&sink);
69
70 const_buffer buf(data, size);
71 auto [ec, n] = co_await ws.write(std::span(&buf, 1));
72 auto [ec2] = co_await ws.write_eof();
73 @endcode
74
75 @see any_write_stream, WriteSink
76 */
77 class any_write_sink
78 {
79 struct vtable;
80 struct write_awaitable_ops;
81 struct eof_awaitable_ops;
82
83 template<WriteSink S>
84 struct vtable_for_impl;
85
86 void* sink_ = nullptr;
87 vtable const* vt_ = nullptr;
88 void* cached_awaitable_ = nullptr;
89 void* storage_ = nullptr;
90 write_awaitable_ops const* active_write_ops_ = nullptr;
91 eof_awaitable_ops const* active_eof_ops_ = nullptr;
92
93 public:
94 /** Destructor.
95
96 Destroys the owned sink (if any) and releases the cached
97 awaitable storage.
98 */
99 ~any_write_sink();
100
101 /** Default constructor.
102
103 Constructs an empty wrapper. Operations on a default-constructed
104 wrapper result in undefined behavior.
105 */
106 any_write_sink() = default;
107
108 /** Non-copyable.
109
110 The awaitable cache is per-instance and cannot be shared.
111 */
112 any_write_sink(any_write_sink const&) = delete;
113 any_write_sink& operator=(any_write_sink const&) = delete;
114
115 /** Move constructor.
116
117 Transfers ownership of the wrapped sink (if owned) and
118 cached awaitable storage from `other`. After the move, `other` is
119 in a default-constructed state.
120
121 @param other The wrapper to move from.
122 */
123 1 any_write_sink(any_write_sink&& other) noexcept
124 1 : sink_(std::exchange(other.sink_, nullptr))
125 1 , vt_(std::exchange(other.vt_, nullptr))
126 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 1 , storage_(std::exchange(other.storage_, nullptr))
128 1 , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
129 1 , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
130 {
131 1 }
132
133 /** Move assignment operator.
134
135 Destroys any owned sink and releases existing resources,
136 then transfers ownership from `other`.
137
138 @param other The wrapper to move from.
139 @return Reference to this wrapper.
140 */
141 any_write_sink&
142 operator=(any_write_sink&& other) noexcept;
143
144 /** Construct by taking ownership of a WriteSink.
145
146 Allocates storage and moves the sink into this wrapper.
147 The wrapper owns the sink and will destroy it.
148
149 @param s The sink to take ownership of.
150 */
151 template<WriteSink S>
152 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
153 any_write_sink(S s);
154
155 /** Construct by wrapping a WriteSink without ownership.
156
157 Wraps the given sink by pointer. The sink must remain
158 valid for the lifetime of this wrapper.
159
160 @param s Pointer to the sink to wrap.
161 */
162 template<WriteSink S>
163 any_write_sink(S* s);
164
165 /** Check if the wrapper contains a valid sink.
166
167 @return `true` if wrapping a sink, `false` if default-constructed
168 or moved-from.
169 */
170 bool
171 9 has_value() const noexcept
172 {
173 9 return sink_ != nullptr;
174 }
175
176 /** Check if the wrapper contains a valid sink.
177
178 @return `true` if wrapping a sink, `false` if default-constructed
179 or moved-from.
180 */
181 explicit
182 2 operator bool() const noexcept
183 {
184 2 return has_value();
185 }
186
187 /** Initiate an asynchronous write operation.
188
189 Writes data from the provided buffer sequence. The operation
190 completes when all bytes have been consumed, or an error
191 occurs.
192
193 @param buffers The buffer sequence containing data to write.
194 Passed by value to ensure the sequence lives in the
195 coroutine frame across suspension points.
196
197 @return An awaitable yielding `(error_code,std::size_t)`.
198
199 @par Preconditions
200 The wrapper must contain a valid sink (`has_value() == true`).
201 */
202 template<ConstBufferSequence CB>
203 task<io_result<std::size_t>>
204 write(CB buffers);
205
206 /** Initiate an asynchronous write operation with optional EOF.
207
208 Writes data from the provided buffer sequence, optionally
209 finalizing the sink afterwards. The operation completes when
210 all bytes have been consumed and (if eof is true) the sink
211 is finalized, or an error occurs.
212
213 @param buffers The buffer sequence containing data to write.
214 Passed by value to ensure the sequence lives in the
215 coroutine frame across suspension points.
216
217 @param eof If `true`, the sink is finalized after writing
218 the data.
219
220 @return An awaitable yielding `(error_code,std::size_t)`.
221
222 @par Preconditions
223 The wrapper must contain a valid sink (`has_value() == true`).
224 */
225 template<ConstBufferSequence CB>
226 task<io_result<std::size_t>>
227 write(CB buffers, bool eof);
228
229 /** Signal end of data.
230
231 Indicates that no more data will be written to the sink.
232 The operation completes when the sink is finalized, or
233 an error occurs.
234
235 @return An awaitable yielding `(error_code)`.
236
237 @par Preconditions
238 The wrapper must contain a valid sink (`has_value() == true`).
239 */
240 auto
241 write_eof();
242
243 protected:
244 /** Rebind to a new sink after move.
245
246 Updates the internal pointer to reference a new sink object.
247 Used by owning wrappers after move assignment when the owned
248 object has moved to a new location.
249
250 @param new_sink The new sink to bind to. Must be the same
251 type as the original sink.
252
253 @note Terminates if called with a sink of different type
254 than the original.
255 */
256 template<WriteSink S>
257 void
258 rebind(S& new_sink) noexcept
259 {
260 if(vt_ != &vtable_for_impl<S>::value)
261 std::terminate();
262 sink_ = &new_sink;
263 }
264
265 private:
266 auto
267 write_some_(std::span<const_buffer const> buffers, bool eof);
268 };
269
270 //----------------------------------------------------------
271
272 struct any_write_sink::write_awaitable_ops
273 {
274 bool (*await_ready)(void*);
275 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
276 io_result<std::size_t> (*await_resume)(void*);
277 void (*destroy)(void*) noexcept;
278 };
279
280 struct any_write_sink::eof_awaitable_ops
281 {
282 bool (*await_ready)(void*);
283 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
284 io_result<> (*await_resume)(void*);
285 void (*destroy)(void*) noexcept;
286 };
287
288 struct any_write_sink::vtable
289 {
290 void (*destroy)(void*) noexcept;
291 std::size_t awaitable_size;
292 std::size_t awaitable_align;
293 write_awaitable_ops const* (*construct_write_awaitable)(
294 void* sink,
295 void* storage,
296 std::span<const_buffer const> buffers,
297 bool eof);
298 eof_awaitable_ops const* (*construct_eof_awaitable)(
299 void* sink,
300 void* storage);
301 };
302
303 template<WriteSink S>
304 struct any_write_sink::vtable_for_impl
305 {
306 using WriteAwaitable = decltype(std::declval<S&>().write(
307 std::span<const_buffer const>{}, false));
308 using EofAwaitable = decltype(std::declval<S&>().write_eof());
309
310 static void
311 do_destroy_impl(void* sink) noexcept
312 {
313 static_cast<S*>(sink)->~S();
314 }
315
316 static write_awaitable_ops const*
317 126 construct_write_awaitable_impl(
318 void* sink,
319 void* storage,
320 std::span<const_buffer const> buffers,
321 bool eof)
322 {
323 126 auto& s = *static_cast<S*>(sink);
324 126 ::new(storage) WriteAwaitable(s.write(buffers, eof));
325
326 static constexpr write_awaitable_ops ops = {
327 126 +[](void* p) {
328 126 return static_cast<WriteAwaitable*>(p)->await_ready();
329 },
330 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
331 return detail::call_await_suspend(
332 static_cast<WriteAwaitable*>(p), h, ex, token);
333 },
334 126 +[](void* p) {
335 126 return static_cast<WriteAwaitable*>(p)->await_resume();
336 },
337 126 +[](void* p) noexcept {
338 126 static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
339 }
340 };
341 126 return &ops;
342 }
343
344 static eof_awaitable_ops const*
345 24 construct_eof_awaitable_impl(
346 void* sink,
347 void* storage)
348 {
349 24 auto& s = *static_cast<S*>(sink);
350 24 ::new(storage) EofAwaitable(s.write_eof());
351
352 static constexpr eof_awaitable_ops ops = {
353 24 +[](void* p) {
354 24 return static_cast<EofAwaitable*>(p)->await_ready();
355 },
356 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
357 return detail::call_await_suspend(
358 static_cast<EofAwaitable*>(p), h, ex, token);
359 },
360 24 +[](void* p) {
361 24 return static_cast<EofAwaitable*>(p)->await_resume();
362 },
363 24 +[](void* p) noexcept {
364 24 static_cast<EofAwaitable*>(p)->~EofAwaitable();
365 }
366 };
367 24 return &ops;
368 }
369
370 static constexpr std::size_t max_awaitable_size =
371 sizeof(WriteAwaitable) > sizeof(EofAwaitable)
372 ? sizeof(WriteAwaitable)
373 : sizeof(EofAwaitable);
374
375 static constexpr std::size_t max_awaitable_align =
376 alignof(WriteAwaitable) > alignof(EofAwaitable)
377 ? alignof(WriteAwaitable)
378 : alignof(EofAwaitable);
379
380 static constexpr vtable value = {
381 &do_destroy_impl,
382 max_awaitable_size,
383 max_awaitable_align,
384 &construct_write_awaitable_impl,
385 &construct_eof_awaitable_impl
386 };
387 };
388
389 //----------------------------------------------------------
390
391 inline
392 99 any_write_sink::~any_write_sink()
393 {
394
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 99 times.
99 if(storage_)
395 {
396 vt_->destroy(sink_);
397 ::operator delete(storage_);
398 }
399
2/2
✓ Branch 0 taken 96 times.
✓ Branch 1 taken 3 times.
99 if(cached_awaitable_)
400 96 ::operator delete(cached_awaitable_);
401 99 }
402
403 inline any_write_sink&
404 1 any_write_sink::operator=(any_write_sink&& other) noexcept
405 {
406
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
407 {
408
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
409 {
410 vt_->destroy(sink_);
411 ::operator delete(storage_);
412 }
413
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_awaitable_)
414 ::operator delete(cached_awaitable_);
415 1 sink_ = std::exchange(other.sink_, nullptr);
416 1 vt_ = std::exchange(other.vt_, nullptr);
417 1 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
418 1 storage_ = std::exchange(other.storage_, nullptr);
419 1 active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
420 1 active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
421 }
422 1 return *this;
423 }
424
425 template<WriteSink S>
426 requires (!std::same_as<std::decay_t<S>, any_write_sink>)
427 any_write_sink::any_write_sink(S s)
428 : vt_(&vtable_for_impl<S>::value)
429 {
430 struct guard {
431 any_write_sink* self;
432 bool committed = false;
433 ~guard() {
434 if(!committed && self->storage_) {
435 self->vt_->destroy(self->sink_);
436 ::operator delete(self->storage_);
437 self->storage_ = nullptr;
438 self->sink_ = nullptr;
439 }
440 }
441 } g{this};
442
443 storage_ = ::operator new(sizeof(S));
444 sink_ = ::new(storage_) S(std::move(s));
445
446 // Preallocate the awaitable storage (sized for max of write/eof)
447 cached_awaitable_ = ::operator new(vt_->awaitable_size);
448
449 g.committed = true;
450 }
451
452 template<WriteSink S>
453 96 any_write_sink::any_write_sink(S* s)
454 96 : sink_(s)
455 96 , vt_(&vtable_for_impl<S>::value)
456 {
457 // Preallocate the awaitable storage (sized for max of write/eof)
458 96 cached_awaitable_ = ::operator new(vt_->awaitable_size);
459 96 }
460
461 //----------------------------------------------------------
462
463 inline auto
464 126 any_write_sink::write_some_(
465 std::span<const_buffer const> buffers,
466 bool eof)
467 {
468 struct awaitable
469 {
470 any_write_sink* self_;
471 std::span<const_buffer const> buffers_;
472 bool eof_;
473
474 bool
475 126 await_ready() const noexcept
476 {
477 126 return false;
478 }
479
480 coro
481 126 await_suspend(coro h, executor_ref ex, std::stop_token token)
482 {
483 // Construct the underlying awaitable into cached storage
484 252 self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
485 126 self_->sink_,
486 126 self_->cached_awaitable_,
487 buffers_,
488 126 eof_);
489
490 // Check if underlying is immediately ready
491
1/2
✓ Branch 1 taken 126 times.
✗ Branch 2 not taken.
126 if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
492 126 return h;
493
494 // Forward to underlying awaitable
495 return self_->active_write_ops_->await_suspend(
496 self_->cached_awaitable_, h, ex, token);
497 }
498
499 io_result<std::size_t>
500 126 await_resume()
501 {
502 struct guard {
503 any_write_sink* self;
504 126 ~guard() {
505 126 self->active_write_ops_->destroy(self->cached_awaitable_);
506 126 self->active_write_ops_ = nullptr;
507 126 }
508 126 } g{self_};
509 126 return self_->active_write_ops_->await_resume(
510
1/1
✓ Branch 1 taken 98 times.
224 self_->cached_awaitable_);
511 126 }
512 };
513 126 return awaitable{this, buffers, eof};
514 }
515
516 inline auto
517 24 any_write_sink::write_eof()
518 {
519 struct awaitable
520 {
521 any_write_sink* self_;
522
523 bool
524 24 await_ready() const noexcept
525 {
526 24 return false;
527 }
528
529 coro
530 24 await_suspend(coro h, executor_ref ex, std::stop_token token)
531 {
532 // Construct the underlying awaitable into cached storage
533 48 self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
534 24 self_->sink_,
535 24 self_->cached_awaitable_);
536
537 // Check if underlying is immediately ready
538
1/2
✓ Branch 1 taken 24 times.
✗ Branch 2 not taken.
24 if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
539 24 return h;
540
541 // Forward to underlying awaitable
542 return self_->active_eof_ops_->await_suspend(
543 self_->cached_awaitable_, h, ex, token);
544 }
545
546 io_result<>
547 24 await_resume()
548 {
549 struct guard {
550 any_write_sink* self;
551 24 ~guard() {
552 24 self->active_eof_ops_->destroy(self->cached_awaitable_);
553 24 self->active_eof_ops_ = nullptr;
554 24 }
555 24 } g{self_};
556 24 return self_->active_eof_ops_->await_resume(
557
1/1
✓ Branch 1 taken 17 times.
41 self_->cached_awaitable_);
558 24 }
559 };
560 24 return awaitable{this};
561 }
562
563 template<ConstBufferSequence CB>
564 task<io_result<std::size_t>>
565 66 any_write_sink::write(CB buffers)
566 {
567 66 return write(buffers, false);
568 }
569
570 template<ConstBufferSequence CB>
571 task<io_result<std::size_t>>
572
1/1
✓ Branch 1 taken 98 times.
98 any_write_sink::write(CB buffers, bool eof)
573 {
574 buffer_param<CB> bp(buffers);
575 std::size_t total = 0;
576
577 for(;;)
578 {
579 auto bufs = bp.data();
580 if(bufs.empty())
581 break;
582
583 auto [ec, n] = co_await write_some_(bufs, false);
584 if(ec)
585 co_return {ec, total + n};
586 bp.consume(n);
587 total += n;
588 }
589
590 if(eof)
591 {
592 auto [ec] = co_await write_eof();
593 if(ec)
594 co_return {ec, total};
595 }
596
597 co_return {{}, total};
598 196 }
599
600 } // namespace capy
601 } // namespace boost
602
603 #endif
604