Zip a Collection of Publishers
This was originally published on my old site at abizern.org.
I have an array of publishers: [Publisher<Data, Error>] and want a publisher of the array of their outputs: Publisher<[Data], Error>. The Combine framework provides the Zip family of publishers, which only go up to 4 inputs, so this won’t suit my needs. I’m going to write about the steps I took to create a publisher that does what I want.
This seems like a daunting task. There is a blog post about creating a CombineLatest publisher which does something similar to what I needed. I could have used that publisher, but I wanted to be more explicit that this was a Zip type of publisher, not a CombineLatest type of publisher.
At a recent NSCoder Night (a monthly meetup of iOS and macOS developers), Daniel helped me write a publisher that fetched all the pages of a paginated URL. From talking to him and referring to his write-up, I came to realise that creating a publisher is basically like following a recipe. More importantly, it’s not the Publisher that does the work: when a publisher receives a subscriber, it creates an internal Subscription object which it returns to that subscriber. It is this Subscription object which actually does the work.
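To make the recipe concrete, here is a minimal sketch of the same pattern in its smallest form. SingleValue and Inner are hypothetical names invented for this illustration, and the sketch ignores thread safety; it only shows that the publisher hands over a subscription and the subscription does the work.

```swift
import Combine

// Hypothetical example publisher (not from the original post):
// emits a single value, then finishes.
struct SingleValue<Value>: Publisher {
    typealias Output = Value
    typealias Failure = Never

    let value: Value

    func receive<S>(subscriber: S)
    where S: Subscriber, S.Input == Value, S.Failure == Never {
        // The publisher only creates the subscription and hands it over.
        let subscription = Inner(value: value, subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }

    // The subscription does the actual work once values are requested.
    private final class Inner<S: Subscriber>: Subscription
    where S.Input == Value, S.Failure == Never {
        private let value: Value
        private var subscriber: S?

        init(value: Value, subscriber: S) {
            self.value = value
            self.subscriber = subscriber
        }

        func request(_ demand: Subscribers.Demand) {
            guard demand > .none, let subscriber = subscriber else { return }
            self.subscriber = nil // deliver at most once
            _ = subscriber.receive(value)
            subscriber.receive(completion: .finished)
        }

        func cancel() {
            subscriber = nil
        }
    }
}

var values: [Int] = []
var finished = false
let cancellable = SingleValue(value: 42).sink(
    receiveCompletion: { _ in finished = true },
    receiveValue: { values.append($0) }
)
```

Nothing happens inside the struct beyond creating Inner; the value is only delivered when sink requests it through the subscription.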
Why do I Need my Own Publisher?
For an app that I am developing for a client, I fetch 24 images from 24 different URLs. I need all the images, and I need them to be in order, for the resulting object that I create to be considered complete. I want to be able to write a chain like this at the call site:
urls                      // [String]
    .map(convertToURL)    // [URL]
    .map(loadURL)         // [Publisher<Data, Error>]
    .zip                  // Publisher<[Data], Error>
    .sink {...}           // Consume [Data] or handle the error
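The post does not show convertToURL or loadURL, so the following is only a guess at what such helpers might look like. Both bodies are assumptions for illustration: the first traps on an invalid string, and the second wraps URLSession's data task publisher and erases the error type so that every element has the same Failure.

```swift
import Combine
import Foundation

// Hypothetical helper: assumes every input string is a valid URL.
func convertToURL(_ string: String) -> URL {
    guard let url = URL(string: string) else {
        preconditionFailure("Invalid URL string: \(string)")
    }
    return url
}

// Hypothetical helper: loads the data at a URL as a one-shot publisher.
func loadURL(_ url: URL) -> AnyPublisher<Data, Error> {
    URLSession.shared.dataTaskPublisher(for: url)
        .map { $0.data }          // keep the body, drop the URLResponse
        .mapError { $0 as Error } // URLError -> Error
        .eraseToAnyPublisher()
}

// No request is made until something subscribes.
let publisher = loadURL(convertToURL("https://example.com"))
```

Erasing to AnyPublisher means the mapped array has a single element type, which is what a collection-based Zip needs.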
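Why Zip and not CombineLatest?
As the array of publishers that I have are one-shot publishers, I could use the CombineLatest publisher described in the post above. There is a difference between CombineLatest and Zip, though: Zip waits until every upstream publisher has produced its nth value and emits those values together, while CombineLatest emits each time any upstream produces a value, combining it with the most recent value from each of the others.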
I chose to write the Zip publisher because, conceptually, I want to wait for all the matched outputs, and using a Zip makes this requirement explicit. Also, I wanted an excuse to write a publisher.
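The difference between the two operators can also be seen in a few lines of code. This is a minimal sketch using Combine's built-in two-input operators; the subjects letters and numbers are names made up for the illustration.

```swift
import Combine

let letters = PassthroughSubject<String, Never>()
let numbers = PassthroughSubject<Int, Never>()

var zipped: [String] = []
var latest: [String] = []

// Zip pairs the nth letter with the nth number.
let zipCancellable = letters.zip(numbers)
    .sink { pair in zipped.append("\(pair.0)\(pair.1)") }

// CombineLatest pairs each new value with the latest value from the other side.
let latestCancellable = letters.combineLatest(numbers)
    .sink { pair in latest.append("\(pair.0)\(pair.1)") }

letters.send("a")
letters.send("b")
numbers.send(1)
numbers.send(2)

print(zipped) // ["a1", "b2"]
print(latest) // ["b1", "b2"]
```

With one-shot publishers both operators end up with the same single result, which is why either would have worked here; Zip simply states the intent more directly.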
Writing the Publisher
Step 1:
Create a struct which defines its Output and Failure matched to the upstream Output and Failure.
Let’s start with the Publisher itself. Publishers are structs. In my case it’s just a container to hold the array of publishers, so I constrain the generic type to be a collection of publishers. I also typealias the Output to be an array of the upstream publishers’ Outputs and the Failure to be the upstream publishers’ Failure type.
import Combine
import Foundation // for NSLock, used by the Subscription in Step 3

public struct ZipCollection<Publishers>
where
    Publishers: Collection,
    Publishers.Element: Publisher
{
    public typealias Output = [Publishers.Element.Output]
    public typealias Failure = Publishers.Element.Failure

    private let publishers: Publishers

    public init(_ publishers: Publishers) {
        self.publishers = publishers
    }
}
Step 2:
Make this struct conform to Publisher, matching the Output and Failure to the downstream Input and Failure.
Add an extension to make ZipCollection conform to Publisher and implement the required method. This will not compile yet, because the Subscription type hasn’t been defined. Note that I’m constraining the downstream subscriber’s Input and Failure to ZipCollection’s Output and Failure. The method simply creates a Subscription object and passes it along to the subscriber.
extension ZipCollection: Publisher {
    public func receive<Subscriber>(subscriber: Subscriber)
    where
        Subscriber: Combine.Subscriber,
        Subscriber.Failure == Failure,
        Subscriber.Input == Output
    {
        let subscription = Subscription(subscriber: subscriber, publishers: publishers)
        subscriber.receive(subscription: subscription)
    }
}
Step 3:
Create a Subscription object to return to the downstream subscribers that does the work of transforming the upstream Output and Failure to the downstream Input and Failure.
extension ZipCollection {
    fileprivate final class Subscription<Subscriber>: Combine.Subscription
    where
        Subscriber: Combine.Subscriber,
        Subscriber.Failure == Failure,
        Subscriber.Input == Output
    {
        private let subscribers: [AnyCancellable]
        private let queues: [Queue<Publishers.Element.Output>]

        init(subscriber: Subscriber, publishers: Publishers) {
            let count = publishers.count
            // One queue per upstream publisher. Queue is assumed to be a reference
            // type, so `queues` and `outputs` refer to the same queue objects.
            let outputs = publishers.map { _ in Queue<Publishers.Element.Output>() }
            queues = outputs
            var completions = 0
            var hasCompleted = false
            let lock = NSLock()

            subscribers = publishers.enumerated().map { index, publisher in
                publisher.sink(receiveCompletion: { completion in
                    lock.lock()
                    defer { lock.unlock() }
                    // Never send more than one completion downstream.
                    guard !hasCompleted else { return }
                    guard case .finished = completion else {
                        // Any failure causes the entire subscription to fail.
                        subscriber.receive(completion: completion)
                        hasCompleted = true
                        outputs.forEach { queue in
                            queue.removeAll()
                        }
                        return
                    }
                    // Finish only once every upstream publisher has finished.
                    completions += 1
                    guard completions == count else { return }
                    subscriber.receive(completion: completion)
                    hasCompleted = true
                }, receiveValue: { value in
                    lock.lock()
                    defer { lock.unlock() }
                    guard !hasCompleted else { return }
                    outputs[index].enqueue(value)
                    // Emit only when every queue has at least one value waiting.
                    guard (outputs.compactMap { $0.peek() }.count) == count else { return }
                    _ = subscriber.receive(outputs.compactMap { $0.dequeue() })
                })
            }
        }

        public func cancel() {
            subscribers.forEach { $0.cancel() }
            queues.forEach { $0.removeAll() }
        }

        // Back pressure is not handled, so demand is ignored.
        public func request(_ demand: Subscribers.Demand) {}
    }
}
This is a bit more code, because this is where the actual work is being done.
The main property is an array of AnyCancellable, which is used to handle the output of the upstream array of publishers; a second property keeps a reference to the queues so that they can be cleared on cancellation. The init method configures each of the cancellables to handle the output of its upstream publisher. I use a `Queue` to hold on to the received values, and when at least one value has been received from each of the publishers, I dequeue those results and send them on to the downstream subscriber as an array.
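The Queue type itself is not shown in this post. A minimal sketch that would satisfy the calls used above (enqueue, dequeue, peek and removeAll) might look like the following. It is an assumption rather than the package's actual implementation, and it is written as a class because the subscription shares the same queue instances between its closures and its stored property.

```swift
// A minimal FIFO queue with reference semantics.
// Sketch only: the real Queue used by the post may differ.
final class Queue<Element> {
    private var elements: [Element] = []

    /// Adds an element to the back of the queue.
    func enqueue(_ element: Element) {
        elements.append(element)
    }

    /// Removes and returns the element at the front, or nil if the queue is empty.
    func dequeue() -> Element? {
        elements.isEmpty ? nil : elements.removeFirst()
    }

    /// Returns the element at the front without removing it.
    func peek() -> Element? {
        elements.first
    }

    /// Discards everything in the queue.
    func removeAll() {
        elements.removeAll()
    }
}

let queue = Queue<Int>()
queue.enqueue(1)
queue.enqueue(2)
let front = queue.peek()      // 1, still in the queue
let first = queue.dequeue()   // 1
let second = queue.dequeue()  // 2
let empty = queue.dequeue()   // nil
```

An array-backed queue makes removeFirst linear in the queue length, which should be fine for one-shot publishers where each queue holds at most a value or two.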
I handle cancellation by sending a cancel() message to each of the cancellables and clearing out the queues.
As I’m not handling back pressure, there is an empty implementation of the required request(_:) method.
Make it Chainable
That’s it for the publisher. The only thing left to do is to write some conveniences to allow it to be used with chaining syntax. That’s quite simple:
extension Collection where Element: Publisher {
    /// Combine the array of publishers to give a single array of the `Zip` of their outputs
    public var zip: ZipCollection<Self> {
        ZipCollection(self)
    }
}
Closing Thoughts
Is this as efficient as Combine’s Zip functions? I don’t know. At the call site it’s a lot easier to use this rather than trying to turn 24 requests into 6 batches of Zip4, then a Zip3 and then a Zip2, to chain all 24 requests together (I know, because that was what I started to write). So it solves the problem I had in a way that I wanted to write the code. Also, the more of these that I write, the more comfortable I get writing them, which is another benefit.
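For comparison, this is roughly what combining just five publishers looks like using only Combine's built-in Zip operators. It is a sketch with Just publishers standing in for the real requests; the point is the nested tuple that has to be flattened by hand, which only gets worse at 24 inputs.

```swift
import Combine

// Five stand-in publishers; in the app these would be network requests.
let publishers = (1...5).map { Just($0) }

var received: [[Int]] = []

// Zip the first four, then zip that result with the fifth.
let cancellable = Publishers.Zip4(publishers[0], publishers[1], publishers[2], publishers[3])
    .zip(publishers[4])
    // The output is ((Int, Int, Int, Int), Int), so flatten it manually.
    .map { pair in [pair.0.0, pair.0.1, pair.0.2, pair.0.3, pair.1] }
    .sink { received.append($0) }

print(received) // [[1, 2, 3, 4, 5]]
```

The built-in operators also fix the number of inputs at compile time, whereas a collection-based publisher works for any count.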
Edit
Thanks to Iain Smith, who messaged me to point out that cancellation didn’t clear out the queues. I’ve made some minor corrections to the code.
Code Repository
The code for this is available as part of the FoundationCombine Swift Package on GitHub, alongside the CombineLatest publisher which inspired it.