Mastering Publisher Subscriptions in Combine: A Comprehensive Guide

Mastering Publisher Subscriptions in Combine: A Comprehensive Guide

It’s time to review a little:

Subscribers will receive values from Publisher, all subscribers must inherit the Subscriber protocol.

public protocol Subscriber {
  associatedtype Input
  associatedtype Failure: Error
  func receive(subscription: Subscription)
  func receive(_ input: Self.Input) -> Subscribers.Demand
  func receive(completion: Subscribers.Completion<Self.Failure>)
}
  • Input: Type of data provided, it has to be matched the Output of Publisher

  • Failure: Type of data for error

3 important methods:

  • receive(subscription:) when receiving subscription from Publisher

  • receive(input:) when receiving value from Publisher and we will adjust the request for further data through Demand.

  • receive( completion:) when receiving completion from the publisher.

You can subscribe to a Subscriber by using subscribe of Publisher (if you haven’t read my article about Publisher in Combine yet, you can check this link)

publisher.subscribe(subscriber)

An important note that only when a subscriber connects does the publisher emit data

Subscriber has the ability to self-destruction when subscription is disconnected. That cancellation helps memory automatically release unnecessary objects. We have 2 types of cancellation:

  • Auto cancel by AnyCancellable when we create subscribers by sink or assign

  • Manually cancel by method cancel() of Subscriber

How to create Subscriber

Assign

Assigns each element from a publisher to a property on an object.

func assign<Root>(
    to keyPath: ReferenceWritableKeyPath<Root, Self.Output>,
    on object: Root
) -> AnyCancellable

Use the assign(to:on:) subscriber when you want to set a given property each time a publisher produces a value.

import Combine
class MyModel {
    @Published var value: Int = 0
}
let model = MyModel()
var cancellable = Set<AnyCancellable>()
Just(10)
    .assign(to: \.value, on: model)
    .store(in: &cancellable)
print(model.value)  // Output: 10

You can use Subscribers.Assignto get the same approach, but note that the instance created by this operator maintains a strong reference to object, and sets it tonilwhen the upstream publisher completes (either normally or with an error), you can see the below example and try it yourself in Playground

import Combine
class ViewModel {
    var name: String = ""
    private var cancellable: Set<AnyCancellable> = Set()
    deinit {
        print("deinit")
    }
    init(publisher: CurrentValueSubject<String, Never>) {
        publisher.assign(to: \.name, on: self).store(in: &cancellable)
    }
}
let publisher = CurrentValueSubject<String, Never>("Test")
var viewModel: ViewModel? = ViewModel(publisher: publisher)
viewModel = nil // the ViewModel object can't be released because upstream publisher hasn't finished
publisher.send(completion: .finished) // finish the publisher, now the ViewModel object is completely released

I also find out an extension at forums.swift.org can replace the primary assign method to properly prevent the memory leak, that’s awesome!

extension Publisher where Self.Failure == Never {
    public func assignNoRetain<Root>(to keyPath: ReferenceWritableKeyPath<Root, Self.Output>, on object: Root) -> AnyCancellable where Root: AnyObject {
        sink { [weak object] (value) in
        object?[keyPath: keyPath] = value
    }
  }
}

Sink

Attaches a subscriber with closure-based behavior.

Use sink(receiveCompletion:receiveValue:) to observe values received by the publisher and process them using a closure you specify.

let publisher = Just("Hello, Combine!")
let cancellable = publisher
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            print("Publisher completed successfully.")
        case .failure(let error):
            print("Publisher completed with error: \(error)")
        }
    }, receiveValue: { value in
        print("Received value: \(value)")
    })
  • The receiveValue: closure is called with the value emitted by the publisher, and the receiveCompletion: closure is called when the publisher completes.

  • The receiveCompletion: closure takes a Subscribers.Completion parameter, which is an enum that can be either .finished or .failure(let error). In this case, since Just cannot fail, there’s no error to handle, but in a real-world scenario, you would handle potential errors in the .failure case.

Please note thatsink(receiveCompletion:receiveValue:) returns an AnyCancellable instance, which is stored in Set<AnyCancellable> to keep the subscription alive. If you don’t store this instance, the subscription is canceled immediately

Custom Subscriber

Implementing custom subscribers in Combine allows us to define our own logic for handling values received from publishers.

