在C++的异步编程场景中,经常需要多个异步任务按照固定顺序依次执行,比如先读取文件内容,再解析文件数据,最后处理解析结果,每个步骤都依赖前一个步骤的完成结果。如果手动通过std::future等待每个任务完成再启动下一个任务,代码会变得冗余且难以维护。因此需要设计一个链式异步执行模型,让开发者可以像拼接链条一样添加异步任务,框架自动处理任务的顺序执行和结果传递。

核心设计思路
链式异步执行模型的核心是将每个异步任务封装为可链式调用的节点,每个节点持有前一个任务的结果,执行完成后将结果传递给下一个节点。设计要点包含以下几个部分:
- 任务节点封装:每个节点存储一个可调用对象,以及前一个任务的结果引用
- 链式调用支持:通过then方法添加下一个任务,返回新的任务链对象
- 异步执行触发:提供execute方法启动整个任务链的执行,内部自动按顺序调度任务
- 结果传递:前一个任务的返回值作为下一个任务的输入参数,支持不同类型的任务参数传递
- 异常处理:捕获任务执行过程中的异常,避免异常导致整个任务链崩溃
基础实现代码
首先定义任务链的基础模板类,支持不同返回类型的任务节点:
#include <future>
#include <functional>
#include <memory>
#include <exception>
#include <iostream>
// 任务链基类,存储前一个任务的结果
template <typename T>
class TaskChain {
public:
TaskChain(std::future<T> fut) : future_(std::move(fut)) {}
// 添加下一个任务,接收前一个任务的返回值作为参数
template <typename Func>
auto then(Func&& func) -> TaskChain<decltype(func(std::declval<T>()))> {
using ResultType = decltype(func(std::declval<T>()));
// 封装下一个任务的逻辑:等待前一个任务完成,再执行当前任务
auto next_task = [fut = std::move(future_), f = std::forward<Func>(func)]() mutable {
try {
T prev_result = fut.get();
return f(prev_result);
} catch (...) {
// 异常传递,让后续任务可以捕获
throw;
}
};
// 启动下一个任务的异步执行
std::future<ResultType> next_future = std::async(std::launch::async, std::move(next_task));
return TaskChain<ResultType>(std::move(next_future));
}
// 获取最终执行结果
T get() {
return future_.get();
}
private:
std::future<T> future_;
};
// 起始任务创建函数,用于启动任务链
template <typename Func>
auto start_async(Func&& func) -> TaskChain<decltype(func())> {
using ResultType = decltype(func());
std::future<ResultType> fut = std::async(std::launch::async, std::forward<Func>(func));
return TaskChain<ResultType>(std::move(fut));
}
实战使用示例
下面通过一个完整的示例展示如何使用上述任务链模型,实现三个异步任务按顺序执行:
// 第一个任务:模拟读取文件,返回文件内容字符串
std::string read_file() {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "文件读取完成" << std::endl;
return "file_content_data";
}
// 第二个任务:解析文件内容,返回解析后的数据长度
int parse_content(const std::string& content) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "内容解析完成,内容长度:" << content.size() << std::endl;
return content.size();
}
// 第三个任务:处理解析结果,返回处理结果描述
std::string process_result(int data_len) {
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "结果处理完成,数据长度:" << data_len << std::endl;
return "处理完成,数据长度为" + std::to_string(data_len);
}
int main() {
try {
// 启动任务链,依次添加三个任务
auto result = start_async(read_file)
.then(parse_content)
.then(process_result)
.get();
std::cout << "最终执行结果:" << result << std::endl;
} catch (const std::exception& e) {
std::cerr << "任务执行异常:" << e.what() << std::endl;
}
return 0;
}
模型优化点
上述基础实现可以满足大部分链式异步执行的需求,实际使用中还可以根据场景做进一步优化:
- 支持无参数的起始任务,适配不需要前置输入的任务场景
- 添加任务执行超时控制,避免某个任务执行时间过长阻塞整个链
- 支持并行任务分支,部分无依赖的任务可以并行执行后再合并结果
- 增加任务执行回调,方便监控每个任务的执行状态和执行耗时
通过这种链式异步执行模型,开发者可以用简洁的代码实现复杂的异步任务调度逻辑,减少重复的结果等待和任务启动代码,提升异步编程的开发效率。