I'm looking for an operator that does the opposite of the collect operator:
- Allows you to map one emitted value to many emitted values, emitting them on the stream. For example, if I have the element [1,2,3] in the stream, I want to convert it into a stream that emits 1, then 2, then 3.
- Does not change the stream's completion state. Admittedly, this uncollect operator wouldn't be exactly the opposite of collect, since collect waits until the stream is finished. uncollect should just process results while the stream is unfinished, and not attempt to finish/unfinish the stream.
For example, I imagine this is how an uncollect operator would function:
    func fibonacci(_ number: Int) -> AnyPublisher<Int, Never> {
        Future { ... }.eraseToAnyPublisher()
    }

    let serverEventStream: AnyPublisher<[Int], Never> = ...

    serverEventStream                // AnyPublisher<[Int], Never> // Ex. 2 values: `[12, 24]`, `[1, 10, 50]`
        .uncollect { $0 }            // AnyPublisher<Int, Never>   // Ex. 5 values: `12`, `24`, `1`, `10`, `50`
        .flatMap { fibonacci($0) }   // AnyPublisher<Int, Never>   // Ex. 5 values: `144`, `46368`, `1`, `55`, `12586269025`
        .sink { print($0) }
I searched for names like explode, splat, and uncollect, to no avail. The closest operator with a promising name is flatMap, since Sequence.flatMap is the equivalent in the non-Combine world. However, as far as I can tell, Combine's flatMap is the way to chain promises together rather than a way to flatten an array into separate emissions.
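To make the comparison concrete, here is a minimal sketch of the Sequence.flatMap behavior I mean, in plain Swift with no Combine involved. The names batches and flattened are only illustrative, and the input mirrors the two example emissions above.

```swift
// Nested input, mirroring the two example emissions `[12, 24]` and `[1, 10, 50]`.
let batches: [[Int]] = [[12, 24], [1, 10, 50]]

// Sequence.flatMap: the closure returns a sequence for each element,
// and the resulting sequences are concatenated into one flat array.
let flattened: [Int] = batches.flatMap { $0 }

print(flattened) // [12, 24, 1, 10, 50]
```

This is the one-to-many mapping I would like to apply to emissions on a stream instead of elements of an array.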
Is there a way to uncollect elements, or map a single emission into an arbitrary number of emissions?
I was able to get it to work with my custom JustSeveral Publisher, but this seems clunky:
    import Combine
    import XCTest

    class Tests: XCTestCase {
        func testUncollect() {
            // Stubbed lookup table standing in for a real Fibonacci publisher.
            func fibonacci(_ number: Int) -> AnyPublisher<Int, Never> {
                switch number {
                case 1: return Just(1).eraseToAnyPublisher()
                case 10: return Just(55).eraseToAnyPublisher()
                case 12: return Just(144).eraseToAnyPublisher()
                case 24: return Just(46368).eraseToAnyPublisher()
                case 50: return Just(12586269025).eraseToAnyPublisher()
                default: fatalError("Should actually build this function.")
                }
            }

            let serverEventStream: AnyPublisher<[Int], Never> = JustSeveral([[12, 24], [1, 10, 50]]).eraseToAnyPublisher()

            serverEventStream                // AnyPublisher<[Int], Never> // Ex. 2 values: `[12, 24]`, `[1, 10, 50]`
                .uncollect { $0 }            // AnyPublisher<Int, Never>   // Ex. 5 values: `12`, `24`, `1`, `10`, `50`
                .flatMap { fibonacci($0) }   // AnyPublisher<Int, Never>   // Ex. 5 values: `144`, `46368`, `1`, `55`, `12586269025`
                .sink { print($0) }
        }
    }

    extension Publisher {
        // Maps each upstream value to an array and re-emits the array's elements one by one.
        func uncollect<T>(_ transform: @escaping (Output) -> [T]) -> AnyPublisher<T, Failure> {
            self
                .flatMap { output -> AnyPublisher<T, Failure> in
                    // JustSeveral is my custom publisher that emits each element of an array.
                    JustSeveral(transform(output))
                        .setFailureType(to: Failure.self)
                        .eraseToAnyPublisher()
                }
                .eraseToAnyPublisher()
        }
    }