Wse
WSE - Rust-powered WebSocket engine for Python. Up to 5M deliveries/s fan-out, native binary cluster protocol, zero-GIL JWT validation, E2E encryption.
WSE - WebSocket Engine
High-performance WebSocket server built in Rust with native clustering, E2E encryption, message recovery, presence tracking, and real-time fan-out. Exposed to Python via PyO3 with zero GIL overhead on the data path. Ships with Rust-accelerated utilities for application code: priority queues, rate limiters, event sequencing, compression, cryptography, and JWT.
Features
Server
| Feature | Details |
|---------|---------|
| Rust core | tokio async runtime, tungstenite WebSocket transport, dedicated thread pool, zero GIL on the data path |
| JWT authentication | HS256, RS256, ES256 algorithms via jsonwebtoken crate. Validated during handshake (0.01ms), cookie + Authorization header extraction, key rotation, kid validation |
| Protocol negotiation | client_hello/server_hello handshake with feature discovery, capability advertisement, version agreement |
| Topic subscriptions | Per-connection topic subscriptions with automatic cleanup on disconnect |
| Pre-framed broadcast | WebSocket frame built once, shared via Arc across all connections, single allocation per broadcast |
| Vectored writes | write_vectored (writev syscall) batches multiple frames per connection in a single kernel call |
| Write coalescing | Write task drains up to 256 pending frames per iteration via recv_many |
| DashMap state | Lock-free sharded concurrent hash maps for topics, rates, formats, activity tracking |
| mimalloc allocator | Global allocator optimized for multi-threaded workloads with frequent small allocations |
| Deduplication | 50,000-entry AHashSet with FIFO eviction per send_event() call |
| Rate limiting | Per-connection token bucket: 100K capacity, 10K/s refill, client warning at 20% remaining |
| Zombie detection | Server pings every 25s, force-closes connections with no activity for 60s |
| Drain mode | Lock-free crossbeam bounded channel, Python acquires GIL once per batch (not per event) |
| Compression | zlib for client-facing messages above threshold (default 1024 bytes) |
| MessagePack | Opt-in binary transport via ?format=msgpack, roughly 2x faster serialization, 30% smaller |
| Message signing | Selective HMAC-SHA256 signing for critical operations, nonce-based replay prevention |
| Queue groups | Round-robin dispatch within named groups for load-balanced worker pools |
| Topic ACL | Per-connection allow/deny glob patterns for topic access control |
| Graceful drain | drain() sends Close frame to all clients, rejects new connections, notifies cluster peers |
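
The per-connection rate limiter in the table above is described as a token bucket (100K capacity, 10K/s refill, warning at 20% remaining). The following is a minimal Python sketch of that idea, not the engine's Rust implementation. The class name `TokenBucket`, the `try_consume` method, and the injectable `clock` parameter are hypothetical names chosen for illustration.

```python
import time


class TokenBucket:
    """Illustrative token bucket; names and structure are assumptions."""

    def __init__(self, capacity=100_000, refill_per_sec=10_000,
                 warn_ratio=0.20, clock=time.monotonic):
        self.capacity = capacity
        self.refill_per_sec = refill_per_sec
        self.warn_ratio = warn_ratio
        self._clock = clock
        self._tokens = float(capacity)  # start with a full bucket
        self._last = clock()

    def _refill(self):
        # Add tokens for the time elapsed since the last call, up to capacity.
        now = self._clock()
        elapsed = now - self._last
        self._last = now
        self._tokens = min(self.capacity,
                           self._tokens + elapsed * self.refill_per_sec)

    def try_consume(self, n=1):
        """Return (allowed, warn); warn is True once warn_ratio or less remains."""
        self._refill()
        allowed = self._tokens >= n
        if allowed:
            self._tokens -= n
        warn = self._tokens <= self.capacity * self.warn_ratio
        return allowed, warn
```

With the defaults, a client can burst 100,000 messages and then sustain 10,000 per second; the `warn` flag models the point at which the server would send the client a warning.
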
End-to-End Encryption
| Feature | Details |
|---------|---------|
| Key exchange | ECDH P-256 (per-connection keypair, automatic during handshake) |
| Encryption | AES-GCM-256 with unique 12-byte IV per message |
| Key derivation | HKDF-SHA256 (salt: wse-encryption, info: aes-gcm-key) |
| Wire format | E: prefix + 12-byte IV + AES-GCM ciphertext + 16-byte auth tag |
| Key rotation | Configurable rotation interval (default 1 hour), automatic renegotiation |
| Replay prevention | Nonce cache (10K entries, 5-minute TTL) on the client side |
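
To make the key derivation and wire format rows concrete, here is a standard-library sketch of HKDF-SHA256 (RFC 5869) and of splitting an encrypted frame into its parts. It assumes the salt and info values are the ASCII bytes of the strings in the table, that the derived key is 32 bytes, and that the `E:` prefix is the two ASCII bytes `E:`; the function names are hypothetical. It does not perform AES-GCM itself.

```python
import hashlib
import hmac

PREFIX = b"E:"   # assumption: prefix is the two ASCII bytes "E:"
IV_LEN = 12      # per the table: 12-byte IV
TAG_LEN = 16     # per the table: 16-byte auth tag


def hkdf_sha256(ikm: bytes, salt: bytes, info: bytes, length: int = 32) -> bytes:
    """HKDF extract-then-expand with SHA-256 (RFC 5869)."""
    prk = hmac.new(salt, ikm, hashlib.sha256).digest()  # extract step
    okm, block, counter = b"", b"", 1
    while len(okm) < length:                            # expand step
        block = hmac.new(prk, block + info + bytes([counter]),
                         hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


def split_encrypted_frame(frame: bytes):
    """Split a frame into (iv, ciphertext, tag) following the table's layout."""
    if not frame.startswith(PREFIX):
        raise ValueError("not an encrypted frame")
    body = frame[len(PREFIX):]
    if len(body) < IV_LEN + TAG_LEN:
        raise ValueError("frame too short")
    return body[:IV_LEN], body[IV_LEN:-TAG_LEN], body[-TAG_LEN:]
```

In a real exchange the `ikm` argument would be the ECDH P-256 shared secret, for example `hkdf_sha256(shared_secret, b"wse-encryption", b"aes-gcm-key")`.
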
Cluster Protocol
| Feature | Details |
|---------|---------|
| Topology | Full TCP mesh, direct peer-to-peer connections |
| Wire format | Custom binary frames: 8-byte header + topic + payload, 12 message types |
| Interest routing | SUB/UNSUB/RESYNC frames, messages forwarded only to peers with matching subscribers |
| Gossip discovery | PeerAnnounce/PeerList frames, new nodes need one seed address to join |
| mTLS | rustls + tokio-rustls, P-256 certificates, WebPkiClientVerifier for both sides |
| Compression | zstd level 1 for payloads above 256 bytes, capability-negotiated, output capped at 1 MB |
| Heartbeat | 5s ping interval, 15s timeout, dead peer detection |
| Circuit breaker | 10 failures to open, 60s reset, 3 half-open probe calls |
| Dead letter queue | 1000-entry ring buffer for failed cluster sends |
| Presence sync | PresenceUpdate/PresenceFull frames, CRDT last-write-wins conflict resolution |
| Topology API | cluster_info() returns connected peer list with address, instance_id, status |
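
The circuit breaker row (10 failures to open, 60s reset, 3 half-open probe calls) can be read as the state machine sketched below. This is an illustration only: the table does not say whether failures must be consecutive or how probes close the breaker, so the sketch assumes that any success resets the failure count, that one successful probe closes the breaker, and that a failed probe reopens it. All names are hypothetical.

```python
import time


class CircuitBreaker:
    """Illustrative closed/open/half-open breaker; semantics are assumptions."""

    def __init__(self, failure_threshold=10, reset_timeout=60.0,
                 half_open_max_calls=3, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self.half_open_max_calls = half_open_max_calls
        self._clock = clock
        self.state = "closed"
        self._failures = 0
        self._opened_at = 0.0
        self._probes = 0

    def allow(self) -> bool:
        """Return True if a call may be attempted right now."""
        if self.state == "open":
            if self._clock() - self._opened_at < self.reset_timeout:
                return False
            self.state = "half_open"   # reset timeout elapsed: start probing
            self._probes = 0
        if self.state == "half_open":
            if self._probes >= self.half_open_max_calls:
                return False           # probe budget used up
            self._probes += 1
        return True

    def record_success(self):
        self.state = "closed"
        self._failures = 0

    def record_failure(self):
        if self.state == "half_open":
            self._trip()               # a failed probe reopens the breaker
            return
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._trip()

    def _trip(self):
        self.state = "open"
        self._opened_at = self._clock()
        self._failures = 0
```
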
Presence Tracking
| Feature | Details |
|---------|---------|
| Per-topic tracking | Which users are active in each topic, with custom metadata (status, avatar, etc.) |
| User-level grouping | Multiple connections from same JWT sub share one presence entry |
| Join/leave lifecycle | presence_join on first connection, presence_leave on last disconnect |
| O(1) stats | presence_stats() returns member/connection counts without iteration |
| Data updates | update_presence() broadcasts to all topics where the user is present |
| Cluster sync | Synchronized across all nodes, CRDT last-write-wins resolution |
| TTL sweep | Background task every 30s removes entries from dead connections |
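
The join/leave lifecycle above (one presence entry per user, join on first connection, leave on last disconnect) amounts to tracking a set of connections per user. A small Python sketch of that bookkeeping follows; the class and method names are hypothetical and do not reflect the engine's internal data structures.

```python
from collections import defaultdict


class PresenceTracker:
    """Illustrative per-topic presence with user-level grouping."""

    def __init__(self):
        self._members = defaultdict(dict)    # topic -> user_id -> set of conn ids
        self._conn_counts = defaultdict(int)  # topic -> total connections

    def connect(self, topic, user_id, conn_id):
        """Return "presence_join" only for the user's first connection."""
        conns = self._members[topic].setdefault(user_id, set())
        if conn_id in conns:
            return None
        first = not conns
        conns.add(conn_id)
        self._conn_counts[topic] += 1
        return "presence_join" if first else None

    def disconnect(self, topic, user_id, conn_id):
        """Return "presence_leave" only when the user's last connection closes."""
        users = self._members.get(topic, {})
        conns = users.get(user_id)
        if not conns or conn_id not in conns:
            return None
        conns.discard(conn_id)
        self._conn_counts[topic] -= 1
        if conns:
            return None
        del users[user_id]
        return "presence_leave"

    def stats(self, topic):
        # Both values come from maintained counters or len(), so no iteration.
        return {"members": len(self._members.get(topic, {})),
                "connections": self._conn_counts.get(topic, 0)}
```

A user with two browser tabs open therefore produces one join event and, after both tabs close, one leave event.
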
Message Recovery
| Feature | Details |
|---------|---------|
| Ring buffers | Per-topic, power-of-2 capacity, bitmask indexing (single AND instruction) |
| Epoch+offset tracking | Precise recovery positioning, epoch changes on buffer recreation |
| Memory management | Global budget (default 256 MB), TTL eviction, LRU eviction when over budget |
| Zero-copy storage | Recovery entries share Bytes (Arc) with the broadcast path |
| Recovery on reconnect | subscribe_with_recovery() replays missed messages automatically |
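
The ring buffer and epoch+offset rows can be illustrated with a short sketch: a power-of-2 capacity lets the slot index be computed with a single bitwise AND, and a client that remembers the epoch and last offset it saw can ask for everything after it. The class below is an assumption-laden illustration (it rounds the requested size up, and returns `None` when the position is no longer recoverable); it is not the engine's implementation.

```python
class RecoveryBuffer:
    """Illustrative per-topic ring buffer with epoch + offset positions."""

    def __init__(self, requested_size=128, epoch=1):
        # Assumption: round up to the next power of two.
        self.capacity = 1 << max(0, (requested_size - 1).bit_length())
        self._mask = self.capacity - 1
        self._slots = [None] * self.capacity
        self.epoch = epoch        # would change if the buffer were recreated
        self.next_offset = 0      # offset the next message will receive

    def push(self, payload):
        offset = self.next_offset
        self._slots[offset & self._mask] = payload  # bitmask instead of modulo
        self.next_offset += 1
        return offset

    def replay_after(self, epoch, last_offset):
        """Return messages newer than last_offset, or None if unrecoverable."""
        oldest = max(0, self.next_offset - self.capacity)
        start = last_offset + 1
        if epoch != self.epoch or start < oldest or start > self.next_offset:
            return None
        return [self._slots[i & self._mask]
                for i in range(start, self.next_offset)]
```

A `None` result models the case where the client has fallen too far behind (or the epoch changed) and must resynchronize from scratch rather than replay.
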
Client SDKs (Python + TypeScript/React)
| Feature | Details |
|---------|---------|
| Auto-reconnection | 4 strategies: exponential, linear, fibonacci, adaptive backoff with jitter |
| Connection pool | Multi-endpoint with health scoring, 3 load balancing strategies, automatic failover |
| Circuit breaker | CLOSED/OPEN/HALF_OPEN state machine, prevents connection storms |
| Rate limiting | Client-side token bucket, coordinates with server feedback |
| E2E encryption | Wire-compatible AES-GCM-256 + ECDH P-256 (both clients speak the same protocol) |
| Event sequencing | Duplicate detection (sliding window) + out-of-order buffering |
| Network monitor | Real-time latency, jitter, packet loss measurement, quality scoring |
| Priority queues | 5 levels from CRITICAL to BACKGROUND |
| Offline queue | IndexedDB persistence (TypeScript), replayed on reconnect |
| Compression | Automatic zlib for messages above threshold |
| MessagePack | Binary encoding for smaller payloads and faster serialization |
| Message signing | HMAC-SHA256 integrity verification |
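
As a rough illustration of the auto-reconnection strategies listed above, the sketch below computes delays for exponential, linear, and fibonacci backoff with optional jitter. The function name, the `base`, `cap`, and `jitter` parameters, and their defaults are assumptions for this example, not the SDKs' actual settings; the adaptive strategy is omitted because the source does not describe how it works.

```python
import random


def backoff_delay(strategy, attempt, base=1.0, cap=30.0, jitter=0.0,
                  rng=random.random):
    """Delay in seconds before reconnect attempt number `attempt` (1-based)."""
    if strategy == "exponential":
        raw = base * 2 ** (attempt - 1)      # 1, 2, 4, 8, ...
    elif strategy == "linear":
        raw = base * attempt                 # 1, 2, 3, 4, ...
    elif strategy == "fibonacci":
        a, b = 1, 1
        for _ in range(attempt - 1):
            a, b = b, a + b
        raw = base * a                       # 1, 1, 2, 3, 5, ...
    else:
        raise ValueError(f"unknown strategy: {strategy}")
    delay = min(raw, cap)
    # Jitter scales the delay by a random factor in [1 - jitter, 1 + jitter]
    # so that many clients do not reconnect at the same instant.
    return delay * (1 + jitter * (2 * rng() - 1))
```

Fibonacci backoff grows more gently than exponential, which can be a reasonable middle ground when the server is expected to recover quickly.
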
Transport Security
| Feature | Details |
|---------|---------|
| Origin validation | Configure in reverse proxy (nginx/Caddy) to prevent CSWSH |
| Cookie auth | access_token HTTP-only cookie with Secure + SameSite=Lax (OWASP recommended for browsers) |
| Frame protection | 1 MB max frame size, serde_json parsing (no eval), escaped user IDs in server_ready |
| Cluster frame protection | zstd decompression output capped at 1 MB (MAX_FRAME_SIZE), protocol version validation |
Quick Start
```bash
pip install wse-server
```

```python
import threading
import time

from wse_server import RustWSEServer, rust_jwt_encode

server = RustWSEServer(
    "0.0.0.0", 5007,
    max_connections=10_000,
    jwt_secret=b"replace-with-a-strong-secret-key!",
    jwt_issuer="my-app",
    jwt_audience="my-api",
)
server.enable_drain_mode()
server.start()


def handle_events(srv):
    """Poll inbound events in batches and dispatch on the event type."""
    while True:
        for ev in srv.drain_inbound(256, 50):
            if ev[0] == "auth_connect":
                # Subscribe each authenticated connection to the "updates" topic.
                srv.subscribe_connection(ev[1], ["updates"])
            elif ev[0] == "msg":
                print(f"Message from {ev[1]}: {ev[2]}")
            elif ev[0] == "disconnect":
                print(f"Disconnected: {ev[1]}")


# Run the event loop in a background thread; keep the main thread alive.
threading.Thread(target=handle_events, args=(server,), daemon=True).start()

while server.is_running():
    time.sleep(1)
```
Generate a test token:
```python
token = rust_jwt_encode(
    {"sub": "user-1", "iss": "my-app", "aud": "my-api",
     "exp": int(time.time()) + 3600, "iat": int(time.time())},
    b"replace-with-a-strong-secret-key!",
)
```
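
For readers who want to see what an HS256 token consists of, the following standard-library sketch builds and checks a JWT in the standard compact form (RFC 7519). It is not how `rust_jwt_encode` is implemented, and it assumes the server accepts standard compact tokens; the helper names `hs256_encode` and `hs256_verify` are hypothetical. Unlike the server, this sketch checks only the signature, not `exp`, `iss`, or `aud`.

```python
import base64
import hashlib
import hmac
import json


def _b64url(data: bytes) -> str:
    # JWTs use URL-safe base64 without padding.
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def hs256_encode(claims: dict, secret: bytes) -> str:
    """Build header.payload.signature with an HMAC-SHA256 signature."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, claims)
    )
    sig = hmac.new(secret, signing_input.encode("ascii"), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(sig)}"


def hs256_verify(token: str, secret: bytes) -> dict:
    """Check the signature and return the claims; raises ValueError on mismatch."""
    signing_input, _, sig = token.rpartition(".")
    expected = hmac.new(secret, signing_input.encode("ascii"),
                        hashlib.sha256).digest()
    if not hmac.compare_digest(expected, _b64url_decode(sig)):
        raise ValueError("bad signature")
    return json.loads(_b64url_decode(signing_input.split(".")[1]))
```

Decoding a test token this way is a quick check that the `iss` and `aud` claims match the `jwt_issuer` and `jwt_audience` values passed to the server.
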
Server Configuration
RustWSEServer constructor parameters:
| Parameter | Default | Description |
|-----------|---------|-------------|
| host | required | Bind address |
| port | required | Bind port |
| max_connections | 1000 | Maximum concurrent WebSocket connections |
| jwt_secret | None | JWT key for validation. HS256: shared secret (bytes, at least 32 bytes long). RS256/ES256: PEM public key. None disables auth |
| jwt_issuer | None | Expected iss claim. Skipped if None |
| jwt_audience | None | Expected aud claim. Skipped if None |
| jwt_cookie_name | "access_token" | Cookie name for JWT token extraction |
| jwt_previous_secret | None | Previous key for zero-downtime rotation. HS256: previous secret. RS256/ES256: previous public key PEM |
| jwt_key_id | None | Expected kid header claim. Rejects tokens with mismatched key ID |
| jwt_algorithm | None | JWT algorithm: "HS256", "RS256", or "ES256". None selects the default, HS256 |
| jwt_private_key | None | PEM private key for RS256/ES256 token encoding. Not needed for HS256 |
| max_inbound_queue_size | 131072 | Drain mode bounded queue capacity |
| recovery_enabled | False | Enable per-topic message recovery buffers |
| recovery_buffer_size | 128 | Ring buffer slots per topic (rounded to power-of-2) |
| recovery_ttl | 300 | Buffer TTL in seconds before eviction |
| `
