Parallel Fetch
Running multiple operations concurrently with when_all.
What You Will Learn
- Using when_all to run tasks in parallel
- Structured bindings for results
- Error propagation in concurrent tasks
Prerequisites
- Completed Timeout with Cancellation
- Understanding of when_all from Composition
Source Code
#include <boost/capy.hpp>
#include <iostream>
#include <stdexcept>
#include <string>
using namespace boost::capy;
// Simulated async operations
// Simulated async lookup of a user's numeric ID.
// In real code this would await an HTTP request; here we derive a
// deterministic fake ID from the username's length.
task<int> fetch_user_id(std::string username)
{
    std::cout << "Fetching user ID for: " << username << "\n";
    // In real code: co_await http_get("/users/" + username);
    // length() returns std::size_t; cast explicitly rather than relying
    // on an implicit narrowing conversion to the task's int result.
    co_return static_cast<int>(username.length()) * 100; // Fake ID
}
// Simulated async lookup of the display name for a user ID.
task<std::string> fetch_user_name(int id)
{
    std::cout << "Fetching name for user ID: " << id << "\n";
    std::string display_name{"User"};
    display_name += std::to_string(id);
    co_return display_name;
}
// Simulated async order-count query; derives a fake count from the ID.
task<int> fetch_order_count(int user_id)
{
    std::cout << "Fetching order count for user: " << user_id << "\n";
    int const count = user_id / 10; // Fake count
    co_return count;
}
// Simulated async balance query; derives a fake balance from the ID.
task<double> fetch_account_balance(int user_id)
{
    std::cout << "Fetching balance for user: " << user_id << "\n";
    double const balance = user_id * 1.5; // Fake balance
    co_return balance;
}
// Fetch all user data in parallel
// Fetch all user data for a dashboard: one sequential lookup for the
// ID, then three independent queries fanned out with when_all.
task<> fetch_user_dashboard(std::string username)
{
    std::cout << "\n=== Fetching dashboard for: " << username << " ===\n";

    // The ID is a prerequisite for every other query, so it must be
    // awaited on its own before fanning out.
    int const id = co_await fetch_user_id(username);
    std::cout << "Got user ID: " << id << "\n\n";

    // The remaining queries are independent of each other; when_all
    // runs them concurrently and yields a tuple in argument order.
    std::cout << "Starting parallel fetches...\n";
    auto [display_name, order_count, account_balance] =
        co_await when_all(
            fetch_user_name(id),
            fetch_order_count(id),
            fetch_account_balance(id));

    std::cout << "\nDashboard results:\n";
    std::cout << " Name: " << display_name << "\n";
    std::cout << " Orders: " << order_count << "\n";
    std::cout << " Balance: $" << account_balance << "\n";
}
// Example with void tasks
// Side-effect-only task: logs an access and completes with no value.
task<> log_access(std::string resource)
{
    std::cout << "Logging access to: "
              << resource << "\n";
    co_return;
}
// Side-effect-only task: records a metric update and completes with no value.
task<> update_metrics(std::string metric)
{
    std::cout << "Updating metric: "
              << metric << "\n";
    co_return;
}
// Mix void and value-producing tasks under one when_all: the void
// tasks run for their side effects only and are filtered out of the
// result tuple, leaving a single element to bind.
task<std::string> fetch_with_side_effects()
{
    std::cout << "\n=== Fetch with side effects ===\n";

    // Only fetch_user_name contributes to the result tuple.
    auto [user] = co_await when_all(
        log_access("api/data"),      // void - no result
        update_metrics("api_calls"), // void - no result
        fetch_user_name(42));        // returns string

    std::cout << "Data: " << user << "\n";
    co_return user;
}
// Error handling example
// Simulated fallible task: throws std::runtime_error when asked to,
// otherwise completes with the value 42.
task<int> might_fail(bool should_fail, std::string name)
{
    std::cout << "Task " << name << " starting\n";
    if (!should_fail)
    {
        std::cout << "Task " << name << " completed\n";
        co_return 42;
    }
    throw std::runtime_error(name + " failed!");
}
// Show how when_all surfaces a failure from one of its children:
// the first exception is rethrown at the co_await site.
task<> demonstrate_error_handling()
{
    std::cout << "\n=== Error handling ===\n";
    try
    {
        auto [ra, rb, rc] = co_await when_all(
            might_fail(false, "A"),
            might_fail(true, "B"), // This one fails
            might_fail(false, "C"));
        std::cout << "All succeeded: " << ra << ", " << rb << ", " << rc << "\n";
    }
    catch (const std::runtime_error& e)
    {
        std::cout << "Caught error: " << e.what() << "\n";
        // Note: when_all waits for all tasks to complete (or respond to stop)
        // before propagating the first exception
    }
}
// Entry point: launch the three example coroutines on a thread pool.
int main()
{
// Worker threads that drive the coroutines.
thread_pool pool;
// Launch each example coroutine on the pool's executor.
// NOTE(review): run_async appears to start the coroutines without
// returning a handle to await; presumably thread_pool's destructor
// joins outstanding work before main exits — confirm against the
// capy documentation.
run_async(pool.get_executor())(fetch_user_dashboard("alice"));
run_async(pool.get_executor())(fetch_with_side_effects());
run_async(pool.get_executor())(demonstrate_error_handling());
return 0;
}
Build
add_executable(parallel_fetch parallel_fetch.cpp)
target_link_libraries(parallel_fetch PRIVATE capy)
Walkthrough
Basic when_all
auto [name, orders, balance] = co_await when_all(
fetch_user_name(user_id),
fetch_order_count(user_id),
fetch_account_balance(user_id)
);
All three tasks run concurrently. when_all completes when all tasks finish. Results are returned in a tuple matching input order.
Void Filtering
auto [data] = co_await when_all(
log_access("api/data"), // void - filtered out
update_metrics("api_calls"), // void - filtered out
fetch_user_name(42) // string - in tuple
);
Tasks returning void don’t contribute to the result tuple. Only non-void results appear.
Error Propagation
try
{
auto results = co_await when_all(task_a(), task_b(), task_c());
}
catch (...)
{
// First exception is rethrown
// All tasks complete before exception propagates
}
When a task throws:
- The exception is captured
- Stop is requested for siblings
- All tasks complete (or respond to stop)
- First exception is rethrown
Output
=== Fetching dashboard for: alice ===
Fetching user ID for: alice
Got user ID: 500
Starting parallel fetches...
Fetching name for user ID: 500
Fetching order count for user: 500
Fetching balance for user: 500
Dashboard results:
Name: User500
Orders: 50
Balance: $750
=== Fetch with side effects ===
Logging access to: api/data
Updating metric: api_calls
Fetching name for user ID: 42
Data: User42
=== Error handling ===
Task A starting
Task B starting
Task C starting
Task A completed
Task C completed
Caught error: B failed!
Exercises
- Add timing to see the parallel speedup vs sequential execution
- Implement a "fan-out/fan-in" pattern that processes a list of items in parallel
- Add cancellation support so remaining tasks can exit early on error
Next Steps
- Custom Dynamic Buffer — Implementing your own buffer