Go 是一种年轻而强大的语言,专为编写小型、简单的服务而创建。但随着时间推移,越来越多复杂应用和系统也在采用 Go 进行开发,这就出现了一些问题:如何处理事务?
为了深入探讨这个问题,我们假设一个简单的业务场景:用户注册。
作为一个系统,我希望在注册时创建用户和个人资料。
用于访问 RDBMS/DBMS 的现代 Go 库不像 Java 的 Hibernate 或 C# 的 Entity Framework 那样强大,因此我们必须自己处理事务。为了实现用户注册业务场景,我们将创建并评估几种处理事务的方法。
由于每种事务处理方法都必须与 sql.DB 和 sql.Tx 配合使用,因此需要引入接口来封装对数据库的访问。
最终的应用包含两个领域实体,以及一个用于访问数据库的底层接口 DB。
package model
// Domain entities used by the user-registration scenario.
type (
	// User holds the e-mail address of an account.
	User struct {
		Email string
	}

	// Profile holds the display name that belongs to an account.
	Profile struct {
		Name string
	}
)
package transaction
// DB is the low-level query surface the repositories depend on.
//
// Both *sql.DB and *sql.Tx provide every method listed here, so either one can
// be used wherever a DB is expected.
type DB interface {
	// ExecContext executes a statement without returning rows.
	ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error)
	// QueryContext executes a query that returns rows.
	QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error)
	// QueryRowContext executes a query that is expected to return at most one row.
	QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row
	// PrepareContext creates a prepared statement for later use.
	PrepareContext(ctx context.Context, query string) (*sql.Stmt, error)
}
准备工作完成后,就可以采用如下两种方法。
1. 事务感知上下文
工作原理:transaction.Manager 启动事务并将其放入上下文。当存储库执行查询时,DB 封装层会检查上下文中是否存在事务:如果存在,就使用该事务执行查询;如果不存在,则直接通过 sql.DB 在事务之外执行查询。
为了启动事务,我们需要实体:Manager
package transaction
// Manager runs a unit of work inside a transaction.
type Manager interface {
	// Run invokes callback and returns an error describing any failure.
	// The context passed to callback is supplied by the implementation.
	Run(ctx context.Context, callback func(ctx context.Context) error) error
}
transaction.Manager 实现:
package transaction
import (
"context"
"database/sql"
"github.com/pkg/errors"
"go.uber.org/multierr"
)
// txKey is a dedicated type for context keys, so the key cannot collide with
// keys defined by other packages.
type txKey string

// ctxWithTx is the context key under which the active transaction is stored.
var ctxWithTx txKey = "tx"
// SQLTransactionManager starts database/sql transactions on a *sql.DB.
type SQLTransactionManager struct {
	db *sql.DB // handle on which transactions are begun
}

// NewManager returns a SQLTransactionManager that begins its transactions on db.
func NewManager(db *sql.DB) *SQLTransactionManager {
	m := new(SQLTransactionManager)
	m.db = db
	return m
}
// Run executes callback inside a new transaction.
//
// The transaction is begun with default options and placed into the context
// that is handed to callback. If callback returns nil the transaction is
// committed. If callback returns an error or panics, the transaction is rolled
// back and the error is returned; a panic is recovered and converted into the
// returned error instead of propagating.
func (m *SQLTransactionManager) Run(
	ctx context.Context,
	callback func(ctx context.Context) error,
) (rErr error) {
	tx, err := m.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return errors.WithStack(err)
	}

	// Deferred calls run last-in-first-out: the recover below runs first and
	// turns a panic into rErr, then this function rolls back on any failure.
	defer func() {
		if rErr == nil {
			return
		}
		// Once Commit has been attempted the transaction is finished and
		// Rollback reports sql.ErrTxDone. That is not an additional failure,
		// so it is not attached to the error the caller sees.
		if rbErr := tx.Rollback(); rbErr != nil && !errors.Is(rbErr, sql.ErrTxDone) {
			rErr = multierr.Combine(rErr, errors.WithStack(rbErr))
		}
	}()
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		if e, ok := rec.(error); ok {
			rErr = e
			return
		}
		// %v formats any panic value; %s would render non-strings as "%!s(...)".
		rErr = errors.Errorf("%v", rec)
	}()

	if err = callback(putTxToContext(ctx, tx)); err != nil {
		return err
	}
	return errors.WithStack(tx.Commit())
}
// ExtractTxFromContext returns the transaction stored in ctx under ctxWithTx.
// The boolean is false, and the transaction nil, when ctx holds no value for
// that key or the value is not a *sql.Tx.
func ExtractTxFromContext(ctx context.Context) (*sql.Tx, bool) {
	tx, found := ctx.Value(ctxWithTx).(*sql.Tx)
	return tx, found
}
// putTxToContext returns a context derived from ctx that carries tx under
// ctxWithTx.
func putTxToContext(ctx context.Context, tx *sql.Tx) context.Context {
	withTx := context.WithValue(ctx, ctxWithTx, tx)
	return withTx
}
DB实现:
package storage
import (
"brand/transaction/example1/transaction"
"context"
"database/sql"
)
// DB sends each call to the transaction found in the context when there is
// one, and to the wrapped *sql.DB otherwise.
type DB struct {
	db *sql.DB // used when the context carries no transaction
}

