I am trying to understand the "Streams as arrows" section in John Hughes' famous "Generalising Monads to Arrows". To be more precise, I am interested in writing down the Fibonacci stream.
I tweaked Hughes' definition a bit:
import Prelude hiding (id, (.))
import Control.Category
import Control.Arrow
data StreamProcessor a b = Get (a -> StreamProcessor a b) |
                           Put b (StreamProcessor a b) |
                           Halt
put = Put
get = Get
First of all, I treat stream processors as lists which may block (waiting for input). That is:
Put :: b -> StreamProcessor a b -> StreamProcessor a b matches (:) :: a -> [a] -> [a];
Halt :: StreamProcessor a b matches [] :: [a];
Get :: (a -> StreamProcessor a b) -> StreamProcessor a b helps us block the stream and wait for input.
Therefore, if we drop the Get we get a list-like structure. If we also drop Halt we get an infinite-list-like structure.
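The list analogy can be made concrete with a pair of conversion functions. This is only a minimal sketch: fromList and toListUntilBlock are hypothetical names that do not come from the paper, and the data type is repeated so that the block stands alone.

```haskell
data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Hypothetical helper: embed a list as a Get-free stream processor.
-- Put plays the role of (:), Halt plays the role of [].
fromList :: [b] -> StreamProcessor a b
fromList = foldr Put Halt

-- Hypothetical helper: read the outputs back until the processor halts
-- or blocks on a Get (the one constructor with no list counterpart).
toListUntilBlock :: StreamProcessor a b -> [b]
toListUntilBlock (Put x sp) = x : toListUntilBlock sp
toListUntilBlock _          = []
```

Since foldr Put Halt is lazy in the tail, the same fromList should also work for infinite lists, which corresponds to the Halt-free case.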
Here are two ways I would understand "a stream of Fibonaccis":
a non-blocked infinite stream (infinite-list-like):
zipNonBlockedStreamsWith :: (a -> b -> c)
                         -> StreamProcessor () a
                         -> StreamProcessor () b
                         -> StreamProcessor () c
zipNonBlockedStreamsWith f (Put x sp) (Put y sp') = Put (f x y) (zipNonBlockedStreamsWith f sp sp')
zipNonBlockedStreamsWith f Halt sp = Halt
zipNonBlockedStreamsWith f sp Halt = Halt
fibs :: StreamProcessor () Int
fibs = put 0 (put 1 $ zipNonBlockedStreamsWith (+) fibs (tailNonBlockedStream fibs))
-- matches a well-known definition of an infinite Fibonacci list.
fibsList :: [Int]
fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList))
-- with the 'fibsList', we can use folds to do the same thing.
putStream :: [b] -> StreamProcessor a b -> StreamProcessor a b
putStream bs sp = foldr Put sp bs
fibs' :: StreamProcessor () Int
fibs' = putStream fibsList Halt
A blocked stream waits for n, outputs the nth Fibonacci number and blocks again. Hughes' Arrow interface helps us express it in a quite concise way:
instance Category StreamProcessor where
  ...
instance Arrow StreamProcessor where
  arr f = Get $ \ a -> Put (f a) (arr f)
  ...
fibsList :: [Int]
fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList))
blockedFibs :: StreamProcessor Int Int
blockedFibs = arr (fibsList !!)
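Both readings can be tried out without any Arrow machinery. The sketch below rests on several assumptions: the definition of tailNonBlockedStream is not shown above, so the one here is a guess that mirrors tail on lists; arrSP is a hypothetical stand-in for arr so that no instances are needed; runSP is a hypothetical driver that feeds a list of inputs and collects the outputs; and the zip gets a catch-all clause instead of the two Halt clauses.

```haskell
data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Assumed definition (not shown in the post): drop the first output
-- of a Get-free stream, mirroring 'tail' on lists.
tailNonBlockedStream :: StreamProcessor () a -> StreamProcessor () a
tailNonBlockedStream (Put _ sp) = sp
tailNonBlockedStream _          = Halt -- assumption: Get or Halt yields Halt

zipNonBlockedStreamsWith :: (a -> b -> c)
                         -> StreamProcessor () a
                         -> StreamProcessor () b
                         -> StreamProcessor () c
zipNonBlockedStreamsWith f (Put x sp) (Put y sp') =
  Put (f x y) (zipNonBlockedStreamsWith f sp sp')
zipNonBlockedStreamsWith _ _ _ = Halt -- catch-all, also covers Get

-- The non-blocked reading: an infinite, Get-free stream.
fibs :: StreamProcessor () Int
fibs = Put 0 (Put 1 (zipNonBlockedStreamsWith (+) fibs (tailNonBlockedStream fibs)))

fibsList :: [Int]
fibsList = 0 : 1 : zipWith (+) fibsList (tail fibsList)

-- Hypothetical stand-in for 'arr': read one input, write one output, repeat.
arrSP :: (a -> b) -> StreamProcessor a b
arrSP f = Get $ \a -> Put (f a) (arrSP f)

-- The blocked reading: wait for n, output the nth Fibonacci number.
blockedFibs :: StreamProcessor Int Int
blockedFibs = arrSP (fibsList !!)

-- Hypothetical driver: feed inputs from a list and collect the outputs;
-- stops when the processor halts or asks for input that is not there.
runSP :: StreamProcessor a b -> [a] -> [b]
runSP (Put b sp) as       = b : runSP sp as
runSP (Get f)    (a : as) = runSP (f a) as
runSP (Get _)    []       = []
runSP Halt       _        = []
```

With these definitions, take 10 (runSP fibs []) should yield the first ten Fibonacci numbers without consuming any input, while runSP blockedFibs [0, 5, 10] produces exactly one output per input.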
Yet, in the paper I linked John Hughes shows another solution, Arrowing his way through:
instance Category StreamProcessor where
  id = Get (\ x -> Put x id)
  Put c bc . ab = Put c (bc . ab)
  Get bbc . Put b ab = (bbc b) . ab
  Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a)
  Get bbc . Halt = Halt
  Halt . ab = Halt
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f) = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f) = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt = Halt
instance Arrow StreamProcessor where
  arr f = Get $ \ a -> Put (f a) (arr f)
  first = bypass [] []
liftArr2 :: Arrow k => (a -> b -> c) -> k r a -> k r b -> k r c
liftArr2 f a b = a &&& b >>^ uncurry f
fibsHughes = let
    fibsHughes' = put 1 (liftArr2 (+) fibsHughes fibsHughes')
  in put 0 fibsHughes'
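As a sanity check of the instances themselves, they can be exercised on processors that always read one input before writing one output. The sketch below repeats the definitions above so that it stands alone; runSP is a hypothetical driver (not from the paper) that feeds inputs from a list. It only covers such synchronous processors built with arr, so it says nothing about processors like fibsHughes that write before they read.

```haskell
import Prelude hiding (id, (.))
import Control.Category
import Control.Arrow

data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

instance Category StreamProcessor where
  id = Get (\x -> Put x id)
  Put c bc . ab       = Put c (bc . ab)
  Get bbc  . Put b ab = bbc b . ab
  Get bbc  . Get aab  = Get $ \a -> Get bbc . aab a
  Get _    . Halt     = Halt
  Halt     . _        = Halt

-- Two-queue bypass, as in the post: bs holds withheld inputs,
-- ds holds withheld second components.
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f)          = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f)    = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp)       =
  Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass _ _ Halt               = Halt

instance Arrow StreamProcessor where
  arr f = Get $ \a -> Put (f a) (arr f)
  first = bypass [] []

liftArr2 :: Arrow k => (a -> b -> c) -> k r a -> k r b -> k r c
liftArr2 f a b = a &&& b >>^ uncurry f

-- Hypothetical driver: feed inputs from a list and collect the outputs.
runSP :: StreamProcessor a b -> [a] -> [b]
runSP (Put b sp) as       = b : runSP sp as
runSP (Get f)    (a : as) = runSP (f a) as
runSP (Get _)    []       = []
runSP Halt       _        = []
```

For example, runSP (arr (+1) >>> arr (*2)) [1, 2, 3] runs a two-stage pipeline, runSP (first (arr (+1))) [(1, 'a'), (2, 'b')] exercises bypass, and runSP (liftArr2 (+) (arr (*2)) (arr (+3))) [5, 1] combines two processors over the same input.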
But it does not work the way I expect. The following function would help us take the values from the stream until it blocks or halts (using Data.List.unfoldr):
popToTheBlockOrHalt :: StreamProcessor a b -> [b]
popToTheBlockOrHalt = let
    getOutput (Put x sp) = Just (x, sp)
    getOutput getOrHalt = Nothing
  in unfoldr getOutput
So, here is what we get:
GHCi> popToTheBlockOrHalt fibsHughes
[0,1]
GHCi> :t fibsHughes
fibsHughes :: StreamProcessor a Integer
Pattern matching on what remains after these two outputs shows that the stream blocks (that is, we run into a Get rather than a Halt).
I cannot tell whether this is the intended behaviour. If it is, why? If not, what is the problem? I have checked and rechecked my code, and it closely matches the definitions in Hughes' article (I had to add id and the patterns for Halt, but I cannot see how those could have broken anything).
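Checking the pattern by hand can be folded into the popping function itself. This is a minimal sketch with hypothetical names (Stop, popWithReason): it collects the outputs and also reports whether collection stopped on a Get or on a Halt. The data type is repeated so the block stands alone.

```haskell
data StreamProcessor a b
  = Get (a -> StreamProcessor a b)
  | Put b (StreamProcessor a b)
  | Halt

-- Hypothetical type: why the outputs ran out.
data Stop = Blocked | Halted
  deriving (Eq, Show)

-- Hypothetical helper: collect outputs up to the first Get or Halt
-- and report which of the two was reached.
popWithReason :: StreamProcessor a b -> ([b], Stop)
popWithReason (Put x sp) =
  let (xs, reason) = popWithReason sp
  in (x : xs, reason)
popWithReason (Get _) = ([], Blocked)
popWithReason Halt    = ([], Halted)
```

Going by the GHCi session above, applying this to fibsHughes would presumably report Blocked after the two outputs; note that the reason is only available for streams that eventually stop producing output.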
Edit: As noted in the comments, bypass was slightly changed in a later edition of the paper (that is the version used here). It can accumulate both withheld bs and ds (that is, it has two queues), whereas the bypass from the original paper accumulates just ds (that is, one queue).
Edit #2: I just wanted to write down a function which would pop the Fibonacci numbers from the fibsHughes:
popToTheHaltThroughImproperBlocks :: StreamProcessor () b -> [b]
popToTheHaltThroughImproperBlocks = let
    getOutput (Put x sp) = Just (x, sp)
    getOutput (Get c) = getOutput $ c ()
    getOutput Halt = Nothing
  in unfoldr getOutput
And here we go:
GHCi> (take 10 . popToTheHaltThroughImproperBlocks) fibsHughes
[0,1,1,2,3,5,8,13,21,34]