I'm playing around with the stream interfaces discussed in this blog post. I've made what feels like a simple implementation to test out the interfaces, but it is hanging indefinitely when I run it and I'm stumped as to why. Here's the code:
import java.io.PipedOutputStream
import java.io.PipedInputStream
import java.io.InputStream
import java.io.OutputStream
import scala.concurrent.{Future, Promise, Await}
import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors
import scala.concurrent.duration._
import scala.collection.immutable.ListMap
implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
def copy(input: InputStream, output: OutputStream): Unit = {
val buffer = new Array[Byte](1024)
var bytesRead = input.read(buffer)
while (bytesRead != -1) {
output.write(buffer, 0, bytesRead)
bytesRead = input.read(buffer)
}
}
def curthread(label: String): Unit = println(label, Thread.currentThread().getName())
trait Writable{
def writeBytesTo(out: OutputStream): Unit
}
trait Readable extends Writable{
def readBytesThrough[T](f: InputStream => T): T
def writeBytesTo(out: OutputStream): Unit = readBytesThrough(copy(_, out))
}
def convertToUpperCase(input: InputStream, output: OutputStream): Unit = {
println("convertToUpperCase")
curthread("convertToUpperCase")
val buffer = new Array[Byte](1024) // buffer size
println("convertToUpperCase: input.read")
var bytesRead = input.read(buffer)
while (bytesRead != -1) {
println(s"convertToUpperCase read $bytesRead")
val upperBuffer = buffer.map(_.toChar.toUpper.toByte)
println("convertToUpperCase: output.write")
output.write(upperBuffer, 0, bytesRead)
println("convertToUpperCase: input.read")
bytesRead = input.read(buffer)
}
input.close()
output.close()
}
class Put extends Readable {
private val output = new PipedOutputStream
def readBytesThrough[T](f: InputStream => T): T = {
println("Put: readBytesThrough")
curthread("Put")
f(new PipedInputStream(output))
}
def receive(x: Any): Unit = {
println(s"Put: receive($x)")
curthread("Put")
output.write(x.toString.getBytes("utf-8"))
println("done receiving")
}
def receivedLast(): Unit = {
println("Put: receivedLast")
curthread("Put")
output.close()
}
}
case class Parsed(value: String) extends Writable {
def writeBytesTo(out: OutputStream): Unit = {
println("Parsed: writeBytesTo")
curthread("Parsed")
out.write(value.getBytes("utf-8"))
}
}
def uppercase(r: Readable): Readable = new Readable {
def readBytesThrough[T](f: InputStream => T): T = {
val output = new PipedOutputStream
Future {
r.readBytesThrough(input => {
convertToUpperCase(input, output)
})
}
f(new PipedInputStream(output))
}
}
def parse(r: Readable): Writable = {
curthread("parse")
val x = r.readBytesThrough(scala.io.Source.fromInputStream(_).mkString)
Parsed(x)
}
def display(w: Writable): Unit = {
curthread("display")
w.writeBytesTo(System.out)
}
val put = new Put
curthread("main")
Future {
put.receive("foobarbaz")
put.receivedLast()
}
display(parse(uppercase(put)))
This gives the output
(main,run-main-0)
Put: receive(foobarbaz)
(Put,pool-11-thread-1)
(parse,run-main-0)
Put: readBytesThrough
(Put,pool-11-thread-2)
convertToUpperCase
(convertToUpperCase,pool-11-thread-2)
convertToUpperCase: input.read
So it appears that reads and writes to each end of the piped stream are happening on different threads as they should. Why then is it hanging at convertToUpperCase?
Scastie link: https://scastie.scala-lang.org/I9EIOs8NR2mBeLbcqDx2zA