数据库接口

概述

dtm中的子事务屏障,需要与数据库交互,xa事务模式,也需要与数据库交互。目前dtm定义的交互接口,采用了与标准库sql兼容的方式,使用中,直接传递sql.DB/sql.Tx即可。

barrier需要的接口

因为barrier需要在事务内部操作barrier相关的表,所以它的接口需要传入一个 *sql.Tx或 *sql.DB

func (bb *BranchBarrier) Call(tx *sql.Tx, busiCall BusiFunc) error
func (bb *BranchBarrier) CallWithDB(db *sql.DB, busiCall BusiFunc) error

Xa需要的接口

Xa事务模式中,本地数据库连接是由dtmcli创建和管理的,因此调用回调函数的参数类型是 *sql.DB 。如果您使用其他的库,例如gorm等,那么您根据 *sql.DB 构建相关的orm对象即可。

type XaLocalFunc func(db *sql.DB, xa *Xa) (interface{}, error)

示例

为了保持dtm的依赖尽量小,dtm的示例,只给出了gorm的,其他orm,在此说明文档中说明用法

GORM

示例在 dtm-examples

barrier示例:

  barrier := MustBarrierFromGin(c)
  // gdb is a *gorm.DB
  tx := gdb.Begin()
	return dtmcli.ResultSuccess, barrier.Call(tx.Statement.ConnPool.(*sql.Tx), func(tx1 *sql.Tx) error {
		return tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, 2).Error
	})

xa示例:

  return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
    // gorm提供接口,可以从标准的sql.DB对象,构造gorm.DB
    gdb, err := gorm.Open(mysql.New(mysql.Config{
      Conn: db,
    }), &gorm.Config{})
    if err != nil {
      return nil, err
    }
    dbr := gdb.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
    return dtmcli.ResultSuccess, dbr.Error
  })

GOQU

barrier示例:

	dialect := goqu.Dialect("mysql")
	sdb, err := dbGet().DB.DB()
	if err != nil {
		return nil, err
  }
  gdb := dialect.DB(sdb)
  // gdb is a goqu dialect.DB, the following code shows how to obtain tx
	tx, err := gdb.Begin()
	return dtmcli.ResultSuccess, barrier.Call(tx, func(tx1 *sql.Tx) error {
		_, err := tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, 2)
		return err
	})

xa示例

  return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
    dialect := goqu.Dialect("mysql")
    godb := dialect.DB(db)
    _, err := godb.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
    return dtmcli.ResultSuccess, err
  })

XORM

请注意,2021-08-21刚给xorm提了pr,暴露了sql.Tx,虽然已合并,但是还未发布版本,因此需要安装最新版本

go get -u xorm.io/xorm@7cd6a74c9f

barrier示例:

	x, _ := xorm.NewEngineWithDB("mysql", "dtm", core.FromDB(sdbGet()))
	se := x.NewSession()
	defer se.Close()
	err := se.Begin()
	if err != nil {
		return nil, err
	}
  // se is a xorm session, the following code shows how to obtain tx
	return dtmcli.ResultSuccess, barrier.Call(se.Tx().Tx, func(tx1 *sql.Tx) error {
		_, err := se.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, 2)
		return err
	})

xa示例

  return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
    xdb, _ := xorm.NewEngineWithDB("mysql", "dtm", core.FromDB(db))
    _, err := xdb.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
    return dtmcli.ResultSuccess, err
  })

Go-zero

需要go-zero >= v1.2.0

barrier示例:

  // 假设conn为go-zero里面的 sqlx.SqlConn
  db, err := conn.RawDB()
  if err != nil {
    return err
  }
	return dtmcli.ResultSuccess, barrier.CallWithDB(db, func(tx *sql.Tx) error {
		_, err := tx.Exec("update dtm_busi.user_account set balance = balance + ? where user_id = ?", -req.Amount, 2)
		return err
	})

xa示例

  return XaClient.XaLocalTransaction(c.Request.URL.Query(), func(db *sql.DB, xa *dtmcli.Xa) (interface{}, error) {
    conn := NewSqlConnFromDB(db)
    _, err := conn.Exec("update dtm_busi.user_account set balance=balance-? where user_id=?", reqFrom(c).Amount, 1)
    return dtmcli.ResultSuccess, err
  })

ent

可以支持,代码示例待补充

Last Updated: