链路追踪系统 - 分布式环境下的请求跟踪
在微服务架构中,一个用户请求往往会经过多个服务的协作处理。本章将实现一个轻量级的链路追踪系统,让日志具备分布式追踪能力。
前言
在微服务架构中,一个用户请求往往会经过多个服务的协作处理。本章将实现一个轻量级的链路追踪系统,让日志具备分布式追踪能力。
分布式链路追踪基础概念
链路追踪的核心价值
追踪信息
TraceID: abc123
RequestID: req456
UserID: user789
用户请求
API网关
用户服务
订单服务
数据库
支付服务
追踪的核心元素:
- 🎯 TraceID: 标识一次完整的请求链路
- 🔗 SpanID: 标识链路中的一个操作节点
- 👤 UserID: 标识发起请求的用户
- 📝 RequestID: 标识单次HTTP请求
TraceContext - 追踪上下文设计
package com.simpleflow.log.context; import java.time.LocalDateTime; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * 追踪上下文 - 存储分布式链路追踪相关的信息 */ public class TraceContext { private String requestId; // 请求ID private String traceId; // 追踪ID private String spanId; // 跨度ID private String userId; // 用户ID private String sessionId; // 会话ID private LocalDateTime startTime; private final Map<String, String> extraInfo; // 额外信息 public TraceContext() { this.extraInfo = new ConcurrentHashMap<>(); this.startTime = LocalDateTime.now(); } public TraceContext(String requestId, String traceId) { this(); this.requestId = requestId; this.traceId = traceId; } /** * 添加额外信息 */ public TraceContext addExtra(String key, String value) { if (key != null && value != null) { this.extraInfo.put(key, value); } return this; } /** * 创建子上下文(用于子线程或子操作) */ public TraceContext createChild() { TraceContext child = new TraceContext(); child.requestId = this.requestId; child.traceId = this.traceId; child.spanId = generateChildSpanId(this.spanId); child.userId = this.userId; child.sessionId = this.sessionId; child.startTime = LocalDateTime.now(); child.extraInfo.putAll(this.extraInfo); return child; } /** * 继承上下文(用于InheritableThreadLocal) */ public TraceContext inherit() { TraceContext inherited = new TraceContext(); inherited.requestId = this.requestId; inherited.traceId = this.traceId; inherited.spanId = this.spanId; // 继承时保持相同的spanId inherited.userId = this.userId; inherited.sessionId = this.sessionId; inherited.startTime = this.startTime; inherited.extraInfo.putAll(this.extraInfo); return inherited; } private String generateChildSpanId(String parentSpanId) { if (parentSpanId == null) { return "1"; } // 简单的层级编号:1 -> 1.1, 1.1 -> 1.1.1 return parentSpanId + "." + (System.currentTimeMillis() % 1000); } public boolean isValid() { return requestId != null && traceId != null; } // getter/setter方法省略... }
ThreadLocalTraceHolder - 线程本地存储
package com.simpleflow.log.context; import com.simpleflow.log.util.RequestIdGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 线程本地追踪上下文持有者 * 使用InheritableThreadLocal实现跨线程的上下文传递 */ public class ThreadLocalTraceHolder { private static final Logger logger = LoggerFactory.getLogger(ThreadLocalTraceHolder.class); /** * 使用InheritableThreadLocal支持父子线程间的上下文传递 */ private static final InheritableThreadLocal<TraceContext> TRACE_CONTEXT_HOLDER = new InheritableThreadLocal<TraceContext>() { @Override protected TraceContext childValue(TraceContext parentValue) { // 子线程继承父线程的上下文 if (parentValue != null) { TraceContext childContext = parentValue.inherit(); logger.debug("子线程继承追踪上下文: {}", childContext); return childContext; } return null; } }; /** * 获取当前线程的追踪上下文 */ public static TraceContext getCurrentTrace() { return TRACE_CONTEXT_HOLDER.get(); } /** * 设置当前线程的追踪上下文 */ public static void setCurrentTrace(TraceContext context) { if (context != null) { TRACE_CONTEXT_HOLDER.set(context); logger.debug("设置追踪上下文: {}", context); } else { TRACE_CONTEXT_HOLDER.remove(); } } /** * 清除当前线程的追踪上下文 */ public static void clearCurrentTrace() { TRACE_CONTEXT_HOLDER.remove(); logger.debug("清除当前线程追踪上下文"); } /** * 初始化新的追踪上下文 */ public static TraceContext initTrace() { TraceContext context = new TraceContext(); context.setRequestId(RequestIdGenerator.generate()); context.setTraceId(RequestIdGenerator.generateTraceId()); context.setSpanId("1"); setCurrentTrace(context); return context; } /** * 获取当前请求ID */ public static String getCurrentRequestId() { TraceContext context = getCurrentTrace(); return context != null ? context.getRequestId() : null; } /** * 获取当前用户ID */ public static String getCurrentUserId() { TraceContext context = getCurrentTrace(); return context != null ? context.getUserId() : null; } /** * 设置当前用户ID */ public static void setCurrentUserId(String userId) { TraceContext context = getCurrentTrace(); if (context != null) { context.setUserId(userId); } } /** * 添加额外信息到当前上下文 */ public static void addExtra(String key, String value) { TraceContext context = getCurrentTrace(); if (context != null) { context.addExtra(key, value); } } /** * 创建子上下文(用于子操作) */ public static TraceContext createChildTrace() { TraceContext current = getCurrentTrace(); if (current != null) { TraceContext child = current.createChild(); setCurrentTrace(child); return child; } return null; } /** * 在指定上下文中执行操作 */ public static <T> T executeWithTrace(TraceContext context, TraceCallback<T> callback) { TraceContext original = getCurrentTrace(); try { setCurrentTrace(context); return callback.execute(); } finally { setCurrentTrace(original); } } @FunctionalInterface public interface TraceCallback<T> { T execute() throws Exception; } }
RequestIdGenerator - ID生成器
package com.simpleflow.log.util; import java.net.InetAddress; import java.time.LocalDateTime; import java.time.format.DateTimeFormatter; import java.util.concurrent.atomic.AtomicLong; /** * 请求ID生成器 - 生成全局唯一的请求ID和追踪ID */ public class RequestIdGenerator { private static final String HOST_NAME; private static final AtomicLong SEQUENCE = new AtomicLong(1); private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyyMMddHHmmssSSS"); static { String hostName = "unknown"; try { hostName = InetAddress.getLocalHost().getHostName(); } catch (Exception e) { // 使用默认值 } HOST_NAME = hostName; } /** * 生成请求ID * 格式:REQ_时间戳_主机名_序列号 */ public static String generate() { String timestamp = LocalDateTime.now().format(TIME_FORMATTER); long sequence = SEQUENCE.getAndIncrement(); return String.format("REQ_%s_%s_%d", timestamp, getShortHostName(), sequence); } /** * 生成追踪ID * 格式:TRACE_时间戳_主机名_序列号 */ public static String generateTraceId() { String timestamp = LocalDateTime.now().format(TIME_FORMATTER); long sequence = SEQUENCE.getAndIncrement(); return String.format("TRACE_%s_%s_%d", timestamp, getShortHostName(), sequence); } private static String getShortHostName() { int dotIndex = HOST_NAME.indexOf('.'); if (dotIndex > 0) { return HOST_NAME.substring(0, dotIndex); } return HOST_NAME; } }
跨线程传递机制
InheritableThreadLocal原理演示
/** * 跨线程上下文传递演示 */ public class TraceInheritanceDemo { public static void main(String[] args) throws InterruptedException { // 在主线程中初始化追踪上下文 TraceContext mainContext = ThreadLocalTraceHolder.initTrace(); mainContext.setUserId("user123"); System.out.println("主线程: " + ThreadLocalTraceHolder.getCurrentRequestId()); // 创建子线程 - 会自动继承上下文 Thread childThread = new Thread(() -> { System.out.println("子线程: " + ThreadLocalTraceHolder.getCurrentRequestId()); System.out.println("用户ID: " + ThreadLocalTraceHolder.getCurrentUserId()); // 在子线程中添加额外信息 ThreadLocalTraceHolder.addExtra("childInfo", "fromChild"); }); childThread.start(); childThread.join(); // 主线程的上下文不受子线程影响 System.out.println("主线程最终: " + ThreadLocalTraceHolder.getCurrentRequestId()); } }
线程池场景的处理
/** * 线程池场景下的上下文传递 */ @Component public class TraceAwareExecutor { private final ExecutorService executor = Executors.newFixedThreadPool(10); /** * 带上下文传递的任务执行 */ public <T> CompletableFuture<T> executeWithTrace(Callable<T> task) { // 捕获当前上下文 TraceContext currentContext = ThreadLocalTraceHolder.getCurrentTrace(); return CompletableFuture.supplyAsync(() -> { TraceContext originalContext = ThreadLocalTraceHolder.getCurrentTrace(); try { ThreadLocalTraceHolder.setCurrentTrace(currentContext); return task.call(); } catch (Exception e) { throw new RuntimeException(e); } finally { ThreadLocalTraceHolder.setCurrentTrace(originalContext); } }, executor); } }
实战测试
@SpringBootTest class TraceContextTest { @Test void testTraceInheritance() { // 初始化追踪上下文 TraceContext context = ThreadLocalTraceHolder.initTrace(); context.setUserId("testUser"); context.addExtra("testKey", "testValue"); // 验证上下文设置 assertEquals("testUser", ThreadLocalTraceHolder.getCurrentUserId()); // 在新线程中验证上下文继承 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { // 子线程应该继承父线程的上下文 assertNotNull(ThreadLocalTraceHolder.getCurrentTrace()); assertEquals("testUser", ThreadLocalTraceHolder.getCurrentUserId()); return ThreadLocalTraceHolder.getCurrentRequestId(); }); String childRequestId = future.join(); assertEquals(context.getRequestId(), childRequestId); } @Test void testChildContextCreation() { // 创建父上下文 TraceContext parent = ThreadLocalTraceHolder.initTrace(); parent.setUserId("parentUser"); String parentSpanId = parent.getSpanId(); // 创建子上下文 TraceContext child = ThreadLocalTraceHolder.createChildTrace(); // 验证继承关系 assertEquals(parent.getRequestId(), child.getRequestId()); assertEquals(parent.getTraceId(), child.getTraceId()); assertEquals(parent.getUserId(), child.getUserId()); assertNotEquals(parentSpanId, child.getSpanId()); // SpanId应该不同 } }
本章小结
✅ 完成的任务
- 追踪上下文:设计了完整的TraceContext类
- 线程本地存储:实现了ThreadLocalTraceHolder
- ID生成器:创建了RequestIdGenerator工具
- 跨线程传递:解决了异步场景的上下文传递
- 实战测试:验证了追踪系统的功能
🎯 学习要点
- InheritableThreadLocal的原理和应用
- 上下文设计的完整性和可扩展性
- ID生成策略的唯一性保证
- 跨线程传递的实现机制
💡 思考题
- 如何处理线程池复用导致的上下文污染?
- 分布式环境下如何实现跨服务的追踪?
- 追踪信息的持久化和查询如何设计?
🚀 下章预告
下一章我们将实现Web集成模块,学习如何在HTTP请求层面集成链路追踪,包括Filter、拦截器的实现,以及如何从HTTP头中提取和传递追踪信息。
更多推荐
所有评论(0)