如何使用 Mutiny 实现异步操作的顺序执行与失败容错

来源:AI教程网作者:香港程序员头衔:程序员
导读:本期聚焦于小伙伴创作的《如何使用 Mutiny 实现异步操作的顺序执行与失败容错》,敬请观看详情,探索知识的价值。以下视频、文章将为您系统阐述其核心内容与价值。如果您觉得《如何使用 Mutiny 实现异步操作的顺序执行与失败容错》有用,将其分享出去将是对创作者最好的鼓励。

Mutiny是一个基于Reactive Streams规范的轻量级响应式编程库,设计目标是让异步编程更简单直观,它提供了丰富的操作符来处理异步数据流,其中就包含异步操作顺序执行和失败容错的相关能力。

如何使用 Mutiny 实现异步操作的顺序执行与失败容错

Mutiny基础概念

Mutiny的核心是两个主要的类型:UniMultiUni表示最多发射一个元素的异步操作,适合处理单个异步任务,比如一次数据库查询、一次远程接口调用。Multi表示可以发射多个元素的异步流,适合处理批量数据或者持续的数据流场景。

我们要实现异步操作的顺序执行,主要会用到Uni类型,因为单个异步任务的结果可以用Uni来封装,多个任务的顺序执行可以通过链式调用或者组合操作符来实现。

异步操作的顺序执行实现

假设我们有三个异步操作,需要按照顺序依次执行,前一个操作的结果作为后一个操作的输入,在Mutiny中可以通过chain方法来实现这种依赖关系的顺序执行。

基础顺序执行示例

下面的代码演示了三个异步任务按顺序执行的过程,每个任务都返回Uni类型,通过chain串联起来:

import io.smallrye.mutiny.Uni;

public class MutinySequenceDemo {
    // 第一个异步任务,模拟耗时1秒,返回初始值
    private static Uni<String> task1() {
        return Uni.createFrom().item("初始数据")
                .onItem().delayIt().by(java.time.Duration.ofSeconds(1));
    }

    // 第二个异步任务,接收第一个任务的结果,处理后返回新结果
    private static Uni<String> task2(String input) {
        return Uni.createFrom().item(input + "-处理后数据")
                .onItem().delayIt().by(java.time.Duration.ofSeconds(1));
    }

    // 第三个异步任务,接收第二个任务的结果,完成最终处理
    private static Uni<String> task3(String input) {
        return Uni.createFrom().item(input + "-最终结果")
                .onItem().delayIt().by(java.time.Duration.ofSeconds(1));
    }

