Hi all,
I was trying to do something that would allow me to partition a stream to parts, work with them as independent streams, and sensibly merge the results. Essentially a "diamond" style workflow, but extensible in how many branches the diamond has.
In the current iteration I arrived to a set of helpers that can be used to make a pipeline:
toL puts items into InL
dupLs is like copy but the copied stream only contains the stuff from the left side of Sum
joinRs takes a transformed stream and saves its results back into the bigger stream in InR
fromR takes the stream with the transformations applied, and drops the leftover "left" Sums, only leaving the results.
Schematically, one make a stream of (Sum input output), and copy the inputs into as many other output-producing streams as feasible (bracketed between joinRs . ... . dupLs), finally scraping all the results with fromR. Code example below. I'm adding a picture of how I imagine the process (corresponds to: ... $ fromR $ joinRs . S.map (+1) . dupLs $ toL $ ...):
,--- S.map (+1)-.
,---(InL)--> dupLs -------------- \ ---- - - - -----v
x -> toL --(InR)--> - - - --------------- \ --> joinRs --- fromR --> (x+1)
| '----^ |
| repeatable as needed ................ |
So having this kinda working, I thought I'd better ask here:
- I didn't find a way to do this easily with existing functionality in
streaming; in case this looks useful to anyone, should I PR? In the other case, is this somehow easily composable from the existing stuff in streaming?
- It seems to me that the use of
destroy might be too much in cases, but I failed find any smaller gun to do this properly; is there any way to express this e.g. with mapped ?
- can one somehow easily get rid of the fixed
Of in the type of fromR? I guess we'd instead need something more generic, such as Comonad's extract?
Thanks for any comments!
Code example:
import Streaming
import qualified Streaming.Prelude as S
toL :: (Functor f, Monad m) => Stream f m r -> Stream (Sum f x) m r
toL = maps InL
dupLs ::
(Functor f, Functor g, Monad m)
=> Stream (Sum f g) m r
-> Stream f (Stream (Sum f g) m) r
dupLs s =
destroy
s
(\x ->
case x of
InL fss -> wrap (effect (yields (InL fss)) <$ fss)
InR gss -> effect (yields (InR gss)))
(effect . lift)
return
joinRs ::
(Functor f, Functor g, Monad m)
=> Stream g (Stream (Sum f g) m) r
-> Stream (Sum f g) m r
joinRs s = destroy s (\gss -> wrap $ InR gss) join return
fromR :: (Monad m, Functor g) => Stream (Sum (Of x) g) m r -> Stream g m r
fromR = S.effects . separate
test :: IO ()
test =
S.print
. fromR
$ joinRs . mapped S.toList . chunksOf 3 . dupLs -- list chunks aren't great for streaming but it shows how the results interleave
$ joinRs . mapped S.toList . chunksOf 5 . dupLs
$ toL
$ S.each [1 .. 20 :: Int]
test prints:
[1,2,3]
[1,2,3,4,5]
[4,5,6]
[7,8,9]
[6,7,8,9,10]
[10,11,12]
[11,12,13,14,15]
[13,14,15]
[16,17,18]
[16,17,18,19,20]
[19,20]
EDITS: fixed type of test to compile with monomorphism restriction, added the note about fromR
Hi all,
I was trying to do something that would allow me to partition a stream to parts, work with them as independent streams, and sensibly merge the results. Essentially a "diamond" style workflow, but extensible in how many branches the diamond has.
In the current iteration I arrived to a set of helpers that can be used to make a pipeline:
toLputs items intoInLdupLsis likecopybut the copied stream only contains the stuff from the left side ofSumjoinRstakes a transformed stream and saves its results back into the bigger stream inInRfromRtakes the stream with the transformations applied, and drops the leftover "left" Sums, only leaving the results.Schematically, one make a stream of
(Sum input output), and copy the inputs into as many other output-producing streams as feasible (bracketed betweenjoinRs . ... . dupLs), finally scraping all the results withfromR. Code example below. I'm adding a picture of how I imagine the process (corresponds to:... $ fromR $ joinRs . S.map (+1) . dupLs $ toL $ ...):So having this kinda working, I thought I'd better ask here:
streaming; in case this looks useful to anyone, should I PR? In the other case, is this somehow easily composable from the existing stuff in streaming?destroymight be too much in cases, but I failed find any smaller gun to do this properly; is there any way to express this e.g. withmapped?Ofin the type offromR? I guess we'd instead need something more generic, such as Comonad's extract?Thanks for any comments!
Code example:
testprints:EDITS: fixed type of
testto compile with monomorphism restriction, added the note aboutfromR