Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_SINK_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_param.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/write_sink.hpp>
19 : #include <boost/capy/coro.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/io_result.hpp>
22 : #include <boost/capy/task.hpp>
23 :
24 : #include <system_error>
25 :
26 : #include <concepts>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <exception>
30 : #include <new>
31 : #include <span>
32 : #include <stop_token>
33 : #include <utility>
34 :
35 : namespace boost {
36 : namespace capy {
37 :
38 : /** Type-erased wrapper for any WriteSink.
39 :
40 : This class provides type erasure for any type satisfying the
41 : @ref WriteSink concept, enabling runtime polymorphism for
42 : sink write operations. It uses cached awaitable storage to achieve
43 : zero steady-state allocation after construction.
44 :
45 : The wrapper supports two construction modes:
46 : - **Owning**: Pass by value to transfer ownership. The wrapper
47 : allocates storage and owns the sink.
48 : - **Reference**: Pass a pointer to wrap without ownership. The
49 : pointed-to sink must outlive this wrapper.
50 :
51 : @par Awaitable Preallocation
52 : The constructor preallocates storage for the type-erased awaitable.
53 : This reserves all virtual address space at server startup
54 : so memory usage can be measured up front, rather than
55 : allocating piecemeal as traffic arrives.
56 :
57 : @par Thread Safety
58 : Not thread-safe. Concurrent operations on the same wrapper
59 : are undefined behavior.
60 :
61 : @par Example
62 : @code
63 : // Owning - takes ownership of the sink
64 : any_write_sink ws(some_sink{args...});
65 :
66 : // Reference - wraps without ownership
67 : some_sink sink;
68 : any_write_sink ws(&sink);
69 :
70 : const_buffer buf(data, size);
71 : auto [ec, n] = co_await ws.write(std::span(&buf, 1));
72 : auto [ec2] = co_await ws.write_eof();
73 : @endcode
74 :
75 : @see any_write_stream, WriteSink
76 : */
77 : class any_write_sink
78 : {
79 : struct vtable;
80 : struct write_awaitable_ops;
81 : struct eof_awaitable_ops;
82 :
83 : template<WriteSink S>
84 : struct vtable_for_impl;
85 :
86 : void* sink_ = nullptr;
87 : vtable const* vt_ = nullptr;
88 : void* cached_awaitable_ = nullptr;
89 : void* storage_ = nullptr;
90 : write_awaitable_ops const* active_write_ops_ = nullptr;
91 : eof_awaitable_ops const* active_eof_ops_ = nullptr;
92 :
93 : public:
94 : /** Destructor.
95 :
96 : Destroys the owned sink (if any) and releases the cached
97 : awaitable storage.
98 : */
99 : ~any_write_sink();
100 :
101 : /** Default constructor.
102 :
103 : Constructs an empty wrapper. Operations on a default-constructed
104 : wrapper result in undefined behavior.
105 : */
106 : any_write_sink() = default;
107 :
108 : /** Non-copyable.
109 :
110 : The awaitable cache is per-instance and cannot be shared.
111 : */
112 : any_write_sink(any_write_sink const&) = delete;
113 : any_write_sink& operator=(any_write_sink const&) = delete;
114 :
115 : /** Move constructor.
116 :
117 : Transfers ownership of the wrapped sink (if owned) and
118 : cached awaitable storage from `other`. After the move, `other` is
119 : in a default-constructed state.
120 :
121 : @param other The wrapper to move from.
122 : */
123 1 : any_write_sink(any_write_sink&& other) noexcept
124 1 : : sink_(std::exchange(other.sink_, nullptr))
125 1 : , vt_(std::exchange(other.vt_, nullptr))
126 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
127 1 : , storage_(std::exchange(other.storage_, nullptr))
128 1 : , active_write_ops_(std::exchange(other.active_write_ops_, nullptr))
129 1 : , active_eof_ops_(std::exchange(other.active_eof_ops_, nullptr))
130 : {
131 1 : }
132 :
133 : /** Move assignment operator.
134 :
135 : Destroys any owned sink and releases existing resources,
136 : then transfers ownership from `other`.
137 :
138 : @param other The wrapper to move from.
139 : @return Reference to this wrapper.
140 : */
141 : any_write_sink&
142 : operator=(any_write_sink&& other) noexcept;
143 :
144 : /** Construct by taking ownership of a WriteSink.
145 :
146 : Allocates storage and moves the sink into this wrapper.
147 : The wrapper owns the sink and will destroy it.
148 :
149 : @param s The sink to take ownership of.
150 : */
151 : template<WriteSink S>
152 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
153 : any_write_sink(S s);
154 :
155 : /** Construct by wrapping a WriteSink without ownership.
156 :
157 : Wraps the given sink by pointer. The sink must remain
158 : valid for the lifetime of this wrapper.
159 :
160 : @param s Pointer to the sink to wrap.
161 : */
162 : template<WriteSink S>
163 : any_write_sink(S* s);
164 :
165 : /** Check if the wrapper contains a valid sink.
166 :
167 : @return `true` if wrapping a sink, `false` if default-constructed
168 : or moved-from.
169 : */
170 : bool
171 9 : has_value() const noexcept
172 : {
173 9 : return sink_ != nullptr;
174 : }
175 :
176 : /** Check if the wrapper contains a valid sink.
177 :
178 : @return `true` if wrapping a sink, `false` if default-constructed
179 : or moved-from.
180 : */
181 : explicit
182 2 : operator bool() const noexcept
183 : {
184 2 : return has_value();
185 : }
186 :
187 : /** Initiate an asynchronous write operation.
188 :
189 : Writes data from the provided buffer sequence. The operation
190 : completes when all bytes have been consumed, or an error
191 : occurs.
192 :
193 : @param buffers The buffer sequence containing data to write.
194 : Passed by value to ensure the sequence lives in the
195 : coroutine frame across suspension points.
196 :
197 : @return An awaitable yielding `(error_code,std::size_t)`.
198 :
199 : @par Preconditions
200 : The wrapper must contain a valid sink (`has_value() == true`).
201 : */
202 : template<ConstBufferSequence CB>
203 : task<io_result<std::size_t>>
204 : write(CB buffers);
205 :
206 : /** Initiate an asynchronous write operation with optional EOF.
207 :
208 : Writes data from the provided buffer sequence, optionally
209 : finalizing the sink afterwards. The operation completes when
210 : all bytes have been consumed and (if eof is true) the sink
211 : is finalized, or an error occurs.
212 :
213 : @param buffers The buffer sequence containing data to write.
214 : Passed by value to ensure the sequence lives in the
215 : coroutine frame across suspension points.
216 :
217 : @param eof If `true`, the sink is finalized after writing
218 : the data.
219 :
220 : @return An awaitable yielding `(error_code,std::size_t)`.
221 :
222 : @par Preconditions
223 : The wrapper must contain a valid sink (`has_value() == true`).
224 : */
225 : template<ConstBufferSequence CB>
226 : task<io_result<std::size_t>>
227 : write(CB buffers, bool eof);
228 :
229 : /** Signal end of data.
230 :
231 : Indicates that no more data will be written to the sink.
232 : The operation completes when the sink is finalized, or
233 : an error occurs.
234 :
235 : @return An awaitable yielding `(error_code)`.
236 :
237 : @par Preconditions
238 : The wrapper must contain a valid sink (`has_value() == true`).
239 : */
240 : auto
241 : write_eof();
242 :
243 : protected:
244 : /** Rebind to a new sink after move.
245 :
246 : Updates the internal pointer to reference a new sink object.
247 : Used by owning wrappers after move assignment when the owned
248 : object has moved to a new location.
249 :
250 : @param new_sink The new sink to bind to. Must be the same
251 : type as the original sink.
252 :
253 : @note Terminates if called with a sink of different type
254 : than the original.
255 : */
256 : template<WriteSink S>
257 : void
258 : rebind(S& new_sink) noexcept
259 : {
260 : if(vt_ != &vtable_for_impl<S>::value)
261 : std::terminate();
262 : sink_ = &new_sink;
263 : }
264 :
265 : private:
266 : auto
267 : write_some_(std::span<const_buffer const> buffers, bool eof);
268 : };
269 :
270 : //----------------------------------------------------------
271 :
272 : struct any_write_sink::write_awaitable_ops
273 : {
274 : bool (*await_ready)(void*);
275 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
276 : io_result<std::size_t> (*await_resume)(void*);
277 : void (*destroy)(void*) noexcept;
278 : };
279 :
280 : struct any_write_sink::eof_awaitable_ops
281 : {
282 : bool (*await_ready)(void*);
283 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
284 : io_result<> (*await_resume)(void*);
285 : void (*destroy)(void*) noexcept;
286 : };
287 :
288 : struct any_write_sink::vtable
289 : {
290 : void (*destroy)(void*) noexcept;
291 : std::size_t awaitable_size;
292 : std::size_t awaitable_align;
293 : write_awaitable_ops const* (*construct_write_awaitable)(
294 : void* sink,
295 : void* storage,
296 : std::span<const_buffer const> buffers,
297 : bool eof);
298 : eof_awaitable_ops const* (*construct_eof_awaitable)(
299 : void* sink,
300 : void* storage);
301 : };
302 :
303 : template<WriteSink S>
304 : struct any_write_sink::vtable_for_impl
305 : {
306 : using WriteAwaitable = decltype(std::declval<S&>().write(
307 : std::span<const_buffer const>{}, false));
308 : using EofAwaitable = decltype(std::declval<S&>().write_eof());
309 :
310 : static void
311 0 : do_destroy_impl(void* sink) noexcept
312 : {
313 0 : static_cast<S*>(sink)->~S();
314 0 : }
315 :
316 : static write_awaitable_ops const*
317 126 : construct_write_awaitable_impl(
318 : void* sink,
319 : void* storage,
320 : std::span<const_buffer const> buffers,
321 : bool eof)
322 : {
323 126 : auto& s = *static_cast<S*>(sink);
324 126 : ::new(storage) WriteAwaitable(s.write(buffers, eof));
325 :
326 : static constexpr write_awaitable_ops ops = {
327 126 : +[](void* p) {
328 126 : return static_cast<WriteAwaitable*>(p)->await_ready();
329 : },
330 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
331 0 : return detail::call_await_suspend(
332 0 : static_cast<WriteAwaitable*>(p), h, ex, token);
333 : },
334 126 : +[](void* p) {
335 126 : return static_cast<WriteAwaitable*>(p)->await_resume();
336 : },
337 126 : +[](void* p) noexcept {
338 126 : static_cast<WriteAwaitable*>(p)->~WriteAwaitable();
339 : }
340 : };
341 126 : return &ops;
342 : }
343 :
344 : static eof_awaitable_ops const*
345 24 : construct_eof_awaitable_impl(
346 : void* sink,
347 : void* storage)
348 : {
349 24 : auto& s = *static_cast<S*>(sink);
350 24 : ::new(storage) EofAwaitable(s.write_eof());
351 :
352 : static constexpr eof_awaitable_ops ops = {
353 24 : +[](void* p) {
354 24 : return static_cast<EofAwaitable*>(p)->await_ready();
355 : },
356 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
357 0 : return detail::call_await_suspend(
358 0 : static_cast<EofAwaitable*>(p), h, ex, token);
359 : },
360 24 : +[](void* p) {
361 24 : return static_cast<EofAwaitable*>(p)->await_resume();
362 : },
363 24 : +[](void* p) noexcept {
364 24 : static_cast<EofAwaitable*>(p)->~EofAwaitable();
365 : }
366 : };
367 24 : return &ops;
368 : }
369 :
370 : static constexpr std::size_t max_awaitable_size =
371 : sizeof(WriteAwaitable) > sizeof(EofAwaitable)
372 : ? sizeof(WriteAwaitable)
373 : : sizeof(EofAwaitable);
374 :
375 : static constexpr std::size_t max_awaitable_align =
376 : alignof(WriteAwaitable) > alignof(EofAwaitable)
377 : ? alignof(WriteAwaitable)
378 : : alignof(EofAwaitable);
379 :
380 : static constexpr vtable value = {
381 : &do_destroy_impl,
382 : max_awaitable_size,
383 : max_awaitable_align,
384 : &construct_write_awaitable_impl,
385 : &construct_eof_awaitable_impl
386 : };
387 : };
388 :
389 : //----------------------------------------------------------
390 :
391 : inline
392 99 : any_write_sink::~any_write_sink()
393 : {
394 99 : if(storage_)
395 : {
396 0 : vt_->destroy(sink_);
397 0 : ::operator delete(storage_);
398 : }
399 99 : if(cached_awaitable_)
400 96 : ::operator delete(cached_awaitable_);
401 99 : }
402 :
403 : inline any_write_sink&
404 1 : any_write_sink::operator=(any_write_sink&& other) noexcept
405 : {
406 1 : if(this != &other)
407 : {
408 1 : if(storage_)
409 : {
410 0 : vt_->destroy(sink_);
411 0 : ::operator delete(storage_);
412 : }
413 1 : if(cached_awaitable_)
414 0 : ::operator delete(cached_awaitable_);
415 1 : sink_ = std::exchange(other.sink_, nullptr);
416 1 : vt_ = std::exchange(other.vt_, nullptr);
417 1 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
418 1 : storage_ = std::exchange(other.storage_, nullptr);
419 1 : active_write_ops_ = std::exchange(other.active_write_ops_, nullptr);
420 1 : active_eof_ops_ = std::exchange(other.active_eof_ops_, nullptr);
421 : }
422 1 : return *this;
423 : }
424 :
425 : template<WriteSink S>
426 : requires (!std::same_as<std::decay_t<S>, any_write_sink>)
427 : any_write_sink::any_write_sink(S s)
428 : : vt_(&vtable_for_impl<S>::value)
429 : {
430 : struct guard {
431 : any_write_sink* self;
432 : bool committed = false;
433 : ~guard() {
434 : if(!committed && self->storage_) {
435 : self->vt_->destroy(self->sink_);
436 : ::operator delete(self->storage_);
437 : self->storage_ = nullptr;
438 : self->sink_ = nullptr;
439 : }
440 : }
441 : } g{this};
442 :
443 : storage_ = ::operator new(sizeof(S));
444 : sink_ = ::new(storage_) S(std::move(s));
445 :
446 : // Preallocate the awaitable storage (sized for max of write/eof)
447 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
448 :
449 : g.committed = true;
450 : }
451 :
452 : template<WriteSink S>
453 96 : any_write_sink::any_write_sink(S* s)
454 96 : : sink_(s)
455 96 : , vt_(&vtable_for_impl<S>::value)
456 : {
457 : // Preallocate the awaitable storage (sized for max of write/eof)
458 96 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
459 96 : }
460 :
461 : //----------------------------------------------------------
462 :
463 : inline auto
464 126 : any_write_sink::write_some_(
465 : std::span<const_buffer const> buffers,
466 : bool eof)
467 : {
468 : struct awaitable
469 : {
470 : any_write_sink* self_;
471 : std::span<const_buffer const> buffers_;
472 : bool eof_;
473 :
474 : bool
475 126 : await_ready() const noexcept
476 : {
477 126 : return false;
478 : }
479 :
480 : coro
481 126 : await_suspend(coro h, executor_ref ex, std::stop_token token)
482 : {
483 : // Construct the underlying awaitable into cached storage
484 252 : self_->active_write_ops_ = self_->vt_->construct_write_awaitable(
485 126 : self_->sink_,
486 126 : self_->cached_awaitable_,
487 : buffers_,
488 126 : eof_);
489 :
490 : // Check if underlying is immediately ready
491 126 : if(self_->active_write_ops_->await_ready(self_->cached_awaitable_))
492 126 : return h;
493 :
494 : // Forward to underlying awaitable
495 0 : return self_->active_write_ops_->await_suspend(
496 0 : self_->cached_awaitable_, h, ex, token);
497 : }
498 :
499 : io_result<std::size_t>
500 126 : await_resume()
501 : {
502 : struct guard {
503 : any_write_sink* self;
504 126 : ~guard() {
505 126 : self->active_write_ops_->destroy(self->cached_awaitable_);
506 126 : self->active_write_ops_ = nullptr;
507 126 : }
508 126 : } g{self_};
509 126 : return self_->active_write_ops_->await_resume(
510 224 : self_->cached_awaitable_);
511 126 : }
512 : };
513 126 : return awaitable{this, buffers, eof};
514 : }
515 :
516 : inline auto
517 24 : any_write_sink::write_eof()
518 : {
519 : struct awaitable
520 : {
521 : any_write_sink* self_;
522 :
523 : bool
524 24 : await_ready() const noexcept
525 : {
526 24 : return false;
527 : }
528 :
529 : coro
530 24 : await_suspend(coro h, executor_ref ex, std::stop_token token)
531 : {
532 : // Construct the underlying awaitable into cached storage
533 48 : self_->active_eof_ops_ = self_->vt_->construct_eof_awaitable(
534 24 : self_->sink_,
535 24 : self_->cached_awaitable_);
536 :
537 : // Check if underlying is immediately ready
538 24 : if(self_->active_eof_ops_->await_ready(self_->cached_awaitable_))
539 24 : return h;
540 :
541 : // Forward to underlying awaitable
542 0 : return self_->active_eof_ops_->await_suspend(
543 0 : self_->cached_awaitable_, h, ex, token);
544 : }
545 :
546 : io_result<>
547 24 : await_resume()
548 : {
549 : struct guard {
550 : any_write_sink* self;
551 24 : ~guard() {
552 24 : self->active_eof_ops_->destroy(self->cached_awaitable_);
553 24 : self->active_eof_ops_ = nullptr;
554 24 : }
555 24 : } g{self_};
556 24 : return self_->active_eof_ops_->await_resume(
557 41 : self_->cached_awaitable_);
558 24 : }
559 : };
560 24 : return awaitable{this};
561 : }
562 :
563 : template<ConstBufferSequence CB>
564 : task<io_result<std::size_t>>
565 66 : any_write_sink::write(CB buffers)
566 : {
567 66 : return write(buffers, false);
568 : }
569 :
570 : template<ConstBufferSequence CB>
571 : task<io_result<std::size_t>>
572 98 : any_write_sink::write(CB buffers, bool eof)
573 : {
574 : buffer_param<CB> bp(buffers);
575 : std::size_t total = 0;
576 :
577 : for(;;)
578 : {
579 : auto bufs = bp.data();
580 : if(bufs.empty())
581 : break;
582 :
583 : auto [ec, n] = co_await write_some_(bufs, false);
584 : if(ec)
585 : co_return {ec, total + n};
586 : bp.consume(n);
587 : total += n;
588 : }
589 :
590 : if(eof)
591 : {
592 : auto [ec] = co_await write_eof();
593 : if(ec)
594 : co_return {ec, total};
595 : }
596 :
597 : co_return {{}, total};
598 196 : }
599 :
600 : } // namespace capy
601 : } // namespace boost
602 :
603 : #endif
|