Line data Source code
1 : //
2 : // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 : //
4 : // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 : // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 : //
7 : // Official repository: https://github.com/cppalliance/capy
8 : //
9 :
10 : #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 : #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12 :
#include <boost/capy/detail/config.hpp>
#include <boost/capy/detail/await_suspend_helper.hpp>
#include <boost/capy/buffers.hpp>
#include <boost/capy/buffers/buffer_param.hpp>
#include <boost/capy/concept/io_awaitable.hpp>
#include <boost/capy/concept/write_stream.hpp>
#include <boost/capy/coro.hpp>
#include <boost/capy/ex/executor_ref.hpp>
#include <boost/capy/io_result.hpp>

#include <concepts>
#include <coroutine>
#include <cstddef>
#include <exception>
#include <new>
#include <span>
#include <stop_token>
#include <system_error>
#include <type_traits>
#include <utility>
32 :
33 : namespace boost {
34 : namespace capy {
35 :
36 : /** Type-erased wrapper for any WriteStream.
37 :
38 : This class provides type erasure for any type satisfying the
39 : @ref WriteStream concept, enabling runtime polymorphism for
40 : write operations. It uses cached awaitable storage to achieve
41 : zero steady-state allocation after construction.
42 :
43 : The wrapper supports two construction modes:
44 : - **Owning**: Pass by value to transfer ownership. The wrapper
45 : allocates storage and owns the stream.
46 : - **Reference**: Pass a pointer to wrap without ownership. The
47 : pointed-to stream must outlive this wrapper.
48 :
49 : @par Awaitable Preallocation
50 : The constructor preallocates storage for the type-erased awaitable.
51 : This reserves all virtual address space at server startup
52 : so memory usage can be measured up front, rather than
53 : allocating piecemeal as traffic arrives.
54 :
55 : @par Thread Safety
56 : Not thread-safe. Concurrent operations on the same wrapper
57 : are undefined behavior.
58 :
59 : @par Example
60 : @code
61 : // Owning - takes ownership of the stream
62 : any_write_stream stream(socket{ioc});
63 :
64 : // Reference - wraps without ownership
65 : socket sock(ioc);
66 : any_write_stream stream(&sock);
67 :
68 : const_buffer buf(data, size);
69 : auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
70 : @endcode
71 :
72 : @see any_read_stream, any_stream, WriteStream
73 : */
74 : class any_write_stream
75 : {
76 : struct vtable;
77 : struct awaitable_ops;
78 :
79 : template<WriteStream S>
80 : struct vtable_for_impl;
81 :
82 : void* stream_ = nullptr;
83 : vtable const* vt_ = nullptr;
84 : void* cached_awaitable_ = nullptr;
85 : void* storage_ = nullptr;
86 : awaitable_ops const* active_ops_ = nullptr;
87 :
88 : public:
89 : /** Destructor.
90 :
91 : Destroys the owned stream (if any) and releases the cached
92 : awaitable storage.
93 : */
94 : ~any_write_stream();
95 :
96 : /** Default constructor.
97 :
98 : Constructs an empty wrapper. Operations on a default-constructed
99 : wrapper result in undefined behavior.
100 : */
101 1 : any_write_stream() = default;
102 :
103 : /** Non-copyable.
104 :
105 : The awaitable cache is per-instance and cannot be shared.
106 : */
107 : any_write_stream(any_write_stream const&) = delete;
108 : any_write_stream& operator=(any_write_stream const&) = delete;
109 :
110 : /** Move constructor.
111 :
112 : Transfers ownership of the wrapped stream (if owned) and
113 : cached awaitable storage from `other`. After the move, `other` is
114 : in a default-constructed state.
115 :
116 : @param other The wrapper to move from.
117 : */
118 2 : any_write_stream(any_write_stream&& other) noexcept
119 2 : : stream_(std::exchange(other.stream_, nullptr))
120 2 : , vt_(std::exchange(other.vt_, nullptr))
121 2 : , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
122 2 : , storage_(std::exchange(other.storage_, nullptr))
123 2 : , active_ops_(std::exchange(other.active_ops_, nullptr))
124 : {
125 2 : }
126 :
127 : /** Move assignment operator.
128 :
129 : Destroys any owned stream and releases existing resources,
130 : then transfers ownership from `other`.
131 :
132 : @param other The wrapper to move from.
133 : @return Reference to this wrapper.
134 : */
135 : any_write_stream&
136 : operator=(any_write_stream&& other) noexcept;
137 :
138 : /** Construct by taking ownership of a WriteStream.
139 :
140 : Allocates storage and moves the stream into this wrapper.
141 : The wrapper owns the stream and will destroy it.
142 :
143 : @param s The stream to take ownership of.
144 : */
145 : template<WriteStream S>
146 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
147 : any_write_stream(S s);
148 :
149 : /** Construct by wrapping a WriteStream without ownership.
150 :
151 : Wraps the given stream by pointer. The stream must remain
152 : valid for the lifetime of this wrapper.
153 :
154 : @param s Pointer to the stream to wrap.
155 : */
156 : template<WriteStream S>
157 : any_write_stream(S* s);
158 :
159 : /** Check if the wrapper contains a valid stream.
160 :
161 : @return `true` if wrapping a stream, `false` if default-constructed
162 : or moved-from.
163 : */
164 : bool
165 15 : has_value() const noexcept
166 : {
167 15 : return stream_ != nullptr;
168 : }
169 :
170 : /** Check if the wrapper contains a valid stream.
171 :
172 : @return `true` if wrapping a stream, `false` if default-constructed
173 : or moved-from.
174 : */
175 : explicit
176 2 : operator bool() const noexcept
177 : {
178 2 : return has_value();
179 : }
180 :
181 : /** Initiate an asynchronous write operation.
182 :
183 : Writes data from the provided buffer sequence. The operation
184 : completes when at least one byte has been written, or an error
185 : occurs.
186 :
187 : @param buffers The buffer sequence containing data to write.
188 : Passed by value to ensure the sequence lives in the
189 : coroutine frame across suspension points.
190 :
191 : @return An awaitable yielding `(error_code,std::size_t)`.
192 :
193 : @par Preconditions
194 : The wrapper must contain a valid stream (`has_value() == true`).
195 : */
196 : template<ConstBufferSequence CB>
197 : auto
198 : write_some(CB buffers);
199 :
200 : protected:
201 : /** Rebind to a new stream after move.
202 :
203 : Updates the internal pointer to reference a new stream object.
204 : Used by owning wrappers after move assignment when the owned
205 : object has moved to a new location.
206 :
207 : @param new_stream The new stream to bind to. Must be the same
208 : type as the original stream.
209 :
210 : @note Terminates if called with a stream of different type
211 : than the original.
212 : */
213 : template<WriteStream S>
214 : void
215 : rebind(S& new_stream) noexcept
216 : {
217 : if(vt_ != &vtable_for_impl<S>::value)
218 : std::terminate();
219 : stream_ = &new_stream;
220 : }
221 : };
222 :
223 : //----------------------------------------------------------
224 :
225 : struct any_write_stream::awaitable_ops
226 : {
227 : bool (*await_ready)(void*);
228 : coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
229 : io_result<std::size_t> (*await_resume)(void*);
230 : void (*destroy)(void*) noexcept;
231 : };
232 :
233 : struct any_write_stream::vtable
234 : {
235 : void (*destroy)(void*) noexcept;
236 : std::size_t awaitable_size;
237 : std::size_t awaitable_align;
238 : awaitable_ops const* (*construct_awaitable)(
239 : void* stream,
240 : void* storage,
241 : std::span<const_buffer const> buffers);
242 : };
243 :
244 : template<WriteStream S>
245 : struct any_write_stream::vtable_for_impl
246 : {
247 : using Awaitable = decltype(std::declval<S&>().write_some(
248 : std::span<const_buffer const>{}));
249 :
250 : static void
251 0 : do_destroy_impl(void* stream) noexcept
252 : {
253 0 : static_cast<S*>(stream)->~S();
254 0 : }
255 :
256 : static awaitable_ops const*
257 60 : construct_awaitable_impl(
258 : void* stream,
259 : void* storage,
260 : std::span<const_buffer const> buffers)
261 : {
262 60 : auto& s = *static_cast<S*>(stream);
263 60 : ::new(storage) Awaitable(s.write_some(buffers));
264 :
265 : static constexpr awaitable_ops ops = {
266 60 : +[](void* p) {
267 60 : return static_cast<Awaitable*>(p)->await_ready();
268 : },
269 0 : +[](void* p, coro h, executor_ref ex, std::stop_token token) {
270 0 : return detail::call_await_suspend(
271 0 : static_cast<Awaitable*>(p), h, ex, token);
272 : },
273 60 : +[](void* p) {
274 60 : return static_cast<Awaitable*>(p)->await_resume();
275 : },
276 60 : +[](void* p) noexcept {
277 60 : static_cast<Awaitable*>(p)->~Awaitable();
278 : }
279 : };
280 60 : return &ops;
281 : }
282 :
283 : static constexpr vtable value = {
284 : &do_destroy_impl,
285 : sizeof(Awaitable),
286 : alignof(Awaitable),
287 : &construct_awaitable_impl
288 : };
289 : };
290 :
291 : //----------------------------------------------------------
292 :
293 : inline
294 72 : any_write_stream::~any_write_stream()
295 : {
296 72 : if(storage_)
297 : {
298 0 : vt_->destroy(stream_);
299 0 : ::operator delete(storage_);
300 : }
301 72 : if(cached_awaitable_)
302 65 : ::operator delete(cached_awaitable_);
303 72 : }
304 :
305 : inline any_write_stream&
306 3 : any_write_stream::operator=(any_write_stream&& other) noexcept
307 : {
308 3 : if(this != &other)
309 : {
310 3 : if(storage_)
311 : {
312 0 : vt_->destroy(stream_);
313 0 : ::operator delete(storage_);
314 : }
315 3 : if(cached_awaitable_)
316 0 : ::operator delete(cached_awaitable_);
317 3 : stream_ = std::exchange(other.stream_, nullptr);
318 3 : vt_ = std::exchange(other.vt_, nullptr);
319 3 : cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
320 3 : storage_ = std::exchange(other.storage_, nullptr);
321 3 : active_ops_ = std::exchange(other.active_ops_, nullptr);
322 : }
323 3 : return *this;
324 : }
325 :
326 : template<WriteStream S>
327 : requires (!std::same_as<std::decay_t<S>, any_write_stream>)
328 : any_write_stream::any_write_stream(S s)
329 : : vt_(&vtable_for_impl<S>::value)
330 : {
331 : struct guard {
332 : any_write_stream* self;
333 : bool committed = false;
334 : ~guard() {
335 : if(!committed && self->storage_) {
336 : self->vt_->destroy(self->stream_);
337 : ::operator delete(self->storage_);
338 : self->storage_ = nullptr;
339 : self->stream_ = nullptr;
340 : }
341 : }
342 : } g{this};
343 :
344 : storage_ = ::operator new(sizeof(S));
345 : stream_ = ::new(storage_) S(std::move(s));
346 :
347 : // Preallocate the awaitable storage
348 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
349 :
350 : g.committed = true;
351 : }
352 :
353 : template<WriteStream S>
354 65 : any_write_stream::any_write_stream(S* s)
355 65 : : stream_(s)
356 65 : , vt_(&vtable_for_impl<S>::value)
357 : {
358 : // Preallocate the awaitable storage
359 65 : cached_awaitable_ = ::operator new(vt_->awaitable_size);
360 65 : }
361 :
362 : //----------------------------------------------------------
363 :
364 : template<ConstBufferSequence CB>
365 : auto
366 60 : any_write_stream::write_some(CB buffers)
367 : {
368 : struct awaitable
369 : {
370 : any_write_stream* self_;
371 : buffer_param<CB> bp_;
372 :
373 : bool
374 60 : await_ready() const noexcept
375 : {
376 60 : return false;
377 : }
378 :
379 : coro
380 60 : await_suspend(coro h, executor_ref ex, std::stop_token token)
381 : {
382 : // Construct the underlying awaitable into cached storage
383 60 : self_->active_ops_ = self_->vt_->construct_awaitable(
384 60 : self_->stream_,
385 60 : self_->cached_awaitable_,
386 60 : bp_.data());
387 :
388 : // Check if underlying is immediately ready
389 60 : if(self_->active_ops_->await_ready(self_->cached_awaitable_))
390 60 : return h;
391 :
392 : // Forward to underlying awaitable
393 0 : return self_->active_ops_->await_suspend(
394 0 : self_->cached_awaitable_, h, ex, token);
395 : }
396 :
397 : io_result<std::size_t>
398 60 : await_resume()
399 : {
400 : struct guard {
401 : any_write_stream* self;
402 60 : ~guard() {
403 60 : self->active_ops_->destroy(self->cached_awaitable_);
404 60 : self->active_ops_ = nullptr;
405 60 : }
406 60 : } g{self_};
407 60 : return self_->active_ops_->await_resume(
408 103 : self_->cached_awaitable_);
409 60 : }
410 : };
411 60 : return awaitable{this, buffer_param<CB>(buffers)};
412 : }
413 :
414 : } // namespace capy
415 : } // namespace boost
416 :
417 : #endif
|