Parallel Fetch

Running multiple operations concurrently with when_all.

What You Will Learn

  • Using when_all to run tasks in parallel

  • Structured bindings for results

  • Error propagation in concurrent tasks

Prerequisites

Source Code

#include <boost/capy.hpp>
#include <iostream>
#include <stdexcept>
#include <string>
#include <utility>

using namespace boost::capy;

// Simulated async operations
// Simulates looking up the numeric ID that belongs to a username.
//
// username is taken by value, so the coroutine works on its own copy of
// the string.
//
// Produces a fake ID: the length of username multiplied by 100.
task<int> fetch_user_id(std::string username)
{
    std::cout << "Fetching user ID for: " << username << "\n";
    // In real code: co_await http_get("/users/" + username);

    // length() yields std::size_t; convert explicitly so the narrowing to
    // the task's int result is intentional rather than implicit.
    co_return static_cast<int>(username.length() * 100);  // Fake ID
}

// Simulates fetching the display name for a user ID.
//
// Produces the text "User" followed by the decimal form of id.
task<std::string> fetch_user_name(int id)
{
    std::cout << "Fetching name for user ID: " << id << "\n";

    std::string display_name = "User";
    display_name += std::to_string(id);
    co_return display_name;
}

// Simulates fetching how many orders a user has placed.
//
// Produces a fake count: user_id divided by 10 using integer division.
task<int> fetch_order_count(int user_id)
{
    std::cout << "Fetching order count for user: " << user_id << "\n";

    int const fake_count = user_id / 10;
    co_return fake_count;
}

// Simulates fetching a user's account balance.
//
// Produces a fake balance: user_id multiplied by 1.5.
task<double> fetch_account_balance(int user_id)
{
    std::cout << "Fetching balance for user: " << user_id << "\n";

    double const fake_balance = user_id * 1.5;
    co_return fake_balance;
}

// Fetch all user data in parallel
// Prints a small "dashboard" for username.
//
// The user ID is awaited first because the three remaining queries all
// take it as input; those three are then handed to when_all together and
// their results are unpacked in argument order.
task<> fetch_user_dashboard(std::string username)
{
    std::cout << "\n=== Fetching dashboard for: " << username << " ===\n";

    // Step 1: the ID lookup has to finish before anything else can start.
    int const user_id = co_await fetch_user_id(username);
    std::cout << "Got user ID: " << user_id << "\n\n"
              << "Starting parallel fetches...\n";

    // Step 2: name, order count and balance are independent of each other,
    // so they are awaited as a group.
    auto [name, orders, balance] = co_await when_all(
        fetch_user_name(user_id),
        fetch_order_count(user_id),
        fetch_account_balance(user_id)
    );

    // Step 3: report everything in one go.
    std::cout << "\nDashboard results:\n"
              << "  Name: " << name << "\n"
              << "  Orders: " << orders << "\n"
              << "  Balance: $" << balance << "\n";
}

// Example with void tasks
// Simulates writing an access-log entry for resource.
//
// Produces no value; it only prints a line to standard output.
task<> log_access(std::string resource)
{
    std::cout << "Logging access to: " << resource << "\n";

    // The explicit co_return is what makes this function a coroutine.
    co_return;
}

// Simulates bumping the counter named metric.
//
// Produces no value; it only prints a line to standard output.
task<> update_metrics(std::string metric)
{
    std::cout << "Updating metric: " << metric << "\n";

    // The explicit co_return is what makes this function a coroutine.
    co_return;
}

// Runs two void tasks (logging, metrics) alongside one value-producing
// fetch and returns the fetched string.
//
// Only the non-void task contributes to the result of when_all, which is
// why the structured binding below has a single name.
task<std::string> fetch_with_side_effects()
{
    std::cout << "\n=== Fetch with side effects ===\n";

    // void tasks don't contribute to result tuple
    auto [data] = co_await when_all(
        log_access("api/data"),           // void - no result
        update_metrics("api_calls"),      // void - no result
        fetch_user_name(42)               // returns string
    );

    std::cout << "Data: " << data << "\n";

    // A structured binding is not eligible for implicit move on co_return,
    // so move explicitly to avoid copying the string. data is not used
    // after this point.
    co_return std::move(data);
}

// Error handling example
// Task used to demonstrate error propagation.
//
// Prints a "starting" line, then either throws or completes.
//
// should_fail  when true, the task throws instead of completing
// name         label used in the printed lines and the exception message
//
// Produces 42 on success.
// Throws std::runtime_error (declared in <stdexcept>) with the message
// "<name> failed!" when should_fail is true.
task<int> might_fail(bool should_fail, std::string name)
{
    std::cout << "Task " << name << " starting\n";

    if (should_fail)
    {
        // Thrown from inside the coroutine body; the caller observes it
        // when it awaits the result.
        std::string const message = name + " failed!";
        throw std::runtime_error(message);
    }

    std::cout << "Task " << name << " completed\n";
    co_return 42;
}

// Shows what the awaiting coroutine observes when one of several tasks
// passed to when_all throws.
task<> demonstrate_error_handling()
{
    std::cout << "\n=== Error handling ===\n";

    // Note: when_all waits for all tasks to complete (or respond to stop)
    // before propagating the first exception, so the handler below runs
    // only once A, B and C are all finished.
    try
    {
        auto [a, b, c] = co_await when_all(
            might_fail(false, "A"),
            might_fail(true, "B"),   // This one fails
            might_fail(false, "C")
        );

        // Printed only when none of the three tasks threw.
        std::cout << "All succeeded: " << a << ", " << b << ", " << c << "\n";
    }
    catch (std::runtime_error const& error)
    {
        std::cout << "Caught error: " << error.what() << "\n";
    }
}

// Entry point: launches the three demo coroutines on a thread pool.
int main()
{
    thread_pool pool;

    // Each demo is started independently on the pool's executor.
    // NOTE(review): nothing here orders the three launches relative to each
    // other, so their printed sections can presumably interleave - confirm.
    run_async(pool.get_executor())(fetch_user_dashboard("alice"));
    run_async(pool.get_executor())(fetch_with_side_effects());
    run_async(pool.get_executor())(demonstrate_error_handling());

    // NOTE(review): main returns right after launching. This assumes that
    // destroying thread_pool waits for the outstanding work to finish -
    // TODO confirm, or wait for completion explicitly before returning.
    return 0;
}

Build

add_executable(parallel_fetch parallel_fetch.cpp)
target_link_libraries(parallel_fetch PRIVATE capy)

Walkthrough

Basic when_all

auto [name, orders, balance] = co_await when_all(
    fetch_user_name(user_id),
    fetch_order_count(user_id),
    fetch_account_balance(user_id)
);

All three tasks run concurrently. when_all completes when all tasks finish. Results are returned in a tuple matching input order.

Void Filtering

auto [data] = co_await when_all(
    log_access("api/data"),      // void - filtered out
    update_metrics("api_calls"), // void - filtered out
    fetch_user_name(42)          // string - in tuple
);

Tasks returning void don’t contribute to the result tuple. Only non-void results appear.

Error Propagation

try
{
    auto results = co_await when_all(task_a(), task_b(), task_c());
}
catch (...)
{
    // First exception is rethrown
    // All tasks complete before exception propagates
}

When a task throws:

  1. The exception is captured

  2. Stop is requested for siblings

  3. All tasks complete (or respond to stop)

  4. First exception is rethrown

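Output

The listing below shows one possible run. Because the tasks execute concurrently on a thread pool, the order in which lines appear may differ between runs.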

=== Fetching dashboard for: alice ===
Fetching user ID for: alice
Got user ID: 500

Starting parallel fetches...
Fetching name for user ID: 500
Fetching order count for user: 500
Fetching balance for user: 500

Dashboard results:
  Name: User500
  Orders: 50
  Balance: $750

=== Fetch with side effects ===
Logging access to: api/data
Updating metric: api_calls
Fetching name for user ID: 42
Data: User42

=== Error handling ===
Task A starting
Task B starting
Task C starting
Task A completed
Task C completed
Caught error: B failed!

Exercises

  1. Add timing to see the parallel speedup vs sequential execution

  2. Implement a "fan-out/fan-in" pattern that processes a list of items in parallel

  3. Add cancellation support so remaining tasks can exit early on error

Next Steps