swift

RxSwift: Combining operators

kimyounggyun 2022. 8. 19. 16:44

startWith

observable이 방출할 요소 앞부분에 다른 요소를 추가하는 연산자. 
주로 기본 값이나 시작 값을 지정할 때 사용한다. LIFO 방식이다.

let disposeBag = DisposeBag()
let numbers = [1, 2, 3, 4, 5]

Observable
    .from(numbers)
    .startWith(-1, -2)
    .startWith(-3, -4)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

/*
next(-3)
next(-4)
next(-1)
next(-2)
next(1)
next(2)
next(3)
next(4)
next(5)
completed
*/

concat

2개의 observable를 연결하는 연산자.
하나의 observable이 모든 요소를 성공적으로 방출하면 이어지는 observable의 요소가 방출된다.  

let fruits = Observable.from(["🍏", "🍎", "🥝"])
let animals = Observable.from(["🐶", "🐱", "🐹"])

Observable
    .concat([fruits, animals])
    .subscribe { print($0) }
    .disposed(by: disposeBag)
    
/*
next(🍏)
next(🍎)
next(🥝)
next(🐶)
next(🐱)
next(🐹)
completed
*/

Merge

여러 개의 observable에서 방출하는 요소를 하나의 observable로 합치는 연산자.

maxConcurrent를 지정하면 지정한 개수만 합쳤다가 합쳐진 observable 중 한 개가 completed 되면 큐에 저장되었던 다음 observable이 병합된다.

let oddNumbers = BehaviorSubject(value: 1)
let evenNumbers = BehaviorSubject(value: 2)
let negativeNumbers = BehaviorSubject(value: -1)
let source1 = Observable.of(oddNumbers, evenNumbers, negativeNumbers)

source1
    .merge(maxConcurrent: 2)
    .subscribe { print($0) }
    .disposed(by: disposeBag)
/*
next(1)
next(2)
*/
negativeNumbers.onNext(-3) 	// 구독자로 전달 X
oddNumbers.onNext(3) 		// next(3)
evenNumbers.onNext(4) 		// next(4)
oddNumbers.onCompleted() 	// next(-3) 14번째 줄이 지금 전달됨.
evenNumbers.onNext(8) 		// next(8)
negativeNumbers.onNext(-5) 	// next(-5)
evenNumbers.onCompleted()
negativeNumbers.onCompleted() 	// completed

combineLatest

observable이 요소를 방출할 때마다 전달된 observable이 최근에 방출한 요소들을 모아서 방출하는 연산자.

let greetings = PublishSubject<String>()
let names = PublishSubject<String>()
let numbers = BehaviorSubject<Int>(value: 3)

Observable
    .combineLatest(greetings, names, numbers)
    .subscribe(onNext: {
        print($0)
    })
    .disposed(by: disposeBag)

greetings.onNext("Hi")
names.onNext("Sunny")			// next(("Hi", "Sunny", 3))
numbers.onNext(5)			// next(("Hi", "Sunny", 5))
greetings.onNext("bonjour")		// next(("bonjour", "Sunny", 5))
names.onNext("Joo")			// next(("bonjour", "Joo", 5))

greetings.onCompleted() 		// 하나가 끊기면 가장 마지막에 방출한 이벤트가 사용됨
names.onNext("Lee")			// next(("bonjour", "Lee", 5))
names.onCompleted() 

numbers.onNext(10)			// next(("bonjour", "Lee", 10))
numbers.onCompleted()			// completed

withLatestFrom

 

triggerObservable.withLatestFrom(dataObservable) 형식으로 사용한다. triggerObservable이 Next 이벤트를 방출하면 dataObservable이 최근에 방출한 next이벤트를 구독자에게 전달한다.
회원가입 버튼을 누르면 텍스트 필드에 있는 값을 가져올 때 사용할 수 있다.

let trigger = PublishSubject<Void>()
let data = PublishSubject<String>()

trigger
    .withLatestFrom(data)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

data.onNext("Hello")
data.onNext("World")
trigger.onNext(())  // next(World)

data.onCompleted() // 구독자에게 completed가 전달되지 않음.
//data.onError(MyError.error) // 바로 전달됨
trigger.onNext(())	// 가장 최근에 전달한 이벤트가 전달됨
trigger.onCompleted()	// 바로 종료가 전달됨

zip

observable이 방출한 요소를 1:1로 맵핑하여 방출하는 연산자.

let greetings = PublishSubject<String>()
let names = PublishSubject<String>()

Observable
    .zip(greetings, names) { "\($0)! \($1)"}
    .subscribe { print($0) }
    .disposed(by: disposeBag)

greetings.onNext("Hi")
names.onNext("Kim") 	// next(Hi! Kim)

greetings.onNext("bonjour")
greetings.onNext("안녕")

names.onNext("Lee") 	// next(bonjour! Lee)
greetings.onCompleted()

names.onNext("Jo")  	// next(안녕! Jo)
names.onNext("Ke") 	// 맵핑할 greeting이 없음

names.onCompleted() 	// 둘 다 completed되어야 구독자에게 completed 이벤트가 전달됨

Sample

dataObservable.sample(triggerObservable) 형식으로 사용한다. triggerObservable이 Next 이벤트를 방출하면 dataObservable이 최근에 방출한 next이벤트를 구독자에게 전달한다. 근데 동일한 next 이벤트는 안 보낸다.

아래가 trigger Observable이고 방출될 때마다 위에서 최근에 방출한 요소가 구독자로 전달된다.

let trigger = PublishSubject<Void>()
let data = PublishSubject<String>()

data
    .sample(trigger)
    .subscribe { print($0) }
    .disposed(by: disposeBag)

trigger.onNext(())

data.onNext("Hello")	// next(Hello)
trigger.onNext(())

data.onNext("World")
trigger.onNext(())	// next(World)
trigger.onNext(())

data.onCompleted()
trigger.onCompleted() 	// 구독자에게 completed가 전달됨.

switchLatest

구독자에 이벤트를 방출할 observable를 바꾸는 연산자. source Observable은 inner observable를 가져야 하기 때문에 타입이 observable 이어한다.

let a = PublishSubject<String>()
let b = PublishSubject<String>()
let source = PublishSubject<Observable<String>>()

source
    .switchLatest()
    .subscribe { print($0) }
    .disposed(by: disposeBag)

a.onNext("1")
b.onNext("b")

source.onNext(a)	// a가 최신 옵저버블
a.onNext("2")   	// next(2)
b.onNext("c")

source.onNext(b)	// 이제 b가 최신 옵저버블
a.onNext("3")
b.onNext("d")   	// next(d)

a.onCompleted() 	// 전달안됨
b.onCompleted() 	// 최신인데 전달안됨

source.onCompleted()	// b가 completed되고, source도 completed되야 전달