Tracing
Full-link tracing with Dubbo + Zipkin + Brave + Kafka
There are other mature distributed tracing systems, e.g. Naver's Pinpoint, Apache HTrace, Alibaba's EagleEye, JD's Hydra, Sina's Watchman, Meituan-Dianping's CAT, and SkyWalking. This project instead extends Dubbo's Filter interface, using Dubbo's attachment propagation, to implement full-link tracing.
The key points (and main difficulties) are the usage and semantics of zipkin and brave. The brave version used here is 5.2.0, the stable release from August 2018; the zipkin version is 2.2.1; JDK 1.8 is required.
Quick-starting zipkin
Download the latest zipkin and start it:
```shell
wget -O zipkin.jar 'https://search.maven.org/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'
java -jar zipkin.jar
```
Then open http://localhost:9411/zipkin/ to reach the Web UI shown below.

Handled cases
- Dubbo sync / async / oneway invocations
- RPC exceptions
- Ordinary business exceptions
Test cases
- Dubbo sync / async / oneway invocations
- RPC exceptions
- Ordinary business exceptions
- Concurrency
Configuration
Add the POM dependency:
```xml
<dependency>
    <groupId>com.github.baker</groupId>
    <artifactId>Tracing</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
```
Defaults (applied when a key is not set explicitly):

| Key | Default Value | Description |
| - | :-: | - |
| transport_type | http | How span data is transported; `http` and `kafka` are supported |
| zipkin_host | localhost:9411 | Destination: the zipkin address when the transport is http; the kafka address (brokers separated by commas) when the transport is kafka |
| service_name | trace-default | Identifier of the service (node) |
| kafka_topic | zipkin | Topic used when the transport is kafka |
Add a tracing.properties file at the root of the resources (classpath) directory.
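For example, a tracing.properties that keeps the http transport but names the node might look like this (the keys are the ones from the table above; the values are illustrative):

```properties
# transport for span data: http or kafka
transport_type=http
# zipkin server address (http transport)
zipkin_host=localhost:9411
# identifier of this service (node) in traces
service_name=order-service
```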

Details of a single call
Call chain
Summary of successful and failed calls

Call chain:

Core source code walkthrough
A first-cut version of the code, to ease the explanation:
```java
import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.propagation.*;
import brave.sampler.Sampler;
import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.extension.Activate;
import com.alibaba.dubbo.common.json.JSON;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;
import com.alibaba.dubbo.remoting.exchange.ResponseCallback;
import com.alibaba.dubbo.rpc.*;
import com.alibaba.dubbo.rpc.protocol.dubbo.FutureAdapter;
import com.alibaba.dubbo.rpc.support.RpcUtils;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.reporter.AsyncReporter;
import zipkin2.reporter.Sender;
import zipkin2.reporter.okhttp3.OkHttpSender;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * @author bakerZhu
 */
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class TracingFilter implements Filter {

    private static final Logger log = LoggerFactory.getLogger(TracingFilter.class);

    private static Tracing tracing;
    private static Tracer tracer;
    private static TraceContext.Extractor<Map<String, String>> extractor;
    private static TraceContext.Injector<Map<String, String>> injector;

    static final Propagation.Getter<Map<String, String>, String> GETTER =
            new Propagation.Getter<Map<String, String>, String>() {
                @Override
                public String get(Map<String, String> carrier, String key) {
                    return carrier.get(key);
                }

                @Override
                public String toString() {
                    return "Map::get";
                }
            };

    static final Propagation.Setter<Map<String, String>, String> SETTER =
            new Propagation.Setter<Map<String, String>, String>() {
                @Override
                public void put(Map<String, String> carrier, String key, String value) {
                    carrier.put(key, value);
                }

                @Override
                public String toString() {
                    return "Map::set";
                }
            };

    static {
        // 1
        Sender sender = OkHttpSender.create("http://localhost:9411/api/v2/spans");
        // 2
        AsyncReporter asyncReporter = AsyncReporter.builder(sender)
                .closeTimeout(500, TimeUnit.MILLISECONDS)
                .build(SpanBytesEncoder.JSON_V2);
        // 3
        tracing = Tracing.newBuilder()
                .localServiceName("tracer-client")
                .spanReporter(asyncReporter)
                .sampler(Sampler.ALWAYS_SAMPLE)
                .propagationFactory(ExtraFieldPropagation.newFactory(B3Propagation.FACTORY, "user-name"))
                .build();
        tracer = tracing.tracer();
        // 4
        // 4.1
        extractor = tracing.propagation().extractor(GETTER);
        // 4.2
        injector = tracing.propagation().injector(SETTER);
    }

    public TracingFilter() {
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext rpcContext = RpcContext.getContext();
        // 5
        Span.Kind kind = rpcContext.isProviderSide() ? Span.Kind.SERVER : Span.Kind.CLIENT;
        final Span span;
        if (kind.equals(Span.Kind.CLIENT)) {
            // 6
            span = tracer.nextSpan();
            // 7
            injector.inject(span.context(), invocation.getAttachments());
        } else {
            // 8
            TraceContextOrSamplingFlags extracted = extractor.extract(invocation.getAttachments());
            // 9
            span = extracted.context() != null ? tracer.joinSpan(extracted.context()) : tracer.nextSpan(extracted);
        }
        if (!span.isNoop()) {
            span.kind(kind).start();
            // 10
            String service = invoker.getInterface().getSimpleName();
            String method = RpcUtils.getMethodName(invocation);
            span.name(service + "/" + method);
            InetSocketAddress remoteAddress = rpcContext.getRemoteAddress();
            span.remoteIpAndPort(
                    remoteAddress.getAddress() != null ? remoteAddress.getAddress().getHostAddress() : remoteAddress.getHostName(),
                    remoteAddress.getPort());
        }
        boolean isOneway = false, deferFinish = false;
        try (Tracer.SpanInScope scope = tracer.withSpanInScope(span)) {
            // 11
            collectArguments(invocation, span, kind);
            Result result = invoker.invoke(invocation);
            if (result.hasException()) {
                onError(result.getException(), span);
            }
            // 12
            isOneway = RpcUtils.isOneway(invoker.getUrl(), invocation);
            // 13
            Future<Object> future = rpcContext.getFuture();
            if (future instanceof FutureAdapter) {
                deferFinish = true;
                ((FutureAdapter) future).getFuture().setCallback(new FinishSpanCallback(span)); // 14
            }
            return result;
        } catch (Error | RuntimeException e) {
            onError(e, span);
            throw e;
        } finally {
            if (isOneway) { // 15
                span.flush();
            } else if (!deferFinish) { // 16
                span.finish();
            }
        }
    }

    static void onError(Throwable error, Span span) {
        span.error(error);
        if (error instanceof RpcException) {
            span.tag("dubbo.error_msg", RpcExceptionEnum.getMsgByCode(((RpcException) error).getCode()));
        }
    }

    static void collectArguments(Invocation invocation, Span span, Span.Kind kind) {
        if (kind == Span.Kind.CLIENT) {
            StringBuilder fqcn = new StringBuilder();
            Object[] args = invocation.getArguments();
            if (args != null && args.length > 0) {
                try {
                    fqcn.append(JSON.json(args));
                } catch (IOException e) {
                    log.warn(e.getMessage(), e);
                }
            }
            span.tag("args", fqcn.toString());
        }
    }

    static final class FinishSpanCallback implements ResponseCallback {
        final Span span;

        FinishSpanCallback(Span span) {
            this.span = span;
        }

        @Override
        public void done(Object response) {
            span.finish();
        }

        @Override
        public void caught(Throwable exception) {
            onError(exception, span);
            span.finish();
        }
    }

    // 17
    private enum RpcExceptionEnum {
        UNKNOWN_EXCEPTION(0, "unknown exception"),
        NETWORK_EXCEPTION(1, "network exception"),
        TIMEOUT_EXCEPTION(2, "timeout exception"),
        BIZ_EXCEPTION(3, "biz exception"),
        FORBIDDEN_EXCEPTION(4, "forbidden exception"),
        SERIALIZATION_EXCEPTION(5, "serialization exception");

        private int code;
        private String msg;

        RpcExceptionEnum(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }

        public static String getMsgByCode(int code) {
            for (RpcExceptionEnum error : RpcExceptionEnum.values()) {
                if (code == error.code) {
                    return error.msg;
                }
            }
            return null;
        }
    }
}
```
1. Build the sender that ships span data to zipkin.
2. Build the async reporter on top of the sender.
3. Build the `Tracing` context.
4. Initialize the injector and extractor:
   - 4.1 The extractor pulls TraceContext data (or sampling flags) out of a carrier into a `TraceContextOrSamplingFlags`.
   - 4.2 The injector writes TraceContext data into a carrier. The carrier is generally the vehicle for in-band data transfer, e.g. the attachments of a Dubbo `Invocation`.
5. Decide whether this invocation runs on the provider (server) or consumer (client) side.
6. On the client side, the parent TraceContext is taken from a ThreadLocal and supplies the traceId and parentId for the new span; if there is no parent TraceContext, the new span becomes a root span.
7. Copy the properties of the span's TraceContext into the `Invocation` attachments so they travel with the remote call.
8. On the provider side, extract the TraceContext data and sampling flags from the invocation.
9. Create the span, also covering the very first server-side call (no incoming context).
10. Record the interface and method name plus the remote IP and port.
11. Make the newly created span the current span (accessible via `Tracer.currentSpan()`) and open a scope for it.
12. A oneway call only sends the request and does not wait for a result.
13. If the future is non-null, the call is async and the span is finished in a callback.
14. Register the async callback, which calls `span.finish()` when the response arrives.
15. A oneway call never receives a response, so there is no cr (Client Receive) event and the span must be reported manually with `flush()`.
16. For a synchronous call, `finish()` is called manually after the business code completes.
17. An enum kept in step with the error codes of Dubbo's `RpcException`.
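The finishing rules in steps 12–16 can be condensed into a small decision sketch. This is plain Java with no Brave or Dubbo types; `SpanAction` and `decide` are illustrative names, not part of either library:

```java
// Simplified stand-in for the span-finishing logic of the filter's
// finally block (markers 12-16). SpanAction and decide(...) are
// hypothetical names used only for this illustration.
public class SpanFinishDecision {

    enum SpanAction {
        FLUSH,   // oneway: no response will ever arrive, report the span now
        DEFER,   // async: a ResponseCallback will finish the span later
        FINISH   // sync: finish once the invocation returns
    }

    static SpanAction decide(boolean isOneway, boolean hasAsyncFuture) {
        if (isOneway) {
            return SpanAction.FLUSH;   // marker 15: no "client receive" event
        }
        if (hasAsyncFuture) {
            return SpanAction.DEFER;   // markers 13/14: finish in the callback
        }
        return SpanAction.FINISH;      // marker 16: synchronous call
    }

    public static void main(String[] args) {
        System.out.println(decide(true, false));  // FLUSH
        System.out.println(decide(false, true));  // DEFER
        System.out.println(decide(false, false)); // FINISH
    }
}
```

The three outcomes are mutually exclusive, which is why the real filter tracks `isOneway` and `deferFinish` as separate booleans and checks them in that order.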
Integrating Kafka
1. Set up the Kafka runtime environment (Scala).
2. Install and start Kafka.
3. Start zipkin. Read the official documentation first: Kafka releases move quickly and the way zipkin connects to Kafka differs between versions, so check GitHub before starting zipkin. For Kafka 0.10+ the important settings are:
| Attribute | Property | Description |
| - | :-: | - |
| KAFKA_BOOTSTRAP_SERVERS | bootstrap.servers | Comma-separated list of brokers, ex. 127.0.0.1:9092. No default |
| KAFKA_GROUP_ID | group.id | The consumer group this process is consuming on behalf of. Defaults to zipkin |
| KAFKA_TOPIC | N/A | Comma-separated list of topics that zipkin spans will be consumed from. Defaults to zipkin |
| KAFKA_STREAMS | N/A | Count of threads consuming the topic. Defaults to 1 |
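On the application side, once a Kafka-backed zipkin is running, switching this library to the kafka transport is just a tracing.properties change. The keys come from the defaults table earlier in this README; the broker addresses and service name below are illustrative:

```properties
transport_type=kafka
# comma-separated kafka broker list (zipkin_host doubles as the broker list for kafka)
zipkin_host=127.0.0.1:9092,127.0.0.1:9093
# must match the topic the zipkin collector consumes from (KAFKA_TOPIC)
kafka_topic=zipkin
service_name=order-service
```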
Overriding other properties
You may need to override consumer properties other than those listed above; see the zipkin Kafka collector documentation for the full list.