Skip to content

Commit

Permalink
ddl: refactor bundle[1/2] [5/6] (#24008)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox committed Jun 11, 2021
1 parent 1f0245a commit 10db453
Show file tree
Hide file tree
Showing 12 changed files with 635 additions and 768 deletions.
212 changes: 16 additions & 196 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package ddl

import (
"encoding/hex"
"fmt"
"math"
"strconv"
Expand All @@ -28,7 +27,6 @@ import (
"unicode/utf8"

"github.com/cznic/mathutil"
"github.com/go-yaml/yaml"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
Expand All @@ -49,12 +47,10 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/domainutil"
Expand Down Expand Up @@ -5917,150 +5913,6 @@ func (d *ddl) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, inde
return errors.Trace(err)
}

func buildPlacementSpecReplicasAndConstraint(replicas uint64, cnstr string) ([]*placement.Rule, error) {
rules := []*placement.Rule{}

cnstbytes := []byte(cnstr)

constraints1 := []string{}
err1 := yaml.UnmarshalStrict(cnstbytes, &constraints1)
if err1 == nil {
// can not emit REPLICAS with an array label
if replicas == 0 {
return rules, errors.Errorf("array CONSTRAINTS should be with a positive REPLICAS")
}

labelConstraints, err := placement.NewConstraints(constraints1)
if err != nil {
return rules, err
}

rules = append(rules, &placement.Rule{
Count: int(replicas),
Constraints: labelConstraints,
})

return rules, nil
}

constraints2 := map[string]int{}
err2 := yaml.UnmarshalStrict(cnstbytes, &constraints2)
if err2 == nil {
ruleCnt := int(replicas)

for labels, cnt := range constraints2 {
if cnt <= 0 {
return rules, errors.Errorf("count should be positive, but got %d", cnt)
}

if replicas != 0 {
ruleCnt -= cnt
if ruleCnt < 0 {
return rules, errors.Errorf("REPLICAS should be larger or equal to the number of total replicas, but got %d", replicas)
}
}

labelConstraints, err := placement.NewConstraints(strings.Split(strings.TrimSpace(labels), ","))
if err != nil {
return rules, err
}

rules = append(rules, &placement.Rule{
Count: cnt,
Constraints: labelConstraints,
})
}

if ruleCnt > 0 {
rules = append(rules, &placement.Rule{
Count: ruleCnt,
})
}

return rules, nil
}

return nil, errors.Errorf("constraint is neither an array of string, nor a string-to-number map, due to:\n%s\n%s", err1, err2)
}

func buildPlacementSpecs(bundle *placement.Bundle, specs []*ast.PlacementSpec) (*placement.Bundle, error) {
var err error
var spec *ast.PlacementSpec

for _, rspec := range specs {
spec = rspec

var role placement.PeerRoleType
switch spec.Role {
case ast.PlacementRoleFollower:
role = placement.Follower
case ast.PlacementRoleLeader:
if spec.Replicas == 0 {
spec.Replicas = 1
}
if spec.Replicas > 1 {
err = errors.Errorf("replicas can only be 1 when the role is leader")
}
role = placement.Leader
case ast.PlacementRoleLearner:
role = placement.Learner
case ast.PlacementRoleVoter:
role = placement.Voter
default:
err = errors.Errorf("ROLE is not specified")
}
if err != nil {
break
}

if spec.Tp == ast.PlacementAlter || spec.Tp == ast.PlacementDrop {
origLen := len(bundle.Rules)
newRules := bundle.Rules[:0]
for _, r := range bundle.Rules {
if r.Role != role {
newRules = append(newRules, r)
}
}
bundle.Rules = newRules

// alter == drop + add new rules
if spec.Tp == ast.PlacementDrop {
// error if no rules will be dropped
if len(bundle.Rules) == origLen {
err = errors.Errorf("no rule of role '%s' to drop", role)
break
}
continue
}
}

var newRules []*placement.Rule
newRules, err = buildPlacementSpecReplicasAndConstraint(spec.Replicas, spec.Constraints)
if err != nil {
break
}
for _, r := range newRules {
r.Role = role
bundle.Rules = append(bundle.Rules, r)
}
}

if err != nil {
var sb strings.Builder
sb.Reset()

restoreCtx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreKeyWordLowercase|format.RestoreNameBackQuotes, &sb)

if e := spec.Restore(restoreCtx); e != nil {
return nil, ErrInvalidPlacementSpec.GenWithStackByArgs("", err)
}

return nil, ErrInvalidPlacementSpec.GenWithStackByArgs(sb.String(), err)
}

return bundle, nil
}

func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ident)
if err != nil {
Expand All @@ -6077,61 +5929,29 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident,
return errors.Trace(err)
}

oldBundle := infoschema.GetBundle(d.infoCache.GetLatest(), []int64{partitionID, meta.ID, schema.ID})
bundle := infoschema.GetBundle(d.infoCache.GetLatest(), []int64{partitionID, meta.ID, schema.ID})

oldBundle.ID = placement.GroupID(partitionID)
bundle.ID = placement.GroupID(partitionID)

bundle, err := buildPlacementSpecs(oldBundle, spec.PlacementSpecs)
err = bundle.ApplyPlacementSpec(spec.PlacementSpecs)
if err != nil {
return errors.Trace(err)
}
var sb strings.Builder
sb.Reset()

extraCnt := map[placement.PeerRoleType]int{}
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(partitionID)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(partitionID+1)))
newRules := bundle.Rules[:0]
for i, rule := range bundle.Rules {
// merge all empty constraints
if len(rule.Constraints) == 0 {
extraCnt[rule.Role] += rule.Count
continue
}
// refer to tidb#22065.
// add -engine=tiflash to every rule to avoid schedules to tiflash instances.
// placement rules in SQL is not compatible with `set tiflash replica` yet
if err := rule.Constraints.Add(placement.Constraint{
Op: placement.NotIn,
Key: placement.EngineLabelKey,
Values: []string{placement.EngineLabelTiFlash},
}); err != nil {
return errors.Trace(err)
restoreCtx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreKeyWordLowercase|format.RestoreNameBackQuotes, &sb)

if e := spec.Restore(restoreCtx); e != nil {
return ErrInvalidPlacementSpec.GenWithStackByArgs("", err)
}
rule.GroupID = bundle.ID
rule.ID = strconv.Itoa(i)
rule.StartKeyHex = startKey
rule.EndKeyHex = endKey
newRules = append(newRules, rule)
return ErrInvalidPlacementSpec.GenWithStackByArgs(sb.String(), err)
}
for role, cnt := range extraCnt {
if cnt <= 0 {
continue
}
// refer to tidb#22065.
newRules = append(newRules, &placement.Rule{
GroupID: bundle.ID,
ID: string(role),
Role: role,
Count: cnt,
StartKeyHex: startKey,
EndKeyHex: endKey,
Constraints: []placement.Constraint{{
Op: placement.NotIn,
Key: placement.EngineLabelKey,
Values: []string{placement.EngineLabelTiFlash},
}},
})

err = bundle.Tidy()
if err != nil {
return errors.Trace(err)
}
bundle.Rules = newRules
bundle.Reset(partitionID)

if len(bundle.Rules) == 0 {
bundle.Index = 0
bundle.Override = false
Expand Down
18 changes: 9 additions & 9 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error {
for _, ID := range physicalTableIDs {
oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(ID))
if ok && !oldBundle.IsEmpty() {
bundles = append(bundles, placement.BuildPlacementDropBundle(ID))
bundles = append(bundles, placement.NewBundle(ID))
}
}
err := infosync.PutRuleBundles(context.TODO(), bundles)
Expand Down Expand Up @@ -1097,8 +1097,8 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
for i, oldID := range oldIDs {
oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(oldID))
if ok && !oldBundle.IsEmpty() {
bundles = append(bundles, placement.BuildPlacementDropBundle(oldID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newPartitions[i].ID))
bundles = append(bundles, placement.NewBundle(oldID))
bundles = append(bundles, oldBundle.Clone().Reset(newPartitions[i].ID))
}
}

Expand Down Expand Up @@ -1300,14 +1300,14 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
ntBundle, ntOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(nt.ID))
ntOK = ntOK && !ntBundle.IsEmpty()
if ptOK && ntOK {
bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID))
bundles = append(bundles, ptBundle.Clone().Reset(nt.ID))
bundles = append(bundles, ntBundle.Clone().Reset(partDef.ID))
} else if ptOK {
bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID))
bundles = append(bundles, placement.NewBundle(partDef.ID))
bundles = append(bundles, ptBundle.Clone().Reset(nt.ID))
} else if ntOK {
bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID))
bundles = append(bundles, placement.NewBundle(nt.ID))
bundles = append(bundles, ntBundle.Clone().Reset(partDef.ID))
}
err = infosync.PutRuleBundles(context.TODO(), bundles)
if err != nil {
Expand Down
Loading

0 comments on commit 10db453

Please sign in to comment.