Easyrpc
实现远程过程调用(RPC)
Install / Use
/learn @gofish2020/EasyrpcREADME
适合新手的简单Golang项目:实现自己的RPC框架
设计并实现了自己的easyrpc供大家了解和学习,源代码已托管至Github: https://github.com/gofish2020/easyrpc
通过本项目你可以学到什么?
- 服务优雅停止
- 沾包的处理
- 网络通信协议格式制定
- 客户端网络连接复用(并发调用)
- 序列化和反序列化
- 数据压缩和解压缩
- 代码封装
- 熟悉网络API、Golang的基础数据结构
RPC简介
RPC ( Remote Procedure Call ) 远程过程调用,用于不在同一个机器上的两个应用程序,互相调用对方的方法,就像调用本地一样简单。

- 在机器
B启动服务端server程序,并注册本地方法 - 在机器
A启动客户端client程序,调用客户端本地方法 - 客户端方法内部对请求参数进行统一编码,协议格式包括【请求头】【请求体】
- 通过网络接口将数据包
write到服务端 - 服务端接收到数据包后,拆解协议数据包,提取其中的:对象名,方法名,请求参数(payload),利用对象名和方法名,找到服务端注册的实际方法,调用该方法并传入方法参数payload
- 方法执行完成后,将执行结果按照协议格式进行编码,然后发送回客户端,客户端解码数据,将最终的结果返回给
client调用方
代码结构
Go总代码量 1127 行,核心代码也就是700多行,包括rpcclient rpcmsg rpcserver 3 个文件夹

codec序列化和反序列化compress对数据进行压缩和解压缩example客户端和服务端测试代码(先不用关心)rpcclient客户端核心代码,包括本地方法存根定义、网络请求发送/接收、本地方法并发调用 功能实现rpcmsg网络传输协议格式定义,沾包处理,协议的封包和拆包,核心结构体RPCMsgrpcserver网络监听、服务端本地方法注册、客户端连接并行处理、服务端优雅停止
代码图解
协议格式 (rpcmsg)
一个函数由三个部分组成函数名、函数入参、函数出参。

要想通过RPC调用其他服务的方法,肯定需要将 函数名(我要调哪个函数)以及入参(函数的参数是什么),通过网络传递给被调用方,调用方执行完对应的函数以后,将结果即出参再通过网络传递回来,完成一次完整的RPC过程。
协议格式就是定义如何组织这些数据,被调用方再通过相反的方式解析出这些数据即可。
格式如下:
消息头 5 字节

-
magicNumber :用于内部校验使用
-
协议版本 : 用于迭代升级
-
消息类型: 0 表示请求 request 1 表示响应 response
-
压缩类型:避免数据包过大,对数据包进行压缩
0 不压缩
1 Zlib压缩
2 Snappy压缩
3 Lz4 压缩
-
序列化类型:对入参进行序列化和反序列化 0 使用Gob进行序列化 1 使用Json进行序列化
消息体 不定长

