背景
运营反馈,有用户在直播间内发送公屏,A用户发送的公屏 结果直播间内所有人员都显示了B用户发送的,而实际却是A用户发的。
原因分析
日志排查
3.查看log打印代码位置
好家伙,发现服务端没有没错,实际发消息的用户ID和大家收到公屏的发送人不一样? 又让客户端排查了下日志,发现客户端A发送的,实际到了服务端从Token中获取到的当前发送公屏用户竟然是B?
代码分析
gRpc请求拦截器代码
@Slf4j(topic = SLSTopicType.TOPIC_GRPC)
public class CommonGrpcServerInterceptor implements ServerInterceptor {
@Override
public ServerCall.Listener interceptCall(ServerCall serverCall, Metadata metadata,
ServerCallHandler serverCallHandler) {
String token = metadata.get(Metadata.Key.of(MetadataConstants.AUTHENTICATION, Metadata.ASCII_STRING_MARSHALLER)),
clientType = metadata.get(Metadata.Key.of(MetadataConstants.CLIENT_TYPE, Metadata.ASCII_STRING_MARSHALLER));
if (StringUtils.isBlank(token)) {
log.error("grpc请求异常,token为空.");
serverCall.close(Status.UNAUTHENTICATED, null);
}
Long userId = JWTUtil.getUserId(token);
log.info("grpc | token:{} | methodName:{} | userId:{}", token, serverCall.getMethodDescriptor().getFullMethodName(),
userId);
try {
ThreadLocalUtil.set(ThreadLocalConstant.GRPC_USER_KEY,
GrpcThreadData.builder()
.userId(userId)
.clientType(Integer.valueOf(clientType))
.build());
return serverCallHandler.startCall(serverCall, metadata);
} finally {
ThreadLocalUtil.remove(ThreadLocalConstant.GRPC_USER_KEY);
}
}
}
gRpc发送公屏消息接口
@Override
public void sendMessage(SendBarrageMessageRequest request, StreamObserver responseStreamObserver) {
Long roomId = request.getRoomId(), currentUserId = GrpcUtil.getUserId();
int type = request.getType().getNumber(), barrageType = BarrageInfo.MessageType.TEXT_VALUE;
String content = request.getContent();
log.info("用户发送房间公屏消息 | roomId:{} | currentUserId:{} | content:{} | type {}", roomId, currentUserId, content, type);
rocketMqTemplate.send(RocketMqBizConstant.User.Cluster.BARRAGE_RECORD_SAVE_MSG, dto);
String filterContent = content;
if (type != SendBarrageMessageRequest.Type.EMOJI_VALUE) {
if (SensitiveWordUtil.WORD_FILTER.include(StringUtils.deleteWhitespace(filterContent))) {
filterContent = SensitiveWordUtil.WORD_FILTER.replace(StringUtils.deleteWhitespace(content));
}
}
//用户发送公屏
rocketMqTemplate.send(RocketMqBizConstant.Grpc.Broadcast.ROOM_RTMP_MESSAGE, GrpcRoomRtmpMessageDto.builder()
.roomId(roomId)
.pushType(RtmpPushType.BROADCAST.getType())
.rtmpMessageType(RtmpMessage.MessageType.BARRAGE_VALUE)
.userId(currentUserId)
.sendUserId(currentUserId)
.data(GsonUtil.GsonString(LiveRoomSendBarrageDto.builder()
.barrageType(barrageType)
.content(filterContent)
.build()))
.build());
responseStreamObserver.onNext(SendBarrageMessageResponse.newBuilder()
.setSuccess(true)
.setContent(filterContent)
.build());
responseStreamObserver.onCompleted();
}
每次gRpc请求进来前,在拦截其中解析token并将对应的用户ID设置到 ThreadLocal中,这样从接口中可以直接从ThreadLocal的值中获取请求的用户ID,看着好像没啥问题,难道是ThreadLocal设置的时候有线程安全问题?
来看一下ThreadLocal设置中的代码
public class ThreadLocalUtil {
private static final ThreadLocal threadLocal = ThreadLocal.withInitial(() -> new HashMap(10));
public static Map getThreadLocal() {
return threadLocal.get();
}
public static Object get(String key) {
Map map = threadLocal.get();
return map.get(key);
}
public static void set(String key, Object value) {
Map map = threadLocal.get();
map.put(key, value);
}
public static void set(Map keyValueMap) {
Map map = threadLocal.get();
map.putAll(keyValueMap);
}
public static void remove() {
threadLocal.remove();
}
public static T remove(String key) {
Map map = threadLocal.get();
return (T) map.remove(key);
}
}
ThreadLocal.withInitial(() -> new HashMap(10));
方法可以为每个线程提供独立的初始值,确保每个线程都具有自己的初始状态,看起来是线程安全的,没啥毛病,那就排除了ThreadLocal的问题
继续排查日志-日志上下文分析
好家伙,恍然大悟,拦截器中与gRpc接口代码执行的竟然不是同一个线程,大概率是gRpc框架的线程池处理线程不够用时使用线程切换造成,并发不大的时候还真看不出来~ 看到这的时候我已经打自己两巴掌了,因为在我看来这好像算是一个低级的错误...
使用ThreadLocal和gRpc框架提供Context的区别点
有请CHAT-GPT给我们解释一下两者的区别吧:
Context.current().withValue(key, value)
和 ThreadLocal
的 set
方法都可以用于在请求处理链路中传递数据,但它们之间有一些关键的区别:
作用范围:Context
是 gRPC 框架提供的上下文传递机制,可以在整个 gRPC 请求处理链路中传递数据。而 ThreadLocal
是基于线程的局部变量,只在同一线程中有效。
线程切换:当请求在 gRPC 框架中进行处理时,可能会涉及到线程的切换。在这种情况下,ThreadLocal
中的值无法在不同的线程之间共享,而 Context
可以跨线程传递数据。
显式传递:使用 Context
时,您需要在代码中显式地传递上下文对象。在 gRPC 请求处理链路中,通过调用 Context.current().withValue(key, value)
方法,您可以将数据与当前的上下文关联起来,并在后续的方法调用中获取该数据。
线程安全性:Context
在设计上是线程安全的,可以在多个线程之间共享数据。它是为了支持并发环境而设计的,可以在异步请求和并发请求处理中安全地传递数据。
总的来说,Context
与 ThreadLocal
相比,更适用于 gRPC 框架中的请求处理链路,可以在不同的线程和方法调用中传递数据,提供更灵活的上下文传递机制。而 ThreadLocal
则更适用于在单个线程内部共享数据。
使用 Context
时需要注意上述区别,并根据需求选择合适的机制来传递数据。如果您需要在整个 gRPC 请求处理链路中传递数据,并且能够跨线程共享,那么使用 Context
是一个更好的选择。
总结