前言
在项目中使用的是skywalking做为分布式链路跟踪,但是有个问题就是只能根据追踪tid去查询,而项目上日志又未输出tid。
不过对于我们而言,首要目的就是 Trouble Shooting,出了问题需要快速定位异常出现在什么服务,整个请求的链路是怎样的。其实很简单我们只需要跟项目中的框架结合(MQ 线程池 分布式调用 分布式Job)。
在一个请求进来的时候,在 Gateway生成唯一的TraceId,以及skywalking的tid往后传递。
使用方法
<!-- Skywalking tid-->
<dependency>
<groupId>org.apache.skywalking</groupId>
<artifactId>apm-toolkit-logback-1.x</artifactId>
<version>8.9.0</version>
</dependency>
<!-- feign-->
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-core</artifactId>
<version>10.0.1</version>
</dependency>
<!-- MQ-->
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
RabbitMQ
- 发消息前将MDC放入header中
public class MdcMessagePostProcess implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
for (Map.Entry<String, String> m : mdcContainer.entrySet()) {
message.getMessageProperties().setHeader(m.getKey(),m.getValue());
}
return message;
}
}
- MQ消息消费取出traceId
public class MdcReceivePostProcessors implements MessagePostProcessor {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
Map<String, Object> mdc= message.getMessageProperties().getHeaders();
Map<String, String> mdcString= new HashMap<>();
for (Map.Entry<String, Object> m : mdc.entrySet()) {
mdcString.put(m.getKey(),m.getValue().toString());
}
mdcString.put(TRACE_ID_HEADER_KEY, String.valueOf(mdc.get(TRACE_ID_HEADER_KEY)));
MDC.setContextMap(mdcString);
return message;
}
}
//实例化RabbitTemplate
@Bean
public RabbitTemplate reforeRabbitTemplate(ConnectionFactory connectionFactory)
{
RabbitTemplate rabbitTemplate =new RabbitTemplate(connectionFactory);
rabbitTemplate.setBeforePublishPostProcessors(new MdcMessagePostProcess());
rabbitTemplate.setAfterReceivePostProcessors(new MdcReceivePostProcessors());
// rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter);
if (log.isDebugEnabled()) {
log.debug("reforeRabbitTemplate init success");
}
return rabbitTemplate;
}
SpringTask
public class MdcAwareTaskDecorator implements TaskDecorator {
@Override
public Runnable decorate(Runnable runnable) {
Map<String, String> parentThreadContextMap = MDC.getCopyOfContextMap();
return MdcTaskUtils.adaptMdcRunnable(runnable, parentThreadContextMap);
}
}
@Slf4j
public abstract class MdcTaskUtils {
public static Runnable adaptMdcRunnable(Runnable runnable, Map<String, String> parentThreadContextMap) {
return () -> {
Map<String, String> currentThreadContextMap = MDC.getCopyOfContextMap();
log.debug("parentThreadContextMap: {}, currentThreadContextMap: {}", parentThreadContextMap,
currentThreadContextMap);
Map<String, String> threadContextMapToSet = parentThreadContextMap;
if (threadContextMapToSet == null) {
log.info("can not find a parentThreadContextMap, maybe task is fired using scheduled task.");
threadContextMapToSet = CollUtil.newHashMap();
threadContextMapToSet.put(TRACE_ID_HEADER_KEY, UUID.randomUUID().toString());
}
if (currentThreadContextMap == null) {
MDC.setContextMap(threadContextMapToSet);
} else {
log.warn(
"try to start a runnable task in the parent thread: {}, parentThreadContextMap: {}, currentThreadContextMap: {}",
Thread.currentThread(), threadContextMapToSet, currentThreadContextMap);
}
try {
runnable.run();
} finally {
if (currentThreadContextMap == null) {
MDC.clear();
}
}
};
}
}
Feign
public class MdcFeignRequestInterceptor implements RequestInterceptor {
@Override
public void apply(RequestTemplate template) {
template.header(TRACE_ID_HEADER_KEY, MDC.get(TRACE_ID_HEADER_KEY));
}
}
@Bean
public MdcFeignRequestInterceptor mdcFeignRequestInterceptor() {
return new MdcFeignRequestInterceptor();
}
评论区