03月03, 2017

Rx 的编程方式(一)

1. Observables & Reactive

先来一个简单直观的例子:

const { Observable } = require("rxjs");

const source$ = Observable.of([1, 2, 3]);
source$.subscribe(x => console.log(x));

过滤器节点:subscribe

2. Declarative Transformation( 声明式转换 )

如果我们想要平时开发的数据转换功能,可以使用一些类似管道的工具来做。

Observable.of(1, 2, 3)
    .map(n => n * 2)
    .subscribe(x => console.log(x));

过滤器节点:map、subscribe。我所理解的就像一条溪流流动的水,然后 map 这类 API 就像水桶,将水装入进行加工,当然,其他 map 的特性先不用细致了解。

3. Lazy Transformation( 懒执行透明特性 )

Observable 能够在流动的过程中进行选择,所谓的懒特性就是不会像 Promise 一样,给了一个数据承认,就一定会让你接受,你可以选择不接受或者现在不接受,先这样子理解。

Observable.range(1, 100)
    // 转换管道
    .map(n => n * 2)
    // 拦截节点
    .filter(n => n > 4)
    // 懒获取
    .take(2)
    .subscribe(x => {
       console.log(x);
    });

4. DOM Event

Rx 其实是可以独立与任何框架使用了,也可以按需加载想用的方法,对于 Rx,其实和 Zone 一样,有了自己的保护圈和玩法,如果你要玩它,就需要进入它的圈子,大多数的相关 API 就是 from 的特性,对于 Rx 5.0 模块化做的更好,以及渲染 Timeline 上更可控。

<button id="btn"></button>
<h1 id="out">0</h1>
const btn = document.getElementById("btn");
const out = document.getElementById("out");

/* eslint-disable no-undef */
const { Observable } = Rx;
Observable.fromEvent(btn, "click")
    // mapTo 能映射一个常量,map 需要一个函数
    .mapTo(1)
    .scan((acc, cur) => acc + cur, 0)
    .subscribe(count => {
        out.innerHTML = count;
    });

其中主要依赖了:

rxjs/Observable.js
rxjs/observable/fromEvent.js
rxjs/operator/mapTo.js
rxjs/operator/scan.js

如果是打包的方式可以按需加载,如果是 script 可以直接引入 rxjs/bundles/Rx.js,不过会比较大,所以最好是按需提取到公用的 external 文件中,也能使用 lite 的版本,是常用的精简版。

如果有两个事件怎么办呢?利用流的合并特性可以做:

// 加法流,然后转换成数据 1 流入给下一个输入
const { Observable } = Rx;
const inSource$ = Observable.fromEvent(inBtn, "click").mapTo(1);
// 减法流,然后转换成数据 -1 流入给下一个输入
const outSource$ = Observable.fromEvent(outBtn, "click").mapTo(-1);
// 合并成最后需要操作的流
const source$ = Observable.merge(inSource$, outSource$);
// 操作流并且订阅流的 next 函数
source$.scan((acc, cur) => {
    if(acc + cur < 0) {
     return 0;
    }
    return acc + cur;
}, 0).subscribe(count => {            
    print.innerHTML = count;
});

5. Async HTTP

前端主要除了这种简单的数据操作,更多的是要看异步场景下的复杂度,下面用 Promiseasync/awaitObserable 三者的写法来对比。

(1) Promise方案

const data = fetchOrders().then(res => {
    if(res.status === 200) {
        return res.data;
    }
});

data.then(orders => orders.filter(order => order.text === "Bob"))
    .then(orders => orders.map(order => order.id))
    .then(ids => console.log(ids));

function fetchOrders() {
    return axios.post("orders.json");
}

(2) async/await

function fetchOrders() {
    return axios.post("orders.json");
}

renderOrders();
async function renderOrders() {
    const res = await fetchOrders();
    if(res.status === 200) {
        const data = res.data;
        data.filter(order => order.text === "Bob")
            .map(order => order.id)
            .forEach(id => console.log(id));
    }
}

(3) Observable

const { Observable } = Rx;

function fetchOrders() {
    const promise = axios.post("orders.json");
    return Observable.fromPromise(promise);
}

const hook =
    fetchOrders()
        .switchMap(res => {
            let data = [];
            if(res.status === 200) {
                data = res.data;
            }

            return Observable.from(data);
        })
        .filter(order => order.text === "Bob")
        .map(order => order.id)
        .filter(id => id !== void 0)
        .subscribe(id => {
            console.log(id);
        });

6. Cancellation( 可取消的特性 )

其实这个特性很好用,可以随时取消你订阅的流,这是 Promise 做不到的,一旦发起承认就无法取消。

hook.unsubscribe();

7. Create HTTP Request

上面的 ajax 使用的是 axios 封装的,然后返回的是 Promise 的对象,如果要用 Observable 的生态,那么就需要利用 fromPromise 进行转化,这看上去不是特别好,所以能够直接利用 Observable 生态下的 ajax 进行操作。

与之前方案需要修改的地方有 2 个:

  1. fetchOrders 函数请求 ajax 的方式
  2. ajax 响应返回的对象不一样,毕竟之前使用的 axios
// 1. 使用 Observable.ajax
function fetchOrders() {
    return Observable.ajax.post("orders.json");
}

// 2. 修改一下 ajax 响应基础数据的处理
if(res.status === 200) {
    // res.data => res.response, ng2 如果不是 json 需要调用 json() 方法
    data = res.response;
}

8. Observable.create 的一些细节

(1) Obserable 的组成 我之前仿造过一个简单 Obserable 的实现:Obserable 这里用简单的结构来说一下:

class Observable {
    constructor (fn) {
        this.fn = fn;
    }

    static create(fn) {
        return new Observable(fn);
    }

    subscribe(next, error, complete) {
        if (typeof next !== "function") {
            return this.fn(next);
        }

        return this.fn({
            next,
            error: error || () => {},
            complete: complete || () => {}
        });
    }
}

从上面代码可以看出来,构造器主要接收一个函数,然后 create 方法主要返回的是一个 Obserable 实例。然后 subscribe 方法接收的是三个参数,如果我们传入的是一个 function 那么就会调用 this.fn 传入一个对象,这个对象有三个属性值:next、error、complete,默认情况下 next 是必须传入的,其余的是可选,为了安全,自己手动创建的最好调用一下 complete

这样看上来其实也挺简单的。下来回到 rxjs 本身来看一下 create 的用法:

function fetchSomeone() {
    return Observable.create(observer => {
        observer.next("ok");
        observer.complete();
    });
}

fetchSomeone().subscribe(x => console.log(x));
// 或者如下写法
fetchSomeone().subscribe({
    next: (x) => console.log(x),
    complete: () => {}
});

这里用法比较简单,就是传入一个常量字符串 ok,然后在这个 Observable 被订阅的时候传给订阅者。

剩下的一些内容单独来看:

(1) Error Handle( 错误的处理机制 )

(2) Hot and Cold 数据流以及它们之间的转换关系

(3) 流节点之间的转换

(4) 流节点的并发问题

后面可能会比较细节,如果有兴趣可以直接看下面的 PPT,我主要也是按 PPT 的例子和思路来写。

参考的PPT: https://speakerdeck.com/jfairbank/devnexus-2017-the-rise-of-async-javascript

本文链接:http://www.60sky.com/post/rxjs-1.html

-- EOF --

Comments