【从01 千万级直播项目实战线上拦截器中使用ThreadLocal失效问题排查

2023年 7月 12日 44.0k 0

背景

运营反馈,有用户在直播间内发送公屏,A用户发送的公屏 结果直播间内所有人员都显示了B用户发送的,而实际却是A用户发的。

原因分析
日志排查

  • 询问运营要到问题截图
  • image.png

  • 定位公屏内容和出现问题的时间点
    image.png
  • 3.查看log打印代码位置

    image.png

    好家伙,发现服务端没有没错,实际发消息的用户ID和大家收到公屏的发送人不一样? 又让客户端排查了下日志,发现客户端A发送的,实际到了服务端从Token中获取到的当前发送公屏用户竟然是B?

    代码分析

    gRpc请求拦截器代码

    @Slf4j(topic = SLSTopicType.TOPIC_GRPC)
    public class CommonGrpcServerInterceptor implements ServerInterceptor {
    
    
        @Override
        public  ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata,
                                                                     ServerCallHandler serverCallHandler) {
            String token = metadata.get(Metadata.Key.of(MetadataConstants.AUTHENTICATION, Metadata.ASCII_STRING_MARSHALLER)),
                    clientType = metadata.get(Metadata.Key.of(MetadataConstants.CLIENT_TYPE, Metadata.ASCII_STRING_MARSHALLER));
    
            if (StringUtils.isBlank(token)) {
                log.error("grpc请求异常,token为空.");
                serverCall.close(Status.UNAUTHENTICATED, null);
            }
    
    
            Long userId = JWTUtil.getUserId(token);
    
            log.info("grpc | token:{} | methodName:{} | userId:{}", token, serverCall.getMethodDescriptor().getFullMethodName(),
                    userId);
    
    
            try {
                ThreadLocalUtil.set(ThreadLocalConstant.GRPC_USER_KEY,
                        GrpcThreadData.builder()
                                .userId(userId)
                                .clientType(Integer.valueOf(clientType))
                                .build());
                return serverCallHandler.startCall(serverCall, metadata);
            } finally {
                ThreadLocalUtil.remove(ThreadLocalConstant.GRPC_USER_KEY);
            }
    
        }
    }
    

    gRpc发送公屏消息接口

    @Override
    public void sendMessage(SendBarrageMessageRequest request, StreamObserver responseStreamObserver) {
    
        Long roomId = request.getRoomId(), currentUserId = GrpcUtil.getUserId();
        int type = request.getType().getNumber(), barrageType = BarrageInfo.MessageType.TEXT_VALUE;
        String content = request.getContent();
    
        log.info("用户发送房间公屏消息 | roomId:{} | currentUserId:{} | content:{} | type {}", roomId, currentUserId, content, type);
    
        rocketMqTemplate.send(RocketMqBizConstant.User.Cluster.BARRAGE_RECORD_SAVE_MSG, dto);
        String filterContent = content;
        if (type != SendBarrageMessageRequest.Type.EMOJI_VALUE) {
            if (SensitiveWordUtil.WORD_FILTER.include(StringUtils.deleteWhitespace(filterContent))) {
                filterContent = SensitiveWordUtil.WORD_FILTER.replace(StringUtils.deleteWhitespace(content));
            }
        }
        //用户发送公屏
        rocketMqTemplate.send(RocketMqBizConstant.Grpc.Broadcast.ROOM_RTMP_MESSAGE, GrpcRoomRtmpMessageDto.builder()
                .roomId(roomId)
                .pushType(RtmpPushType.BROADCAST.getType())
                .rtmpMessageType(RtmpMessage.MessageType.BARRAGE_VALUE)
                .userId(currentUserId)
                .sendUserId(currentUserId)
                .data(GsonUtil.GsonString(LiveRoomSendBarrageDto.builder()
                        .barrageType(barrageType)
                        .content(filterContent)
                        .build()))
                .build());
    
        responseStreamObserver.onNext(SendBarrageMessageResponse.newBuilder()
                .setSuccess(true)
                .setContent(filterContent)
                .build());
    
        responseStreamObserver.onCompleted();
    
    }
    

    每次gRpc请求进来前,在拦截其中解析token并将对应的用户ID设置到 ThreadLocal中,这样从接口中可以直接从ThreadLocal的值中获取请求的用户ID,看着好像没啥问题,难道是ThreadLocal设置的时候有线程安全问题?

    来看一下ThreadLocal设置中的代码

    public class ThreadLocalUtil {
    
        private static final ThreadLocal threadLocal = ThreadLocal.withInitial(() -> new HashMap(10));
    
        public static Map getThreadLocal() {
            return threadLocal.get();
        }
    
        public static Object get(String key) {
            Map map = threadLocal.get();
            return map.get(key);
        }
    
        public static void set(String key, Object value) {
            Map map = threadLocal.get();
            map.put(key, value);
        }
    
        public static void set(Map keyValueMap) {
            Map map = threadLocal.get();
            map.putAll(keyValueMap);
        }
    
        public static void remove() {
            threadLocal.remove();
        }
    
        public static  T remove(String key) {
            Map map = threadLocal.get();
            return (T) map.remove(key);
        }
    
    }
    

    ThreadLocal.withInitial(() -> new HashMap(10));
    方法可以为每个线程提供独立的初始值,确保每个线程都具有自己的初始状态,看起来是线程安全的,没啥毛病,那就排除了ThreadLocal的问题

    继续排查日志-日志上下文分析

    image.png

    好家伙,恍然大悟,拦截器中与gRpc接口代码执行的竟然不是同一个线程,大概率是gRpc框架的线程池处理线程不够用时使用线程切换造成,并发不大的时候还真看不出来~ 看到这的时候我已经打自己两巴掌了,因为在我看来这好像算是一个低级的错误...

    使用ThreadLocal和gRpc框架提供Context的区别点

    有请CHAT-GPT给我们解释一下两者的区别吧:

    Context.current().withValue(key, value)ThreadLocalset 方法都可以用于在请求处理链路中传递数据,但它们之间有一些关键的区别:

  • 作用范围:Context 是 gRPC 框架提供的上下文传递机制,可以在整个 gRPC 请求处理链路中传递数据。而 ThreadLocal 是基于线程的局部变量,只在同一线程中有效。

  • 线程切换:当请求在 gRPC 框架中进行处理时,可能会涉及到线程的切换。在这种情况下,ThreadLocal 中的值无法在不同的线程之间共享,而 Context 可以跨线程传递数据。

  • 显式传递:使用 Context 时,您需要在代码中显式地传递上下文对象。在 gRPC 请求处理链路中,通过调用 Context.current().withValue(key, value) 方法,您可以将数据与当前的上下文关联起来,并在后续的方法调用中获取该数据。

  • 线程安全性:Context 在设计上是线程安全的,可以在多个线程之间共享数据。它是为了支持并发环境而设计的,可以在异步请求和并发请求处理中安全地传递数据。

  • 总的来说,ContextThreadLocal 相比,更适用于 gRPC 框架中的请求处理链路,可以在不同的线程和方法调用中传递数据,提供更灵活的上下文传递机制。而 ThreadLocal 则更适用于在单个线程内部共享数据。

    使用 Context 时需要注意上述区别,并根据需求选择合适的机制来传递数据。如果您需要在整个 gRPC 请求处理链路中传递数据,并且能够跨线程共享,那么使用 Context 是一个更好的选择。

    总结

  • ThreadLocal不要轻易使用在任意的拦截器中,除非你能保证拦截器的代码与实际执行的业务代码严格在同一个线程里,而不是使用线程池执行,否则在执行线程不够用时线程切换导致ThreadLocal读取失效
  • 使用新组件时尽量使用组件提供的解决方案,这需要在使用前认真阅读文档,必要时多读读组件源码加深理解。
  • 相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论