Stream Pipeline
Data transformation through a pipeline of sources and sinks.
What You Will Learn
- Building processing pipelines
- Using the BufferSource and BufferSink concepts
- Chaining transformations
Prerequisites
- Completed Echo Server with Corosio
- Understanding of buffer sources/sinks from Buffer Concepts
Source Code
#include <boost/capy.hpp>
#include <boost/capy/test/run_blocking.hpp>
#include <boost/capy/test/buffer_source.hpp>
#include <boost/capy/test/buffer_sink.hpp>
#include <cctype>
#include <iostream>
#include <string>
#include <system_error>
#include <vector>
using namespace boost::capy;
// A transform stage that converts to uppercase
class uppercase_transform
{
any_buffer_source* source_;
std::vector<char> buffer_;
std::size_t offset_ = 0;
bool exhausted_ = false;
public:
explicit uppercase_transform(any_buffer_source& source)
: source_(&source)
{
}
    // BufferSource interface: fill up to max_count entries of arr
    // and return how many were filled (this stage fills at most one)
    io_result<std::size_t> pull(const_buffer* arr, std::size_t max_count)
{
if (exhausted_ && offset_ >= buffer_.size())
co_return {error_code{}, 0}; // Exhausted
// Need more data?
if (offset_ >= buffer_.size())
{
buffer_.clear();
offset_ = 0;
// Pull from upstream
const_buffer upstream[8];
auto [ec, count] = co_await source_->pull(upstream, 8);
if (ec.failed())
co_return {ec, 0};
if (count == 0)
{
exhausted_ = true;
co_return {error_code{}, 0};
}
// Transform: copy and uppercase
for (std::size_t i = 0; i < count; ++i)
{
auto data = static_cast<char const*>(upstream[i].data());
auto size = upstream[i].size();
for (std::size_t j = 0; j < size; ++j)
{
buffer_.push_back(static_cast<char>(
std::toupper(static_cast<unsigned char>(data[j]))));
}
}
}
// Return our buffer
arr[0] = const_buffer(buffer_.data() + offset_, buffer_.size() - offset_);
offset_ = buffer_.size(); // Mark as consumed
co_return {error_code{}, 1};
}
};
// A transform that adds line numbers
class line_numbering_transform
{
    any_buffer_source* source_;
    std::string buffer_;
    std::size_t offset_ = 0; // bytes of buffer_ already handed downstream
    std::size_t line_num_ = 1;
    bool exhausted_ = false;
    bool at_line_start_ = true;
public:
explicit line_numbering_transform(any_buffer_source& source)
: source_(&source)
{
}
io_result<std::size_t> pull(const_buffer* arr, std::size_t max_count)
{
        if (exhausted_ && offset_ >= buffer_.size())
            co_return {error_code{}, 0};
        // Pull more data if needed
        if (offset_ >= buffer_.size())
        {
            buffer_.clear();
            offset_ = 0;
const_buffer upstream[8];
auto [ec, count] = co_await source_->pull(upstream, 8);
if (ec.failed())
co_return {ec, 0};
if (count == 0)
{
exhausted_ = true;
co_return {error_code{}, 0};
}
// Transform: add line numbers
for (std::size_t i = 0; i < count; ++i)
{
auto data = static_cast<char const*>(upstream[i].data());
auto size = upstream[i].size();
for (std::size_t j = 0; j < size; ++j)
{
if (at_line_start_)
{
buffer_ += std::to_string(line_num_++) + ": ";
at_line_start_ = false;
}
buffer_ += data[j];
if (data[j] == '\n')
at_line_start_ = true;
}
}
}
        // Hand out the unconsumed bytes; the string must stay alive and
        // unmodified until the caller has read them, so we track an offset
        // instead of clearing the buffer here
        arr[0] = const_buffer(buffer_.data() + offset_, buffer_.size() - offset_);
        offset_ = buffer_.size(); // Mark as consumed
        co_return {error_code{}, 1};
}
};
// Transfer from source to sink
task<std::size_t> transfer(any_buffer_source& source, any_write_sink& sink)
{
std::size_t total = 0;
const_buffer bufs[8];
for (;;)
{
auto [ec, count] = co_await source.pull(bufs, 8);
if (ec.failed())
throw std::system_error(ec);
if (count == 0)
break;
        for (std::size_t i = 0; i < count; ++i)
        {
            // Note: this assumes write() consumes the entire buffer
            auto [wec, n] = co_await sink.write(bufs[i]);
            if (wec.failed())
                throw std::system_error(wec);
            total += n;
        }
}
co_await sink.write_eof();
co_return total;
}
void demo_pipeline()
{
std::cout << "=== Stream Pipeline Demo ===\n\n";
// Source data
std::string input = "hello world\nthis is a test\nof the pipeline\n";
std::cout << "Input:\n" << input << "\n";
// Create source from string
test::buffer_source source;
source.provide(input);
source.provide_eof();
// Wrap as any_buffer_source
any_buffer_source src{source};
// Create transform stages
uppercase_transform upper{src};
any_buffer_source upper_src{upper};
line_numbering_transform numbered{upper_src};
any_buffer_source numbered_src{numbered};
// Create sink
test::write_sink sink;
any_write_sink dst{sink};
// Run pipeline
auto bytes = test::run_blocking(transfer(numbered_src, dst));
std::cout << "Output (" << bytes << " bytes):\n";
std::cout << sink.data() << "\n";
}
int main()
{
demo_pipeline();
return 0;
}
Build
add_executable(stream_pipeline stream_pipeline.cpp)
target_link_libraries(stream_pipeline PRIVATE capy)
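A complete CMakeLists.txt might look like the sketch below; the project name, the C++ standard setting, and the use of add_subdirectory to bring in Capy are assumptions, not part of this example:
cmake_minimum_required(VERSION 3.20)
project(stream_pipeline_example CXX)
# Coroutines in this example require C++20 or later
set(CMAKE_CXX_STANDARD 20)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
# Assumes the Capy sources are vendored alongside this file
add_subdirectory(capy)
add_executable(stream_pipeline stream_pipeline.cpp)
target_link_libraries(stream_pipeline PRIVATE capy)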
Walkthrough
Pipeline Structure
Source → Uppercase → LineNumbering → Sink
Data flows through the pipeline:
- Source provides the raw input
- Uppercase converts the text to uppercase
- LineNumbering prefixes each line with its number
- Sink collects the output
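In demo_pipeline this wiring appears as follows; each stage is constructed over a type-erased view of the stage before it, and only the final source is handed to transfer:
// Wiring excerpted from demo_pipeline: each stage wraps the previous one
any_buffer_source src{source};                // Source
uppercase_transform upper{src};               // Uppercase stage
any_buffer_source upper_src{upper};
line_numbering_transform numbered{upper_src}; // LineNumbering stage
any_buffer_source numbered_src{numbered};
auto bytes = test::run_blocking(transfer(numbered_src, dst)); // drain into Sink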
BufferSource Implementation
io_result<std::size_t> pull(const_buffer* arr, std::size_t max_count)
{
    // Pull from upstream
    const_buffer upstream[8];
    auto [ec, count] = co_await source_->pull(upstream, 8);
    // Transform data
    // ...
    // Return transformed buffer
    arr[0] = const_buffer(buffer_.data(), buffer_.size());
    co_return {error_code{}, 1};
}
Each stage pulls from upstream, transforms, and provides output buffers.
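For comparison, the smallest stage that satisfies this contract is one that forwards the upstream buffers unchanged. The sketch below assumes only the pull/any_buffer_source API shown above; passthrough_transform is a name introduced here for illustration:
// A no-op stage: pulls from upstream and forwards the buffers as-is.
// Useful as a skeleton when writing a new transform.
class passthrough_transform
{
    any_buffer_source* source_;

public:
    explicit passthrough_transform(any_buffer_source& source)
        : source_(&source)
    {
    }

    io_result<std::size_t> pull(const_buffer* arr, std::size_t max_count)
    {
        // No buffering needed: hand the upstream buffers straight through
        auto [ec, count] = co_await source_->pull(arr, max_count);
        co_return {ec, count};
    }
};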
Output
=== Stream Pipeline Demo ===
Input:
hello world
this is a test
of the pipeline
Output (52 bytes):
1: HELLO WORLD
2: THIS IS A TEST
3: OF THE PIPELINE
Exercises
- Add a compression/decompression stage
- Implement a ROT13 transform (a starting sketch follows this list)
- Create a filtering stage that drops lines matching a pattern
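For the ROT13 exercise, a possible starting point is sketched below; rot13_transform is a name introduced here, the buffering pattern is copied from uppercase_transform above, and only the character mapping is new:
// Rotates ASCII letters by 13 positions; other bytes pass through unchanged
class rot13_transform
{
    any_buffer_source* source_;
    std::vector<char> buffer_;
    std::size_t offset_ = 0;
    bool exhausted_ = false;

    static char rot13(char c)
    {
        if (c >= 'a' && c <= 'z')
            return static_cast<char>('a' + (c - 'a' + 13) % 26);
        if (c >= 'A' && c <= 'Z')
            return static_cast<char>('A' + (c - 'A' + 13) % 26);
        return c;
    }

public:
    explicit rot13_transform(any_buffer_source& source)
        : source_(&source)
    {
    }

    io_result<std::size_t> pull(const_buffer* arr, std::size_t max_count)
    {
        if (exhausted_ && offset_ >= buffer_.size())
            co_return {error_code{}, 0};
        if (offset_ >= buffer_.size())
        {
            buffer_.clear();
            offset_ = 0;
            const_buffer upstream[8];
            auto [ec, count] = co_await source_->pull(upstream, 8);
            if (ec.failed())
                co_return {ec, 0};
            if (count == 0)
            {
                exhausted_ = true;
                co_return {error_code{}, 0};
            }
            for (std::size_t i = 0; i < count; ++i)
            {
                auto data = static_cast<char const*>(upstream[i].data());
                for (std::size_t j = 0; j < upstream[i].size(); ++j)
                    buffer_.push_back(rot13(data[j]));
            }
        }
        arr[0] = const_buffer(buffer_.data() + offset_, buffer_.size() - offset_);
        offset_ = buffer_.size(); // Mark as consumed
        co_return {error_code{}, 1};
    }
};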
Summary
This example catalog demonstrated:
- Basic task creation and launching
- Coroutine synchronization with events
- Buffer composition for scatter/gather I/O
- Unit testing with mock streams
- Compilation firewalls with type erasure
- Cooperative cancellation with stop tokens
- Concurrent execution with when_all
- Custom buffer implementations
- Real network I/O with Corosio
- Data transformation pipelines
These patterns form the foundation for building robust, efficient I/O applications with Capy.