Recently I've been using conduits quite a bit. While building some tools to check for bugs in our dataset in DynamoDB, I had to use the parallel scan feature. Since amazonka (the Haskell AWS SDK) uses conduit when paging, I had to find a way to get multiple sources running concurrently and feeding into the next stage of the pipeline.
After spending an embarrassingly long time figuring this out, I'll present a way to run multiple conduit sources concurrently and collapse them into a single source.
Dependencies and imports
conduit-combinators-1.1.1
async-2.1.0
stm-chans-184.108.40.206
stm-220.127.116.11
conduit-1.2.9
Most imports are qualified in case you're not familiar with the packages:
    import qualified Conduit as C
    import Conduit ((.|)) -- because C..| is ugly
    import Data.Conduit (bracketP) -- for setup and cleanup
    -- we'll need the async package for cancellation
    import Control.Concurrent (threadDelay)
    import qualified Control.Concurrent.Async as Async
    -- queue utilities
    import qualified Control.Concurrent.STM.TBMQueue as STM
    import qualified Control.Concurrent.STM as STM

First, some helper functions are required.
Turning a queue into a source:
    sourceQueue :: C.MonadIO m => STM.TBMQueue o -> C.ConduitM i o m ()
    sourceQueue queue = loop
      where
        loop = do
          mbItem <- C.liftIO $ STM.atomically (STM.readTBMQueue queue)
          case mbItem of
            Nothing -> pure () -- queue closed
            Just item -> C.yield item *> loop
And the reverse: feeding a queue from a conduit:
    sinkQueue :: C.MonadIO m => STM.TBMQueue a -> C.ConduitM a o m ()
    sinkQueue queue = loop
      where
        loop = do
          mbItem <- C.await
          case mbItem of
            Nothing -> pure () -- no more items to come
            Just item -> do
              C.liftIO $ STM.atomically (STM.writeTBMQueue queue item)
              loop

Note that sinkQueue does not close the queue. Because multiple sources feed the same queue, we don't want the first source that terminates to close it and lose the inputs from the other sources.
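To make the "lose the inputs" part concrete, here is a minimal sketch of how a TBMQueue behaves around closing. The name drainAfterClose is made up for this illustration; the queue functions are the ones from stm-chans used elsewhere in this post.

```haskell
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM

-- Hypothetical demo: write two items, close the queue, try to write a third,
-- then read three times.
drainAfterClose :: IO [Maybe Int]
drainAfterClose = do
  queue <- STM.newTBMQueueIO 10
  STM.atomically $ do
    STM.writeTBMQueue queue 1
    STM.writeTBMQueue queue 2
    STM.closeTBMQueue queue
    -- writing to a closed queue is silently discarded
    STM.writeTBMQueue queue 3
  -- items written before the close can still be read; after that we get Nothing
  mapM (const (STM.atomically (STM.readTBMQueue queue))) [1 .. 3 :: Int]
```

Running drainAfterClose should give [Just 1, Just 2, Nothing]: the third write never makes it into the queue. If sinkQueue closed the queue as soon as its own source finished, every slower source would hit exactly that case.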
The tricky bit here is to correctly handle exceptions (and cancellation). Thankfully, there is bracketP.
First, the type signature:
    parSources :: (C.MonadIO m, C.MonadResource m) => [C.ConduitM () o IO ()] -> C.ConduitM () o m ()
    parSources = error "work in progress"
Given a list of conduits, collapse all of them into one conduit, which will terminate when all sources are done.
MonadResource constraint is a requirement of bracketP. This is what makes the initialisation and cleanup possible in a nicer way than a simple
    parSources :: (C.MonadIO m, C.MonadResource m) => [C.ConduitM () o IO ()] -> C.ConduitM () o m ()
    parSources sources = bracketP init cleanup finalSource
      where
        init = do
          -- create the queue where all sources will put their items
          queue <- STM.newTBMQueueIO 100
          -- In a separate thread, run concurrently all conduits
          a <- Async.async $ do
            Async.mapConcurrently_ (\source -> C.runConduit (source .| sinkQueue queue)) sources
            -- once all conduits are done, close the queue
            STM.atomically (STM.closeTBMQueue queue)
          pure (a, queue)
        cleanup (async, queue) = do
          -- upon exception or cancellation, close the queue and cancel the threads
          STM.atomically (STM.closeTBMQueue queue)
          Async.cancel async
        finalSource (_, queue) = sourceQueue queue
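One case that may be worth guarding against: as far as I can tell, if one of the sources throws, mapConcurrently_ rethrows in the worker thread and the line closing the queue is never reached, so the consumer is left waiting on a queue nobody will close. A possible tweak, sketched below, is to close the queue with finally. The names feedAll and closesOnFailure are made up for this sketch, and it writes to the queue with C.mapM_C instead of sinkQueue only to stay self-contained.

```haskell
import           Conduit ((.|))
import qualified Conduit as C
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import           Control.Exception (IOException, finally, throwIO, try)

-- Hypothetical helper (not part of the original parSources): feed every
-- source into the queue and close the queue afterwards, whether the sources
-- finished normally or one of them threw.
feedAll :: STM.TBMQueue o -> [C.ConduitM () o IO ()] -> IO ()
feedAll queue sources =
  Async.mapConcurrently_ feed sources
    `finally` STM.atomically (STM.closeTBMQueue queue)
  where
    feed source =
      C.runConduit (source .| C.mapM_C (STM.atomically . STM.writeTBMQueue queue))

-- Small check: one source fails, and the queue still ends up closed.
closesOnFailure :: IO Bool
closesOnFailure = do
  queue <- STM.newTBMQueueIO 10
  let good = C.yieldMany [1, 2, 3 :: Int]
      bad  = C.liftIO (throwIO (userError "boom"))
  _ <- try (feedAll queue [good, bad]) :: IO (Either IOException ())
  STM.atomically (STM.isClosedTBMQueue queue)
```

With this helper, the worker in init would become Async.async (feedAll queue sources). The exception itself is still only stored in the Async, so surfacing it to the consumer would need something more (for example linking the worker thread), which I have not explored here.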
Putting it all together
First, a small helper to produce some items with a name and some delay between them
    namedSource :: (C.MonadIO m) => String -> C.ConduitM () String m ()
    namedSource name =
      C.yieldMany [1 .. 10 :: Int]
        .| C.mapMC delayItem
        .| C.mapC (\i -> name ++ " - " ++ show i)
      where
        -- wait half a second before passing each item along
        delayItem x = C.liftIO (threadDelay 500000) *> pure x
And a way to print the items going through a conduit to make sure they are properly streamed
    logItem :: (Show a, C.MonadIO m) => a -> m a
    logItem x = C.liftIO (print x) *> pure x
Finally, to run everything:
    main :: IO ()
    main = do
      C.runResourceT $ C.runConduit $
        parSources [namedSource "source 1", namedSource "source 2"]
          .| C.mapMC logItem
          .| C.sinkNull
      print "all done"
And that will output in a streaming fashion:
    "source 2 - 1"
    "source 1 - 1"
    "source 2 - 2"
    "source 1 - 2"
    "source 2 - 3"
    "source 1 - 3"
    "source 1 - 4"
    "source 2 - 4"
    "source 2 - 5"
    "source 1 - 5"
    "source 1 - 6"
    "source 2 - 6"
    "source 1 - 7"
    "source 2 - 7"
    "source 1 - 8"
    "source 2 - 8"
    "source 2 - 9"
    "source 1 - 9"
    "source 1 - 10"
    "source 2 - 10"
    "all done"

This can be used whenever lots of slow operations are done simultaneously and you want to process the results with the nice conduit abstraction. For example, getting 100 items from a parallel scan in DynamoDB, or the first 10 pages from a web crawl.
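The "first N items" use case can be sketched with takeC from conduit-combinators. The helper name firstN is made up for this example, and it works on any source running in ResourceT IO, so it can be tried without the rest of the code in this post.

```haskell
import           Conduit ((.|))
import qualified Conduit as C
import           Control.Monad.Trans.Resource (ResourceT)

-- Hypothetical helper: collect at most n items from a source, then stop.
-- The source is not pulled again once takeC has seen n items.
firstN :: Int -> C.ConduitM () o (ResourceT IO) () -> IO [o]
firstN n source =
  C.runResourceT $ C.runConduit $
    source .| C.takeC n .| C.sinkList
```

For example, firstN 3 (C.yieldMany [1 :: Int ..]) returns [1,2,3] even though the source is infinite. Used as firstN 100 (parSources scans), where scans is whatever list of paging sources you have, the pipeline stops after 100 items and the cleanup passed to bracketP cancels the remaining workers, at the latest when runResourceT exits.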
I'm still unsure how to get a more generic constraint for the sources:
    parSources :: (C.MonadIO m, C.MonadResource m) => [C.ConduitM () o IO ()] -> C.ConduitM () o m ()
    -- ideally I'd like the sources to have a generic type not directly involving IO:
    -- MonadIO m => [C.ConduitM () o m ()] ...
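One direction that might work, which I have not battle-tested: require MonadUnliftIO on m and use askRunInIO from unliftio-core to run the sources in IO. This is only a sketch and rests on assumptions beyond the versions listed at the top: a newer conduit and resourcet (1.3 and 1.2 or later respectively, where ResourceT has a MonadUnliftIO instance) plus the unliftio-core package. The name parSourcesM is made up, and the queue helpers are inlined so the block stands on its own.

```haskell
import           Conduit ((.|))
import qualified Conduit as C
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import           Control.Monad.IO.Unlift (MonadUnliftIO, askRunInIO)
import           Control.Monad.Trans.Class (lift)
import           Data.Conduit (bracketP)

-- Hypothetical variant of parSources: the sources run in the same monad m
-- as the resulting conduit instead of being pinned to IO.
parSourcesM
  :: (MonadUnliftIO m, C.MonadResource m)
  => [C.ConduitM () o m ()]
  -> C.ConduitM () o m ()
parSourcesM sources = do
  -- capture a function that runs an m action in plain IO
  runInIO <- lift askRunInIO
  let start = do
        queue <- STM.newTBMQueueIO 100
        worker <- Async.async $ do
          Async.mapConcurrently_ (runInIO . feed queue) sources
          -- once all sources are done, close the queue
          STM.atomically (STM.closeTBMQueue queue)
        pure (worker, queue)
      stop (worker, queue) = do
        STM.atomically (STM.closeTBMQueue queue)
        Async.cancel worker
  bracketP start stop (drain . snd)
  where
    -- same idea as sinkQueue: push every item of the source into the queue
    feed queue source =
      C.runConduit
        (source .| C.mapM_C (C.liftIO . STM.atomically . STM.writeTBMQueue queue))
    -- same idea as sourceQueue: yield items until the queue is closed and empty
    drain queue = do
      mbItem <- C.liftIO (STM.atomically (STM.readTBMQueue queue))
      case mbItem of
        Nothing   -> pure ()
        Just item -> C.yield item *> drain queue
```

The trade-off is that MonadUnliftIO rules out stateful monads such as StateT, so this is more generic than IO but not as generic as a bare MonadIO m. With conduit-1.2.9 something similar would presumably go through MonadBaseControl instead, which I have not tried.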