RoomKit
Pure async Python 3.12+ framework for multi-channel conversation orchestration.
RoomKit gives you one abstraction — the room — to wire together any combination of SMS, WhatsApp, Email, Teams, Telegram, Voice, Video, WebSocket, and AI channels. Messages flow in, pass through a hook pipeline, get routed to the right agent, and broadcast out to every attached channel. You focus on the conversation logic; the framework handles routing, transcoding, audio processing, video processing, and agent handoffs.
Website: roomkit.live | Docs: roomkit.live/docs | API Reference: roomkit.live/docs/api
How it works
Every channel implements the same interface: handle_inbound() converts a provider message into a RoomEvent, and deliver() pushes events out. Channels fall into two categories: transport channels deliver to external systems, and intelligence channels generate content (AI agents, for example).
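A custom channel following that interface might look like the sketch below. The method names (handle_inbound, deliver) come from the description above; the RoomEvent stand-in and LoggingChannel class are illustrative, not RoomKit's actual types:

```python
import asyncio
from dataclasses import dataclass


@dataclass
class RoomEvent:
    # Minimal stand-in for RoomKit's normalized event type.
    sender_id: str
    body: str


class LoggingChannel:
    """Toy transport channel: normalizes inbound payloads, records outbound events."""

    def __init__(self, channel_id: str) -> None:
        self.channel_id = channel_id
        self.delivered: list[RoomEvent] = []

    async def handle_inbound(self, raw: dict) -> RoomEvent:
        # Convert a provider-specific payload into a normalized RoomEvent.
        return RoomEvent(sender_id=raw["from"], body=raw["text"])

    async def deliver(self, event: RoomEvent) -> None:
        # Push an event out to the external system (here: just record it).
        self.delivered.append(event)


async def demo() -> list[RoomEvent]:
    ch = LoggingChannel("log")
    event = await ch.handle_inbound({"from": "user-1", "text": "hi"})
    await ch.deliver(event)
    return ch.delivered


print(asyncio.run(demo()))
```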
Message flow
An inbound message is normalized into a RoomEvent, passes through the hook pipeline (where it can be blocked, modified, or enriched), gets stored, then fans out to every attached channel. AI agents generate responses that re-enter the same pipeline.
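That flow can be sketched end to end with hypothetical minimal types (Event, Room, and the hook here are stand-ins; a real hook pipeline can also modify or enrich events, not just block them):

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class Event:
    sender: str
    body: str


@dataclass
class Room:
    history: list[Event] = field(default_factory=list)
    outboxes: dict[str, list[Event]] = field(default_factory=dict)


async def block_spam_hook(event: Event) -> bool:
    # A hook can block an event before it reaches storage and broadcast.
    return "spam" not in event.body


async def process_inbound(room: Room, source: str, event: Event) -> bool:
    if not await block_spam_hook(event):   # 1. hook pipeline
        return False
    room.history.append(event)             # 2. storage
    for channel, outbox in room.outboxes.items():
        if channel != source:              # 3. fan out to every other attached channel
            outbox.append(event)
    return True


room = Room(outboxes={"sms": [], "whatsapp": [], "ai": []})
accepted = asyncio.run(process_inbound(room, "sms", Event("user-1", "hello")))
print(accepted, [len(v) for v in room.outboxes.values()])  # True [0, 1, 1]
```

An AI channel's reply would re-enter process_inbound with its own channel as the source, which is how responses fan back out to the human channels.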
Quickstart
```bash
pip install roomkit
```
Example: AI chatbot in 20 lines
```python
import asyncio

from roomkit import (
    ChannelCategory, InboundMessage, RoomKit,
    TextContent, WebSocketChannel,
)
from roomkit.channels.ai import AIChannel
from roomkit.providers.anthropic import AnthropicAIProvider, AnthropicConfig


async def main():
    kit = RoomKit()

    # One channel for the user, one for AI
    ws = WebSocketChannel("ws-user")
    ai = AIChannel("assistant", provider=AnthropicAIProvider(
        AnthropicConfig(api_key="sk-...")
    ), system_prompt="You are a helpful assistant.")
    kit.register_channel(ws)
    kit.register_channel(ai)

    # Create a room and wire everything together
    await kit.create_room(room_id="chat")
    await kit.attach_channel("chat", "ws-user")
    await kit.attach_channel("chat", "assistant", category=ChannelCategory.INTELLIGENCE)

    # Process a message — AI responds automatically
    await kit.process_inbound(InboundMessage(
        channel_id="ws-user", sender_id="user-1",
        content=TextContent(body="What is RoomKit?"),
    ))

asyncio.run(main())
```
That's it. The message flows through the hook pipeline, gets routed to the AI channel, and the response is broadcast back to the WebSocket.
Example: Multi-channel bridge
The same room can bridge any mix of channels — a user on SMS, another on WhatsApp, and an AI assistant all sharing one conversation:
```python
kit = RoomKit()

sms = SMSChannel("sms", provider=TwilioSMSProvider(...))
wa = WhatsAppChannel("whatsapp", provider=...)
ai = AIChannel("assistant", provider=...)
for ch in [sms, wa, ai]:
    kit.register_channel(ch)

await kit.create_room(room_id="support-case-42")
await kit.attach_channel("support-case-42", "sms")
await kit.attach_channel("support-case-42", "whatsapp")
await kit.attach_channel("support-case-42", "assistant", category=ChannelCategory.INTELLIGENCE)

# Message from SMS → broadcast to WhatsApp + AI
# AI reply → broadcast to SMS + WhatsApp
```
Content is automatically transcoded between channel capabilities (rich → text fallback, media handling, etc.).
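A rich-to-text fallback of the kind described might work like the following sketch. The RichContent type and transcode_for function are illustrative stand-ins, not RoomKit's actual transcoding API:

```python
from dataclasses import dataclass, field


@dataclass
class RichContent:
    body: str
    media_urls: list[str] = field(default_factory=list)
    buttons: list[str] = field(default_factory=list)


def transcode_for(content: RichContent, supports_rich: bool) -> str:
    """Degrade rich content to plain text for channels without rich support."""
    if supports_rich:
        return content.body  # channel renders media and buttons natively
    parts = [content.body]
    parts += [f"[media] {url}" for url in content.media_urls]  # media -> links
    if content.buttons:
        parts.append("Reply with: " + " / ".join(content.buttons))  # buttons -> prompt
    return "\n".join(parts)


msg = RichContent("Your order shipped!", ["https://example.com/label.png"], ["Track", "Help"])
print(transcode_for(msg, supports_rich=False))
```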
More examples in examples/.
Installation
RoomKit's core has a single dependency (pydantic). Everything else is optional:
```bash
pip install roomkit                   # core only
pip install roomkit[anthropic]        # + Anthropic Claude
pip install roomkit[openai]           # + OpenAI GPT
pip install roomkit[gemini]           # + Google Gemini

# Voice & video
pip install roomkit[fastrtc]          # WebRTC audio
pip install roomkit[sip]              # SIP voice + video
pip install roomkit[deepgram]         # Deepgram STT
pip install roomkit[elevenlabs]       # ElevenLabs TTS
pip install roomkit[sherpa-onnx]      # Local STT/TTS/VAD/Denoiser (ONNX)
pip install roomkit[realtime-gemini]  # Gemini Live (speech-to-speech)
pip install roomkit[realtime-openai]  # OpenAI Realtime (speech-to-speech)

# Messaging
pip install roomkit[httpx]            # SMS, RCS, Email providers
pip install roomkit[teams]            # Microsoft Teams
pip install roomkit[telegram]         # Telegram
pip install roomkit[neonize]          # WhatsApp Personal

# Infrastructure
pip install roomkit[postgres]         # PostgreSQL storage
pip install roomkit[opentelemetry]    # Distributed tracing
pip install roomkit[mcp]              # Model Context Protocol tools

# Everything
pip install roomkit[all]
```
For development:
```bash
git clone https://github.com/roomkit-live/roomkit.git
cd roomkit
uv sync --extra dev
make all  # ruff check + mypy --strict + pytest
```
Requires Python 3.12+.
Multi-Agent Orchestration
RoomKit has four built-in orchestration strategies, all configured through RoomKit(orchestration=...). The framework handles agent registration, routing, handoff tools, and conversation state — you just define agents and pick a strategy.
Agents
Agent extends AIChannel with identity metadata (role, scope, voice, greeting) that gets auto-injected into the system prompt:
```python
from roomkit import Agent
from roomkit.providers.anthropic import AnthropicAIProvider, AnthropicConfig
from roomkit.orchestration.handoff import HandoffMemoryProvider
from roomkit.memory.sliding_window import SlidingWindowMemory

triage = Agent(
    "agent-triage",
    provider=AnthropicAIProvider(AnthropicConfig(api_key="sk-...")),
    role="Triage receptionist",
    description="Routes callers to the right specialist",
    system_prompt="You triage incoming requests.",
    voice="Zephyr",  # TTS voice ID
    language="French",
    greeting="Greet the caller warmly and ask how you can help.",
    memory=HandoffMemoryProvider(SlidingWindowMemory(max_events=20)),
)
```
Pipeline — linear handoff chain
Agents hand off to the next in a fixed sequence. Each agent gets a handoff_conversation tool and can only move forward:
```python
from roomkit import Agent, Pipeline, RoomKit

kit = RoomKit(
    orchestration=Pipeline(agents=[triage, handler, resolver]),
)
```
Swarm — any-to-any handoff
Every agent can hand off to any other. The AI decides when a topic change requires a different specialist:
```python
from roomkit import Agent, Swarm, RoomKit

kit = RoomKit(
    orchestration=Swarm(
        agents=[sales, support, billing],
        entry="agent-sales",
    ),
)
```
Loop — iterative refinement
A producer agent generates content, one or more reviewers evaluate it (sequentially or in parallel), and the cycle repeats until all approve or max iterations are reached:
```python
from roomkit import Agent, Loop, RoomKit

kit = RoomKit(
    orchestration=Loop(
        agent=writer,
        reviewers=[quality, accuracy, style],
        strategy="parallel",
        max_iterations=3,
    ),
)
```
Supervisor — delegating to workers
A supervisor agent talks to the user and delegates tasks to workers that run in isolated child rooms:
```python
from roomkit import Agent, Supervisor, RoomKit

kit = RoomKit(
    orchestration=Supervisor(
        supervisor=manager,
        workers=[researcher, coder],
    ),
)
```
Voice orchestration
All orchestration strategies work seamlessly on live voice calls. The voice/realtime channel is a transport, so swapping the active agent doesn't touch the audio session.
For speech-to-speech mode (Gemini Live, OpenAI Realtime), the realtime session is reconfigured on handoff — system prompt, voice, and tools change with ~200-500ms latency while the audio stream stays connected.
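Conceptually, such a handoff amounts to rebuilding the realtime session configuration from the incoming agent's metadata while leaving the audio transport alone. The AgentProfile type and session_update_for helper below are hypothetical illustrations, not RoomKit's internal API:

```python
from dataclasses import dataclass


@dataclass
class AgentProfile:
    # Hypothetical stand-in for an agent's identity metadata.
    name: str
    system_prompt: str
    voice: str
    tools: tuple[str, ...]


def session_update_for(agent: AgentProfile) -> dict:
    """Build the session reconfiguration applied on handoff; audio stays connected."""
    return {
        "instructions": agent.system_prompt,
        "voice": agent.voice,
        "tools": list(agent.tools),
    }


billing = AgentProfile("billing", "You handle billing questions.", "Zephyr", ("lookup_invoice",))
print(session_update_for(billing)["voice"])
```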
Audio Pipeline
All stages are optional. AEC and AGC are automatically skipped when the backend declares native support.
| Stage | Role | Implementations |
|-------|------|-----------------|
| VAD | Voice activity detection | SherpaOnnx, Energy-based |
| Denoiser | Noise reduction | RNNoise, SherpaOnnx |
| AEC | Acoustic echo cancellation | Speex |
| STT | Speech-to-text | Deepgram, SherpaOnnx, Qwen, Gradium |
| TTS | Text-to-speech | ElevenLabs, SherpaOnnx, Qwen, Gradium, Grok |
| Diarization | Speaker identification | Pluggable |
| DTMF | Tone detection (parallel) | Pluggable |
Interruption strategies control how user speech during TTS playback is handled: IMMEDIATE, CONFIRMED (wait for sustained speech), SEMANTIC (backchannel detection ignores "uh-huh"), or DISABLED.
```python
voice = VoiceChannel(
    "voice", stt=stt, tts=tts, backend=backend,
    pipeline=AudioPipelineConfig(vad=vad, denoiser=denoiser, aec=aec),
    interruption=InterruptionConfig(
        strategy=InterruptionStrategy.CONFIRMED, min_speech_ms=300
    ),
)
```
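The CONFIRMED strategy waits for sustained speech before cutting TTS playback. A minimal version of that decision over a stream of VAD frames might look like this (the fixed frame duration is an assumption for illustration; this is not RoomKit code):

```python
def should_interrupt(vad_frames: list[bool], frame_ms: int, min_speech_ms: int) -> bool:
    """Interrupt only once speech has been continuously detected for min_speech_ms."""
    run_ms = 0
    for speaking in vad_frames:
        run_ms = run_ms + frame_ms if speaking else 0  # reset on silence
        if run_ms >= min_speech_ms:
            return True
    return False


# 20 ms frames: brief noise bursts don't interrupt, sustained speech does.
print(should_interrupt([True, True, False] * 5, frame_ms=20, min_speech_ms=300))  # False
print(should_interrupt([True] * 15, frame_ms=20, min_speech_ms=300))              # True
```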
Hooks
Hooks intercept events at specific points in the pipeline. They can block, modify, or observe events:
```python
@kit.hook(HookTrigger.BEFORE_BROADCAST, name="compliance_check")
async def check(event: RoomEvent, ctx: RoomContext) -> HookResult:
    if contains_pii(event.content):
        return HookResult.block("PII detected")
    return HookResult.allow()
```
There are 35 hook triggers covering the full event lifecycle.
