Scheduler in Combine: Managing Asynchronous Tasks Efficiently

What is a scheduler

According to the Scheduler documentation, a scheduler is “a protocol that defines when and where to execute a closure.” You can use a scheduler to execute code as soon as possible, or after a future date.

Combine does not work directly with threads. Instead, it allows Publishers to operate on specific Schedulers.

The “where” is the current run loop, dispatch queue, or operation queue.

The “when” is virtual time, according to the scheduler’s clock. Work performed by a scheduler adheres only to that clock, which might not correspond to the system’s real time.
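To make the “when” and the “where” concrete, here is a minimal sketch that calls the Scheduler protocol’s methods directly on a DispatchQueue. The queue label and the 100 ms delay are arbitrary choices made for illustration.

```swift
import Combine
import Foundation

// Hypothetical serial queue, used only for this sketch.
let queue = DispatchQueue(label: "com.example.scheduler-demo")
let done = DispatchSemaphore(value: 0)
var log: [String] = []

// "As soon as possible": the closure is enqueued on `queue` right away.
queue.schedule {
    log.append("now")
}

// "After a future date": the date is expressed in the scheduler's own time type.
queue.schedule(after: queue.now.advanced(by: .milliseconds(100))) {
    log.append("later")
    done.signal()
}

done.wait()   // Block the calling thread until the delayed closure has run.
print(log)    // ["now", "later"]
```

Both closures run on the same queue (the “where”), and the second one runs later according to that queue’s clock (the “when”).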

Types of Schedulers in Combine

Combine provides several types of schedulers, all of which conform to the Scheduler protocol:

DispatchQueue

A DispatchQueue is a first-in-first-out queue that accepts tasks in the form of block objects and executes them serially or concurrently. You’ll commonly use serial or global queues for background work, and the main queue for UI-related work.

The system manages work submitted to a DispatchQueue on a pool of threads. A DispatchQueue doesn’t guarantee which thread it will use for executing tasks unless it represents the app’s main thread.

Because a serial DispatchQueue runs its tasks one at a time and in order, DispatchQueue is one of the safest ways to schedule work.
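As a rough illustration, the sketch below uses a serial DispatchQueue as the scheduler for receive(on:) and checks that every value is handled on that queue. The queue label and variable names are assumptions made for this example.

```swift
import Combine
import Foundation

// Hypothetical serial queue that plays the role of a background scheduler.
let workQueue = DispatchQueue(label: "com.example.work")
let finished = DispatchSemaphore(value: 0)
var received: [Int] = []

let cancellable = [1, 2, 3].publisher
    .receive(on: workQueue)
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { value in
              // Traps if this closure is not running on workQueue.
              dispatchPrecondition(condition: .onQueue(workQueue))
              received.append(value)
          })

finished.wait()   // Wait for the completion event before reading the results.
print(received)   // [1, 2, 3]
```

Since the queue is serial, the values arrive one at a time and in their original order.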

OperationQueue

Performs work on a specific operation queue. As with dispatch queues, use OperationQueue.main for UI work and other queues for background work.
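Here is a small sketch of an OperationQueue used as a scheduler. One assumption worth flagging: the queue is limited to a single concurrent operation, because a default OperationQueue is concurrent and could otherwise run deliveries in parallel.

```swift
import Combine
import Foundation

let backgroundQueue = OperationQueue()
// Assumption for this sketch: one operation at a time, so values keep their order.
backgroundQueue.maxConcurrentOperationCount = 1

let finished = DispatchSemaphore(value: 0)
var received: [Int] = []
var ranOnOperationQueue = true

let cancellable = [1, 2, 3].publisher
    .receive(on: backgroundQueue)
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { value in
              // Record whether the value was delivered by our operation queue.
              if OperationQueue.current !== backgroundQueue {
                  ranOnOperationQueue = false
              }
              received.append(value)
          })

finished.wait()   // Wait for the completion event before reading the results.
print(received, ranOnOperationQueue)
```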

ImmediateScheduler

An ImmediateScheduler performs synchronous actions immediately. Scheduled work runs right away on the current thread instead of being deferred to another queue.

import Combine
import Foundation

let immediateScheduler = ImmediateScheduler.shared
// Keep the returned AnyCancellable so the subscription stays alive.
let cancellable = [1, 2, 3].publisher
    .receive(on: immediateScheduler)
    .sink(receiveValue: {
        print("Received \($0) on thread \(Thread.current)")
    })

The snippet above runs on the main thread, so the values are also received on the main thread:

Received 1 on thread <_NSMainThread: 0x600001704040>{number = 1, name = main}
Received 2 on thread <_NSMainThread: 0x600001704040>{number = 1, name = main}
Received 3 on thread <_NSMainThread: 0x600001704040>{number = 1, name = main}
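Because ImmediateScheduler runs its work synchronously, all values are delivered before the statement after sink executes. A minimal sketch of that ordering, with the events array added purely for illustration:

```swift
import Combine

var events: [String] = []

let cancellable = [1, 2, 3].publisher
    .receive(on: ImmediateScheduler.shared)
    .sink { events.append("value \($0)") }

// This line runs only after all three values have been delivered.
events.append("after sink")

print(events)
// ["value 1", "value 2", "value 3", "after sink"]
```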

Default Scheduler

💡 Even if you don’t specify a scheduler, Combine behaves as if a default one were in place: elements are delivered on the same thread that emitted them. For example, if a value is sent from a background thread, the subscriber receives it on that same background thread.

let subject = PassthroughSubject<Int, Never>()
// 1
let token = subject.sink(receiveValue: { value in
    print(Thread.isMainThread)
})
// 2
subject.send(1)
// 3
DispatchQueue.global().async {
    subject.send(2)
}
  1. Print true if the value is received on the main thread, and false otherwise.

  2. Send 1 from the main thread.

  3. Send 2 from the background thread.

It will print:

true

false

As expected, the values are received on different threads.
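If you would rather receive every value on one known queue, no matter which thread sends it, receive(on:) can pin the delivery. The sketch below tags a hypothetical delivery queue with a DispatchSpecificKey so the subscriber can check where it is running. All names here are assumptions made for the example.

```swift
import Combine
import Foundation

// Hypothetical delivery queue, tagged so we can recognise it from inside closures.
let deliveryQueue = DispatchQueue(label: "com.example.delivery")
let isDeliveryQueue = DispatchSpecificKey<Bool>()
deliveryQueue.setSpecific(key: isDeliveryQueue, value: true)

let subject = PassthroughSubject<Int, Never>()
let valueReceived = DispatchSemaphore(value: 0)
var arrivedOnDeliveryQueue: [Bool] = []

let token = subject
    .receive(on: deliveryQueue)
    .sink { _ in
        // getSpecific returns nil when the current queue does not carry the tag.
        arrivedOnDeliveryQueue.append(DispatchQueue.getSpecific(key: isDeliveryQueue) ?? false)
        valueReceived.signal()
    }

subject.send(1)                                    // Sent from the current thread.
DispatchQueue.global().async { subject.send(2) }   // Sent from a background thread.

valueReceived.wait()
valueReceived.wait()
print(arrivedOnDeliveryQueue)   // [true, true]
```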

Switching Schedulers

To keep the app from freezing or crashing while the user interacts with it, resource-intensive tasks should run in the background and their results should be handled on the main thread. Combine’s way of doing this is to switch schedulers with two methods: subscribe(on:) and receive(on:).

receive(on:)

The receive(on:) method changes the scheduler for all publishers that come after it in the chain:

Just(1)
   .map { _ in print(Thread.isMainThread) }
   .receive(on: DispatchQueue.global())
   .map { _ in print(Thread.isMainThread) }
   .sink { _ in print(Thread.isMainThread) }

It will print:

true
false
false

In other words, every operator after receive(on:) delivers elements on the DispatchQueue.global() scheduler.
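A chain can contain more than one receive(on:), and each one affects only the operators that follow it. The sketch below hops across two hypothetical serial queues, A and B, and records where each stage runs. The queue names and the currentQueueName() helper are assumptions for this example.

```swift
import Combine
import Foundation

// Two hypothetical queues, each tagged with a name we can read back later.
let queueA = DispatchQueue(label: "com.example.A")
let queueB = DispatchQueue(label: "com.example.B")
let nameKey = DispatchSpecificKey<String>()
queueA.setSpecific(key: nameKey, value: "A")
queueB.setSpecific(key: nameKey, value: "B")

// Helper for this sketch: "caller" means neither tagged queue is current.
func currentQueueName() -> String {
    DispatchQueue.getSpecific(key: nameKey) ?? "caller"
}

let finished = DispatchSemaphore(value: 0)
var stages: [String] = []

let cancellable = Just(1)
    .map { value -> Int in
        // Upstream of any receive(on:), so this runs on the calling thread.
        stages.append("first map on \(currentQueueName())")
        return value
    }
    .receive(on: queueA)
    .map { value -> Int in
        stages.append("second map on \(currentQueueName())")
        return value
    }
    .receive(on: queueB)
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { _ in stages.append("sink on \(currentQueueName())") })

finished.wait()   // Wait for the completion event before reading the results.
print(stages)
// ["first map on caller", "second map on A", "sink on B"]
```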

subscribe(on:)

subscribe(on:) sets where subscription work happens. It stays there unless receive(on:) changes it. Think of it as assigning work locations.

Just(1)
   .subscribe(on: DispatchQueue.global())
   .map { _ in print(Thread.isMainThread) }
   .sink { _ in print(Thread.isMainThread) }

It will print:

false

false

Here, all the operations happen on the DispatchQueue.global() scheduler.

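The position of subscribe(on:) doesn’t matter, because it takes effect at the time of subscription, wherever it appears in the chain. The snippet below prints the same result as the previous one. Just is wrapped in Deferred here because Just has its own map overload that runs the closure eagerly on the calling thread, which would hide the effect:

Deferred { Just(1) }
   .map { _ in print(Thread.isMainThread) }
   .subscribe(on: DispatchQueue.global()) // Position of subscribe(on:) has changed
   .sink { _ in print(Thread.isMainThread) }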
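One way to check the claim about position is to build both variants and record where map runs in each. This is a sketch under a few assumptions: a hypothetical tagged serial queue stands in for DispatchQueue.global(), and the source is Deferred { Just(1) } so that map is evaluated at subscription time rather than when the chain is built.

```swift
import Combine
import Foundation

// Hypothetical background queue, tagged so closures can detect it.
let background = DispatchQueue(label: "com.example.background")
let onBackground = DispatchSpecificKey<Bool>()
background.setSpecific(key: onBackground, value: true)

func isOnBackground() -> Bool {
    DispatchQueue.getSpecific(key: onBackground) ?? false
}

let finished = DispatchSemaphore(value: 0)
var mapRanOnBackground: [Bool] = []

// Variant 1: subscribe(on:) placed before map.
let first = Deferred { Just(1) }
    .subscribe(on: background)
    .map { value -> Int in
        mapRanOnBackground.append(isOnBackground())
        return value
    }
    .sink(receiveCompletion: { _ in finished.signal() }, receiveValue: { _ in })

// Variant 2: subscribe(on:) placed after map.
let second = Deferred { Just(1) }
    .map { value -> Int in
        mapRanOnBackground.append(isOnBackground())
        return value
    }
    .subscribe(on: background)
    .sink(receiveCompletion: { _ in finished.signal() }, receiveValue: { _ in })

finished.wait()
finished.wait()
print(mapRanOnBackground)   // [true, true]
```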

💡 Note that the definition of subscribe(on:) says nothing about the scheduler on which values are received. If a publisher emits values on a different thread, they are received on that thread. A typical example is a data task publisher:

URLSession.shared.dataTaskPublisher(for: URL(string: "https://www.vadimbulavin.com")!)
   .subscribe(on: DispatchQueue.main) // Subscribe on the main thread
   .sink(receiveCompletion: { _ in },
         receiveValue: { _ in
           print(Thread.isMainThread) // Are we on the main thread?
   })

The code prints false because the publisher emits values on a background thread. In such cases, we must use receive(on:) to specify the scheduler on which values are delivered.
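The same behavior can be reproduced without a network call. In the sketch below, makeRequest() is a hypothetical stand-in for a data task publisher: it always emits from its own queue after a short simulated delay. The queue names, the delay, and the value 200 are all assumptions for illustration.

```swift
import Combine
import Foundation

// Three hypothetical queues, each tagged with a readable name.
let emitterQueue = DispatchQueue(label: "com.example.emitter")
let subscribeQueue = DispatchQueue(label: "com.example.subscribe")
let deliveryQueue = DispatchQueue(label: "com.example.delivery")
let nameKey = DispatchSpecificKey<String>()
emitterQueue.setSpecific(key: nameKey, value: "emitter")
subscribeQueue.setSpecific(key: nameKey, value: "subscribe")
deliveryQueue.setSpecific(key: nameKey, value: "delivery")

// Stand-in for a data task publisher: emits from emitterQueue after 100 ms.
func makeRequest() -> AnyPublisher<Int, Never> {
    Deferred {
        Future<Int, Never> { promise in
            emitterQueue.asyncAfter(deadline: .now() + 0.1) { promise(.success(200)) }
        }
    }
    .eraseToAnyPublisher()
}

let finished = DispatchSemaphore(value: 0)
var arrivals: [String] = []

func record(_ label: String) {
    arrivals.append("\(label): \(DispatchQueue.getSpecific(key: nameKey) ?? "unknown")")
}

// subscribe(on:) alone does not move delivery off the emitter's queue.
let withoutReceiveOn = makeRequest()
    .subscribe(on: subscribeQueue)
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { _ in record("subscribe(on:) only") })
finished.wait()

// Adding receive(on:) moves delivery to the chosen queue.
let withReceiveOn = makeRequest()
    .subscribe(on: subscribeQueue)
    .receive(on: deliveryQueue)
    .sink(receiveCompletion: { _ in finished.signal() },
          receiveValue: { _ in record("with receive(on:)") })
finished.wait()

print(arrivals)
// ["subscribe(on:) only: emitter", "with receive(on:): delivery"]
```

In an app you would typically pass DispatchQueue.main to receive(on:) before touching the UI.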

Performing asynchronous tasks with schedulers

Let’s see how we can switch schedulers by combining subscribe(on:) and receive(on:).

Assume that we have a publisher with a long-running task:

struct BusyPublisher: Publisher {
    typealias Output = Int
    typealias Failure = Never

    func receive<S>(subscriber: S) where S : Subscriber, Failure == S.Failure, Output == S.Input {
        sleep(5) // Simulate a long-running task that blocks the current thread
        subscriber.receive(subscription: Subscriptions.empty)
        _ = subscriber.receive(1)
        subscriber.receive(completion: .finished)
    }
}

When called from the UI thread, it freezes the app for 5 seconds. Remember, unless told otherwise, Combine delivers elements on the same thread from which they are emitted:

BusyPublisher()
  .sink { _ in
    print("Received value") 
  }
print("Hello")

As expected, Hello is printed after the value is received:

Received value
Hello

💡 So the solution for doing asynchronous work with Combine is to subscribe on a background scheduler and then receive the events on the UI scheduler:

BusyPublisher()
   .subscribe(on: DispatchQueue.global())
   .receive(on: DispatchQueue.main)
   .sink { _ in print("Received value") }
print("Hello")

As expected, Hello is printed before the value is received. This means that the publisher no longer blocks the main thread, so the application is not frozen:

Hello
Received value
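Outside a playground, keep in mind that an AnyCancellable cancels its subscription when it is deallocated, so the value returned by sink has to be stored somewhere. The sketch below shows one way to do that in a command-line script. makeSlowPublisher() is a hypothetical stand-in for BusyPublisher built from Deferred and Future, and the 0.2 second sleep and 1 second run-loop window are arbitrary.

```swift
import Combine
import Foundation

// Hypothetical stand-in for BusyPublisher: blocks whichever thread subscribes.
func makeSlowPublisher() -> AnyPublisher<Int, Never> {
    Deferred {
        Future<Int, Never> { promise in
            Thread.sleep(forTimeInterval: 0.2)   // Simulated long-running work.
            promise(.success(1))
        }
    }
    .eraseToAnyPublisher()
}

var cancellables = Set<AnyCancellable>()
var output: [String] = []

makeSlowPublisher()
    .subscribe(on: DispatchQueue.global())   // Do the slow work in the background.
    .receive(on: DispatchQueue.main)         // Deliver the result on the main queue.
    .sink { _ in output.append("Received value") }
    .store(in: &cancellables)                // Keep the subscription alive.

output.append("Hello")

// A script has to spin the main run loop itself so the main queue can deliver
// the value; in an app, the system already does this.
RunLoop.main.run(until: Date().addingTimeInterval(1))

print(output)   // ["Hello", "Received value"]
```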

Summary

Let’s recap the main takeaways.

  • subscribe(on:) and receive(on:) are the primary methods for multithreading in the Combine framework.

  • By default, elements are delivered on the same thread on which they were generated.

  • receive(on:) sets a scheduler for all operators coming afterward.

  • subscribe(on:) sets a scheduler for the whole stream, starting at the time the Publisher is subscribed to. The stream stays on the same scheduler until receive(on:) specifies another scheduler.

  • The position of subscribe(on:) does not matter.

  • Asynchronous work is typically performed by subscribing on a background scheduler and receiving values on the UI scheduler.


Thanks for Reading! ✌️

If you have any questions or corrections, please leave a comment below or contact me via my LinkedIn account Pham Trung Huy.

Happy coding 🍻