Improve pipeline thread usage

Description

Scope : core.async/pipeline (pipeline n to xf from)

The problem:

In cases where the the from channel producers are slow, or the channel is just empty, idling, or the fromchannel producers are "bursty", threads created at https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L547 will just block/wait and do nothing when no work is to be done. That would cost unnecessary memory use among other things, since these allocated threads are just not killed until the pipeline closes.

 

Quick recap of how it works now

Today it works by doing a dotimes over n to set parallelism and then calls async/thread per tick and in that scope run a consuming loop on the from channel that will then apply a transducer xf on the value passing through.

The code of interest.
https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async.clj#L547-L550

Note: these threads are from a cached thread pool meaning the pool will shrink/expand on demand, and keep un-used threads around for 60s before removing them from the pool. In our case, with pipeline, we acquire them at pipeline creation and do not release them until the pipeline closes.

That is great, as long as we can keep these threads busy, since they will be waiting for values from from, blocking each until they gets something.

 

Potential alternative

One alternative would be to move the call to async/threads inside a go-loop and “take” the result of the computation before recur to ensure we have only 1 task running at a time per go-loop.

Something like that : https://github.com/mpenet/core.async/commit/e9bb61d648df1b588824e1210da0e0642e771ae1

It's a very minimal change. We essentially leverage a go-loop to enable parking on take on the from channel (similar to pipeline-async) and then, only once we get a value from the chan do we call async/thread to process it, making sure to also park on take on the return value of async/thread call to ensure we respect parallelism set at pipeline creation, having only one running computation per go-loop at a time.

That would allow the used thread number to expand/shrink on demand, while still being bound by n. An idle pipeline would use 0 threads, so thread use for a task would be from 0-n depending on how busy the pipeline is.

Since these threads come from a CachedThreadPool there would be a lot of reuse in the cases where the from chan producers are as fast as the consumption (computation), essentially matching the current behavior and likely not causing any overhead of spinning new threads. Compared to now the only overhead would be from fetching an idle thread from the cached thread pool, it's for sure negligible.

One downside would be that for very slow producers, and very slow computation step, some inactive thread from the cached thread pool would eventually be killed after 60s idling, then causing some overhead to re-create them later, on demand. But arguably the overhead of creating a thread in that case is likely negligible since everything else is already very slow in the chain.

Then another question would be, if it’s decided that it’s worthy of a patch: should that be a new pipeline call or a patch on the current one.

I'd happily produce a clean patch with a proper test if there's interest in this.

Environment

None

Assignee

Unassigned

Reporter

Max Penet