Step 1: Define Your Custom Subscriber

This class should conform to the Subscriber protocol. The Subscriber protocol requires you to specify types for Input and Failure.

class CustomSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

Step 2: Implement thereceive(subscription:)Method

This method is called once when the publisher is connected to the subscriber. Here, you can request a certain number of values from the publisher.

func receive(subscription: Subscription) {
    subscription.request(.max(1))
}

Step 3: Implement thereceive(_:)Method

This method is called each time a new value is delivered by the publisher. Here, you can handle the received value.

func receive(_ input: Int) -> Subscribers.Demand {
    print("Value:", input)
    return .none
}

This mode returns Demand means that every time receives the value, Subscriber will adjust its request through Demand . With returning:

  • none : do not get any more elements

  • unlimited : take all

  • max(n): get the next n elements

Step 4: Implement thereceive(completion:)Method

This method is called when the publisher completes, either successfully or with an error. Here, you can handle the completion event.

func receive(completion: Subscribers.Completion<Never>) {
        print("Completion: \(completion)")
    }
}

Step 5: Use Your Custom Subscriber

Finally, you can use your custom subscriber with a publisher. The subscribe(_:) method is called on the publisher with the subscriber as an argument, which connects the publisher to the subscriber.

let publisher = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].publisher
let subscriber = CustomSubscriber()
publisher.subscribe(subscriber)

You can also get the full code of CustomSubscriber here

AnyCancellable and memory management

AnyCancellable is a type-erasing cancellable object that executes a provided closure when canceled. It’s used to manage the lifetime of a subscription

Here are some key points about AnyCancellable:

  • Cancellation token: Subscriber implementations can use AnyCancellable to provide a “cancellation token” that makes it possible for a caller to cancel a publisher. This means you can stop the subscription whenever you want by calling the cancel() method.
let publisher = Just("Hello, Combine!")
let cancellable: AnyCancellable = publisher
    .sink { value in
        print(value)
    }

// When you want to cancel the subscription
cancellable.cancel()
  • Automatic cancellation: An AnyCancellable instance automatically calls cancel() when de-initialized. This means if the AnyCancellable instance is deallocated, the subscription will be canceled automatically.
class MyClass {
    var cancellable: AnyCancellable? = nil

    init() {
        cancellable = Just("Hello, Combine!")
            .sink { value in
                print(value)
            }
    }
}

var myClass: MyClass? = MyClass()  // prints "Hello, Combine!"
myClass = nil  // `cancellable` is deallocated, so the subscription is cancelled
  • Storing AnyCancellable instances: You can store AnyCancellable instances in a collection or a set using the store(in:) method. This is useful when you have multiple subscriptions and want to manage them together.

  • Memory Management: When you store AnyCancellable instances in a set (or any other collection), you’re essentially creating a strong reference to those instances. As long as there’s a strong reference to an AnyCancellable, the subscription it represents stays alive. However, if you remove all references to an AnyCancellable, then those AnyCancellable instances get deallocated, and the subscriptions they represent are automatically canceled.

class MyClass {
    var cancellables = Set<AnyCancellable>()

    init() {
        Just("Hello, Combine!")
            .sink { value in
                print(value)
            }
            .store(in: &cancellables)
    }
}

var myClass: MyClass? = MyClass()  // prints "Hello, Combine!"
myClass = nil  // MyClass instance is deinitialized, so all subscriptions are cancelled

In this code, when myClass is set to nil, the MyClass instance is deinitialized. As a result, the cancel() method of each AnyCancellable instance will be called automatically, effectively canceling all subscriptions. This is a key feature of AnyCancellable and a reason why it’s commonly used for managing subscriptions in Combine

Conclusion

This article covers subscribing to publishers with assign , sink and custom subscribers. Lastly, take advantage of AnyCancellable for efficient subscription and memory management in Combine.

References

https://developer.apple.com/documentation/combine/publisher/assign(to:on:)

https://stackoverflow.com/questions/57980476/how-to-prevent-strong-reference-cycles-when-using-apples-new-combine-framework

AnyCancellable | Apple Developer Documentation


Thanks for Reading! ✌️

If you have any questions or corrections, please leave a comment below or contact me via my LinkedIn accountPham Trung Huy.

Happy coding 🍻