SkillAgentSearch skills...

Flux

Clojure wrapper for Netflix concurrency-limits — adaptive concurrency control based on TCP congestion algorithms.

Install / Use

/learn @mpenet/Flux
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

flux

Clojure wrapper for Netflix concurrency-limits — adaptive concurrency control based on TCP congestion algorithms.

Rather than setting a fixed request-per-second cap, flux dynamically adjusts the number of allowed in-flight operations based on observed latency and error signals. The limit rises when things are going well and backs off when the system shows signs of saturation.

If you want to understand the motivation for this, watch this video: https://www.youtube.com/watch?v=m64SWl9bfvk, it does a better work than I would trying to explain the why/how.

Installation

Clojars Project

Concepts

Limit algorithms

A limit is the algorithm that decides the current concurrency ceiling. It receives timing and error feedback after every operation and adjusts accordingly.

| Algorithm | Strategy | Good for | |-----------|----------|----------| | vegas-limit | Delay-based — watches queue build-up via RTT deviation | General server-side use | | gradient2-limit | Tracks divergence between short and long RTT averages | Services with variable baseline latency | | aimd-limit | Additive increase / multiplicative decrease on drop signals | Client-side, loss-driven environments | | fixed-limit | Static ceiling, never adapts | Testing, or when you want a simple semaphore |

Limiter

A limiter wraps a limit algorithm and enforces it. simple-limiter is the standard choice: it maintains an atomic in-flight counter and rejects requests (returns nil from acquire!) when the counter reaches the current limit.

Listener lifecycle

Every successful acquire! returns a listener. You must call exactly one of three functions on it when the operation completes — this is how the algorithm learns:

| Function | When to use | |----------|-------------| | success! | Operation completed normally; RTT is recorded | | ignore! | Operation failed for reasons unrelated to capacity (e.g. auth error, validation failure) — RTT is discarded | | dropped! | Operation was rejected downstream or timed out — signals degradation; loss-based algorithms reduce the limit aggressively |

Forgetting to call one of these leaks a slot permanently.

Usage

Basic setup

