Combine Subscribers - Assign & Sink

We learned the basics of subscribers in “Combine - Publisher, Subscriber & Subscription”, where we also implemented our own. Here we extend that discussion by looking at two concrete subscriber types that come with Combine:

  • Subscribers.Assign
  • Subscribers.Sink

Subscribers.Assign

Subscribers.Assign is a subscriber intended for assigning a publisher’s output to a property of an object. Let’s see an example of how it’s used before we break down how it works.

/// Holds a single value and echoes it to the console every time a new value is assigned.
///
/// The initial value passed to `init` is not printed: Swift property observers
/// do not run while a property is being set from its own type's initializer.
class ValuePrinter<Value> {

    /// The current value; each assignment after initialization prints the new value.
    var value: Value {
        didSet { print(value) }
    }

    /// Creates a printer that starts out holding `value`.
    init(_ value: Value) {
        self.value = value
    }
}

// Here we define a publisher that publishes values from 1 to 5
// and register to it a subscriber of type Assign, which then writes
// each received value to the ValuePrinter's value property.
// assign(to:on:) returns an AnyCancellable. We keep it in a constant instead of
// discarding it: this avoids the unused-result warning, and for publishers that
// deliver values later it keeps the subscription from being torn down immediately.
let assignCancellable = Array(1...5).publisher.assign(to: \.value, on: ValuePrinter(0))


// And the console outputs:
// 1
// 2
// 3
// 4
// 5

Now let’s look at the implementation of Subscribers.Assign.

/// The state of a subscriber with respect to its subscription.
/// A subscriber is always in exactly one of these three states.
enum SubscriptionStatus {
    /// No subscription has been received yet.
    case awaitingSubscription
    /// A subscription is active; it is stored as the associated value.
    case subscribed(Subscription)
    /// The subscription has been terminated.
    case terminal
}

extension Subscribers {

    /// A cancellable subscriber that writes every value it receives to a property
    /// of a destination object, addressed by key path. It never fails.
    public final class Assign<Root, Input>: Subscriber, Cancellable {
        public typealias Failure = Never

        /// The destination object whose property is written to.
        /// Set to `nil` (releasing the object) once the subscriber is cancelled.
        public private(set) var object: Root?

        /// The key path of the destination property on `object`.
        public let keyPath: ReferenceWritableKeyPath<Root, Input>

        /// The subscriber's subscription status, initially awaiting a subscription.
        /// While subscribed, the status itself holds the subscription.
        private var status = SubscriptionStatus.awaitingSubscription

        /// Creates a subscriber that assigns received values to `object[keyPath: keyPath]`.
        /// - Parameters:
        ///   - object: The destination object.
        ///   - keyPath: The key path of the destination property.
        public init(object: Root, keyPath: ReferenceWritableKeyPath<Root, Input>) {
            self.object = object
            self.keyPath = keyPath
        }

        /// Receives a subscription.
        ///
        /// - If the subscriber is awaiting a subscription, the subscription is accepted
        ///   and unlimited demand is requested from it.
        /// - If the subscriber is already subscribed, or has terminated, the received
        ///   subscription is cancelled.
        public func receive(subscription: Subscription) {
            switch status {
            case .awaitingSubscription:
                // Store the subscription before requesting demand, so that values
                // delivered during the request find the subscriber already subscribed.
                status = .subscribed(subscription)
                subscription.request(.unlimited)
            case .subscribed, .terminal:
                subscription.cancel()
            }
        }

        /// Receives a value and writes it to the destination property.
        ///
        /// The value is written only while the subscriber is subscribed; it is
        /// ignored otherwise.
        /// - Returns: `.none`, because unlimited demand was already requested.
        public func receive(_ value: Input) -> Subscribers.Demand {
            if case .subscribed = status {
                object?[keyPath: keyPath] = value
            }
            return .none
        }

        /// Receives the completion event.
        ///
        /// `Assign` never fails and does not inspect the completion value; it simply
        /// cancels the subscription and terminates.
        public func receive(completion: Subscribers.Completion<Never>) {
            cancel()
        }

        /// Cancels the subscription.
        ///
        /// Does nothing unless the subscriber is currently subscribed. Otherwise the
        /// subscriber moves to the terminal state, releases the destination object
        /// and cancels the subscription.
        public func cancel() {
            guard case let .subscribed(subscription) = status else {
                return
            }
            // Tear down our own state *before* cancelling the subscription, so that
            // anything triggered by `subscription.cancel()` (a re-entrant `cancel()`,
            // a late value) already sees a terminated subscriber.
            status = .terminal
            object = nil
            subscription.cancel()
        }
    }
}

extension Publisher where Failure == Never {

    /// Attaches a `Subscribers.Assign` subscriber to this publisher, so that every
    /// published value is written to the property at `keyPath` on `object`.
    /// - Parameters:
    ///   - keyPath: The key path of the property to assign to.
    ///   - object: The object that owns the property.
    /// - Returns: The subscriber wrapped as an `AnyCancellable`.
    public func assign<Root>(to keyPath: ReferenceWritableKeyPath<Root, Output>,
                             on object: Root) -> AnyCancellable {
        // Build the subscriber, attach it to this publisher, then hand it back
        // to the caller type-erased as an AnyCancellable.
        let assignSubscriber = Subscribers.Assign(object: object, keyPath: keyPath)
        subscribe(assignSubscriber)
        return AnyCancellable(assignSubscriber)
    }
}

(Snippet from OpenCombine)


Subscribers.Sink

Subscribers.Sink is a subscriber whose behavior is defined by closure-wrapped logic assigned to it. Again, let’s see an example of how it’s used before we break down how it works.

