libs/capy/include/boost/capy/io/any_buffer_source.hpp

83.5% Lines (66/79) 89.5% Functions (17/19) 57.1% Branches (8/14)
libs/capy/include/boost/capy/io/any_buffer_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_copy.hpp>
17 #include <boost/capy/buffers/slice.hpp>
18 #include <boost/capy/concept/buffer_source.hpp>
19 #include <boost/capy/concept/io_awaitable.hpp>
20 #include <boost/capy/concept/read_source.hpp>
21 #include <boost/capy/coro.hpp>
22 #include <boost/capy/error.hpp>
23 #include <boost/capy/ex/executor_ref.hpp>
24 #include <boost/capy/io_result.hpp>
25 #include <boost/capy/task.hpp>
26
27 #include <system_error>
28
29 #include <concepts>
30 #include <coroutine>
31 #include <cstddef>
32 #include <exception>
33 #include <new>
34 #include <span>
35 #include <stop_token>
36 #include <utility>
37
38 namespace boost {
39 namespace capy {
40
41 /** Type-erased wrapper for any BufferSource.
42
43 This class provides type erasure for any type satisfying the
44 @ref BufferSource concept, enabling runtime polymorphism for
45 buffer pull operations. The wrapper also satisfies @ref ReadSource,
46 allowing it to be used with code expecting either interface.
47 It uses cached awaitable storage to achieve zero steady-state
48 allocation after construction.
49
50 The wrapper also satisfies @ref ReadSource through the templated
51 @ref read method. This method copies data from the source's
52 internal buffers into the caller's buffers, incurring one extra
53 buffer copy compared to using @ref pull and @ref consume directly.
54
55 The wrapper supports two construction modes:
56 - **Owning**: Pass by value to transfer ownership. The wrapper
57 allocates storage and owns the source.
58 - **Reference**: Pass a pointer to wrap without ownership. The
59 pointed-to source must outlive this wrapper.
60
61 @par Awaitable Preallocation
62 The constructor preallocates storage for the type-erased awaitable.
63 This reserves all virtual address space at server startup
64 so memory usage can be measured up front, rather than
65 allocating piecemeal as traffic arrives.
66
67 @par Thread Safety
68 Not thread-safe. Concurrent operations on the same wrapper
69 are undefined behavior.
70
71 @par Example
72 @code
73 // Owning - takes ownership of the source
74 any_buffer_source abs(some_buffer_source{args...});
75
76 // Reference - wraps without ownership
77 some_buffer_source src;
78 any_buffer_source abs(&src);
79
80 const_buffer arr[16];
81 auto [ec, count] = co_await abs.pull(arr, 16);
82 @endcode
83
84 @see any_write_sink, BufferSource, ReadSource
85 */
86 class any_buffer_source
87 {
88 struct vtable;
89 struct awaitable_ops;
90
91 template<BufferSource S>
92 struct vtable_for_impl;
93
94 void* source_ = nullptr;
95 vtable const* vt_ = nullptr;
96 void* cached_awaitable_ = nullptr;
97 void* storage_ = nullptr;
98 awaitable_ops const* active_ops_ = nullptr;
99
100 public:
101 /** Destructor.
102
103 Destroys the owned source (if any) and releases the cached
104 awaitable storage.
105 */
106 ~any_buffer_source();
107
108 /** Default constructor.
109
110 Constructs an empty wrapper. Operations on a default-constructed
111 wrapper result in undefined behavior.
112 */
113 any_buffer_source() = default;
114
115 /** Non-copyable.
116
117 The awaitable cache is per-instance and cannot be shared.
118 */
119 any_buffer_source(any_buffer_source const&) = delete;
120 any_buffer_source& operator=(any_buffer_source const&) = delete;
121
122 /** Move constructor.
123
124 Transfers ownership of the wrapped source (if owned) and
125 cached awaitable storage from `other`. After the move, `other` is
126 in a default-constructed state.
127
128 @param other The wrapper to move from.
129 */
130 1 any_buffer_source(any_buffer_source&& other) noexcept
131 1 : source_(std::exchange(other.source_, nullptr))
132 1 , vt_(std::exchange(other.vt_, nullptr))
133 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
134 1 , storage_(std::exchange(other.storage_, nullptr))
135 1 , active_ops_(std::exchange(other.active_ops_, nullptr))
136 {
137 1 }
138
139 /** Move assignment operator.
140
141 Destroys any owned source and releases existing resources,
142 then transfers ownership from `other`.
143
144 @param other The wrapper to move from.
145 @return Reference to this wrapper.
146 */
147 any_buffer_source&
148 operator=(any_buffer_source&& other) noexcept;
149
150 /** Construct by taking ownership of a BufferSource.
151
152 Allocates storage and moves the source into this wrapper.
153 The wrapper owns the source and will destroy it.
154
155 @param s The source to take ownership of.
156 */
157 template<BufferSource S>
158 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
159 any_buffer_source(S s);
160
161 /** Construct by wrapping a BufferSource without ownership.
162
163 Wraps the given source by pointer. The source must remain
164 valid for the lifetime of this wrapper.
165
166 @param s Pointer to the source to wrap.
167 */
168 template<BufferSource S>
169 any_buffer_source(S* s);
170
171 /** Check if the wrapper contains a valid source.
172
173 @return `true` if wrapping a source, `false` if default-constructed
174 or moved-from.
175 */
176 bool
177 9 has_value() const noexcept
178 {
179 9 return source_ != nullptr;
180 }
181
182 /** Check if the wrapper contains a valid source.
183
184 @return `true` if wrapping a source, `false` if default-constructed
185 or moved-from.
186 */
187 explicit
188 2 operator bool() const noexcept
189 {
190 2 return has_value();
191 }
192
193 /** Consume bytes from the source.
194
195 Advances the internal read position of the underlying source
196 by the specified number of bytes. The next call to @ref pull
197 returns data starting after the consumed bytes.
198
199 @param n The number of bytes to consume. Must not exceed the
200 total size of buffers returned by the previous @ref pull.
201
202 @par Preconditions
203 The wrapper must contain a valid source (`has_value() == true`).
204 */
205 void
206 consume(std::size_t n) noexcept;
207
208 /** Pull buffer data from the source.
209
210 Fills the provided array with buffer descriptors from the
211 underlying source. The operation completes when data is
212 available, the source is exhausted, or an error occurs.
213
214 @param arr Pointer to array of const_buffer to fill.
215 @param max_count Maximum number of buffers to fill.
216
217 @return An awaitable yielding `(error_code,std::size_t)`.
218 On success with data, `count > 0` indicates buffers filled.
219 On success with `count == 0`, source is exhausted.
220
221 @par Preconditions
222 The wrapper must contain a valid source (`has_value() == true`).
223 */
224 auto
225 pull(const_buffer* arr, std::size_t max_count);
226
227 /** Read data into a mutable buffer sequence.
228
229 Fills the provided buffer sequence by pulling data from the
230 underlying source and copying it into the caller's buffers.
231 This satisfies @ref ReadSource but incurs a copy; for zero-copy
232 access, use @ref pull and @ref consume instead.
233
234 @note This operation copies data from the source's internal
235 buffers into the caller's buffers. For zero-copy reads,
236 use @ref pull and @ref consume directly.
237
238 @param buffers The buffer sequence to fill.
239
240 @return An awaitable yielding `(error_code,std::size_t)`.
241 On success, `n == buffer_size(buffers)`.
242 On EOF, `ec == error::eof` and `n` is bytes transferred.
243
244 @par Preconditions
245 The wrapper must contain a valid source (`has_value() == true`).
246
247 @see pull, consume
248 */
249 template<MutableBufferSequence MB>
250 task<io_result<std::size_t>>
251 read(MB buffers);
252
253 protected:
254 /** Rebind to a new source after move.
255
256 Updates the internal pointer to reference a new source object.
257 Used by owning wrappers after move assignment when the owned
258 object has moved to a new location.
259
260 @param new_source The new source to bind to. Must be the same
261 type as the original source.
262
263 @note Terminates if called with a source of different type
264 than the original.
265 */
266 template<BufferSource S>
267 void
268 rebind(S& new_source) noexcept
269 {
270 if(vt_ != &vtable_for_impl<S>::value)
271 std::terminate();
272 source_ = &new_source;
273 }
274 };
275
276 //----------------------------------------------------------
277
278 struct any_buffer_source::awaitable_ops
279 {
280 bool (*await_ready)(void*);
281 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
282 io_result<std::size_t> (*await_resume)(void*);
283 void (*destroy)(void*) noexcept;
284 };
285
286 struct any_buffer_source::vtable
287 {
288 void (*destroy)(void*) noexcept;
289 void (*do_consume)(void* source, std::size_t n) noexcept;
290 std::size_t awaitable_size;
291 std::size_t awaitable_align;
292 awaitable_ops const* (*construct_awaitable)(
293 void* source,
294 void* storage,
295 const_buffer* arr,
296 std::size_t max_count);
297 };
298
299 template<BufferSource S>
300 struct any_buffer_source::vtable_for_impl
301 {
302 using Awaitable = decltype(std::declval<S&>().pull(
303 std::declval<const_buffer*>(),
304 std::declval<std::size_t>()));
305
306 static void
307 do_destroy_impl(void* source) noexcept
308 {
309 static_cast<S*>(source)->~S();
310 }
311
312 static void
313 39 do_consume_impl(void* source, std::size_t n) noexcept
314 {
315 39 static_cast<S*>(source)->consume(n);
316 39 }
317
318 static awaitable_ops const*
319 92 construct_awaitable_impl(
320 void* source,
321 void* storage,
322 const_buffer* arr,
323 std::size_t max_count)
324 {
325 92 auto& s = *static_cast<S*>(source);
326 92 ::new(storage) Awaitable(s.pull(arr, max_count));
327
328 static constexpr awaitable_ops ops = {
329 92 +[](void* p) {
330 92 return static_cast<Awaitable*>(p)->await_ready();
331 },
332 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
333 return detail::call_await_suspend(
334 static_cast<Awaitable*>(p), h, ex, token);
335 },
336 92 +[](void* p) {
337 92 return static_cast<Awaitable*>(p)->await_resume();
338 },
339 92 +[](void* p) noexcept {
340 92 static_cast<Awaitable*>(p)->~Awaitable();
341 }
342 };
343 92 return &ops;
344 }
345
346 static constexpr vtable value = {
347 &do_destroy_impl,
348 &do_consume_impl,
349 sizeof(Awaitable),
350 alignof(Awaitable),
351 &construct_awaitable_impl
352 };
353 };
354
355 //----------------------------------------------------------
356
357 inline
358 59 any_buffer_source::~any_buffer_source()
359 {
360
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 59 times.
59 if(storage_)
361 {
362 vt_->destroy(source_);
363 ::operator delete(storage_);
364 }
365
2/2
✓ Branch 0 taken 56 times.
✓ Branch 1 taken 3 times.
59 if(cached_awaitable_)
366 56 ::operator delete(cached_awaitable_);
367 59 }
368
369 inline any_buffer_source&
370 1 any_buffer_source::operator=(any_buffer_source&& other) noexcept
371 {
372
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
373 {
374
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
375 {
376 vt_->destroy(source_);
377 ::operator delete(storage_);
378 }
379
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_awaitable_)
380 ::operator delete(cached_awaitable_);
381 1 source_ = std::exchange(other.source_, nullptr);
382 1 vt_ = std::exchange(other.vt_, nullptr);
383 1 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
384 1 storage_ = std::exchange(other.storage_, nullptr);
385 1 active_ops_ = std::exchange(other.active_ops_, nullptr);
386 }
387 1 return *this;
388 }
389
390 template<BufferSource S>
391 requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
392 any_buffer_source::any_buffer_source(S s)
393 : vt_(&vtable_for_impl<S>::value)
394 {
395 struct guard {
396 any_buffer_source* self;
397 bool committed = false;
398 ~guard() {
399 if(!committed && self->storage_) {
400 self->vt_->destroy(self->source_);
401 ::operator delete(self->storage_);
402 self->storage_ = nullptr;
403 self->source_ = nullptr;
404 }
405 }
406 } g{this};
407
408 storage_ = ::operator new(sizeof(S));
409 source_ = ::new(storage_) S(std::move(s));
410
411 // Preallocate the awaitable storage
412 cached_awaitable_ = ::operator new(vt_->awaitable_size);
413
414 g.committed = true;
415 }
416
417 template<BufferSource S>
418 56 any_buffer_source::any_buffer_source(S* s)
419 56 : source_(s)
420 56 , vt_(&vtable_for_impl<S>::value)
421 {
422 // Preallocate the awaitable storage
423 56 cached_awaitable_ = ::operator new(vt_->awaitable_size);
424 56 }
425
426 //----------------------------------------------------------
427
428 inline void
429 39 any_buffer_source::consume(std::size_t n) noexcept
430 {
431 39 vt_->do_consume(source_, n);
432 39 }
433
434 inline auto
435 92 any_buffer_source::pull(
436 const_buffer* arr,
437 std::size_t max_count)
438 {
439 struct awaitable
440 {
441 any_buffer_source* self_;
442 const_buffer* arr_;
443 std::size_t max_count_;
444
445 bool
446 92 await_ready() const noexcept
447 {
448 92 return false;
449 }
450
451 coro
452 92 await_suspend(coro h, executor_ref ex, std::stop_token token)
453 {
454 // Construct the underlying awaitable into cached storage
455 184 self_->active_ops_ = self_->vt_->construct_awaitable(
456 92 self_->source_,
457 92 self_->cached_awaitable_,
458 arr_,
459 max_count_);
460
461 // Check if underlying is immediately ready
462
1/2
✓ Branch 1 taken 92 times.
✗ Branch 2 not taken.
92 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
463 92 return h;
464
465 // Forward to underlying awaitable
466 return self_->active_ops_->await_suspend(
467 self_->cached_awaitable_, h, ex, token);
468 }
469
470 io_result<std::size_t>
471 92 await_resume()
472 {
473 struct guard {
474 any_buffer_source* self;
475 92 ~guard() {
476 92 self->active_ops_->destroy(self->cached_awaitable_);
477 92 self->active_ops_ = nullptr;
478 92 }
479 92 } g{self_};
480 92 return self_->active_ops_->await_resume(
481
1/1
✓ Branch 1 taken 73 times.
165 self_->cached_awaitable_);
482 92 }
483 };
484 92 return awaitable{this, arr, max_count};
485 }
486
487 template<MutableBufferSequence MB>
488 task<io_result<std::size_t>>
489 any_buffer_source::read(MB buffers)
490 {
491 std::size_t total = 0;
492 auto dest = sans_prefix(buffers, 0);
493
494 while(!buffer_empty(dest))
495 {
496 const_buffer arr[detail::max_iovec_];
497 auto [ec, count] = co_await pull(arr, detail::max_iovec_);
498
499 if(ec)
500 co_return {ec, total};
501
502 if(count == 0)
503 co_return {error::eof, total};
504
505 auto n = buffer_copy(dest, std::span(arr, count));
506 consume(n);
507 total += n;
508 dest = sans_prefix(dest, n);
509 }
510
511 co_return {{}, total};
512 }
513
514 //----------------------------------------------------------
515
516 static_assert(BufferSource<any_buffer_source>);
517 static_assert(ReadSource<any_buffer_source>);
518
519 } // namespace capy
520 } // namespace boost
521
522 #endif
523