- 消息编号:为了客户端复用一个连接发送多个请求数据包,当返回响应数包的时候,知道该响应数据包归属于哪个请求。
- 请求体总长度:为了解决沾包问题,增加的字段,可以知道后序待读取数据长度
- 对象名长度:为了读取对象名
- 对象名:告知服务端我要调用哪个对象
- 方法名长度:为了读取方法名
- 方法名:告知服务端我要调用哪个方法
- 入参长度:为了读取入参
- 入参Payload:被调用方法的参数,这里的参数是被序列化后的字节流
rpcmsg代码解析
核心的数据结构
RPCMsg
// RPCMsg: 一个完整的数据包 header + body
type RPCMsg struct {
Header // 数据头
Seq int64 //请求编号
// uint32 表示长度
ObjectName string
// uint32 表示长度
MethodName string
// uint32 表示长度
Payload []byte
}
// ********数据包头格式: 【魔法数 协议版本 消息类型 压缩类型 序列化类型】*******
type Header [HEADER_LEN]byte // 就是一个5个字节的固定数组
RPCMsg中没有显示定义上面协议图中画的长度字段。因为这些字段只用在沾包处理中,外部不需要显示使用。所以没有在结构体中定义,在发送数据包的时候会额外补上,请往下看即可知道。
构造RPCMsg对象
RPCMsg可以看成就是一个完整的数据包;通过网络请求发送RPCMsg或者通过网络请求接收数据保存到RPCMsg中,设计模式中单一职责模式的应用。
func NewRPCMsg() *RPCMsg {
rpcMsg := RPCMsg{
Header: NewHeader(),
}
rpcMsg.Header[0] = magicNumber
return &rpcMsg
}
发送 RPCMsg 数据包 :就是把整个结构体发送出去了
注意看在发送数据包的时候,有额外发送长度字段
还有就是大端的处理,记住即可,属于常识
// SendMsg 发送消息
func (t *RPCMsg) SendMsg(w io.Writer) error {
var err error
//******
_, err = w.Write(t.Header[:]) // 1.发送header头 5字节
if err != nil {
return err
}
err = binary.Write(w, binary.BigEndian, uint64(t.Seq)) // 8字节
if err != nil {
return err
}
//******
totalLen := DATA_LEN + uint32(len(t.ObjectName)) + DATA_LEN + uint32(len(t.MethodName)) + DATA_LEN + uint32(len(t.Payload))
err = binary.Write(w, binary.BigEndian, uint32(totalLen)) // 2.写入总长度 4字节
if err != nil {
return err
}
//******
err = binary.Write(w, binary.BigEndian, uint32(len(t.ObjectName))) // 3.写入 ObjectName 长度
if err != nil {
return err
}
_, err = w.Write(utils.String2Bytes(t.ObjectName)) // 4.写入 ObjectName
if err != nil {
return err
}
//******
err = binary.Write(w, binary.BigEndian, uint32(len(t.MethodName))) // 5.写入 MethodName 长度
if err != nil {
return err
}
_, err = w.Write(utils.String2Bytes(t.MethodName)) // 6.写入 MethodName
if err != nil {
return err
}
//******
err = binary.Write(w, binary.BigEndian, uint32(len(t.Payload))) // 7.写入 Payload 长度
if err != nil {
return err
}
_, err = w.Write(t.Payload) // 8.写入 Payload
return err
}
从网络接收数据到 RPCMsg结构体中
// RecvMsg 接收消息
func (t *RPCMsg) RecvMsg(r io.Reader) error {
var err error
//1. 读取header数据
_, err = io.ReadFull(r, t.Header[:])
if err != nil {
return err
}
if !t.Header.CheckMagicNumber() {
return fmt.Errorf("magic number error: %v", t.Header[0])
}
seqByte := make([]byte, 8)
_, err = io.ReadFull(r, seqByte)
if err != nil {
return err
}
t.Seq = int64(binary.BigEndian.Uint64(seqByte))
//2. 读取总长度
totalByte := make([]byte, 4)
_, err = io.ReadFull(r, totalByte)
if err != nil {
return err
}
totalLen := binary.BigEndian.Uint32(totalByte)
//3. 读取全部数据
data := make([]byte, totalLen)
_, err = io.ReadFull(r, data)
if err != nil {
return err
}
left, right := uint32(0), DATA_LEN
//4. 获取ObjectName
objectNameLen := binary.BigEndian.Uint32(data[left:right])
left = right
right = left + objectNameLen
t.ObjectName = utils.Bytes2String(data[left:right])
//5 .获取 MethodName
left = right
right = left + DATA_LEN
methodNameLen := binary.BigEndian.Uint32(data[left:right])
left = right
right = left + methodNameLen
t.MethodName = utils.Bytes2String(data[left:right])
// 6. 获取 Payload
left = right
right = left + DATA_LEN
payLoadLen := binary.BigEndian.Uint32(data[left:right])
left = right
right = left + payLoadLen
t.Payload = data[left:right]
return err
}
服务端代码流程(rpcserver)
服务启动 -> 注册本地方法 -> 监听新连接 -> 一个连接
conn启动goroutine处理 -> 调用rpcmsg package从conn读取一个RPCMsg-> 根据objectName和methodName查找本地方法并调用 -> 再通过conn将处理结果返回给客户端

