Stream Pipeline

Data transformation through a pipeline of sources and sinks.

What You Will Learn

  • Building processing pipelines

  • Using BufferSource and BufferSink concepts

  • Chaining transformations

Prerequisites

Familiarity with tasks, buffer types, and type erasure from the earlier examples.

Source Code

#include <boost/capy.hpp>
#include <boost/capy/test/run_blocking.hpp>
#include <boost/capy/test/buffer_source.hpp>
#include <boost/capy/test/buffer_sink.hpp>
#include <algorithm>
#include <cctype>
#include <iostream>
#include <string>
#include <system_error>
#include <vector>

using namespace boost::capy;

// A transform stage that converts to uppercase.
//
// Sits between an upstream BufferSource and a downstream consumer:
// each pull() refills an internal buffer from upstream when the
// previously returned bytes have been consumed, uppercases them, and
// exposes the result as a single const_buffer.
class uppercase_transform
{
    any_buffer_source* source_;   // upstream stage (non-owning)
    std::vector<char> buffer_;    // uppercased bytes pending delivery
    std::size_t offset_ = 0;      // bytes of buffer_ already handed out
    bool exhausted_ = false;      // upstream reported end-of-stream

public:
    explicit uppercase_transform(any_buffer_source& source)
        : source_(&source)
    {
    }

    // BufferSource interface
    //
    // Fills arr[0] with the next chunk of uppercased data and returns
    // {error, buffer-count}. A count of 0 with a clean error_code
    // signals end-of-stream. The returned buffer remains valid until
    // the next call to pull() (buffer_ is only cleared on refill).
    //
    // NOTE(review): max_count is never consulted before writing arr[0];
    // presumably the BufferSource contract guarantees max_count >= 1 —
    // confirm against the concept's documentation.
    io_result<std::size_t> pull(const_buffer* arr, std::size_t max_count)
    {
        if (exhausted_ && offset_ >= buffer_.size())
            co_return {error_code{}, 0};  // Exhausted

        // Need more data?
        if (offset_ >= buffer_.size())
        {
            // clear() keeps capacity, so refills reuse the allocation.
            buffer_.clear();
            offset_ = 0;

            // Pull from upstream
            const_buffer upstream[8];
            auto [ec, count] = co_await source_->pull(upstream, 8);

            if (ec.failed())
                co_return {ec, 0};

            if (count == 0)
            {
                exhausted_ = true;
                co_return {error_code{}, 0};
            }

            // Transform: copy and uppercase
            for (std::size_t i = 0; i < count; ++i)
            {
                auto data = static_cast<char const*>(upstream[i].data());
                auto size = upstream[i].size();

                for (std::size_t j = 0; j < size; ++j)
                {
                    // unsigned char cast avoids UB in std::toupper for
                    // negative char values.
                    buffer_.push_back(static_cast<char>(
                        std::toupper(static_cast<unsigned char>(data[j]))));
                }
            }
        }

        // Return our buffer
        arr[0] = const_buffer(buffer_.data() + offset_, buffer_.size() - offset_);
        offset_ = buffer_.size();  // Mark as consumed

        co_return {error_code{}, 1};
    }
};

// A transform that adds line numbers.
//
// Pulls bytes from upstream, prefixes each line with "N: ", and
// exposes the result as a single const_buffer.
class line_numbering_transform
{
    any_buffer_source* source_;   // upstream stage (non-owning)
    std::string buffer_;          // numbered output pending delivery
    std::size_t offset_ = 0;      // bytes of buffer_ already handed out
    std::size_t line_num_ = 1;    // next line number to emit
    bool exhausted_ = false;      // upstream reported end-of-stream
    bool at_line_start_ = true;   // next byte begins a new line

public:
    explicit line_numbering_transform(any_buffer_source& source)
        : source_(&source)
    {
    }

    // BufferSource interface
    //
    // Fills arr[0] with the next chunk of numbered text and returns
    // {error, buffer-count}; 0 buffers with a clean error_code means
    // end-of-stream. The returned buffer stays alive and unmodified
    // until the next call to pull(), using the same deferred-consumption
    // scheme as uppercase_transform. (Previously buffer_ was cleared
    // before the caller read it, leaving the returned const_buffer
    // pointing at bytes past the string's logical end.)
    io_result<std::size_t> pull(const_buffer* arr, std::size_t max_count)
    {
        if (exhausted_ && offset_ >= buffer_.size())
            co_return {error_code{}, 0};

        // Refill only once the previously returned bytes are consumed.
        if (offset_ >= buffer_.size())
        {
            buffer_.clear();
            offset_ = 0;

            const_buffer upstream[8];
            auto [ec, count] = co_await source_->pull(upstream, 8);

            if (ec.failed())
                co_return {ec, 0};

            if (count == 0)
            {
                exhausted_ = true;
                co_return {error_code{}, 0};
            }

            // Transform: add line numbers
            for (std::size_t i = 0; i < count; ++i)
            {
                auto data = static_cast<char const*>(upstream[i].data());
                auto size = upstream[i].size();

                for (std::size_t j = 0; j < size; ++j)
                {
                    if (at_line_start_)
                    {
                        buffer_ += std::to_string(line_num_++) + ": ";
                        at_line_start_ = false;
                    }

                    buffer_ += data[j];

                    // A newline means the following byte (possibly in a
                    // later chunk) starts a fresh line.
                    if (data[j] == '\n')
                        at_line_start_ = true;
                }
            }
        }

        // Hand out everything pending; mark it consumed so the next
        // pull() refills, but keep the bytes valid until then.
        arr[0] = const_buffer(buffer_.data() + offset_, buffer_.size() - offset_);
        offset_ = buffer_.size();

        co_return {error_code{}, 1};
    }
};

