TinyRpcByGo
🏠参考net/rpc实现的rpc框架,并在此基础上,新增了协议交换(protocol exchange)、注册中心(registry)、服务发现(service discovery)、负载均衡(load balance)、超时处理(timeout processing)等特性
Install / Use
/learn @wwwwwwwwt/TinyRpcByGoREADME
基于Go的简易rpc框架🚀
仿照Go 语言官方的标准库 net/rpc,进行开发,并在此基础上,新增了协议交换、注册中心、服务发现、负载均衡、超时处理等特性。
主要特点:
- 🔨: 编解码部分基于 Json、Gob 格式
- 🎯: 负载均衡采用了客户端负载均衡策略,实现了随机、轮询两种算法(finish)
- ⏰: 使用 time.After 和 select-chan 机制为客户端连接、服务端处理添加了超时处理机制 (finish)
- ☁: 实现了简易的注册中心和心跳机制,同时支持了Etcd作为注册中心和服务发现。
客户端
客户端的创建
-
请求进来,到达负载均衡客户端;
-
负载均衡客户端调用服务发现模块获取一个服务的[ip:port],服务发现模块通过自己的负载均衡策略给负载均衡客户端返回一个[ip:port],负载均衡客户端通过这个[ip:port]从自身维护的map中取出与之对应的通信客户端。
-
判断通信客户端是否关闭或者为空,如果是,则移除并重新创建通信客户端。创建通信客户端的方法是支持多协议的,通过 ip 上的协议名称,传入不同的客户端创建策略进行创建。具体来说,创建一个通信客户端分为以下几步:
- 调用库函数 net.DialTimeout 得到一个conn
- 启用一个新协程,使用conn去调用客户端创建策略方法,并给这个协程设置超时(select chan + time after)
- 客户端创建策略方法会根据自身的协议去创建一个客户端,如果再超时时间内会把这个客户端通过channel返回给父协程。客户端的创建策略目前有两种:
- HTTP 客户端策略:会与服务端进行一次http的 CONNECT 通信,使得这个 conn 能够被服务端劫持,接着再去创建一个 rpc 客户端。从而使得这个连接既可以传输 HTTP 请求,又可以传输我们自定义的 rpc 消息格式;
- rpc 客户端策略:使用Json格式的数据与服务端进行两次握手,从而与服务器协商用户自定好的编解码器,并且启动一个接收协程。
-
创建好的通信客户端会被返回给负载均衡客户端,用对应[ip:port]存起来,以便复用。然后使用这个通信客户端进行rpc调用
客户端调用
- 通信客户端会将这次的调用请求封装成 call,call里面包含了唯一标识(seq)、请求的方法、参数、返回值以及一个chan,并且使用唯一标识,在自己的map中维护这个call
- 之后,会将call中的具体的参数封装成协议要求的格式(header + body:header 包括 seq、服务方法名、错误;body为参数),通过编码器写给服务端。这个过程通过 call 中的 chan 实现了异步调用,发送完消息之后,直接返回这个 call。
- 一次调用的过程:
- 注册:将结构体实例通过Register传进服务端,开一个新服务实例,这个实例的map中保存着这个结构体(服务)的所有方法method,再将结构体名字string和对应着的服务实例注册到一个服务端map中。
- 服务端调用方法:将结构体实例传进服务端,服务端通过解码,得到消息头和消息体,消息头包括服务的名字(string,形如T.Method)和序号等,消息体包括参数Call结构体,内部包括序号,服务名,入参和返回类型(interface{})等。通过对请求头的解析得到服务的名字,再通过.的位置分隔出服务实例的名字string和方法string, 从服务端的map中取出对应名字的实例,和从实例中取出对应的方法method。 然后对请求体解析得到入参的结构体,使用反射的Call调用方法得到replyv。最后通过gob编码encode 进buf 再 flush 发送回客户端
- 客户端接收消息:如下
客户端接收结果
- 在创建通信客户端的时候,除了与服务端协商编码,还会启动一个接收协程。
- 接收协程死循环接收服务端的响应。
- 接收协程接在收到服务返回的消息后,会将返回结果解析成 call,call 中的返回值字段即用来承载服务端的处理结果。收到消息后,会给这个call 中的 chan 字段写入“处理完成”的消息,即通知持有这个 call 的用户,结果已经接收到了,可以进行下一步操作了。处理完成之后,会从通信客户端中移除这个call(因为它已经完成了它的使命)
- 在这个循环过程中,如果发现了消息解析错误或者网络io错误,会跳出这个循环,然后中断这个通信客户端中所有 call,告诉它们有错误发生(因为前面的解析或者网络错误,会导致后面的所有 call 都出现问题,那么没必要再等服务器的消息了)
服务端
接收连接
- 在主协程中死循环 Accept,每收到一个连接,会启动一个新的协程去处理这个连接
- 一个新的连接进来,要建立可用的rpc通信,需要进行编码协商(这也是新建通信客户端时候所做的事)。那么服务端在连接处理协程中,就需要先处理这个事情,然后才能rpc通信。具体分为以下几个步骤:
- 解析编码协商报文
- 判断魔术
- 根据编码协商报文中的编码字段创建响应编解码器
- 响应客户端,通知协商成功(客户端接收到成功响应之后,才能发送请求,防止粘包)
处理连接
- 编码协商成功后,连接处理协程会进入死循环接收该连接中的报文(因为一次连接中可以有多次调用(header+body),那么需要尽力而为)
- 因为一个连接可能有多次调用,而考虑到这些调用是可以并发进行,连接处理协程在解析一个header+body对之后,就会启动一个请求处理协程,去处理这个请求,执行相应的方法。并且如果某一次读取解析header+body出现了错误,那么就需要跳出循环,关闭这个连接,而此时前面还有header+body的调用正在请求处理协程中进行处理,所以不能立即关闭连接,那么就引入了 WaitGroup 等待所有请求处理协程处理完毕之后再关闭连接
- 执行一个方法的时间可长可短,在请求处理协程中,也采用了(协程调用 + select chan + time after 的超时机制)
- 因为调用完后,写给客户端的返回值也是header+body,那么就需要保证在这个连接中header+body是成对写入的,不能出现交织,所以需要用锁控制header+body写回的原子性。
服务
- 因为客户端的请求是 service.method,所以我们分别定义了 service 结构体和 methodType 结构体分别用来保存“一个用来提供服务的结构体”和“其方法被调用所需要“的各项信息
- service 对象代表着一个提供服务的对象,methodType 对象代表着这个提供服务的对象的一个方法。所以 service 以 map 持有多个 methodType。
- 提供了将一个结构体对象转变成 service对象,其中的合规方法转变成 methodType 的函数。(合规的方法的签名只能有两个入参,前一个代表参数,后一个代表返回值,和一个错误返回值)
- 将 service 集成进 Server,使得 Server能够注册 service,持有多个 service,并且在处理请求时,能够调用到对应 service 的 method 上
- 通过
method.Func.Call([]reflect.Value{s.rcvr, argv, replyv})调用对象的方法
注册中心
简易注册中心
- 这个注册中心维护了一个 [服务器地址 -> 服务器地址 + 该服务上一次的心跳时间] 的 map,并且通过实现 http.Handler 接口,对外提供 Http 服务,这样每个服务器可以通过 POST 请求发送心跳、服务发现模块通过 GET 请求拉取所有可用服务器的地址。
- 注册中心在响应服务发现模块的GET请求时,会遍历一遍自己维护的服务列表,剔除掉已经超时的服务,然后通过 HTTP 自定义头,返回所有存活的服务地址。
- 此外,还暴露了对外的 Heartbeat 函数,使得服务可以使用该函数向指定注册中心发送指定服务的心跳
Etcd注册中心
<center>
- 使用时需要开另一个终端 并打开一个ectd节点,默认监听2379端口。
代码逻辑
- 利用etcd暴露的clientv3接口,创建client对象,为其绑定监听地址和超时时间。
func NewEtcdClient(addr []string, timeout time.Duration) *EtcdClient {
if timeout == 0 {
timeout = defaultTimeout - time.Duration(1)*time.Minute
}
client, err := clientv3.New(clientv3.Config{
Endpoints: addr,
DialTimeout: timeout,
})
if err != nil {
log.Printf("rpc etcd: cannot connect to %s: err: %s", addr, err)
return nil
}
return &EtcdClient{client: client, timeout: timeout}
}
- key为监听地址,value为监听地址具体的服务,利用租约将其绑定给我们之前创建的client对象,并无限期定时发送心跳给key续约,这里如果做的更精细的话可以加一些错误判断,然后利用上下文ctx来取消,或使用keepaliveonce,手动定时发心跳
//用于创建租约,
func (e *EtcdClient) Put(key, value string) error {
//获取租约对象
lease := clientv3.NewLease(e.client)
//创建超时租约
leaseGrantResponse, err := lease.Grant(context.Background(), int64(e.timeout/time.Second))
if err != nil {
return err
}
//将租约绑定到kv对象中去
_, err = e.client.Put(context.TODO(), key, value, clientv3.WithLease(leaseGrantResponse.ID))
if err != nil {
return err
}
//利用心跳给key 续租
keepAlive, err := lease.KeepAlive(context.TODO(), leaseGrantResponse.ID)
// 消耗续约服务端返回的消息
go leaseKeepAlive(keepAlive)
return nil
}
服务发现
简易服务发现模块
- 简易服务发送模块,内部维护了从注册中心全量拉取的服务器地址
- 并且在每次 Get 服务器地址时,会使用 Refresh 方法根据设定好的超时时间,判断是否要去注册中心全量拉取一次
- 更新完注册中心地址后,会通过设定好的负载均衡算法,从服务器地址列表中返回一个选中的服务器地址给通信客户端
Etcd服务发现模块
- 客户端与etcd 2379节点进行交互的模块,利用clientv3提供的Get方法得到健康的服务器节点地址,watch可以获得我们监控目录下的变动,如在我们监控的目录下新增一个服务,原理是,etcd会创建一个watcher对象,watcher对象和我们监控的服务端保持一个长连接,并在键的更改时通知,将现在观察到的版本号并比较自己观察的起始点版本号做对比, 当我们监控的目录改变时,通知我们改变的键和其val。
//watcher
func (e *EtcdRegistryDiscory) watchProviders(ctx context.Context) {
watchChan := clientv3.NewWatcher(e.client).Watch(context.TODO(), config.EtcdProviderPath, clientv3.WithPrefix())
select {
case <-watchChan:
for _ = range watchChan {
// 这里可以做得更精细,因为 etcd 会给出变化的 key,我们权且简单处理
// 结点产生了变化,就从服务器拉取
}
e.refreshFromEtcd()
case <-ctx.Done():
}
}
//获得现在最新的服务端地址
func (e *EtcdRegistryDiscory) refreshFromEtcd() error {
resp, err := e.client.Get(context.Background(), config.EtcdProviderPath, clientv3.WithPrefix())
if err != nil {
log.Println("rpc&&etcd discovery: refresh err:", err)
return err
}
e.servers = make([]string, 0, resp.Count)
for i, _ := range resp.Kvs {
e.servers = append(e.servers, string(resp.Kvs[i].Value))
}
e.lastUpdate = time.Now()
return nil
}
**************************************************************************************************************
以下为开发时遇到的问题:
- bug1:客户端一开始发送json格式Option时
/*
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
在一次连接中,Option固定在报文最前面header和body可能会有多个
| Option | Header1 | Body1 | Header2 | Body2 | ...
*/
客户端发送:
_ = json.NewEncoder(conn).Encode(tinyrpc.DefaultOption)
cc := codec.NewGobCodec(conn)
服务端解码
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
log.Println("rpv server: options error", err)
return
}
由于没有确定边界,当缓冲区有多条消息堆积时,可能json把Header的内容读出,导致Header内容缺失,形成阻塞
- server端解析Option的时候可能会破坏后面RPC消息的完整性,当客户端消息发送过快服务端消息积压时(例:Option|Header|Body|Header|Body),服务端使用json解析Option,json.Decode()调用conn.read()读取数据到内部的缓冲区(例:Option|Header),此时后续的RPC消息就不完整了(Body|Header|Body)。
目前代码中客户端简单的使用time.sleep()方式隔离协议交换阶段与RPC消息阶段,减少这种问题发生的可能。(fix bug)
(new)通过客户端和服务端两次握手,确保服务端将客户端发来的option字段被处理完,客户端再发送| Header{ServiceMethod ...} | Body interface{} | 防止粘包
- Call的设计
- 一个典型的函数远程调用形式
func (t *T)MethodName(argType *T1, replyType *T2)error
把所有需要的信息封装进Call
type Call struct {
Seq uint64 // 消息的序号, 不断增长
ServiceMethod string //服务端注册过的方法
Args interface{} // 参数
Reply interface{} // 返回的结果
Error error
Done chan *Call // 为了可以异步调用,定义管道,当调用结束后通知调用方,
}
-
关于客户端接口Call与Go
Call 是同步接口,具体使用再main中有所展现,会等待执行结果返回后再进行执行
Go 为异步接口,具体使用场景如下
//--------------------------
//Call
go func(i int) {
defer wg.Done()
args := fmt.Sprintf("geerpc req %d", i)
var reply string
if err := client.Call("Foo.Sum", args, &reply); err != nil {
log.Fatal("call Foo.Sum error:", err)
}
log.Println("reply:", reply)
}(i)
//--------------------------
//Go
call := client.Go( ... )
//新启动协程,异步等待
go func(call *Call) {
select {
<-call.Done:
# do something
<-otherChan:
# do something
}
}(call)
otherFunc() // 不阻塞,继续执行其他函数。
- 通过反射实现结构体与服务的映射关系
- RPC 框架的一个基础能力是:像调用本地程序一样调用远程服务。那如何将程序映射为服务呢?那么对 Go 来说,这个问题就变成了如何将结构体的方法映射为服务。
- 对 net/rpc 而言,一个函数需要能够被远程调用,需要满足如下五个条件:
- the method’s type is exported. – 方法所属类型是导出的。
- the method is exported. – 方式是导出的
- the method has two arguments, both exported (or builtin) types. – 两个入参,均为导出或内置类型。
- the method’s second argument is a pointer. – 第二个入参必须是一个指针。
- the method has return type error. – 返回值为 error
func (t *T) MethodName(argType T1, replyType *T2) error
通过反射,可以获取某个结构体所有的方法,并且通过方法可以知道其所有的参数类型和返回值,具体示例如下:
func main() {
var wg sync.WaitGroup
typ := reflect.TypeOf(&wg)
for i := 0; i < typ.NumMethod(); i++ {
method := typ.Method(i)
argv := make([]string, 0, method.Type.NumIn())
returns := make([]string, 0, method.Type.NumOut())
// j 从 1 开始,第 0 个入参是 wg 自己。
for j := 1; j < method.Type.NumIn(); j++ {
argv = append(argv, method.Type.In(j).Name())
}
for j := 0; j < method.Type.NumOut(); j++ {
returns = append(returns, method.Type.Out(j).Name())
}
log.Printf("func (w *%s) %s(%s) %s",
typ.Elem().Name(),
method.Name,
strings.Join(argv, ","),
strings.Join(returns, ","))
}
}
运行的结果是:
func (w *WaitGroup) Add(int)
func (w *WaitGroup) Done()
func (w *WaitGroup) Wait()
- 设定定时器超时时,造成了管道阻塞内存泄漏
当主线程超时时,无缓冲管道内数据无法被拿走,导致阻塞内存泄漏
/*go func() {
client, err := f(conn, opt) // 若主线程超时结束了,这个ch中的数据没被拿走将被阻塞,造成内存泄漏
ch <- clientResult{client: client, err: err}
}()*/
// ch <- cs
// 这里会有内存泄露的隐患:超时之后,由于没有 <-ch,这个子协程会阻塞在 ch <- cs
// 有两种解决方案:
// 1. 把 ch 改成缓冲形式的 channel:ch := make(chan clientResult, 1)
// 2. 使用 select + default,结果不能放入 ch 的话,就走 default
// 样例代码 :
// // ch := make(chan struct{}, 1)
// ch := make(chan struct{})
// timeout := time.Secon
