Skip to content

Commit

Permalink
fix breaker config nil (#56)
Browse files Browse the repository at this point in the history
  • Loading branch information
EAHITechnology authored Mar 29, 2021
1 parent 873e1a7 commit 578a9e3
Show file tree
Hide file tree
Showing 18 changed files with 178 additions and 82 deletions.
10 changes: 7 additions & 3 deletions conf/namespace/test_namespace.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ frontend:
- "test_weir_db"
slow_sql_time: 50
denied_sqls:
- sql: "select * from test"
ttl: 0
- sql: "select count(*) from test"
ttl: 60
denied_ips:
idle_timeout: 3600
users:
Expand All @@ -26,7 +30,7 @@ breaker:
- min_qps: 3
failure_ratethreshold: 0
failure_num: 5
sql_timeoutMs: 2000
open_status_durationMs: 5000
sql_timeout_ms: 2000
open_status_duration_ms: 5000
size: 10
cellIntervalMs: 1000
cellInterval_ms: 1000
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/EAHITechnology/weir v0.0.0-20210325113128-176b4c6ed699 h1:bKZN4dhKgB1oxxG7E8NNXH9kdBcW/3h1GAutOWAqyY4=
github.com/Jeffail/gabs/v2 v2.5.1 h1:ANfZYjpMlfTTKebycu4X1AgkVWumFVDYQl7JwOr4mDk=
github.com/Jeffail/gabs/v2 v2.5.1/go.mod h1:xCn81vdHKxFUuWWAaD5jCTQDNPBMh5pPs9IJ+NcziBI=
github.com/KyleBanks/depth v1.2.1/go.mod h1:jzSb9d0L43HxTQfT+oSA1EEp2q+ne2uh6XgeJcm8brE=
Expand Down
13 changes: 9 additions & 4 deletions pkg/config/namespace.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@ type Namespace struct {
Breaker BreakerInfo `yaml:"breaker"`
}

type DeniedSqlInfo struct {
Sql string `yaml:"sql"`
Ttl int64 `yaml:"ttl"`
}

type FrontendNamespace struct {
AllowedDBs []string `yaml:"allowed_dbs"`
SlowSQLTime int `yaml:"slow_sql_time"`
DeniedSQLs []string `yaml:"denied_sqls"`
DeniedSQLs []DeniedSqlInfo `yaml:"denied_sqls"`
DeniedIPs []string `yaml:"denied_ips"`
IdleTimeout int `yaml:"idle_timeout"`
IsGlobalBreaker bool `yaml:"global_breaker_switch"`
Expand All @@ -34,12 +39,12 @@ type BackendNamespace struct {

type StrategyInfo struct {
MinQps int64 `yaml:"min_qps"`
SqlTimeoutMs int64 `yaml:"sql_timeoutMs"`
SqlTimeoutMs int64 `yaml:"sql_timeout_ms"`
FailureRatethreshold int64 `yaml:"failure_ratethreshold"`
FailureNum int64 `yaml:"failure_num"`
OpenStatusDurationMs int64 `yaml:"open_status_durationMs"`
OpenStatusDurationMs int64 `yaml:"open_status_duratio_ms"`
Size int64 `yaml:"size"`
CellIntervalMs int64 `yaml:"cellIntervalMs"`
CellIntervalMs int64 `yaml:"cellInterval_ms"`
}

type BreakerInfo struct {
Expand Down
2 changes: 2 additions & 0 deletions pkg/proxy/driver/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ type Namespace interface {
Name() string
IsDatabaseAllowed(db string) bool
ListDatabases() []string
IsDeniedSQL(sqlFeature uint32) bool
GetPooledConn(context.Context) (PooledBackendConn, error)
IncrConnCount()
DescConnCount()
GetBreaker() (Breaker, error)
}

type Breaker interface {
IsUseBreaker() bool
GetBreakerScope() string
Hit(name string, idx int, isFail bool) error
Status(name string) (int32, int)
Expand Down
32 changes: 23 additions & 9 deletions pkg/proxy/driver/queryctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package driver
import (
"context"
"fmt"
"hash/crc32"
"time"

"github.com/pingcap-incubator/weir/pkg/proxy/server"
wast "github.com/pingcap-incubator/weir/pkg/util/ast"
cb "github.com/pingcap-incubator/weir/pkg/util/rate_limit_breaker/circuit_breaker"
"github.com/pingcap/parser"
"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -40,7 +42,7 @@ type QueryCtxImpl struct {
parser *parser.Parser
sessionVars *SessionVarsWrapper

connMgr *BackendConnManager
connMgr *BackendConnManager
}

func NewQueryCtxImpl(nsmgr NamespaceManager, connId uint64) *QueryCtxImpl {
Expand Down Expand Up @@ -109,22 +111,28 @@ func (q *QueryCtxImpl) Execute(ctx context.Context, sql string) (*gomysql.Result
return nil, err
}

tableName := extractFirstTableNameFromStmt(stmt)
ctx = CtxWithAstTableName(ctx, tableName)
tableName := wast.ExtractFirstTableNameFromStmt(stmt)
ctx = wast.CtxWithAstTableName(ctx, tableName)

if q.isStmtDenied(ctx, sql, stmt) {
sqlParadigm, err := q.extractSqlParadigm(ctx, sql)
if err != nil {
return nil, err
}
sqlDigest := crc32.ChecksumIEEE([]byte(sqlParadigm))

if q.isStmtDenied(ctx, sqlDigest) {
q.recordDeniedQueryMetrics(ctx, stmt)
return nil, mysql.NewErrf(mysql.ErrUnknown, "statement is denied")
}

if !isStmtNeedToCheckCircuitBreaking(stmt) {
if !q.isStmtNeedToCheckCircuitBreaking(stmt) {
return q.executeStmt(ctx, sql, stmt)
}

return q.executeWithBreakerInterceptor(ctx, stmt, sql)
return q.executeWithBreakerInterceptor(ctx, stmt, sql, sqlDigest)
}

func (q *QueryCtxImpl) executeWithBreakerInterceptor(ctx context.Context, stmtNode ast.StmtNode, sql string) (*gomysql.Result, error) {
func (q *QueryCtxImpl) executeWithBreakerInterceptor(ctx context.Context, stmtNode ast.StmtNode, sql string, sqlDigest uint32) (*gomysql.Result, error) {
startTime := time.Now()

breaker, err := q.ns.GetBreaker()
Expand Down Expand Up @@ -258,7 +266,14 @@ func (q *QueryCtxImpl) initAttachedConnHolder() {
q.connMgr = connMgr
}

func isStmtNeedToCheckCircuitBreaking(stmt ast.StmtNode) bool {
func (q *QueryCtxImpl) isStmtNeedToCheckCircuitBreaking(stmt ast.StmtNode) bool {
breaker, err := q.ns.GetBreaker()
if err != nil {
return false
}
if !breaker.IsUseBreaker() {
return false
}
switch stmt.(type) {
case *ast.SelectStmt:
return true
Expand All @@ -272,4 +287,3 @@ func isStmtNeedToCheckCircuitBreaking(stmt ast.StmtNode) bool {
return false
}
}

15 changes: 6 additions & 9 deletions pkg/proxy/driver/queryctx_exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"strings"

"github.com/pingcap-incubator/weir/pkg/proxy/constant"
wast "github.com/pingcap-incubator/weir/pkg/util/ast"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
Expand All @@ -14,30 +15,26 @@ import (
)

// TODO(eastfisher): implement this function
func (q *QueryCtxImpl) isStmtDenied(ctx context.Context, sql string, stmtNode ast.StmtNode) bool {
//todo解析sql

return false
func (q *QueryCtxImpl) isStmtDenied(ctx context.Context, sqlDigest uint32) bool {
return q.ns.IsDeniedSQL(sqlDigest)
}

func (q *QueryCtxImpl) getBreakerName(ctx context.Context, sql string, breaker Breaker) (string, error) {
switch breaker.GetBreakerScope() {
case "global":
return q.ns.Name(), nil
case "namesapce":
return q.ns.Name(), nil
case "db":
return q.currentDB, nil
case "table":
firstTableName, _ := GetAstTableNameFromCtx(ctx)
firstTableName, _ := wast.GetAstTableNameFromCtx(ctx)
return firstTableName, nil
case "sql":
sqlParadigm, err := q.extractSqlParadigm(ctx, sql)
if err != nil {
return "", err
}
sqlDigest := crc32.ChecksumIEEE([]byte(sqlParadigm))
return string(UInt322Bytes(sqlDigest)), nil
return string(wast.UInt322Bytes(sqlDigest)), nil
default:
return "", errors.New("breaker_name err")
}
Expand All @@ -49,7 +46,7 @@ func (q *QueryCtxImpl) extractSqlParadigm(ctx context.Context, sql string) (stri
if err != nil {
return "", err
}
visitor, err := ExtractAstVisit(featureStmt)
visitor, err := wast.ExtractAstVisit(featureStmt)
if err != nil {
return "", err
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/proxy/driver/queryctx_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ import (
"context"

"github.com/pingcap-incubator/weir/pkg/proxy/metrics"
wast "github.com/pingcap-incubator/weir/pkg/util/ast"
"github.com/pingcap/parser/ast"
)

func (q *QueryCtxImpl) recordQueryMetrics(ctx context.Context, stmt ast.StmtNode, err error, durationMilliSecond float64) {
ns := q.ns.Name()
db := q.currentDB
firstTableName, _ := GetAstTableNameFromCtx(ctx)
firstTableName, _ := wast.GetAstTableNameFromCtx(ctx)
stmtType := metrics.GetStmtTypeName(stmt)
retLabel := metrics.RetLabel(err)

Expand All @@ -21,7 +22,7 @@ func (q *QueryCtxImpl) recordQueryMetrics(ctx context.Context, stmt ast.StmtNode
func (q *QueryCtxImpl) recordDeniedQueryMetrics(ctx context.Context, stmt ast.StmtNode) {
ns := q.ns.Name()
db := q.currentDB
firstTableName, _ := GetAstTableNameFromCtx(ctx)
firstTableName, _ := wast.GetAstTableNameFromCtx(ctx)
stmtType := metrics.GetStmtTypeName(stmt)

metrics.QueryCtxQueryDeniedCounter.WithLabelValues(ns, db, firstTableName, stmtType).Inc()
Expand Down
Loading

0 comments on commit 578a9e3

Please sign in to comment.