Singleflight
Coalesce multiple identical calls into one, preventing thundering-herd/stampede to database/other backends
It is a Python port of the singleflight implementation from Go's groupcache.
This module does not provide a caching mechanism. Rather, it can be used behind a caching abstraction to deduplicate cache-filling calls.
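To illustrate what deduplicating cache-filling calls means, here is a minimal sketch using only the standard library. `MiniSingleFlight`, `load_from_db`, `fill_user`, `get_user`, and the `stats` dict are hypothetical names invented for this example. This is not the library's implementation, only one possible way to get the same effect with threads.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class MiniSingleFlight:
    """Hypothetical stand-in for illustration only; not the library's class."""

    def __init__(self):
        self._lock = threading.Lock()
        self._inflight = {}  # key -> (done event, shared result box)

    def call(self, fn, key, *args, **kwargs):
        with self._lock:
            entry = self._inflight.get(key)
            leader = entry is None
            if leader:
                entry = (threading.Event(), {})
                self._inflight[key] = entry
        done, box = entry
        if leader:
            try:
                box["value"] = fn(*args, **kwargs)
            except Exception as exc:
                box["error"] = exc
            finally:
                with self._lock:
                    del self._inflight[key]
                done.set()  # wake every follower waiting on this key
        else:
            done.wait()  # follower: block until the leader finishes
        if "error" in box:
            raise box["error"]
        return box["value"]


cache = {}
stats = {"db_hits": 0}
sf = MiniSingleFlight()


def load_from_db(user_id):
    time.sleep(0.1)  # emulate a slow backend
    stats["db_hits"] += 1
    return {"id": user_id}


def fill_user(user_id):
    # Re-check inside the coalesced call: a late caller may start a new
    # flight after an earlier one has already filled the cache.
    if user_id not in cache:
        cache[user_id] = load_from_db(user_id)
    return cache[user_id]


def get_user(user_id):
    if user_id in cache:
        return cache[user_id]
    return sf.call(fill_user, "user:%d" % user_id, user_id)


def demo():
    # Eight concurrent cache misses for the same user.
    with ThreadPoolExecutor(max_workers=8) as pool:
        results = list(pool.map(get_user, [42] * 8))
    return results, stats["db_hits"]
```

Calling `demo()` sends eight concurrent misses for the same key, but the backend is queried once. The cache itself stays a plain dict; the coalescer only decides which caller does the work.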
Only Python 3.5+ is supported.
Installation
pip install singleflight
Usage
This module has 3 implementations, which can be imported as follows:
from singleflight.basic import SingleFlight # for multi-threaded apps
from singleflight.gevent import SingleFlightGevent as SingleFlight # for gevent apps
from singleflight.asynchronous import SingleFlightAsync as SingleFlight # for asyncio/curio apps
Then you can use it as follows (the example shows the multi-threaded version). Note that the key is important: it tells the module which calls should be de-duplicated.
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from singleflight.basic import SingleFlight

if __name__ == '__main__':
    sf = SingleFlight()
    executor = ThreadPoolExecutor(max_workers=10)
    counter = 0
    result = "this is the result"

    def work(num):
        global counter, result
        sleep(0.1)  # emulate a slightly slower call
        counter += 1
        return (result, num)

    res = []
    for i in range(10):
        # "key" identifies the calls to coalesce; i+1 is passed on to work()
        sfc = partial(sf.call, work, "key", i+1)
        r = executor.submit(sfc)
        res.append(r)

    for r in res:
        assert r.result()[0] == result
        # only the first caller acquires the lock,
        # so only that call actually runs
        assert r.result()[1] == 1
    assert counter == 1
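The same coalescing idea carries over to asyncio. The sketch below uses only the standard library and does not call `SingleFlightAsync`, whose exact interface is not shown in this README. `MiniAsyncSingleFlight`, `slow_fetch`, and `run_demo` are hypothetical names, and `asyncio.run` / `asyncio.get_running_loop` require Python 3.7+.

```python
import asyncio


class MiniAsyncSingleFlight:
    """Hypothetical stand-in for illustration; not the library's SingleFlightAsync."""

    def __init__(self):
        self._inflight = {}  # key -> asyncio.Future shared by all callers

    async def call(self, fn, key, *args, **kwargs):
        fut = self._inflight.get(key)
        if fut is not None:
            return await fut  # follower: wait for the leader's outcome
        fut = asyncio.get_running_loop().create_future()
        self._inflight[key] = fut
        try:
            fut.set_result(await fn(*args, **kwargs))
        except Exception as exc:
            fut.set_exception(exc)
        finally:
            del self._inflight[key]
            if not fut.done():
                fut.cancel()  # leader was cancelled: release followers too
        return fut.result()  # returns the value or re-raises the error


async def main():
    sf = MiniAsyncSingleFlight()
    calls = 0

    async def slow_fetch(num):
        nonlocal calls
        await asyncio.sleep(0.1)  # emulate a slightly slower call
        calls += 1
        return ("this is the result", num)

    # Ten concurrent calls sharing one key; only the first one runs.
    results = await asyncio.gather(
        *(sf.call(slow_fetch, "key", i + 1) for i in range(10))
    )
    return results, calls


def run_demo():
    return asyncio.run(main())
```

No lock is needed here because the event loop runs one coroutine at a time, and the check-and-register step contains no `await`.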
For decorator fans, you can also use it to wrap your function.
from time import sleep
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from singleflight.basic import SingleFlight

if __name__ == '__main__':
    sf = SingleFlight()
    executor = ThreadPoolExecutor(max_workers=10)

    # success case
    counter = 0
    result = "this is the result"

    @sf.wrap
    def work(num):
        global counter, result
        sleep(0.1)  # emulate a slightly slower call
        counter += 1
        return (result, num)

    res = []
    for i in range(10):
        # the wrapped function takes the key first, then the original arguments
        sfc = partial(work, "key", i+1)
        r = executor.submit(sfc)
        res.append(r)

    for r in res:
        assert r.result()[0] == result
        # only the first caller acquires the lock,
        # so only that call actually runs
        assert r.result()[1] == 1
    assert counter == 1
All *args and **kwargs are passed through to your function unchanged. Exceptions raised by your function propagate to the caller as usual.
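As a rough sketch of how argument forwarding and exception propagation can fit together, the example below builds a `wrap` decorator on top of a `call` method using `concurrent.futures.Future` as the shared slot. `FutureSingleFlight`, `flaky`, `outcome`, and `stats` are hypothetical names. Whether the library also hands the leader's exception to waiting callers is an assumption made here, mirroring the behavior of Go's singleflight.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from functools import wraps


class FutureSingleFlight:
    """Hypothetical stand-in for illustration only; not the library's class."""

    def __init__(self):
        self._lock = threading.Lock()
        self._inflight = {}  # key -> Future shared by concurrent callers

    def call(self, fn, key, *args, **kwargs):
        with self._lock:
            fut = self._inflight.get(key)
            leader = fut is None
            if leader:
                fut = self._inflight[key] = Future()
        if leader:
            try:
                # *args and **kwargs go to fn untouched
                fut.set_result(fn(*args, **kwargs))
            except Exception as exc:
                fut.set_exception(exc)
            finally:
                with self._lock:
                    del self._inflight[key]
        # Returns the value, or re-raises the stored exception
        # (assumption: waiters see the leader's error too).
        return fut.result()

    def wrap(self, fn):
        @wraps(fn)
        def wrapper(key, *args, **kwargs):
            return self.call(fn, key, *args, **kwargs)
        return wrapper


sf = FutureSingleFlight()
stats = {"attempts": 0}


@sf.wrap
def flaky(num, reason="boom"):
    time.sleep(0.1)  # emulate a slow backend that then fails
    stats["attempts"] += 1
    raise ValueError("%s from call %d" % (reason, num))


def outcome(num):
    try:
        flaky("key", num, reason="backend down")
    except ValueError as exc:
        return str(exc)
    return "no error"


def demo():
    with ThreadPoolExecutor(max_workers=5) as pool:
        return list(pool.map(outcome, range(1, 6)))
```

Every caller of `outcome` gets a `ValueError` message, while the failing function typically runs far fewer than five times. Because the key is removed once the flight ends, a later call retries instead of replaying the old error.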
