Swift Combine — Subject Publishers : PassthroughSubject & CurrentValueSubject

Nikunj joshi
7 min readJan 26, 2023

--

In the previous article, we have learnt about Future publisher and its working. In this article, we will explore ‘Subject’ publisher.

If you are directly coming to this article, basic knowledge of creating a own custom subscriber. If you wish to revise feel free to revisit the learning series.

Till now in this series, we have mostly explored Publishers with finite number events which emits events sequentially and synchronously. What if you want to send an even asynchronously and infinitely from the Publisher? I think combine has heard your need and provided ‘Subject’ publishers for the same purpose.

Do you remember our subscription example using NotificationCenter? Notificationcenter was sending events as per its wish asynchronously. We can achieve same using ‘Subject’ publishers and emit events to subscriber/s as per wish and need basis.

Combine ‘Subject’ publishers has two variants for the same purpose of sending events asynchronously according to need base, namely ‘PassthroughSubject’ and ‘CurrentValueSubject’. Let’s explore each in details.

PassthroughSubject

PassthroughSubject as name suggests, this publisher passes through the value to the subscriber/s on demand. We can call ‘send’ method of this publisher to emit an event or completion. This is very helpful when we want to manually emit an event from the Publisher. This seems like our regular coding practice where we send objects / value manually, right? You got it right, ‘Subject’ publishers help us to bridge normal imperative code with Combine World.

Let’s check how exactly ‘PassthroughSubject’ works with some coding magic.

import Foundation
import Combine

/*

This is a Util function for logs.

We will use this function to clearly distinguis the examples of variois topics.
Function will take a title of the topic and will execute the blocks after printing.
*/
public func myLearningCode(of title: String,
execute: () -> Void) {
print("\n ***** Example of:", title, "***** \n")
execute()
}

myLearningCode(of: "PassthroughSubject") {

enum StringError: Error {
case customError
}

class StringSubscriber: Subscriber {

typealias Input = String

typealias Failure = StringError

func receive(subscription: Subscription) {
subscription.request(.max(3))
}

func receive(_ input: String) -> Subscribers.Demand {

switch input.lowercased() {
case "adjust":
print("Custom Subscriber: received \(input)")
print("Increasing demand to current +1 \n")
return .max(1)
default:
print("Custom Subscriber: received \(input) \n")
return .none
}
}

func receive(completion: Subscribers.Completion<StringError>) {
print("Custom Subscriber: received a completion \(completion) \n")
}
}
}

We have created a custom subscriber with Input of String type and error of user defined StringError enum type. If you are struggling to understand this code, please take a pause, revisit our create a custom subscriber article.

Note that, whenever this custom subscriber is receiving a event with value ‘adjust’, it is increasing its demand to the publisher by +1. This will add 1 to the current demand.

Let’s add some code for ‘PassthroughSubject’ and see how it works.

/// ... continue in previous function code

myLearningCode(of: "PassthroughSubject") {

enum StringError: Error {
case customError
}

class StringSubscriber: Subscriber {

typealias Input = String

typealias Failure = StringError

func receive(subscription: Subscription) {
subscription.request(.max(3))
}

func receive(_ input: String) -> Subscribers.Demand {

switch input.lowercased() {
case "adjust":
print("Custom Subscriber: received \(input)")
print("Increasing demand to current +1 \n")
return .max(1)
default:
print("Custom Subscriber: received \(input) \n")
return .none
}
}

func receive(completion: Subscribers.Completion<StringError>) {
print("Custom Subscriber: received a completion \(completion) \n")
}
}

// 1
let passthroughSubject = PassthroughSubject<String, StringError>()

let customSubscriber = StringSubscriber()

// 2
passthroughSubject.subscribe(customSubscriber)

// 3
let subscription = passthroughSubject.sink { completion in
print("Sink :Received a \(completion) \n")
} receiveValue: { receivedValue in
print("Sink : \(receivedValue) \n")
}

// 4
passthroughSubject.send("event1")

// 5
passthroughSubject.send("more")

// 6
passthroughSubject.send("event3")
passthroughSubject.send("event4")

// 7
passthroughSubject.send("event5")

}
  1. Created a ‘PassthroughSubject’ publisher object with an output of a ‘String’ type and error of a ‘StringError’ type. Here, ‘StringError’ is a user defined enum from the Error protocol.
  2. Custom subscriber object is subscribed with passthroughSubject. This will create a subscription.
  3. Created another subscription using ‘sink’ and saved as a ‘subscriptionproperty.
  4. Sent an event using passthroughSubject with the value ‘event1’.
  5. Sent an event using passthroughSubject with value ‘more’. This will inforce subscriber to increase its demand as per our implementation in custom subscriber class.
  6. Sent couple more events using our passthrough publisher.
  7. Sent an event using passthroughSubject with the value ‘event5’.

Output

If you notice, ‘event5’ is not received by our ‘customSubscriber’, WHY??!!!
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — -
It’s time to think the answer of above question………………………
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — -
Reason is, customSubscriber has initially asked for the 3 events at ‘subscription.request(.max(3))’, on receiving ‘adjust’ input for the second event, publisher was told to adjust demand by +1, so new max events allowed to receive by this subscriber become 3 + 1 = 4. Now, when publisher is sending its fifth event, custom subscriber is not going to entertain it. I hope you also thought same before reading this answer, if not, please give our create a custom subscriber article a try, again.

