swift

Rxswift: Connectable Observable Operators

kimyounggyun 2022. 8. 2. 00:13

Observable을 공유하게 하는 연산자

 

Observable은 구독자가 추가되면 항상 새로운 시퀀스를 만든다. 서버와 통신하거나 파일을 읽는 경우에 서로 다른 구독자가 같은 observable을 공유할 수 있도록 하여 불필요한 중복 작업을 피할 수 있다.

multicast

앞으로 나올 연산자들은 모두 multicast 연산자를 활용하여 만들어진 연산자들이다. 

multicast는 한 개의 subject를 매개변수로 받는다. source observable이 방출한 이벤트가 매개 변수의 subject로 전달되고 subject는 전달받은 이벤트를 다수의 구독자에게 방출한다. 기존의 observable과 구독자 사이의 1:1 관계에서 중간의 subject로 인해 1:n 관계로 바뀐 셈이다.

multicast는 connectable observable을 리턴한다. 기존의 observable은 구독자가 subscribe를 시작하면 시퀀스가 생성되어 이벤트가 방출되지만, connectable observable은 구독자가 subscribe를 하여도 시퀀스가 생성되지 않고 connect 메소드를 호출하는 시점에 시퀀스가 시작된다.

public func multicast<Subject: SubjectType>(_ subject: Subject) -> ConnectableObservable<Subject.Element>

source observable을 공유하기 때문에 빨간색 원은 2부터 출력된다.

let disposeBag = DisposeBag()
let subject = PublishSubject<Int>()

let source = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .take(5)
    .multicast(subject)

source
    .subscribe { print("🔵", $0) }
    .disposed(by: disposeBag)

source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }
    .disposed(by: disposeBag)
    
source.connect()

/*
결과
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
🔵 completed
🔴 completed
*/

publish

multicast는 publishSubject를 만들어야 하는 단점이 있다. publish는 내부에서 publishSubject를 만들어주는 연산자이다. 여전히 connect를 해줘야 이벤트 방출이 시작된다.

public func publish() -> ConnectableObservable<Element> {
    self.multicast { PublishSubject() }
}

replay

multicast로 전달한 subject를 pulishSubject가 아닌 replaySubject로 바꾼 연산자이다. 마찬가지로 connect는 해줘야 한다.

replaySubject라서 새로운 구독자는 최근 buffer size의 이벤트를 받을 수 있다.

replaySubject의 버퍼 크기를 정해줘야하므로 매개변수로 버퍼 크기를 받는 것을 알 수 있다. 

버퍼의 크기가 무제한인 replayAll 메서드도 있는데 메모리 문제를 야기할 수 있으므로 사용은 자제해야한다.

 public func replay(_ bufferSize: Int) -> ConnectableObservable<Element> {
     self.multicast { ReplaySubject.create(bufferSize: bufferSize) }
 }

refCount

refCount연산자는 observable를 리턴한다.

RefCount클래스는 source로 connectable observable를 받는다. 받은 connectable observable를 관리하면서 새로운 구독자가 추가되는 시점에 자동으로 connect 메소드를 실행시켜준다. 

구독자가 dispose되었을 때 다른 구독자가 없다면 connectable observable의 sequence를 중단한다. 만약 나중에 구독자가 추가되면 이전 시퀀스를 재사용하는 것이 아닌 새로운 시퀀스를 생성한다.

public func refCount() -> Observable<Element> {
    RefCount(source: self)
}

observer1은 3초 뒤에 종료되므로 observer2는 새로운 시퀀스에서 방출된 이벤트를 받는다. 즉 다시 0부터 받는다.

let disposeBag = DisposeBag()
let source = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .publish()
    .refCount() // 내부에서 connect을 자동으로 호출함

// 1번째 구독자 추가
let observer1 = source
    .subscribe { print("🔵", $0) }

// 3초 뒤에 구독 종료
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    observer1.dispose()
}

//7초 뒤에 2번째 구독자 추가
DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer2 = source.subscribe { print("🔴", $0) }
    
    // 10초 뒤에 2번째 구독자 종료
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer2.dispose()
    }
}

