Custom Dynamic Buffer
Implementing the DynamicBuffer concept for a custom allocation strategy.
What You Will Learn
-
Implementing the DynamicBuffer concept
-
Understanding the prepare / commit / consume lifecycle
-
Custom memory management for I/O
Prerequisites
-
Completed Parallel Fetch
-
Understanding of dynamic buffers from Dynamic Buffers
Source Code
#include <boost/capy.hpp>
#include <boost/capy/test/run_blocking.hpp>
#include <boost/capy/test/stream.hpp>
#include <cassert>
#include <cstddef>
#include <cstring>
#include <iostream>
#include <stdexcept>
#include <system_error>
#include <vector>
using namespace boost::capy;
// Custom dynamic buffer with statistics tracking.
//
// All bytes live in one contiguous std::vector<char>. The readable region is
// [read_pos_, write_pos_); prepare() hands out writable space that starts at
// write_pos_. Invariant maintained by every member function:
//     read_pos_ <= write_pos_ <= storage_.size() <= max_size_
class tracked_buffer
{
    std::vector<char> storage_;
    std::size_t read_pos_ = 0;  // Start of readable data
    std::size_t write_pos_ = 0; // End of readable data
    std::size_t max_size_;

    // Statistics
    std::size_t total_prepared_ = 0;
    std::size_t total_committed_ = 0;
    std::size_t total_consumed_ = 0;

public:
    // Construct an empty buffer whose readable size may never exceed max_size.
    explicit tracked_buffer(std::size_t max_size = 65536)
        : max_size_(max_size)
    {
        storage_.reserve(1024);
    }

    // === DynamicBuffer interface ===

    // Consumer: readable data
    const_buffer data() const noexcept
    {
        return const_buffer(
            storage_.data() + read_pos_,
            write_pos_ - read_pos_);
    }

    // Capacity queries

    // Number of readable bytes.
    std::size_t size() const noexcept
    {
        return write_pos_ - read_pos_;
    }

    // Upper bound on size() + the size of a prepared region.
    std::size_t max_size() const noexcept
    {
        return max_size_;
    }

    // Allocated bytes from the start of the readable region onward.
    std::size_t capacity() const noexcept
    {
        return storage_.capacity() - read_pos_;
    }

    // Producer: prepare space for writing.
    //
    // Returns a writable region of exactly n bytes located directly after the
    // readable data. Throws std::length_error if size() + n would exceed
    // max_size(); in that case the buffer and its statistics are unchanged.
    mutable_buffer prepare(std::size_t n)
    {
        // Compare against the readable size (already-consumed bytes at the
        // front do not count toward the limit) and phrase the check as a
        // subtraction so a huge n cannot wrap around and pass.
        if (n > max_size_ - size())
            throw std::length_error("tracked_buffer: max_size exceeded");

        // Only requests that are actually honoured are counted.
        total_prepared_ += n;

        // Compact if needed: reclaim consumed bytes at the front when the
        // request would otherwise force a reallocation or push the end of
        // storage past max_size_.
        if (read_pos_ > 0 &&
            (n > storage_.capacity() - write_pos_ ||
             n > max_size_ - write_pos_))
        {
            compact();
        }

        // Grow if needed. After the checks above, required <= max_size_.
        std::size_t const required = write_pos_ + n;
        if (required > storage_.size())
            storage_.resize(required);

        return mutable_buffer(
            storage_.data() + write_pos_,
            n);
    }

    // Producer: mark bytes as written.
    //
    // n is clamped to the bytes that exist in storage past the write
    // position, so the readable region can never extend beyond the vector.
    void commit(std::size_t n)
    {
        std::size_t const writable = storage_.size() - write_pos_;
        if (n > writable)
            n = writable;
        total_committed_ += n;
        write_pos_ += n;
    }

    // Consumer: mark bytes as processed.
    //
    // n is clamped to size(); asking for more simply empties the buffer
    // instead of moving read_pos_ past write_pos_.
    void consume(std::size_t n)
    {
        std::size_t const readable = size();
        if (n > readable)
            n = readable;
        total_consumed_ += n;
        read_pos_ += n;
        if (read_pos_ == write_pos_)
        {
            // Buffer empty, reset positions
            read_pos_ = 0;
            write_pos_ = 0;
        }
    }

    // === Statistics ===

