Maslite
A very fast multi agent messaging kernel in Python
Install / Use
/learn @root-11/MasliteREADME
MASlite
A multi-agent platform contrived by Bjorn Madsen
For a comprehensive tutorial by Max Yu, go here: Tutorial.ipynb
All right reserved © 2016-2023. MIT-license. All code has been written by the author in isolation and any similarity to other systems is purely coincidental.
New in version 2022.11.4
- update of agents now follows a strict order as inserted.
MASlite explained in 60 seconds:
MASlite is a simle python module for creating multi-agent simulations.
- Simple API: Only 3 modules to learn: Scheduler, Agent & Agent message
- Fast: Handles up to 2.7M messages per second (pypy, py310)
- Lightweight: 52kB.
It only has 3 components:
-
The scheduler (main loop)
- handles pause and proceed with a single call.
- assures repeatability in execution, which makes agents easy to debug.
- handles up to 2.7M messages per second (pypy)
-
Agent's
- are python classes that have setup(), update() and teardown() methods that can be customized.
- can exchange messages using send() and receive().
- can subscribe/unsubscribe to message classes.
- have clocks and can set alarms.
- can be tested individually.
- can have independent I/O/Database interaction.
-
Messages
- that have sender and receiver enable direct communication
- that have topics and no receiver are treated as broadcasts, and sent to subscribers.
The are plenty of use-cases for MASlite:
- Prototyping MASSIVE™ type games.
- Creating data processing pipeline
- Optimisation Engine, for:
- Scheduling (using Bjorn Madsen's distributed scheduling method)
- Auctions (using Dimtry Bertsekas alternating iterative auction)
All the user needs to worry about are the protocols of interaction, which conveniently may be summarised as:
- Design the messages that an agent will send or receive as regular
python objects that inherit the necessary implementation details from
a basic
AgentMessage. The messages must have an unambiguoustopic. - Write the functions that are supposed to execute once an agent receives one of the messages.
- Update the agents operations (
self.operations) with a dictionary that describes the relationship betweentopicandfunction. - Write the update function that maintains the inner state of the agent
using
sendto send messages, and usingreceiveto get messages.
The user can thereby create an agent using just:
class HelloMessage(AgentMessage):
def __init__(self, sender, receiver)
super().__init__(sender=sender, receiver=receiver)
class myAgent(Agent):
def __init__(self):
super().__init__()
self.operations[HelloMessage.__name__] = self.hello
def update(self):
while self.messages:
msg = self.receive()
operation = self.operations.get(msg.topic))
if operation is not None:
operation(msg)
else:
self.logger.debug("%s: don't know what to do with: %s" % (self.uuid), str(msg)))
def hello(self, msg)
print(msg)
That simple!
The dictionary self.operations which is inherited from the Agent-class
is updated with HelloMessage.__name__ pointing to the function self.hello.
self.operations thereby acts
as a pointer for when a HelloMessage arrives, so when the agents
update function is called, it will get the topic from the message's and
point to the function self.hello, where self.hello in this simple
example just prints the content of the message.
More nuanced behaviour, can also be embedded without the user having to worry about any externals. For example if some messages take precedence over others (priority messages), the inbox should be emptied in the beginning of the update function for sorting.
Here is an example where some topics are treated with priority over others:
class AgentWithPriorityInbox(Agent):
def __init__(self):
super().__init__()
self.operations.update({"1": self.some_priority_function,
"2": self.some_function,
"3": self.some_function, # Same function for 2 topics.!
"hello": self.hello, })
self.priority_topics = ["1","2","3"]
self.priority_messages = deque() # from collections import deque
self.normal_messages = deque() # deques append and popleft are threadsafe.
def update(self):
# 1. Empty the inbox and sort the messages using the topic:
while self.messages:
msg = self.receive()
if msg.topic in self.priority_topics:
self.priority_messages.append(msg)
else:
self.normal_messages.append(msg)
# 2. We've now sorted the incoming messages and can now extend
# the priority message deque with the normal messages:
self.priority_messages.extend(normal_messages)
# 3. Next we process them as usual:
while self.priority_messages:
msg = self.priority_messages.popleft()
operation = self.operations.get(msg.topic)
if operation is not None:
operation(msg)
else:
...
The only thing which the user needs to worry about, is that the update
function cannot depend on any externals. The agent is confined to
sending (self.send(msg)) and receiving (msg = self.receive())
messages which must be processed within the function self.update.
Any responses to sent messages will not happen until the agent runs
update again.
If any state needs to be stored within the agent, such as for example
memory of messages sent or received, then the agents __init__ should
declare the variables as class variables and store the information.
Calls to databases, files, etc. can of course happen, including the usage
of self.setup() and self.teardown() which are called when the agent
is, respectively, started or stopped. See the boiler-plate (below) for a more
detailed description.
Boilerplate
The following boiler-plate allows the user to manage the whole lifecycle of an agent, including:
- add variables to
__init__which can store information between updates. - react to topics by extending
self.operations - extend
setupandteardownfor start and end of the agents lifecycle. - use
updatewith actions before(1), during(2) and after(3) reading messages.
There are no requirements, for using all functions. The boiler-plate merely seeks to illustrate typical usage.
There are also no requirements for the agent to be programmed in procedural, functional or object oriented manner. Doing that is completely up to the user of MASlite.
class Example(Agent):
def __init__(self, db_connection):
super().__init__()
# add variables here.
self._is_setup = False
self.db_connection = db_connection
# remember to register topics and their functions:
self.operations.update({"topic x": self.x,
"topic y": self.y,
"topic ...": self....})
def update(self):
assert self._is_setup
# do something before reading messages
self.action_before_processing_messages()
# read the messages
while self.messages:
msg = self.receive()
# react immediately to some messages:
operation = self.operations.get(msg.topic)
if operation is not None:
operation(msg)
# react after reading all messages:
self.action_after_processing_all_messages()
# Functions added by the user that are not inherited from the
# `Agent`-class. If the `update` function should react on these,
# the topic of the message must be in the self.operations dict.
def setup(self):
self._is_setup = True
# add own setup operations here.
self.subscribe(self.__class__.__name__)
def action_before_processing_messages(self)
# do something.
def action_after_processing_all_messages(self)
# do something. Perhaps send a message to somebody that update is done?
msg = DoneMessages(sender=self, receiver=SomeOtherAgent)
self.send(msg)
def x(msg):
# read msg and send a response
from_ = msg.sender
response = SomeMessage(sender=self, receiver=from_)
self.send(response)
def y(msg):