// NewDB wraps db.
func NewDB(db *sql.DB) *DB {
	wrapped := new(DB)
	wrapped.db = db
	return wrapped
}

// QueryRowContext runs the query on the context's transaction if present,
// otherwise directly on the wrapped *sql.DB.
func (d *DB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row {
	if tx, inTx := transaction.ExtractTxFromContext(ctx); inTx {
		return tx.QueryRowContext(ctx, query, args...)
	}
	return d.db.QueryRowContext(ctx, query, args...)
}

// QueryContext runs the query on the context's transaction if present,
// otherwise directly on the wrapped *sql.DB.
func (d *DB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
	if tx, inTx := transaction.ExtractTxFromContext(ctx); inTx {
		return tx.QueryContext(ctx, query, args...)
	}
	return d.db.QueryContext(ctx, query, args...)
}

// ExecContext runs the statement on the context's transaction if present,
// otherwise directly on the wrapped *sql.DB.
func (d *DB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
	if tx, inTx := transaction.ExtractTxFromContext(ctx); inTx {
		return tx.ExecContext(ctx, query, args...)
	}
	return d.db.ExecContext(ctx, query, args...)
}

// PrepareContext prepares the statement on the context's transaction if
// present, otherwise directly on the wrapped *sql.DB.
func (d *DB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
	if tx, inTx := transaction.ExtractTxFromContext(ctx); inTx {
		return tx.PrepareContext(ctx, query)
	}
	return d.db.PrepareContext(ctx, query)
}
RegistrationService 负责用户注册业务场景
package service
import (
"brand/transaction/example1/model"
"brand/transaction/example1/transaction"
"context"
)
// UserRepository is the persistence dependency the registration service uses
// for users.
type UserRepository interface {
	// Create stores user and reports any failure.
	Create(ctx context.Context, user *model.User) error
}
// ProfileRepository is the persistence dependency the registration service
// uses for profiles.
type ProfileRepository interface {
	// Create stores profile and reports any failure.
	Create(ctx context.Context, profile *model.Profile) error
}
// RegistrationData is the input of a registration: the e-mail used for the
// user and the name used for the profile.
type RegistrationData struct {
	Email, Name string
}
// RegistrationService implements the user-registration use case on top of a
// transaction manager and the two repositories.
type RegistrationService struct {
	transactionManager transaction.Manager
	userRepository     UserRepository
	profileRepository  ProfileRepository
}

// NewRegistrationService builds a RegistrationService from its dependencies.
func NewRegistrationService(
	transactionManager transaction.Manager,
	userRepository UserRepository,
	profileRepository ProfileRepository,
) *RegistrationService {
	svc := new(RegistrationService)
	svc.transactionManager = transactionManager
	svc.userRepository = userRepository
	svc.profileRepository = profileRepository
	return svc
}
// Register creates a user and then a profile from data, both inside a single
// Run call of the transaction manager. The first repository error stops the
// work and is returned from the callback.
func (s *RegistrationService) Register(ctx context.Context, data RegistrationData) error {
	createBoth := func(txCtx context.Context) error {
		user := &model.User{Email: data.Email}
		if err := s.userRepository.Create(txCtx, user); err != nil {
			return err
		}

		profile := &model.Profile{Name: data.Name}
		return s.profileRepository.Create(txCtx, profile)
	}

	return s.transactionManager.Run(ctx, createBoth)
}
UserRepository 和 ProfileRepository 的实现:
package storage
import (
"brand/transaction"
"brand/transaction/example1/model"
"context"
)
// ProfileRepository stores profiles through a transaction.DB.
type ProfileRepository struct {
	db transaction.DB
}

// NewProfileRepository returns a ProfileRepository that executes its
// statements on db.
func NewProfileRepository(db transaction.DB) *ProfileRepository {
	repo := new(ProfileRepository)
	repo.db = db
	return repo
}

// Create inserts profile, passing its name as the statement argument.
func (r *ProfileRepository) Create(ctx context.Context, profile *model.Profile) error {
	if _, err := r.db.ExecContext(ctx, "INSERT ...", profile.Name); err != nil {
		return err
	}
	return nil
}
package storage
import (
"brand/transaction"
"brand/transaction/example1/model"
"context"
)
// UserRepository stores users through a transaction.DB.
type UserRepository struct {
	db transaction.DB
}

// NewUserRepository returns a UserRepository that executes its statements on db.
func NewUserRepository(db transaction.DB) *UserRepository {
	repo := new(UserRepository)
	repo.db = db
	return repo
}

// Create inserts user, passing its e-mail as the statement argument.
func (r *UserRepository) Create(ctx context.Context, user *model.User) error {
	if _, err := r.db.ExecContext(ctx, "INSERT ...", user.Email); err != nil {
		return err
	}
	return nil
}
优点:
- 简单:存储库会自动使用由 TransactionManager 启动的事务
- 与存储无关:客户端代码对存储类型一无所知
缺点:
- 不符合Go的使用习惯
- 控制较少:无法防止在事务中启动事务,可能会产生意想不到的副作用,代码审查时必须考虑到这一点
2. 事务感知存储库
工作原理:事务管理器启动事务并将其传递给回调,存储库再通过工厂方法基于该事务创建新的存储库实例。
为了启动事务,我们需要实体:Manager
// Manager runs a unit of work inside a transaction and hands that transaction
// to the callback explicitly.
type Manager interface {
	// Run invokes callback with a context and a transaction and returns an
	// error describing any failure.
	Run(ctx context.Context, callback func(ctx context.Context, tx *sql.Tx) error) error
}
transaction.Manager 实现:
package transaction
import (
"context"
"database/sql"
"github.com/pkg/errors"
"go.uber.org/multierr"
)
// txKey is a dedicated string type for a context key.
type txKey string

// ctxWithTx is a context key with the text "tx".
// NOTE(review): nothing in this snippet reads or writes it, because the
// transaction is passed to the callback directly — confirm whether it is
// still needed.
var ctxWithTx txKey = "tx"
// SQLTransactionManager starts database/sql transactions on a *sql.DB.
type SQLTransactionManager struct {
	db *sql.DB // handle on which transactions are begun
}

// NewManager returns a SQLTransactionManager that begins its transactions on db.
func NewManager(db *sql.DB) *SQLTransactionManager {
	m := new(SQLTransactionManager)
	m.db = db
	return m
}
// Run executes callback inside a new transaction.
//
// The transaction is begun with default options and passed to callback
// together with the unchanged ctx. If callback returns nil the transaction is
// committed. If callback returns an error or panics, the transaction is rolled
// back and the error is returned; a panic is recovered and converted into the
// returned error instead of propagating.
func (m *SQLTransactionManager) Run(
	ctx context.Context,
	callback func(ctx context.Context, tx *sql.Tx) error,
) (rErr error) {
	tx, err := m.db.BeginTx(ctx, &sql.TxOptions{})
	if err != nil {
		return errors.WithStack(err)
	}

	// Deferred calls run last-in-first-out: the recover below runs first and
	// turns a panic into rErr, then this function rolls back on any failure.
	defer func() {
		if rErr == nil {
			return
		}
		// Once Commit has been attempted the transaction is finished and
		// Rollback reports sql.ErrTxDone. That is not an additional failure,
		// so it is not attached to the error the caller sees.
		if rbErr := tx.Rollback(); rbErr != nil && !errors.Is(rbErr, sql.ErrTxDone) {
			rErr = multierr.Combine(rErr, errors.WithStack(rbErr))
		}
	}()
	defer func() {
		rec := recover()
		if rec == nil {
			return
		}
		if e, ok := rec.(error); ok {
			rErr = e
			return
		}
		// %v formats any panic value; %s would render non-strings as "%!s(...)".
		rErr = errors.Errorf("%v", rec)
	}()

	if err = callback(ctx, tx); err != nil {
		return err
	}
	return errors.WithStack(tx.Commit())
}
DB实现:
package storage
import (
"context"
"database/sql"
)
// DB is a thin wrapper that forwards every call to the wrapped *sql.DB.
type DB struct {
	db *sql.DB
}

// NewDB wraps db.
func NewDB(db *sql.DB) *DB {
	wrapped := new(DB)
	wrapped.db = db
	return wrapped
}

// QueryRowContext forwards to (*sql.DB).QueryRowContext.
func (d *DB) QueryRowContext(ctx context.Context, query string, args ...any) *sql.Row {
	row := d.db.QueryRowContext(ctx, query, args...)
	return row
}

// QueryContext forwards to (*sql.DB).QueryContext.
func (d *DB) QueryContext(ctx context.Context, query string, args ...any) (*sql.Rows, error) {
	rows, err := d.db.QueryContext(ctx, query, args...)
	return rows, err
}

// ExecContext forwards to (*sql.DB).ExecContext.
func (d *DB) ExecContext(ctx context.Context, query string, args ...any) (sql.Result, error) {
	res, err := d.db.ExecContext(ctx, query, args...)
	return res, err
}

// PrepareContext forwards to (*sql.DB).PrepareContext.
func (d *DB) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
	stmt, err := d.db.PrepareContext(ctx, query)
	return stmt, err
}
RegistrationService 负责用户注册业务场景
有两种方法可以创建带有事务的存储库:
- 存储库带有结构方法 WithTransaction(示例中使用了该方法)
- 存储库工厂 userRepositoryFactory.CreateFromTransaction(tx)
package service
import (
"brand/transaction/example2/model"
"brand/transaction/example2/transaction"
"context"
"database/sql"
)
// UserRepository is the persistence dependency the registration service uses
// for users.
type UserRepository interface {
	// Create stores user and reports any failure.
	Create(ctx context.Context, user *model.User) error
	// WithTransaction returns a UserRepository for use with tx.
	WithTransaction(tx *sql.Tx) UserRepository
}
// ProfileRepository is the persistence dependency the registration service
// uses for profiles.
type ProfileRepository interface {
	// Create stores profile and reports any failure.
	Create(ctx context.Context, profile *model.Profile) error
	// WithTransaction returns a ProfileRepository for use with tx.
	WithTransaction(tx *sql.Tx) ProfileRepository
}
// RegistrationData is the input of a registration: the e-mail used for the
// user and the name used for the profile.
type RegistrationData struct {
	Email, Name string
}
// RegistrationService implements the user-registration use case on top of a
// transaction manager and the two repositories.
type RegistrationService struct {
	transactionManager transaction.Manager
	userRepository     UserRepository
	profileRepository  ProfileRepository
}

