V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
git00ll
V2EX  ›  Java

Project Reactor,如何实现主线程消费报错时停止 Flux 流

  •  
  •   git00ll · 2021-12-27 18:03:07 +08:00 · 1926 次点击
    这是一个创建于 1107 天前的主题,其中的信息可能已经有所发展或是发生改变。

    如下代码,本意是将 flux 流发送到子线程处理,再将处理结果汇聚到主线程。如何能够实现主线程处理报错时停止 Flux 的呢。

        public static void main(String[] args) {
            String[] data = {"2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15"};
    
            Iterable<Integer> integers = Flux.fromArray(data)
                    .flatMapSequential(s -> Mono.fromSupplier(() -> Integer.parseInt(s)).subscribeOn(Schedulers.boundedElastic()), 3)
                    .doOnNext(s -> {
                        System.out.println(Thread.currentThread().getName() + "---------->" + s);
                    })
                    .toIterable();
    
    
            for (Integer i : integers) {
                //如何实现这里报错时,停止 Flux
                System.out.println((10 / i) + "------>>>>" + Thread.currentThread().getName());
            }
        }
    
    
    5 条回复    2021-12-28 12:50:30 +08:00
    yazinnnn
        1
    yazinnnn  
       2021-12-28 09:14:58 +08:00
    fun main() {
    val data = arrayOf("2", "2", "2", "0", "8", "9", "10", "11", "12", "13", "14", "15")

    val integers = Flux.fromArray(data)
    .flatMapSequential({ s: String ->
    Mono.fromSupplier { s.toInt() }.subscribeOn(Schedulers.boundedElastic())
    }, 3)
    val a = integers.doOnNext { s ->
    println(Thread.currentThread().name + "---------->" + s)
    }.subscribe()

    for (i in integers.toIterable()) {
    try {
    10 / i
    } catch (t: Throwable) {
    a.dispose()
    }
    }

    CountDownLatch(1).await()
    }


    不清楚你的具体需求,如果整个链路不使用 reactivestream 的话,似乎性能(吞吐)并没什么提高
    toIterable()迭代时会阻塞当前线程,这样写跟直接用线程池处理比没啥优点
    Macolor21
        2
    Macolor21  
       2021-12-28 09:22:53 +08:00
    CompletableFuture 最后 join 回主线程?
    git00ll
        3
    git00ll  
    OP
       2021-12-28 10:39:40 +08:00
    @yazinnnn 假设 toInt 这个操作是比较耗时的,可以实现将 toInt 放置在多核上运行,最终结果再汇聚到主线程上。
    因为主线程上开启了传统注解事务,需要在主线程上操作 Flux 的处理结果
    yazinnnn
        4
    yazinnnn  
       2021-12-28 11:08:07 +08:00
    @git00ll
    那直接使用 stream 的并行 api 或者 2 楼的 CompletableFuture 是否更适合你的场景?
    git00ll
        5
    git00ll  
    OP
       2021-12-28 12:50:30 +08:00
    @yazinnnn
    业务场景里有一些限制,
    1. 需要保持输入和输出的顺序一致,
    2. 流中的数据从文件中读取的,数据量非常大,无法全部加载到内存。只能边读取边处理。
    3.处理过程中其中一条处理错误时,算失败,中断流不再继续。

    一方面 stream 的并行流没有拉模式,无法精准控制载入内存的数据行数。
    且并行 stream 提供的 api 太少,相比于 reactor 提供的控制选项不足
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   4932 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 24ms · UTC 03:59 · PVG 11:59 · LAX 19:59 · JFK 22:59
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.