【从01 千万级直播项目实战线上拦截器中使用ThreadLocal失效问题排查
背景
运营反馈,有用户在直播间内发送公屏,A用户发送的公屏 结果直播间内所有人员都显示了B用户发送的,而实际却是A用户发的。
原因分析
日志排查

3.查看log打印代码位置
好家伙,发现服务端没有没错,实际发消息的用户ID和大家收到公屏的发送人不一样? 又让客户端排查了下日志,发现客户端A发送的,实际到了服务端从Token中获取到的当前发送公屏用户竟然是B?
代码分析
gRpc请求拦截器代码
@Slf4j(topic = SLSTopicType.TOPIC_GRPC) public class CommonGrpcServerInterceptor implements ServerInterceptor { @Override public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata, ServerCallHandler serverCallHandler) { String token = metadata.get(Metadata.Key.of(MetadataConstants.AUTHENTICATION, Metadata.ASCII_STRING_MARSHALLER)), clientType = metadata.get(Metadata.Key.of(MetadataConstants.CLIENT_TYPE, Metadata.ASCII_STRING_MARSHALLER)); if (StringUtils.isBlank(token)) { log.error("grpc请求异常,token为空."); serverCall.close(Status.UNAUTHENTICATED, null); } Long userId = JWTUtil.getUserId(token); log.info("grpc | token:{} | methodName:{} | userId:{}", token, serverCall.getMethodDescriptor().getFullMethodName(), userId); try { ThreadLocalUtil.set(ThreadLocalConstant.GRPC_USER_KEY, GrpcThreadData.builder() .userId(userId) .clientType(Integer.valueOf(clientType)) .build()); return serverCallHandler.startCall(serverCall, metadata); } finally { ThreadLocalUtil.remove(ThreadLocalConstant.GRPC_USER_KEY); } } }
gRpc发送公屏消息接口
@Override public void sendMessage(SendBarrageMessageRequest request, StreamObserver responseStreamObserver) { Long roomId = request.getRoomId(), currentUserId = GrpcUtil.getUserId(); int type = request.getType().getNumber(), barrageType = BarrageInfo.MessageType.TEXT_VALUE; String content = request.getContent(); log.info("用户发送房间公屏消息 | roomId:{} | currentUserId:{} | content:{} | type {}", roomId, currentUserId, content, type); rocketMqTemplate.send(RocketMqBizConstant.User.Cluster.BARRAGE_RECORD_SAVE_MSG, dto); String filterContent = content; if (type != SendBarrageMessageRequest.Type.EMOJI_VALUE) { if (SensitiveWordUtil.WORD_FILTER.include(StringUtils.deleteWhitespace(filterContent))) { filterContent = SensitiveWordUtil.WORD_FILTER.replace(StringUtils.deleteWhitespace(content)); } } //用户发送公屏 rocketMqTemplate.send(RocketMqBizConstant.Grpc.Broadcast.ROOM_RTMP_MESSAGE, GrpcRoomRtmpMessageDto.builder() .roomId(roomId) .pushType(RtmpPushType.BROADCAST.getType()) .rtmpMessageType(RtmpMessage.MessageType.BARRAGE_VALUE) .userId(currentUserId) .sendUserId(currentUserId) .data(GsonUtil.GsonString(LiveRoomSendBarrageDto.builder() .barrageType(barrageType) .content(filterContent) .build())) .build()); responseStreamObserver.onNext(SendBarrageMessageResponse.newBuilder() .setSuccess(true) .setContent(filterContent) .build()); responseStreamObserver.onCompleted(); }
每次gRpc请求进来前,在拦截其中解析token并将对应的用户ID设置到 ThreadLocal中,这样从接口中可以直接从ThreadLocal的值中获取请求的用户ID,看着好像没啥问题,难道是ThreadLocal设置的时候有线程安全问题?
来看一下ThreadLocal设置中的代码
public class ThreadLocalUtil { private static final ThreadLocal threadLocal = ThreadLocal.withInitial(() -> new HashMap(10)); public static Map getThreadLocal() { return threadLocal.get(); } public static Object get(String key) { Map map = threadLocal.get(); return map.get(key); } public static void set(String key, Object value) { Map map = threadLocal.get(); map.put(key, value); } public static void set(Map keyValueMap) { Map map = threadLocal.get(); map.putAll(keyValueMap); } public static void remove() { threadLocal.remove(); } public static T remove(String key) { Map map = threadLocal.get(); return (T) map.remove(key); } }
ThreadLocal.withInitial(() -> new HashMap(10));
方法可以为每个线程提供独立的初始值,确保每个线程都具有自己的初始状态,看起来是线程安全的,没啥毛病,那就排除了ThreadLocal的问题
继续排查日志-日志上下文分析
好家伙,恍然大悟,拦截器中与gRpc接口代码执行的竟然不是同一个线程,大概率是gRpc框架的线程池处理线程不够用时使用线程切换造成,并发不大的时候还真看不出来~ 看到这的时候我已经打自己两巴掌了,因为在我看来这好像算是一个低级的错误...
使用ThreadLocal和gRpc框架提供Context的区别点
有请CHAT-GPT给我们解释一下两者的区别吧:
Context.current().withValue(key, value)
和 ThreadLocal
的 set
方法都可以用于在请求处理链路中传递数据,但它们之间有一些关键的区别:
作用范围:Context
是 gRPC 框架提供的上下文传递机制,可以在整个 gRPC 请求处理链路中传递数据。而 ThreadLocal
是基于线程的局部变量,只在同一线程中有效。
线程切换:当请求在 gRPC 框架中进行处理时,可能会涉及到线程的切换。在这种情况下,ThreadLocal
中的值无法在不同的线程之间共享,而 Context
可以跨线程传递数据。
显式传递:使用 Context
时,您需要在代码中显式地传递上下文对象。在 gRPC 请求处理链路中,通过调用 Context.current().withValue(key, value)
方法,您可以将数据与当前的上下文关联起来,并在后续的方法调用中获取该数据。
线程安全性:Context
在设计上是线程安全的,可以在多个线程之间共享数据。它是为了支持并发环境而设计的,可以在异步请求和并发请求处理中安全地传递数据。
总的来说,Context
与 ThreadLocal
相比,更适用于 gRPC 框架中的请求处理链路,可以在不同的线程和方法调用中传递数据,提供更灵活的上下文传递机制。而 ThreadLocal
则更适用于在单个线程内部共享数据。
使用 Context
时需要注意上述区别,并根据需求选择合适的机制来传递数据。如果您需要在整个 gRPC 请求处理链路中传递数据,并且能够跨线程共享,那么使用 Context
是一个更好的选择。
总结