// Drain `source` into `sink`, returning the number of bytes written.
//
// Repeatedly pulls a batch of buffers from the source and writes each
// one to the sink in order; once the source reports no more data, an
// EOF notification is sent downstream. Any I/O failure is raised as
// std::system_error.
task<std::size_t> transfer(any_buffer_source& source, any_write_sink& sink)
{
    const_buffer batch[8];
    std::size_t written = 0;

    while (true)
    {
        auto [pull_ec, filled] = co_await source.pull(batch, 8);
        if (pull_ec.failed())
            throw std::system_error(pull_ec);
        if (filled == 0)
            break;  // source exhausted

        for (std::size_t idx = 0; idx != filled; ++idx)
        {
            auto [write_ec, bytes] = co_await sink.write(batch[idx]);
            if (write_ec.failed())
                throw std::system_error(write_ec);
            written += bytes;
        }
    }

    co_await sink.write_eof();
    co_return written;
}

// Wire up Source → Uppercase → LineNumbering → Sink and run it to
// completion, printing the input and the transformed output.
void demo_pipeline()
{
    std::cout << "=== Stream Pipeline Demo ===\n\n";

    std::string input = "hello world\nthis is a test\nof the pipeline\n";
    std::cout << "Input:\n" << input << "\n";

    // Stage 1: a test source pre-loaded with the input text.
    test::buffer_source source;
    source.provide(input);
    source.provide_eof();
    any_buffer_source src{source};

    // Stage 2: uppercase conversion, type-erased for uniform chaining.
    uppercase_transform upper{src};
    any_buffer_source upper_src{upper};

    // Stage 3: line numbering applied on top of the uppercased stream.
    line_numbering_transform numbered{upper_src};
    any_buffer_source numbered_src{numbered};

    // Final stage: a test sink that accumulates everything written.
    test::write_sink sink;
    any_write_sink dst{sink};

    // Drive the whole chain to completion on the current thread.
    auto bytes = test::run_blocking(transfer(numbered_src, dst));

    std::cout << "Output (" << bytes << " bytes):\n";
    std::cout << sink.data() << "\n";
}

// Entry point: run the pipeline demonstration. main() returns 0
// implicitly on normal completion.
int main()
{
    demo_pipeline();
}

Build

add_executable(stream_pipeline stream_pipeline.cpp)
target_link_libraries(stream_pipeline PRIVATE capy)

Walkthrough

Pipeline Structure

Source → Uppercase → LineNumbering → Sink

Data flows through the pipeline:

  1. Source provides raw input

  2. Uppercase transforms to uppercase

  3. LineNumbering adds line numbers

  4. Sink collects output

BufferSource Implementation

io_result<std::size_t> pull(const_buffer* arr, std::size_t max_count)
{
    // Pull from upstream
    auto [ec, count] = co_await source_->pull(upstream, 8);

    // Transform data
    // ...

    // Return transformed buffer
    arr[0] = const_buffer(buffer_.data(), buffer_.size());
    co_return {error_code{}, 1};
}

Each stage pulls from upstream, transforms, and provides output buffers.

Type Erasure

any_buffer_source src{source};
uppercase_transform upper{src};
any_buffer_source upper_src{upper};

any_buffer_source wraps each stage, allowing uniform composition.

Output

=== Stream Pipeline Demo ===

Input:
hello world
this is a test
of the pipeline

Output (52 bytes):
1: HELLO WORLD
2: THIS IS A TEST
3: OF THE PIPELINE

Exercises

  1. Add a compression/decompression stage

  2. Implement a ROT13 transform

  3. Create a filtering stage that drops lines matching a pattern

Summary

This example catalog demonstrated:

  • Basic task creation and launching

  • Coroutine synchronization with events

  • Buffer composition for scatter/gather I/O

  • Unit testing with mock streams

  • Compilation firewalls with type erasure

  • Cooperative cancellation with stop tokens

  • Concurrent execution with when_all

  • Custom buffer implementations

  • Real network I/O with Corosio

  • Data transformation pipelines

These patterns form the foundation for building robust, efficient I/O applications with Capy.