Skip to content

Commit

Permalink
placement: refine the bundle contruction logic (#28450)
Browse files Browse the repository at this point in the history
  • Loading branch information
xhebox authored Oct 2, 2021
1 parent b0b559f commit 2a02e6e
Show file tree
Hide file tree
Showing 12 changed files with 192 additions and 623 deletions.
52 changes: 0 additions & 52 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2862,58 +2862,6 @@ func (s *testSerialDBSuite) TestCreateTableWithLike2(c *C) {
c.Assert(t1.Meta().TiFlashReplica.AvailablePartitionIDs, DeepEquals, []int64{partition.Definitions[0].ID, partition.Definitions[1].ID})
}

func (s *testSerialDBSuite) TestCreateTableWithSpecialComment(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")

// case for direct options
tk.MustExec(`DROP TABLE IF EXISTS t`)
tk.MustExec("CREATE TABLE `t` (\n" +
" `a` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin " +
"/*T![placement] PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"CONSTRAINTS=\"[+disk=ssd]\" */",
)
tk.MustQuery(`show create table t`).Check(testutil.RowsWithSep("|",
"t CREATE TABLE `t` (\n"+
" `a` int(11) DEFAULT NULL\n"+
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin "+
"/*T![placement] PRIMARY_REGION=\"cn-east-1\" "+
"REGIONS=\"cn-east-1, cn-east-2\" "+
"FOLLOWERS=2 "+
"CONSTRAINTS=\"[+disk=ssd]\" */",
))

// case for policy
tk.MustExec(`DROP TABLE IF EXISTS t`)
tk.MustExec("create placement policy x " +
"PRIMARY_REGION=\"cn-east-1\" " +
"REGIONS=\"cn-east-1, cn-east-2\" " +
"FOLLOWERS=2 " +
"CONSTRAINTS=\"[+disk=ssd]\" ")
tk.MustExec("create table t(a int)" +
"/*T![placement] PLACEMENT POLICY=`x` */")
tk.MustQuery(`show create table t`).Check(testutil.RowsWithSep("|",
"t CREATE TABLE `t` (\n"+
" `a` int(11) DEFAULT NULL\n"+
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin "+
"/*T![placement] PLACEMENT POLICY=`x` */",
))

// case for policy with quotes
tk.MustExec(`DROP TABLE IF EXISTS t`)
tk.MustExec("create table t(a int)" +
"/*T![placement] PLACEMENT POLICY=\"x\" */")
tk.MustQuery(`show create table t`).Check(testutil.RowsWithSep("|",
"t CREATE TABLE `t` (\n"+
" `a` int(11) DEFAULT NULL\n"+
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin "+
"/*T![placement] PLACEMENT POLICY=`x` */",
))
}

func (s *testSerialDBSuite) TestCreateTable(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
Expand Down
87 changes: 25 additions & 62 deletions ddl/placement/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"errors"
"fmt"
"math"
"sort"
"strconv"
"strings"

Expand Down Expand Up @@ -65,9 +66,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
leaderConstraints := options.LeaderConstraints
learnerConstraints := options.LearnerConstraints
followerConstraints := options.FollowerConstraints
voterConstraints := options.VoterConstraints
followerCount := options.Followers
voterCount := options.Voters
learnerCount := options.Learners

CommonConstraints, err := NewConstraintsFromYaml([]byte(constraints))
Expand All @@ -90,21 +89,6 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
Rules = append(Rules, NewRule(Leader, 1, LeaderConstraints))
}

if voterCount > 0 {
VoterRules, err := NewRules(Voter, voterCount, voterConstraints)
if err != nil {
return nil, fmt.Errorf("%w: invalid VoterConstraints", err)
}
for _, rule := range VoterRules {
for _, cnst := range CommonConstraints {
if err := rule.Constraints.Add(cnst); err != nil {
return nil, fmt.Errorf("%w: VoterConstraints conflicts with Constraints", err)
}
}
}
Rules = append(Rules, VoterRules...)
}

if followerCount > 0 {
FollowerRules, err := NewRules(Follower, followerCount, followerConstraints)
if err != nil {
Expand Down Expand Up @@ -144,7 +128,7 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error
return nil, fmt.Errorf("%w: options can not be nil", ErrInvalidPlacementOptions)
}

if len(options.LeaderConstraints) > 0 || len(options.LearnerConstraints) > 0 || len(options.FollowerConstraints) > 0 || len(options.VoterConstraints) > 0 || options.Learners > 0 || options.Voters > 0 {
if len(options.LeaderConstraints) > 0 || len(options.LearnerConstraints) > 0 || len(options.FollowerConstraints) > 0 || len(options.Constraints) > 0 || options.Learners > 0 {
return nil, fmt.Errorf("%w: should be PRIMARY_REGION=.. REGIONS=.. FOLLOWERS=.. SCHEDULE=.., mixed other constraints into options %s", ErrInvalidPlacementOptions, options)
}

Expand All @@ -167,60 +151,39 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error
}
schedule := options.Schedule

var constraints Constraints
var err error
// regions must include the primary
sort.Strings(regions)
primaryIndex := sort.SearchStrings(regions, primaryRegion)
if primaryIndex >= len(regions) || regions[primaryIndex] != primaryRegion {
return nil, fmt.Errorf("%w: primary region must be included in regions", ErrInvalidPlacementOptions)
}

var Rules []*Rule

Rules := []*Rule{}
switch strings.ToLower(schedule) {
case "", "even":
constraints, err = NewConstraints([]string{fmt.Sprintf("+region=%s", primaryRegion)})
if err != nil {
return nil, fmt.Errorf("%w: invalid PrimaryRegion '%s'", err, primaryRegion)
primaryCount := uint64(math.Ceil(float64(followers+1) / float64(len(regions))))
Rules = append(Rules, NewRule(Voter, primaryCount, NewConstraintsDirect(NewConstraintDirect("region", In, primaryRegion))))

if len(regions) > 1 {
// delete primary from regions
regions = regions[:primaryIndex+copy(regions[primaryIndex:], regions[primaryIndex+1:])]
Rules = append(Rules, NewRule(Follower, followers+1-primaryCount, NewConstraintsDirect(NewConstraintDirect("region", In, regions...))))
}
Rules = append(Rules, NewRule(Leader, 1, constraints))
case "majority_in_primary":
// We already have the leader, so we need to calculate how many additional followers
// need to be in the primary region for quorum
followersInPrimary := uint64(math.Ceil(float64(followers) / 2))
constraints, err = NewConstraints([]string{fmt.Sprintf("+region=%s", primaryRegion)})
if err != nil {
return nil, fmt.Errorf("%w: invalid PrimaryRegion, '%s'", err, primaryRegion)
// calculate how many replicas need to be in the primary region for quorum
primaryCount := uint64(math.Ceil(float64(followers+1)/2 + 1))
Rules = append(Rules, NewRule(Voter, primaryCount, NewConstraintsDirect(NewConstraintDirect("region", In, primaryRegion))))

if len(regions) > 1 {
// delete primary from regions
regions = regions[:primaryIndex+copy(regions[primaryIndex:], regions[primaryIndex+1:])]
Rules = append(Rules, NewRule(Follower, followers+1-primaryCount, NewConstraintsDirect(NewConstraintDirect("region", In, regions...))))
}
Rules = append(Rules, NewRule(Leader, 1, constraints))
Rules = append(Rules, NewRule(Follower, followersInPrimary, constraints))
// even split the remaining followers
followers = followers - followersInPrimary
default:
return nil, fmt.Errorf("%w: unsupported schedule %s", ErrInvalidPlacementOptions, schedule)
}

if uint64(len(regions)) > followers {
return nil, fmt.Errorf("%w: remain %d region to schedule, only %d follower left", ErrInvalidPlacementOptions, uint64(len(regions)), followers)
}

if len(regions) == 0 {
constraints, err := NewConstraints(nil)
if err != nil {
return nil, err
}
Rules = append(Rules, NewRule(Follower, followers, constraints))
} else {
count := followers / uint64(len(regions))
rem := followers - count*uint64(len(regions))
for _, region := range regions {
constraints, err = NewConstraints([]string{fmt.Sprintf("+region=%s", region)})
if err != nil {
return nil, fmt.Errorf("%w: invalid region of 'Regions', '%s'", err, region)
}
replica := count
if rem > 0 {
replica += 1
rem--
}
Rules = append(Rules, NewRule(Follower, replica, constraints))
}
}

return &Bundle{Rules: Rules}, nil
}

Expand Down
Loading

0 comments on commit 2a02e6e

Please sign in to comment.