// NewRegistrationService builds a RegistrationService from its dependencies.
func NewRegistrationService(
	transactionManager transaction.Manager,
	userRepository UserRepository,
	profileRepository ProfileRepository,
) *RegistrationService {
	svc := new(RegistrationService)
	svc.transactionManager = transactionManager
	svc.userRepository = userRepository
	svc.profileRepository = profileRepository
	return svc
}
// Register creates a user and then a profile from data inside a single Run
// call of the transaction manager. Both repositories are first rebound to the
// transaction received by the callback; the first repository error stops the
// work and is returned from the callback.
func (s *RegistrationService) Register(ctx context.Context, data RegistrationData) error {
	createBoth := func(txCtx context.Context, tx *sql.Tx) error {
		users := s.userRepository.WithTransaction(tx)
		profiles := s.profileRepository.WithTransaction(tx)

		user := &model.User{Email: data.Email}
		if err := users.Create(txCtx, user); err != nil {
			return err
		}

		profile := &model.Profile{Name: data.Name}
		return profiles.Create(txCtx, profile)
	}

	return s.transactionManager.Run(ctx, createBoth)
}
UserRepository 和 ProfileRepository 的实现:
package storage
import (
"brand/transaction"
"brand/transaction/example2/model"
"brand/transaction/example2/service"
"context"
"database/sql"
)
// ProfileRepository stores profiles through a transaction.DB.
type ProfileRepository struct {
	db transaction.DB
}

