Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ddl: refactor bundle[1/2] [5/6] #24008

Merged
merged 5 commits into from
Jun 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 16 additions & 196 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package ddl

import (
"encoding/hex"
"fmt"
"math"
"strconv"
Expand All @@ -28,7 +27,6 @@ import (
"unicode/utf8"

"github.com/cznic/mathutil"
"github.com/go-yaml/yaml"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
Expand All @@ -49,12 +47,10 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/table/tables"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/domainutil"
Expand Down Expand Up @@ -5917,150 +5913,6 @@ func (d *ddl) AlterIndexVisibility(ctx sessionctx.Context, ident ast.Ident, inde
return errors.Trace(err)
}

func buildPlacementSpecReplicasAndConstraint(replicas uint64, cnstr string) ([]*placement.Rule, error) {
rules := []*placement.Rule{}

cnstbytes := []byte(cnstr)

constraints1 := []string{}
err1 := yaml.UnmarshalStrict(cnstbytes, &constraints1)
if err1 == nil {
// can not emit REPLICAS with an array label
if replicas == 0 {
return rules, errors.Errorf("array CONSTRAINTS should be with a positive REPLICAS")
}

labelConstraints, err := placement.NewConstraints(constraints1)
if err != nil {
return rules, err
}

rules = append(rules, &placement.Rule{
Count: int(replicas),
Constraints: labelConstraints,
})

return rules, nil
}

constraints2 := map[string]int{}
err2 := yaml.UnmarshalStrict(cnstbytes, &constraints2)
if err2 == nil {
ruleCnt := int(replicas)

for labels, cnt := range constraints2 {
if cnt <= 0 {
return rules, errors.Errorf("count should be positive, but got %d", cnt)
}

if replicas != 0 {
ruleCnt -= cnt
if ruleCnt < 0 {
return rules, errors.Errorf("REPLICAS should be larger or equal to the number of total replicas, but got %d", replicas)
}
}

labelConstraints, err := placement.NewConstraints(strings.Split(strings.TrimSpace(labels), ","))
if err != nil {
return rules, err
}

rules = append(rules, &placement.Rule{
Count: cnt,
Constraints: labelConstraints,
})
}

if ruleCnt > 0 {
rules = append(rules, &placement.Rule{
Count: ruleCnt,
})
}

return rules, nil
}

return nil, errors.Errorf("constraint is neither an array of string, nor a string-to-number map, due to:\n%s\n%s", err1, err2)
}

func buildPlacementSpecs(bundle *placement.Bundle, specs []*ast.PlacementSpec) (*placement.Bundle, error) {
var err error
var spec *ast.PlacementSpec

for _, rspec := range specs {
spec = rspec

var role placement.PeerRoleType
switch spec.Role {
case ast.PlacementRoleFollower:
role = placement.Follower
case ast.PlacementRoleLeader:
if spec.Replicas == 0 {
spec.Replicas = 1
}
if spec.Replicas > 1 {
err = errors.Errorf("replicas can only be 1 when the role is leader")
}
role = placement.Leader
case ast.PlacementRoleLearner:
role = placement.Learner
case ast.PlacementRoleVoter:
role = placement.Voter
default:
err = errors.Errorf("ROLE is not specified")
}
if err != nil {
break
}

if spec.Tp == ast.PlacementAlter || spec.Tp == ast.PlacementDrop {
origLen := len(bundle.Rules)
newRules := bundle.Rules[:0]
for _, r := range bundle.Rules {
if r.Role != role {
newRules = append(newRules, r)
}
}
bundle.Rules = newRules

// alter == drop + add new rules
if spec.Tp == ast.PlacementDrop {
// error if no rules will be dropped
if len(bundle.Rules) == origLen {
err = errors.Errorf("no rule of role '%s' to drop", role)
break
}
continue
}
}

var newRules []*placement.Rule
newRules, err = buildPlacementSpecReplicasAndConstraint(spec.Replicas, spec.Constraints)
if err != nil {
break
}
for _, r := range newRules {
r.Role = role
bundle.Rules = append(bundle.Rules, r)
}
}

if err != nil {
var sb strings.Builder
sb.Reset()

restoreCtx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreKeyWordLowercase|format.RestoreNameBackQuotes, &sb)

if e := spec.Restore(restoreCtx); e != nil {
return nil, ErrInvalidPlacementSpec.GenWithStackByArgs("", err)
}

return nil, ErrInvalidPlacementSpec.GenWithStackByArgs(sb.String(), err)
}

return bundle, nil
}

func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident, spec *ast.AlterTableSpec) (err error) {
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ident)
if err != nil {
Expand All @@ -6077,61 +5929,29 @@ func (d *ddl) AlterTableAlterPartition(ctx sessionctx.Context, ident ast.Ident,
return errors.Trace(err)
}

oldBundle := infoschema.GetBundle(d.infoCache.GetLatest(), []int64{partitionID, meta.ID, schema.ID})
bundle := infoschema.GetBundle(d.infoCache.GetLatest(), []int64{partitionID, meta.ID, schema.ID})

oldBundle.ID = placement.GroupID(partitionID)
bundle.ID = placement.GroupID(partitionID)

bundle, err := buildPlacementSpecs(oldBundle, spec.PlacementSpecs)
err = bundle.ApplyPlacementSpec(spec.PlacementSpecs)
if err != nil {
return errors.Trace(err)
}
var sb strings.Builder
sb.Reset()

extraCnt := map[placement.PeerRoleType]int{}
startKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(partitionID)))
endKey := hex.EncodeToString(codec.EncodeBytes(nil, tablecodec.GenTableRecordPrefix(partitionID+1)))
newRules := bundle.Rules[:0]
for i, rule := range bundle.Rules {
// merge all empty constraints
if len(rule.Constraints) == 0 {
extraCnt[rule.Role] += rule.Count
continue
}
// refer to tidb#22065.
// add -engine=tiflash to every rule to avoid schedules to tiflash instances.
// placement rules in SQL is not compatible with `set tiflash replica` yet
if err := rule.Constraints.Add(placement.Constraint{
Op: placement.NotIn,
Key: placement.EngineLabelKey,
Values: []string{placement.EngineLabelTiFlash},
}); err != nil {
return errors.Trace(err)
restoreCtx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreKeyWordLowercase|format.RestoreNameBackQuotes, &sb)

if e := spec.Restore(restoreCtx); e != nil {
return ErrInvalidPlacementSpec.GenWithStackByArgs("", err)
}
rule.GroupID = bundle.ID
rule.ID = strconv.Itoa(i)
rule.StartKeyHex = startKey
rule.EndKeyHex = endKey
newRules = append(newRules, rule)
return ErrInvalidPlacementSpec.GenWithStackByArgs(sb.String(), err)
}
for role, cnt := range extraCnt {
if cnt <= 0 {
continue
}
// refer to tidb#22065.
newRules = append(newRules, &placement.Rule{
GroupID: bundle.ID,
ID: string(role),
Role: role,
Count: cnt,
StartKeyHex: startKey,
EndKeyHex: endKey,
Constraints: []placement.Constraint{{
Op: placement.NotIn,
Key: placement.EngineLabelKey,
Values: []string{placement.EngineLabelTiFlash},
}},
})

err = bundle.Tidy()
if err != nil {
return errors.Trace(err)
}
bundle.Rules = newRules
bundle.Reset(partitionID)

if len(bundle.Rules) == 0 {
bundle.Index = 0
bundle.Override = false
Expand Down
18 changes: 9 additions & 9 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -915,7 +915,7 @@ func dropRuleBundles(d *ddlCtx, physicalTableIDs []int64) error {
for _, ID := range physicalTableIDs {
oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(ID))
if ok && !oldBundle.IsEmpty() {
bundles = append(bundles, placement.BuildPlacementDropBundle(ID))
bundles = append(bundles, placement.NewBundle(ID))
}
}
err := infosync.PutRuleBundles(context.TODO(), bundles)
Expand Down Expand Up @@ -1097,8 +1097,8 @@ func onTruncateTablePartition(d *ddlCtx, t *meta.Meta, job *model.Job) (int64, e
for i, oldID := range oldIDs {
oldBundle, ok := d.infoCache.GetLatest().BundleByName(placement.GroupID(oldID))
if ok && !oldBundle.IsEmpty() {
bundles = append(bundles, placement.BuildPlacementDropBundle(oldID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(oldBundle, newPartitions[i].ID))
bundles = append(bundles, placement.NewBundle(oldID))
bundles = append(bundles, oldBundle.Clone().Reset(newPartitions[i].ID))
}
}

Expand Down Expand Up @@ -1300,14 +1300,14 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
ntBundle, ntOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(nt.ID))
ntOK = ntOK && !ntBundle.IsEmpty()
if ptOK && ntOK {
bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID))
bundles = append(bundles, ptBundle.Clone().Reset(nt.ID))
bundles = append(bundles, ntBundle.Clone().Reset(partDef.ID))
} else if ptOK {
bundles = append(bundles, placement.BuildPlacementDropBundle(partDef.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ptBundle, nt.ID))
bundles = append(bundles, placement.NewBundle(partDef.ID))
bundles = append(bundles, ptBundle.Clone().Reset(nt.ID))
} else if ntOK {
bundles = append(bundles, placement.BuildPlacementDropBundle(nt.ID))
bundles = append(bundles, placement.BuildPlacementCopyBundle(ntBundle, partDef.ID))
bundles = append(bundles, placement.NewBundle(nt.ID))
bundles = append(bundles, ntBundle.Clone().Reset(partDef.ID))
}
err = infosync.PutRuleBundles(context.TODO(), bundles)
if err != nil {
Expand Down
Loading