FLY

RxJS 简析- Ross Bulat - Medium

原文链接: medium.com

RxJS 的简单介绍

Javascript 应用下的响应式编程和观察者模式

ReactiveX: 异步的, 基于事件驱动编程

现代 web 应用程序是高度事件驱动的,这意味着它们要对一系列外部服务和数据源做出反应(并依赖于它们)。实现这一目标的核心库之一是 ReactiveX,也称为响应式扩展。

ReactiveX 库已经在一系列语言中实现,其中最广泛采用的是RxJS,即 Javascript 实现。RxJS 特别有用,因为它可以集成在任何 Javascript 应用程序中,无论是服务器端还是客户端,这都归功于其巨大的应用增长。

本文旨在解释 RxJS 是如何工作的,以及如何在你的项目中利用它,大约10分钟的阅读时间,涵盖了沿途使用的关键概念和术语。让我们开始吧。

RxJS 现状

rsjx 目前最新包版本为6(npmjs),在撰写本文时每周的下载量为1050万次,非常受欢迎。这种受欢迎程度可以源于一系列因素:

  • 这个包只有一个依赖项,tslib,它提供了 Typescript 支持。极少的依赖意味着将 RxJS 作为其他包的依赖项本身是非常容易的,从而允许了大量的用例,包括实用工具包、API SDKs、webhook 实现和服务器端程序,以及前端事件管理器。

  • 该包维护良好,并定期更新,使其成为可靠的代码库。 版本6(于2018年4月发布)是对先前版本的重大更新,增强了库的模块化和性能,开发团队致力于长期维护,并且进行有意义地重构

  • 有趣的是,RxJS对于区块链应用程序和整个金融科技行业都非常有意义。 Dapps 特别依靠来自区块链的持续反馈来处理交易数据和事件。 RxJS 也非常适合用于管理事务处理的集中式服务

  • 实时聊天和即时通信软件、实时协作应用程序、项目管理工具,Tensorflow JS 相关的 AI 工具以及分析框架都是 RxJS 的绝佳用例——对企业来说,这些重要的工具都有一个共同点:异步事件驱动接口,而 RxJS 正好非常切合这一点

毋庸置疑,RxJS 正处于非常重要的时期,市场对高效的响应式编程解决方案有很高的需求。

现在,让我们更详细地研究该它,并确切地了解其工作原理。


RxJS 和观察者模式

RxJS 简化了观察者模式;这是一种软件设计模式,它有效地允许应用程序的组件对某些传入事件或数据流作出反应。这也称为基于回调的代码,或事件驱动的代码,其中函数在响应传入数据时被触发。

本质上,RxJS 是一种高效的异步方式,它允许组件订阅这些数据流,并通过回调函数对它们做出响应。这就引出了RxJS 采用的主要概念:既 Observables (被观察者)的概念。

Observables 被附加于数据流上,负责处理数据并将其交付给观察者。到目前为止,我们已经使用「组件」这个术语作为订阅数据流的对象。在 RxJS 中使用的官方术语是一个观察者,或者说一个可观察的订阅者。

注意: 如果你正在使用 React 或 Angular 等基于组件结构的 app 中实现 RxJS,则确实可以为各种组件创建观察者,并让这些组件在被观察者传递数据时更新其状态,而所有这些都是以异步方式进行的,不需要额外的依赖包或外部 API。 这是一种非常有用的方法!

现在,Observables 非常灵活,可以处理来自 API 或 websocket 的连续数据流、来自数组或对象的固定数量的数据、任意事件(比如单击事件)等等。具体来说,重要的是要理解 Observables 可以让几乎任何形式的事件或数据流作为数据项派发出去。

为观察者订阅可观察对象(Observable)