// NewProfileRepository returns a ProfileRepository that executes its
// statements on db.
func NewProfileRepository(db transaction.DB) *ProfileRepository {
	repo := new(ProfileRepository)
	repo.db = db
	return repo
}

// Create inserts profile, passing its name as the statement argument.
func (r *ProfileRepository) Create(ctx context.Context, profile *model.Profile) error {
	if _, err := r.db.ExecContext(ctx, "INSERT ...", profile.Name); err != nil {
		return err
	}
	return nil
}

// WithTransaction returns a new ProfileRepository that executes its statements
// on tx; the receiver is left unchanged.
func (r *ProfileRepository) WithTransaction(tx *sql.Tx) service.ProfileRepository {
	return &ProfileRepository{db: tx}
}
package storage
import (
"brand/transaction"
"brand/transaction/example2/model"
"brand/transaction/example2/service"
"context"
"database/sql"
)
// UserRepository stores users through a transaction.DB.
type UserRepository struct {
	db transaction.DB
}

// NewUserRepository returns a UserRepository that executes its statements on db.
func NewUserRepository(db transaction.DB) *UserRepository {
	repo := new(UserRepository)
	repo.db = db
	return repo
}

// Create inserts user, passing its e-mail as the statement argument.
func (r *UserRepository) Create(ctx context.Context, user *model.User) error {
	if _, err := r.db.ExecContext(ctx, "INSERT ...", user.Email); err != nil {
		return err
	}
	return nil
}

