Arpc
More effective network communication, two-way calling, notify and broadcast supported.
Install / Use
/learn @lesismal/ArpcREADME
ARPC - More Effective Network Communication
Contents
- ARPC - More Effective Network Communication
- Contents
- Features
- Performance
- Header Layout
- Installation
- Quick Start
- API Examples
- Register Routers
- Router Middleware
- Coder Middleware
- Client Call, CallAsync, Notify
- Server Call, CallAsync, Notify
- Broadcast - Notify
- Async Response
- Handle New Connection
- Handle Disconnected
- Handle Client's send queue overstock
- Custom Net Protocol
- Custom Codec
- Custom Logger
- Custom operations before conn's recv and send
- Custom arpc.Client's Reader by wrapping net.Conn
- Custom arpc.Client's send queue capacity
- JS Client
- Web Chat Examples
- Pub/Sub Examples
- More Examples
Features
- [x] Two-Way Calling
- [x] Two-Way Notify
- [x] Sync and Async Calling
- [x] Sync and Async Response
- [x] Batch Write | Writev | net.Buffers
- [x] Broadcast
- [x] Middleware
- [x] Pub/Sub
- [x] Opentracing
| Pattern | Interactive Directions | Description | | ------- | ---------------------------- | ------------------------ | | call | two-way:<br>c -> s<br>s -> c | request and response | | notify | two-way:<br>c -> s<br>s -> c | request without response |
Performance
Here are some thirdparty benchmark including arpc, although these repos have provide the performance report, but I suggest you run the code yourself and get the real result, other than just believe other people's doc:
Header Layout
- LittleEndian
| bodyLen | reserved | cmd | flag | methodLen | sequence | method | body | | ------- | -------- | ------ | ------- | --------- | -------- | --------------- | ----------------------- | | 4 bytes | 1 byte | 1 byte | 1 bytes | 1 bytes | 8 bytes | methodLen bytes | bodyLen-methodLen bytes |
Installation
- Get and install arpc
$ go get -u github.com/lesismal/arpc
- Import in your code:
import "github.com/lesismal/arpc"
Quick Start
- start a server
package main
import "github.com/lesismal/arpc"
func main() {
server := arpc.NewServer()
// register router
server.Handler.Handle("/echo", func(ctx *arpc.Context) {
str := ""
if err := ctx.Bind(&str); err == nil {
ctx.Write(str)
}
})
server.Run("localhost:8888")
}
- start a client
package main
import (
"log"
"net"
"time"
"github.com/lesismal/arpc"
)
func main() {
client, err := arpc.NewClient(func() (net.Conn, error) {
return net.DialTimeout("tcp", "localhost:8888", time.Second*3)
})
if err != nil {
panic(err)
}
defer client.Stop()
req := "hello"
rsp := ""
err = client.Call("/echo", &req, &rsp, time.Second*5)
if err != nil {
log.Fatalf("Call failed: %v", err)
} else {
log.Printf("Call Response: \"%v\"", rsp)
}
}
API Examples
Register Routers
var handler arpc.Handler
// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler
// message would be default handled one by one in the same conn reader goroutine
handler.Handle("/route", func(ctx *arpc.Context) { ... })
handler.Handle("/route2", func(ctx *arpc.Context) { ... })
// this make message handled by a new goroutine
async := true
handler.Handle("/asyncResponse", func(ctx *arpc.Context) { ... }, async)
Router Middleware
See router middleware, it's easy to implement middlewares yourself
import "github.com/lesismal/arpcex/extension/middleware/router"
var handler arpc.Handler
// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler
handler.Use(router.Recover())
handler.Use(router.Logger())
handler.Use(func(ctx *arpc.Context) { ... })
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
handler.Use(func(ctx *arpc.Context) { ... })
Coder Middleware
- Coder Middleware is used for converting a message data to your designed format, e.g encrypt/decrypt and compress/uncompress
import "github.com/lesismal/arpcex/extension/middleware/coder/gzip"
var handler arpc.Handler
// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler
handler.UseCoder(gzip.New())
handler.Handle("/echo", func(ctx *arpc.Context) { ... })
Client Call, CallAsync, Notify
- Call (Block, with timeout/context)
request := &Echo{...}
response := &Echo{}
timeout := time.Second*5
err := client.Call("/call/echo", request, response, timeout)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// err := client.CallWith(ctx, "/call/echo", request, response)
- CallAsync (Nonblock, with callback and timeout/context)
request := &Echo{...}
timeout := time.Second*5
err := client.CallAsync("/call/echo", request, func(ctx *arpc.Context) {
response := &Echo{}
ctx.Bind(response)
...
}, timeout)
- Notify (same as CallAsync with timeout/context, without callback)
data := &Notify{...}
client.Notify("/notify", data, time.Second)
// ctx, cancel := context.WithTimeout(context.Background(), time.Second)
// defer cancel()
// client.NotifyWith(ctx, "/notify", data)
Server Call, CallAsync, Notify
- Get client and keep it in your application
var client *arpc.Client
server.Handler.Handle("/route", func(ctx *arpc.Context) {
client = ctx.Client
// release client
client.OnDisconnected(func(c *arpc.Client){
client = nil
})
})
go func() {
for {
time.Sleep(time.Second)
if client != nil {
client.Call(...)
client.CallAsync(...)
client.Notify(...)
}
}
}()
- Then Call/CallAsync/Notify
Broadcast - Notify
var mux = sync.RWMutex{}
var clientMap = make(map[*arpc.Client]struct{})
func broadcast() {
var svr *arpc.Server = ...
msg := svr.NewMessage(arpc.CmdNotify, "/broadcast", fmt.Sprintf("broadcast msg %d", i))
mux.RLock()
for client := range clientMap {
client.PushMsg(msg, arpc.TimeZero)
}
mux.RUnlock()
}
Async Response
var handler arpc.Handler
// package
handler = arpc.DefaultHandler
// server
handler = server.Handler
// client
handler = client.Handler
asyncResponse := true // default is true, or set false
handler.Handle("/echo", func(ctx *arpc.Context) {
req := ...
err := ctx.Bind(req)
if err == nil {
ctx.Write(data)
}
}, asyncResponse)
Handle New Connection
// package
arpc.DefaultHandler.HandleConnected(func(c *arpc.Client) {
...
})
// server
svr := arpc.NewServer()
svr.Handler.HandleConnected(func(c *arpc.Client) {
...
})
// client
client, err := arpc.NewClient(...)
client.Handler.HandleConnected(func(c *arpc.Client) {
...
})
Handle Disconnected
// package
arpc.DefaultHandler.HandleDisconnected(func(c *arpc.Client) {
...
})
// server
svr := arpc.NewServer()
svr.Handler.HandleDisconnected(func(c *arpc.Client) {
...
})
// client
client, err := arpc.NewClient(...)
client.Handler.HandleDisconnected(func(c *arpc.Client) {
...
})
Handle Client's send queue overstock
// package
arpc.DefaultHandler.HandleOverstock(func(c *arpc.Client) {
...
})
// server
svr := arpc.NewServer()
svr.Handler.HandleOverstock(func(c *arpc.Client) {
...
})
// client
client, err := arpc.NewClient(...)
client.Handler.HandleOverstock(func(c *arpc.Client) {
...
})
