Mutiny是一个基于Reactive Streams规范的轻量级响应式编程库,设计目标是让异步编程更简单直观,它提供了丰富的操作符来处理异步数据流,其中就包含异步操作顺序执行和失败容错的相关能力。

Mutiny基础概念
Mutiny的核心是两个主要的类型:Uni和Multi。Uni表示最多发射一个元素的异步操作,适合处理单个异步任务,比如一次数据库查询、一次远程接口调用。Multi表示可以发射多个元素的异步流,适合处理批量数据或者持续的数据流场景。
我们要实现异步操作的顺序执行,主要会用到Uni类型,因为单个异步任务的结果可以用Uni来封装,多个任务的顺序执行可以通过链式调用或者组合操作符来实现。
异步操作的顺序执行实现
假设我们有三个异步操作,需要按照顺序依次执行,前一个操作的结果作为后一个操作的输入,在Mutiny中可以通过chain方法来实现这种依赖关系的顺序执行。
基础顺序执行示例
下面的代码演示了三个异步任务按顺序执行的过程,每个任务都返回Uni类型,通过chain串联起来:
import io.smallrye.mutiny.Uni;
public class MutinySequenceDemo {
// 第一个异步任务,模拟耗时1秒,返回初始值
private static Uni<String> task1() {
return Uni.createFrom().item("初始数据")
.onItem().delayIt().by(java.time.Duration.ofSeconds(1));
}
// 第二个异步任务,接收第一个任务的结果,处理后返回新结果
private static Uni<String> task2(String input) {
return Uni.createFrom().item(input + "-处理后数据")
.onItem().delayIt().by(java.time.Duration.ofSeconds(1));
}
// 第三个异步任务,接收第二个任务的结果,完成最终处理
private static Uni<String> task3(String input) {
return Uni.createFrom().item(input + "-最终结果")
.onItem().delayIt().by(java.time.Duration.ofSeconds(1));
}
public static void main(String[] args) {
// 链式调用实现顺序执行
task1()
.chain(MutinySequenceDemo::task2)
.chain(MutinySequenceDemo::task3)
.subscribe().with(
result -> System.out.println("顺序执行结果:" + result),
failure -> System.out.println("执行失败:" + failure.getMessage())
);
// 阻塞等待结果,避免主线程提前退出
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代码中,chain方法的作用是:等待前一个Uni完成并拿到结果后,将结果传入参数中的函数,执行下一个异步任务,这样就保证了三个任务是严格按照顺序执行的,不会出现并发执行的情况。
无依赖关系的顺序执行
如果多个异步任务之间没有数据依赖,只是需要按顺序执行,不需要前一个任务的结果,也可以使用chain结合replaceWith来实现:
import io.smallrye.mutiny.Uni;
public class MutinyNoDepSequence {
private static Uni<Void> taskA() {
return Uni.createFrom().nullItem()
.onItem().delayIt().by(java.time.Duration.ofSeconds(1))
.onItem().invoke(() -> System.out.println("任务A执行完成"));
}
private static Uni<Void> taskB() {
return Uni.createFrom().nullItem()
.onItem().delayIt().by(java.time.Duration.ofSeconds(1))
.onItem().invoke(() -> System.out.println("任务B执行完成"));
}
private static Uni<Void> taskC() {
return Uni.createFrom().nullItem()
.onItem().delayIt().by(java.time.Duration.ofSeconds(1))
.onItem().invoke(() -> System.out.println("任务C执行完成"));
}
public static void main(String[] args) {
taskA()
.chain(() -> taskB())
.chain(() -> taskC())
.subscribe().with(
v -> System.out.println("所有无依赖任务顺序执行完成"),
failure -> System.out.println("执行失败:" + failure.getMessage())
);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
失败容错的实现方式
异步操作执行过程中难免会出现失败的情况,Mutiny提供了多种失败容错的操作符,开发者可以根据场景选择合适的处理方式。
失败重试
如果异步操作失败是临时性的,比如网络波动导致的远程调用失败,可以使用onFailure配合retry方法实现重试逻辑:
import io.smallrye.mutiny.Uni;
import java.util.concurrent.atomic.AtomicInteger;
public class MutinyRetryDemo {
private static AtomicInteger retryCount = new AtomicInteger(0);
private static Uni<String> unstableTask() {
return Uni.createFrom().item("")
.onItem().invoke(() -> {
int count = retryCount.incrementAndGet();
if (count <= 2) {
throw new RuntimeException("第" + count + "次执行失败");
}
})
.onItem().transform(v -> "任务执行成功");
}
public static void main(String[] args) {
unstableTask()
// 最多重试3次,每次重试间隔500毫秒
.onFailure().retry().atMost(3)
.onFailure().delayed(Duration.ofMillis(500))
.subscribe().with(
result -> System.out.println("重试后结果:" + result),
failure -> System.out.println("重试耗尽后失败:" + failure.getMessage())
);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代码中,retry().atMost(3)表示最多重试3次,加上首次执行一共会尝试4次,如果4次都失败才会触发最终的失败回调。
失败降级
如果异步操作失败后不需要重试,而是需要返回一个默认的降级结果,可以使用onFailure配合recoverWithItem或者recoverWithUni来实现:
import io.smallrye.mutiny.Uni;
public class MutinyFallbackDemo {
private static Uni<String> mayFailTask() {
return Uni.createFrom().failure(new RuntimeException("任务执行失败"));
}
public static void main(String[] args) {
mayFailTask()
// 失败时返回默认的降级结果
.onFailure().recoverWithItem("默认降级数据")
.subscribe().with(
result -> System.out.println("最终结果:" + result),
failure -> System.out.println("不应该走到这里:" + failure.getMessage())
);
// 也可以使用recoverWithUni返回一个新的Uni作为降级
mayFailTask()
.onFailure().recoverWithUni(() -> Uni.createFrom().item("Uni降级数据"))
.subscribe().with(
result -> System.out.println("Uni降级结果:" + result)
);
}
}
失败忽略
如果某个异步操作的失败不影响整体流程,可以直接忽略失败,使用onFailure配合recoverWithNull或者recoverWithItem(返回null)来实现:
import io.smallrye.mutiny.Uni;
public class MutinyIgnoreFailure {
private static Uni<String> optionalTask() {
return Uni.createFrom().failure(new RuntimeException("可选任务失败"));
}
public static void main(String[] args) {
optionalTask()
.onFailure().recoverWithNull()
.subscribe().with(
result -> System.out.println("可选任务结果:" + (result == null ? "null(已忽略失败)" : result))
);
}
}
顺序执行与失败容错结合
实际场景中,我们通常需要把顺序执行和失败容错结合起来,比如顺序执行多个任务,每个任务都有重试或者降级的逻辑:
import io.smallrye.mutiny.Uni;
import java.time.Duration;
public class MutinyCombineDemo {
private static Uni<String> step1() {
return Uni.createFrom().item("步骤1结果")
.onItem().delayIt().by(Duration.ofSeconds(1));
}
private static Uni<String> step2(String input) {
return Uni.createFrom().item(input + "-步骤2结果")
.onItem().delayIt().by(Duration.ofSeconds(1))
// 步骤2失败最多重试2次,失败则返回降级结果
.onFailure().retry().atMost(2)
.onFailure().recoverWithItem(input + "-步骤2降级结果");
}
private static Uni<String> step3(String input) {
return Uni.createFrom().item(input + "-步骤3结果")
.onItem().delayIt().by(Duration.ofSeconds(1));
}
public static void main(String[] args) {
step1()
.chain(MutinyCombineDemo::step2)
.chain(MutinyCombineDemo::step3)
.subscribe().with(
result -> System.out.println("组合执行最终结果:" + result),
failure -> System.out.println("整体执行失败:" + failure.getMessage())
);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
上述代码中,步骤2既有重试逻辑也有降级逻辑,即使步骤2失败重试后仍然失败,也会返回降级结果,不会中断整个顺序执行的流程,保证了整体流程的稳定性。
注意事项
- 使用
chain方法时,要确保传入的函数返回的是Uni类型,否则无法实现异步的顺序执行。 - 重试逻辑要设置合理的重试次数和重试间隔,避免出现无限重试或者重试风暴的问题。
- 降级逻辑要返回符合业务预期的数据,避免降级后的数据导致后续流程出现错误。
- 如果顺序执行的某个任务失败且没有容错处理,整个链式调用会直接终止,触发最终的失败回调。
Mutiny异步操作顺序执行失败容错reactive_streams修改时间:2026-07-05 07:15:39