Subscription을 공유하여 동일한 데이터 스트림을 여러 곳에서 재사용해 리소스 사용을 최적화 해보자.
RxSwift와 비교하기
share
Publisher는 주로 구조체로 정의되어 있으며, 함수의 인자로 전달되거나 다른 속성에 할당될 때마다 스위프트에 의해 복사된다.
struct Publisher : Publisher
share()는 Publishers.Share 클래스의 인스턴스를 반환하는 연산자이다. 일반적인 publisher가 구조체로 구현되어 값 의미론(Value Semantics)을 따르는 반면, share()는 참조 의미론(Reference Semantics)을 따르므로 주의가 필요하다.
final class Share<Upstream> : Publisher, Equatable where Upstream : Publisher
share()는 upstream publisher를 첫번째 구독자에 한하여 단 한 번만 구독한다. 이후 새로운 구독자들은 구독을 공유하여 upstream publisher가 방출하는 값을 공유받는다.
let shared = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
.map(\.data)
.print("shared")
print("subscribing first")
let subscription1 = shared.sink(
receiveCompletion: { _ in print("[subscription1] received completion") },
receiveValue: { print("[subscription1] received: \($0)") }
)
print("subscribing second")
let subscription2 = shared.sink(
receiveCompletion: { _ in print("[subscription2] received completion") },
receiveValue: { print("[subscription2] received: \($0)") }
)
/*
subscribing first
shared: receive subscription: (DataTaskPublisher) <- subscription 생성
shared: request unlimited
subscribing second
shared: receive value: (245629 bytes)
[subscription1] received: 245629 bytes <- 같은 값을 공유 받음
[subscription2] received: 245629 bytes <- 같은 값을 공유 받음
shared: receive finished
[subscription1] received completion
[subscription2] received completion
*/
share()가 없다면 각 구독마다 별도의 구독이 생성된다.
let shared = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
.map(\.data)
.print("shared")
/*
subscribing first
shared: receive subscription: (DataTaskPublisher) <- subscription 생성
shared: request unlimited
subscribing second
shared: receive subscription: (DataTaskPublisher) <- subscription 생성
shared: request unlimited
shared: receive value: (238588 bytes)
[subscription2] received: 238588 bytes <- 서로 다른 값을 공유 받음
shared: receive finished
[subscription2] received completion
shared: receive value: (283528 bytes)
[subscription1] received: 283528 bytes <- 서로 다른 값을 공유 받음
shared: receive finished
[subscription1] received completion
*/
combine | rxswift | |
---|---|---|
장점 | share(replay:scope:)를 통해 replay 기능을 제공한다. | |
단점 | 기본적으로는 replay 기능이 없다. 즉, subscriber가 구독을 시작한 시점 이전의 이벤트를 받지 못한다. | replay 기능을 사용할 때 메모리 관리에 주의해야 한다. 저장해야 하는 이벤트의 수가 많을수록 메모리 사용량이 증가한다. |
multicast
rxswift의 mulicast()와 동일하다. multicast()는 단일 subject를 인자로 받는다.
final class Multicast<Upstream, SubjectType>: ConnectablePublisher {
init(upstream: Upstream, subject: SubjectType)
init(upstream: Upstream, createSubject: @escaping () -> SubjectType)
}
upstream publisher가 방출한 이벤트는 인자로 받은 subject를 통해 전달되며, 이 subject는 전달받은 이벤트를 다수의 구독자에게 방출한다. 이로써 기존의 publisher와 구독자 사이의 1:1 관계가 subject를 통해 1:n 관계로 변화한다.
let multicasted = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
.map(\.data)
.print("multicast")
.multicast { PassthroughSubject<Data, URLError>() }
multicast는 connectable publisher를 반환한다. 기존의 publisher는 구독자가 구독을 시작하자마자 스트림이 생성되어 이벤트가 방출되지만, connectable publisher는 구독자가 구독을 해도 스트림이 생성되지 않으며, connect()를 호출하는 시점에 시퀀스가 시작된다.
물론 자동으로 connect()를 호출해주는 autoconnect()도 있다.
print("subscribing first")
let subscription1 = multicasted.sink(
receiveCompletion: { _ in print("[subscription1] received completion") },
receiveValue: { print("[subscription1] received: \($0)") }
)
print("subscribing second")
let subscription2 = multicasted.sink(
receiveCompletion: { print("subscription2 completion \($0)") },
receiveValue: { print("subscription2 received: '\($0)'") }
)
// connect() 연산자를 명시해야 이벤트가 방출된다.
let cancellable = mulicated.connect()
// autoconnect() 연산자를 사용하면 자동으로 connect() 연산자가 호출된다.
let multicasted = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
.map(\.data)
.print("multicast")
.multicast(subject: subject)
.autoconnect()
물론 자동으로 connect()를 호출해주는 autoconnect()도 있다.
// autoconnect() 연산자를 사용하면 자동으로 connect() 연산자가 호출된다.
let multicasted = URLSession.shared
.dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
.map(\.data)
.print("multicast")
.multicast(subject: subject)
.autoconnect()
share(replay:)
combine의 share()는 replay 기능이 없다. 하지만 직접 ShareReplaySubscription을 구현하여 share(replay:) publisher를 만들 수 있다.
final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
let capacity: Int
var subscriber: AnySubscriber<Output, Failure>? = nil
var demand: Subscribers.Demand = .none
var buffer: [Output]
var completion: Subscribers.Completion<Failure>? = nil
init<S: Subscriber>(
subscriber: S,
replay: [Output],
capacity: Int,
completion: Subscribers.Completion<Failure>?
) where Failure == S.Failure, Output == S.Input {
self.subscriber = AnySubscriber(subscriber)
self.buffer = replay
self.capacity = capacity
self.completion = completion
}
private func complete(with completion: Subscribers.Completion<Failure>) {
guard let subscriber = subscriber else { return }
self.subscriber = nil
self.completion = nil
self.buffer.removeAll()
subscriber.receive(completion: completion)
}
private func emitAsNeeded() {
guard let subscriber = subscriber else { return }
while self.demand > .none && !buffer.isEmpty {
self.demand -= .max(1)
let nextDemand = subscriber.receive(buffer.removeFirst())
if nextDemand != .none {
self.demand += nextDemand
}
}
if let completion = completion {
complete(with: completion)
}
}
func request(_ demand: Subscribers.Demand) {
if demand != .none {
self.demand += demand
}
emitAsNeeded()
}
func cancel() {
complete(with: .finished)
}
func receive(_ input: Output) {
guard subscriber != nil else { return }
buffer.append(input)
if buffer.count > capacity {
buffer.removeFirst()
}
emitAsNeeded()
}
func receive(completion: Subscribers.Completion<Failure>) {
guard let subscriber = subscriber else { return }
self.subscriber = nil
self.buffer.removeAll()
subscriber.receive(completion: completion)
}
}
enum ShareReplayScope {
case forever
case whileConnected
}
extension Publishers {
final class ShareReplayScopePublisher<Upstream: Publisher>: Publisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
private let lock = NSRecursiveLock()
private let upstream: Upstream
private let capacity: Int
private let scope: ShareReplayScope
private var replay = [Output]()
private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
private var completion: Subscribers.Completion<Failure>? = nil
init(upstream: Upstream, capacity: Int, scope: ShareReplayScope) {
self.upstream = upstream
self.capacity = capacity
self.scope = scope
}
private func relay(_ value: Output) {
lock.lock()
defer { lock.unlock() }
switch scope {
case .forever where completion != nil:
return
default:
replay.append(value)
if replay.count > capacity {
replay.removeFirst()
}
}
subscriptions.forEach { _ = $0.receive(value) }
}
private func complete(_ completion: Subscribers.Completion<Failure>) {
lock.lock()
defer { lock.unlock() }
subscriptions.forEach { _ = $0.receive(completion: completion) }
switch scope {
case .whileConnected:
replay.removeAll()
subscriptions.removeAll()
case .forever:
self.completion = completion
}
}
func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
lock.lock()
defer { lock.unlock() }
let subscription = ShareReplaySubscription(
subscriber: subscriber,
replay: replay,
capacity: capacity,
completion: completion
)
subscriptions.append(subscription)
subscriber.receive(subscription: subscription)
guard subscriptions.count == 1 else { return }
let sink = AnySubscriber(receiveSubscription: { subscription in
subscription.request(.unlimited)
}, receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
self?.relay(value)
return .none
}, receiveCompletion: { [weak self] in
self?.complete($0)
})
upstream.subscribe(sink)
}
}
}
extension Publisher {
func shareReplay(
capacity: Int = .max,
scope: ShareReplayScope = .forever
) -> Publishers.ShareReplayScopePublisher<Self> {
return Publishers.ShareReplayScopePublisher(upstream: self, capacity: capacity, scope: scope)
}
}
Future
Future은 이벤트 스트림을 공유하는 publisher이다. 이 역시 구조체가 아닌 클래스이므로 참조 의미론을 따른다.
final class Future<Output, Failure> : Publisher where Failure : Error
Future는 단일 값 또는 오류를 비동기적으로 생성하는 promise와 유사한 역할을 한다. 따라서 간단한 비동기 작업에 적합하다. 하지만 하나의 값만을 처리할 수 있어, 시간에 따라 여러 값을 처리해야 하는 스트림에는 적합하지 않다. 주의해야 할 점은, Future는 생성 즉시 클로저를 실행하여 결과 값을 저장하고 있다가 미래에 구독자가 생기면 저장된 값을 전달한다.
func execute() throws -> Int {
print("메소드 실행")
return 10
}
let future = Future<Int, Error> { fulfill in
do {
let result = try execute()
print("result: \(result)")
fulfill(.success(result))
} catch {
fulfill(.failure(error))
}
}
/*
메소드 실행
result: 10
*/
구독자가 있을 때까지 Future의 클로저 실행을 연기하기 위하여 Deferred로 Future을 감싸 사용하여 연기할 수 있다. 그러나 Deferred는 구조체이므로 구독이 매번 생성되므로 주의해야 한다.
struct Deferred<DeferredPublisher>: Publisher where DeferredPublisher : Publisher
let defered = Deferred {
Future<Int, Error> { fulfill in
...
}
}