Buffer Sources and Sinks
This section explains the BufferSource and BufferSink concepts for zero-copy I/O where the callee owns the buffers.
Prerequisites
- Completed Sources and Sinks
- Understanding of caller-owns-buffers patterns
Callee-Owns-Buffers Pattern
With streams and sources/sinks, the caller provides buffers:
// Caller owns the buffer
char my_buffer[1024];
co_await stream.read_some(mutable_buffer(my_buffer));
Data flows: source → caller’s buffer → processing
With buffer sources/sinks, the callee provides buffers:
// Callee owns the buffers
const_buffer bufs[8];
auto [ec, count] = co_await source.pull(bufs, 8);
// bufs now point into source's internal storage
Data flows: source’s internal buffer → processing (no copy!)
BufferSource
A BufferSource provides read-only buffers from its internal storage:
template<typename T>
concept BufferSource =
    requires(T& source, const_buffer* arr, std::size_t max_count) {
        { source.pull(arr, max_count) } -> IoAwaitable;
    };
pull Semantics
IoAwaitable auto pull(const_buffer* arr, std::size_t max_count);
Returns an awaitable yielding (error_code, std::size_t):
- On success: !ec.failed(), fills arr[0..count-1] with buffer descriptors
- On exhaustion: count == 0 (with no error) indicates no more data
- On error: ec.failed()
The buffers point into the source’s internal storage. You must consume all returned data before calling pull() again, because the next call invalidates the previously returned buffers.
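The contract above can be modeled without coroutines. The following is a minimal synchronous sketch, not the library's API: const_buffer here is a hypothetical stand-in struct, chunk_source and drain are illustrative names, and pull returns its (error_code, count) pair directly instead of an awaitable. It is meant to show that the returned descriptors alias the source's own storage rather than a copy.

```cpp
#include <cstddef>
#include <string>
#include <system_error>
#include <utility>
#include <vector>

// Hypothetical stand-in for the library's const_buffer: a non-owning view.
struct const_buffer
{
    void const* data_ = nullptr;
    std::size_t size_ = 0;
    void const* data() const noexcept { return data_; }
    std::size_t size() const noexcept { return size_; }
};

// Hypothetical synchronous source that owns a list of chunks.
// The real pull() yields its result through an awaitable.
class chunk_source
{
    std::vector<std::string> chunks_;
    std::size_t next_ = 0;

public:
    explicit chunk_source(std::vector<std::string> chunks)
        : chunks_(std::move(chunks))
    {
    }

    // Fills arr[0..count-1] with views of internal chunks; count == 0 means exhausted.
    std::pair<std::error_code, std::size_t>
    pull(const_buffer* arr, std::size_t max_count)
    {
        std::size_t count = 0;
        while (count < max_count && next_ < chunks_.size())
        {
            std::string const& c = chunks_[next_++];
            arr[count++] = const_buffer{c.data(), c.size()};
        }
        return {std::error_code{}, count};
    }

    // Exposed only so the aliasing can be observed from outside.
    std::string const& chunk(std::size_t i) const { return chunks_[i]; }
};

// Drains the source and returns the total number of bytes seen.
inline std::size_t drain(chunk_source& source)
{
    const_buffer bufs[2];
    std::size_t total = 0;
    for (;;)
    {
        auto [ec, count] = source.pull(bufs, 2);
        if (ec || count == 0)
            break;
        for (std::size_t i = 0; i < count; ++i)
            total += bufs[i].size();
    }
    return total;
}
```

This toy source never recycles its chunks, so old descriptors happen to stay valid. A real source may reuse its storage on the next pull(), which is why returned data must be consumed first.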
Example
template<BufferSource Source>
task<> process_source(Source& source)
{
    const_buffer bufs[8];
    for (;;)
    {
        auto [ec, count] = co_await source.pull(bufs, 8);
        if (ec.failed())
            throw std::system_error(ec);
        if (count == 0)
            break; // Source exhausted
        // Process buffers (zero-copy!)
        for (std::size_t i = 0; i < count; ++i)
            process_data(bufs[i].data(), bufs[i].size());
    }
}
BufferSink
A BufferSink provides writable buffers for direct write access:
template<typename T>
concept BufferSink =
    requires(T& sink, mutable_buffer* arr, std::size_t max_count, std::size_t n) {
        { sink.prepare(arr, max_count) } -> std::same_as<std::size_t>;
        { sink.commit(n) } -> IoAwaitable;
        { sink.commit(n, bool{}) } -> IoAwaitable;
        { sink.commit_eof() } -> IoAwaitable;
    };
prepare Semantics
std::size_t prepare(mutable_buffer* arr, std::size_t max_count);
Synchronous operation. Returns the number of buffers prepared (may be less than max_count). Fills arr[0..count-1] with writable buffer descriptors.
commit Semantics
IoAwaitable auto commit(std::size_t n);
IoAwaitable auto commit(std::size_t n, bool eof);
IoAwaitable auto commit_eof();
commit(n) finalizes n bytes that were written into the prepared buffers. Passing eof == true, or calling commit_eof(), signals end-of-stream.
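The prepare/commit cycle can be illustrated with a small synchronous model. This is a sketch under stated assumptions rather than the library's implementation: mutable_buffer is a hypothetical stand-in struct, string_sink and write_all are illustrative names, commit returns a plain std::error_code instead of an awaitable, and rejecting commits after end-of-stream is a choice made for this example only.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstring>
#include <string>
#include <string_view>
#include <system_error>

// Hypothetical stand-in for the library's mutable_buffer.
struct mutable_buffer
{
    void* data_ = nullptr;
    std::size_t size_ = 0;
    void* data() const noexcept { return data_; }
    std::size_t size() const noexcept { return size_; }
};

// Hypothetical synchronous sink: a small staging area owned by the sink,
// appended to an internal string on commit.
class string_sink
{
    char staging_[4];
    std::string committed_;
    bool eof_ = false;

public:
    // Exposes the staging area for direct writes; returns the number of buffers filled in.
    std::size_t prepare(mutable_buffer* arr, std::size_t max_count)
    {
        if (max_count == 0 || eof_)
            return 0;
        arr[0] = mutable_buffer{staging_, sizeof(staging_)};
        return 1;
    }

    // Finalizes the first n bytes of the staging area.
    std::error_code commit(std::size_t n, bool eof = false)
    {
        if (n > sizeof(staging_) || eof_)
            return std::make_error_code(std::errc::invalid_argument);
        committed_.append(staging_, n);
        eof_ = eof;
        return {};
    }

    std::error_code commit_eof() { return commit(0, true); }

    std::string const& str() const { return committed_; }
    bool done() const { return eof_; }
};

// Writes all of data through repeated prepare/commit rounds, then signals end-of-stream.
inline std::error_code write_all(string_sink& sink, std::string_view data)
{
    while (!data.empty())
    {
        mutable_buffer buf;
        if (sink.prepare(&buf, 1) == 0)
            return std::make_error_code(std::errc::no_buffer_space);
        std::size_t n = (std::min)(buf.size(), data.size());
        std::memcpy(buf.data(), data.data(), n);
        data.remove_prefix(n);
        if (auto ec = sink.commit(n))
            return ec;
    }
    return sink.commit_eof();
}
```

Note that the caller writes straight into memory the sink owns; the only copy is the one from the caller's data into the sink's buffer.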
Example
template<BufferSink Sink>
task<> write_to_sink(Sink& sink, std::span<char const> data)
{
    std::size_t written = 0;
    while (written < data.size())
    {
        mutable_buffer bufs[8];
        std::size_t count = sink.prepare(bufs, 8);
        if (count == 0)
            throw std::runtime_error("sink full");
        // Copy into sink's buffers
        std::size_t copied = 0;
        for (std::size_t i = 0; i < count && written < data.size(); ++i)
        {
            std::size_t chunk = (std::min)(
                bufs[i].size(),
                data.size() - written);
            std::memcpy(bufs[i].data(), data.data() + written, chunk);
            written += chunk;
            copied += chunk;
        }
        bool eof = (written == data.size());
        co_await sink.commit(copied, eof);
    }
    // Empty input never enters the loop, so signal end-of-stream explicitly
    if (data.empty())
        co_await sink.commit_eof();
}
Zero-Copy Benefits
Buffer sources/sinks enable true zero-copy I/O:
Memory-Mapped Files
class mmap_source // satisfies BufferSource structurally; a concept cannot be a base class
{
    void* mapped_region_;
    std::size_t size_;
    std::size_t offset_ = 0;
public:
    // co_return requires a coroutine return type; task<...> is assumed here
    task<io_result<std::size_t>> pull(const_buffer* arr, std::size_t max_count)
    {
        if (offset_ >= size_)
            co_return {error_code{}, 0}; // Exhausted
        // Return a pointer into mapped memory (no copy)
        arr[0] = const_buffer(
            static_cast<char*>(mapped_region_) + offset_,
            size_ - offset_);
        offset_ = size_;
        co_return {error_code{}, 1};
    }
};
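The class above leaves open where mapped_region_ comes from. One possible way to obtain such a region on POSIX systems is sketched below. The mapped_file wrapper and its bytes() accessor are hypothetical names introduced for illustration and are not part of the library; only the POSIX calls (open, fstat, mmap, munmap, close) are real.

```cpp
#include <cerrno>
#include <cstddef>
#include <string_view>
#include <system_error>

#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

// Hypothetical RAII wrapper around a read-only, private file mapping.
class mapped_file
{
    void* region_ = nullptr;
    std::size_t size_ = 0;

public:
    explicit mapped_file(char const* path)
    {
        int fd = ::open(path, O_RDONLY);
        if (fd < 0)
            throw std::system_error(errno, std::generic_category(), "open");
        struct stat st{};
        if (::fstat(fd, &st) != 0)
        {
            int e = errno;
            ::close(fd);
            throw std::system_error(e, std::generic_category(), "fstat");
        }
        size_ = static_cast<std::size_t>(st.st_size);
        if (size_ != 0) // mmap rejects zero-length mappings
        {
            void* p = ::mmap(nullptr, size_, PROT_READ, MAP_PRIVATE, fd, 0);
            if (p == MAP_FAILED)
            {
                int e = errno;
                ::close(fd);
                throw std::system_error(e, std::generic_category(), "mmap");
            }
            region_ = p;
        }
        ::close(fd); // the mapping stays valid after the descriptor is closed
    }

    mapped_file(mapped_file const&) = delete;
    mapped_file& operator=(mapped_file const&) = delete;

    ~mapped_file()
    {
        if (region_)
            ::munmap(region_, size_);
    }

    // View of the mapped bytes; valid only while this object is alive.
    std::string_view bytes() const noexcept
    {
        return region_
            ? std::string_view(static_cast<char const*>(region_), size_)
            : std::string_view();
    }
};
```

A source built on such a mapping could hand out descriptors that point directly at bytes().data(), provided the mapping outlives every buffer returned from pull().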
Example: Compression Pipeline
// Pulls compressed blocks from a BufferSource (zero-copy) and
// writes the decompressed output to a write sink
task<> decompress_stream(any_buffer_source& compressed, any_write_sink& output)
{
    const_buffer bufs[8];
    for (;;)
    {
        auto [ec, count] = co_await compressed.pull(bufs, 8);
        if (ec.failed())
            throw std::system_error(ec);
        if (count == 0)
            break;
        for (std::size_t i = 0; i < count; ++i)
        {
            auto decompressed = decompress_block(bufs[i]);
            co_await output.write(make_buffer(decompressed));
        }
    }
    co_await output.write_eof();
}
Reference
| Header | Description |
|---|---|
| | BufferSource concept definition |
| | BufferSink concept definition |
| | Type-erased buffer source wrapper |
| | Type-erased buffer sink wrapper |
You have now learned about buffer sources and sinks for zero-copy I/O. Continue to Transfer Algorithms to learn about composed read/write operations.