<h1 align="center">tinyio</h1> <h2 align="center">A tiny (~400 lines) event loop for Python</h2>

Ever used asyncio and wished you hadn't?

tinyio is a dead-simple event loop for Python, born out of my frustration with trying to get robust error handling with asyncio. (I'm not the only one running into its sharp corners: link1, link2.)

This is an alternative for the simple use-cases, where you just need an event loop, and want to crash the whole thing if anything goes wrong. (Raising an exception in every coroutine so it can clean up its resources.)

import tinyio

def slow_add_one(x: int):
    yield tinyio.sleep(1)
    return x + 1

def foo():
    four, five = yield [slow_add_one(3), slow_add_one(4)]
    return four, five

loop = tinyio.Loop()
out = loop.run(foo())
assert out == (4, 5)
  • Somewhat unusually, our syntax uses yield rather than await, but the behaviour is the same. Await another coroutine with yield coro. Await on multiple with yield [coro1, coro2, ...] (a 'gather' in asyncio terminology; a 'nursery' in trio terminology).
  • An error in one coroutine will cancel all coroutines across the entire event loop.
    • If the erroring coroutine is sequentially depended on by a chain of other coroutines, then we chain their tracebacks for easier debugging.
    • Errors propagate to and from synchronous operations run in threads.
  • tinyio loops can be nested inside each other; none of this one-per-thread business.
  • Ludicrously simple. No need for futures, tasks, etc. Here's the entirety of the day-to-day API:
    tinyio.Loop
    tinyio.run_in_thread
    tinyio.sleep
    tinyio.CancelledError
    

Installation

pip install tinyio

Documentation

Loops

Create a loop with tinyio.Loop(). It has a single method, .run(coro), which consumes a coroutine, and which returns the output of that coroutine.

Coroutines can yield four possible things:

  • yield: yield nothing, this just pauses and gives other coroutines a chance to run.
  • yield coro: wait on a single coroutine, in which case we'll resume with the output of that coroutine once it is available.
  • yield [coro1, coro2, coro3]: wait on multiple coroutines by putting them in a list, and resume with a list of outputs once all have completed. This is what asyncio calls a 'gather' or 'TaskGroup', and what trio calls a 'nursery'.
  • yield {coro1, coro2, coro3}: schedule one or more coroutines but do not wait on their result - they will run independently in the background.

If you yield on the same coroutine multiple times (e.g. in a diamond dependency pattern) then the coroutine will be scheduled once, and on completion every coroutine waiting on it will receive its output. (You can even do this if the coroutine has already finished: yield on it to retrieve its output.)
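
To make the yield protocol above concrete, here is a minimal stdlib-only sketch of a driver that understands a bare yield, yield coro and yield [coro1, coro2, ...], and that caches finished coroutines so a diamond dependency only runs once. This is an illustration under simplifying assumptions, not tinyio's implementation: the hypothetical drive function runs dependencies one after another rather than concurrently, and it does not handle the {coro1, coro2, ...} background form.

```python
import types


def drive(coro, results=None):
    """Run a generator-based coroutine to completion and return its output.

    Hypothetical helper for illustration only; dependencies run sequentially.
    """
    if results is None:
        results = {}
    if coro in results:  # already finished: hand back the cached output
        return results[coro]
    to_send = None
    while True:
        try:
            request = coro.send(to_send)
        except StopIteration as stop:
            results[coro] = stop.value  # the coroutine's `return` value
            return stop.value
        if request is None:  # bare `yield`: just a pause point
            to_send = None
        elif isinstance(request, types.GeneratorType):  # `yield coro`
            to_send = drive(request, results)
        elif isinstance(request, list):  # `yield [coro1, coro2, ...]`
            to_send = [drive(c, results) for c in request]
        else:
            raise TypeError(f"Cannot wait on {request!r}")


def add_one(x):
    yield  # pause point
    return x + 1


def double(dep):
    value = yield dep  # wait on a (possibly shared) coroutine
    return value * 2


def diamond():
    shared = add_one(1)  # both branches depend on this one coroutine
    a, b = yield [double(shared), double(shared)]
    return a + b
```

Calling drive(diamond()) runs the shared add_one(1) coroutine once; the second double(...) receives the cached output.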

Threading

Blocking functions can be run in threads using tinyio.run_in_thread(fn, *args, **kwargs), which gives a coroutine you can yield on. Example:

import time, tinyio

def slow_blocking_add_one(x: int) -> int:
    time.sleep(1)
    return x + 1

def foo(x: int):
    out = yield [tinyio.run_in_thread(slow_blocking_add_one, x) for _ in range(3)]
    return out

loop = tinyio.Loop()
out = loop.run(foo(x=1))  # runs in one second, not three
assert out == [2, 2, 2]
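
For intuition on why the three calls overlap rather than running back to back, here is a stdlib-only analogue using concurrent.futures. It is a sketch of the general idea (one thread per blocking call, results collected in order), not a description of how tinyio.run_in_thread is implemented; run_concurrently is a hypothetical helper name.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_blocking_add_one(x: int) -> int:
    time.sleep(0.2)  # stands in for blocking I/O
    return x + 1


def run_concurrently(fn, args_list):
    """Run fn(arg) for each arg in its own thread; return results in order."""
    with ThreadPoolExecutor(max_workers=len(args_list)) as pool:
        futures = [pool.submit(fn, arg) for arg in args_list]
        # .result() blocks until done and re-raises any error from the thread.
        return [future.result() for future in futures]


results = run_concurrently(slow_blocking_add_one, [1, 1, 1])
```

Because each call sleeps in its own thread, the total wall-clock time is roughly that of a single call.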

Sleeping

This is tinyio.sleep(delay_in_seconds), which is a coroutine you can yield on.

Error propagation

If any coroutine raises an error, then:

  1. All coroutines across the entire loop will have tinyio.CancelledError raised in them (from whatever yield point they are currently waiting at).
  2. Any functions run in threads via tinyio.run_in_thread will also have tinyio.CancelledError raised in the thread.
  3. The original error is raised out of loop.run(...). This behaviour can be configured (e.g. to collect errors into a BaseExceptionGroup) by setting loop.run(..., exception_group=None/False/True).

This gives every coroutine a chance to shut down gracefully. Debuggers like patdb offer the ability to navigate across exceptions in an exception group, allowing you to inspect the state of all coroutines that were related to the error.
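
The cancel-everything behaviour in steps 1 and 3 can be sketched with plain generators: when one coroutine raises, an exception is thrown into every other coroutine at its current yield point so that its finally blocks run, and the original error is then re-raised. This is a stdlib-only illustration under stated assumptions, not tinyio's code: CancelledError here is a local stand-in class, and run_until_error is a hypothetical round-robin stepper.

```python
class CancelledError(BaseException):
    """Local stand-in for tinyio.CancelledError (an assumption of this sketch)."""


def worker(name, log):
    try:
        while True:
            yield  # wait point: cancellation is raised here
    finally:
        log.append(f"{name} cleaned up")


def failing():
    yield
    raise ValueError("boom")


def run_until_error(coros):
    """Step each coroutine in turn; on the first error cancel the rest, then re-raise."""
    live = list(coros)
    try:
        while live:
            for coro in list(live):
                try:
                    next(coro)
                except StopIteration:
                    live.remove(coro)  # finished normally
    except Exception:
        for coro in live:
            try:
                coro.throw(CancelledError())  # raise inside the coroutine
            except (CancelledError, StopIteration):
                pass  # the coroutine has shut down
        raise  # the original error leaves the loop
```

Running run_until_error([worker("a", log), failing(), worker("b", log)]) raises the ValueError, and both workers get to run their cleanup first.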

Further documentation

<details><summary><h3>Synchronisation</h3> (Click to expand)</summary>

We ship batteries-included with the usual collection of standard operations for synchronisation.

tinyio.as_completed       tinyio.Semaphore
tinyio.Barrier            tinyio.ThreadPool
tinyio.Event              tinyio.timeout
tinyio.Lock               tinyio.TimeoutError

  • tinyio.as_completed({coro1, coro2, ...})

    This schedules multiple coroutines in the background (like yield {coro1, coro2, ...}), and then offers their results in the order they complete.

    Iterate over the result with an ordinary for loop; each item is a coroutine that you yield on to receive the next completed output:

    def main():
        iterator = yield tinyio.as_completed({coro1, coro2, coro3})
        for next_coro in iterator:
            result = yield next_coro
    

  • tinyio.Barrier(value)

    This has a single method barrier.wait(), which is a coroutine you can yield on. Once value-many coroutines have yielded on this method, it unblocks.


  • tinyio.Event()

    This is a wrapper around a boolean flag, initialised with False. This has the following methods:

    • .is_set(): return the value of the flag.
    • .set(): set the flag to True.
    • .clear(): set the flag to False.
    • .wait(timeout_in_seconds=None), which is a coroutine you can yield on. This will unblock if the internal flag is True or if timeout_in_seconds seconds pass. (Typically the former is accomplished by calling .set() from another coroutine or from a thread.)

  • tinyio.Lock()

    This is just a convenience for tinyio.Semaphore(value=1), see below.


  • tinyio.Semaphore(value)

    This manages an internal counter that is initialised at value, is decremented when entering a region, and incremented when exiting. This blocks if this counter is at zero. In this way, at most value coroutines may acquire the semaphore at a time.

    This is used as:

    semaphore = tinyio.Semaphore(value)
    
    ...
    
    with (yield semaphore()):
        ...
    

  • tinyio.timeout(coro, timeout_in_seconds)

    This is a coroutine you can yield on, used as output, success = yield tinyio.timeout(coro, timeout_in_seconds).

    This runs coro for at most timeout_in_seconds. If it succeeds in that time then the pair (output, True) is returned. Otherwise this returns (None, False), and coro is halted by raising tinyio.TimeoutError inside it.


  • tinyio.ThreadPool(max_threads)

    This is equivalent to making multiple tinyio.run_in_thread calls, but will limit the number of threads to at most max_threads. Additional work after that will block until a thread becomes available.

    This has two methods:

    • .run_in_thread(fn, *args, **kwargs), which is a coroutine you can yield on. This is equivalent to yield tinyio.run_in_thread(fn, *args, **kwargs).
    • .map(fn, xs), which is a coroutine you can yield on. This is equivalent to yield [tinyio.run_in_thread(fn, x) for x in xs].
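
To illustrate the counter-based semaphore described above, together with the with (yield semaphore()) usage, here is a stdlib-only sketch. Everything in it is an assumption made for illustration: this Semaphore class, the run_round_robin scheduler and the worker coroutine are hypothetical and only mimic the documented behaviour; they are not tinyio's implementation.

```python
import contextlib


class Semaphore:
    def __init__(self, value):
        self._counter = value

    def __call__(self):
        # Coroutine: pause while no slot is free, then take one and
        # return an ordinary context manager that gives it back on exit.
        while self._counter == 0:
            yield
        self._counter -= 1
        return self._release_on_exit()

    @contextlib.contextmanager
    def _release_on_exit(self):
        try:
            yield
        finally:
            self._counter += 1


def run_round_robin(coros):
    """Tiny scheduler: each task is a stack of nested generators."""
    tasks = [{"stack": [coro], "send": None} for coro in coros]
    while tasks:
        for task in list(tasks):
            try:
                request = task["stack"][-1].send(task["send"])
            except StopIteration as stop:
                task["stack"].pop()
                task["send"] = stop.value  # output goes to the parent
                if not task["stack"]:
                    tasks.remove(task)
                continue
            task["send"] = None
            if request is not None:  # `yield coro`: wait on a child
                task["stack"].append(request)


def worker(semaphore, stats):
    with (yield semaphore()):
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
        yield  # do some "work" while holding the semaphore
        stats["active"] -= 1


stats = {"active": 0, "peak": 0}
semaphore = Semaphore(2)
run_round_robin([worker(semaphore, stats) for _ in range(5)])
```

With a value of 2 and five workers, at most two workers are ever inside the with block at once, which is what stats["peak"] records.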

</details> <details><summary><h3>Asynchronous context managers</h3> (Click to expand)</summary>

You can use the following pattern to implement context managers with asynchronous entry:

def my_coro():
    with (yield my_context_manager(x=5)) as val:
        print(f"Got val {val}")

where:

import contextlib

import tinyio

def my_context_manager(x):
    print("Initialising...")
    yield tinyio.sleep(1)
    print("Initialised")
    return make_context_manager(x)

@contextlib.contextmanager
def make_context_manager(x):
    try:
        yield x
    finally:
        print("Cleaning up")

This isn't anything fancier than just using a coroutine that returns a regular with-compatible context manager. See tinyio.Semaphore for an example of this pattern.

</details> <details><summary><h3>Asynchronous iterators</h3> (Click to expand)</summary>

You can use the following pattern to implement asynchronous iterators:

def my_coro():
    for x in slow_range(5):
        x = yield x
        print(f"Got {x}")

where:

def slow_range(x):  # this function is an iterator-of-coroutines
    for i in range(x):
        yield slow_range_i(i)  # this `yield` statement is seen by the `for` loop

def slow_range_i(i):  # this function is a coroutine
    yield tinyio.sleep(1)  # this `yield` statement is seen by the `tinyio.Loop()`
    return i

Here we just have yield being used in a couple of different ways that you're already used to:

  • as a regular Python generator/iterator;
  • as a tinyio coroutine.

For an example of this, see tinyio.as_completed.

</details> <details><summary><h3>Integration with `asyncio` and `trio`</h3> (Click to expand)</summary>

We have support for putting tinyio event loops within asyncio/trio event loops, or vice-versa.

tinyio.to_asyncio         tinyio.to_trio
tinyio.from_asyncio       tinyio.from_trio

  • tinyio.to_asyncio(coro, exception_group=None)

    This converts a tinyio coroutine into an asyncio coroutine.

    For example:

    import asyncio
    import tinyio
    
    def add_one(x):
        yield tinyio.sleep(1)
        return x + 1
    
    async def foo(x):
        y = await tinyio.to_asyncio(add_one(x))
        return y
    
    asyncio.run(foo(3))
    

  • tinyio.from_asyncio(coro)

    This converts an asyncio coroutine into a tinyio coroutine.

    WARNING!
    This works by running the entire asyncio portion in a separate thread. This may lead to surprises if the asyncio and non-asyncio portions interact in non-threadsafe ways.
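
The general technique named in the warning, running an asyncio coroutine to completion on a separate thread and handing its result back, can be sketched with the standard library alone. This is an illustration of the idea, not tinyio.from_asyncio itself; run_asyncio_in_thread is a hypothetical helper.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor


async def add_one(x):
    await asyncio.sleep(0.01)
    # Also report which thread the asyncio code actually ran on.
    return x + 1, threading.get_ident()


def run_asyncio_in_thread(coro):
    """Run an asyncio coroutine in a worker thread and return its result."""
    with ThreadPoolExecutor(max_workers=1) as pool:
        # asyncio.run creates a fresh event loop inside the worker thread.
        return pool.submit(asyncio.run, coro).result()


result, worker_thread = run_asyncio_in_thread(add_one(3))
```

The asyncio code runs on a different thread from its caller, so any state it shares with the calling side needs to be thread-safe.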