observer2는 observer1이 dispose 되기 전에 구독을 시작했으므로 같은 observable을 구독한다.

let disposeBag = DisposeBag()
let source = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .publish()
    .refCount() // 내부에서 connect을 자동으로 호출함

// 1번째 구독자 추가
let observer1 = source
    .subscribe { print("🔵", $0) }

// 3초 뒤에 구독 종료
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
    observer1.dispose()
}

//2초 뒤에 2번째 구독자 추가
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    let observer2 = source.subscribe { print("🔴", $0) }
    
    // 5초 뒤에 2번째 구독자 종료
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer2.dispose()
    }
}

share

앞에 나온 연산자들을 추상화한 연산자이다.
replay 매개 변수는 버퍼 사이즈이다. 0보다 크면 replaySubject, 0이면 publishSubject를 만들어준다.
scope 매개 변수는 공유하는 버퍼의 라이프 타임이다. buffer가 0이라면 scope의 상관없이 같은 결과를 낸다.

public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
    -> Observable<Element> {
    switch scope 
    case .forever:
    	switch replay {
        case 0: return self.multicast(PublishSubject()).refCount()
        default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
    	}
    case .whileConnected:
        switch replay {
        case 0: return ShareWhileConnected(source: self.asObservable())
        case 1: return ShareReplay1WhileConnected(source: self.asObservable())
    	default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
    	}
    }
}

whileConnected

scope가 whileConnected이면 replay의 버퍼는 connectable observable의 시퀀스 동안 유지된다. 

observer1과 observer2는 같은 시퀀스를 공유하기 때문에 obserever2가 구독을 시작하면 버퍼에 저장된 최근 3개의 이벤트가 방출된다.

반면에 observer3는 observer1, observer2가 dispose 된 시점에 구독을 시작하기 때문에 새로운 시퀀스가 생성되어 0부터 이벤트를 받는다.

observer4는 observer3이 dispose 되기 전에 구독을 추가하여 같은 observable을 공유하고 버퍼에 저장된 값을 받는다

let disposeBag = DisposeBag()
let source = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .share(replay: 3, scope: .whileConnected)

let observer1 = source
    .subscribe { print("🔵", $0) }

let observer2 = source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = source.subscribe { print("⚫️", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
    DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
        let observer4 = source.subscribe { print("💚", $0) }
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer4.dispose()
        }
    }
}

forever

scope가 forever면 새로운 시퀀스와 상관없이 버퍼에 저장된 값을 모두 공유한다.

observer1, 2의 시퀀스가 5초에 dispose 되기 때문에 버퍼에는 2, 3, 4가 저장되어 있고 observer3이 구독을 시작하면 2, 3, 4를 전달 받고 새로운 시퀀스가 시작된다.

let disposeBag = DisposeBag()
let source = Observable<Int>
    .interval(.seconds(1), scheduler: MainScheduler.instance)
    .debug()
    .share(replay: 3, scope: .forever)

let observer1 = source
    .subscribe { print("🔵", $0) }

let observer2 = source
    .delaySubscription(.seconds(3), scheduler: MainScheduler.instance)
    .subscribe { print("🔴", $0) }

DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
    observer1.dispose()
    observer2.dispose()
}

DispatchQueue.main.asyncAfter(deadline: .now() + 7) {
    let observer3 = source.subscribe { print("⚫️", $0) }
    
    DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
        observer3.dispose()
    }
    DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
        let observer4 = source.subscribe { print("💚", $0) }
        DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
            observer4.dispose()
        }
    }
}

/* 
결과
🔵 next(0)
🔵 next(1)
🔵 next(2)
🔴 next(0)
🔴 next(1)
🔴 next(2)
🔵 next(3)
🔴 next(3)
🔵 next(4)
🔴 next(4)
⚫️ next(2)
⚫️ next(3)
⚫️ next(4)
⚫️ next(0)
⚫️ next(1)
💚 next(4)
💚 next(0)
💚 next(1)
⚫️ next(2)
💚 next(2)
💚 next(3)
💚 next(4)
*/