    // Write the running totals plus current size and capacity to std::cout.
    void print_stats() const
    {
        std::cout << "Buffer statistics:\n"
                  << " Total prepared: " << total_prepared_ << " bytes\n"
                  << " Total committed: " << total_committed_ << " bytes\n"
                  << " Total consumed: " << total_consumed_ << " bytes\n"
                  << " Current size: " << size() << " bytes\n"
                  << " Capacity: " << capacity() << " bytes\n";
    }

private:
    // Slide the readable bytes to the front of storage_ so the space freed by
    // consume() becomes writable again. Regions may overlap, hence memmove.
    void compact()
    {
        if (read_pos_ == 0)
            return;
        std::size_t const len = write_pos_ - read_pos_;
        std::memmove(storage_.data(), storage_.data() + read_pos_, len);
        read_pos_ = 0;
        write_pos_ = len;
    }
};
// Demonstrate using the custom buffer: pull data from `stream` into `buffer`
// one read at a time until the stream reports end-of-file. Any other error is
// rethrown as std::system_error.
task<> read_into_tracked_buffer(any_stream& stream, tracked_buffer& buffer)
{
    for (;;)
    {
        // Reserve a 256-byte chunk of writable space for this read
        auto writable = buffer.prepare(256);
        auto [err, count] = co_await stream.read_some(writable);

        // End of stream: nothing more to read
        if (err == cond::eof)
            co_return;

        if (err.failed())
            throw std::system_error(err);

        // Make the freshly read bytes visible to the consumer side
        buffer.commit(count);

        std::cout << "Read " << count << " bytes, buffer size now: ";
        std::cout << buffer.size() << "\n";
    }
}
// Runs the whole demonstration: feeds three chunks plus EOF through a mock
// stream into a tracked_buffer, prints what was collected, then consumes the
// first 7 bytes and prints the statistics again.
void demo_tracked_buffer()
{
    std::cout << "=== Tracked Buffer Demo ===\n\n";

    // Mock stream preloaded with the test data, terminated by EOF
    test::stream mock;
    mock.provide("Hello, ");
    mock.provide("World! ");
    mock.provide("This is a test of the custom buffer.\n");
    mock.provide_eof();

    any_stream stream{mock};
    tracked_buffer buffer;

    // Drive the reading coroutine to completion
    test::run_blocking(read_into_tracked_buffer(stream, buffer));

    // Show everything that is currently readable
    auto readable = buffer.data();
    std::cout << "\nFinal buffer contents: ";
    std::cout.write(static_cast<char const*>(readable.data()), readable.size());
    std::cout << "\n\n";
    buffer.print_stats();

    // Consume some data
    std::cout << "\nConsuming 7 bytes...\n";
    buffer.consume(7);
    buffer.print_stats();
}
// Program entry point: run the demo and exit successfully.
int main()
{
    demo_tracked_buffer();
    // Falling off the end of main is equivalent to `return 0;`
}
Build
add_executable(custom_dynamic_buffer custom_dynamic_buffer.cpp)
target_link_libraries(custom_dynamic_buffer PRIVATE capy)
Walkthrough
DynamicBuffer Requirements
A DynamicBuffer must provide:
// Consumer interface
const_buffer data() const; // Readable data
void consume(std::size_t n); // Mark bytes as processed
// Producer interface
mutable_buffer prepare(std::size_t n); // Space for writing
void commit(std::size_t n); // Mark bytes as written
// Capacity queries
std::size_t size() const; // Readable bytes
std::size_t max_size() const; // Maximum allowed
std::size_t capacity() const; // Currently allocated
The Producer/Consumer Flow
// Illustrative fragment of one produce/consume round trip. `process` and
// `processed_bytes` are placeholders for application code; they are not
// defined anywhere in this example.
// 1. Producer prepares space
auto space = buffer.prepare(256);
// 2. Data is written into space
auto [ec, n] = co_await stream.read_some(space);
// 3. Producer commits written bytes (n is the count reported by read_some)
buffer.commit(n);
// 4. Consumer reads data
auto data = buffer.data();
process(data);
// 5. Consumer marks bytes as processed
buffer.consume(processed_bytes);
Output
=== Tracked Buffer Demo ===
Read 7 bytes, buffer size now: 7
Read 7 bytes, buffer size now: 14
Read 37 bytes, buffer size now: 51
Final buffer contents: Hello, World! This is a test of the custom buffer.
Buffer statistics:
Total prepared: 768 bytes
Total committed: 51 bytes
Total consumed: 0 bytes
Current size: 51 bytes
Capacity: 256 bytes
Consuming 7 bytes...
Buffer statistics:
Total prepared: 768 bytes
Total committed: 51 bytes
Total consumed: 7 bytes
Current size: 44 bytes
Capacity: 249 bytes
Exercises
-
Add a "high water mark" statistic that tracks maximum buffer size reached
-
Implement a ring buffer version that never moves data
-
Add an allocator parameter for custom memory allocation
Next Steps
-
Echo Server with Corosio — Real networking