// WithTransaction returns a new UserRepository that executes its statements on
// tx; the receiver is left unchanged.
func (r *UserRepository) WithTransaction(tx *sql.Tx) service.UserRepository {
	return &UserRepository{db: tx}
}
优点:
- 更明确:在注册服务内部创建事务,可避免副作用
缺点:
- 客户端代码知道存储类型
- 客户端代码负责创建新的存储库
我相信任何一种方法都能使代码更易读、更简单,但建议使用第一种方法,从而可以隐藏存储细节,使我们能够在一个项目中使用多个存储,而无需考虑实现和存储细节。
package storage
import (
"brand/transaction"
"brand/transaction/example2/model"
"brand/transaction/example2/service"
"context"
"database/sql"
)
// UserRepository stores users through a transaction.DB.
type UserRepository struct {
	db transaction.DB
}

// NewUserRepository returns a UserRepository that executes its statements on db.
func NewUserRepository(db transaction.DB) *UserRepository {
	repo := new(UserRepository)
	repo.db = db
	return repo
}

// Create inserts user, passing its e-mail as the statement argument.
func (r *UserRepository) Create(ctx context.Context, user *model.User) error {
	if _, err := r.db.ExecContext(ctx, "INSERT ...", user.Email); err != nil {
		return err
	}
	return nil
}

// WithTransaction returns a new UserRepository that executes its statements on
// tx; the receiver is left unchanged.
func (r *UserRepository) WithTransaction(tx *sql.Tx) service.UserRepository {
	return &UserRepository{db: tx}
}