服务调用 example/server/main.go
func main() {
//.....
server := rpcserver.NewRPCServer(option)
server.RegisterByName("User", &user.UserService{})
server.Run()
//.....
}
服务启动 rpcserver/server.go
type RPCServer struct {
listener Listener
option Option
}
// 注册本地方法
func (server *RPCServer) RegisterByName(objectName string, obj interface{}) {
server.listener.SetHandler(objectName, &RPCHandler{object: reflect.ValueOf(obj)})
}
// 服务启动
func (server *RPCServer) Run() {
go server.listener.Run()
}
服务监听
// 服务启动
func (listen *RPCListener) Run() {
addr := fmt.Sprintf("%s:%d", listen.Ip, listen.Port)
// 绑定地址
l, err := net.Listen("tcp", addr)
if err != nil {
panic(err)
}
listen.l = l
log.Printf("server listen on %s\n", addr)
// 监听处理
go listen.acceptConn()
}
// 监听处理
func (listen *RPCListener) acceptConn() {
for {
conn, err := listen.l.Accept()
if err != nil {
select {
case <-listen.closechan:
return
default:
}
if e, ok := err.(net.Error); ok && e.Timeout() {
time.Sleep(2 * time.Microsecond)
continue
}
log.Printf("accept() err:%+v\n", err)
return
}
// 启动一个goroutine 处理conn
go listen.handleConn(conn)
}
}
一个连接
conn启动一个goroutine处理
// 启动一个goroutine 处理conn
func (listen *RPCListener) handleConn(conn net.Conn) {
// 如果服务正在关闭中...新连接进来自动关闭
if listen.isShutDonw() {
conn.Close()
return
}
log.Printf("new client connection come in %s\n", conn.RemoteAddr().String())
// 避免 panic
defer func() {
if err := recover(); err != nil {
log.Printf("addr %s panic err :%+v\n", conn.RemoteAddr().String(), err)
}
conn.Close()
}()
// 记录处理中的连接个数,用于优雅关闭
atomic.AddInt32(&listen.running, 1)
defer func() {
atomic.AddInt32(&listen.running, -1)
}()
// 服务度是否关闭
for !listen.isShutDonw() {
// 读超时时间
// if listen.option.ReadTimeout != 0 {
// conn.SetReadDeadline(time.Now().Add(listen.option.ReadTimeout))
// }
// 从连接中接收一个完整的数据包
msg, err := rpcmsg.RecvFrom(conn)
if err != nil {
log.Printf("receive msg error:%+v\n", err)
return
}
startTime := time.Now()
// 压缩器
compressor := rpcmsg.Compressor[msg.Header.CompressType()]
payload, err := compressor.UnCompress(msg.Payload)
if err != nil {
log.Printf("uncompress msg error; %+v\n", err)
return
}
// 序列化器
codeTool := rpcmsg.Codecs[msg.Header.SerializeType()]
// 入参解码
argsIn := make([]interface{}, 0)
err = codeTool.Decode(payload, &argsIn)
if err != nil {
log.Printf("decode msg error; %+v\n", err)
return
}
// 并行读 Handlers是安全的
handler, ok := listen.Handlers[msg.ObjectName]
if !ok {
log.Printf("%s is't registered!\n", msg.ObjectName)
return
}
// 执行对象的具体方法
result, err := handler.Handle(msg.MethodName, argsIn)
if err != nil {
log.Printf("%s.%s func exec error(可忽略错误)\n", msg.ObjectName, msg.MethodName)
}
// 编码结果
encodeRes, err := codeTool.Encode(result)
if err != nil {
log.Printf("encode msg error:%+v\n", err)
return
}
// 压缩结果
compressRes, err := compressor.Compress(encodeRes)
if err != nil {
log.Printf("compress msg error:%+v\n", err)
return
}
// 写超时时间
if listen.option.WriteTimeout != 0 {
conn.SetWriteDeadline(time.Now().Add(listen.option.WriteTimeout))
}
config := rpcmsg.RPCMsgConfig{
MsgTypeConf: rpcmsg.Response,
CompressTypeConf: msg.CompressType(),
SerializeTypeConf: msg.SerializeType(),
VersionConf: msg.Version(),
Seq: msg.Seq,
ObjectName: "",
MethodName: "",
}
// 将结果返回给客户端
err = rpcmsg.SendTo(conn, compressRes, config)
if err != nil {
log.Printf("send msg error:%+v\n", err)
return
}
log.Printf("%s.%s total runtime %d ms\n", msg.ObjectName, msg.MethodName, time.Since(startTime).Milliseconds())
}
}
服务停止 -> 设置关闭标识
shutdown-> 自旋等待running为0 -> 服务成功停止
func main(){
//.....
// 优雅关闭服务(阻塞中..)
server.Shutdown()
//.....
}
func (server *RPCServer) Shutdown() {
if server.listener != nil {
server.listener.Shutdown()
}
}
func (listen *RPCListener) Shutdown() {
// 设置关闭标识
atomic.CompareAndSwapInt32(&listen.shutdown, 0, 1)
// 关闭监听
listen.closeChan()
if listen.l != nil {
listen.l.Close()
}
// 说明还有连接在处理(自旋锁)
for atomic.LoadInt32(&listen.running) != 0 {
//log.Printf("还有 %d task running\n", listen.running)
}
log.Printf("server shutdown success!!!\n")
}
客户端代码流程 (rpcclient)
Connect建立连接 -> reflect.MakeFunc 对定义的函数指针生成存根 -> 调用函数(并发调用)发送数据包 -> 接收结果数据
func main() {
client := rpcclient.NewRPCClient(option)
// 建立连接
err := client.Connect("127.0.0.1:6060")
if err != nil {
log.Println(err)
return
}
// 定义Stub存根函数
v
Related Skills
node-connect
343.1kDiagnose OpenClaw node connection and pairing failures for Android, iOS, and macOS companion apps
frontend-design
90.0kCreate distinctive, production-grade frontend interfaces with high design quality. Use this skill when the user asks to build web components, pages, or applications. Generates creative, polished code that avoids generic AI aesthetics.
openai-whisper-api
343.1kTranscribe audio via OpenAI Audio Transcriptions API (Whisper).
qqbot-media
343.1kQQBot 富媒体收发能力。使用 <qqmedia> 标签,系统根据文件扩展名自动识别类型(图片/语音/视频/文件)。
