swift

Combine: Connectable Publisher Operators

kimyounggyun 2024. 5. 7. 16:42

Subscription을 공유하여 동일한 데이터 스트림을 여러 곳에서 재사용해 리소스 사용을 최적화 해보자.

RxSwift와 비교하기

 

[rxswift] Connectable Observable Operators

Observable을 공유하게 하는 연산자 Observable은 구독자가 추가되면 항상 새로운 시퀀스를 만든다. 서버와 통신하거나 파일을 읽는 경우에 서로 다른 구독자가 같은 observable을 공유할 수 있도록 하여

younggyun.tistory.com

share

Publisher는 주로 구조체로 정의되어 있으며, 함수의 인자로 전달되거나 다른 속성에 할당될 때마다 스위프트에 의해 복사된다.

struct Publisher : Publisher

share()는 Publishers.Share 클래스의 인스턴스를 반환하는 연산자이다. 일반적인 publisher가 구조체로 구현되어 값 의미론(Value Semantics)을 따르는 반면, share()는 참조 의미론(Reference Semantics)을 따르므로 주의가 필요하다.

final class Share<Upstream> : Publisher, Equatable where Upstream : Publisher

share()는 upstream publisher를 첫번째 구독자에 한하여 단 한 번만 구독한다. 이후 새로운 구독자들은 구독을 공유하여 upstream publisher가 방출하는 값을 공유받는다.

let shared = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
    .map(\.data)
    .print("shared")

print("subscribing first")
let subscription1 = shared.sink(
    receiveCompletion: { _ in print("[subscription1] received completion") },
    receiveValue: { print("[subscription1] received: \($0)") }
)

print("subscribing second")
let subscription2 = shared.sink(
    receiveCompletion: { _ in print("[subscription2] received completion") },
    receiveValue: { print("[subscription2] received: \($0)") }
)

/*
    subscribing first
    shared: receive subscription: (DataTaskPublisher) <- subscription 생성
    shared: request unlimited
    subscribing second
    shared: receive value: (245629 bytes)
    [subscription1] received: 245629 bytes <- 같은 값을 공유 받음
    [subscription2] received: 245629 bytes <- 같은 값을 공유 받음
    shared: receive finished
    [subscription1] received completion
    [subscription2] received completion
*/

share()가 없다면 각 구독마다 별도의 구독이 생성된다.

let shared = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
    .map(\.data)
    .print("shared")

/*
    subscribing first
    shared: receive subscription: (DataTaskPublisher) <- subscription 생성
    shared: request unlimited
    subscribing second
    shared: receive subscription: (DataTaskPublisher) <- subscription 생성
    shared: request unlimited
    shared: receive value: (238588 bytes)
    [subscription2] received: 238588 bytes <- 서로 다른 값을 공유 받음
    shared: receive finished
    [subscription2] received completion
    shared: receive value: (283528 bytes)
    [subscription1] received: 283528 bytes <- 서로 다른 값을 공유 받음
    shared: receive finished
    [subscription1] received completion
*/
  combine rxswift
장점   share(replay:scope:)를 통해 replay 기능을 제공한다.
단점 기본적으로는 replay 기능이 없다. 즉, subscriber가 구독을 시작한 시점 이전의 이벤트를 받지 못한다. replay 기능을 사용할 때 메모리 관리에 주의해야 한다. 저장해야 하는 이벤트의 수가 많을수록 메모리 사용량이 증가한다.

multicast

rxswift의 mulicast()와 동일하다. multicast()는 단일 subject를 인자로 받는다.

final class Multicast<Upstream, SubjectType>: ConnectablePublisher {
    init(upstream: Upstream, subject: SubjectType)
    init(upstream: Upstream, createSubject: @escaping () -> SubjectType)
}

upstream publisher가 방출한 이벤트는 인자로 받은 subject를 통해 전달되며, 이 subject는 전달받은 이벤트를 다수의 구독자에게 방출한다. 이로써 기존의 publisher와 구독자 사이의 1:1 관계가 subject를 통해 1:n 관계로 변화한다.

