GO微服务中使用RocketMQ消息队列系统实现分布式事务

2023年 9月 2日 207.0k 0

 分布式事务的介绍

分布式事务是指在分布式计算环境中,多个独立的组件或系统之间需要协调和保持一致性的一系列操作,这些操作通常涉及多个数据存储位置或多个服务。分布式事务的主要目标是确保所有参与的操作要么全部成功完成,要么全部失败回滚,从而保持数据的一致性和可靠性。

例如在电商购物项目中,当一个用户购买商品时,订单微服务有以下几个步骤:

  • 生成订单号

  • 查询商品信息获得商品价格

  • 调用库存服务扣减库存

  • 本地创建订单和订单详情记录

  • 其中,第二步查询商品服务需要调用商品微服务,第三步扣库存需要调用库存微服务,那会不会出现这种情况:当程序执行第三步完毕后,执行第四步时发生错误,致使创建订单失败,但是这个时候我们已经在第三步扣了库存,也就是说,订单没出来,库存却扣了。

    为了解决这种情况,我们需要把这四步作为一个事务,也就是任何一个步骤报错,就需要回滚整个事务,因为涉及到不同的数据库(商品数据库,库存数据库,订单数据库),所以这个由这四个步骤组成的事务就叫做分布式事务。

    消息队列

    消息队列是一种用于在分布式系统中实现异步通信的机制,它通常用于解耦不同组件或服务之间的通信,提高系统的可伸缩性和性能。RocketMQ是一个高性能、低延迟、分布式消息队列系统,最初由阿里巴巴开发并开源。

    RocketMQ中文文档:github.com/apache/rock…

    官方提供 Go客户端(在GO中使用RocketMQ):github.com/apache/rock…

    推荐使用docker部署RoketMQ:github.com/foxiswho/do…

    RocketMQ的架构上主要分为四部分,如上图所示

  • Producer(生产者) :生产者负责将消息发送到RocketMQ的主题(Topic),并可以指定消息发送到哪个队列。生产者通常是应用程序的一部分,用于将数据发送到消息队列。
  • Broker(消息代理服务器) :Broker是消息队列的核心组件,负责存储消息、管理主题和队列、接收和分发消息。RocketMQ集群由多个Broker组成,它们协同工作以确保消息的高可用性和可靠性。干活的!
  • Consumer(消费者) :消费者从RocketMQ的主题订阅消息,并将消息处理为应用程序所需的操作。RocketMQ支持多种消费模式,包括集群消费和广播消费。
  • NameServer(名称服务器) :NameServer是RocketMQ的元数据存储,用于存储主题和队列的信息,以及路由信息。它允许生产者和消费者查找和定位Broker
  • 整体的流程是生产者要发送消息,他会请求NameServer从多个Broker节点给他分配一个可用的Broker并连接,同样,消费者也会请求NameServer从多个Broker节点给他分配一个可用的Broker并连接,NameServer就类似微服务的注册中心, 当全部连接好后,他们会按照事务消息的方法工作。

    事务消息:一些消息队列系统(如Apache Kafka)提供了事务消息的支持。事务消息允许生产者将一组消息发送到队列,并要么全部成功提交,要么全部回滚。消费者可以按照事务性的方式处理这些消息,确保消息的一致性。这种方式通常基于分布式事务协议,如2PC。

    具体执行步骤如上图所示

    1.事务发起方首先发送prepare消息到MQ

    2.在发送prepare消息成功后执行本地事务

    3.根据本地事务执行结果返回commit或者是rollback

    4.如果消息是rollback,MQ将删除该prepare消息不进行下发,如果是commit消息,MQ将会把这个消息发送给consumer端

    5.如果执行本地事务过程中,执行端挂掉或者超时(一般是网络出问题),MQ将会不停的询问其同组的其他producer来获取状态 

    具体代码实现

    接下来会在GO微服务开发电商购物创建订单demo中用代码实现上述步骤 

    由本文开头得知订单服务的创建订单操作分了4步

  • 生成订单号
  • 查询商品信息获得商品价格
  • 调用库存服务扣减库存
  • 本地创建订单和订单详情记录
  • 那么要如何基于事务消息实现上述分布式事务呢,这里建议可以先自己思考一下,再看接下来的思路。

    分析需求可以知道上游服务就包括了以上四点,那我们可以采用逆向思维,在第一步发送half消息时发送回滚库存的操作,rocketMQ收到消息后回复ok,然后上游服务执行本地事务,执行没有报错,也就是成功创建了订单,扣了库存,那么就回复一个rollback信息,把之前发送的回滚库存操作信息丢了,这样下游的库存微服务什么也不用做,假设在上游服务第四步创建订单和订单详情记录(此时库存已经扣了)失败了,那么回复一个commit信息,这样之前发送的回滚库存操作的信息就会被投递给下游服务,也就是库存微服务,它会调用库存回滚方法回滚库存,从而把扣库存操作抵消

    创建生产者

    调用官方库的rockermq.NewTranscationProducer方法创建生产者,传入一个自定义结构体(结构体要实现执行本地事务和检查本地事务状态两种方法),MQserver的ip和端口,重试发送消息的次数,生产者组的名字

    p, err := rocketmq.NewTransactionProducer(
    		orderEntity,
    		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{config.Conf.RocketMqConfig.Addr})),
    		//producer.WithNsResolver(primitive.NewPassthroughResolver(endPoint)),
    		producer.WithRetry(2),
    		producer.WithGroupName("order_srv_1"), // 生产者组
    	)
    	if err != nil {
    		zap.L().Error("NewTransactionProducer failed", zap.Error(err))
    		return status.Error(codes.Internal, "NewTransactionProducer failed")
    	}
    	err = p.Start()
    	if err != nil {
    		zap.L().Error("start producer error", zap.Error(err))
    		return status.Error(codes.Internal, "start producer error")
    	}
    

    自定义结构体的方法实现

    对于在生产者创建中传入的结构体,定义了如下代码,分别实现了ExecuteLocalTransaction和CheckLocalTransaction方法用来执行本地事务和检查本地事务状态,并且在执行本地事务中,一旦报错,如果错误在扣库存之前发生,即库存没有扣,则不需要回滚库存,那么此时就返回rollback丢弃消息,如果错误发生在扣库存之后,那么就需要返回commit来回滚库存

    检查状态的方法是通过查看数据库是否有该订单记录,如果本地事务执行成功就会把订单记录保存到数据库,如果查不到就说明本地事务失败了,此时需要回滚库存,如果查到了说明本地事务成功,就不需要回滚库存(这里仅仅是个demo,具体逻辑会更复杂!!!)

    // OrderEntity 自定义结构体,实现了两个方法
    // 发送事务消息的时候 RocketMQ 会自动根据情况调用那两个方法
    type OrderEntity struct {
    	OrderId int64              //订单号
    	Param   *proto.OrderReq    //订单详细
    	err     error			   //报错时返回的错误
    }
    
    //	当发送prepare(half) message 成功后, 这个方法(本地的事务方法)就会被执行
    func (o *OrderEntity) ExecuteLocalTransaction(*primitive.Message) primitive.LocalTransactionState {
    	fmt.Println("in ExecuteLocalTransaction...")
    	if o.Param == nil {
    		zap.L().Error("ExecuteLocalTransaction param is nil")
    		o.err = status.Error(codes.Internal, "invalid OrderEntity")
    		return primitive.CommitMessageState
    	}
    	param := o.Param
    	ctx := context.Background()
    	// 1. 查询商品金额(营销)--> RPC连接 goods_service
    	goodsDatail, err := rpc.GoodsCli.GetGoodsDetail(ctx, &proto.GetGoodsDetailReq{
    		GoodsId: param.GoodsId,
    		UserId:  param.UserId,
    	})
    	if err != nil {
    		zap.L().Error("GoodsCli.GetGoodsDetail failed", zap.Error(err))
    		// 库存未扣减
    		o.err = status.Error(codes.Internal, err.Error())
    		return primitive.RollbackMessageState
    	}
    	payAmountStr := goodsDatail.Price
    	payAmount, _ := strconv.ParseInt(payAmountStr, 10, 64)
    
    	// 2. 库存校验及扣减  --> RPC连接 stock_service
    	_, err = rpc.StockCli.ReduceStock(ctx, &proto.GoodsStockInfo{
    		OrderId: o.OrderId,
    		GoodsId: o.Param.GoodsId,
    		Num:     o.Param.Num,
    	})
    	if err != nil {
    		// 库存扣减失败,丢弃half-message
    		zap.L().Error("StockCli.ReduceStock failed", zap.Error(err))
    		o.err = status.Error(codes.Internal, "ReduceStock failed")
    		return primitive.RollbackMessageState
    	}
    	// 代码能执行到这里说明 扣减库存成功了,
    	// 从这里开始如果本地事务执行失败就需要回滚库存
    	// 3. 创建订单
    	// 生成订单表
    	orderData := model.Order{
    		OrderId:        o.OrderId,
    		UserId:         param.UserId,
    		PayAmount:      payAmount,
    		ReceiveAddress: param.Address,
    		ReceiveName:    param.Name,
    		ReceivePhone:   param.Phone,
    		Status:         100, // 待支付
    	}
    	// mysql.CreateOrder(ctx, &orderData)
    	orderDetail := model.OrderDetail{
    		OrderId: o.OrderId,
    		UserId:  param.UserId,
    		GoodsId: param.GoodsId,
    		Num:     param.Num,
    	}
    	// mysql.CreateOrderDetail(ctx, &orderDetail)
    	// 在本地事务创建订单和订单详情记录
    	err = mysql.CreateOrderWithTransation(ctx, &orderData, &orderDetail)
    	if err != nil {
    		// 本地事务执行失败了,上一步已经库存扣减成功
    		// 就需要将库存回滚的消息投递出去,下游根据消息进行库存回滚
    		zap.L().Error("CreateOrderWithTransation failed", zap.Error(err))
    		return primitive.CommitMessageState // 将之前发送的hal-message commit
    	}
        // 走到这里说明 本地事务执行成功
    	// 需要将之前的half-message rollback, 丢弃掉
    	return primitive.RollbackMessageState
    }
    // 当 prepare(half) message 没有响应时(一般网络问题)
    // broker 会回查本地事务的状态,此时这个方法会被执行
    func (o *OrderEntity) CheckLocalTransaction(*primitive.MessageExt) primitive.LocalTransactionState {
    	// 检查本地状态是否创建成功订单
    	_, err := mysql.QueryOrder(context.Background(), o.OrderId)
    	// 需要再查询订单详情表
    	if err == gorm.ErrRecordNotFound {
    		// 没查询到说明订单创建失败,需要回滚库存
    		return primitive.CommitMessageState
    	}
    	return primitive.RollbackMessageState
    }
    

    发送消息

    创建订单主函数的完整内容,即创建生产者 -->封装消息 -->发送消息 -->根据commit or rollback执行策略(commit报错创建订单失败并让下游回滚库存,rollback说明成功),消息发送后,MQ会自动执行你定义的执行本地事务的方法,不需要手动操作,当没有返回信息,也会自动执行检查本地事务状态的方法

        func Create(ctx context.Context, param *proto.OrderReq) error {
        	// 3.1 生成订单号
        	orderId := snowflake.GenID()
    
        	orderEntity := &OrderEntity{
        		OrderId: orderId,
        		Param:   param,
        	}
        	//创建生产者
        	p, err := rocketmq.NewTransactionProducer(
        		orderEntity,
        		producer.WithNsResolver(primitive.NewPassthroughResolver([]string{config.Conf.RocketMqConfig.Addr})),
        		//producer.WithNsResolver(primitive.NewPassthroughResolver(endPoint)),
        		producer.WithRetry(2),
        		producer.WithGroupName("order_srv_1"), // 生产者组
        	)
        	if err != nil {
        		zap.L().Error("NewTransactionProducer failed", zap.Error(err))
        		return status.Error(codes.Internal, "NewTransactionProducer failed")
        	}
        	err = p.Start()
        	if err != nil {
        		zap.L().Error("start producer error", zap.Error(err))
        		return status.Error(codes.Internal, "start producer error")
        	}
        	// 封装消息 orderId GoodsId num
        	data := model.OrderGoodsStockInfo{
        		OrderId: orderId,
        		GoodsId: param.GoodsId,
        		Num:     param.Num,
        	}
        	body, _ := json.Marshal(data)
        	msg := &primitive.Message{
        		Topic: config.Conf.RocketMqConfig.Topic.StockRollback, // xx_stock_rollback
        		Body:  body,
        	}
        	// 发送事务消息,对应RocketMQ事务消息第一步发送Half消息,发送消息给MQserver返回成功后后会根据之前创建生产者传入的结构体自动调用结构体的执行本地事务方法
        	res, err := p.SendMessageInTransaction(context.Background(), msg)
        	if err != nil {
        		zap.L().Error("SendMessageInTransaction failed", zap.Error(err))
        		return status.Error(codes.Internal, "create order failed")
        	}
        	zap.L().Info("p.SendMessageInTransaction success", zap.Any("res", res))
        	// 执行到这一步说明生产者事务已有结果,如果回滚库存的消息被投递出去给消费者(commit)说明本地事务执行失败,也就是创建订单失败
        	if res.State == primitive.CommitMessageState {
        		return status.Error(codes.Internal, "create order failed")
        	}
        	// 其他内部错误
        	if orderEntity.err != nil {
        		return orderEntity.err
        	}
        	return nil
        }
    
    

    下游服务监听MQserver的消息

    在下游服务(库存微服务)的main函数中会有以下代码,当有消息发来,就会调用handler层的RollbackMsghandle方法回滚库存

        // 监听库存回滚的消息
        	c, _ := rocketmq.NewPushConsumer(
        		consumer.WithGroupName("stock_srv_1"),
        		consumer.WithNsResolver(primitive.NewPassthroughResolver([]string{"127.0.0.1:9876"})),
        	)
        	// 订阅topic
        	err = c.Subscribe("xx_stock_rollback", consumer.MessageSelector{}, handler.RollbackMsghandle)
        	if err != nil {
        		fmt.Println(err.Error())
        	}
    
    

    handler.RollbackMasghandle定义

        func RollbackMsghandle(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
        	for i := range msgs {
        		var data model.OrderGoodsStockInfo
        		err := json.Unmarshal(msgs[i].Body, &data)
        		if err != nil {
        			zap.L().Error("json.Unmarshal RollbackMsg failed", zap.Error(err))
        			continue
        		}
        		// 将库存回滚
        		err = mysql.RollbackStockByMsg(ctx, data)
        		if err != nil {
        			return consumer.ConsumeRetryLater, nil
        		}
        		return consumer.ConsumeSuccess, nil
        	}
        	return consumer.ConsumeSuccess, nil
        }
    
    
    

    至此,整个事务消息执行结束,实现了分布式事务

    相关文章

    JavaScript2024新功能:Object.groupBy、正则表达式v标志
    PHP trim 函数对多字节字符的使用和限制
    新函数 json_validate() 、randomizer 类扩展…20 个PHP 8.3 新特性全面解析
    使用HTMX为WordPress增效:如何在不使用复杂框架的情况下增强平台功能
    为React 19做准备:WordPress 6.6用户指南
    如何删除WordPress中的所有评论

    发布评论