Say I have simple producer/consumer model where the consumer wants to pass back some state to the producer. For instance, let the downstream-flowing objects be objects we want to write to a file and the upstream objects be some token representing where the object was written in the file (e.g. an offset).
These two processes might look something like this (with pipes-4.0),
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
import Pipes
import Pipes.Core
import Control.Monad.Trans.State
import Control.Monad
newtype Object = Obj Int
deriving (Show)
newtype ObjectId = ObjId Int
deriving (Show, Num)
writeObjects :: Proxy ObjectId Object () X IO r
writeObjects = evalStateT (forever go) (ObjId 0)
where go = do i <- get
obj <- lift $ request i
lift $ lift $ putStrLn $ "Wrote "++show obj
modify (+1)
produceObjects :: [Object] -> Proxy X () ObjectId Object IO ()
produceObjects = go
where go [] = return ()
go (obj:rest) = do
lift $ putStrLn $ "Producing "++show obj
objId <- respond obj
lift $ putStrLn $ "Object "++show obj++" has ID "++show objId
go rest
objects = [ Obj i | i <- [0..10] ]
Simple as this might be, I've had a fair bit of difficulty reasoning about how to compose them. Ideally, we'd want a push-based flow of control like the following,
writeObjectsstarts by blocking onrequest, having sent the initialObjId 0upstream.produceObjectssends the first object,Obj 0, downstreamwriteObjectswrites the object and increments its state, and waits onrequest, this time sendingObjId 1upstreamrespondinproduceObjectsreturns withObjId 0produceObjectscontinues at Step (2) with the second object,Obj 1
My initial attempt was with push-based composition as follows,
main = void $ run $ produceObjects objects >>~ const writeObjects
Note the use of const to work around the otherwise incompatible types (this is likely where the problem lies). In this case, however, we find that ObjId 0 gets eaten,
Producing Obj 0
Wrote Obj 0
Object Obj 0 has ID ObjId 1
Producing Obj 1
...
A pull-based approach,
main = void $ run $ const (produceObjects objects) +>> writeObjects
suffers a similar issue, this time dropping Obj 0.
How might one go about composing these pieces in the desired manner?