Skip to content

whaleal/reactor-demo

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

3 Commits
 
 
 
 
 
 

Repository files navigation

响应式编程 Reactor 是响应式编程范式的实现,总结起来有如下几点:

响应式编程是一种关注于数据流(data streams)和变化传递(propagation of change)的异步编程方式。 这意味着它可以用既有的编程语言表达静态(如数组)或动态(如事件源)的数据流。

在响应式编程方面,微软跨出了第一步,它在 .NET 生态中创建了响应式扩展库(Reactive Extensions library, Rx)。接着 RxJava 在JVM上实现了响应式编程。后来,在 JVM 平台出现了一套标准的响应式 编程规范,它定义了一系列标准接口和交互规范。并整合到 Java 9 中(使用 Flow 类)。

响应式编程通常作为面向对象编程中的“观察者模式”(Observer design pattern)的一种扩展。 响应式流(reactive streams)与“迭代子模式”(Iterator design pattern)也有相通之处, 因为其中也有 Iterable-Iterator 这样的对应关系。主要的区别在于,Iterator 是基于 “拉取”(pull)方式的,而响应式流是基于“推送”(push)方式的。

使用 iterator 是一种“命令式”(imperative)编程范式,即使访问元素的方法是 Iterable 的唯一职责。关键在于,什么时候执行 next() 获取元素取决于开发者。在响应式流中,相对应的 角色是 Publisher-Subscriber,但是 当有新的值到来的时候 ,却反过来由发布者(Publisher) 通知订阅者(Subscriber),这种“推送”模式是响应式的关键。此外,对推送来的数据的操作 是通过一种声明式(declaratively)而不是命令式(imperatively)的方式表达的:开发者通过 描述“控制流程”来定义对数据流的处理逻辑。

除了数据推送,对错误处理(error handling)和完成(completion)信号的定义也很完善。 一个 Publisher 可以推送新的值到它的 Subscriber(调用 onNext 方法), 同样也可以推送错误(调用 onError 方法)和完成(调用 onComplete 方法)信号。 错误和完成信号都可以终止响应式流。可以用下边的表达式描述:

onNext x 0..N [onError | onComplete] 这种方式非常灵活,无论是有/没有值,还是 n 个值(包括有无限个值的流,比如时钟的持续读秒),都可处理。

那么我们为什么需要这样的异步响应式开发库呢?

3.1. 阻塞是对资源的浪费 现代应用需要应对大量的并发用户,而且即使现代硬件的处理能力飞速发展,软件性能仍然是关键因素。

广义来说我们有两种思路来提升程序性能:

并行化(parallelize) :使用更多的线程和硬件资源。

基于现有的资源来 提高执行效率 。

通常,Java开发者使用阻塞式(blocking)编写代码。这没有问题,在出现性能瓶颈后, 我们可以增加处理线程,线程中同样是阻塞的代码。但是这种使用资源的方式会迅速面临 资源竞争和并发问题。

更糟糕的是,阻塞会浪费资源。具体来说,比如当一个程序面临延迟(通常是I/O方面, 比如数据库读写请求或网络调用),所在线程需要进入 idle 状态等待数据,从而浪费资源。

所以,并行化方式并非银弹。这是挖掘硬件潜力的方式,但是却带来了复杂性,而且容易造成浪费。

3.2. 异步可以解决问题吗? 第二种思路——提高执行效率——可以解决资源浪费问题。通过编写 异步非阻塞 的代码, (任务发起异步调用后)执行过程会切换到另一个 使用同样底层资源 的活跃任务,然后等 异步调用返回结果再去处理。

但是在 JVM 上如何编写异步代码呢?Java 提供了两种异步编程方式:

回调(Callbacks) :异步方法没有返回值,而是采用一个 callback 作为参数(lambda 或匿名类),当结果出来后回调这个 callback。常见的例子比如 Swings 的 EventListener。

Futures :异步方法 立即 返回一个 Future,该异步方法要返回结果的是 T 类型,通过Future封装。这个结果并不是 *立刻* 可以拿到,而是等实际处理结束才可用。比如, ExecutorService 执行 Callable 任务时会返回 Future 对象。

这些技术够用吗?并非对于每个用例都是如此,两种方式都有局限性。