另一方面,观察者只是通过 subscribe 方法订阅被观察者的 Javascript 对象,并通过三个方法对某些事件下对 Observable 作出反应:

  • onNext: 当 Observable 派发新数据项时调用

  • onError: 当 Observable 无法生成某些预期数据时,或者发生另一个错误导致 Observable 终止调用

  • onCompleted: 如果没有发生错误,则最后一次调用 onNext

给定源作为一个 Observable,一个观察者可以通过以下方式与这三个回调函数一起定义:

// subscribing to an Observable and defining callback functionsconst subscription = source.subscribe(    (x) =>  { console.log('Next: ' + x);
},    (err) => { console.log('Error: ' + err);
},    () => { console.log('Completed');
});

注意: 定义这些回调函数时,必须遵守 onNext,onError 和onCompleted 的顺序。

为了避免内存泄漏,记得通过 unsubscribe 方法取消对观察者的订阅。在 React 上下文中,可以在 componentWillUnmount 生命周期方法中完成:

// remember to unsubscribe observerscomponentWillUnmount() {   source.unsubscribe();}

这种事件驱动模式是一种并发编程;观察者并没有阻塞执行,因为他们在等待被观察者派发数据。相反,观察者充当一个侦听器(或者作为官方文档喜欢使用的术语“哨兵”),在传入数据项发出时适当地处理它们。

注意: 如果你需要让多个观察者订阅相同的观察对象,很有会遇到些意外的行为。 对于这种情况,使用主体(Subjects)是一个更好的选择。后面会更进一步介绍 Subjects。

冷观察和热观察的概念

在确定观察者应该开始在何时派发数据的时候,冷观察和热观察的概念非常有用。冷观察对象只有在获得订阅后才会开始发出数据,而热观察对象即使没有订阅也会开始派发数据。

当之前的数据不再需要处理的时候,前一种方法(冷)很有用,而后一种方法(热)使观察者可以在订阅时从数据流中够获取到以前派发的数据,当需要允许组件“赶上”订阅之前发出的部分或全部数据时,这对于你的应用可能是个好方法。

后面当我们要讨论操作符的时候,请想起这一概念。


使用操作符

观察者模式的框架本质上高效和并发的,这正是现代基于Javascript 的应用程序所需要的特性。然而,如果我们就此打住的话,RxJS 将只是观察者模式一个稍微扩展良好的实现。然而,它的主要优势在于被观察者如何操作数据的传递,以及如何将数据本身传递给观察者。这一切通过 operators 操作符完成的,我们将介绍 RxJS 的下一个主要部分。

ReactiveX 中的 X 操作符

操作符使 Observable 变得有趣,并归因于 ReactiveX 名称中的 X(扩展名)。 操作符执行转换用来处理 Observable 的数据或数据传递。 本部分将不会记录每个操作符的内容(因为对于有很多人,这将是一个乏味的阅读),而是旨在使读者了解使用它们可能发生的事情以及如何有效研究和实现它们。

官方网站上记录了很多操作符 ; 这些被认为是核心操作符,但是您也可以自由地开发自己的。 这些核心操作符中的大多数已经用 RxJS 的 Javascript 语言实现,这给我们 Javascript 开发人员提供了很多便利。 除非偶然发现特定于应用程序的边缘情况,否则不太可能需要自定义运算符。

操作符解决了以下问题:如何处理被观察者发出的数据?

操作符处理被观察者。 我们有诸如 defer 之类的操作符,它一直等待,以固定的间隔派发数据,直到观察者订阅了它,将被观察者组合成一个数据流,等等。

使用大量操作符组合很常见,并且,它们可以在表达式中链式调用。下面一个例子就是链式调用 3 个操作符 ,每500 毫秒从数据源中发出三个值。

// take 3 items from data stream every 500 millisecondsvar source = Rx.Observable    .interval(500)    .timeInterval()    .take(3);

注意: 每个操作符在官网文档中都有一个独立的页面,其中详细说明了他们是做什么的,并提供了每个实现的代码示例。 浏览这种宝贵的资源是使您熟悉各种操作符的最佳方法。

操作符 Marble diagrams (弹珠图)

理解操作符如何工作的一个有用的方法是参考在整个文档中大量使用的Marble diagrams (弹珠图);它们用操作符描述了一个被观察者的行为。

实际上,已经建立了专门的网站 RX Marbles,访客可以在其中浏览每个核心操作符的交互式弹珠图,以研究相应的输出。

一个弹珠图本质上代表了被观察者的时间轴,数据项被发送到它们上面,以及这些数据项是如何通过操作符进行转换。

以最简单的形式,时间轴通过一条直线表示,并向其发射数据项。 下图表示在时间轴上发射的数据,未应用任何操作符:

当同一数据项被处理或派发一段时间后,将在时间轴上使用不同的形状表示该数据项。 垂直直线表示被观察者的成功完成,而水平线则表示由于错误而提前终止:

当将操作符应用到被观察者时,时间线要么被该操作符包装,要么链接到时间线之下,这取决于操作符的性质

上面的toArray运算符可以在这里找到。 此示例获取通过 take 运算符发出每5个数据项,并将它们分组为一个数组。 这是Javascript中的样子:

const source = Rx.Observable.timer(0, 1000)    .take(5)    .toArray();const subscription = source.subscribe(    function (x) { console.log('Next: ' + x);
},    function (err) { console.log('Error: ' + err);
},    function () { console.log('Completed');
});

操作符极大地扩展了观察者模式的能力,包括但不限于:

  • 将多个 Observable 流合并为一个流,以供观察者监听(请参阅merge操作符)

  • 能够在随后的间隔中仅派发数据项的子集。 如果将大量数据喂给一个Observable,而您只需要一个子集或它们的一个样本,则可以选择仅每隔 Y 毫秒发出最后一个 X。 这可以通过 sample 和 take 操作符来实现。

  • 使用 map 操作符转换数据,通过函数应用给它们

  • 通过发送指定的测试数据项进行过滤。 请查看 filter 过滤器运算符实现这一点。 注意:过滤是官方文档中列出的一整类运算符

  • 用于对项目进行分组的一系列聚合选项,以及条件和布尔运算符(请参阅 takewhile,包含全部的示例)

  • 其他

熟悉这些概念是合理选择操作符的最佳方式; 充分理解文档中的弹珠图和 Observable 的时间轴将大大有助于做出这些决策。

如何选择操作符

操作符在官方文档 中分为不同的类别。 我建议花一些时间浏览可用的内容,同时参考弹珠图和RxJS中的书面示例。

注意: 进一步向下滚动“操作符”页面以找到“Decision Tree”——你可以直接应用在你的用例上,确定要使用的操作符。 尽管这对于熟悉目的可能是非常有用的,但我还是会选择更多功能来浏览可用的操作符,并了解它们在您自己的项目中的使用情景。

我们在对 RxJS 的理解上取得了很大的进步。接下来,让我们简要地看一下Subject(主体)的概念。

使用主体

要解锁更多的 RxJS 能力,Subjects(主体)是另一个需要理解的重要概念。正如我们在本文前面提到的,Subjects 可以允许多个观察者订阅它们,并且可以充当被观察者本身。这只是 Subjects 的一个特点;事实上,它们继承了被观察者和观察者的特征,使它们成为两者的混合体。让我们更详细地了解一下。

像被观察者和观察者一样的 Subjects (主体)

Subjects(主体)充当可观察对象和观察者之间的代理。 想象一下,它们被放置在被观察者层和观察者层之间,以促进观察者的通信和传递。

Subjects 可以订阅一个或多个 Observable 并监听其派发的数据,从而成为观察者。 与此同时,Subjects 具有自己的时间轴,因此本身就可以充当 Observable——他们监听的事件随后可供观察者订阅,这也使 Subjects 成为被观察者。 这样, Subjects 就可以将其时间轴发送给感兴趣的订阅者。

Learn-rxjs对 Subjects 有一个很好的隐喻——你可以将 Subjects 视为在充满人的房间里的麦克风上讲话的单个扬声器。 标准的可观察对象将在与观察者的一对一对话中启动,而 Subjects 可以多播或向许多人传递消息。

注意: 给冷可观察对象订阅一个 Subject 将会触发它进行立刻派发。

在 RxJS 中,我们有4种类型的 Subject。让我们简要地介绍一下它们。访问相应的 xgrommx.github.io (RxJS 示例文档的重要资源)链接可以看到它们在 RxJS 中使用的例子:

  • AsyncSubject: 异步 subject 只发送观察源的最后一个值给订阅者和主体的后续订阅者

  • BehaviorSubject: 一个 Behaviour Subject 会派发最新的值给新增加的订阅者,而这个值在后续任何事件发出前就有了。如果 Subject’s 的被观察者还没有被派发前,我们可以给一个默认值

  • ReplaySubject: Replay Subject 会把之前全部源 Observable 派发的事件给新的订阅者,也会把新值给它们

  • Subject: 一个标准的 Subject 没有 replay 行为,也没有任何初始化的值。新的订阅者将会接收来自主体的源 Observable 的派发

调度器

在总结之前,值得一提的是 ReactiveX 中的 Schedulers (调度器)概念。 尽管它们在Javascript 实践中没有扮演重要角色,但是一定要注意它们。

Schedulers 为我们提供了一种定义何时执行工作的方法,根据发出的 Observable 和Subject 数据项,通常在支持它们的地方作为操作符参数传递。本质上,调度器影响执行任务的时间。

由于Javascript 对多线程的支持有限,所以在 RxJS 中使用调度器的范围非常有限。但是,在其他实现中,特别是服务器端(.NET, Java)。调度器可用于在专用线程上调度工作负载,以优化并发性和效率。它们非常适合于工作负载很大的应用程序的情景。

尽管如此,RxJS提供了一些 Scheduler 功能,其基本实现来自Rx.Scheduler类:

  • 使用 Rx.Scheduler.currentThread 立即调度工作,但不要递归地调度当前线程。如果从当前任务调度了更多的任务,则此同步特性将强制将工作排队

  • Rx.Scheduler.immediate 强制在当前线程上立即执行,并递归地执行工作

  • Rx.Scheduler.timeout 允许通过定时回调进行计划的工作,并且是唯一允许在将来的时间运行工作负载的调度器

  • Rx.TestScheduler 允许我们在你的应用程序在测试环境中模拟时随时间调度任务。TestScheduler 是专门为来自 Observables 和 Subjects 的单元测试工作而设计的

注意: 为了学习更多的 RxJS Schedulers, Stack Overflow 这篇文章 非常简洁地总结了 RxJS 调度器的功能。官方文档 整理了全部实现的调度器。

最小化背压(Backpressure)

在管理工作负载方面,ReactiveX 网站也介绍了背压 backpressure 的概念,其中有一系列操作符可以用来防止这种情况的发生。

有时,被观察者发送事件速度可能快于观察者的处理速度的情况。这就是背压的定义,通常不利于应用程序的性能和体验。

这是一般编程中的常见问题,但可以在 RxJS 的操作级别上最小它的化影响。


总结

随着互联网演变成一个动态、事件驱动的应用组成的庞大生态系统,反应式编程已取得了长足的发展。

围绕 web 应用程序开发构建的工具和基础设施现在允许应用轻松的处理各种实时事件,而不需要知道它从哪个服务接收这些事件。RxJS 是这一工作的先锋。

本片文章对这种不断发展的设计范式的进行了介绍,将你的应用程序带到异步的、基于事件的行为上,以观察者模式作为基本机制。我们并没有涵盖所有的 RxJS 提供的知识,但是读者现在应该能够很好地理解官方文档中的所有的内容,并且能够开始实现自己的 RxJS 解决方案。