    public static void main(String[] args) {
        // 链式调用实现顺序执行
        task1()
                .chain(MutinySequenceDemo::task2)
                .chain(MutinySequenceDemo::task3)
                .subscribe().with(
                        result -> System.out.println("顺序执行结果:" + result),
                        failure -> System.out.println("执行失败:" + failure.getMessage())
                );

        // 阻塞等待结果,避免主线程提前退出
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述代码中,chain方法的作用是:等待前一个Uni完成并拿到结果后,将结果传入参数中的函数,执行下一个异步任务,这样就保证了三个任务是严格按照顺序执行的,不会出现并发执行的情况。

无依赖关系的顺序执行

如果多个异步任务之间没有数据依赖,只是需要按顺序执行,不需要前一个任务的结果,也可以使用chain结合replaceWith来实现:

import io.smallrye.mutiny.Uni;

public class MutinyNoDepSequence {
    private static Uni<Void> taskA() {
        return Uni.createFrom().nullItem()
                .onItem().delayIt().by(java.time.Duration.ofSeconds(1))
                .onItem().invoke(() -> System.out.println("任务A执行完成"));
    }

    private static Uni<Void> taskB() {
        return Uni.createFrom().nullItem()
                .onItem().delayIt().by(java.time.Duration.ofSeconds(1))
                .onItem().invoke(() -> System.out.println("任务B执行完成"));
    }

    private static Uni<Void> taskC() {
        return Uni.createFrom().nullItem()
                .onItem().delayIt().by(java.time.Duration.ofSeconds(1))
                .onItem().invoke(() -> System.out.println("任务C执行完成"));
    }

    public static void main(String[] args) {
        taskA()
                .chain(() -> taskB())
                .chain(() -> taskC())
                .subscribe().with(
                        v -> System.out.println("所有无依赖任务顺序执行完成"),
                        failure -> System.out.println("执行失败:" + failure.getMessage())
                );

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

失败容错的实现方式

异步操作执行过程中难免会出现失败的情况,Mutiny提供了多种失败容错的操作符,开发者可以根据场景选择合适的处理方式。

失败重试

如果异步操作失败是临时性的,比如网络波动导致的远程调用失败,可以使用onFailure配合retry方法实现重试逻辑:

import io.smallrye.mutiny.Uni;
import java.util.concurrent.atomic.AtomicInteger;

public class MutinyRetryDemo {
    private static AtomicInteger retryCount = new AtomicInteger(0);

    private static Uni<String> unstableTask() {
        return Uni.createFrom().item("")
                .onItem().invoke(() -> {
                    int count = retryCount.incrementAndGet();
                    if (count <= 2) {
                        throw new RuntimeException("第" + count + "次执行失败");
                    }
                })
                .onItem().transform(v -> "任务执行成功");
    }

    public static void main(String[] args) {
        unstableTask()
                // 最多重试3次,每次重试间隔500毫秒
                .onFailure().retry().atMost(3)
                .onFailure().delayed(Duration.ofMillis(500))
                .subscribe().with(
                        result -> System.out.println("重试后结果:" + result),
                        failure -> System.out.println("重试耗尽后失败:" + failure.getMessage())
                );

        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述代码中,retry().atMost(3)表示最多重试3次,加上首次执行一共会尝试4次,如果4次都失败才会触发最终的失败回调。

失败降级

如果异步操作失败后不需要重试,而是需要返回一个默认的降级结果,可以使用onFailure配合recoverWithItem或者recoverWithUni来实现:

import io.smallrye.mutiny.Uni;

public class MutinyFallbackDemo {
    private static Uni<String> mayFailTask() {
        return Uni.createFrom().failure(new RuntimeException("任务执行失败"));
    }

    public static void main(String[] args) {
        mayFailTask()
                // 失败时返回默认的降级结果
                .onFailure().recoverWithItem("默认降级数据")
                .subscribe().with(
                        result -> System.out.println("最终结果:" + result),
                        failure -> System.out.println("不应该走到这里:" + failure.getMessage())
                );

        // 也可以使用recoverWithUni返回一个新的Uni作为降级
        mayFailTask()
                .onFailure().recoverWithUni(() -> Uni.createFrom().item("Uni降级数据"))
                .subscribe().with(
                        result -> System.out.println("Uni降级结果:" + result)
                );
    }
}

失败忽略

如果某个异步操作的失败不影响整体流程,可以直接忽略失败,使用onFailure配合recoverWithNull或者recoverWithItem(返回null)来实现:

import io.smallrye.mutiny.Uni;

public class MutinyIgnoreFailure {
    private static Uni<String> optionalTask() {
        return Uni.createFrom().failure(new RuntimeException("可选任务失败"));
    }

    public static void main(String[] args) {
        optionalTask()
                .onFailure().recoverWithNull()
                .subscribe().with(
                        result -> System.out.println("可选任务结果:" + (result == null ? "null(已忽略失败)" : result))
                );
    }
}

顺序执行与失败容错结合

实际场景中,我们通常需要把顺序执行和失败容错结合起来,比如顺序执行多个任务,每个任务都有重试或者降级的逻辑:

import io.smallrye.mutiny.Uni;
import java.time.Duration;

public class MutinyCombineDemo {
    private static Uni<String> step1() {
        return Uni.createFrom().item("步骤1结果")
                .onItem().delayIt().by(Duration.ofSeconds(1));
    }

    private static Uni<String> step2(String input) {
        return Uni.createFrom().item(input + "-步骤2结果")
                .onItem().delayIt().by(Duration.ofSeconds(1))
                // 步骤2失败最多重试2次,失败则返回降级结果
                .onFailure().retry().atMost(2)
                .onFailure().recoverWithItem(input + "-步骤2降级结果");
    }

    private static Uni<String> step3(String input) {
        return Uni.createFrom().item(input + "-步骤3结果")
                .onItem().delayIt().by(Duration.ofSeconds(1));
    }

    public static void main(String[] args) {
        step1()
                .chain(MutinyCombineDemo::step2)
                .chain(MutinyCombineDemo::step3)
                .subscribe().with(
                        result -> System.out.println("组合执行最终结果:" + result),
                        failure -> System.out.println("整体执行失败:" + failure.getMessage())
                );

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

上述代码中,步骤2既有重试逻辑也有降级逻辑,即使步骤2失败重试后仍然失败,也会返回降级结果,不会中断整个顺序执行的流程,保证了整体流程的稳定性。

注意事项

  • 使用chain方法时,要确保传入的函数返回的是Uni类型,否则无法实现异步的顺序执行。
  • 重试逻辑要设置合理的重试次数和重试间隔,避免出现无限重试或者重试风暴的问题。
  • 降级逻辑要返回符合业务预期的数据,避免降级后的数据导致后续流程出现错误。
  • 如果顺序执行的某个任务失败且没有容错处理,整个链式调用会直接终止,触发最终的失败回调。

Mutiny异步操作顺序执行失败容错reactive_streams修改时间:2026-07-05 07:15:39

免责声明:​ 已尽一切努力确保本网站所含信息的准确性。网站内容多为原创整理与精心编撰,观点力求客观中立。本站旨在免费分享,内容仅供个人学习、研究或参考使用。若引用了第三方作品,版权归原作者所有。如内容涉及您的权益,请联系我们处理。
内容垂直聚焦
专注技术核心技术栏目,确保每篇文章深度聚焦于实用技能。从代码技巧到架构设计,为用户提供无干扰的纯技术知识沉淀,精准满足专业提升需求。
知识结构清晰
覆盖从开发到部署的全链路。AI、前端、编程、数据库、服务器、建站、系统层层递进,构建清晰学习路径,帮助用户系统化掌握开发与运维所需的核心技术。
深度技术解析
拒绝泛泛而谈,深入技术细节与实践难点。无论是数据库优化还是服务器配置,均结合真实场景与代码示例进行剖析,致力于提供可直接应用于工作的解决方案。
专业领域覆盖
精准对应开发生命周期。从前端界面到后端编程,从数据库操作到服务器运维,形成完整闭环,一站式满足全栈工程师和运维人员的技术需求。
即学即用高效
内容强调实操性,步骤清晰、代码完整。用户可根据教程直接复现和应用于自身项目,显著缩短从学习到实践的距离,快速解决开发中的具体问题。
持续更新保障
专注既定技术方向进行长期、稳定的内容输出。确保各栏目技术文章持续更新迭代,紧跟主流技术发展趋势,为用户提供经久不衰的学习价值。