libs/capy/include/boost/capy/io/any_read_source.hpp

82.7% Lines (62/75) 89.5% Functions (17/19) 60.0% Branches (9/15)
libs/capy/include/boost/capy/io/any_read_source.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_source.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22 #include <boost/capy/task.hpp>
23
24 #include <system_error>
25
26 #include <concepts>
27 #include <coroutine>
28 #include <cstddef>
29 #include <new>
30 #include <span>
31 #include <stop_token>
32 #include <utility>
33
34 namespace boost {
35 namespace capy {
36
37 /** Type-erased wrapper for any ReadSource.
38
39 This class provides type erasure for any type satisfying the
40 @ref ReadSource concept, enabling runtime polymorphism for
41 source read operations. It uses cached awaitable storage to achieve
42 zero steady-state allocation after construction.
43
44 The wrapper supports two construction modes:
45 - **Owning**: Pass by value to transfer ownership. The wrapper
46 allocates storage and owns the source.
47 - **Reference**: Pass a pointer to wrap without ownership. The
48 pointed-to source must outlive this wrapper.
49
50 @par Awaitable Preallocation
51 The constructor preallocates storage for the type-erased awaitable.
52 This reserves all virtual address space at server startup
53 so memory usage can be measured up front, rather than
54 allocating piecemeal as traffic arrives.
55
56 @par Thread Safety
57 Not thread-safe. Concurrent operations on the same wrapper
58 are undefined behavior.
59
60 @par Example
61 @code
62 // Owning - takes ownership of the source
63 any_read_source rs(some_source{args...});
64
65 // Reference - wraps without ownership
66 some_source source;
67 any_read_source rs(&source);
68
69 mutable_buffer buf(data, size);
70 auto [ec, n] = co_await rs.read(std::span(&buf, 1));
71 @endcode
72
73 @see any_read_stream, ReadSource
74 */
75 class any_read_source
76 {
77 struct vtable;
78 struct awaitable_ops;
79
80 template<ReadSource S>
81 struct vtable_for_impl;
82
83 void* source_ = nullptr;
84 vtable const* vt_ = nullptr;
85 void* cached_awaitable_ = nullptr;
86 void* storage_ = nullptr;
87 awaitable_ops const* active_ops_ = nullptr;
88
89 public:
90 /** Destructor.
91
92 Destroys the owned source (if any) and releases the cached
93 awaitable storage.
94 */
95 ~any_read_source();
96
97 /** Default constructor.
98
99 Constructs an empty wrapper. Operations on a default-constructed
100 wrapper result in undefined behavior.
101 */
102 any_read_source() = default;
103
104 /** Non-copyable.
105
106 The awaitable cache is per-instance and cannot be shared.
107 */
108 any_read_source(any_read_source const&) = delete;
109 any_read_source& operator=(any_read_source const&) = delete;
110
111 /** Move constructor.
112
113 Transfers ownership of the wrapped source (if owned) and
114 cached awaitable storage from `other`. After the move, `other` is
115 in a default-constructed state.
116
117 @param other The wrapper to move from.
118 */
119 1 any_read_source(any_read_source&& other) noexcept
120 1 : source_(std::exchange(other.source_, nullptr))
121 1 , vt_(std::exchange(other.vt_, nullptr))
122 1 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
123 1 , storage_(std::exchange(other.storage_, nullptr))
124 1 , active_ops_(std::exchange(other.active_ops_, nullptr))
125 {
126 1 }
127
128 /** Move assignment operator.
129
130 Destroys any owned source and releases existing resources,
131 then transfers ownership from `other`.
132
133 @param other The wrapper to move from.
134 @return Reference to this wrapper.
135 */
136 any_read_source&
137 operator=(any_read_source&& other) noexcept;
138
139 /** Construct by taking ownership of a ReadSource.
140
141 Allocates storage and moves the source into this wrapper.
142 The wrapper owns the source and will destroy it.
143
144 @param s The source to take ownership of.
145 */
146 template<ReadSource S>
147 requires (!std::same_as<std::decay_t<S>, any_read_source>)
148 any_read_source(S s);
149
150 /** Construct by wrapping a ReadSource without ownership.
151
152 Wraps the given source by pointer. The source must remain
153 valid for the lifetime of this wrapper.
154
155 @param s Pointer to the source to wrap.
156 */
157 template<ReadSource S>
158 any_read_source(S* s);
159
160 /** Check if the wrapper contains a valid source.
161
162 @return `true` if wrapping a source, `false` if default-constructed
163 or moved-from.
164 */
165 bool
166 9 has_value() const noexcept
167 {
168 9 return source_ != nullptr;
169 }
170
171 /** Check if the wrapper contains a valid source.
172
173 @return `true` if wrapping a source, `false` if default-constructed
174 or moved-from.
175 */
176 explicit
177 2 operator bool() const noexcept
178 {
179 2 return has_value();
180 }
181
182 /** Initiate an asynchronous read operation.
183
184 Reads data into the provided buffer sequence. The operation
185 completes when the entire buffer sequence is filled, end-of-file
186 is reached, or an error occurs.
187
188 @param buffers The buffer sequence to read into. Passed by
189 value to ensure the sequence lives in the coroutine frame
190 across suspension points.
191
192 @return An awaitable yielding `(error_code,std::size_t)`.
193
194 @par Postconditions
195 Exactly one of the following is true on return:
196 @li **Success**: `!ec` and `n == buffer_size(buffers)`.
197 The entire buffer was filled.
198 @li **End-of-stream or Error**: `ec` and `n` indicates
199 the number of bytes transferred before the failure.
200
201 @par Preconditions
202 The wrapper must contain a valid source (`has_value() == true`).
203 */
204 template<MutableBufferSequence MB>
205 task<io_result<std::size_t>>
206 read(MB buffers);
207
208 protected:
209 /** Rebind to a new source after move.
210
211 Updates the internal pointer to reference a new source object.
212 Used by owning wrappers after move assignment when the owned
213 object has moved to a new location.
214
215 @param new_source The new source to bind to. Must be the same
216 type as the original source.
217
218 @note Terminates if called with a source of different type
219 than the original.
220 */
221 template<ReadSource S>
222 void
223 rebind(S& new_source) noexcept
224 {
225 if(vt_ != &vtable_for_impl<S>::value)
226 std::terminate();
227 source_ = &new_source;
228 }
229
230 private:
231 auto
232 read_some_(std::span<mutable_buffer const> buffers);
233 };
234
235 //----------------------------------------------------------
236
237 struct any_read_source::awaitable_ops
238 {
239 bool (*await_ready)(void*);
240 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
241 io_result<std::size_t> (*await_resume)(void*);
242 void (*destroy)(void*) noexcept;
243 };
244
245 struct any_read_source::vtable
246 {
247 void (*destroy)(void*) noexcept;
248 std::size_t awaitable_size;
249 std::size_t awaitable_align;
250 awaitable_ops const* (*construct_awaitable)(
251 void* source,
252 void* storage,
253 std::span<mutable_buffer const> buffers);
254 };
255
256 template<ReadSource S>
257 struct any_read_source::vtable_for_impl
258 {
259 using Awaitable = decltype(std::declval<S&>().read(
260 std::span<mutable_buffer const>{}));
261
262 static void
263 do_destroy_impl(void* source) noexcept
264 {
265 static_cast<S*>(source)->~S();
266 }
267
268 static awaitable_ops const*
269 130 construct_awaitable_impl(
270 void* source,
271 void* storage,
272 std::span<mutable_buffer const> buffers)
273 {
274 130 auto& s = *static_cast<S*>(source);
275 130 ::new(storage) Awaitable(s.read(buffers));
276
277 static constexpr awaitable_ops ops = {
278 130 +[](void* p) {
279 130 return static_cast<Awaitable*>(p)->await_ready();
280 },
281 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
282 return detail::call_await_suspend(
283 static_cast<Awaitable*>(p), h, ex, token);
284 },
285 130 +[](void* p) {
286 130 return static_cast<Awaitable*>(p)->await_resume();
287 },
288 130 +[](void* p) noexcept {
289 130 static_cast<Awaitable*>(p)->~Awaitable();
290 }
291 };
292 130 return &ops;
293 }
294
295 static constexpr vtable value = {
296 &do_destroy_impl,
297 sizeof(Awaitable),
298 alignof(Awaitable),
299 &construct_awaitable_impl
300 };
301 };
302
303 //----------------------------------------------------------
304
305 inline
306 91 any_read_source::~any_read_source()
307 {
308
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 91 times.
91 if(storage_)
309 {
310 vt_->destroy(source_);
311 ::operator delete(storage_);
312 }
313
2/2
✓ Branch 0 taken 88 times.
✓ Branch 1 taken 3 times.
91 if(cached_awaitable_)
314 88 ::operator delete(cached_awaitable_);
315 91 }
316
317 inline any_read_source&
318 1 any_read_source::operator=(any_read_source&& other) noexcept
319 {
320
1/2
✓ Branch 0 taken 1 time.
✗ Branch 1 not taken.
1 if(this != &other)
321 {
322
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(storage_)
323 {
324 vt_->destroy(source_);
325 ::operator delete(storage_);
326 }
327
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 1 time.
1 if(cached_awaitable_)
328 ::operator delete(cached_awaitable_);
329 1 source_ = std::exchange(other.source_, nullptr);
330 1 vt_ = std::exchange(other.vt_, nullptr);
331 1 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
332 1 storage_ = std::exchange(other.storage_, nullptr);
333 1 active_ops_ = std::exchange(other.active_ops_, nullptr);
334 }
335 1 return *this;
336 }
337
338 template<ReadSource S>
339 requires (!std::same_as<std::decay_t<S>, any_read_source>)
340 any_read_source::any_read_source(S s)
341 : vt_(&vtable_for_impl<S>::value)
342 {
343 struct guard {
344 any_read_source* self;
345 bool committed = false;
346 ~guard() {
347 if(!committed && self->storage_) {
348 self->vt_->destroy(self->source_);
349 ::operator delete(self->storage_);
350 self->storage_ = nullptr;
351 self->source_ = nullptr;
352 }
353 }
354 } g{this};
355
356 storage_ = ::operator new(sizeof(S));
357 source_ = ::new(storage_) S(std::move(s));
358
359 // Preallocate the awaitable storage
360 cached_awaitable_ = ::operator new(vt_->awaitable_size);
361
362 g.committed = true;
363 }
364
365 template<ReadSource S>
366 88 any_read_source::any_read_source(S* s)
367 88 : source_(s)
368 88 , vt_(&vtable_for_impl<S>::value)
369 {
370 // Preallocate the awaitable storage
371 88 cached_awaitable_ = ::operator new(vt_->awaitable_size);
372 88 }
373
374 //----------------------------------------------------------
375
376 inline auto
377 130 any_read_source::read_some_(std::span<mutable_buffer const> buffers)
378 {
379 struct awaitable
380 {
381 any_read_source* self_;
382 std::span<mutable_buffer const> buffers_;
383
384 bool
385 130 await_ready() const noexcept
386 {
387 130 return false;
388 }
389
390 coro
391 130 await_suspend(coro h, executor_ref ex, std::stop_token token)
392 {
393 // Construct the underlying awaitable into cached storage
394 260 self_->active_ops_ = self_->vt_->construct_awaitable(
395 130 self_->source_,
396 130 self_->cached_awaitable_,
397 buffers_);
398
399 // Check if underlying is immediately ready
400
1/2
✓ Branch 1 taken 130 times.
✗ Branch 2 not taken.
130 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
401 130 return h;
402
403 // Forward to underlying awaitable
404 return self_->active_ops_->await_suspend(
405 self_->cached_awaitable_, h, ex, token);
406 }
407
408 io_result<std::size_t>
409 130 await_resume()
410 {
411 struct guard {
412 any_read_source* self;
413 130 ~guard() {
414 130 self->active_ops_->destroy(self->cached_awaitable_);
415 130 self->active_ops_ = nullptr;
416 130 }
417 130 } g{self_};
418 130 return self_->active_ops_->await_resume(
419
1/1
✓ Branch 1 taken 99 times.
229 self_->cached_awaitable_);
420 130 }
421 };
422 130 return awaitable{this, buffers};
423 }
424
425 template<MutableBufferSequence MB>
426 task<io_result<std::size_t>>
427
1/1
✓ Branch 1 taken 106 times.
106 any_read_source::read(MB buffers)
428 {
429 buffer_param<MB> bp(std::move(buffers));
430 std::size_t total = 0;
431
432 for(;;)
433 {
434 auto bufs = bp.data();
435 if(bufs.empty())
436 break;
437
438 auto [ec, n] = co_await read_some_(bufs);
439 total += n;
440 if(ec)
441 co_return {ec, total};
442 bp.consume(n);
443 }
444
445 co_return {{}, total};
446 212 }
447
448 } // namespace capy
449 } // namespace boost
450
451 #endif
452