let multicasted = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
    .map(\.data)
    .print("multicast")
    .multicast { PassthroughSubject<Data, URLError>() }

multicast는 connectable publisher를 반환한다. 기존의 publisher는 구독자가 구독을 시작하자마자 스트림이 생성되어 이벤트가 방출되지만, connectable publisher는 구독자가 구독을 해도 스트림이 생성되지 않으며, connect()를 호출하는 시점에 시퀀스가 시작된다.

물론 자동으로 connect()를 호출해주는 autoconnect()도 있다.

print("subscribing first")
let subscription1 = multicasted.sink(
    receiveCompletion: { _ in print("[subscription1] received completion") },
    receiveValue: { print("[subscription1] received: \($0)") }
)

print("subscribing second")
let subscription2 = multicasted.sink(
    receiveCompletion: { print("subscription2 completion \($0)") },
    receiveValue: { print("subscription2 received: '\($0)'") }
)

// connect() 연산자를 명시해야 이벤트가 방출된다.
let cancellable = mulicated.connect() 

// autoconnect() 연산자를 사용하면 자동으로 connect() 연산자가 호출된다.
let multicasted = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
    .map(\.data)
    .print("multicast")
    .multicast(subject: subject)
    .autoconnect()

물론 자동으로 connect()를 호출해주는 autoconnect()도 있다.

// autoconnect() 연산자를 사용하면 자동으로 connect() 연산자가 호출된다.
let multicasted = URLSession.shared
    .dataTaskPublisher(for: URL(string: "https://www.naver.com")!)
    .map(\.data)
    .print("multicast")
    .multicast(subject: subject)
    .autoconnect()

share(replay:)

combine의 share()는 replay 기능이 없다. 하지만 직접 ShareReplaySubscription을 구현하여 share(replay:) publisher를 만들 수 있다.

final class ShareReplaySubscription<Output, Failure: Error>: Subscription {
  let capacity: Int
  var subscriber: AnySubscriber<Output, Failure>? = nil
  var demand: Subscribers.Demand = .none
  var buffer: [Output]
  var completion: Subscribers.Completion<Failure>? = nil
  
  init<S: Subscriber>(
    subscriber: S,
    replay: [Output],
    capacity: Int,
    completion: Subscribers.Completion<Failure>?
  ) where Failure == S.Failure, Output == S.Input {
    self.subscriber = AnySubscriber(subscriber)
    self.buffer = replay
    self.capacity = capacity
    self.completion = completion
  }
  
  private func complete(with completion: Subscribers.Completion<Failure>) {
    guard let subscriber = subscriber else { return }
    self.subscriber = nil
    self.completion = nil
    self.buffer.removeAll()
    subscriber.receive(completion: completion)
  }
  
  private func emitAsNeeded() {
    guard let subscriber = subscriber else { return }
    while self.demand > .none && !buffer.isEmpty {
      self.demand -= .max(1)
      let nextDemand = subscriber.receive(buffer.removeFirst())
      if nextDemand != .none {
        self.demand += nextDemand
      }
    }
    if let completion = completion {
      complete(with: completion)
    }
  }
  
  func request(_ demand: Subscribers.Demand) {
    if demand != .none {
      self.demand += demand
    }
    emitAsNeeded()
  }
  
  func cancel() {
    complete(with: .finished)
  }
  
  func receive(_ input: Output) {
    guard subscriber != nil else { return }
    buffer.append(input)
    if buffer.count > capacity {
      buffer.removeFirst()
    }
    emitAsNeeded()
  }
  
  func receive(completion: Subscribers.Completion<Failure>) {
    guard let subscriber = subscriber else { return }
    self.subscriber = nil
    self.buffer.removeAll()
    subscriber.receive(completion: completion)
  }
}

 

