I am trying to adapt Store encoding and decoding for Streaming. Store already implements a streaming decode with a function called decodeMessageBS.
I tried to do a basic implementation of store deserialization for Streaming as below (without bracket for now to keep it simple). However, there seems to be something wrong with the logic for popper because decodeMessageBS keeps throwing PeekException:
{-# LANGUAGE RankNTypes #-}
import Streaming.Prelude as S hiding (print,show)
import Data.IORef
import Streaming as S
import qualified Data.ByteString as BS (ByteString,empty,length)
import System.IO.ByteBuffer
import Data.Store
import Data.Store.Streaming
streamDecode :: forall a. (Store a) => ByteBuffer -> Stream (Of BS.ByteString) IO () -> Stream (Of a) IO ()
streamDecode bb inp = do
ref <- lift $ newIORef inp
let popper = do
r <- S.uncons =<< readIORef ref
case r of
Nothing -> return Nothing
Just (a,rest) -> writeIORef ref rest >> return (Just a)
let go = do
r <- lift $ decodeMessageBS bb $ popper
lift $ print "Decoding"
case r of
Nothing -> return ()
Just msg -> (lift $ print "Message found") >> (S.yield . fromMessage $ msg) >> go
go
I can decode my test file fine with decodeIOPortionWith - so, the problem seems to be in the logic needed to feed decodeMessageBS. Will appreciate pointers on what is wrong with the logic of popper here.