I have a List<Flow<T>>, and would like to generate a Flow<List<T>>. This is almost what combine does - except that combine waits for each and every Flow to emit an initial value, which is not what I want. Take this code for example:
val a = flow {
  repeat(3) {
    emit("a$it")
    delay(100)
  }
}
val b = flow {
  repeat(3) {
    delay(150)
    emit("b$it")
  }
}
val c = flow {
  delay(400)
  emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
  combine(flows) {
    it.toList()
  }.collect { println(it) }
}
With combine (and hence as-is), this is the output:
[a2, b1, c]
[a2, b2, c]
Whereas I'm interested in all the intermediary steps too. This is what I want from those three flows:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
Right now I have two work-arounds, but none of them are great... The first one is plain ugly and doesn't work with nullable types:
val flows = listOf(a, b, c).map {
  flow {
    emit(null)
    it.collect { emit(it) }
  }
}
runBlocking {
  combine(flows) {
    it.filterNotNull()
  }.collect { println(it) }
}
By forcing all the flows to emit a first, irrelevant value, the combine transformer is indeed called, and lets me remove the null values which I know are not actual values. Iterating on that, more readable but heavier:
sealed class FlowValueHolder {
  object None : FlowValueHolder()
  data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
  flow {
    emit(FlowValueHolder.None)
    it.collect { emit(FlowValueHolder.Some(it)) }
  }
}
runBlocking {
  combine(flows) {
    it.filterIsInstance(FlowValueHolder.Some::class.java)
      .map { it.value }
  }.collect { println(it) }
}
Now this one works just fine, but still feels like I'm overdoing stuff. Is there a method that I'm missing in the coroutines library?
 
     
     
    