# flux

Clojure wrapper for Netflix concurrency-limits — adaptive concurrency control based on TCP congestion algorithms.
Rather than setting a fixed request-per-second cap, flux dynamically adjusts the number of allowed in-flight operations based on observed latency and error signals. The limit rises when things are going well and backs off when the system shows signs of saturation.
If you want to understand the motivation, this talk does a better job of explaining the why and how than I could: https://www.youtube.com/watch?v=m64SWl9bfvk

## Installation

## Concepts

### Limit algorithms
A limit is the algorithm that decides the current concurrency ceiling. It receives timing and error feedback after every operation and adjusts accordingly.
| Algorithm | Strategy | Good for |
|-----------|----------|----------|
| vegas-limit | Delay-based — watches queue build-up via RTT deviation | General server-side use |
| gradient2-limit | Tracks divergence between short and long RTT averages | Services with variable baseline latency |
| aimd-limit | Additive increase / multiplicative decrease on drop signals | Client-side, loss-driven environments |
| fixed-limit | Static ceiling, never adapts | Testing, or when you want a simple semaphore |
### Limiter
A limiter wraps a limit algorithm and enforces it. simple-limiter is the standard choice: it maintains an atomic in-flight counter and rejects requests (returns nil from acquire!) when the counter reaches the current limit.
### Listener lifecycle
Every successful acquire! returns a listener. You must call exactly one of three functions on it when the operation completes — this is how the algorithm learns:
| Function | When to use |
|----------|-------------|
| success! | Operation completed normally; RTT is recorded |
| ignore! | Operation failed for reasons unrelated to capacity (e.g. auth error, validation failure) — RTT is discarded |
| dropped! | Operation was rejected downstream or timed out — signals degradation; loss-based algorithms reduce the limit aggressively |
Forgetting to call one of these leaks a slot permanently.
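The signalling can be sketched like this; `do-work`, `handle-rejection` and the `:auth` error type are illustrative, not part of the library:

```clojure
;; Route each outcome to the right signal so the algorithm learns correctly.
(if-let [listener (flux/acquire! limiter)]
  (try
    (let [result (do-work)]
      (flux/success! listener) ; completed normally, RTT recorded
      result)
    (catch clojure.lang.ExceptionInfo e
      (if (= :auth (:type (ex-data e)))
        (flux/ignore! listener)   ; failure unrelated to capacity, discard RTT
        (flux/dropped! listener)) ; likely saturation, back off
      (throw e)))
  (handle-rejection))
```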
## Usage

### Basic setup

```clojure
(require '[s-exp.flux :as flux])

;; 1. Choose a limit algorithm
(def limit (flux/vegas-limit {:max-concurrency 200}))

;; 2. Create a limiter
(def limiter (flux/simple-limiter limit))
```
### Low-level API

acquire! returns a listener on success, or nil if the limit is currently exceeded.

```clojure
(if-let [listener (flux/acquire! limiter)]
  (try
    (let [result (do-work)]
      (flux/success! listener)
      result)
    (catch Throwable t
      (flux/dropped! listener)
      (throw t)))
  (handle-rejection))
```
acquire! accepts an optional context value passed through to the limit algorithm (useful for partitioned limiters):
```clojure
(flux/acquire! limiter {:user-id "abc123"})
```
### High-level API: attempt!

attempt! manages the acquire/signal lifecycle automatically:

```clojure
;; Simple case — calls (do-work), signals success on return, dropped on exception
(flux/attempt! limiter do-work)

;; With options
(flux/attempt! limiter do-work
  {:on-reject #(throw (ex-info "Too busy" {:status 503}))
   :classify  (fn [result]
                (if (:error result) :dropped :success))
   :context   {:tenant "acme"}})
```
Options for attempt!:
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| :context | any | nil | Passed to acquire! |
| :on-reject | (fn []) | throws ex-info | Called when limit is exceeded |
| :classify | (fn [result]) | always :success | Maps the return value to :success, :ignore, or :dropped |
On exception, attempt! always signals :dropped and rethrows.
The default rejection throws:
```clojure
(ex-info "Concurrency limit exceeded" {:type :s-exp.flux/limit-exceeded})
```
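Callers can distinguish this rejection from other failures via ex-data; a sketch (the 503 mapping is illustrative):

```clojure
(try
  (flux/attempt! limiter do-work)
  (catch clojure.lang.ExceptionInfo e
    (if (= :s-exp.flux/limit-exceeded (:type (ex-data e)))
      {:status 503 :body "concurrency limit exceeded, retry later"} ; shed load
      (throw e))))
```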
## Limit algorithm options

### vegas-limit

Delay-based algorithm. Estimates queue size from the ratio of minimum observed RTT to current RTT. A good default for most server-side use cases.

```clojure
(flux/vegas-limit
 {:initial-limit 20      ; starting concurrency
  :max-concurrency 1000  ; hard ceiling
  :smoothing 1.0         ; 0.0–1.0, lower = slower adaptation
  :probe-multiplier 30}) ; how often to probe for a new RTT baseline
```
### gradient2-limit

Tracks short-term vs long-term RTT gradient. More stable than Vegas under bursty load, at the cost of slower reaction.

```clojure
(flux/gradient2-limit
 {:initial-limit 20
  :min-limit 20        ; floor — never goes below this
  :max-concurrency 200
  :smoothing 0.2       ; lower = more stable, slower to adapt
  :rtt-tolerance 1.5   ; allow RTT to grow this much before backing off
  :long-window 600     ; ms, baseline RTT measurement window
  :queue-size 4})      ; extra buffer slots above the estimated limit
```
### aimd-limit

Classic AIMD: increments the limit by 1 on success, multiplies it down by backoff-ratio on a drop signal or timeout. Predictable behaviour; works well when the drop signal is clear.

```clojure
(flux/aimd-limit
 {:initial-limit 20
  :min-limit 20
  :max-limit 200
  :backoff-ratio 0.9       ; 0.5–1.0, how aggressively to back off
  :timeout-ns 5000000000}) ; 5s in nanoseconds — treat slow calls as drops
```
### fixed-limit

Non-adaptive. Useful for testing or as a simple counting semaphore.

```clojure
(flux/fixed-limit {:limit 50})
```
## Limiter constructors

### simple-limiter

The standard limiter. Immediately rejects (acquire! returns nil) when the in-flight count reaches the current limit.

```clojure
(flux/simple-limiter (flux/vegas-limit {}))
(flux/simple-limiter (flux/vegas-limit {}) {:name "my-service"}) ; :name is optional, used for metrics
```
### blocking-limiter

Wraps any limiter. Instead of rejecting when the limit is reached, the calling thread blocks until a slot becomes free or the timeout expires. On timeout or interrupt, acquire! returns nil.

Useful for batch clients, background workers, or any context where queuing up behind backpressure is preferable to fast-failing.

```clojure
(def limiter
  (flux/blocking-limiter
   (flux/simple-limiter (flux/vegas-limit {}))
   {:timeout-ms 5000})) ; block for up to 5 seconds, then return nil
```
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| :timeout-ms | long | 1 hour | Max time to block waiting for a slot |
### lifo-blocking-limiter

Like blocking-limiter but queues waiting threads in last-in, first-out order: when a slot opens up, the most recently queued thread is unblocked first. Under sustained overload the oldest queued requests are the ones that time out and shed load, which keeps the queue fresh and favours the latency of requests that do get served over strict fairness.

The backlog has a bounded size. When it fills up, further requests are rejected immediately rather than queued.

```clojure
(flux/lifo-blocking-limiter
 (flux/simple-limiter (flux/vegas-limit {}))
 {:backlog-size 100          ; max threads waiting; excess rejected immediately
  :backlog-timeout-ms 1000}) ; how long a queued thread waits before giving up
```
The backlog timeout can also be derived dynamically from the acquire context, which lets you implement per-tenant or per-priority timeouts:
```clojure
(flux/lifo-blocking-limiter
 (flux/simple-limiter (flux/vegas-limit {}))
 {:backlog-timeout-fn (fn [ctx] (get ctx :timeout-ms 500))})
```
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| :backlog-size | int | 100 | Max queued threads; excess are rejected immediately |
| :backlog-timeout-ms | long | 1000 | Fixed timeout in ms for queued threads |
| :backlog-timeout-fn | (fn [ctx] -> long ms) | nil | Dynamic timeout derived from context; overrides :backlog-timeout-ms |
### partitioned-limiter

Wraps any AbstractLimiter and divides its capacity among named partitions according to fixed ratios. Each partition gets a guaranteed slice of the adaptive limit:

`partition-budget = floor(current-total-limit × ratio)`

Requests that resolve to a known partition are admitted only when that partition's in-flight count is below its budget. Requests that resolve to an unknown or nil key use the leftover overflow capacity — the portion not allocated to any named partition. The underlying limiter still enforces the global ceiling; partitioning is a pure admission gate on top.

```clojure
(def limiter
  (flux/partitioned-limiter
   (flux/simple-limiter (flux/vegas-limit {:max-concurrency 100}))
   (fn [ctx] (get ctx "x-tier"))
   {"live" 0.8
    "batch" 0.1}))
;; remaining 0.1 is overflow capacity for unrecognised tier values
```
acquire! and attempt! work exactly as usual — pass the context that partition-by expects:
```clojure
(flux/acquire! limiter {"x-tier" "live"})
(flux/attempt! limiter do-work {:context {"x-tier" "batch"}})
```
| Argument | Type | Description |
|----------|------|-------------|
| limiter | AbstractLimiter | The underlying limiter (e.g. from simple-limiter) |
| partition-by | (fn [context] -> key \| nil) | Extracts a partition key from the acquire context |
| partitions | {key double} | Map of partition key → ratio (0.0–1.0); ratios should sum to ≤ 1.0 |
With the Ring middleware, supply a :context-fn that returns whatever partition-by expects:
```clojure
(require '[s-exp.flux.ring :as flux.ring])

(flux.ring/wrap-concurrency-limit
 handler
 (flux/partitioned-limiter
  (flux/simple-limiter (flux/vegas-limit {}))
  :tier ; keyword lookup on the context map
  {:live 0.8
   :batch 0.1})
 {:context-fn (fn [req] {:tier (keyword (get-in req [:headers "x-tier"]))})})
```
## Ring middleware

The middleware is provided as `s-exp.flux.ring/wrap-concurrency-limit`.
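A minimal usage sketch, assuming the call shape shown in the partitioned example above (handler, limiter, then an optional options map); `handler` and the chosen limits are placeholders:

```clojure
(require '[s-exp.flux :as flux]
         '[s-exp.flux.ring :as flux.ring])

;; Wrap a Ring handler so requests beyond the adaptive limit wait
;; briefly for a slot instead of failing immediately.
(def app
  (flux.ring/wrap-concurrency-limit
   handler
   (flux/blocking-limiter
    (flux/simple-limiter (flux/vegas-limit {:max-concurrency 100}))
    {:timeout-ms 200}))) ; give up after 200ms of waiting
```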