Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_READ_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_param.hpp>
17 : #include <boost/capy/concept/io_awaitable.hpp>
18 : #include <boost/capy/concept/read_source.hpp>
19 : #include <boost/capy/coro.hpp>
20 : #include <boost/capy/ex/executor_ref.hpp>
21 : #include <boost/capy/io_result.hpp>
22 : #include <boost/capy/task.hpp>
23 :
24 : #include <system_error>
25 :
26 : #include <concepts>
27 : #include <coroutine>
28 : #include <cstddef>
29 : #include <new>
30 : #include <span>
31 : #include <stop_token>
32 : #include <utility>
33 :
34 : namespace boost {
35 : namespace capy {
36 :
37 : /** Type-erased wrapper for any ReadSource.
38 :
39 : This class provides type erasure for any type satisfying the
40 : @ref ReadSource concept, enabling runtime polymorphism for
41 : source read operations. It uses cached awaitable storage to achieve
42 : zero steady-state allocation after construction.
43 :
44 : The wrapper supports two construction modes:
45 : - **Owning**: Pass by value to transfer ownership. The wrapper
46 : allocates storage and owns the source.
47 : - **Reference**: Pass a pointer to wrap without ownership. The
48 : pointed-to source must outlive this wrapper.
49 :
50 : @par Awaitable Preallocation
51 : The constructor preallocates storage for the type-erased awaitable.
52 : This reserves all virtual address space at server startup
53 : so memory usage can be measured up front, rather than
54 : allocating piecemeal as traffic arrives.
55 :
56 : @par Thread Safety
57 : Not thread-safe. Concurrent operations on the same wrapper
58 : are undefined behavior.
59 :
60 : @par Example
61 : @code
62 : // Owning - takes ownership of the source
63 : any_read_source rs(some_source{args...});
64 :
65 : // Reference - wraps without ownership
66 : some_source source;
67 : any_read_source rs(&source);
68 :
69 : mutable_buffer buf(data, size);
70 : auto [ec, n] = co_await rs.read(std::span(&buf, 1));
71 : @endcode
72 :
73 : @see any_read_stream, ReadSource
74 : */
75 : class any_read_source
76 : {
77 : struct vtable;
78 : struct awaitable_ops;
79 :
80 : template<ReadSource S>
81 : struct vtable_for_impl;
82 :
83 : void* source_ = nullptr;
84 : vtable const* vt_ = nullptr;
85 : void* cached_awaitable_ = nullptr;
86 : void* storage_ = nullptr;
87 : awaitable_ops const* active_ops_ = nullptr;
88 :
89 : public:
90 : /** Destructor.
91 :
92 : Destroys the owned source (if any) and releases the cached
93 : awaitable storage.
94 : */
95 : ~any_read_source();
96 :
97 : /** Default constructor.
98 :
99 : Constructs an empty wrapper. Operations on a default-constructed
100 : wrapper result in undefined behavior.
101 : */
102 : any_read_source() = default;
103 :
104 : /** Non-copyable.
105 :
106 : The awaitable cache is per-instance and cannot be shared.
107 : */
108 : any_read_source(any_read_source const&) = delete;
109 : any_read_source& operator=(any_read_source const&) = delete;
110 :
111 : /** Move constructor.
112 :
113 : Transfers ownership of the wrapped source (if owned) and
114 : cached awaitable storage from `other`. After the move, `other` is
115 : in a default-constructed state.
116 :
117 : @param other The wrapper to move from.
118 : */
119 1 : any_read_source(any_read_source&& other) noexcept
120 1 : : source_(std::exchange(other.source_, nullptr))
121 1 : , vt_(std::exchange(other.vt_, nullptr))
122 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
123 1 : , storage_(std::exchange(other.storage_, nullptr))
124 1 : , active_ops_(std::exchange(other.active_ops_, nullptr))
125 : {
126 1 : }
127 :
128 : /** Move assignment operator.
129 :
130 : Destroys any owned source and releases existing resources,
131 : then transfers ownership from `other`.
132 :
133 : @param other The wrapper to move from.
134 : @return Reference to this wrapper.
135 : */
136 : any_read_source&
137 : operator=(any_read_source&& other) noexcept;
138 :
139 : /** Construct by taking ownership of a ReadSource.
140 :
141 : Allocates storage and moves the source into this wrapper.
142 : The wrapper owns the source and will destroy it.
143 :
144 : @param s The source to take ownership of.
145 : */
146 : template<ReadSource S>
147 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
148 : any_read_source(S s);
149 :
150 : /** Construct by wrapping a ReadSource without ownership.
151 :
152 : Wraps the given source by pointer. The source must remain
153 : valid for the lifetime of this wrapper.
154 :
155 : @param s Pointer to the source to wrap.
156 : */
157 : template<ReadSource S>
158 : any_read_source(S* s);
159 :
160 : /** Check if the wrapper contains a valid source.
161 :
162 : @return `true` if wrapping a source, `false` if default-constructed
163 : or moved-from.
164 : */
165 : bool
166 9 : has_value() const noexcept
167 : {
168 9 : return source_ != nullptr;
169 : }
170 :
171 : /** Check if the wrapper contains a valid source.
172 :
173 : @return `true` if wrapping a source, `false` if default-constructed
174 : or moved-from.
175 : */
176 : explicit
177 2 : operator bool() const noexcept
178 : {
179 2 : return has_value();
180 : }
181 :
182 : /** Initiate an asynchronous read operation.
183 :
184 : Reads data into the provided buffer sequence. The operation
185 : completes when the entire buffer sequence is filled, end-of-file
186 : is reached, or an error occurs.
187 :
188 : @param buffers The buffer sequence to read into. Passed by
189 : value to ensure the sequence lives in the coroutine frame
190 : across suspension points.
191 :
192 : @return An awaitable yielding `(error_code,std::size_t)`.
193 :
194 : @par Postconditions
195 : Exactly one of the following is true on return:
196 : @li **Success**: `!ec` and `n == buffer_size(buffers)`.
197 : The entire buffer was filled.
198 : @li **End-of-stream or Error**: `ec` and `n` indicates
199 : the number of bytes transferred before the failure.
200 :
201 : @par Preconditions
202 : The wrapper must contain a valid source (`has_value() == true`).
203 : */
204 : template<MutableBufferSequence MB>
205 : task<io_result<std::size_t>>
206 : read(MB buffers);
207 :
208 : protected:
209 : /** Rebind to a new source after move.
210 :
211 : Updates the internal pointer to reference a new source object.
212 : Used by owning wrappers after move assignment when the owned
213 : object has moved to a new location.
214 :
215 : @param new_source The new source to bind to. Must be the same
216 : type as the original source.
217 :
218 : @note Terminates if called with a source of different type
219 : than the original.
220 : */
221 : template<ReadSource S>
222 : void
223 : rebind(S& new_source) noexcept
224 : {
225 : if(vt_ != &vtable_for_impl<S>::value)
226 : std::terminate();
227 : source_ = &new_source;
228 : }
229 :
230 : private:
231 : auto
232 : read_some_(std::span<mutable_buffer const> buffers);
233 : };
234 :
235 : //----------------------------------------------------------
236 :
237 : struct any_read_source::awaitable_ops
238 : {
239 : bool (*await_ready)(void*);
240 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
241 : io_result<std::size_t> (*await_resume)(void*);
242 : void (*destroy)(void*) noexcept;
243 : };
244 :
245 : struct any_read_source::vtable
246 : {
247 : void (*destroy)(void*) noexcept;
248 : std::size_t awaitable_size;
249 : std::size_t awaitable_align;
250 : awaitable_ops const* (*construct_awaitable)(
251 : void* source,
252 : void* storage,
253 : std::span<mutable_buffer const> buffers);
254 : };
255 :
256 : template<ReadSource S>
257 : struct any_read_source::vtable_for_impl
258 : {
259 : using Awaitable = decltype(std::declval<S&>().read(
260 : std::span<mutable_buffer const>{}));
261 :
262 : static void
263 0 : do_destroy_impl(void* source) noexcept
264 : {
265 0 : static_cast<S*>(source)->~S();
266 0 : }
267 :
268 : static awaitable_ops const*
269 130 : construct_awaitable_impl(
270 : void* source,
271 : void* storage,
272 : std::span<mutable_buffer const> buffers)
273 : {
274 130 : auto& s = *static_cast<S*>(source);
275 130 : ::new(storage) Awaitable(s.read(buffers));
276 :
277 : static constexpr awaitable_ops ops = {
278 130 : +[](void* p) {
279 130 : return static_cast<Awaitable*>(p)->await_ready();
280 : },
281 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
282 0 : return detail::call_await_suspend(
283 0 : static_cast<Awaitable*>(p), h, ex, token);
284 : },
285 130 : +[](void* p) {
286 130 : return static_cast<Awaitable*>(p)->await_resume();
287 : },
288 130 : +[](void* p) noexcept {
289 130 : static_cast<Awaitable*>(p)->~Awaitable();
290 : }
291 : };
292 130 : return &ops;
293 : }
294 :
295 : static constexpr vtable value = {
296 : &do_destroy_impl,
297 : sizeof(Awaitable),
298 : alignof(Awaitable),
299 : &construct_awaitable_impl
300 : };
301 : };
302 :
303 : //----------------------------------------------------------
304 :
305 : inline
306 91 : any_read_source::~any_read_source()
307 : {
308 91 : if(storage_)
309 : {
310 0 : vt_->destroy(source_);
311 0 : ::operator delete(storage_);
312 : }
313 91 : if(cached_awaitable_)
314 88 : ::operator delete(cached_awaitable_);
315 91 : }
316 :
317 : inline any_read_source&
318 1 : any_read_source::operator=(any_read_source&& other) noexcept
319 : {
320 1 : if(this != &other)
321 : {
322 1 : if(storage_)
323 : {
324 0 : vt_->destroy(source_);
325 0 : ::operator delete(storage_);
326 : }
327 1 : if(cached_awaitable_)
328 0 : ::operator delete(cached_awaitable_);
329 1 : source_ = std::exchange(other.source_, nullptr);
330 1 : vt_ = std::exchange(other.vt_, nullptr);
331 1 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
332 1 : storage_ = std::exchange(other.storage_, nullptr);
333 1 : active_ops_ = std::exchange(other.active_ops_, nullptr);
334 : }
335 1 : return *this;
336 : }
337 :
338 : template<ReadSource S>
339 : requires (!std::same_as<std::decay_t<S>, any_read_source>)
340 : any_read_source::any_read_source(S s)
341 : : vt_(&vtable_for_impl<S>::value)
342 : {
343 : struct guard {
344 : any_read_source* self;
345 : bool committed = false;
346 : ~guard() {
347 : if(!committed && self->storage_) {
348 : self->vt_->destroy(self->source_);
349 : ::operator delete(self->storage_);
350 : self->storage_ = nullptr;
351 : self->source_ = nullptr;
352 : }
353 : }
354 : } g{this};
355 :
356 : storage_ = ::operator new(sizeof(S));
357 : source_ = ::new(storage_) S(std::move(s));
358 :
359 : // Preallocate the awaitable storage
360 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
361 :
362 : g.committed = true;
363 : }
364 :
365 : template<ReadSource S>
366 88 : any_read_source::any_read_source(S* s)
367 88 : : source_(s)
368 88 : , vt_(&vtable_for_impl<S>::value)
369 : {
370 : // Preallocate the awaitable storage
371 88 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
372 88 : }
373 :
374 : //----------------------------------------------------------
375 :
376 : inline auto
377 130 : any_read_source::read_some_(std::span<mutable_buffer const> buffers)
378 : {
379 : struct awaitable
380 : {
381 : any_read_source* self_;
382 : std::span<mutable_buffer const> buffers_;
383 :
384 : bool
385 130 : await_ready() const noexcept
386 : {
387 130 : return false;
388 : }
389 :
390 : coro
391 130 : await_suspend(coro h, executor_ref ex, std::stop_token token)
392 : {
393 : // Construct the underlying awaitable into cached storage
394 260 : self_->active_ops_ = self_->vt_->construct_awaitable(
395 130 : self_->source_,
396 130 : self_->cached_awaitable_,
397 : buffers_);
398 :
399 : // Check if underlying is immediately ready
400 130 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
401 130 : return h;
402 :
403 : // Forward to underlying awaitable
404 0 : return self_->active_ops_->await_suspend(
405 0 : self_->cached_awaitable_, h, ex, token);
406 : }
407 :
408 : io_result<std::size_t>
409 130 : await_resume()
410 : {
411 : struct guard {
412 : any_read_source* self;
413 130 : ~guard() {
414 130 : self->active_ops_->destroy(self->cached_awaitable_);
415 130 : self->active_ops_ = nullptr;
416 130 : }
417 130 : } g{self_};
418 130 : return self_->active_ops_->await_resume(
419 229 : self_->cached_awaitable_);
420 130 : }
421 : };
422 130 : return awaitable{this, buffers};
423 : }
424 :
425 : template<MutableBufferSequence MB>
426 : task<io_result<std::size_t>>
427 106 : any_read_source::read(MB buffers)
428 : {
429 : buffer_param<MB> bp(std::move(buffers));
430 : std::size_t total = 0;
431 :
432 : for(;;)
433 : {
434 : auto bufs = bp.data();
435 : if(bufs.empty())
436 : break;
437 :
438 : auto [ec, n] = co_await read_some_(bufs);
439 : total += n;
440 : if(ec)
441 : co_return {ec, total};
442 : bp.consume(n);
443 : }
444 :
445 : co_return {{}, total};
446 212 : }
447 :
448 : } // namespace capy
449 : } // namespace boost
450 :
451 : #endif
|