Skip to content

Commit

Permalink
feat(mysql): 权限申请单据合并冲突 #7217
Browse files Browse the repository at this point in the history
  • Loading branch information
xfwduke committed Dec 5, 2024
1 parent a2d201f commit 19eba17
Show file tree
Hide file tree
Showing 41 changed files with 2,342 additions and 15 deletions.
3 changes: 2 additions & 1 deletion dbm-services/mysql/db-priv/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,5 @@ privkey.pem
infile
outfile
.code.yml
*.log
*.log
priv-service
1 change: 1 addition & 0 deletions dbm-services/mysql/db-priv/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ require (
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.16.0
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.46.1
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0
golang.org/x/time v0.1.0
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
Expand Down
2 changes: 2 additions & 0 deletions dbm-services/mysql/db-priv/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0 h1:pVgRXcIictcr+lBQIFeiwuwtDIs4eL21OuM9nyAADmo=
golang.org/x/exp v0.0.0-20230315142452-642cacee5cc0/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
38 changes: 38 additions & 0 deletions dbm-services/mysql/db-priv/handler/v2/add_priv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package v2

import (
"dbm-services/common/go-pubpkg/errno"
"dbm-services/mysql/priv-service/handler"
"dbm-services/mysql/priv-service/service/v2/add_priv"
"io"

"encoding/json"
"log/slog"
"strings"

"github.com/gin-gonic/gin"
)

func AddPriv(c *gin.Context) {
slog.Info("do AddPriv v2!")

var input add_priv.PrivTaskPara
ticket := strings.TrimPrefix(c.FullPath(), "/priv/v2")

body, err := io.ReadAll(c.Request.Body)
if err != nil {
slog.Error("msg", err)
handler.SendResponse(c, errno.ErrBind, err)
return
}

if err = json.Unmarshal(body, &input); err != nil {
slog.Error("msg", err)
handler.SendResponse(c, errno.ErrBind, err)
return
}

err = input.AddPriv(string(body), ticket)
handler.SendResponse(c, err, nil)
return
}
13 changes: 13 additions & 0 deletions dbm-services/mysql/db-priv/handler/v2/init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package v2

import (
"net/http"

"github.com/gin-gonic/gin"
)

func Routes() []*gin.RouteInfo {
return []*gin.RouteInfo{
{Method: http.MethodPost, Path: "add_priv", HandlerFunc: AddPriv},
}
}
14 changes: 8 additions & 6 deletions dbm-services/mysql/db-priv/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
package main

import (
"dbm-services/common/go-pubpkg/apm/metric"
"dbm-services/common/go-pubpkg/apm/trace"
v2 "dbm-services/mysql/priv-service/handler/v2"
"dbm-services/mysql/priv-service/service"
"dbm-services/mysql/priv-service/util"
"io"
"log/slog"
"net/http"
"os"
"strings"

"dbm-services/common/go-pubpkg/apm/metric"
"dbm-services/common/go-pubpkg/apm/trace"
"dbm-services/mysql/priv-service/service"

"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin"

"gopkg.in/natefinch/lumberjack.v2"

"dbm-services/mysql/priv-service/assests"
Expand All @@ -36,7 +36,7 @@ func main() {

// 元数据库 migration
if viper.GetBool("migrate") {
if err := dbMigrate(); err != nil && err != migrate.ErrNoChange {
if err := dbMigrate(); err != nil && !errors.Is(err, migrate.ErrNoChange) {
slog.Error("migrate失败", err)
os.Exit(0)
}
Expand All @@ -62,6 +62,8 @@ func main() {
context.String(http.StatusOK, "pong")
}}})
handler.RegisterRoutes(engine, "/priv", (&handler.PrivService{}).Routes())
handler.RegisterRoutes(engine, "/priv/v2", v2.Routes())

if err := engine.Run(viper.GetString("http.listenAddress")); err != nil {
slog.Error("注册服务失败", err)
}
Expand Down
6 changes: 3 additions & 3 deletions dbm-services/mysql/db-priv/service/init_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ func openDB(username, password, addr, name string) *gorm.DB {
addr,
name,
true,
tz)
tz,
)
db, err := gorm.Open("mysql", config)
if err != nil {
log.Fatalf(config)
log.Fatalf("Database connection failed. Database name: %s, error: %v", name, err)
log.Fatalf("Database connection failed. config: %s, Database name: %s, error: %v", config, name, err)
}
// set for db connection
setupDB(db)
Expand Down
231 changes: 231 additions & 0 deletions dbm-services/mysql/db-priv/service/v2/add_priv/add_on_mysql.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
package add_priv

import (
"dbm-services/mysql/priv-service/service"
"dbm-services/mysql/priv-service/service/v2/internal"
"dbm-services/mysql/priv-service/service/v2/internal/drs"
"encoding/json"
"fmt"
"log/slog"
"strings"

"github.com/pkg/errors"
)

func (c *PrivTaskPara) addOnMySQL(
clientIps []string, workingInstances map[int64][]string,
accountAndRuleDetails *accountAndRule,
) (reports map[string][]string, err error) {
reports = make(map[string][]string)
var accountPSW service.MultiPsw
err = json.Unmarshal([]byte(accountAndRuleDetails.TbAccount.Psw), &accountPSW)
if err != nil {
slog.Error("add on mysql",
slog.String("psw", accountAndRuleDetails.TbAccount.Psw),
slog.String("err", err.Error()),
)
return nil, err
}
slog.Info(
"add on mysql",
slog.String("psw", accountAndRuleDetails.TbAccount.Psw),
)

for _, dt := range accountAndRuleDetails.TbAccountRulesList {
err := c.addOneDtOnMySQL(clientIps, workingInstances, accountAndRuleDetails, &accountPSW, dt, reports)
if err != nil {
slog.Error("add on mysql", slog.String("err", err.Error()))
return nil, err
}
slog.Info(
"add one dt on mysql finish",
slog.Any("dt", dt),
)
}
slog.Info(
"add on mysql finish",
slog.Any("reports", reports),
)

return reports, nil
}

/*
这个存储过程本身是有限制的
client ip 和 db list 最大只能 2000 长
db list不太可能超长, 因为前面把 dbname 单独循环了
client ip 有可能, 所以这里要切分下
*/
func (c *PrivTaskPara) addOneDtOnMySQL(
clientIps []string,
workingInstances map[int64][]string,
accountAndRuleDetails *accountAndRule,
psw *service.MultiPsw,
dt *service.TbAccountRules,
reports map[string][]string,
) error {
var ipStr string
for idx, ip := range clientIps {
// 限长 1500 代码会比较好些, 不往极限的 2000 搞
ipStr = ipStr + "," + ip
if len(ipStr) > 1500 || idx == len(clientIps)-1 {
slog.Info("add one dt on mysql", slog.String("ipstr", ipStr))
err := c.addOneDtOnMySQLForSplitClient(
strings.Trim(ipStr, ","),
workingInstances,
accountAndRuleDetails,
psw,
dt,
reports,
)
if err != nil {
slog.Error("add on mysql", slog.String("err", err.Error()))
return err
}
ipStr = ""
}
}
return nil
}

func (c *PrivTaskPara) addOneDtOnMySQLForSplitClient(
clientIpsStr string,
workingInstances map[int64][]string,
accountAndRuleDetails *accountAndRule,
psw *service.MultiPsw,
dt *service.TbAccountRules,
reports map[string][]string,
) error {
for bkCloudId, workingInstanceAddrs := range workingInstances {
slog.Info(
"add on mysql call procedure",
slog.Any("addrs", workingInstanceAddrs),
slog.String("user", accountAndRuleDetails.TbAccount.User),
slog.String("ipstr", clientIpsStr),
slog.String("dbname", dt.Dbname),
slog.String("psw", psw.Psw),
slog.String("old psw", psw.OldPsw),
slog.String("priv", dt.DmlDdlPriv),
slog.String("global priv", dt.GlobalPriv),
)
drsRes, err := drs.RPCMySQL(
bkCloudId,
workingInstanceAddrs,
[]string{
fmt.Sprintf(
`CALL infodba_schema.dba_grant('%s', '%s', '%s', '%s', '%s', '%s', '%s')`,
accountAndRuleDetails.TbAccount.User,
clientIpsStr,
dt.Dbname,
psw.Psw,
psw.OldPsw,
dt.DmlDdlPriv,
dt.GlobalPriv,
),
},
true,
30,
)
// 调用 api 有问题, 比如 request body
if err != nil {
slog.Error("add on mysql", slog.String("err", err.Error()))
return err
}
// 这里其实有个没检查, 不过应该不太可能
// len(workingInstanceAddrs) == len(drsRes)
slog.Info("add on mysql", slog.String("response", fmt.Sprintf("%+v", drsRes)))
readOneDtRes(bkCloudId, drsRes, reports)
}

return nil
}

func readOneDtRes(bkCloudId int64, res []*drs.OneAddressResult, reports map[string][]string) {
for _, r := range res {
// 和 addr 建立连接之类的有问题
// 这个错误应该收集起来
if r.ErrorMsg != "" {
err := errors.New(r.ErrorMsg)
slog.Error(
"add on mysql",
slog.String("err", err.Error()),
slog.String("addr", r.Address),
)
reports[r.Address] = []string{r.ErrorMsg}
}
readOneAddrRes(bkCloudId, r, reports)
}
}

func readOneAddrRes(bkCloudId int64, r *drs.OneAddressResult, reports map[string][]string) {
errMsg := r.CmdResults[0].ErrorMsg
if errMsg == "" {
return
}

_, sqlStat, msgText, isException := internal.ParseMySQLErrStr(errMsg)
if !isException {
reports[r.Address] = append(reports[r.Address], msgText)
return
}

switch sqlStat {
case 32401:
reports[r.Address] = append(reports[r.Address], msgText)
case 32402:
// 冲突检测错误
readConflictReport(msgText, bkCloudId, r.Address, reports)
default:
reports[r.Address] = append(reports[r.Address], msgText)
}
}

// 这个函数的所有错误都要收集了, 不能 return
func readConflictReport(uuid string, bkCloudId int64, addr string, reports map[string][]string) {
r, err := drs.RPCMySQL(
bkCloudId,
[]string{addr},
[]string{
fmt.Sprintf(`SELECT * FROM infodba_schema.dba_grant_result WHERE id = '%s'`, uuid),
},
false,
30,
)
if err != nil {
slog.Error("add on mysql read conflict report", slog.String("err", err.Error()))
reports[addr] = append(reports[addr], err.Error())
return
}

if r[0].ErrorMsg != "" {
slog.Error("add on mysql read conflict report", slog.String("err", r[0].ErrorMsg))
reports[addr] = append(reports[addr], r[0].ErrorMsg)
return
}

if r[0].CmdResults[0].ErrorMsg != "" {
slog.Error("add on mysql read conflict report", slog.String("err", r[0].CmdResults[0].ErrorMsg))
reports[addr] = append(reports[addr], r[0].CmdResults[0].ErrorMsg)
return
}

for _, row := range r[0].CmdResults[0].TableData {
dbname, ok := row["dbname"].(string)
var msg string
if ok {
msg = fmt.Sprintf(
`apply %s@%s on %s: %s`,
row["username"], row["client_ip"], dbname, row["msg"],
)
} else {
msg = fmt.Sprintf(
`apply %s@%s: %s`,
row["username"], row["client_ip"], row["msg"],
)
}

slog.Error("add on mysql read conflict report", slog.String("msg", msg))
reports[addr] = append(reports[addr], msg)
}
return
}
Loading

0 comments on commit 19eba17

Please sign in to comment.