Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
11 : #define BOOST_CAPY_IO_ANY_BUFFER_SOURCE_HPP
12 :
13 : #include <boost/capy/detail/config.hpp>
14 : #include <boost/capy/detail/await_suspend_helper.hpp>
15 : #include <boost/capy/buffers.hpp>
16 : #include <boost/capy/buffers/buffer_copy.hpp>
17 : #include <boost/capy/buffers/slice.hpp>
18 : #include <boost/capy/concept/buffer_source.hpp>
19 : #include <boost/capy/concept/io_awaitable.hpp>
20 : #include <boost/capy/concept/read_source.hpp>
21 : #include <boost/capy/coro.hpp>
22 : #include <boost/capy/error.hpp>
23 : #include <boost/capy/ex/executor_ref.hpp>
24 : #include <boost/capy/io_result.hpp>
25 : #include <boost/capy/task.hpp>
26 :
27 : #include <system_error>
28 :
29 : #include <concepts>
30 : #include <coroutine>
31 : #include <cstddef>
32 : #include <exception>
33 : #include <new>
34 : #include <span>
35 : #include <stop_token>
36 : #include <utility>
37 :
38 : namespace boost {
39 : namespace capy {
40 :
41 : /** Type-erased wrapper for any BufferSource.
42 :
43 : This class provides type erasure for any type satisfying the
44 : @ref BufferSource concept, enabling runtime polymorphism for
45 : buffer pull operations. The wrapper also satisfies @ref ReadSource,
46 : allowing it to be used with code expecting either interface.
47 : It uses cached awaitable storage to achieve zero steady-state
48 : allocation after construction.
49 :
50 : The wrapper also satisfies @ref ReadSource through the templated
51 : @ref read method. This method copies data from the source's
52 : internal buffers into the caller's buffers, incurring one extra
53 : buffer copy compared to using @ref pull and @ref consume directly.
54 :
55 : The wrapper supports two construction modes:
56 : - **Owning**: Pass by value to transfer ownership. The wrapper
57 : allocates storage and owns the source.
58 : - **Reference**: Pass a pointer to wrap without ownership. The
59 : pointed-to source must outlive this wrapper.
60 :
61 : @par Awaitable Preallocation
62 : The constructor preallocates storage for the type-erased awaitable.
63 : This reserves all virtual address space at server startup
64 : so memory usage can be measured up front, rather than
65 : allocating piecemeal as traffic arrives.
66 :
67 : @par Thread Safety
68 : Not thread-safe. Concurrent operations on the same wrapper
69 : are undefined behavior.
70 :
71 : @par Example
72 : @code
73 : // Owning - takes ownership of the source
74 : any_buffer_source abs(some_buffer_source{args...});
75 :
76 : // Reference - wraps without ownership
77 : some_buffer_source src;
78 : any_buffer_source abs(&src);
79 :
80 : const_buffer arr[16];
81 : auto [ec, count] = co_await abs.pull(arr, 16);
82 : @endcode
83 :
84 : @see any_write_sink, BufferSource, ReadSource
85 : */
86 : class any_buffer_source
87 : {
88 : struct vtable;
89 : struct awaitable_ops;
90 :
91 : template<BufferSource S>
92 : struct vtable_for_impl;
93 :
94 : void* source_ = nullptr;
95 : vtable const* vt_ = nullptr;
96 : void* cached_awaitable_ = nullptr;
97 : void* storage_ = nullptr;
98 : awaitable_ops const* active_ops_ = nullptr;
99 :
100 : public:
101 : /** Destructor.
102 :
103 : Destroys the owned source (if any) and releases the cached
104 : awaitable storage.
105 : */
106 : ~any_buffer_source();
107 :
108 : /** Default constructor.
109 :
110 : Constructs an empty wrapper. Operations on a default-constructed
111 : wrapper result in undefined behavior.
112 : */
113 : any_buffer_source() = default;
114 :
115 : /** Non-copyable.
116 :
117 : The awaitable cache is per-instance and cannot be shared.
118 : */
119 : any_buffer_source(any_buffer_source const&) = delete;
120 : any_buffer_source& operator=(any_buffer_source const&) = delete;
121 :
122 : /** Move constructor.
123 :
124 : Transfers ownership of the wrapped source (if owned) and
125 : cached awaitable storage from `other`. After the move, `other` is
126 : in a default-constructed state.
127 :
128 : @param other The wrapper to move from.
129 : */
130 1 : any_buffer_source(any_buffer_source&& other) noexcept
131 1 : : source_(std::exchange(other.source_, nullptr))
132 1 : , vt_(std::exchange(other.vt_, nullptr))
133 1 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
134 1 : , storage_(std::exchange(other.storage_, nullptr))
135 1 : , active_ops_(std::exchange(other.active_ops_, nullptr))
136 : {
137 1 : }
138 :
139 : /** Move assignment operator.
140 :
141 : Destroys any owned source and releases existing resources,
142 : then transfers ownership from `other`.
143 :
144 : @param other The wrapper to move from.
145 : @return Reference to this wrapper.
146 : */
147 : any_buffer_source&
148 : operator=(any_buffer_source&& other) noexcept;
149 :
150 : /** Construct by taking ownership of a BufferSource.
151 :
152 : Allocates storage and moves the source into this wrapper.
153 : The wrapper owns the source and will destroy it.
154 :
155 : @param s The source to take ownership of.
156 : */
157 : template<BufferSource S>
158 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
159 : any_buffer_source(S s);
160 :
161 : /** Construct by wrapping a BufferSource without ownership.
162 :
163 : Wraps the given source by pointer. The source must remain
164 : valid for the lifetime of this wrapper.
165 :
166 : @param s Pointer to the source to wrap.
167 : */
168 : template<BufferSource S>
169 : any_buffer_source(S* s);
170 :
171 : /** Check if the wrapper contains a valid source.
172 :
173 : @return `true` if wrapping a source, `false` if default-constructed
174 : or moved-from.
175 : */
176 : bool
177 9 : has_value() const noexcept
178 : {
179 9 : return source_ != nullptr;
180 : }
181 :
182 : /** Check if the wrapper contains a valid source.
183 :
184 : @return `true` if wrapping a source, `false` if default-constructed
185 : or moved-from.
186 : */
187 : explicit
188 2 : operator bool() const noexcept
189 : {
190 2 : return has_value();
191 : }
192 :
193 : /** Consume bytes from the source.
194 :
195 : Advances the internal read position of the underlying source
196 : by the specified number of bytes. The next call to @ref pull
197 : returns data starting after the consumed bytes.
198 :
199 : @param n The number of bytes to consume. Must not exceed the
200 : total size of buffers returned by the previous @ref pull.
201 :
202 : @par Preconditions
203 : The wrapper must contain a valid source (`has_value() == true`).
204 : */
205 : void
206 : consume(std::size_t n) noexcept;
207 :
208 : /** Pull buffer data from the source.
209 :
210 : Fills the provided array with buffer descriptors from the
211 : underlying source. The operation completes when data is
212 : available, the source is exhausted, or an error occurs.
213 :
214 : @param arr Pointer to array of const_buffer to fill.
215 : @param max_count Maximum number of buffers to fill.
216 :
217 : @return An awaitable yielding `(error_code,std::size_t)`.
218 : On success with data, `count > 0` indicates buffers filled.
219 : On success with `count == 0`, source is exhausted.
220 :
221 : @par Preconditions
222 : The wrapper must contain a valid source (`has_value() == true`).
223 : */
224 : auto
225 : pull(const_buffer* arr, std::size_t max_count);
226 :
227 : /** Read data into a mutable buffer sequence.
228 :
229 : Fills the provided buffer sequence by pulling data from the
230 : underlying source and copying it into the caller's buffers.
231 : This satisfies @ref ReadSource but incurs a copy; for zero-copy
232 : access, use @ref pull and @ref consume instead.
233 :
234 : @note This operation copies data from the source's internal
235 : buffers into the caller's buffers. For zero-copy reads,
236 : use @ref pull and @ref consume directly.
237 :
238 : @param buffers The buffer sequence to fill.
239 :
240 : @return An awaitable yielding `(error_code,std::size_t)`.
241 : On success, `n == buffer_size(buffers)`.
242 : On EOF, `ec == error::eof` and `n` is bytes transferred.
243 :
244 : @par Preconditions
245 : The wrapper must contain a valid source (`has_value() == true`).
246 :
247 : @see pull, consume
248 : */
249 : template<MutableBufferSequence MB>
250 : task<io_result<std::size_t>>
251 : read(MB buffers);
252 :
253 : protected:
254 : /** Rebind to a new source after move.
255 :
256 : Updates the internal pointer to reference a new source object.
257 : Used by owning wrappers after move assignment when the owned
258 : object has moved to a new location.
259 :
260 : @param new_source The new source to bind to. Must be the same
261 : type as the original source.
262 :
263 : @note Terminates if called with a source of different type
264 : than the original.
265 : */
266 : template<BufferSource S>
267 : void
268 : rebind(S& new_source) noexcept
269 : {
270 : if(vt_ != &vtable_for_impl<S>::value)
271 : std::terminate();
272 : source_ = &new_source;
273 : }
274 : };
275 :
276 : //----------------------------------------------------------
277 :
278 : struct any_buffer_source::awaitable_ops
279 : {
280 : bool (*await_ready)(void*);
281 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
282 : io_result<std::size_t> (*await_resume)(void*);
283 : void (*destroy)(void*) noexcept;
284 : };
285 :
286 : struct any_buffer_source::vtable
287 : {
288 : void (*destroy)(void*) noexcept;
289 : void (*do_consume)(void* source, std::size_t n) noexcept;
290 : std::size_t awaitable_size;
291 : std::size_t awaitable_align;
292 : awaitable_ops const* (*construct_awaitable)(
293 : void* source,
294 : void* storage,
295 : const_buffer* arr,
296 : std::size_t max_count);
297 : };
298 :
299 : template<BufferSource S>
300 : struct any_buffer_source::vtable_for_impl
301 : {
302 : using Awaitable = decltype(std::declval<S&>().pull(
303 : std::declval<const_buffer*>(),
304 : std::declval<std::size_t>()));
305 :
306 : static void
307 0 : do_destroy_impl(void* source) noexcept
308 : {
309 0 : static_cast<S*>(source)->~S();
310 0 : }
311 :
312 : static void
313 39 : do_consume_impl(void* source, std::size_t n) noexcept
314 : {
315 39 : static_cast<S*>(source)->consume(n);
316 39 : }
317 :
318 : static awaitable_ops const*
319 92 : construct_awaitable_impl(
320 : void* source,
321 : void* storage,
322 : const_buffer* arr,
323 : std::size_t max_count)
324 : {
325 92 : auto& s = *static_cast<S*>(source);
326 92 : ::new(storage) Awaitable(s.pull(arr, max_count));
327 :
328 : static constexpr awaitable_ops ops = {
329 92 : +[](void* p) {
330 92 : return static_cast<Awaitable*>(p)->await_ready();
331 : },
332 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
333 0 : return detail::call_await_suspend(
334 0 : static_cast<Awaitable*>(p), h, ex, token);
335 : },
336 92 : +[](void* p) {
337 92 : return static_cast<Awaitable*>(p)->await_resume();
338 : },
339 92 : +[](void* p) noexcept {
340 92 : static_cast<Awaitable*>(p)->~Awaitable();
341 : }
342 : };
343 92 : return &ops;
344 : }
345 :
346 : static constexpr vtable value = {
347 : &do_destroy_impl,
348 : &do_consume_impl,
349 : sizeof(Awaitable),
350 : alignof(Awaitable),
351 : &construct_awaitable_impl
352 : };
353 : };
354 :
355 : //----------------------------------------------------------
356 :
357 : inline
358 59 : any_buffer_source::~any_buffer_source()
359 : {
360 59 : if(storage_)
361 : {
362 0 : vt_->destroy(source_);
363 0 : ::operator delete(storage_);
364 : }
365 59 : if(cached_awaitable_)
366 56 : ::operator delete(cached_awaitable_);
367 59 : }
368 :
369 : inline any_buffer_source&
370 1 : any_buffer_source::operator=(any_buffer_source&& other) noexcept
371 : {
372 1 : if(this != &other)
373 : {
374 1 : if(storage_)
375 : {
376 0 : vt_->destroy(source_);
377 0 : ::operator delete(storage_);
378 : }
379 1 : if(cached_awaitable_)
380 0 : ::operator delete(cached_awaitable_);
381 1 : source_ = std::exchange(other.source_, nullptr);
382 1 : vt_ = std::exchange(other.vt_, nullptr);
383 1 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
384 1 : storage_ = std::exchange(other.storage_, nullptr);
385 1 : active_ops_ = std::exchange(other.active_ops_, nullptr);
386 : }
387 1 : return *this;
388 : }
389 :
390 : template<BufferSource S>
391 : requires (!std::same_as<std::decay_t<S>, any_buffer_source>)
392 : any_buffer_source::any_buffer_source(S s)
393 : : vt_(&vtable_for_impl<S>::value)
394 : {
395 : struct guard {
396 : any_buffer_source* self;
397 : bool committed = false;
398 : ~guard() {
399 : if(!committed && self->storage_) {
400 : self->vt_->destroy(self->source_);
401 : ::operator delete(self->storage_);
402 : self->storage_ = nullptr;
403 : self->source_ = nullptr;
404 : }
405 : }
406 : } g{this};
407 :
408 : storage_ = ::operator new(sizeof(S));
409 : source_ = ::new(storage_) S(std::move(s));
410 :
411 : // Preallocate the awaitable storage
412 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
413 :
414 : g.committed = true;
415 : }
416 :
417 : template<BufferSource S>
418 56 : any_buffer_source::any_buffer_source(S* s)
419 56 : : source_(s)
420 56 : , vt_(&vtable_for_impl<S>::value)
421 : {
422 : // Preallocate the awaitable storage
423 56 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
424 56 : }
425 :
426 : //----------------------------------------------------------
427 :
428 : inline void
429 39 : any_buffer_source::consume(std::size_t n) noexcept
430 : {
431 39 : vt_->do_consume(source_, n);
432 39 : }
433 :
434 : inline auto
435 92 : any_buffer_source::pull(
436 : const_buffer* arr,
437 : std::size_t max_count)
438 : {
439 : struct awaitable
440 : {
441 : any_buffer_source* self_;
442 : const_buffer* arr_;
443 : std::size_t max_count_;
444 :
445 : bool
446 92 : await_ready() const noexcept
447 : {
448 92 : return false;
449 : }
450 :
451 : coro
452 92 : await_suspend(coro h, executor_ref ex, std::stop_token token)
453 : {
454 : // Construct the underlying awaitable into cached storage
455 184 : self_->active_ops_ = self_->vt_->construct_awaitable(
456 92 : self_->source_,
457 92 : self_->cached_awaitable_,
458 : arr_,
459 : max_count_);
460 :
461 : // Check if underlying is immediately ready
462 92 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
463 92 : return h;
464 :
465 : // Forward to underlying awaitable
466 0 : return self_->active_ops_->await_suspend(
467 0 : self_->cached_awaitable_, h, ex, token);
468 : }
469 :
470 : io_result<std::size_t>
471 92 : await_resume()
472 : {
473 : struct guard {
474 : any_buffer_source* self;
475 92 : ~guard() {
476 92 : self->active_ops_->destroy(self->cached_awaitable_);
477 92 : self->active_ops_ = nullptr;
478 92 : }
479 92 : } g{self_};
480 92 : return self_->active_ops_->await_resume(
481 165 : self_->cached_awaitable_);
482 92 : }
483 : };
484 92 : return awaitable{this, arr, max_count};
485 : }
486 :
487 : template<MutableBufferSequence MB>
488 : task<io_result<std::size_t>>
489 : any_buffer_source::read(MB buffers)
490 : {
491 : std::size_t total = 0;
492 : auto dest = sans_prefix(buffers, 0);
493 :
494 : while(!buffer_empty(dest))
495 : {
496 : const_buffer arr[detail::max_iovec_];
497 : auto [ec, count] = co_await pull(arr, detail::max_iovec_);
498 :
499 : if(ec)
500 : co_return {ec, total};
501 :
502 : if(count == 0)
503 : co_return {error::eof, total};
504 :
505 : auto n = buffer_copy(dest, std::span(arr, count));
506 : consume(n);
507 : total += n;
508 : dest = sans_prefix(dest, n);
509 : }
510 :
511 : co_return {{}, total};
512 : }
513 :
514 : //----------------------------------------------------------
515 :
516 : static_assert(BufferSource<any_buffer_source>);
517 : static_assert(ReadSource<any_buffer_source>);
518 :
519 : } // namespace capy
520 : } // namespace boost
521 :
522 : #endif
|