回调很难组合起来,因为很快就会导致代码难以理解和维护(即所谓的“回调地狱(callback hell)”)。

考虑这样一种情景:在用户界面上显示用户的5个收藏,或者如果没有任何收藏提供5个建议。这需要3个 服务(一个提供收藏的ID列表,第二个服务获取收藏内容,第三个提供建议内容):

回调地狱(Callback Hell)的例子

userService.getFavorites(userId, new Callback<List>() { public void onSuccess(List list) { if (list.isEmpty()) { suggestionService.getSuggestions(new Callback<List>() { public void onSuccess(List list) { UiUtils.submitOnUiThread(() -> { list.stream() .limit(5) .forEach(uiList::show); }); }

    public void onError(Throwable error) { 
      UiUtils.errorPopup(error);
    }
  });
} else {
  list.stream() 
      .limit(5)
      .forEach(favId -> favoriteService.getDetails(favId, 
        new Callback<Favorite>() {
          public void onSuccess(Favorite details) {
            UiUtils.submitOnUiThread(() -> uiList.show(details));
          }

          public void onError(Throwable error) {
            UiUtils.errorPopup(error);
          }
        }
      ));
}

}

public void onError(Throwable error) { UiUtils.errorPopup(error); } }); 基于回调的服务使用一个匿名 Callback 作为参数。后者的两个方法分别在异步执行成功 或异常时被调用。 获取到收藏ID的list后调用第一个服务的回调方法 onSuccess。 如果 list 为空, 调用 suggestionService。 服务 suggestionService 传递 List 给第二个回调。 既然是处理 UI,我们需要确保消费代码运行在 UI 线程。 使用 Java 8 Stream 来限制建议数量为5,然后在 UI 中显示。 在每一层,我们都以同样的方式处理错误:在一个 popup 中显示错误信息。 回到收藏 ID 这一层,如果返回 list,我们需要使用 favoriteService 来获取 Favorite 对象。由于只想要5个,因此使用 stream 。 再一次回调。这次对每个ID,获取 Favorite 对象在 UI 线程中推送到前端显示。 这里有不少代码,稍微有些难以阅读,并且还有重复代码,我们再来看一下用 Reactor 实现同样功能:

使用 Reactor 实现以上回调方式同样功能的例子

userService.getFavorites(userId) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup); 我们获取到收藏ID的流 我们 异步地转换 它们(ID) 为 Favorite 对象(使用 flatMap),现在我们有了 Favorite流。 一旦 Favorite 为空,切换到 suggestionService。 我们只关注流中的最多5个元素。 最后,我们希望在 UI 线程中进行处理。 通过描述对数据的最终处理(在 UI 中显示)和对错误的处理(显示在 popup 中)来触发(subscribe)。 如果你想确保“收藏的ID”的数据在800ms内获得(如果超时,从缓存中获取)呢?在基于回调的代码中, 会比较复杂。但 Reactor 中就很简单,在处理链中增加一个 timeout 的操作符即可。

Reactor 中增加超时控制的例子

userService.getFavorites(userId) .timeout(Duration.ofMillis(800)) .onErrorResume(cacheService.cachedFavoritesFor(userId)) .flatMap(favoriteService::getDetails) .switchIfEmpty(suggestionService.getSuggestions()) .take(5) .publishOn(UiUtils.uiThreadScheduler()) .subscribe(uiList::show, UiUtils::errorPopup); 如果流在超时时限没有发出(emit)任何值,则发出错误(error)。 一旦收到错误,交由 cacheService 处理。 处理链后边的内容与上例类似。 Futures 比回调要好一点,但即使在 Java 8 引入了 CompletableFuture,它对于多个处理的组合仍不够好用。 编排多个 Futures 是可行的,但却不易。此外,Future 还有一个问题:当对 Future 对象最终调用 get() 方法时,仍然会导致阻塞,并且缺乏对多个值以及更进一步对错误的处理。

考虑另外一个例子,我们首先得到 ID 的列表,然后通过它进一步获取到“对应的 name 和 statistics” 为元素的列表,整个过程用异步方式来实现。

CompletableFuture 处理组合的例子

CompletableFuture<List> ids = ifhIds();

