Aiomsg

Smart sockets and scalability protocols for messaging network applications


.. image:: https://github.com/cjrh/aiomsg/workflows/Python%20application/badge.svg
    :target: https://github.com/cjrh/aiomsg/actions

.. image:: https://img.shields.io/badge/stdlib--only-yes-green.svg
    :target: https://img.shields.io/badge/stdlib--only-yes-green.svg

.. image:: https://coveralls.io/repos/github/cjrh/aiomsg/badge.svg?branch=master
    :target: https://coveralls.io/github/cjrh/aiomsg?branch=master

.. image:: https://img.shields.io/pypi/pyversions/aiomsg.svg
    :target: https://pypi.python.org/pypi/aiomsg

.. image:: https://img.shields.io/github/tag/cjrh/aiomsg.svg
    :target: https://img.shields.io/github/tag/cjrh/aiomsg.svg

.. image:: https://img.shields.io/badge/install-pip%20install%20aiomsg-ff69b4.svg
    :target: https://img.shields.io/badge/install-pip%20install%20aiomsg-ff69b4.svg

.. image:: https://img.shields.io/pypi/v/aiomsg.svg
    :target: https://img.shields.io/pypi/v/aiomsg.svg

.. image:: https://img.shields.io/badge/calver-YYYY.MM.MINOR-22bfda.svg
    :target: http://calver.org/

.. image:: https://img.shields.io/badge/code%20style-black-000000.svg
    :target: https://github.com/ambv/black

aiomsg

Pure-Python smart sockets (like ZMQ) for simpler networking

.. figure:: https://upload.wikimedia.org/wikipedia/commons/5/5e/NetworkDecentral.svg
    :target: https://commons.wikimedia.org/wiki/File:NetworkDecentral.svg
    :alt: Diagram of computers linked up in a network

:sub:`Attribution: And1mu [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)]`

Table of Contents

.. contents::

Demo

Let's make two microservices; one will send the current time to the other. Here's the end that binds to a port (a.k.a. the "server"):

.. code-block:: python3

    import asyncio, time
    from aiomsg import Søcket

    async def main():
        async with Søcket() as sock:
            await sock.bind('127.0.0.1', 25000)
            while True:
                await sock.send(time.ctime().encode())
                await asyncio.sleep(1)

    asyncio.run(main())

Running as a different process, here is the end that does the connecting (a.k.a. the "client"):

.. code-block:: python3

    import asyncio
    from aiomsg import Søcket

    async def main():
        async with Søcket() as sock:
            await sock.connect('127.0.0.1', 25000)
            async for msg in sock.messages():
                print(msg.decode())

    asyncio.run(main())

Note that these are both complete, runnable programs, not fragments.

This looks a lot like conventional socket programming, except that these sockets have a few extra tricks. These are described in more detail in the rest of this document.

Inspiration

Does this look a lot like ZeroMQ? It should: aiomsg is heavily influenced by ZeroMQ. If you don't know anything about ZeroMQ, that's fine too; the rest of this document assumes no ZeroMQ knowledge.

There are some differences; hopefully they make things simpler than zmq. For one thing, aiomsg is pure Python, so no compilation step is required, and it relies only on the Python standard library (and that won't change).

Also, we don't have special kinds of socket pairs like ZeroMQ has. There is only the one Søcket class. The only role distinction you need to make between different socket instances is this: some sockets will bind and others will connect.

This is the leaky part of the API that comes from the underlying BSD socket API. A bind socket will bind to a local interface and port. A connect socket must connect to a bind socket, which can be on the same machine or a remote machine. This is the only complicated bit. You must decide, in a distributed microservices architecture, which sockets must bind and which must connect. A useful heuristic is that the service more likely to require horizontal scaling should use the connect sockets, because the hostnames they connect to (those of the bind sockets) will be long-lived.

Introduction

What you see above in the demo is pretty much a typical usage of network sockets. So what's special about aiomsg? These are the high-level features:

#. Messages, not streams:

Send and receive are *message-based*, not stream-based. Much easier! This
does mean that if you want to transmit large amounts of data, you're going
to have to break them up yourself, send the pieces, and put them
back together on the other side.
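
If you do need to move a large payload, the splitting and reassembly might
look something like the sketch below. It is not part of ``aiomsg``: the names
``split_payload`` and ``Reassembler``, the header layout, and the 64 KiB
default chunk size are all assumptions made up for illustration. Each
returned piece would be sent as its own message.

.. code-block:: python3

    import struct
    import uuid

    # Hypothetical header: 16-byte transfer id, chunk index, total chunk count.
    HEADER = struct.Struct("!16sII")


    def split_payload(payload, chunk_size=64 * 1024):
        """Split ``payload`` into messages that each carry a small header."""
        transfer_id = uuid.uuid4().bytes
        # Ceiling division; an empty payload still produces one (empty) chunk.
        count = max(1, -(-len(payload) // chunk_size))
        return [
            HEADER.pack(transfer_id, index, count)
            + payload[index * chunk_size:(index + 1) * chunk_size]
            for index in range(count)
        ]


    class Reassembler:
        """Collect chunks, in any order, until a transfer is complete."""

        def __init__(self):
            self._partial = {}  # transfer id -> {chunk index: chunk bytes}

        def feed(self, message):
            """Return the full payload once complete, otherwise ``None``."""
            transfer_id, index, count = HEADER.unpack_from(message)
            chunks = self._partial.setdefault(transfer_id, {})
            chunks[index] = message[HEADER.size:]
            if len(chunks) < count:
                return None
            del self._partial[transfer_id]
            return b"".join(chunks[i] for i in range(count))

On the receiving side, you would pass each incoming message to
``Reassembler.feed()``, which returns ``None`` until the last missing chunk
arrives. Keying on a transfer id lets chunks from different transfers
interleave, and the chunk index lets them arrive in any order.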

#. Automatic reconnection

These sockets automatically reconnect. You don't have to
write special code for it. If the bind end (a.k.a. the "server") is restarted,
the connecting end will automatically reconnect. This works in either
direction. Try it! Run the demo code and kill one of the processes,
then start it up again. The connection will get re-established.
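
For a sense of what this saves you, here is roughly the kind of retry loop
you would otherwise write by hand around plain ``asyncio`` streams. This is
only a sketch, not how ``aiomsg`` does it internally; ``connect_with_retry``
and its parameters are hypothetical names, and a real version would also
need to notice a dropped connection and start over.

.. code-block:: python3

    import asyncio


    async def connect_with_retry(open_connection, retries=5,
                                 base_delay=0.05, max_delay=1.0):
        """Call ``open_connection()`` until it succeeds or retries run out.

        ``open_connection`` is any zero-argument callable returning an
        awaitable, for example
        ``lambda: asyncio.open_connection("127.0.0.1", 25000)``.
        """
        delay = base_delay
        for attempt in range(1, retries + 1):
            try:
                return await open_connection()
            except OSError:
                # Covers ConnectionRefusedError and similar failures.
                if attempt == retries:
                    raise
                await asyncio.sleep(delay)
                # Back off exponentially, up to a ceiling.
                delay = min(delay * 2, max_delay)

Passing the connect step in as a callable keeps the retry logic separate
from the transport, which also makes it easy to exercise without a network.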

#. Many connections on a single "socket"

The bind end can receive multiple connections, but you do all your
``.send()`` and ``.recv()`` calls on a single object. (No
callback handlers or protocol objects.)

More impressive is that the connecting end is exactly the same; it can make
outgoing ``connect()`` calls to multiple peers (bind sockets),
and you make all your ``send()`` and ``recv()`` calls on a single object.

This will be described in more detail further on in this document.

#. Message distribution patterns

Receiving messages is pretty simple: new messages just show up (remember
that messages from all connected peers come through the same call):

.. code-block:: python3

    async with Søcket() as sock:
        await sock.bind()
        async for msg in sock.messages():
            print(f"Received: {msg}")

However, when sending messages you have choices. The choices affect
**which peers** get the message. The options are:

- **Publish**: every connected peer is sent a copy of the message
- **Round-robin**: each connected peer is sent a *unique* message; the messages
  are distributed to each connection in a circular pattern.
- **By peer identity**: you can also send to a specific peer by using
  its identity directly.

The choice between *pub-sub* and *round-robin* must be made when
creating the ``Søcket()``:

.. code-block:: python3

    from aiomsg import Søcket, SendMode

    async with Søcket(send_mode=SendMode.PUBLISH) as sock:
        await sock.bind()
        async for msg in sock.messages():
            await sock.send(msg)

This example receives a message from any connected peer, and sends
that same message to *every* connected peer (including the original
sender). By changing ``PUBLISH`` to ``ROUNDROBIN``, the message
distribution pattern changes so that each "sent" message goes to
only one connected peer. The next "sent" message will go to a
different connected peer, and so on.
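
To make the difference concrete, here is a tiny model of the two patterns
that uses plain lists in place of network connections. It only illustrates
the routing decision and says nothing about how ``aiomsg`` implements it;
``publish`` and ``RoundRobin`` are made-up names, and the peer list is fixed
here, whereas real peers come and go.

.. code-block:: python3

    import itertools


    def publish(peers, message):
        """Publish: every peer gets its own copy of the message."""
        return [(peer, message) for peer in peers]


    class RoundRobin:
        """Round-robin: each message goes to exactly one peer, in rotation."""

        def __init__(self, peers):
            self._cycle = itertools.cycle(peers)

        def route(self, message):
            return (next(self._cycle), message)


    peers = ["peer-a", "peer-b", "peer-c"]

    # One message, three deliveries.
    published = publish(peers, b"tick")

    # Four messages, four deliveries, wrapping around after the last peer.
    rr = RoundRobin(peers)
    rotated = [rr.route(m) for m in (b"1", b"2", b"3", b"4")]

Publish multiplies one message across all peers, while round-robin spreads
a stream of messages across them, which is why the latter is a natural fit
for distributing work.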

*Identity-based* message sending is available at any time,
regardless of what you choose for the ``send_mode`` parameter; for
example:

.. code-block:: python3

    import asyncio
    from aiomsg import Søcket, SendMode

    async def main():
        async with Søcket() as sock1, Søcket(send_mode=SendMode.PUBLISH) as sock2:
            await sock1.bind(port=25000)
            await sock2.bind(port=25001)
            while True:
                peer_id, msg = await sock1.recv_identity()
                # Imagine that the sender constructs each message with
                # an id like <[prefix][id][\x00][data]>
                msg_id, _, data = msg.partition(b"\x00")
                # This goes to all peers (publish mode)
                await sock2.send(data)
                # This only goes to the specified peer (identity)
                await sock1.send(msg_id + b"\x00ok", identity=peer_id)

    asyncio.run(main())

This example shows how you can receive messages on one socket (``sock1``,
which could have thousands of connected peers), and relay those messages to
thousands of other peers connected on a different socket (``sock2``).

For this example, the ``send_mode`` of ``sock1`` doesn't matter because
if ``identity`` is specified in the ``send()`` call, it'll ignore
``send_mode`` completely.

Oh, and the example above is a complete, runnable program, which is
pretty amazing!

#. Built-in heartbeating

Because ain't nobody got time to mess around with TCP keepalive
settings. The heartbeating is internal and opaque to your application
code. You won't even know it's happening, unless you enable debug
logs. Heartbeats are sent only during periods of inactivity, so
they won't interfere with your application messages.

In theory, you really shouldn't need heartbeating because TCP is a very robust
protocol; but in practice, various intermediate servers and routers
sometimes do silly things to your connection if they think a connection
has been idle for too long. So, automatic heartbeating is baked in to
let all intermediate hops know you want the connection to stay up, and
if the connection goes down, you will know much sooner than the
standard TCP keepalive timeout duration (which can be very long!).

If either a heartbeat or a message isn't received within a specific
timeframe, that connection is destroyed. Whichever peer is making the
``connect()`` call will then automatically try to reconnect, as
discussed earlier.
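
The bookkeeping behind this kind of scheme can be sketched in a few lines.
The class below is an illustration only: ``IdleTracker``, its method names,
and the 5 and 15 second values are assumptions, not ``aiomsg`` internals or
its actual defaults.

.. code-block:: python3

    import time


    class IdleTracker:
        """Track per-connection activity to drive heartbeats and timeouts."""

        def __init__(self, heartbeat_interval=5.0, timeout=15.0,
                     clock=time.monotonic):
            self._interval = heartbeat_interval
            self._timeout = timeout
            self._clock = clock
            self._last_sent = self._last_received = clock()

        def on_send(self):
            # Any outgoing traffic (message or heartbeat) resets the send timer.
            self._last_sent = self._clock()

        def on_receive(self):
            # Any incoming traffic (message or heartbeat) proves the peer is alive.
            self._last_received = self._clock()

        def should_send_heartbeat(self):
            # Only needed when nothing has been sent for a whole interval.
            return self._clock() - self._last_sent >= self._interval

        def is_dead(self):
            # Nothing at all received within the timeout: drop the connection.
            return self._clock() - self._last_received >= self._timeout

Because ordinary messages reset the send timer too, heartbeats are only
generated while the connection is otherwise idle. The injectable ``clock``
is there so the logic can be checked without actually waiting.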

#. Built-in reliability choices

Ah, so what do "reliability choices" mean exactly...?

It turns out that it's quite hard to send messages in a reliable way.
Or, stated another way, it's quite hard to avoid dropping messages:
one side sends and the other side never gets the message.

``aiomsg`` already buffers messages when being sent. 
