一、背景
1.由来
通过上篇文章 juejin.cn/post/728005… 分析了跨进程传播链路数据的设计原理。
本篇讲解一下,如何通过SkyWalking提供的跨进程传播协议实现全链路传递业务数据。
2.需求场景
需求场景如:
1.希望将userId、机构Id、租户id等字段能全链路传递,这样方便根据这些字段进行搜索、数据分析。
2.希望借助SkyWalking的跨进程、跨线程传递的协议,来实现全链路压测、全链路灰度的特定标记的全链路传递。
二、如何实现
自定义数据的全链路传递,推荐借助于sw8协议里Correlation Header项来实现,上文也提到过,Correlation Header项支持跨进程、跨线程传递数据。
下面是其使用方式。
1.引入apm-toolkit-trace工具
引入apm-toolkit-trace maven依赖
org.apache.skywalking
apm-toolkit-trace
8.13.0
2.修改agent配置
这一步是可选的,如果希望将自定义的参数自动记录到tag中,则修改agent/config/agent.config配置文件。如果不需要记录到tag中,则可以不用修改下面的配置,对应的配置如下;
# 注意:auto_tag_keys各个key以逗号分割,且前后一定不要有空格!否则key名就会有空格
correlation.auto_tag_keys=${SW_CORRELATION_AUTO_TAG_KEYS:xxx,xxxx}
TIP:只会自动记录到链路中TraceSegment的第一个span的tag中。
3.使用TraceContext API
3.1 TraceContext API
通过SkyWalking的TraceContext API可以设置全局透传的数据,对应的API如下:
// org.apache.skywalking.apm.toolkit.trace.TraceContext
/**
* Put the custom key/value into trace context. 将自定义的K/V放入trace上下文
* @return previous value if it exists.
*/
public static Optional putCorrelation(String key, String value) {
return Optional.empty();
}
/**
* Try to get the custom value from trace context. 从trace上下文获取指定的自定义key所对应value
* @return custom data value.
*/
public static Optional getCorrelation(String key) {
return Optional.empty();
}
3.2 示例
假设有三个应用,他们调用关系是gateway-->app1-->app2,调用方式是通过http访问。我们做如下改造:
1.在gateway应用,将userId设置到trace context的correlation context,代码示例:
/** 请求处理的某个环节,将userId设置到context */
public Result handle() {
....
TraceContext.putCorrelation("sw8_userId", "123321");
....
}
其中"sw8_userId"是自定义的key。
2.同时我们也希望userId记录到span的tag中,所以按照前文所述,修改agent/config/agent.config配置文件,增加配置
# 注意:auto_tag_keys各个key以逗号分割,且前后一定不要有空格!否则key名就会有空格
correlation.auto_tag_keys=${SW_CORRELATION_AUTO_TAG_KEYS:sw8_userId}
整体的调用链路和自定义数据的传递如下图所示:
4.观察效果
通过SkyWalking控制台,我们可以看到完整的调用链路,如下图:
在span的详情里,也可以看到tags中包含了sw8_userId。
5.扩展
如果希望支持以sw8_userId作为链路的搜索条件,则可以修改/oapServer/skywalking-server-xxx/config/application.yml文件里如下配置:
core:
selector: ${SW_CORE:default}
default:
# 可搜索tag里增加sw8_userId
searchableTracesTags: ${SW_SEARCHABLE_TAG_KEYS:http.method,http.status_code,rpc.status_code,db.type,db.instance,mq.queue,mq.topic,mq.broker,sw8_userId}
这样就可以在控制台上以sw8_userId做为链路搜索条件,如下图:
三、原理分析
1.疑惑
我们用到了TraceContext.putCorrelation(key, value),但是,方法里面只有return Optional.empty()
,并没有设置context数据。
那么SkyWalking Agent是如何实现设置context数据的?
// org.apache.skywalking.apm.toolkit.trace.TraceContext
/**
* Put the custom key/value into trace context. 将自定义的K/V放入trace上下文
* @return previous value if it exists.
*/
public static Optional putCorrelation(String key, String value) {
return Optional.empty();
}
2.源码分析
你应该猜到了,SkyWalking Agent是基于Java agent的字节码增强技术实现各种插件,那TraceContext.putCorrelation(key, value)这块肯定是用到了字节码增强技术。
关于Java agent的内容可以看一下我之前的文章:JVM系列-Java agent超详细知识梳理
TraceContext.putCorrelation 的字节码增强对应的插桩类是apm-sniffer/apm-toolkit-activation/apm-toolkit-trace-activation/
模块下的TraceContextActivation.java,对应代码:
public static final String ENHANCE_CLASS = "org.apache.skywalking.apm.toolkit.trace.TraceContext";
public static final String ENHANCE_PUT_CORRELATION_METHOD = "putCorrelation";
public static final String INTERCEPT_PUT_CORRELATION_CLASS = "org.apache.skywalking.apm.toolkit.activation.trace.CorrelationContextPutInterceptor";
/** 增强TraceContext类 */
protected ClassMatch enhanceClass() {
return NameMatch.byName(ENHANCE_CLASS);
}
/** 拦截putCorrelation方法,对应的Interceptor是CorrelationContextPutInterceptor */
new StaticMethodsInterceptPoint() {
@Override
public ElementMatcher getMethodsMatcher() {
return named(ENHANCE_PUT_CORRELATION_METHOD);
}
@Override
public String getMethodsInterceptor() {
return INTERCEPT_PUT_CORRELATION_CLASS;
}
@Override
public boolean isOverrideArgs() {
return false;
}
}
};
从上面代码可知putCorrelation方法对应的方法增强类Interceptor是CorrelationContextPutInterceptor,对应源码:
public class CorrelationContextPutInterceptor implements StaticMethodsAroundInterceptor {
@Override
public void beforeMethod(Class clazz, Method method, Object[] allArguments, Class[] parameterTypes, MethodInterceptResult result) {
final String key = (String) allArguments[0];
final String value = (String) allArguments[1];
// 设置K/V到trace context
final Optional previous = ContextManager.getCorrelationContext().put(key, value);
result.defineReturnValue(previous);
}
}
核心代码就是ContextManager.getCorrelationContext().put(key, value)
,设置K/V到trace context。
设置到trace context后,就可以一直跨进程、跨线程传递了。
四、总结
上文讲述了如何借助apm-toolkit-trace API,实现通过sw8协议Correlation Header项全链路传递自定义数据。
并通过源码分析了Agent通过字节码增强技术,补充TraceContext.putCorrelation设置K/V到trace context的核心代码。
全链路压测、全链路灰度的特定标记的全链路传递也可以借助sw8协议实现,但本人未在工作中实际实践过,如果大家有相关经验,可以一起分享。