Concurrent sources with conduit

Recently I've been using conduit quite a bit. While building some tools to check for bugs in our DynamoDB dataset, I had to use the parallel scan feature. Since amazonka (the Haskell AWS SDK) uses conduit for paging, I had to find a way to run multiple sources concurrently and feed them into the next stage of the pipeline.

After spending an embarrassingly long time figuring this out, I'll present a way to run multiple conduit sources and collapse them into a single source.

Dependencies and imports

conduit-combinators-1.1.1
async-2.1.0
stm-chans-3.0.0.4
stm-2.4.4.1
conduit-1.2.9

Most imports are qualified in case you're not familiar with the packages:
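Roughly, the top of the file looks like this (Async, STM, TBM and CL are the qualified names used in the snippets below):

import           Control.Concurrent              (threadDelay)
import qualified Control.Concurrent.Async        as Async
import qualified Control.Concurrent.STM          as STM
import qualified Control.Concurrent.STM.TBMQueue as TBM
import           Control.Exception               (finally)
import           Control.Monad                   (void)
import           Control.Monad.IO.Class          (MonadIO, liftIO)
import           Control.Monad.Trans.Resource    (MonadResource, ResourceT, runResourceT)
import           Data.Conduit
import qualified Data.Conduit.List               as CL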

Some setup

First, some helper functions are required.

Turning a queue into a source:
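A sketch, using a bounded, closeable queue (TBMQueue) from stm-chans; the name sourceQueue is just what I'll use below. readTBMQueue returns Nothing once the queue has been closed and emptied, which is our signal to stop:

-- Yield everything written to the queue, until it is closed and drained.
sourceQueue :: MonadIO m => TBM.TBMQueue a -> Source m a
sourceQueue queue = loop
  where
    loop = do
      mItem <- liftIO . STM.atomically $ TBM.readTBMQueue queue
      case mItem of
        Nothing   -> return ()          -- the queue was closed and is empty
        Just item -> yield item >> loop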

And the reverse: feeding a queue from a conduit:
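For example:

-- Push every upstream item onto the queue. Deliberately does not close it.
sinkQueue :: MonadIO m => TBM.TBMQueue a -> Sink a m ()
sinkQueue queue = awaitForever $ \item ->
  liftIO . STM.atomically $ TBM.writeTBMQueue queue item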

Note that sinkQueue does not close the queue. Because multiple sources feed the same queue, we don't want the first source that terminates to close it and lose the items still coming from the other sources.

Parallel sources

The tricky bit here is to correctly handle exceptions (and cancellation). Thankfully, there is bracketP.
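For reference, its type looks like this:

bracketP :: MonadResource m
         => IO a                      -- acquire a resource
         -> (a -> IO ())              -- release it, even on exceptions or early termination
         -> (a -> ConduitM i o m r)   -- use it inside the pipeline
         -> ConduitM i o m r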

First, the type signature:
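Something like this, with the sources pinned to ResourceT IO (more on that at the end) and an Int bound for the intermediate queue:

mergeSources :: MonadResource m
             => [Source (ResourceT IO) a]  -- the sources to run concurrently
             -> Int                        -- bound of the intermediate queue
             -> Source m a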

Given a list of sources, collapse them all into a single source, which terminates only once every source is done. The MonadResource constraint is a requirement of bracketP, which is what makes the initialisation and cleanup possible in a nicer way than a simple bracket.
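A sketch of the implementation: allocate a bounded queue plus one async that runs every source into it, close the queue once they are all done, and cancel everything if the consumer stops early:

mergeSources :: MonadResource m
             => [Source (ResourceT IO) a]
             -> Int
             -> Source m a
mergeSources sources bound = bracketP acquire release body
  where
    acquire = do
      queue   <- TBM.newTBMQueueIO bound
      -- One async supervises all the sources. Each source feeds the shared
      -- queue, and the queue is only closed once every source is done (or
      -- one of them has failed), so no items are lost.
      workers <- Async.async $
        void (Async.mapConcurrently
                (\src -> runResourceT (src $$ sinkQueue queue))
                sources)
          `finally` STM.atomically (TBM.closeTBMQueue queue)
      return (queue, workers)

    -- If the consumer stops early or something downstream throws, cancel
    -- the sources and close the queue.
    release (queue, workers) = do
      Async.cancel workers
      STM.atomically (TBM.closeTBMQueue queue)

    body (queue, workers) = do
      sourceQueue queue
      -- Re-raise any exception thrown by one of the sources.
      liftIO (Async.wait workers)

Closing the queue in a finally means the reading side also wakes up when one of the sources dies with an exception, and the Async.wait at the end re-raises that exception in the consumer.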

Putting it all together

First, a small helper to produce some items with a name and some delay between them:
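For instance (the name delayedSource and the exact delay are arbitrary):

-- Yield "<name> - 1" up to "<name> - n", sleeping between items.
delayedSource :: MonadIO m => String -> Int -> Int -> Source m String
delayedSource name n delayMicros = mapM_ step [1 .. n]
  where
    step i = do
      liftIO (threadDelay delayMicros)
      yield (name ++ " - " ++ show i)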

And a way to print the items going through a conduit, to make sure they are properly streamed:
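A simple pass-through conduit does the job (calling it printer here):

-- Print every item flowing through, then pass it along unchanged.
printer :: (Show a, MonadIO m) => Conduit a m a
printer = awaitForever $ \item -> do
  liftIO (print item)
  yield item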

Finally, to run everything:
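Something like this, with two sources of ten items each and a small bound on the queue (the exact numbers don't matter):

main :: IO ()
main = do
  let sources = [ delayedSource "source 1" 10 100000  -- ~100ms between items
                , delayedSource "source 2" 10 100000
                ]
  runResourceT $ mergeSources sources 16 $$ printer =$= CL.sinkNull
  print "all done"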

And that will output in a streaming fashion:

"source 2 - 1"
"source 1 - 1"
"source 2 - 2"
"source 1 - 2"
"source 2 - 3"
"source 1 - 3"
"source 1 - 4"
"source 2 - 4"
"source 2 - 5"
"source 1 - 5"
"source 1 - 6"
"source 2 - 6"
"source 1 - 7"
"source 2 - 7"
"source 1 - 8"
"source 2 - 8"
"source 2 - 9"
"source 1 - 9"
"source 1 - 10"
"source 2 - 10"
"all done"

Conclusion

This can be used whenever lots of slow operations run simultaneously and you want to process the results with the nice conduit abstraction. For example, getting 100 items from a parallel scan in DynamoDB, or the first 10 pages from a web crawl.

I'm still unsure how to get a more generic constraint for the sources:
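Right now each source is pinned to a concrete monad, because every source is run with runResourceT on its own thread:

mergeSources :: MonadResource m
             => [Source (ResourceT IO) a]  -- it would be nicer to accept a more general monad here
             -> Int
             -> Source m a

Something like monad-control would presumably be needed to run an arbitrary source monad on a separate thread.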