SkillAgentSearch skills...

Wse

WSE - Rust-powered WebSocket engine for Python. Up to 5M del/s fan-out, native cluster binary protocol, zero-GIL JWT, E2E encryption

Install / Use

/learn @silvermpx/Wse
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

WSE - WebSocket Engine

PyPI - Server PyPI - Client npm License: MIT

High-performance WebSocket server built in Rust with native clustering, E2E encryption, message recovery, presence tracking, and real-time fan-out. Exposed to Python via PyO3 with zero GIL overhead on the data path. Ships with Rust-accelerated utilities for application code: priority queues, rate limiters, event sequencing, compression, cryptography, and JWT.

Features

Server

| Feature | Details | |---------|---------| | Rust core | tokio async runtime, tungstenite WebSocket transport, dedicated thread pool, zero GIL on the data path | | JWT authentication | HS256, RS256, ES256 algorithms via jsonwebtoken crate. Validated during handshake (0.01ms), cookie + Authorization header extraction, key rotation, kid validation | | Protocol negotiation | client_hello/server_hello handshake with feature discovery, capability advertisement, version agreement | | Topic subscriptions | Per-connection topic subscriptions with automatic cleanup on disconnect | | Pre-framed broadcast | WebSocket frame built once, shared via Arc across all connections, single allocation per broadcast | | Vectored writes | write_vectored (writev syscall) batches multiple frames per connection in a single kernel call | | Write coalescing | Write task drains up to 256 pending frames per iteration via recv_many | | DashMap state | Lock-free sharded concurrent hash maps for topics, rates, formats, activity tracking | | mimalloc allocator | Global allocator optimized for multi-threaded workloads with frequent small allocations | | Deduplication | 50,000-entry AHashSet with FIFO eviction per send_event() call | | Rate limiting | Per-connection token bucket: 100K capacity, 10K/s refill, client warning at 20% remaining | | Zombie detection | Server pings every 25s, force-closes connections with no activity for 60s | | Drain mode | Lock-free crossbeam bounded channel, Python acquires GIL once per batch (not per event) | | Compression | zlib for client-facing messages above threshold (default 1024 bytes) | | MessagePack | Opt-in binary transport via ?format=msgpack, roughly 2x faster serialization, 30% smaller | | Message signing | Selective HMAC-SHA256 signing for critical operations, nonce-based replay prevention | | Queue groups | Round-robin dispatch within named groups for load-balanced worker pools | | Topic ACL | Per-connection allow/deny glob patterns for topic access control | | Graceful drain | drain() sends Close frame to all clients, rejects new connections, notifies cluster peers |

End-to-End Encryption

| Feature | Details | |---------|---------| | Key exchange | ECDH P-256 (per-connection keypair, automatic during handshake) | | Encryption | AES-GCM-256 with unique 12-byte IV per message | | Key derivation | HKDF-SHA256 (salt: wse-encryption, info: aes-gcm-key) | | Wire format | E: prefix + 12-byte IV + AES-GCM ciphertext + 16-byte auth tag | | Key rotation | Configurable rotation interval (default 1 hour), automatic renegotiation | | Replay prevention | Nonce cache (10K entries, 5-minute TTL) on the client side |

Cluster Protocol

| Feature | Details | |---------|---------| | Topology | Full TCP mesh, direct peer-to-peer connections | | Wire format | Custom binary frames: 8-byte header + topic + payload, 12 message types | | Interest routing | SUB/UNSUB/RESYNC frames, messages forwarded only to peers with matching subscribers | | Gossip discovery | PeerAnnounce/PeerList frames, new nodes need one seed address to join | | mTLS | rustls + tokio-rustls, P-256 certificates, WebPkiClientVerifier for both sides | | Compression | zstd level 1 for payloads above 256 bytes, capability-negotiated, output capped at 1 MB | | Heartbeat | 5s ping interval, 15s timeout, dead peer detection | | Circuit breaker | 10 failures to open, 60s reset, 3 half-open probe calls | | Dead letter queue | 1000-entry ring buffer for failed cluster sends | | Presence sync | PresenceUpdate/PresenceFull frames, CRDT last-write-wins conflict resolution | | Topology API | cluster_info() returns connected peer list with address, instance_id, status |

Presence Tracking

| Feature | Details | |---------|---------| | Per-topic tracking | Which users are active in each topic, with custom metadata (status, avatar, etc.) | | User-level grouping | Multiple connections from same JWT sub share one presence entry | | Join/leave lifecycle | presence_join on first connection, presence_leave on last disconnect | | O(1) stats | presence_stats() returns member/connection counts without iteration | | Data updates | update_presence() broadcasts to all topics where the user is present | | Cluster sync | Synchronized across all nodes, CRDT last-write-wins resolution | | TTL sweep | Background task every 30s removes entries from dead connections |