(require '[s-exp.flux :as flux])

;; 1. Choose a limit algorithm
(def limit (flux/vegas-limit {:max-concurrency 200}))

;; 2. Create a limiter
(def limiter (flux/simple-limiter limit))

Low-level API

acquire! returns a listener on success, or nil if the limit is currently exceeded.

(if-let [listener (flux/acquire! limiter)]
  (try
    (let [result (do-work)]
      (flux/success! listener)
      result)
    (catch Throwable t
      (flux/dropped! listener)
      (throw t)))
  (handle-rejection))

acquire! accepts an optional context value passed through to the limit algorithm (useful for partitioned limiters):

(flux/acquire! limiter {:user-id "abc123"})

High-level API: attempt!

attempt! manages the acquire/signal lifecycle automatically:

;; Simple case — calls (do-work), signals success on return, dropped on exception
(flux/attempt! limiter do-work)

;; With options
(flux/attempt! limiter do-work
  {:on-reject #(throw (ex-info "Too busy" {:status 503}))
   :classify  (fn [result]
                (if (:error result) :dropped :success))
   :context   {:tenant "acme"}})

Options for attempt!:

| Key | Type | Default | Description | |-----|------|---------|-------------| | :context | any | nil | Passed to acquire! | | :on-reject | (fn []) | throws ex-info | Called when limit is exceeded | | :classify | (fn [result]) | always :success | Maps the return value to :success, :ignore, or :dropped |

On exception, attempt! always signals :dropped and rethrows.

The default rejection throws:

(ex-info "Concurrency limit exceeded" {:type :s-exp.flux/limit-exceeded})

Limit algorithm options

vegas-limit

Delay-based algorithm. Estimates queue size from the ratio of minimum observed RTT to current RTT. A good default for most server-side use cases.

(flux/vegas-limit
  {:initial-limit    20    ; starting concurrency
   :max-concurrency  1000  ; hard ceiling
   :smoothing        1.0   ; 0.0–1.0, lower = slower adaptation
   :probe-multiplier 30})  ; how often to probe for a new RTT baseline

gradient2-limit

Tracks short-term vs long-term RTT gradient. More stable than Vegas under bursty load, at the cost of slower reaction.

(flux/gradient2-limit
  {:initial-limit   20
   :min-limit       20     ; floor — never goes below this
   :max-concurrency 200
   :smoothing       0.2    ; lower = more stable, slower to adapt
   :rtt-tolerance   1.5    ; allow RTT to grow this much before backing off
   :long-window     600    ; ms, baseline RTT measurement window
   :queue-size      4})    ; extra buffer slots above the estimated limit

aimd-limit

Classic AIMD: increments the limit by 1 on success, multiplies down by backoff-ratio on a drop signal or timeout. Predictable behaviour, works well when the drop signal is clear.

(flux/aimd-limit
  {:initial-limit 20
   :min-limit     20
   :max-limit     200
   :backoff-ratio 0.9          ; 0.5–1.0, how aggressively to back off
   :timeout-ns    5000000000}) ; 5s in nanoseconds — treat slow calls as drops

fixed-limit

Non-adaptive. Useful for testing or as a simple counting semaphore.

(flux/fixed-limit {:limit 50})

Limiter constructors

simple-limiter

The standard limiter. Immediately rejects (acquire! returns nil) when the in-flight count reaches the current limit.

(flux/simple-limiter (flux/vegas-limit {}))
(flux/simple-limiter (flux/vegas-limit {}) {:name "my-service"}) ; :name is optional, used for metrics

blocking-limiter

Wraps any limiter. Instead of rejecting when the limit is reached, the calling thread blocks until a slot becomes free or the timeout expires. On timeout or interrupt, acquire! returns nil.

Useful for batch clients, background workers, or any context where queuing up behind backpressure is preferable to fast-failing.

(def limiter
  (flux/blocking-limiter
    (flux/simple-limiter (flux/vegas-limit {}))
    {:timeout-ms 5000})) ; block for up to 5 seconds, then return nil

| Option | Type | Default | Description | |--------|------|---------|-------------| | :timeout-ms | long | 1 hour | Max time to block waiting for a slot |

lifo-blocking-limiter

Like blocking-limiter but queues waiting threads in last-in, first-out order. When a slot opens up, the most recently queued thread is unblocked first. This means the oldest queued requests time out and shed load first, keeping the queue fresh and favouring availability over tail latency.

The backlog has a bounded size. When it fills up, further requests are rejected immediately rather than queuing.

(flux/lifo-blocking-limiter
  (flux/simple-limiter (flux/vegas-limit {}))
  {:backlog-size       100   ; max threads waiting; excess rejected immediately
   :backlog-timeout-ms 1000  ; how long a queued thread waits before giving up
   })

The backlog timeout can also be derived dynamically from the acquire context, which lets you implement per-tenant or per-priority timeouts:

(flux/lifo-blocking-limiter
  (flux/simple-limiter (flux/vegas-limit {}))
  {:backlog-timeout-fn (fn [ctx] (get ctx :timeout-ms 500))})

| Option | Type | Default | Description | |--------|------|---------|-------------| | :backlog-size | int | 100 | Max queued threads; excess are rejected immediately | | :backlog-timeout-ms | long | 1000 | Fixed timeout in ms for queued threads | | :backlog-timeout-fn | (fn [ctx] -> long ms) | nil | Dynamic timeout derived from context; overrides :backlog-timeout-ms |

partitioned-limiter

Wraps any AbstractLimiter and divides its capacity among named partitions according to fixed ratios. Each partition gets a guaranteed slice of the adaptive limit:

partition-budget = floor(current-total-limit × ratio)

Requests that resolve to a known partition are admitted only when that partition's in-flight count is below its budget. Requests that resolve to an unknown or nil key use the leftover overflow capacity — the portion not allocated to any named partition. The underlying limiter still enforces the global ceiling; partitioning is a pure admission gate on top.

(def limiter
  (flux/partitioned-limiter
    (flux/simple-limiter (flux/vegas-limit {:max-concurrency 100}))
    (fn [ctx] (get-in ctx [:headers "x-tier"]))
    {"live"  0.8
     "batch" 0.1}))
     ; remaining 0.1 is overflow capacity for unrecognised tier values

acquire! and attempt! work exactly as usual — pass the context that partition-by expects:

(flux/acquire! limiter {"x-tier" "live"})

(flux/attempt! limiter do-work :context {"x-tier" "batch"})

| Argument | Type | Description | |----------|------|-------------| | limiter | AbstractLimiter | The underlying limiter (e.g. from simple-limiter) | | partition-by | (fn [context] -> key \| nil) | Extracts a partition key from the acquire context | | partitions | {key double} | Map of partition key → ratio (0.0–1.0); ratios should sum to ≤ 1.0 |

With the Ring middleware, supply a :context-fn that returns whatever partition-by expects:

(flux.ring/wrap-concurrency-limit handler
  (flux/partitioned-limiter
    (flux/simple-limiter (flux/vegas-limit {}))
    :tier   ; keyword lookup on the context map
    {:live  0.8
     :batch 0.1})
  {:context-fn (fn [req] {:tier (keyword (get-in req [:headers "x-tier"]))})})

Ring middleware

`s-exp.flux.ring/wrap-concurrency-limit

View on GitHub
GitHub Stars12
CategoryDevelopment
Updated6d ago
Forks1

Languages

Clojure

Security Score

90/100

Audited on Mar 31, 2026

No findings