libs/capy/include/boost/capy/io/any_write_stream.hpp

82.7% Lines (62/75) 84.0% Functions (21/25) 62.5% Branches (10/16)
libs/capy/include/boost/capy/io/any_write_stream.hpp
Line Branch Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
11 #define BOOST_CAPY_IO_ANY_WRITE_STREAM_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/detail/await_suspend_helper.hpp>
15 #include <boost/capy/buffers.hpp>
16 #include <boost/capy/buffers/buffer_param.hpp>
17 #include <boost/capy/concept/io_awaitable.hpp>
18 #include <boost/capy/concept/write_stream.hpp>
19 #include <boost/capy/coro.hpp>
20 #include <boost/capy/ex/executor_ref.hpp>
21 #include <boost/capy/io_result.hpp>
22
23 #include <system_error>
24
25 #include <concepts>
26 #include <coroutine>
27 #include <cstddef>
28 #include <new>
29 #include <span>
30 #include <stop_token>
31 #include <utility>
32
33 namespace boost {
34 namespace capy {
35
36 /** Type-erased wrapper for any WriteStream.
37
38 This class provides type erasure for any type satisfying the
39 @ref WriteStream concept, enabling runtime polymorphism for
40 write operations. It uses cached awaitable storage to achieve
41 zero steady-state allocation after construction.
42
43 The wrapper supports two construction modes:
44 - **Owning**: Pass by value to transfer ownership. The wrapper
45 allocates storage and owns the stream.
46 - **Reference**: Pass a pointer to wrap without ownership. The
47 pointed-to stream must outlive this wrapper.
48
49 @par Awaitable Preallocation
50 The constructor preallocates storage for the type-erased awaitable.
51 This reserves all virtual address space at server startup
52 so memory usage can be measured up front, rather than
53 allocating piecemeal as traffic arrives.
54
55 @par Thread Safety
56 Not thread-safe. Concurrent operations on the same wrapper
57 are undefined behavior.
58
59 @par Example
60 @code
61 // Owning - takes ownership of the stream
62 any_write_stream stream(socket{ioc});
63
64 // Reference - wraps without ownership
65 socket sock(ioc);
66 any_write_stream stream(&sock);
67
68 const_buffer buf(data, size);
69 auto [ec, n] = co_await stream.write_some(std::span(&buf, 1));
70 @endcode
71
72 @see any_read_stream, any_stream, WriteStream
73 */
74 class any_write_stream
75 {
76 struct vtable;
77 struct awaitable_ops;
78
79 template<WriteStream S>
80 struct vtable_for_impl;
81
82 void* stream_ = nullptr;
83 vtable const* vt_ = nullptr;
84 void* cached_awaitable_ = nullptr;
85 void* storage_ = nullptr;
86 awaitable_ops const* active_ops_ = nullptr;
87
88 public:
89 /** Destructor.
90
91 Destroys the owned stream (if any) and releases the cached
92 awaitable storage.
93 */
94 ~any_write_stream();
95
96 /** Default constructor.
97
98 Constructs an empty wrapper. Operations on a default-constructed
99 wrapper result in undefined behavior.
100 */
101 1 any_write_stream() = default;
102
103 /** Non-copyable.
104
105 The awaitable cache is per-instance and cannot be shared.
106 */
107 any_write_stream(any_write_stream const&) = delete;
108 any_write_stream& operator=(any_write_stream const&) = delete;
109
110 /** Move constructor.
111
112 Transfers ownership of the wrapped stream (if owned) and
113 cached awaitable storage from `other`. After the move, `other` is
114 in a default-constructed state.
115
116 @param other The wrapper to move from.
117 */
118 2 any_write_stream(any_write_stream&& other) noexcept
119 2 : stream_(std::exchange(other.stream_, nullptr))
120 2 , vt_(std::exchange(other.vt_, nullptr))
121 2 , cached_awaitable_(std::exchange(other.cached_awaitable_, nullptr))
122 2 , storage_(std::exchange(other.storage_, nullptr))
123 2 , active_ops_(std::exchange(other.active_ops_, nullptr))
124 {
125 2 }
126
127 /** Move assignment operator.
128
129 Destroys any owned stream and releases existing resources,
130 then transfers ownership from `other`.
131
132 @param other The wrapper to move from.
133 @return Reference to this wrapper.
134 */
135 any_write_stream&
136 operator=(any_write_stream&& other) noexcept;
137
138 /** Construct by taking ownership of a WriteStream.
139
140 Allocates storage and moves the stream into this wrapper.
141 The wrapper owns the stream and will destroy it.
142
143 @param s The stream to take ownership of.
144 */
145 template<WriteStream S>
146 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
147 any_write_stream(S s);
148
149 /** Construct by wrapping a WriteStream without ownership.
150
151 Wraps the given stream by pointer. The stream must remain
152 valid for the lifetime of this wrapper.
153
154 @param s Pointer to the stream to wrap.
155 */
156 template<WriteStream S>
157 any_write_stream(S* s);
158
159 /** Check if the wrapper contains a valid stream.
160
161 @return `true` if wrapping a stream, `false` if default-constructed
162 or moved-from.
163 */
164 bool
165 15 has_value() const noexcept
166 {
167 15 return stream_ != nullptr;
168 }
169
170 /** Check if the wrapper contains a valid stream.
171
172 @return `true` if wrapping a stream, `false` if default-constructed
173 or moved-from.
174 */
175 explicit
176 2 operator bool() const noexcept
177 {
178 2 return has_value();
179 }
180
181 /** Initiate an asynchronous write operation.
182
183 Writes data from the provided buffer sequence. The operation
184 completes when at least one byte has been written, or an error
185 occurs.
186
187 @param buffers The buffer sequence containing data to write.
188 Passed by value to ensure the sequence lives in the
189 coroutine frame across suspension points.
190
191 @return An awaitable yielding `(error_code,std::size_t)`.
192
193 @par Preconditions
194 The wrapper must contain a valid stream (`has_value() == true`).
195 */
196 template<ConstBufferSequence CB>
197 auto
198 write_some(CB buffers);
199
200 protected:
201 /** Rebind to a new stream after move.
202
203 Updates the internal pointer to reference a new stream object.
204 Used by owning wrappers after move assignment when the owned
205 object has moved to a new location.
206
207 @param new_stream The new stream to bind to. Must be the same
208 type as the original stream.
209
210 @note Terminates if called with a stream of different type
211 than the original.
212 */
213 template<WriteStream S>
214 void
215 rebind(S& new_stream) noexcept
216 {
217 if(vt_ != &vtable_for_impl<S>::value)
218 std::terminate();
219 stream_ = &new_stream;
220 }
221 };
222
223 //----------------------------------------------------------
224
225 struct any_write_stream::awaitable_ops
226 {
227 bool (*await_ready)(void*);
228 coro (*await_suspend)(void*, coro, executor_ref, std::stop_token);
229 io_result<std::size_t> (*await_resume)(void*);
230 void (*destroy)(void*) noexcept;
231 };
232
233 struct any_write_stream::vtable
234 {
235 void (*destroy)(void*) noexcept;
236 std::size_t awaitable_size;
237 std::size_t awaitable_align;
238 awaitable_ops const* (*construct_awaitable)(
239 void* stream,
240 void* storage,
241 std::span<const_buffer const> buffers);
242 };
243
244 template<WriteStream S>
245 struct any_write_stream::vtable_for_impl
246 {
247 using Awaitable = decltype(std::declval<S&>().write_some(
248 std::span<const_buffer const>{}));
249
250 static void
251 do_destroy_impl(void* stream) noexcept
252 {
253 static_cast<S*>(stream)->~S();
254 }
255
256 static awaitable_ops const*
257 60 construct_awaitable_impl(
258 void* stream,
259 void* storage,
260 std::span<const_buffer const> buffers)
261 {
262 60 auto& s = *static_cast<S*>(stream);
263 60 ::new(storage) Awaitable(s.write_some(buffers));
264
265 static constexpr awaitable_ops ops = {
266 60 +[](void* p) {
267 60 return static_cast<Awaitable*>(p)->await_ready();
268 },
269 +[](void* p, coro h, executor_ref ex, std::stop_token token) {
270 return detail::call_await_suspend(
271 static_cast<Awaitable*>(p), h, ex, token);
272 },
273 60 +[](void* p) {
274 60 return static_cast<Awaitable*>(p)->await_resume();
275 },
276 60 +[](void* p) noexcept {
277 60 static_cast<Awaitable*>(p)->~Awaitable();
278 }
279 };
280 60 return &ops;
281 }
282
283 static constexpr vtable value = {
284 &do_destroy_impl,
285 sizeof(Awaitable),
286 alignof(Awaitable),
287 &construct_awaitable_impl
288 };
289 };
290
291 //----------------------------------------------------------
292
293 inline
294 72 any_write_stream::~any_write_stream()
295 {
296
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 72 times.
72 if(storage_)
297 {
298 vt_->destroy(stream_);
299 ::operator delete(storage_);
300 }
301
2/2
✓ Branch 0 taken 65 times.
✓ Branch 1 taken 7 times.
72 if(cached_awaitable_)
302 65 ::operator delete(cached_awaitable_);
303 72 }
304
305 inline any_write_stream&
306 3 any_write_stream::operator=(any_write_stream&& other) noexcept
307 {
308
1/2
✓ Branch 0 taken 3 times.
✗ Branch 1 not taken.
3 if(this != &other)
309 {
310
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(storage_)
311 {
312 vt_->destroy(stream_);
313 ::operator delete(storage_);
314 }
315
1/2
✗ Branch 0 not taken.
✓ Branch 1 taken 3 times.
3 if(cached_awaitable_)
316 ::operator delete(cached_awaitable_);
317 3 stream_ = std::exchange(other.stream_, nullptr);
318 3 vt_ = std::exchange(other.vt_, nullptr);
319 3 cached_awaitable_ = std::exchange(other.cached_awaitable_, nullptr);
320 3 storage_ = std::exchange(other.storage_, nullptr);
321 3 active_ops_ = std::exchange(other.active_ops_, nullptr);
322 }
323 3 return *this;
324 }
325
326 template<WriteStream S>
327 requires (!std::same_as<std::decay_t<S>, any_write_stream>)
328 any_write_stream::any_write_stream(S s)
329 : vt_(&vtable_for_impl<S>::value)
330 {
331 struct guard {
332 any_write_stream* self;
333 bool committed = false;
334 ~guard() {
335 if(!committed && self->storage_) {
336 self->vt_->destroy(self->stream_);
337 ::operator delete(self->storage_);
338 self->storage_ = nullptr;
339 self->stream_ = nullptr;
340 }
341 }
342 } g{this};
343
344 storage_ = ::operator new(sizeof(S));
345 stream_ = ::new(storage_) S(std::move(s));
346
347 // Preallocate the awaitable storage
348 cached_awaitable_ = ::operator new(vt_->awaitable_size);
349
350 g.committed = true;
351 }
352
353 template<WriteStream S>
354 65 any_write_stream::any_write_stream(S* s)
355 65 : stream_(s)
356 65 , vt_(&vtable_for_impl<S>::value)
357 {
358 // Preallocate the awaitable storage
359 65 cached_awaitable_ = ::operator new(vt_->awaitable_size);
360 65 }
361
362 //----------------------------------------------------------
363
364 template<ConstBufferSequence CB>
365 auto
366 60 any_write_stream::write_some(CB buffers)
367 {
368 struct awaitable
369 {
370 any_write_stream* self_;
371 buffer_param<CB> bp_;
372
373 bool
374 60 await_ready() const noexcept
375 {
376 60 return false;
377 }
378
379 coro
380 60 await_suspend(coro h, executor_ref ex, std::stop_token token)
381 {
382 // Construct the underlying awaitable into cached storage
383 60 self_->active_ops_ = self_->vt_->construct_awaitable(
384 60 self_->stream_,
385
1/1
✓ Branch 1 taken 10 times.
60 self_->cached_awaitable_,
386
1/1
✓ Branch 1 taken 10 times.
60 bp_.data());
387
388 // Check if underlying is immediately ready
389
1/2
✓ Branch 1 taken 10 times.
✗ Branch 2 not taken.
60 if(self_->active_ops_->await_ready(self_->cached_awaitable_))
390 60 return h;
391
392 // Forward to underlying awaitable
393 return self_->active_ops_->await_suspend(
394 self_->cached_awaitable_, h, ex, token);
395 }
396
397 io_result<std::size_t>
398 60 await_resume()
399 {
400 struct guard {
401 any_write_stream* self;
402 60 ~guard() {
403 60 self->active_ops_->destroy(self->cached_awaitable_);
404 60 self->active_ops_ = nullptr;
405 60 }
406 60 } g{self_};
407 60 return self_->active_ops_->await_resume(
408
1/1
✓ Branch 1 taken 7 times.
103 self_->cached_awaitable_);
409 60 }
410 };
411 60 return awaitable{this, buffer_param<CB>(buffers)};
412 }
413
414 } // namespace capy
415 } // namespace boost
416
417 #endif
418