// Here we define a publisher that publishes values from 1 to 5
// and register to it a subscriber of type Sink which outputs:
// "Completed with:" with a Subscribers.Completion instance when a completion event is received, or
// "Received value:" with the value received when a value is received.
// sink returns an AnyCancellable. We keep it in a constant instead of discarding it:
// this avoids the unused-result warning, and for publishers that deliver values
// later it keeps the subscription from being torn down immediately.
let sinkCancellable = Array(1...5).publisher.sink(
    receiveCompletion: { print("Completed with: \($0)") },
    receiveValue: { print("Received value: \($0)") }
)


// And the console outputs:
// Received value: 1
// Received value: 2
// Received value: 3
// Received value: 4
// Received value: 5
// Completed with: finished

Now let’s see the implementation of Subscribers.Sink.

/// The state of a subscriber with respect to its subscription.
/// A subscriber is always in exactly one of these three states.
enum SubscriptionStatus {
    /// No subscription has been received yet.
    case awaitingSubscription
    /// A subscription is active; it is stored as the associated value.
    case subscribed(Subscription)
    /// The subscription has been terminated.
    case terminal
}

extension Subscribers {

    /// A cancellable subscriber that can fail and whose behavior is supplied by closures.
    /// It requests an unlimited number of values upon subscription.
    public final class Sink<Input, Failure: Error> : Subscriber, Cancellable {

        /// The user-defined closure that handles each received value.
        public let receiveValue: (Input) -> Void

        /// The user-defined closure that handles the completion event.
        public let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

        /// The subscriber's subscription status, initially awaiting a subscription.
        /// While subscribed, the status itself holds the subscription.
        private var status = SubscriptionStatus.awaitingSubscription

        /// Creates a sink from the two handlers that define its behavior.
        /// - Parameters:
        ///   - receiveCompletion: Called when the completion event is received.
        ///   - receiveValue: Called for each received value.
        public init(
            receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
            receiveValue: @escaping ((Input) -> Void)
        ) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
        }

        /// Receives a subscription.
        ///
        /// - If the subscriber is awaiting a subscription, the subscription is accepted
        ///   and unlimited demand is requested from it.
        /// - If the subscriber is already subscribed, or has terminated, the received
        ///   subscription is cancelled.
        public func receive(subscription: Subscription) {
            switch status {
            case .awaitingSubscription:
                // Store the subscription before requesting demand, so that values
                // delivered during the request find the subscriber already subscribed.
                status = .subscribed(subscription)
                subscription.request(.unlimited)
            case .subscribed, .terminal:
                subscription.cancel()
            }
        }

        /// Receives a value and passes it to the user-defined value handler.
        ///
        /// Values that arrive after the subscriber has terminated (cancelled or
        /// completed) are ignored, so the handler is never invoked once the caller
        /// has cancelled.
        /// - Returns: `.none`, because unlimited demand was already requested.
        public func receive(_ value: Input) -> Subscribers.Demand {
            if case .terminal = status {
                return .none
            }
            receiveValue(value)
            return .none
        }

        /// Receives the completion event and passes it to the user-defined
        /// completion handler.
        ///
        /// The completion is ignored if the subscriber has already terminated, so the
        /// handler runs at most once and never after cancellation.
        public func receive(completion: Subscribers.Completion<Failure>) {
            if case .terminal = status {
                return
            }
            // Terminate first: if the handler calls `cancel()` on this sink, it must
            // not cancel a subscription that has already completed.
            status = .terminal
            receiveCompletion(completion)
        }

        /// Cancels the subscription.
        ///
        /// Does nothing unless the subscriber is currently subscribed. Otherwise the
        /// subscriber moves to the terminal state and cancels the subscription.
        public func cancel() {
            guard case let .subscribed(subscription) = status else {
                return
            }
            // Update our own state *before* cancelling the subscription, so that
            // anything triggered by `subscription.cancel()` (a re-entrant `cancel()`,
            // a late value) already sees a terminated subscriber.
            status = .terminal
            subscription.cancel()
        }
    }
}

extension Publisher {

    /// Attaches a `Subscribers.Sink` subscriber to this publisher, driven by the
    /// two given closures.
    /// - Parameters:
    ///   - receiveCompletion: Called when the completion event is received.
    ///   - receiveValue: Called for each received value.
    /// - Returns: The subscriber wrapped in an `AnyCancellable`.
    public func sink(
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping ((Output) -> Void)
    ) -> AnyCancellable {
        // Build the sink from the caller's closures, attach it to this publisher,
        // then hand it back to the caller type-erased as an AnyCancellable.
        let sinkSubscriber = Subscribers.Sink<Output, Failure>(receiveCompletion: receiveCompletion,
                                                               receiveValue: receiveValue)
        subscribe(sinkSubscriber)
        return AnyCancellable(sinkSubscriber)
    }
}

extension Publisher where Failure == Never {

    /// Attaches a `Subscribers.Sink` subscriber to a publisher that never fails.
    /// Use this variant when the completion event is of no interest.
    /// - Parameter receiveValue: Called for each received value.
    /// - Returns: The subscriber wrapped in an `AnyCancellable`.
    public func sink(
        receiveValue: @escaping (Output) -> Void
    ) -> AnyCancellable {
        // The completion event is deliberately discarded by a no-op handler.
        let ignoreCompletion: (Subscribers.Completion<Failure>) -> Void = { _ in }

        // Build the sink, attach it to this publisher, then hand it back to the
        // caller type-erased as an AnyCancellable.
        let sinkSubscriber = Subscribers.Sink<Output, Failure>(receiveCompletion: ignoreCompletion,
                                                               receiveValue: receiveValue)
        subscribe(sinkSubscriber)
        return AnyCancellable(sinkSubscriber)
    }
}

(Snippet from OpenCombine)