I read Paul Ingles’ post “From Callbacks to Sequences”, which led me to Christophe Grand’s handy `pipe` function. The `pipe` function gives you two things wrapped in a vector: a sequence to pass on for downstream consumption, and a function for feeding the pipe, which appends more items to that sequence. I’ll reproduce the code here, with a slight alteration that bounds the underlying queue to avoid running out of memory (OOM):
This is great if you want one thread pushing items into the pipe through the feeder function while you pass around a lazy sequence representing whatever is in the pipe. Nice.
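A minimal sketch of a bounded pipe in this spirit follows. It assumes a `java.util.concurrent.LinkedBlockingQueue` with a fixed capacity as the bound; the `capacity` parameter, the sentinel objects and the `nil` handling are illustrative choices and may differ from Grand’s original and from the altered version above. Calling the feeder with one argument appends an item (blocking while the queue is full), and calling it with no arguments closes the pipe.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn pipe
  "Sketch: returns [s feed]. s is a lazy seq over everything fed into the
  pipe; (feed x) appends x, blocking while the queue is full, and (feed)
  closes the pipe. Realizing s blocks until an item or the close marker
  arrives."
  [capacity]
  (let [q          (LinkedBlockingQueue. (int capacity)) ; bounded: feeders block instead of exhausting memory
        eoq        (Object.)  ; end-of-queue marker, put by (feed)
        nil-marker (Object.)  ; stands in for nil, which a BlockingQueue rejects
        s          (fn s []
                     (lazy-seq
                       (let [x (.take q)]
                         (when-not (identical? x eoq)
                           (cons (when-not (identical? x nil-marker) x) (s))))))]
    [(s)
     (fn feed
       ([] (.put q eoq))
       ([x] (.put q (if (nil? x) nil-marker x))))]))

;; A feeder thread pushes five items and closes the pipe; the main thread
;; consumes the lazy seq, blocking until each item arrives.
(let [[s feed] (pipe 2)]
  (future
    (doseq [i (range 5)] (feed i))
    (feed))
  (println (vec s)))  ; prints [0 1 2 3 4]
```

With a capacity of 2 the feeder can never get more than two items ahead of the consumer, which is what keeps memory use flat when the producer is faster than the reader.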
I had a situation which was a bit different:
- I have a sequence and I want to do some processing on it to return another lazy sequence, exactly like what the `map` function does.
- But actually I want to process the sequence in parallel, so `pmap` would seem like a better choice.
- In addition, I want to be able to govern how many threads are used during the `pmap` operation, and I would also like to explicitly set some kind of internal buffering so that downstream consumers nearly always have something to work on.
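For comparison, `clojure.core/pmap` is a drop-in replacement for `map`, but it takes no parameter for the thread count or for a buffer: it keeps roughly `(+ 2 available-processors)` computations ahead of the consumer (chunked inputs such as `range` can put more than that in flight). The `slow-inc` function below is a hypothetical stand-in for an expensive step.

```clojure
(defn slow-inc
  "Hypothetical stand-in for an expensive or I/O-bound step."
  [x]
  (Thread/sleep 50)
  (inc x))

;; The look-ahead pmap uses is derived from the machine; it cannot be passed in.
(def pmap-window (+ 2 (.availableProcessors (Runtime/getRuntime))))

;; Same results as (map slow-inc ...), computed in parallel by futures.
(def results (doall (pmap slow-inc (range 20))))
```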
We used the `pipe` function to write a `pipe-seq` function.
`pipe-seq` uses the `pipe` function with n threads feeding the pipe, and closes the pipe once all of those threads are done. The closing is performed by a supervisor thread that waits on a countdown latch, which each feeding thread counts down when it finishes.
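A minimal sketch of that design, assuming Clojure 1.9+ (for `swap-vals!`): n worker futures pull inputs from a shared atom, apply `f` and feed the results into a bounded pipe, while a supervisor future waits on a `java.util.concurrent.CountDownLatch` and closes the pipe once every worker has counted down. The argument order `[f n-threads buffer-size coll]` and the way inputs are shared between workers are hypothetical choices, not necessarily those of the original `pipe-seq`; the `pipe` definition is repeated so the block runs on its own.

```clojure
(import '(java.util.concurrent CountDownLatch LinkedBlockingQueue))

(defn pipe
  "Sketch: returns [s feed]; (feed x) appends x (blocking while full),
  (feed) closes the pipe, and s lazily yields the fed items."
  [capacity]
  (let [q          (LinkedBlockingQueue. (int capacity))
        eoq        (Object.)   ; close marker
        nil-marker (Object.)   ; stands in for nil items
        s          (fn s []
                     (lazy-seq
                       (let [x (.take q)]
                         (when-not (identical? x eoq)
                           (cons (when-not (identical? x nil-marker) x) (s))))))]
    [(s)
     (fn feed
       ([] (.put q eoq))
       ([x] (.put q (if (nil? x) nil-marker x))))]))

(defn pipe-seq
  "Hypothetical sketch: applies f to each item of coll on n-threads worker
  threads and returns a lazy seq of the results, in completion order.
  At most buffer-size results wait in the pipe before workers block."
  [f n-threads buffer-size coll]
  (let [[out feed] (pipe buffer-size)
        remaining  (atom (seq coll))
        latch      (CountDownLatch. (int n-threads))
        ;; Atomically pop the next input. Wrapping it in a vector lets nil
        ;; items through; a nil return means the input is exhausted.
        next-item! (fn []
                     (let [[old] (swap-vals! remaining next)]
                       (when old [(first old)])))]
    ;; Workers: pull inputs until none are left, then count down the latch.
    (dotimes [_ n-threads]
      (future
        (try
          (loop []
            (when-let [[x] (next-item!)]
              (feed (f x))
              (recur)))
          (finally (.countDown latch)))))
    ;; Supervisor: closes the pipe once every worker has finished.
    (future
      (.await latch)
      (feed))
    out))

;; 40 items that each take ~100 ms, spread over 4 threads with a buffer of 8.
(time (count (pipe-seq (fn [x] (Thread/sleep 100) x) 4 8 (range 40))))
```

With 4 threads the 40 sleeps should overlap into about ten rounds, so the elapsed time is on the order of one second rather than the four seconds a plain `map` would need. In this sketch an exception thrown by `f` is swallowed by its worker future (that worker stops and the latch is still counted down), and if consumers stop reading, the workers stay blocked on the full queue; both would need handling in real use.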
Here’s an example creating a pipe-seq and consuming it:
The above takes ten seconds – ten threads consuming the seq.
We’ve got some code that chains several calls to `pipe-seq`, fanning the consumer functions out to virtual thread pools of different sizes depending on the type of operation each one performs.
One drawback of this approach is that the queues are kept as an internal detail of the `pipe-seq` function. If you need visibility into the queues (for example, to see how full they are), this could be an issue, and this approach may not be for you.
Feedback welcome, particularly if there’s a better way.