Message Recovery

| Feature | Details | |---------|---------| | Ring buffers | Per-topic, power-of-2 capacity, bitmask indexing (single AND instruction) | | Epoch+offset tracking | Precise recovery positioning, epoch changes on buffer recreation | | Memory management | Global budget (default 256 MB), TTL eviction, LRU eviction when over budget | | Zero-copy storage | Recovery entries share Bytes (Arc) with the broadcast path | | Recovery on reconnect | subscribe_with_recovery() replays missed messages automatically |

Client SDKs (Python + TypeScript/React)

| Feature | Details | |---------|---------| | Auto-reconnection | 4 strategies: exponential, linear, fibonacci, adaptive backoff with jitter | | Connection pool | Multi-endpoint with health scoring, 3 load balancing strategies, automatic failover | | Circuit breaker | CLOSED/OPEN/HALF_OPEN state machine, prevents connection storms | | Rate limiting | Client-side token bucket, coordinates with server feedback | | E2E encryption | Wire-compatible AES-GCM-256 + ECDH P-256 (both clients speak the same protocol) | | Event sequencing | Duplicate detection (sliding window) + out-of-order buffering | | Network monitor | Real-time latency, jitter, packet loss measurement, quality scoring | | Priority queues | 5 levels from CRITICAL to BACKGROUND | | Offline queue | IndexedDB persistence (TypeScript), replayed on reconnect | | Compression | Automatic zlib for messages above threshold | | MessagePack | Binary encoding for smaller payloads and faster serialization | | Message signing | HMAC-SHA256 integrity verification |

Transport Security

| Feature | Details | |---------|---------| | Origin validation | Configure in reverse proxy (nginx/Caddy) to prevent CSWSH | | Cookie auth | access_token HTTP-only cookie with Secure + SameSite=Lax (OWASP recommended for browsers) | | Frame protection | 1 MB max frame size, serde_json parsing (no eval), escaped user IDs in server_ready | | Cluster frame protection | zstd decompression output capped at 1 MB (MAX_FRAME_SIZE), protocol version validation |


Quick Start

pip install wse-server
from wse_server import RustWSEServer, rust_jwt_encode
import time, threading

server = RustWSEServer(
    "0.0.0.0", 5007,
    max_connections=10_000,
    jwt_secret=b"replace-with-a-strong-secret-key!",
    jwt_issuer="my-app",
    jwt_audience="my-api",
)
server.enable_drain_mode()
server.start()

def handle_events(srv):
    while True:
        for ev in srv.drain_inbound(256, 50):
            if ev[0] == "auth_connect":
                srv.subscribe_connection(ev[1], ["updates"])
            elif ev[0] == "msg":
                print(f"Message from {ev[1]}: {ev[2]}")
            elif ev[0] == "disconnect":
                print(f"Disconnected: {ev[1]}")

threading.Thread(target=handle_events, args=(server,), daemon=True).start()

while server.is_running():
    time.sleep(1)

Generate a test token:

token = rust_jwt_encode(
    {"sub": "user-1", "iss": "my-app", "aud": "my-api",
     "exp": int(time.time()) + 3600, "iat": int(time.time())},
    b"replace-with-a-strong-secret-key!",
)

Server Configuration

RustWSEServer constructor parameters:

| Parameter | Default | Description | |-----------|---------|-------------| | host | required | Bind address | | port | required | Bind port | | max_connections | 1000 | Maximum concurrent WebSocket connections | | jwt_secret | None | JWT key for validation. HS256: shared secret (bytes, min 32). RS256/ES256: PEM public key. None disables auth | | jwt_issuer | None | Expected iss claim. Skipped if None | | jwt_audience | None | Expected aud claim. Skipped if None | | jwt_cookie_name | "access_token" | Cookie name for JWT token extraction | | jwt_previous_secret | None | Previous key for zero-downtime rotation. HS256: previous secret. RS256/ES256: previous public key PEM | | jwt_key_id | None | Expected kid header claim. Rejects tokens with mismatched key ID | | jwt_algorithm | None | JWT algorithm: "HS256" (default), "RS256", or "ES256" | | jwt_private_key | None | PEM private key for RS256/ES256 token encoding. Not needed for HS256 | | max_inbound_queue_size | 131072 | Drain mode bounded queue capacity | | recovery_enabled | False | Enable per-topic message recovery buffers | | recovery_buffer_size | 128 | Ring buffer slots per topic (rounded to power-of-2) | | recovery_ttl | 300 | Buffer TTL in seconds before eviction | | `

View on GitHub
GitHub Stars41
CategoryDevelopment
Updated4d ago
Forks0

Languages

Rust

Security Score

95/100

Audited on Mar 22, 2026

No findings