SwarmDB

A production-grade message queue system for agent communication and LLM backend load balancing in large-scale multi-agent systems. It uses Apache Kafka for reliable, scalable message distribution and provides agent communication, message management, and load balancing features.

Features

  • Robust Message Distribution: Built on Apache Kafka for reliable, scalable message handling
  • Auto-Partitioning: Kafka topics automatically scale based on agent load
  • Stateful Management: Comprehensive tracking of message states and history
  • Persistence: Automatic saving of all interactions to JSON files
  • RESTful API: Complete FastAPI interface for all messaging operations
  • Authentication & Authorization: JWT-based secure agent authentication
  • Production-Ready: Deployment with Gunicorn/Uvicorn for high performance
  • Comprehensive Logging: Detailed logs using Loguru
  • Containerized: Docker and Docker Compose setup for easy deployment
  • Message Types: Support for different message types (chat, commands, function calls)
  • Group Messaging: Ability to create agent groups for targeted communication
  • Load Balancing: Built-in LLM backend load balancing capabilities

System Architecture

SwarmDB is built from the following components:

  1. Kafka Backend: For reliable message distribution and queuing
  2. SwarmsDB Class: Core messaging logic and state management
  3. FastAPI Server: RESTful API for agent interaction
  4. Gunicorn/Uvicorn: Process manager and ASGI server for production deployment

Installation

Using Docker Compose (Recommended)

# Clone the repository
git clone https://github.com/yourusername/agent-messaging-system.git
cd agent-messaging-system

# Start the entire stack (Kafka, Zookeeper, API)
docker-compose up -d

# The API will be available at http://localhost:8000
# The Kafka UI will be available at http://localhost:8080

Manual Installation

  1. Install Poetry (dependency management):
curl -sSL https://install.python-poetry.org | python3 -
  2. Install dependencies:
poetry install
  3. Make sure Kafka is running and accessible.

  4. Start the API server:

# For development
./start.sh development

# For production
./start.sh production

Environment Configuration

The system can be configured using environment variables:

# API Configuration
API_ENV=production
PORT=8000

# Security
JWT_SECRET=your_secret_key_here
JWT_ALGORITHM=HS256
TOKEN_EXPIRE_MINUTES=1440

# Kafka Configuration
KAFKA_BOOTSTRAP_SERVERS=kafka:9092
KAFKA_TOPIC_PREFIX=agent_messaging_
KAFKA_NUM_PARTITIONS=6
KAFKA_REPLICATION_FACTOR=1

# Message History Configuration
MESSAGE_HISTORY_DIR=/app/message_history
SAVE_INTERVAL_SECONDS=300

# Rate Limiting
RATE_LIMIT_PER_MINUTE=300
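
As an illustration of how these variables might be consumed, the following is a minimal sketch that reads them into a typed object. The `Settings` class and `load_settings` function are hypothetical names, not part of the project, and the fallback values simply mirror the sample configuration above; the project's real defaults may differ.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class Settings:
    """Typed view of the environment variables listed above (hypothetical)."""
    api_env: str
    port: int
    jwt_secret: str
    jwt_algorithm: str
    token_expire_minutes: int
    kafka_bootstrap_servers: str
    kafka_topic_prefix: str
    kafka_num_partitions: int
    kafka_replication_factor: int
    message_history_dir: str
    save_interval_seconds: int
    rate_limit_per_minute: int


def load_settings(env=None) -> Settings:
    """Build Settings from a mapping, defaulting to os.environ.

    Fallbacks copy the sample values from this README (an assumption).
    JWT_SECRET has no fallback so a missing secret fails fast with KeyError.
    """
    env = os.environ if env is None else env
    return Settings(
        api_env=env.get("API_ENV", "production"),
        port=int(env.get("PORT", "8000")),
        jwt_secret=env["JWT_SECRET"],
        jwt_algorithm=env.get("JWT_ALGORITHM", "HS256"),
        token_expire_minutes=int(env.get("TOKEN_EXPIRE_MINUTES", "1440")),
        kafka_bootstrap_servers=env.get("KAFKA_BOOTSTRAP_SERVERS", "kafka:9092"),
        kafka_topic_prefix=env.get("KAFKA_TOPIC_PREFIX", "agent_messaging_"),
        kafka_num_partitions=int(env.get("KAFKA_NUM_PARTITIONS", "6")),
        kafka_replication_factor=int(env.get("KAFKA_REPLICATION_FACTOR", "1")),
        message_history_dir=env.get("MESSAGE_HISTORY_DIR", "/app/message_history"),
        save_interval_seconds=int(env.get("SAVE_INTERVAL_SECONDS", "300")),
        rate_limit_per_minute=int(env.get("RATE_LIMIT_PER_MINUTE", "300")),
    )
```

Passing a plain dictionary instead of `os.environ` makes the loader easy to exercise in tests without touching the process environment.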

API Usage

Authentication

# Get access token
curl -X POST http://localhost:8000/auth/token \
  -H "Content-Type: application/json" \
  -d '{"username":"agent1", "password":"password"}'
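
The same token request can be issued from Python. This is a sketch using only the standard library; `build_token_request` and `fetch_token_response` are hypothetical helper names. The README does not show the response body, so the sketch returns the parsed JSON as-is (assuming the API answers with JSON) rather than guessing a field name for the token.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # address from the Docker Compose setup


def build_token_request(username: str, password: str) -> urllib.request.Request:
    """Build the POST /auth/token request shown in the curl example."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/auth/token",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def fetch_token_response(username: str, password: str) -> dict:
    """Send the request and return the decoded JSON body.

    Assumes the server is running and replies with JSON; the name of the
    field holding the token is not documented here, so inspect the result.
    """
    with urllib.request.urlopen(build_token_request(username, password)) as resp:
        return json.load(resp)
```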

Register an Agent

curl -X POST http://localhost:8000/agents/register \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"agent_id":"agent1", "description":"AI Assistant Agent"}'

Send a Message

curl -X POST http://localhost:8000/messages \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "content": "Hello, can you analyze this data?",
    "receiver_id": "agent2",
    "message_type": "command",
    "priority": 2
  }'
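
For agents written in Python, the request above might be built as follows. This is a standard-library sketch; `build_message_request` and `send` are hypothetical helpers. The payload fields are copied from the curl example, and decoding the reply as JSON is an assumption because the response format is not shown in this README.

```python
import json
import urllib.request

BASE_URL = "http://localhost:8000"  # address from the Docker Compose setup


def build_message_request(token: str, content: str, receiver_id: str,
                          message_type: str, priority: int) -> urllib.request.Request:
    """Build the POST /messages request with a bearer token and JSON body."""
    payload = {
        "content": content,
        "receiver_id": receiver_id,
        "message_type": message_type,
        "priority": priority,
    }
    return urllib.request.Request(
        f"{BASE_URL}/messages",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def send(request: urllib.request.Request) -> dict:
    """Send a prepared request; assumes a running server and a JSON reply."""
    with urllib.request.urlopen(request) as response:
        return json.load(response)
```

Separating request construction from sending keeps the payload logic testable without a running API server.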

Receive Messages

curl -X POST http://localhost:8000/agents/receive \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -d "max_messages=10&timeout=2.0"
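
Unlike the other examples, this call sends form-encoded parameters rather than JSON. A Python sketch of the same request, using the hypothetical helper name `build_receive_request`, could look like this:

```python
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8000"  # address from the Docker Compose setup


def build_receive_request(token: str, max_messages: int = 10,
                          timeout: float = 2.0) -> urllib.request.Request:
    """Build the POST /agents/receive request with a form-encoded body.

    The default values repeat the curl example; they are not documented
    server-side defaults.
    """
    form = urllib.parse.urlencode({"max_messages": max_messages, "timeout": timeout})
    return urllib.request.Request(
        f"{BASE_URL}/agents/receive",
        data=form.encode("ascii"),
        headers={"Authorization": f"Bearer {token}"},
        method="POST",
    )
```

When no `Content-Type` header is set, `urllib` sends the body as `application/x-www-form-urlencoded`, which matches what `curl -d` does by default.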

Create an Agent Group

curl -X POST http://localhost:8000/groups \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "group_name": "analysis_team",
    "agent_ids": ["agent2", "agent3", "agent4"]
  }'

Send to a Group

curl -X POST http://localhost:8000/groups/message \
  -H "Authorization: Bearer YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{
    "group_name": "analysis_team",
    "content": "Everyone, please review these results.",
    "message_type": "chat"
  }'

Development

Running Tests

poetry run pytest

Code Formatting

poetry run black .
poetry run isort .

Type Checking

poetry run mypy .

Production Deployment

For production deployment, we recommend using Docker Compose with appropriate environment variables for security.

  1. Edit the environment variables in docker-compose.yml
  2. Ensure the JWT_SECRET is a secure random string
  3. Deploy with docker-compose:
docker-compose up -d
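
One way to produce a suitable value for JWT_SECRET is Python's standard `secrets` module. This is a minimal sketch; the function name `generate_jwt_secret` and the 64-byte length are illustrative choices, not project requirements.

```python
import secrets


def generate_jwt_secret(num_bytes: int = 64) -> str:
    """Return a URL-safe random string built from num_bytes of OS randomness."""
    return secrets.token_urlsafe(num_bytes)


# Print a line that can be pasted into the environment configuration.
print(f"JWT_SECRET={generate_jwt_secret()}")
```

Generate the secret once per deployment and keep it out of version control; changing it invalidates any tokens signed with the previous value.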

License

MIT

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.
