一、关键名词解释

需要先理解下边几个基本名词的概念

  1. ObservableType: 就是流 stream,是事件的发出者 (Publisher),按照时间序列发出。
  2. ObserverObject:监听者,包含三个函数:
  3. Subscribe 订阅动作,将 ObservableType 和 ObserverObject 关联起来
  4. Disposable :流的生命周期边界

二、流的基本用法和生命周期

swift
import UIKit
import RxSwift
import RxCocoa

class ViewController: UIViewController {
    // disposeBag 生命周期和 viewController 关联
    let disposeBag = DisposeBag();
    
    override func viewDidLoad() {
        super.viewDidLoad()
    }
    
    override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) {
        // Observable: stream
        let stream = Observable<Int>.create { observer in
            observer.onNext(1)
            observer.onCompleted()
            return Disposables.create()
        }
        
        // observer obj: next error completed
        let observer = AnyObserver<Int> { event in
            switch event {
            case .next(let val):
                print("val: \(val)")
            case .error(_):
                print("err")
            case .completed:
                print("completed")
            }
        }
        
        // subscribe: action 关联 stream 和 observer 和 stream
        let disposer = stream.subscribe(observer);
        
        // disposer 生命周期边界管理
        // 1: 局部生命周期,当即释放
        disposer.dispose();
        
        // 放入 bag,bag 释放的时候,生命周期结束
        disposer.disposed(by: disposeBag); 
    }

}
  1. subscribe() 会返回一个 disposable 用来管理生命周期
  2. 如果没有明确使用 dispose() 销毁 stream。stream 也会在以下情况下终止:

2.1、流不会随着函数作用域结束而自动释放

流的释放时机总结:

Unsupported Notion block: table

响应式编程中,因为是一套独立的编程范式,所以没有流的生命周期管理,流的生命周期会脱离函数作用域,被FRP内部管理,所以需要特别注意流的正确释放,错误案例代码:❌

swift
// ❌
override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?) {
    let stream = Observable<Int>.create { observer in
        print("observable created ---")
        observer.onNext(1)
        return Disposables.create {
            print("observable disposed ---")
        }
    }
    
    let observer = AnyObserver<Int> { event in
        switch event {
        case .next(let val):
            print("val: \(val)")
        case .error(_):
            print("err")
        case .completed:
            print("completed")
        }
    }
    
    stream.subscribe(observer)
}

这段代码,流只抛出了 onNext,永远没有 onCompleted / onError,也没有手动 dispose()。所以这个流一直存在,因为 Rx 也无法确定,流是否还有新的事件到来。

利用 dispose 取消任务

当 dispose() 执行的时候,如果需要做一些资源回收,或者任务取消,使用

swift
let stream = Observable<Int>.create { observer in            
    observer.onNext(1)
    observer.onCompleted()
    return Disposables.create {
        print("dispose called---")
    }
}

// ...

let disposer = stream.subscribe(observer);
disposer.dispose();

利用这个能力,可以做类似,网络请求取消:

swift
let request = Observable<Data>.create { observer in
    let task = URLSession.shared.dataTask(with: URL(string: "https://example.com")!) { data, _, error in
        if let error = error {
            observer.onError(error)
        } else if let data = data {
            observer.onNext(data)
            observer.onCompleted()
        }
    }
    task.resume()
    
    return Disposables.create {
        task.cancel() // 关键
    }
}

let disposable = request.subscribe(
    onNext: { data in print("Got data: \(data.count)") },
    onError: { error in print("Error: \(error)") },
    onCompleted: { print("Done") }
)

// 假设用户切换页面或取消操作:
disposable.dispose()