Aiomsg
Smart sockets and scalability protocols for messaging network applications
Install / Use
/learn @cjrh/AiomsgREADME
.. image:: https://github.com/cjrh/aiomsg/workflows/Python%20application/badge.svg :target: https://github.com/cjrh/aiomsg/actions
.. image:: https://img.shields.io/badge/stdlib--only-yes-green.svg :target: https://img.shields.io/badge/stdlib--only-yes-green.svg
.. image:: https://coveralls.io/repos/github/cjrh/aiomsg/badge.svg?branch=master :target: https://coveralls.io/github/cjrh/aiomsg?branch=master
.. image:: https://img.shields.io/pypi/pyversions/aiomsg.svg :target: https://pypi.python.org/pypi/aiomsg
.. image:: https://img.shields.io/github/tag/cjrh/aiomsg.svg :target: https://img.shields.io/github/tag/cjrh/aiomsg.svg
.. image:: https://img.shields.io/badge/install-pip%20install%20aiomsg-ff69b4.svg :target: https://img.shields.io/badge/install-pip%20install%20aiomsg-ff69b4.svg
.. image:: https://img.shields.io/pypi/v/aiomsg.svg :target: https://img.shields.io/pypi/v/aiomsg.svg
.. image:: https://img.shields.io/badge/calver-YYYY.MM.MINOR-22bfda.svg :target: http://calver.org/
.. image:: https://img.shields.io/badge/code%20style-black-000000.svg :target: https://github.com/ambv/black
aiomsg
Pure-Python smart sockets (like ZMQ) for simpler networking
.. figure:: https://upload.wikimedia.org/wikipedia/commons/5/5e/NetworkDecentral.svg :target: https://commons.wikimedia.org/wiki/File:NetworkDecentral.svg :alt: Diagram of computers linked up in a network
:sub:`Attribution: And1mu [CC BY-SA 4.0 (https://creativecommons.org/licenses/by-sa/4.0)]`
Table of Contents
.. contents::
Demo
Let's make two microservices; one will send the current time to the other. Here's the end that binds to a port (a.k.a, the "server"):
.. code-block:: python3
import asyncio, time
from aiomsg import Søcket
async def main():
async with Søcket() as sock:
await sock.bind('127.0.0.1', 25000)
while True:
await sock.send(time.ctime().encode())
await asyncio.sleep(1)
asyncio.run(main())
Running as a different process, here is the end that does the connecting (a.k.a, the "client"):
.. code-block:: python3
import asyncio
from aiomsg import Søcket
async def main():
async with Søcket() as sock:
await sock.connect('127.0.0.1', 25000)
async for msg in sock.messages():
print(msg.decode())
asyncio.run(main())
Note that these are both complete, runnable programs, not fragments.
Looks a lot like conventional socket programming, except that these sockets have a few extra tricks. These are described in more detail further down in rest of this document.
Inspiration
Looks a lot like ZeroMQ yes? no? Well if you
don't know anything about
ZeroMQ, that's fine too. The rest of this document will assume that you
don't know anything about ZeroMQ. aiomsg is heavily influenced
by ZeroMQ.
There are some differences; hopefully they make things simpler than zmq. For one thing, aiomsg is pure-python so no compilation step is required, and relies only on the Python standard library (and that won't change).
Also, we don't have special kinds of socket pairs like ZeroMQ has. There is
only the one Søcket class. The only role distinction you need to make
between different socket instances is this: some sockets will bind
and others will connect.
This is the leaky part of the API that comes from the underlying BSD socket API. A bind socket will bind to a local interface and port. A connect socket must connect to a bind socket, which can be on the same machine or a remote machine. This is the only complicated bit. You must decide, in a distributed microservices architecture, which sockets must bind and which must connect. A useful heuristic is that the service which is more likely to require horizontal scaling should have the connect sockets. This is because the hostnames to which they will connect (these will be the bind sockets) will be long-lived.
Introduction
What you see above in the demo is pretty much a typical usage of
network sockets. So what's special about aiomsg? These are
the high-level features:
#. Messages, not streams:
Send and receive are *message-based*, not stream based. Much easier! This
does mean that if you want to transmit large amounts of data, you're going
to have have to break them up yourself, send the pieces, and put them
back together on the other side.
#. Automatic reconnection
These sockets automatically reconnect. You don't have to
write special code for it. If the bind end (a.k.a "server") is restarted,
the connecting end will automatically reconnect. This works in either
direction. Try it! run the demo code and kill one of the processes.
And then start it up again. The connection will get re-established.
#. Many connections on a single "socket"
The bind end can receive multiple connections, but you do all your
``.send()`` and ``.recv()`` calls on a single object. (No
callback handlers or protocol objects.)
More impressive is that the connecting end is exactly the same; it can make
outgoing ``connect()`` calls to multiple peers (bind sockets),
and you make all your ``send()`` and ``recv()`` calls on a single object.
This will be described in more detail further on in this document.
#. Message distribution patterns
Receiving messages is pretty simple: new messages just show up (remember
that messages from all connected peers come through the same call):
.. code-block:: python3
async with Søcket() as sock:
await sock.bind()
async for msg in sock.messages():
print(f"Received: {msg}")
However, when sending messages you have choices. The choices affect
**which peers** get the message. The options are:
- **Publish**: every connected peer is sent a copy of the message
- **Round-robin**: each connected peer is sent a *unique* message; the messages
are distributed to each connection in a circular pattern.
- **By peer identity**: you can also send to a specific peer by using
its identity directly.
The choice between *pub-sub* and *round-robin* must be made when
creating the ``Søcket()``:
.. code-block:: python3
from aiomsg import Søcket, SendMode
async with Søcket(send_mode=SendMode.PUBLISH) as sock:
await sock.bind()
async for msg in sock.messages():
await sock.send(msg)
This example receives a message from any connected peer, and sends
that same message to *every* connected peer (including the original
sender). By changing ``PUBLISH`` to ``ROUNDROBIN``, the message
distribution pattern changes so that each "sent" message goes to
only one connected peer. The next "sent" message will go to a
different connected peer, and so on.
For *identity-based* message sending, that's available any time,
regardless of what you choose for the ``send_mode`` parameter; for
example:
.. code-block:: python3
import asyncio
from aiomsg import Søcket, SendMode
async def main():
async with Søcket() as sock1, Søcket(send_mode=SendMode.PUBLISH) as sock2:
await sock1.bind(port=25000)
await sock2.bind(port=25001)
while True:
peer_id, msg = await sock1.recv_identity()
# Imagine that the sender constructs each message with
# an id like <[prefix][id][\x00][data]>
msg_id, _, data = msg.partition(b"\x00")
# This goes to all peers (publish mode)
await sock2.send(data)
# This only goes to the specified peer (identity)
await sock1.send(msg_id + b"\x00ok", identity=peer_id)
asyncio.run(main())
This example shows how you can receive messages on one socket (``sock1``,
which could have thousands of connected peers), and relay those messages to
thousands of other peers connected on a different socket (``sock2``).
For this example, the ``send_mode`` of ``sock1`` doesn't matter because
if ``identity`` is specified in the ``send()`` call, it'll ignore
``send_mode`` completely.
Oh, and the example above is a complete, runnable program which is
pretty amazing!
#. Built-in heartbeating
Because ain't nobody got time to mess around with TCP keepalive
settings. The heartbeating is internal and opaque to your application
code. You won't even know it's happening, unless you enable debug
logs. Heartbeats are sent only during periods of inactivity, so
they won't interfere with your application messages.
In theory, you really shouldn't need heartbeating because TCP is a very robust
protocol; but in practice, various intermediate servers and routers
sometimes do silly things to your connection if they think a connection
has been idle for too long. So, automatic heartbeating is baked in to
let all intermediate hops know you want the connection to stay up, and
if the connection goes down, you will know much sooner than the
standard TCP keepalive timeout duration (which can be very long!).
If either a heartbeat or a message isn't received within a specific
timeframe, that connection is destroyed. Whichever peer is making the
``connect()`` call will then automatically try to reconnect, as
discussed earlier.
#. Built-in reliability choices
Ah, so what do "reliability choices" mean exactly...?
It turns out that it's quite hard to send messages in a reliable way.
Or, stated another way, it's quite hard to avoid dropping messages:
one side sends and the other side never gets the message.
``aiomsg`` already buffers messages when being sent.
Related Skills
node-connect
346.4kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
107.2kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
346.4kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
346.4kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