enum ShareReplayScope {
  case forever
  case whileConnected
}

extension Publishers {
  final class ShareReplayScopePublisher<Upstream: Publisher>: Publisher {
    typealias Output = Upstream.Output
    typealias Failure = Upstream.Failure
    
    private let lock = NSRecursiveLock()
    private let upstream: Upstream
    private let capacity: Int
    private let scope: ShareReplayScope
    private var replay = [Output]()
    private var subscriptions = [ShareReplaySubscription<Output, Failure>]()
    private var completion: Subscribers.Completion<Failure>? = nil
    
    init(upstream: Upstream, capacity: Int, scope: ShareReplayScope) {
      self.upstream = upstream
      self.capacity = capacity
      self.scope = scope
    }
    
    private func relay(_ value: Output) {
      lock.lock()
      defer { lock.unlock() }
      switch scope {
      case .forever where completion != nil:
        return
      default:
        replay.append(value)
        if replay.count > capacity {
          replay.removeFirst()
        }
      }
      
      subscriptions.forEach { _ = $0.receive(value) }
    }
    
    private func complete(_ completion: Subscribers.Completion<Failure>) {
      lock.lock()
      defer { lock.unlock() }
      
      subscriptions.forEach { _ = $0.receive(completion: completion) }
      
      switch scope {
      case .whileConnected:
        replay.removeAll()
        subscriptions.removeAll()
      case .forever:
        self.completion = completion
      }
    }
    
    func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
      lock.lock()
      defer { lock.unlock() }
      
      let subscription = ShareReplaySubscription(
        subscriber: subscriber,
        replay: replay,
        capacity: capacity,
        completion: completion
      )
      
      subscriptions.append(subscription)
      subscriber.receive(subscription: subscription)
      
      guard subscriptions.count == 1 else { return }
      let sink = AnySubscriber(receiveSubscription: { subscription in
        subscription.request(.unlimited)
      }, receiveValue: { [weak self] (value: Output) -> Subscribers.Demand in
        self?.relay(value)
        return .none
      }, receiveCompletion: { [weak self] in
        self?.complete($0)
      })
      
      upstream.subscribe(sink)
    }
  }
}
extension Publisher {
  func shareReplay(
    capacity: Int = .max,
    scope: ShareReplayScope = .forever
  ) -> Publishers.ShareReplayScopePublisher<Self> {
    return Publishers.ShareReplayScopePublisher(upstream: self, capacity: capacity, scope: scope)
  }
}

Future

Future은 이벤트 스트림을 공유하는 publisher이다. 이 역시 구조체가 아닌 클래스이므로 참조 의미론을 따른다.

final class Future<Output, Failure> : Publisher where Failure : Error

Future는 단일 값 또는 오류를 비동기적으로 생성하는 promise와 유사한 역할을 한다. 따라서 간단한 비동기 작업에 적합하다. 하지만 하나의 값만을 처리할 수 있어, 시간에 따라 여러 값을 처리해야 하는 스트림에는 적합하지 않다. 주의해야 할 점은, Future는 생성 즉시 클로저를 실행하여 결과 값을 저장하고 있다가 미래에 구독자가 생기면 저장된 값을 전달한다.

func execute() throws -> Int {
    print("메소드 실행")
    return 10
}
let future = Future<Int, Error> { fulfill in
    do {
        let result = try execute()
        print("result: \(result)")
        fulfill(.success(result))
    } catch {
        fulfill(.failure(error))
    }
}

/*
    메소드 실행
    result: 10
*/

구독자가 있을 때까지 Future의 클로저 실행을 연기하기 위하여 Deferred로 Future을 감싸 사용하여 연기할 수 있다. 그러나 Deferred는 구조체이므로 구독이 매번 생성되므로 주의해야 한다.

struct Deferred<DeferredPublisher>: Publisher where DeferredPublisher : Publisher


let defered = Deferred {
    Future<Int, Error> { fulfill in
        ...
    }
}