KSBus
KSBus is a zero-configuration event bus written in Go, designed for real-time data sharing and synchronization between Go servers and Go, JavaScript, and Python clients. It is particularly useful for applications that need real-time communication, such as chat applications or live updates.
Features
- Zero Configuration: Easy to set up and use without the need for complex configurations.
- Cross-Language Support: Supports communication between Go servers and clients, JavaScript clients, and Python clients.
- Real-Time Data Sharing: Enables real-time data synchronization and broadcasting.
- Auto TLS: Automatically generates and renews Let's Encrypt certificates for secure communication, simplifying the setup of HTTPS servers.
Installation
To install KSBus, use the following command:
go get github.com/kamalshkeir/ksbus@v1.4.6
Usage
Server Setup
To set up a KSBus server, you can use the NewServer function. Here's a basic example:
package main

import (
	"fmt"
	"net/http"

	"github.com/kamalshkeir/ksbus"
	"github.com/kamalshkeir/ksmux"
	"github.com/kamalshkeir/ksmux/ws"
	"github.com/kamalshkeir/lg"
)

func main() {
	bus := ksbus.NewServer(ksbus.ServerOpts{
		Address: ":9313",
		// OnDataWS: func(data map[string]any, conn *ws.Conn, originalRequest *http.Request) error {
		// 	fmt.Println("srv OnDataWS:", data)
		// 	return nil
		// },
	})
	// bus.ID = "server-9313"
	fmt.Println("connected as", bus.ID)

	app := bus.App // get the router
	app.LocalStatics("JS", "/js") // serve the static directory
	lg.CheckError(app.LocalTemplates("examples/client-js")) // load the templates directory

	bus.OnDataWs(func(data map[string]any, conn *ws.Conn, originalRequest *http.Request) error {
		fmt.Println("srv OnDataWS:", data)
		return nil // a non-nil error is returned to the client
	})

	bus.OnId(func(data map[string]any) { // receives data sent to bus.ID with PublishToID
		fmt.Println("srv OnId:", data)
	})

	unsub := bus.Subscribe("server1", func(data map[string]any, unsub ksbus.Unsub) {
		fmt.Println(data)
		// unsub.Unsubscribe() // unsubscribe from inside the handler
	})
	defer unsub.Unsubscribe() // or unsubscribe from outside with the returned handle

	app.Get("/", func(c *ksmux.Context) {
		c.Html("index.html", nil)
	})

	app.Get("/pp", func(c *ksmux.Context) {
		bus.Bus.Publish("server1", map[string]any{
			"data": "hello from INTERNAL",
		})
		c.Text("ok")
	})

	fmt.Println("server1 connected as", bus.ID)
	bus.Run()
}
Client Setup (JavaScript)
For JavaScript clients, you can use the provided Bus.js file. Here's a basic example of how to use it:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Index</title>
</head>
<body>
    <h1>Index</h1>
    <input type="text" id="inpp">
    <button type="button" id="btn">Send</button>
    <script src="/js/Bus.js"></script>
    <script>
        let bus = new Bus({ Id: "browser" });
        bus.OnId = (d) => {
            console.log("recv OnId", d)
        }
        bus.OnDataWS = (d, wsConn) => {
            console.log("recv OnData", d)
        }
        bus.OnOpen = () => {
            console.log("connected", bus.Id)
            const unsub = bus.Subscribe("index.html", (data, unsub) => { // receive once, then unsubscribe
                console.log(data);
                unsub.Unsubscribe();
            })
            // or unsubscribe from the outside using the unsub returned by bus.Subscribe
            btn.addEventListener("click", (e) => {
                e.preventDefault();
                // publish and wait for the receiver to acknowledge
                bus.PublishToIDWaitRecv(inpp.value, { "cxwcwxcc": "hi from browser" }, (data) => {
                    console.log("onRecv:", data)
                }, (eventID, id) => {
                    console.log(`${id} did not receive ${eventID}`);
                })
            })
        }
    </script>
</body>
</html>
Client Setup (Go)
For Go clients, you can use the NewClient function. Here's a basic example:
package main

import (
	"fmt"

	"github.com/kamalshkeir/ksbus"
	"github.com/kamalshkeir/ksmux/ws"
	"github.com/kamalshkeir/lg"
)

func main() {
	client, err := ksbus.NewClient(ksbus.ClientConnectOptions{
		Id:      "go-client",
		Address: "localhost:9313",
		Secure:  false,
		OnDataWs: func(data map[string]any, conn *ws.Conn) error {
			fmt.Println("ON OnDataWs:", data)
			return nil
		},
		OnId: func(data map[string]any, subs ksbus.Unsub) {
			fmt.Println("ON OnId:", data)
		},
	})
	if lg.CheckError(err) {
		return
	}
	fmt.Println("CLIENT connected as", client.Id)

	client.Subscribe("go-client", func(data map[string]any, sub ksbus.Unsub) {
		fmt.Println("ON sub go-client:", data)
	})

	client.Publish("server1", map[string]any{
		"data": "hello from go client",
	})

	client.PublishToIDWaitRecv("browser", map[string]any{
		"data": "hello from go client",
	}, func(data map[string]any) {
		fmt.Println("onRecv:", data)
	}, func(eventId, id string) {
		fmt.Println("not received:", eventId, id)
	})

	client.Run()
}
Client Setup (Python)
For Python clients, install the ksbus package:
pip install ksbus==1.3.0
from ksbus import Bus


def OnId(data):
    print("OnId:", data)


def OnOpen(bus):
    print("connected as", bus.Id)
    bus.PublishToIDWaitRecv(
        "browser",
        {"data": "hi from pure python"},
        lambda data: print("OnRecv:", data),
        lambda event_id: print("OnFail:", event_id),
    )


if __name__ == '__main__':
    Bus({
        'Id': 'py',
        'Address': 'localhost:9313',
        'OnId': OnId,
        'OnOpen': OnOpen},
        block=True)
You can handle authentication or restrict access to certain topics using the Global Handlers.
API
Internal Bus (No websockets, use channels to handle topic communications)
func New() *Bus
func (b *Bus) Subscribe(topic string, fn func(data map[string]any, unsub Unsub), onData ...func(data map[string]any)) Unsub
func (b *Bus) Unsubscribe(topic string)
func (b *Bus) Publish(topic string, data map[string]any)
func (b *Bus) PublishToID(id string, data map[string]any)
func (b *Bus) PublishWaitRecv(topic string, data map[string]any, onRecv func(data map[string]any), onExpire func(eventId string, topic string)) error
func (b *Bus) PublishToIDWaitRecv(id string, data map[string]any, onRecv func(data map[string]any), onExpire func(eventId string, id string)) error
func (b *Bus) RemoveTopic(topic string)
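The internal bus is described above as using channels instead of websockets. The following is a minimal, stdlib-only sketch of that general pattern, not KSBus's actual implementation: `miniBus`, `subscribe`, `publish`, and `removeTopic` are hypothetical names chosen for illustration, and dropping a message when a subscriber's buffer is full is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// miniBus is a hypothetical stand-in for an in-process topic bus.
// It only illustrates the channel-per-subscriber pattern.
type miniBus struct {
	mu     sync.RWMutex
	topics map[string][]chan map[string]any
}

func newMiniBus() *miniBus {
	return &miniBus{topics: make(map[string][]chan map[string]any)}
}

// subscribe registers a buffered channel on topic and returns its receive side.
func (b *miniBus) subscribe(topic string) <-chan map[string]any {
	ch := make(chan map[string]any, 8)
	b.mu.Lock()
	b.topics[topic] = append(b.topics[topic], ch)
	b.mu.Unlock()
	return ch
}

// publish sends data to every subscriber of topic and returns how many
// accepted it. A full subscriber buffer drops the message (an assumption
// of this sketch) so the publisher never blocks.
func (b *miniBus) publish(topic string, data map[string]any) int {
	b.mu.RLock()
	defer b.mu.RUnlock()
	delivered := 0
	for _, ch := range b.topics[topic] {
		select {
		case ch <- data:
			delivered++
		default:
		}
	}
	return delivered
}

// removeTopic closes and forgets every subscriber channel of topic.
func (b *miniBus) removeTopic(topic string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.topics[topic] {
		close(ch)
	}
	delete(b.topics, topic)
}

func main() {
	bus := newMiniBus()
	sub := bus.subscribe("server1")
	n := bus.publish("server1", map[string]any{"data": "hello from INTERNAL"})
	msg := <-sub
	fmt.Println(n, msg["data"])

	bus.removeTopic("server1")
	_, open := <-sub
	fmt.Println("still open:", open)
}
```

Taking the write lock in `removeTopic` while `publish` holds the read lock keeps a publisher from sending on a channel that is being closed.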
Server Bus (use the internal bus and a websocket server)
func NewServer(bus ...*Bus) *Server
func (s *Server) OnServerData(fn func(data any, conn *ws.Conn)) // called when data arrives from another KSBus server (not a client) that used server.PublishToServer(addr string, data map[string]any, secure ...bool) error
func (s *Server) OnDataWs(fn func(data map[string]any, conn *ws.Conn, originalRequest *http.Request) error) // triggered after the connection is upgraded to WebSocket, then on every message received
func (s *Server) OnId(fn func(data map[string]any)) // called when the server bus receives data on its own ID
func (s *Server) WithPprof(path ...string) // enable std library pprof endpoints /debug/pprof/...
func (s *Server) WithMetrics(httpHandler http.Handler, path ...string) // take prometheus handler as input and serve /metrics if no path specified
func (s *Server) Subscribe(topic string, fn func(data map[string]any, unsub Unsub)) (unsub Unsub)
func (s *Server) Unsubscribe(topic string)
func (srv *Server) Publish(topic string, data map[string]any)
func (s *Server) PublishToID(id string, data map[string]any) // every WebSocket connection (Go, Python, and JS clients, plus the server itself, but not the internal bus) should have a unique ID, so data can be sent to that one connection instead of to all subscribers of a topic
func (s *Server) PublishWaitRecv(topic string, data map[string]any, onRecv func(data map[string]any), onExpire func(eventId string, topic string))
func (s *Server) PublishToIDWaitRecv(id string, data map[string]any, onRecv func(data map[string]any), onExpire func(eventId string, toID string))
func (s *Server) RemoveTopic(topic string)
func (s *Server) PublishToServer(addr string, data map[string]any, secure ...bool) error // send to another ksbus server
func (s *Server) Run() // Run without TLS
func (s *Server) RunTLS(cert string, certKey string) // RunTLS with TLS from cert files
func (s *Server) RunAutoTLS(subDomains ...string) // RunAutoTLS generates Let's Encrypt certificates for server.Address and subDomains and renews them automatically before they expire, so you only need to provide a domain name as server.Address
Client Bus (Go)
type ClientConnectOptions struct {
	Id           string
	Address      string
	Secure       bool
	Path         string // default ksbus.ServerPath
	Autorestart  bool
	RestartEvery time.Duration
	OnDataWs     func(data map[string]any, conn *ws.Conn) error // called before data is distributed to the different topics
	OnId         func(data map[string]any, unsub Unsub) // called when the client bus receives data on its own ID 'client.Id'
}
func NewClient(opts ClientConnectOptions) (*Client, error)
func (client *Client) Subscribe(topic string, handler func(data map[string]any, unsub Unsub)) Unsub
func (client *Client) Unsubscribe(topic string)
func (client *Client) Publish(topic string, data map[string]any)
func (client *Client) PublishToServer(addr string, data map[string]any, secure ...bool)
func (client *Client) PublishToID(id string, data map[string]any)
func (client *Client) PublishWaitRecv(topic string, data map[string]any, onRecv func(data map[string]any), onExpire func(eventId string, topic string))
func (client *Client) PublishToIDWaitRecv(id string, data map[string]any, onRecv func(data map[string]any), onExpire func(eventId string, id string))
func (client *Client) RemoveTopic(topic string)
func (client *Client) Close() error
func (client *Client) Run()
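The API above distinguishes Publish, which reaches every subscriber of a topic, from PublishToID, which reaches one connection by its unique ID. The following stdlib-only sketch illustrates that difference. It is not KSBus code: `hub` and its methods are hypothetical names, it runs in a single goroutine without locking, and rejecting a duplicate ID is an assumption made here to keep IDs unique.

```go
package main

import "fmt"

// hub is a hypothetical registry that keeps per-connection handlers and
// topic subscriptions side by side.
type hub struct {
	conns  map[string]func(data map[string]any) // connection ID -> handler
	topics map[string][]string                  // topic -> subscribed connection IDs
}

func newHub() *hub {
	return &hub{
		conns:  map[string]func(data map[string]any){},
		topics: map[string][]string{},
	}
}

// connect registers a handler under id and rejects duplicate IDs.
func (h *hub) connect(id string, onData func(data map[string]any)) error {
	if _, taken := h.conns[id]; taken {
		return fmt.Errorf("id %q is already connected", id)
	}
	h.conns[id] = onData
	return nil
}

// subscribe adds a connection ID to a topic.
func (h *hub) subscribe(id, topic string) {
	h.topics[topic] = append(h.topics[topic], id)
}

// publish fans data out to every subscriber of topic and returns the count.
func (h *hub) publish(topic string, data map[string]any) int {
	n := 0
	for _, id := range h.topics[topic] {
		if fn, ok := h.conns[id]; ok {
			fn(data)
			n++
		}
	}
	return n
}

// publishToID delivers data to exactly one connection and reports whether
// that connection exists.
func (h *hub) publishToID(id string, data map[string]any) bool {
	fn, ok := h.conns[id]
	if ok {
		fn(data)
	}
	return ok
}

func main() {
	h := newHub()
	for _, id := range []string{"browser", "py"} {
		id := id // capture a per-iteration copy for the closure
		if err := h.connect(id, func(data map[string]any) {
			fmt.Println(id, "got", data["data"])
		}); err != nil {
			panic(err)
		}
		h.subscribe(id, "news")
	}
	fmt.Println("topic deliveries:", h.publish("news", map[string]any{"data": "to all"}))
	fmt.Println("id delivered:", h.publishToID("py", map[string]any{"data": "only py"}))
	fmt.Println("unknown id delivered:", h.publishToID("ghost", nil))
}
```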