Let’s try to send some more events and see the behaviour.

// 8    
subscription.cancel()

passthroughSubject.send("event6")

// 9
passthroughSubject.send(completion: .finished)

// 10
passthroughSubject.send(completion: .failure(.customError))

8. This will cancel the subscription we have created using ‘sink’.

9. Emitted a completion event with ‘finished.

10. Emitted a completion with our customError as a failure value.

Output

Some points to infer from this output:

event6’ : Not received by custom subscriber as its max allowed event count is over. Not received by sink subscriber because it is already cancelled before this event.

completion with ‘finished’ : Received by custom subscriber although its max allowed event count is over.

completion with ‘failure’ : Not received by both subscriber as one is already cancelled and another has already received a completion before this.

CurrentValueSubject

CurrentValueSubject publisher works exactly same as PassthroughSubject with some as additional capabilities.

  1. CurrentValueSubject takes initial value at initialization and emits it as soon as any subscriber subscribes with it.
  2. ‘value’ property can be set manually using assignment operator (=). Change in value property will create an emission to all subscriber automatically.
  3. CurrentValueSubject gives us facility to check current value (last emitted event value or initial event value) via its ‘value’ property.
  4. Completion event can’t be assigned directly to the ‘value’ property of the CurrentValueSubject subscriber.

Let’s copy above code of ‘PassthroughSubjectand validate above points.

import Foundation
import Combine

/*

This is a Util function for logs.

We will use this function to clearly distinguis the examples of variois topics.
Function will take a title of the topic and will execute the blocks after printing.
*/
public func myLearningCode(of title: String,
execute: () -> Void) {
print("\n ***** Example of:", title, "***** \n")
execute()
}

myLearningCode(of: "CurrentValueSubject") {

enum StringError: Error {
case customError
}

class StringSubscriber: Subscriber {

typealias Input = String

typealias Failure = StringError

func receive(subscription: Subscription) {
subscription.request(.max(3))
}

func receive(_ input: String) -> Subscribers.Demand {

switch input.lowercased() {
case "adjust":
print("Custom Subscriber: received \(input)")
print("Increasing demand to current +1 \n")
return .max(1)
default:
print("Custom Subscriber: received \(input) \n")
return .none
}
}

func receive(completion: Subscribers.Completion<StringError>) {
print("Custom Subscriber: received a completion \(completion) \n")
}
}

// 1.
let currentValueSubject = CurrentValueSubject<String, StringError>("Initial Value")

let customSubscriber = StringSubscriber()

currentValueSubject.subscribe(customSubscriber)

_ = currentValueSubject.sink { completion in
print("Sink :Received a \(completion) \n")
} receiveValue: { receivedValue in
print("Sink : \(receivedValue) \n")
}
}

Output

We haven’t published any event manually from our currentValueSubject, although both of our subscription have received an event with initially assigned value of ‘CurrentValueSubject’.

Let’s emit some events manually from our publisher.

myLearningCode(of: "CurrentValueSubject") {

enum StringError: Error {
case customError
}

class StringSubscriber: Subscriber {

typealias Input = String

typealias Failure = StringError

func receive(subscription: Subscription) {
subscription.request(.max(3))
}

func receive(_ input: String) -> Subscribers.Demand {

switch input.lowercased() {
case "adjust":
print("Custom Subscriber: received \(input)")
print("Increasing demand to current +1 \n")
return .max(1)
default:
print("Custom Subscriber: received \(input) \n")
return .none
}
}

func receive(completion: Subscribers.Completion<StringError>) {
print("Custom Subscriber: received a completion \(completion) \n")
}
}

// 1.
let currentValueSubject = CurrentValueSubject<String, StringError>("Initial Value")

let customSubscriber = StringSubscriber()

currentValueSubject.subscribe(customSubscriber)

_ = currentValueSubject.sink { completion in
print("Sink :Received a \(completion) \n")
} receiveValue: { receivedValue in
print("Sink : \(receivedValue) \n")
}

// 2
currentValueSubject.value = "updated value property using = operator"

currentValueSubject.send("event1")

// 3
print("currentValueSubject.value is: \(currentValueSubject.value)")

// 4
/// currentValueSubject.value = .finished
}

Output

Above output is totally aligned with all four points discussed above for the ‘CurrentValueSubject’.

Summary

  • PassthroughSubject’ and ‘CurrentValueSubject’ give us luxury to emit event/s on demand or as per need basis asynchronously and infinitely.
  • CurrentValueSubject’ gives some extra capability of giving a publisher an initial value as well as emitting an event automatically by updating its ‘value’ property.

We have learnt about Subject publisher and its types. We are about to become a ‘ninja’ of Subscriber and Publisher topics once we complete ‘Type Erasure’ in next article. Stay tuned with the combine learning series.

Feel free to follow me to stay updated with the upcoming articles.

Thank You. Cheers!

#Pre-Requisite
Intermediate level of the swift language / iOS is required for this Combine Article series.

#Credits

--

--