Tinyio
Ever used asyncio and wished you hadn't? A tiny (~400 lines) event loop for Python.
tinyio is a dead-simple event loop for Python, born out of my frustration with trying to get robust error handling with asyncio. (I'm not the only one running into its sharp corners: link1, link2.)
This is an alternative for the simple use-cases, where you just need an event loop, and want to crash the whole thing if anything goes wrong. (Raising an exception in every coroutine so it can clean up its resources.)
import tinyio

def slow_add_one(x: int):
    yield tinyio.sleep(1)
    return x + 1

def foo():
    four, five = yield [slow_add_one(3), slow_add_one(4)]
    return four, five

loop = tinyio.Loop()
out = loop.run(foo())
assert out == (4, 5)
- Somewhat unusually, our syntax uses `yield` rather than `await`, but the behaviour is the same. Await another coroutine with `yield coro`. Await on multiple with `yield [coro1, coro2, ...]` (a 'gather' in `asyncio` terminology; a 'nursery' in `trio` terminology).
- An error in one coroutine will cancel all coroutines across the entire event loop.
- If the erroring coroutine is sequentially depended on by a chain of other coroutines, then we chain their tracebacks for easier debugging.
- Errors propagate to and from synchronous operations run in threads.
- Can nest tinyio loops inside each other, none of this one-per-thread business.
- Ludicrously simple. No need for futures, tasks, etc. Here's the entirety of the day-to-day API:
    tinyio.Loop
    tinyio.run_in_thread
    tinyio.sleep
    tinyio.CancelledError
Installation
pip install tinyio
Documentation
Loops
Create a loop with tinyio.Loop(). It has a single method, .run(coro), which consumes a coroutine, and which returns the output of that coroutine.
Coroutines can yield four possible things:
- `yield`: yield nothing, this just pauses and gives other coroutines a chance to run.
- `yield coro`: wait on a single coroutine, in which case we'll resume with the output of that coroutine once it is available.
- `yield [coro1, coro2, coro3]`: wait on multiple coroutines by putting them in a list, and resume with a list of outputs once all have completed. This is what `asyncio` calls a 'gather' or 'TaskGroup', and what `trio` calls a 'nursery'.
- `yield {coro1, coro2, coro3}`: schedule one or more coroutines but do not wait on their result - they will run independently in the background.
If you yield on the same coroutine multiple times (e.g. in a diamond dependency pattern) then the coroutine will be scheduled once, and on completion all dependees will receive its output. (You can even do this if the coroutine has already finished: yield on it to retrieve its output.)
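To make the `yield` forms above less mysterious, here is a toy, standard-library-only driver. It is a sketch of the underlying Python mechanism (`generator.send` and `StopIteration.value`), not tinyio's scheduler: the `run` helper is hypothetical, runs everything depth-first with no concurrency, and handles only the bare `yield`, `yield coro`, and `yield [coro1, coro2]` forms.

```python
def run(coro):
    """Toy driver (hypothetical, not tinyio's implementation).

    Runs a generator-based coroutine to completion, depth-first.
    No concurrency, no sleeping, no cancellation, no background sets.
    """
    to_send = None
    while True:
        try:
            request = coro.send(to_send)  # resume until the next `yield`
        except StopIteration as stop:
            return stop.value  # the coroutine's `return` value
        if request is None:
            to_send = None  # bare `yield`: nothing to wait for
        elif isinstance(request, list):
            to_send = [run(c) for c in request]  # `yield [a, b]`: list of outputs
        else:
            to_send = run(request)  # `yield coro`: a single output


def add_one(x):
    yield  # pause point
    return x + 1


def main():
    three = yield add_one(2)
    four, five = yield [add_one(3), add_one(4)]
    return three, four, five


result = run(main())
assert result == (3, 4, 5)
```

A real event loop interleaves the coroutines in a list instead of running them one after another, but the way a result comes back as the value of the `yield` expression is the same.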
Threading
Blocking functions can be run in threads using tinyio.run_in_thread(fn, *args, **kwargs), which gives a coroutine you can yield on. Example:
import time, tinyio

def slow_blocking_add_one(x: int) -> int:
    time.sleep(1)
    return x + 1

def foo(x: int):
    out = yield [tinyio.run_in_thread(slow_blocking_add_one, x) for _ in range(3)]
    return out

loop = tinyio.Loop()
out = loop.run(foo(x=1))  # runs in one second, not three
assert out == [2, 2, 2]
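For intuition about why this takes roughly one sleep rather than three, here is the same fan-out written with only the standard library. This is a comparison sketch using `concurrent.futures.ThreadPoolExecutor`, not a description of how tinyio manages its threads; the shorter sleep is just to keep the example quick.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_blocking_add_one(x: int) -> int:
    time.sleep(0.2)  # stands in for any blocking call
    return x + 1


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=3) as pool:
    # All three calls block at the same time, each in its own thread.
    out = list(pool.map(slow_blocking_add_one, [1, 1, 1]))
elapsed = time.perf_counter() - start

assert out == [2, 2, 2]
print(f"elapsed: {elapsed:.2f}s")  # close to one sleep, not three
```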
Sleeping
This is tinyio.sleep(delay_in_seconds), which is a coroutine you can yield on.
Error propagation
If any coroutine raises an error, then:
- All coroutines across the entire loop will have `tinyio.CancelledError` raised in them (from whatever `yield` point they are currently waiting at).
- Any functions run in threads via `tinyio.run_in_thread` will also have `tinyio.CancelledError` raised in the thread.
- The original error is raised out of `loop.run(...)`. This behaviour can be configured (e.g. to collect errors into a `BaseExceptionGroup`) by setting `loop.run(..., exception_group=None/False/True)`.
This gives every coroutine a chance to shut down gracefully. Debuggers like patdb offer the ability to navigate across exceptions in an exception group, allowing you to inspect the state of all coroutines that were related to the error.
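The "raised at the current `yield` point" behaviour relies on a plain Python feature: `generator.throw`. The sketch below shows that mechanism in isolation with the standard library only. `Cancelled` is a hypothetical stand-in for `tinyio.CancelledError`, and the hand-written loops are an assumption for illustration, not tinyio's code.

```python
class Cancelled(BaseException):
    """Hypothetical stand-in for tinyio.CancelledError."""


cleaned_up = []


def worker(name):
    try:
        while True:
            yield  # waiting point; a cancellation is raised here
    finally:
        cleaned_up.append(name)  # cleanup runs when the worker is cancelled


workers = [worker("a"), worker("b")]
for w in workers:
    next(w)  # advance each worker to its first `yield`

for w in workers:
    try:
        w.throw(Cancelled())  # raise inside the generator at its `yield`
    except Cancelled:
        pass  # the worker did not swallow the error, so it propagates back out

assert cleaned_up == ["a", "b"]
```

Because the exception surfaces inside the coroutine, ordinary `try`/`finally` blocks and `with` statements are enough to release resources.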
Further documentation
<details><summary><h3>Synchronisation</h3> (Click to expand)</summary>We ship batteries-included with the usual collection of standard operations for synchronisation.
tinyio.as_completed tinyio.Semaphore
tinyio.Barrier tinyio.ThreadPool
tinyio.Event tinyio.timeout
tinyio.Lock tinyio.TimeoutError
- `tinyio.as_completed({coro1, coro2, ...})`

  This schedules multiple coroutines in the background (like `yield {coro1, coro2, ...}`), and then offers their results in the order they complete.

  This is iterated over in the following way, using its `.done()` and `.get()` methods:

      def main():
          iterator = yield tinyio.as_completed({coro1, coro2, coro3})
          for next_coro in iterator:
              result = yield next_coro
- `tinyio.Barrier(value)`

  This has a single method `barrier.wait()`, which is a coroutine you can `yield` on. Once `value` many coroutines have yielded on this method then it will unblock.
- `tinyio.Event()`

  This is a wrapper around a boolean flag, initialised with `False`. This has the following methods:

  - `.is_set()`: return the value of the flag.
  - `.set()`: set the flag to `True`.
  - `.clear()`: set the flag to `False`.
  - `.wait(timeout_in_seconds=None)`, which is a coroutine you can `yield` on. This will unblock if the internal flag is `True` or if `timeout_in_seconds` seconds pass. (Typically the former is accomplished by calling `.set()` from another coroutine or from a thread.)
- `tinyio.Lock()`

  This is just a convenience for `tinyio.Semaphore(value=1)`, see below.
- `tinyio.Semaphore(value)`

  This manages an internal counter that is initialised at `value`, is decremented when entering a region, and incremented when exiting. This blocks if this counter is at zero. In this way, at most `value` coroutines may acquire the semaphore at a time.

  This is used as:

      semaphore = Semaphore(value)
      ...
      with (yield semaphore()):
          ...
- `tinyio.timeout(coro, timeout_in_seconds)`

  This is a coroutine you can `yield` on, used as `output, success = yield tinyio.timeout(coro, timeout_in_seconds)`.

  This runs `coro` for at most `timeout_in_seconds`. If it succeeds in that time then the pair `(output, True)` is returned. Else this will return `(None, False)`, and `coro` will be halted by raising `tinyio.TimeoutError` inside it.
- `tinyio.ThreadPool(max_threads)`

  This is equivalent to making multiple `tinyio.run_in_thread` calls, but will limit the number of threads to at most `max_threads`. Additional work after that will block until a thread becomes available.

  This has two methods:

  - `.run_in_thread(fn, *args, **kwargs)`, which is a coroutine you can `yield` on. This is equivalent to `yield tinyio.run_in_thread(fn, *args, **kwargs)`.
  - `.map(fn, xs)`, which is a coroutine you can `yield` on. This is equivalent to `yield [tinyio.run_in_thread(fn, x) for x in xs]`.
</details> <details><summary><h3>Asynchronous context managers</h3> (Click to expand)</summary>
You can use the following pattern to implement context managers with asynchronous entry:
def my_coro():
    with (yield my_context_manager(x=5)) as val:
        print(f"Got val {val}")
where:
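import contextlib
import tinyio

def my_context_manager(x):
    # Asynchronous entry: do the slow setup, then hand back a regular context manager.
    print("Initialising...")
    yield tinyio.sleep(1)
    print("Initialised")
    return make_context_manager(x)

@contextlib.contextmanager
def make_context_manager(x):
    try:
        yield x
    finally:
        print("Cleaning up")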
This isn't anything fancier than just using a coroutine that returns a regular with-compatible context manager. See tinyio.Semaphore for an example of this pattern.
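The reason `with (yield ...)` works is that the `yield` expression evaluates to whatever is sent back into the generator. The sketch below drives the coroutine by hand with the standard library only, so the loop is replaced by explicit `next`/`send` calls; the `"please enter"` request string and the `events` list are illustrative assumptions.

```python
import contextlib

events = []


@contextlib.contextmanager
def make_context_manager(x):
    try:
        yield x
    finally:
        events.append("cleanup")


def my_coro():
    # The value sent back into the generator becomes the result of `yield`,
    # so `with` receives an ordinary context manager.
    with (yield "please enter") as val:
        events.append(f"got {val}")
    return "done"


coro = my_coro()
request = next(coro)  # runs up to the `yield`
try:
    coro.send(make_context_manager(5))  # resume with the context manager
except StopIteration as stop:
    result = stop.value  # the coroutine finished and returned

assert request == "please enter"
assert events == ["got 5", "cleanup"]
assert result == "done"
```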
You can use the following pattern to implement asynchronous iterators:
def my_coro():
    for x in slow_range(5):
        x = yield x
        print(f"Got {x}")
where:
def slow_range(x):  # this function is an iterator-of-coroutines
    for i in range(x):
        yield slow_range_i(i)  # this `yield` statement is seen by the `for` loop

def slow_range_i(i):  # this function is a coroutine
    yield tinyio.sleep(1)  # this `yield` statement is seen by the `tinyio.Loop()`
    return i
Here we just have yield being used in a couple of different ways that you're already used to:
- as a regular Python generator/iterator;
- as a `tinyio` coroutine.
For an example of this, see tinyio.as_completed.
We have support for putting tinyio event loops within asyncio/trio event loops, or vice-versa.
tinyio.to_asyncio tinyio.to_trio
tinyio.from_asyncio tinyio.from_trio
- `tinyio.to_asyncio(coro, exception_group=None)`

  This converts a `tinyio` coroutine into an `asyncio` coroutine.

  For example:

      import asyncio
      import tinyio

      def add_one(x):
          yield tinyio.sleep(1)
          return x + 1

      async def foo(x):
          y = await tinyio.to_asyncio(add_one(x))
          return y

      asyncio.run(foo(3))
- `tinyio.from_asyncio(coro)`

  This converts an `asyncio` coroutine into a `tinyio` coroutine.

  WARNING! This works by running the entire `asyncio` portion in a separate thread. This may lead to surprises if the `asyncio` and non-`asyncio` portions interact in non-threadsafe ways.
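To see what "in a separate thread" implies, here is a standard-library-only sketch of the general technique: an `asyncio` coroutine run on its own event loop in a new thread. The `run_in_separate_thread` helper is hypothetical and is not tinyio's implementation (for one thing, it blocks the caller while waiting); it only shows that the `asyncio` code executes on a different thread from the code that started it.

```python
import asyncio
import threading


async def add_one(x):
    await asyncio.sleep(0.01)
    # Report which thread the asyncio code actually executes on.
    return x + 1, threading.current_thread().name


def run_in_separate_thread(coro):
    """Run an asyncio coroutine on its own event loop in a new thread."""
    box = {}

    def target():
        box["value"] = asyncio.run(coro)

    thread = threading.Thread(target=target, name="asyncio-worker")
    thread.start()
    thread.join()  # a real integration would not block the caller like this
    return box["value"]


value, thread_name = run_in_separate_thread(add_one(3))
assert value == 4
assert thread_name == "asyncio-worker"
```

Any state shared between the two sides (caches, counters, open connections) is therefore touched from two threads, which is where the non-threadsafe surprises come from.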
