libs/capy/include/boost/capy/io/any_read_stream.hpp

82.7% Lines (62/75) 84.0% Functions (21/25) 62.5% Branches (10/16)
libs/capy/include/boost/capy/io/any_read_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_READ_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_READ_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/read_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <system_error>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <new>
29 #include <span>
30 #include <stop_token>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any ReadStream.
37
38 This class provides type erasure for any type satisfying the
39 @ref ReadStream concept, enabling runtime polymorphism for
40 read operations. It uses cached awaitable storage to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the stream.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to stream must outlive this wrapper.
48
49 @par Awaitable Preallocation
50 The constructor preallocates storage for the type-erased awaitable.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Thread Safety
56 Not thread-safe. Concurrent operations on the same wrapper
57 are undefined behavior.
58
59 @par Example
60 @code
61 // Owning - takes ownership of the stream
62 any_read_stream stream(socket{ioc});
63
64 // Reference - wraps without ownership
65 socket sock(ioc);
66 any_read_stream stream(&sock);
67
68 mutable_buffer buf(data, size);
69 auto [ec, n] = co_await stream.read_some(std::span(&buf, 1));
70 @endcode
71
72 @see any_write_stream, any_stream, ReadStream
73 */
74 class any_read_stream
75 {
76 struct vtable;
77 struct awaitable_ops;
78
79 template<ReadStream S>
80 struct vtable_for_impl;
81
82 void* stream_ = nullptr;
83 vtable const* vt_ = nullptr;
84 void* cached_awaitable_ = nullptr;
85 void* storage_ = nullptr;
86 awaitable_ops const* active_ops_ = nullptr;
87
88 public:
89 /** Destructor.
90
91 Destroys the owned stream (if any) and releases the cached
92 awaitable storage.
93 */
94 ~any_read_stream();
95
96 /** Default constructor.
97
98 Constructs an empty wrapper. Operations on a default-constructed
99 wrapper result in undefined behavior.
100 */
101 1 any_read_stream() = default;
102
103 /** Non-copyable.
104
105 The awaitable cache is per-instance and cannot be shared.
106 */
107 any_read_stream(any_read_stream const&) = delete;
108 any_read_stream& operator=(any_read_stream const&) = delete;
109
110 /** Move constructor.
111
112 Transfers ownership of the wrapped stream (if owned) and
113 cached awaitable storage from `other`. After the move, `other` is
114 in a default-constructed state.
115
116 @param other The wrapper to move from.
117 */
118 2 any_read_stream(any_read_stream&& other) noexcept
119 2 : stream_(std::exchange(other.stream_, nullptr))
120 2 , vt_(std::exchange(other.vt_, nullptr))
121 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
122 2 , storage_(std::exchange(other.storage_, nullptr))
123 2 , active_ops_(std::exchange(other.active_ops_, nullptr))
124 {
125 2 }
126
127 /** Move assignment operator.
128
129 Destroys any owned stream and releases existing resources,
130 then transfers ownership from `other`.
131
132 @param other The wrapper to move from.
133 @return Reference to this wrapper.
134 */
135 any_read_stream&
136 operator=(any_read_stream&& other) noexcept;
137
138 /** Construct by taking ownership of a ReadStream.
139
140 Allocates storage and moves the stream into this wrapper.
141 The wrapper owns the stream and will destroy it.
142
143 @param s The stream to take ownership of.
144 */
145 template<ReadStream S>
146 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
147 any_read_stream(S s);
148
149 /** Construct by wrapping a ReadStream without ownership.
150
151 Wraps the given stream by pointer. The stream must remain
152 valid for the lifetime of this wrapper.
153
154 @param s Pointer to the stream to wrap.
155 */
156 template<ReadStream S>
157 any_read_stream(S* s);
158
159 /** Check if the wrapper contains a valid stream.
160
161 @return `true` if wrapping a stream, `false` if default-constructed
162 or moved-from.
163 */
164 bool
165 19 has_value() const noexcept
166 {
167 19 return stream_ != nullptr;
168 }
169
170 /** Check if the wrapper contains a valid stream.
171
172 @return `true` if wrapping a stream, `false` if default-constructed
173 or moved-from.
174 */
175 explicit
176 2 operator bool() const noexcept
177 {
178 2 return has_value();
179 }
180
181 /** Initiate an asynchronous read operation.
182
183 Reads data into the provided buffer sequence. The operation
184 completes when at least one byte has been read, or an error
185 occurs.
186
187 @param buffers The buffer sequence to read into. Passed by
188 value to ensure the sequence lives in the coroutine frame
189 across suspension points.
190
191 @return An awaitable yielding `(error_code,std::size_t)`.
192
193 @par Preconditions
194 The wrapper must contain a valid stream (`has_value() == true`).
195 */
196 template<MutableBufferSequence MB>
197 auto
198 read_some(MB buffers);
199
200 protected:
201 /** Rebind to a new stream after move.
202
203 Updates the internal pointer to reference a new stream object.
204 Used by owning wrappers after move assignment when the owned
205 object has moved to a new location.
206
207 @param new_stream The new stream to bind to. Must be the same
208 type as the original stream.
209
210 @note Terminates if called with a stream of different type
211 than the original.
212 */
213 template<ReadStream S>
214 void
215 rebind(S& new_stream) noexcept
216 {
217 if(vt_ != &vtable_for_impl<S>::value)
218 std::terminate();
219 stream_ = &new_stream;
220 }
221 };
222
223 //----------------------------------------------------------
224
225 struct any_read_stream::awaitable_ops
226 {
227 bool (*await_ready)(void*);
228 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
229 io_result<std::size_t> (*await_resume)(void*);
230 void (*destroy)(void*) noexcept;
231 };
232
233 struct any_read_stream::vtable
234 {
235 void (*destroy)(void*) noexcept;
236 std::size_t awaitable_size;
237 std::size_t awaitable_align;
238 awaitable_ops const* (*construct_awaitable)(
239 void* stream,
240 void* storage,
241 std::span<mutable_buffer const> buffers);
242 };
243
244 template<ReadStream S>
245 struct any_read_stream::vtable_for_impl
246 {
247 using Awaitable = decltype(std::declval<S&>().read_some(
248 std::span<mutable_buffer const>{}));
249
250 static void
251 do_destroy_impl(void* stream) noexcept
252 {
253 static_cast<S*>(stream)->~S();
254 }
255
256 static awaitable_ops const*
257 70 construct_awaitable_impl(
258 void* stream,
259 void* storage,
260 std::span<mutable_buffer const> buffers)
261 {
262 70 auto& s = *static_cast<S*>(stream);
263 70 ::new(storage) Awaitable(s.read_some(buffers));
264
265 static constexpr awaitable_ops ops = {
266 70 +[](void* p) {
267 70 return static_cast<Awaitable*>(p)->await_ready();
268 },
269 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
270 return detail::call_await_suspend(
271 static_cast<Awaitable*>(p), h, ex, token);
272 },
273 70 +[](void* p) {
274 70 return static_cast<Awaitable*>(p)->await_resume();
275 },
276 70 +[](void* p) noexcept {
277 70 static_cast<Awaitable*>(p)->~Awaitable();
278 }
279 };
280 70 return &ops;
281 }
282
283 static constexpr vtable value = {
284 &do_destroy_impl,
285 sizeof(Awaitable),
286 alignof(Awaitable),
287 &construct_awaitable_impl
288 };
289 };
290
291 //----------------------------------------------------------
292
293 inline
294 78 any_read_stream::~any_read_stream()
295 {
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 78 times.
78 if(storage_)
297 {
298 vt_->destroy(stream_);
299 ::operator delete(storage_);
300 }
301
2/2
✓ Branch 0 taken 71 times.
✓ Branch 1 taken 7 times.
78 if(cached_awaitable_)
302 71 ::operator delete(cached_awaitable_);
303 78 }
304
305 inline any_read_stream&
306 3 any_read_stream::operator=(any_read_stream&& other) noexcept
307 {
308
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if(this != &other)
309 {
310
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
311 {
312 vt_->destroy(stream_);
313 ::operator delete(storage_);
314 }
315
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(cached_awaitable_)
316 ::operator delete(cached_awaitable_);
317 3 stream_ = std::exchange(other.stream_, nullptr);
318 3 vt_ = std::exchange(other.vt_, nullptr);
319 3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
320 3 storage_ = std::exchange(other.storage_, nullptr);
321 3 active_ops_ = std::exchange(other.active_ops_, nullptr);
322 }
323 3 return *this;
324 }
325
326 template<ReadStream S>
327 requires (!std::same_as<std::decay_t<S>, any_read_stream>)
328 any_read_stream::any_read_stream(S s)
329 : vt_(&vtable_for_impl<S>::value)
330 {
331 struct guard {
332 any_read_stream* self;
333 bool committed = false;
334 ~guard() {
335 if(!committed && self->storage_) {
336 self->vt_->destroy(self->stream_);
337 ::operator delete(self->storage_);
338 self->storage_ = nullptr;
339 self->stream_ = nullptr;
340 }
341 }
342 } g{this};
343
344 storage_ = ::operator new(sizeof(S));
345 stream_ = ::new(storage_) S(std::move(s));
346
347 // Preallocate the awaitable storage
348 cached_awaitable_ = ::operator new(vt_->awaitable_size);
349
350 g.committed = true;
351 }
352
353 template<ReadStream S>
354 71 any_read_stream::any_read_stream(S* s)
355 71 : stream_(s)
356 71 , vt_(&vtable_for_impl<S>::value)
357 {
358 // Preallocate the awaitable storage
359 71 cached_awaitable_ = ::operator new(vt_->awaitable_size);
360 71 }
361
362 //----------------------------------------------------------
363
364 template<MutableBufferSequence MB>
365 auto
366 70 any_read_stream::read_some(MB buffers)
367 {
368 struct awaitable
369 {
370 any_read_stream* self_;
371 buffer_param<MB> bp_;
372
373 bool
374 70 await_ready() const noexcept
375 {
376 70 return false;
377 }
378
379 coro
380 70 await_suspend(coro h, executor_ref ex, std::stop_token token)
381 {
382 // Construct the underlying awaitable into cached storage
383 70 self_->active_ops_ = self_->vt_->construct_awaitable(
384 70 self_->stream_,
385
1/1
✓ Branch 1 taken 14 times.
70 self_->cached_awaitable_,
386
1/1
✓ Branch 1 taken 14 times.
70 bp_.data());
387
388 // Check if underlying is immediately ready
389
1/2
✓ Branch 1 taken 14 times.
✗ Branch 2 not taken.
70 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
390 70 return h;
391
392 // Forward to underlying awaitable
393 return self_->active_ops_->await_suspend(
394 self_->cached_awaitable_, h, ex, token);
395 }
396
397 io_result<std::size_t>
398 70 await_resume()
399 {
400 struct guard {
401 any_read_stream* self;
402 70 ~guard() {
403 70 self->active_ops_->destroy(self->cached_awaitable_);
404 70 self->active_ops_ = nullptr;
405 70 }
406 70 } g{self_};
407 70 return self_->active_ops_->await_resume(
408
1/1
✓ Branch 1 taken 10 times.
120 self_->cached_awaitable_);
409 70 }
410 };
411 70 return awaitable{this, buffer_param<MB>(buffers)};
412 }
413
414 } // namespace capy
415 } // namespace boost
416
417 #endif
418