Concurrent sources with conduit

Recently I've been using conduit quite a bit. While building some tools to check for bugs in our DynamoDB dataset, I had to use the parallel scan feature. Since amazonka (the Haskell AWS SDK) uses conduit for paging, I had to find a way to run multiple sources concurrently and feed them all into the next stage of the pipeline.

After spending an embarrassingly long time figuring this out, I'll present a way to run multiple conduit sources concurrently and collapse them into a single source.

Dependencies and imports

conduit-combinators-1.1.1
async-2.1.0
stm-chans-3.0.0.4
stm-2.4.4.1
conduit-1.2.9

Most imports are qualified in case you're not familiar with the packages:

import qualified Conduit as C
import Conduit ((.|))  -- because C..| is ugly
import Data.Conduit (bracketP) -- for setup and cleanup

-- we'll need the async package for cancellation
import Control.Concurrent (threadDelay)
import qualified Control.Concurrent.Async as Async

-- queue utilities
import qualified Control.Concurrent.STM.TBMQueue as STM
import qualified Control.Concurrent.STM as STM

Some setup

First, some helper functions are required.

Turning a queue into a source:

sourceQueue :: C.MonadIO m => STM.TBMQueue o -> C.ConduitM i o m ()
sourceQueue queue = loop
  where
    loop = do
        mbItem <- C.liftIO $ STM.atomically (STM.readTBMQueue queue)
        case mbItem of
            Nothing -> pure ()  -- queue closed
            Just item -> C.yield item *> loop

And the reverse: feeding a queue from a conduit:

sinkQueue :: C.MonadIO m => STM.TBMQueue a -> C.ConduitM a o m ()
sinkQueue queue = loop
  where
    loop = do
        mbItem <- C.await
        case mbItem of
            Nothing -> pure ()  -- no more items to come
            Just item -> do
                C.liftIO $ STM.atomically (STM.writeTBMQueue queue item)
                loop

Note that sinkQueue does not close the queue. Because multiple sources feed the same queue, we don't want the first source that terminates to close it and lose the items from the other sources.
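
To make the closing rule concrete, here is a minimal sketch that uses the queue directly, without conduit. The names drain and sharedQueueDemo, the queue bound of 4 and the item lists are made up for illustration. Two producers write to one shared queue, and the queue is closed exactly once, after both have finished.

```haskell
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import Data.List (sort)

-- Read items until the queue is closed and empty.
drain :: STM.TBMQueue a -> IO [a]
drain queue = loop []
  where
    loop acc = do
        mbItem <- STM.atomically (STM.readTBMQueue queue)
        case mbItem of
            Nothing -> pure (reverse acc)  -- closed and empty
            Just item -> loop (item : acc)

-- Two hypothetical producers share one queue. Neither of them closes it;
-- the close happens once, after mapConcurrently_ has returned.
sharedQueueDemo :: IO [Int]
sharedQueueDemo = do
    queue <- STM.newTBMQueueIO 4
    let producer items = mapM_ (STM.atomically . STM.writeTBMQueue queue) items
    Async.withAsync
        (do Async.mapConcurrently_ producer [[1, 2, 3], [10, 20, 30]]
            STM.atomically (STM.closeTBMQueue queue))
        (\_ -> drain queue)

main :: IO ()
main = do
    items <- sharedQueueDemo
    -- arrival order depends on scheduling, so sort before printing
    print (sort items)
```

If either producer closed the queue when it finished, the remaining writes of the other one would be silently dropped, since writeTBMQueue discards items written to a closed queue.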

Parallel sources

The tricky bit here is to correctly handle exceptions (and cancellation). Thankfully, there is bracketP.

First, the type signature:

parSources :: (C.MonadIO m, C.MonadResource m) => [C.ConduitM () o IO ()] -> C.ConduitM () o m ()
parSources = error "work in progress"

Given a list of conduits, collapse all of them into one conduit, which will terminate when all sources are done. The MonadResource constraint is a requirement of bracketP. This is what makes the initialisation and cleanup possible in a nicer way than a simple bracket.
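
For readers who have not used bracketP before, here is a small sketch of what it provides, independent of queues and threads. The names trackedSource and bracketDemo and the IORef flag are hypothetical and only there for the demonstration. The release action runs even though the consumer stops early and the source never finishes on its own.

```haskell
import qualified Conduit as C
import Conduit ((.|))
import Data.Conduit (bracketP)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)

-- An infinite source whose release action flips a flag.
trackedSource :: C.MonadResource m => IORef Bool -> C.ConduitM () Int m ()
trackedSource released = bracketP acquire release use
  where
    acquire = pure ()                       -- nothing to allocate in this sketch
    release () = writeIORef released True   -- record that cleanup ran
    use () = C.yieldMany [1 ..]

-- Take only three items, then check whether the release action ran.
bracketDemo :: IO ([Int], Bool)
bracketDemo = do
    released <- newIORef False
    items <- C.runResourceT $ C.runConduit $
        trackedSource released .| C.takeC 3 .| C.sinkList
    wasReleased <- readIORef released
    pure (items, wasReleased)

main :: IO ()
main = bracketDemo >>= print
```

The flag is read after runResourceT has returned, which is the latest point at which the release action is guaranteed to have run.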

parSources :: (C.MonadIO m, C.MonadResource m) => [C.ConduitM () o IO ()] -> C.ConduitM () o m ()
parSources sources = bracketP init cleanup finalSource
  where
    init = do
        -- create the queue where all sources will put their items
        queue <- STM.newTBMQueueIO 100

        -- in a separate thread, run all conduits concurrently
        a <- Async.async $ do
            Async.mapConcurrently_ (\source -> C.runConduit (source .| sinkQueue queue)) sources
            -- once all conduits are done, close the queue
            STM.atomically (STM.closeTBMQueue queue)
        pure (a, queue)
    cleanup (async, queue) = do
        -- upon exception or cancellation, close the queue and cancel the threads
        STM.atomically (STM.closeTBMQueue queue)
        Async.cancel async
    finalSource (_, queue) = sourceQueue queue
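
One case this version does not obviously cover is a source that throws: as far as I can tell, mapConcurrently_ rethrows inside the async, the closeTBMQueue that follows it is skipped, and the consumer is left waiting on a queue that nobody will close. The sketch below is one possible hardening, not a definitive fix. It closes the queue with finally and waits on the async after draining, so the exception reaches the consumer. The names parSourcesSafe and failingSource are made up for this example.

```haskell
import qualified Conduit as C
import Conduit ((.|))
import Data.Conduit (bracketP)
import Control.Exception (IOException, finally, try)
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM

-- Same helpers as earlier in the post.
sourceQueue :: C.MonadIO m => STM.TBMQueue o -> C.ConduitM i o m ()
sourceQueue queue = loop
  where
    loop = do
        mbItem <- C.liftIO $ STM.atomically (STM.readTBMQueue queue)
        case mbItem of
            Nothing -> pure ()
            Just item -> C.yield item *> loop

sinkQueue :: C.MonadIO m => STM.TBMQueue a -> C.ConduitM a o m ()
sinkQueue queue = loop
  where
    loop = do
        mbItem <- C.await
        case mbItem of
            Nothing -> pure ()
            Just item -> do
                C.liftIO $ STM.atomically (STM.writeTBMQueue queue item)
                loop

-- Hypothetical variant of parSources that surfaces exceptions from sources.
parSourcesSafe :: C.MonadResource m => [C.ConduitM () o IO ()] -> C.ConduitM () o m ()
parSourcesSafe sources = bracketP start stop consume
  where
    start = do
        queue <- STM.newTBMQueueIO 100
        -- close the queue whether the sources finish normally or throw
        worker <- Async.async $
            Async.mapConcurrently_ (\source -> C.runConduit (source .| sinkQueue queue)) sources
                `finally` STM.atomically (STM.closeTBMQueue queue)
        pure (worker, queue)
    stop (worker, queue) = do
        STM.atomically (STM.closeTBMQueue queue)
        Async.cancel worker
    consume (worker, queue) = do
        sourceQueue queue
        -- once the queue is drained, rethrow any exception raised by a source
        C.liftIO (Async.wait worker)

-- A source that yields two items and then fails.
failingSource :: C.ConduitM () Int IO ()
failingSource = C.yieldMany [1, 2] *> C.liftIO (ioError (userError "scan failed"))

main :: IO ()
main = do
    result <- try $ C.runResourceT $ C.runConduit $
        parSourcesSafe [failingSource] .| C.sinkList
    case result of
        Left e -> putStrLn ("source failed: " ++ show (e :: IOException))
        Right items -> print (items :: [Int])
```

With this variant the items written before the failure are still delivered downstream, and the failure shows up only after the queue has been drained. Whether that ordering is what you want depends on the pipeline.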

Putting it all together

First, a small helper to produce some items with a name and some delay between them:

namedSource :: (C.MonadIO m) => String -> C.ConduitM () String m ()
namedSource name = do
    C.yieldMany [1..10] .| C.mapMC delayItem .| C.mapC (\i -> name ++ " - " ++ show i)
  where
    delayItem x = C.liftIO (threadDelay 500000) *> pure x

And a way to print the items going through a conduit, to make sure they are properly streamed:

logItem :: (Show a, C.MonadIO m) => a -> m a
logItem x = C.liftIO (print x) *> pure x

Finally, to run everything:

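main :: IO ()
main = do
    C.runResourceT $ C.runConduit $
        parSources [source1, source2] .| C.mapMC logItem .| C.sinkNull
    print "all done"
  where
    -- the two sources used in the pipeline above
    source1 = namedSource "source 1"
    source2 = namedSource "source 2"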

This prints the items as they arrive (the exact interleaving can vary between runs):

"source 2 - 1"
"source 1 - 1"
"source 2 - 2"
"source 1 - 2"
"source 2 - 3"
"source 1 - 3"
"source 1 - 4"
"source 2 - 4"
"source 2 - 5"
"source 1 - 5"
"source 1 - 6"
"source 2 - 6"
"source 1 - 7"
"source 2 - 7"
"source 1 - 8"
"source 2 - 8"
"source 2 - 9"
"source 1 - 9"
"source 1 - 10"
"source 2 - 10"
"all done"

Conclusion

This can be used whenever many slow operations run concurrently and you want to process their results with the nice conduit abstraction. For example, getting 100 items from a parallel scan in DynamoDB, or the first 10 pages from a web crawl.

I'm still unsure how to get a more generic constraint for the sources:

parSources :: (C.MonadIO m, C.MonadResource m) => [C.ConduitM () o IO ()] -> C.ConduitM () o m ()

-- ideally I'd like the sources to have a generic type not directly involving IO:
MonadIO m => [C.ConduitM () o m ()] ...
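
One possible direction, sketched under assumptions rather than offered as a proper answer: let the caller pass a function that runs the sources' monad in IO. The names parSourcesWith, runInIO and prefixedSource are hypothetical, and the ReaderT example relies on the transformers package, which is not in the dependency list above.

```haskell
import qualified Conduit as C
import Conduit ((.|))
import Data.Conduit (bracketP)
import Control.Monad.Trans.Class (lift)
import Control.Monad.Trans.Reader (ReaderT, ask, runReaderT)
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.STM as STM
import qualified Control.Concurrent.STM.TBMQueue as STM
import Data.List (sort)

-- Same helpers as earlier in the post.
sourceQueue :: C.MonadIO m => STM.TBMQueue o -> C.ConduitM i o m ()
sourceQueue queue = loop
  where
    loop = do
        mbItem <- C.liftIO $ STM.atomically (STM.readTBMQueue queue)
        case mbItem of
            Nothing -> pure ()
            Just item -> C.yield item *> loop

sinkQueue :: C.MonadIO m => STM.TBMQueue a -> C.ConduitM a o m ()
sinkQueue queue = loop
  where
    loop = do
        mbItem <- C.await
        case mbItem of
            Nothing -> pure ()
            Just item -> do
                C.liftIO $ STM.atomically (STM.writeTBMQueue queue item)
                loop

-- Hypothetical variant: the sources live in some monad n, and the caller
-- supplies runInIO to execute each of them in IO on its own thread.
parSourcesWith
    :: (C.MonadIO n, C.MonadResource m)
    => (n () -> IO ())
    -> [C.ConduitM () o n ()]
    -> C.ConduitM () o m ()
parSourcesWith runInIO sources = bracketP start stop (sourceQueue . snd)
  where
    start = do
        queue <- STM.newTBMQueueIO 100
        worker <- Async.async $ do
            Async.mapConcurrently_
                (\source -> runInIO (C.runConduit (source .| sinkQueue queue)))
                sources
            STM.atomically (STM.closeTBMQueue queue)
        pure (worker, queue)
    stop (worker, queue) = do
        STM.atomically (STM.closeTBMQueue queue)
        Async.cancel worker

-- Example source that reads a prefix from a ReaderT environment.
prefixedSource :: Int -> C.ConduitM () String (ReaderT String IO) ()
prefixedSource n = do
    prefix <- lift ask
    C.yieldMany [prefix ++ show i | i <- [1 .. n]]

main :: IO ()
main = do
    items <- C.runResourceT $ C.runConduit $
        parSourcesWith (`runReaderT` "item ") [prefixedSource 2, prefixedSource 3]
            .| C.sinkList
    -- arrival order depends on scheduling, so sort before printing
    print (sort items)
```

The caveat is that every source is run separately through runInIO, so this only makes sense for monads whose effects can be run independently per thread (a ReaderT environment, for instance). State accumulated in something like StateT would not be shared between the sources or returned to the caller.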