CompletableFuture<List> result = ids.thenComposeAsync(l -> { Stream<CompletableFuture> zip = l.stream().map(i -> { CompletableFuture nameTask = ifhName(i); CompletableFuture statTask = ifhStat(i);

                                             return nameTask.thenCombineAsync(statTask, (name, stat) -> "Name " + name + " has stats " + stat); 
                                     });
    List<CompletableFuture<String>> combinationList = zip.collect(Collectors.toList()); 
    CompletableFuture<String>[] combinationArray = combinationList.toArray(new CompletableFuture[combinationList.size()]);

    CompletableFuture<Void> allDone = CompletableFuture.allOf(combinationArray); 
    return allDone.thenApply(v -> combinationList.stream()
                                                                                             .map(CompletableFuture::join) 
                                                                                             .collect(Collectors.toList()));

});

List results = result.join(); assertThat(results).contains( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121"); 以一个 Future 开始,其中封装了后续将获取和处理的 ID 的 list。 获取到 list 后边进一步对其启动异步处理任务。 对于 list 中的每一个元素: 异步地得到相应的 name。 异步地得到相应的 statistics。 将两个结果一一组合。 我们现在有了一个 list,元素是 Future(表示组合的任务,类型是 CompletableFuture),为了执行这些任务, 我们需要将这个 list(元素构成的流) 转换为数组(List)。 将这个数组传递给 CompletableFuture.allOf,返回一个 Future ,当所以任务都完成了,那么这个 Future 也就完成了。 有点麻烦的地方在于 allOf 返回的是 CompletableFuture,所以我们遍历这个 Future 的List, ,然后使用 join() 来手机它们的结果(不会导致阻塞,因为 AllOf 确保这些 Future 全部完成) 一旦整个异步流水线被触发,我们等它完成处理,然后返回结果列表。 由于 Reactor 内置许多组合操作,因此以上例子可以简单地实现:

Reactor 实现与 Future 同样功能的代码

Flux ids = ifhrIds();

Flux combinations = ids.flatMap(id -> { Mono nameTask = ifhrName(id); Mono statTask = ifhrStat(id);

                    return nameTask.zipWith(statTask, 
                                    (name, stat) -> "Name " + name + " has stats " + stat);
            });

Mono<List> result = combinations.collectList();

List results = result.block(); assertThat(results).containsExactly( "Name NameJoe has stats 103", "Name NameBart has stats 104", "Name NameHenry has stats 105", "Name NameNicole has stats 106", "Name NameABSLAJNFOAJNFOANFANSF has stats 121" ); 这一次,我们从一个异步方式提供的 ids 序列(Flux)开始。 对于序列中的每一个元素,我们异步地处理它(flatMap 方法内)两次。 获取相应的 name。 获取相应的 statistic. 异步地组合两个值。 随着序列中的元素值“到位”,它们收集一个 List 中。 在生成流的环节,我们可以继续异步地操作 Flux 流,对其进行组合和订阅(subscribe)。 最终我们很可能得到一个 Mono 。由于是测试,我们阻塞住(block()),等待流处理过程结束, 然后直接返回集合。 Assert 结果。 回调或 Future 遇到的窘境是类似的,这也是响应式编程要通过 Publisher-Suscriber 方式来解决的。

3.3. 从命令式编程到响应式编程 类似 Reactor 这样的响应式库的目标就是要弥补上述“经典”的 JVM 异步方式所带来的不足, 此外还会关注一下几个方面:

可编排性(Composability) 以及 可读性(Readability)

使用丰富的 操作符 来处理形如 流 的数据

在 订阅(subscribe) 之前什么都不会发生

背压(backpressure) 具体来说即 消费者能够反向告知生产者生产内容的速度的能力

高层次 (同时也是有高价值的)的抽象,从而达到 并发无关 的效果

3.3.1. 可编排性与可读性 可编排性,指的是编排多个异步任务的能力。比如我们将前一个任务的结果传递给后一个任务作为输入, 或者将多个任务以分解再汇总(fork-join)的形式执行,或者将异步的任务作为离散的组件在系统中 进行重用。

这种编排任务的能力与代码的可读性和可维护性是紧密相关的。随着异步处理任务数量和复杂度 的提高,编写和阅读代码都变得越来越困难。就像我们刚才看到的,回调模式是简单的,但是缺点 是在复杂的处理逻辑中,回调中会层层嵌入回调,导致 回调地狱(Callback Hell) 。你能猜到 (或有过这种痛苦经历),这样的代码是难以阅读和分析的。

Reactor 提供了丰富的编排操作,从而代码直观反映了处理流程,并且所有的操作保持在同一层次 (尽量避免了嵌套)。

3.3.2. 就像装配流水线 你可以想象数据在响应式应用中的处理,就像流过一条装配流水线。Reactor 既是传送带, 又是一个个的装配工或机器人。原材料从源头(最初的 Publisher)流出,最终被加工为成品, 等待被推送到消费者(或者说 Subscriber)。

原材料会经过不同的中间处理过程,或者作为半成品与其他半成品进行组装。如果某处有齿轮卡住, 或者某件产品的包装过程花费了太久时间,相应的工位就可以向上游发出信号来限制或停止发出原材料。

3.3.3. 操作符(Operators) 在 Reactor 中,操作符(operator)就像装配线中的工位(操作员或装配机器人)。每一个操作符 对 Publisher 进行相应的处理,然后将 Publisher 包装为一个新的 Publisher。就像一个链条, 数据源自第一个 Publisher,然后顺链条而下,在每个环节进行相应的处理。最终,一个订阅者 (Subscriber)终结这个过程。请记住,在订阅者(Subscriber)订阅(subscribe)到一个 发布者(Publisher)之前,什么都不会发生。

理解了操作符会创建新的 Publisher 实例这一点,能够帮助你避免一个常见的问题, 这种问题会让你觉得处理链上的某个操作符没有起作用。相关内容请参考 item 。

虽然响应式流规范(Reactive Streams specification)没有规定任何操作符, 类似 Reactor 这样的响应式库所带来的最大附加价值之一就是提供丰富的操作符。包括基础的转换操作, 到过滤操作,甚至复杂的编排和错误处理操作。

3.3.4. subscribe() 之前什么都不会发生 在 Reactor 中,当你创建了一条 Publisher 处理链,数据还不会开始生成。事实上,你是创建了 一种抽象的对于异步处理流程的描述(从而方便重用和组装)。

当真正“订阅(subscrib)”的时候,你需要将 Publisher 关联到一个 Subscriber 上,然后 才会触发整个链的流动。这时候,Subscriber 会向上游发送一个 request 信号,一直到达源头 的 Publisher。

3.3.5. 背压() 向上游传递信号这一点也被用于实现 背压 ,就像在装配线上,某个工位的处理速度如果慢于流水线 速度,会对上游发送反馈信号一样。

在响应式流规范中实际定义的机制同刚才的类比非常接近:订阅者可以无限接受数据并让它的源头 “满负荷”推送所有的数据,也可以通过使用 request 机制来告知源头它一次最多能够处理 n 个元素。

中间环节的操作也可以影响 request。想象一个能够将每10个元素分批打包的缓存(buffer)操作。 如果订阅者请求一个元素,那么对于源头来说可以生成10个元素。此外预取策略也可以使用了, 比如在订阅前预先生成元素。

这样能够将“推送”模式转换为“推送+拉取”混合的模式,如果下游准备好了,可以从上游拉取 n 个元素;但是如果上游元素还没有准备好,下游还是要等待上游的推送。

3.3.6. 热(Hot) vs 冷(Cold) 在 Rx 家族的响应式库中,响应式流分为“热”和“冷”两种类型,区别主要在于响应式流如何 对订阅者进行响应:

一个“冷”的序列,指对于每一个 Subscriber,都会收到从头开始所有的数据。如果源头 生成了一个 HTTP 请求,对于每一个订阅都会创建一个新的 HTTP 请求。

一个“热”的序列,指对于一个 Subscriber,只能获取从它开始 订阅 之后 发出的数据。不过注意,有些“热”的响应式流可以缓存部分或全部历史数据。 通常意义上来说,一个“热”的响应式流,甚至在即使没有订阅者接收数据的情况下,也可以 发出数据(这一点同 “Subscribe() 之前什么都不会发生”的规则有冲突)。

更多关于 Reactor 中“热”vs“冷”的内容,请参考 this reactor-specific section。

翻译建议 - "响应式编程"

About

No description, website, or topics provided.

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages