Skip to main content
 首页 » 编程设计

RxJava 2.0 入门教程

2022年07月19日158shanyou

RxJava 2.0 入门教程

RxJava 2.0 是来自NetFlix的开源java异步编程框架。和java 8 lambda表达式很接近,响应式编程的基本构建快是被观察对象(Observable)和订阅者(Subscriber)。被观察者发出信息项,订阅者消费它们。

RxJava 也类似于观察者模式——但不同的是,直到明确有订阅者订阅时,被观察者才发出信息。

什么是响应式编程

响应式编程是个通用编程术语,其聚焦变化,如数据值或事件变化。回调是响应式编程必用的方法。
假如你有一数据源(生产者)和数据目标(消费者),那么连接两者之后——响应式编程框架负责推有生产者产生的数据给消费者。请注意,一个被观察者可以有任意数量的订阅者。

下面看最简单RxJava的hello world示例:

package com.howtodoinjava.app; 
  
import io.reactivex.Observable; 
import io.reactivex.functions.Consumer; 
  
public class RxJava2Example 
{ 
    public static void main(String[] args) 
    {   
        //producer 
        Observable<String> observable = Observable.just("how", "to", "do", "in", "java"); 
  
        //consumer 
        Consumer<? super String> consumer = System.out::println; 
  
        //Attaching producer to consumer 
        observable.subscribe(consumer); 
    } 
} 

上面示例中, “how”, “to”, “do”, “in”, “java” 可视为事件流。为这些事件创建观察者,然后创建消费者处理这些单词——为了简单起见,仅打印至控制台。消费者就是订阅者。

最后,我们使用subscribe()方法连接两者。一旦连接,单词(事件)流开始流动,订阅者开始在控制台打印输出。

内部源码显示,当新的单词从被观察者发出,每个订阅者会调用onNext()方法。当被观察者成功所有单词,订阅者调用onComplete()方法,错误发生时对调用onError()方法。

RxJava 2.0 依赖

项目中可以使用maven、gradle或增加jar包至类路径。

maven依赖

<!-- https://mvnrepository.com/artifact/io.reactivex.rxjava2/rxjava --> 
<dependency> 
    <groupId>io.reactivex.rxjava2</groupId> 
    <artifactId>rxjava</artifactId> 
    <version>2.1.0</version> 
</dependency> 

gradle依赖

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.1.0' 

jar依赖

下载RxJava 2.0 jar文件链接。

RxJava转换

在RxJava中,订阅者接受来自被观察者的事件类型和被观察者发出的事件类型可以不同。两者包括的数据、数据类型等都可以不同。

这就需要在源和目标之间的事件提供中间转换,这样两者之间工作机制可以兼容,很像适配器模式。

下面举例说明。在上面hello world示例中,如果需要打印单词的大写形式。这是最简单的转换,但容易理解。

Observable<String> observable = Observable.just("how", "to", "do", "in", "java"); 
Consumer<? super String> consumer = System.out::println; 
  
//Transformation using map() method 
observable.map(w -> w.toUpperCase()).subscribe(consumer); 

我们在订阅之前增加中间方法map(),所以每个单词首先通过map方法,然后才到订阅者进一步处理。这就是转换。

如前所述,我们也可以在转换过程中改变事件的数据类型:

Observable<String> observable = Observable.just("how", "to", "do", "in", "java"); 
Consumer<? super Integer> consumer = System.out::println; 
  
observable.map(w -> w.toUpperCase().hashCode()).subscribe(consumer); 

这个示例,我们迭代每个单词,然后在转换中获得hashcode并传递给订阅者,最后在控制台中打印输出。这样被观察者发出字符串类型,订阅者接收integer类型。

总结

这种方式使被观察者和订阅者之间保持松耦合,给开发者带来极大优势。无需考虑对大多数人来说很难实现的并发模式,仅需要连接生产者和订阅者。一切都运转正常————完美无瑕。

另外,你也不需要考虑同时考虑生产者和订阅者。他们可以采用最佳的方式独立开发,然后使用转换连接它们。非常棒!

本文仅是RxJava的入门教程。后续陆续推出更多其重要概念。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/85225690