Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support auto_increment_increment & auto_increment_offset. (#14301) #14396

Merged
merged 4 commits into from
Jan 8, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions executor/insert_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -585,23 +585,23 @@ func (e *InsertValues) lazyAdjustAutoIncrementDatum(ctx context.Context, rows []
i++
cnt++
}
// Alloc batch N consecutive (min, max] autoIDs.
// max value can be derived from adding one for cnt times.
min, _, err := table.AllocBatchAutoIncrementValue(ctx, e.Table, e.ctx, cnt)
if e.filterErr(err) != nil {
// AllocBatchAutoIncrementValue allocates batch N consecutive autoIDs.
// The max value can be derived from adding the increment value to min for cnt-1 times.
min, increment, err := table.AllocBatchAutoIncrementValue(ctx, e.Table, e.ctx, cnt)
if e.handleErr(col, &autoDatum, cnt, err) != nil {
return nil, err
}
// It's compatible with mysql setting the first allocated autoID to lastInsertID.
// Cause autoID may be specified by user, judge only the first row is not suitable.
if e.lastInsertID == 0 {
e.lastInsertID = uint64(min) + 1
e.lastInsertID = uint64(min)
}
// Assign autoIDs to rows.
for j := 0; j < cnt; j++ {
offset := j + start
d := rows[offset][colIdx]

id := int64(uint64(min) + uint64(j) + 1)
id := int64(uint64(min) + uint64(j)*uint64(increment))
d.SetAutoID(id, col.Flag)
retryInfo.AddAutoIncrementID(id)

Expand Down
70 changes: 70 additions & 0 deletions executor/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -791,3 +791,73 @@ func (s *testSuite3) TestDMLCast(c *C) {
tk.MustExec("delete from t where a = ''")
tk.MustQuery(`select * from t`).Check(testkit.Rows())
}

// There is a potential issue in MySQL: when the value of auto_increment_offset is greater
// than that of auto_increment_increment, the value of auto_increment_offset is ignored
// (https://dev.mysql.com/doc/refman/8.0/en/replication-options-master.html#sysvar_auto_increment_increment),
// This issue is a flaw of the implementation of MySQL and it doesn't exist in TiDB.
func (s *testSuite3) TestAutoIDIncrementAndOffset(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec(`use test`)
// Test for offset is larger than increment.
tk.Se.GetSessionVars().AutoIncrementIncrement = 5
tk.Se.GetSessionVars().AutoIncrementOffset = 10
tk.MustExec(`create table io (a int key auto_increment)`)
tk.MustExec(`insert into io values (null),(null),(null)`)
tk.MustQuery(`select * from io`).Check(testkit.Rows("10", "15", "20"))
tk.MustExec(`drop table io`)

// Test handle is PK.
tk.MustExec(`create table io (a int key auto_increment)`)
tk.Se.GetSessionVars().AutoIncrementOffset = 10
tk.Se.GetSessionVars().AutoIncrementIncrement = 2
tk.MustExec(`insert into io values (),(),()`)
tk.MustQuery(`select * from io`).Check(testkit.Rows("10", "12", "14"))
tk.MustExec(`delete from io`)

// Test reset the increment.
tk.Se.GetSessionVars().AutoIncrementIncrement = 5
tk.MustExec(`insert into io values (),(),()`)
tk.MustQuery(`select * from io`).Check(testkit.Rows("15", "20", "25"))
tk.MustExec(`delete from io`)

tk.Se.GetSessionVars().AutoIncrementIncrement = 10
tk.MustExec(`insert into io values (),(),()`)
tk.MustQuery(`select * from io`).Check(testkit.Rows("30", "40", "50"))
tk.MustExec(`delete from io`)

tk.Se.GetSessionVars().AutoIncrementIncrement = 5
tk.MustExec(`insert into io values (),(),()`)
tk.MustQuery(`select * from io`).Check(testkit.Rows("55", "60", "65"))
tk.MustExec(`drop table io`)

// Test handle is not PK.
tk.Se.GetSessionVars().AutoIncrementIncrement = 2
tk.Se.GetSessionVars().AutoIncrementOffset = 10
tk.MustExec(`create table io (a int, b int auto_increment, key(b))`)
tk.MustExec(`insert into io(b) values (null),(null),(null)`)
// AutoID allocation will take increment and offset into consideration.
tk.MustQuery(`select b from io`).Check(testkit.Rows("10", "12", "14"))
// HandleID allocation will ignore the increment and offset.
tk.MustQuery(`select _tidb_rowid from io`).Check(testkit.Rows("15", "16", "17"))
tk.MustExec(`delete from io`)

tk.Se.GetSessionVars().AutoIncrementIncrement = 10
tk.MustExec(`insert into io(b) values (null),(null),(null)`)
tk.MustQuery(`select b from io`).Check(testkit.Rows("20", "30", "40"))
tk.MustQuery(`select _tidb_rowid from io`).Check(testkit.Rows("41", "42", "43"))

// Test invalid value.
tk.Se.GetSessionVars().AutoIncrementIncrement = -1
tk.Se.GetSessionVars().AutoIncrementOffset = -2
_, err := tk.Exec(`insert into io(b) values (null),(null),(null)`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[autoid:8060]Invalid auto_increment settings: auto_increment_increment: -1, auto_increment_offset: -2, both of them must be in range [1..65535]")
tk.MustExec(`delete from io`)

tk.Se.GetSessionVars().AutoIncrementIncrement = 65536
tk.Se.GetSessionVars().AutoIncrementOffset = 65536
_, err = tk.Exec(`insert into io(b) values (null),(null),(null)`)
c.Assert(err, NotNil)
c.Assert(err.Error(), Equals, "[autoid:8060]Invalid auto_increment settings: auto_increment_increment: 65536, auto_increment_offset: 65536, both of them must be in range [1..65535]")
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ require (
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd
github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623
github.com/pingcap/parser v0.0.0-20200108075733-73763c7a9133
github.com/pingcap/pd v1.1.0-beta.0.20191223090411-ea2b748f6ee2
github.com/pingcap/tidb-tools v3.0.6-0.20191119150227-ff0a3c6e5763+incompatible
github.com/pingcap/tipb v0.0.0-20191120045257-1b9900292ab6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,8 @@ github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d h1:zTHgLr8+0LTEJmj
github.com/pingcap/kvproto v0.0.0-20191106014506-c5d88d699a8d/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd h1:hWDol43WY5PGhsh3+8794bFHY1bPrmu6bTalpssCrGg=
github.com/pingcap/log v0.0.0-20190715063458-479153f07ebd/go.mod h1:WpHUKhNZ18v116SvGrmjkA9CBhYmuUTKL+p8JC9ANEw=
github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623 h1:/BJjVyJlNKWMMrgPsbzk5Y9VPJWwHKYttj3oWxnFQ9U=
github.com/pingcap/parser v0.0.0-20191224043251-93f4d5ec2623/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/parser v0.0.0-20200108075733-73763c7a9133 h1:/8H/v/T1nwDGSuiI/DLwWEvGbamR6LZxwyYPD4DGPDk=
github.com/pingcap/parser v0.0.0-20200108075733-73763c7a9133/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
github.com/pingcap/pd v1.1.0-beta.0.20191223090411-ea2b748f6ee2 h1:NL23b8tsg6M1QpSQedK14/Jx++QeyKL2rGiBvXAQVfA=
github.com/pingcap/pd v1.1.0-beta.0.20191223090411-ea2b748f6ee2/go.mod h1:b4gaAPSxaVVtaB+EHamV4Nsv8JmTdjlw0cTKmp4+dRQ=
github.com/pingcap/tidb-tools v3.0.6-0.20191119150227-ff0a3c6e5763+incompatible h1:I8HirWsu1MZp6t9G/g8yKCEjJJxtHooKakEgccvdJ4M=
Expand Down
106 changes: 95 additions & 11 deletions meta/autoid/autoid.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const (
minStep = 30000
maxStep = 2000000
defaultConsumeTime = 10 * time.Second
minIncrement = 1
maxIncrement = 65535
)

// Test needs to change it, so it's a variable.
Expand All @@ -48,7 +50,12 @@ type Allocator interface {
// Alloc allocs N consecutive autoID for table with tableID, returning (min, max] of the allocated autoID batch.
// It gets a batch of autoIDs at a time. So it does not need to access storage for each call.
// The consecutive feature is used to insert multiple rows in a statement.
Alloc(tableID int64, n uint64) (int64, int64, error)
// increment & offset is used to validate the start position (the allocator's base is not always the last allocated id).
// The returned range is (min, max]:
// case increment=1 & offset=1: you can derive the ids like min+1, min+2... max.
// case increment=x & offset=y: you firstly need to seek to firstID by `SeekToFirstAutoIDXXX`, then derive the IDs like firstID, firstID + increment * 2... in the caller.
Alloc(tableID int64, n uint64, increment, offset int64) (int64, int64, error)

// Rebase rebases the autoID base for table with tableID and the new base value.
// If allocIDs is true, it will allocate some IDs and save to the cache.
// If allocIDs is false, it will not allocate IDs.
Expand Down Expand Up @@ -261,23 +268,92 @@ func GenLocalSchemaID() int64 {
}

// Alloc implements autoid.Allocator Alloc interface.
func (alloc *allocator) Alloc(tableID int64, n uint64) (int64, int64, error) {
// For autoIncrement allocator, the increment and offset should always be positive in [1, 65535].
// Attention:
// When increment and offset is not the default value(1), the return range (min, max] need to
// calculate the correct start position rather than simply the add 1 to min. Then you can derive
// the successive autoID by adding increment * cnt to firstID for (n-1) times.
//
// Example:
// (6, 13] is returned, increment = 4, offset = 1, n = 2.
// 6 is the last allocated value for other autoID or handle, maybe with different increment and step,
// but actually we don't care about it, all we need is to calculate the new autoID corresponding to the
// increment and offset at this time now. To simplify the rule is like (ID - offset) % increment = 0,
// so the first autoID should be 9, then add increment to it to get 13.
func (alloc *allocator) Alloc(tableID int64, n uint64, increment, offset int64) (int64, int64, error) {
if tableID == 0 {
return 0, 0, errInvalidTableID.GenWithStackByArgs("Invalid tableID")
}
if n == 0 {
return 0, 0, nil
}
var skipValid bool
failpoint.Inject("unValidIncrementAndOffset", func(val failpoint.Value) {
if val.(bool) {
skipValid = true
}
})
if !skipValid {
if !validIncrementAndOffset(increment, offset) {
return 0, 0, errInvalidIncrementAndOffset.GenWithStackByArgs(increment, offset)
}
}
alloc.mu.Lock()
defer alloc.mu.Unlock()
if alloc.isUnsigned {
return alloc.alloc4Unsigned(tableID, n)
return alloc.alloc4Unsigned(tableID, n, increment, offset)
}
return alloc.alloc4Signed(tableID, n, increment, offset)
}

func validIncrementAndOffset(increment, offset int64) bool {
return (increment >= minIncrement && increment <= maxIncrement) && (offset >= minIncrement && offset <= maxIncrement)
}

// CalcNeededBatchSize is used to calculate batch size for autoID allocation.
// It firstly seeks to the first valid position based on increment and offset,
// then plus the length remained, which could be (n-1) * increment.
func CalcNeededBatchSize(base, n, increment, offset int64, isUnsigned bool) int64 {
if increment == 1 {
return n
}
if isUnsigned {
// SeekToFirstAutoIDUnSigned seeks to the next unsigned valid position.
nr := SeekToFirstAutoIDUnSigned(uint64(base), uint64(increment), uint64(offset))
// Calculate the total batch size needed.
nr += (uint64(n) - 1) * uint64(increment)
return int64(nr - uint64(base))
}
return alloc.alloc4Signed(tableID, n)
nr := SeekToFirstAutoIDSigned(base, increment, offset)
// Calculate the total batch size needed.
nr += (n - 1) * increment
return nr - base
}

func (alloc *allocator) alloc4Signed(tableID int64, n uint64) (int64, int64, error) {
n1 := int64(n)
// SeekToFirstAutoIDSigned seeks to the next valid signed position.
func SeekToFirstAutoIDSigned(base, increment, offset int64) int64 {
nr := (base + increment - offset) / increment
nr = nr*increment + offset
return nr
}

// SeekToFirstAutoIDUnSigned seeks to the next valid unsigned position.
func SeekToFirstAutoIDUnSigned(base, increment, offset uint64) uint64 {
nr := (base + increment - offset) / increment
nr = nr*increment + offset
return nr
}

func (alloc *allocator) alloc4Signed(tableID int64, n uint64, increment, offset int64) (int64, int64, error) {
// Check offset rebase if necessary.
if offset-1 > alloc.base {
if err := alloc.rebase4Signed(tableID, offset-1, true); err != nil {
return 0, 0, err
}
}
// CalcNeededBatchSize calculates the total batch size needed.
n1 := CalcNeededBatchSize(alloc.base, int64(n), increment, offset, alloc.isUnsigned)

// Condition alloc.base+N1 > alloc.end will overflow when alloc.base + N1 > MaxInt64. So need this.
if math.MaxInt64-alloc.base <= n1 {
return 0, 0, ErrAutoincReadFailed
Expand Down Expand Up @@ -330,14 +406,22 @@ func (alloc *allocator) alloc4Signed(tableID int64, n uint64) (int64, int64, err
return min, alloc.base, nil
}

func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64) (int64, int64, error) {
n1 := int64(n)
func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64, increment, offset int64) (int64, int64, error) {
// Check offset rebase if necessary.
if uint64(offset-1) > uint64(alloc.base) {
if err := alloc.rebase4Unsigned(tableID, uint64(offset-1), true); err != nil {
return 0, 0, err
}
}
// CalcNeededBatchSize calculates the total batch size needed.
n1 := CalcNeededBatchSize(alloc.base, int64(n), increment, offset, alloc.isUnsigned)

// Condition alloc.base+n1 > alloc.end will overflow when alloc.base + n1 > MaxInt64. So need this.
if math.MaxUint64-uint64(alloc.base) <= n {
if math.MaxUint64-uint64(alloc.base) <= uint64(n1) {
return 0, 0, ErrAutoincReadFailed
}
// The local rest is not enough for alloc, skip it.
if uint64(alloc.base)+n > uint64(alloc.end) {
if uint64(alloc.base)+uint64(n1) > uint64(alloc.end) {
var newBase, newEnd int64
startTime := time.Now()
// Although it may skip a segment here, we still treat it as consumed.
Expand Down Expand Up @@ -381,6 +465,6 @@ func (alloc *allocator) alloc4Unsigned(tableID int64, n uint64) (int64, int64, e
zap.Int64("database ID", alloc.dbID))
min := alloc.base
// Use uint64 n directly.
alloc.base = int64(uint64(alloc.base) + n)
alloc.base = int64(uint64(alloc.base) + uint64(n1))
return min, alloc.base, nil
}
Loading