Skip to content

Commit

Permalink
lightning: support null value for auto-incr column on local backend (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
dsdashun committed May 12, 2022
1 parent 4bab1db commit b824e35
Show file tree
Hide file tree
Showing 9 changed files with 219 additions and 33 deletions.
97 changes: 68 additions & 29 deletions br/pkg/lightning/backend/kv/sql2kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,50 +356,27 @@ func (kvcodec *tableKVEncoder) Encode(
}

meta := kvcodec.tbl.Meta()
isAutoRandom := meta.PKIsHandle && meta.ContainsAutoRandomBits()
for i, col := range cols {
var theDatum *types.Datum = nil
j := columnPermutation[i]
isAutoIncCol := mysql.HasAutoIncrementFlag(col.GetFlag())
isPk := mysql.HasPriKeyFlag(col.GetFlag())
switch {
case j >= 0 && j < len(row):
value, err = table.CastValue(kvcodec.se, row[j], col.ToInfo(), false, false)
if err == nil {
err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx)
}
case isAutoIncCol:
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isAutoRandom && isPk:
var val types.Datum
realRowID := kvcodec.autoIDFn(rowID)
if mysql.HasUnsignedFlag(col.GetFlag()) {
val = types.NewUintDatum(uint64(realRowID))
} else {
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false)
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
default:
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
if j >= 0 && j < len(row) {
theDatum = &row[j]
}
value, err = kvcodec.getActualDatum(rowID, i, theDatum)
if err != nil {
return nil, logKVConvertFailed(logger, row, j, col.ToInfo(), err)
}

record = append(record, value)

if isAutoRandom && isPk {
if isTableAutoRandom(meta) && isPKCol(col.ToInfo()) {
incrementalBits := autoRandomIncrementBits(col, int(meta.AutoRandomBits))
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoRandomType)
if err := alloc.Rebase(context.Background(), value.GetInt64()&((1<<incrementalBits)-1), false); err != nil {
return nil, errors.Trace(err)
}
}
if isAutoIncCol {
if isAutoIncCol(col.ToInfo()) {
alloc := kvcodec.tbl.Allocators(kvcodec.se).Get(autoid.AutoIncrementType)
if err := alloc.Rebase(context.Background(), getAutoRecordID(value, &col.FieldType), false); err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -450,6 +427,68 @@ func (kvcodec *tableKVEncoder) Encode(
return kvPairs, nil
}

func isTableAutoRandom(tblMeta *model.TableInfo) bool {
return tblMeta.PKIsHandle && tblMeta.ContainsAutoRandomBits()
}

func isAutoIncCol(colInfo *model.ColumnInfo) bool {
return mysql.HasAutoIncrementFlag(colInfo.GetFlag())
}

func isPKCol(colInfo *model.ColumnInfo) bool {
return mysql.HasPriKeyFlag(colInfo.GetFlag())
}

func (kvcodec *tableKVEncoder) getActualDatum(rowID int64, colIndex int, inputDatum *types.Datum) (types.Datum, error) {
var (
value types.Datum
err error
)

tblMeta := kvcodec.tbl.Meta()
cols := kvcodec.tbl.Cols()

// Since this method is only called when iterating the columns in the `Encode()` method,
// we can assume that the `colIndex` always have a valid input
col := cols[colIndex]

isBadNullValue := false
if inputDatum != nil {
value, err = table.CastValue(kvcodec.se, *inputDatum, col.ToInfo(), false, false)
if err != nil {
return value, err
}
if err := col.CheckNotNull(&value); err == nil {
return value, nil // the most normal case
}
isBadNullValue = true
}
// handle special values
switch {
case isAutoIncCol(col.ToInfo()):
// we still need a conversion, e.g. to catch overflow with a TINYINT column.
value, err = table.CastValue(kvcodec.se, types.NewIntDatum(rowID), col.ToInfo(), false, false)
case isTableAutoRandom(tblMeta) && isPKCol(col.ToInfo()):
var val types.Datum
realRowID := kvcodec.autoIDFn(rowID)
if mysql.HasUnsignedFlag(col.GetFlag()) {
val = types.NewUintDatum(uint64(realRowID))
} else {
val = types.NewIntDatum(realRowID)
}
value, err = table.CastValue(kvcodec.se, val, col.ToInfo(), false, false)
case col.IsGenerated():
// inject some dummy value for gen col so that MutRowFromDatums below sees a real value instead of nil.
// if MutRowFromDatums sees a nil it won't initialize the underlying storage and cause SetDatum to panic.
value = types.GetMinValue(&col.FieldType)
case isBadNullValue:
err = col.HandleBadNull(&value, kvcodec.se.vars.StmtCtx)
default:
value, err = table.GetColDefaultValue(kvcodec.se, col.ToInfo())
}
return value, err
}

// get record value for auto-increment field
//
// See: https://github.com/pingcap/tidb/blob/47f0f15b14ed54fc2222f3e304e29df7b05e6805/executor/insert_common.go#L781-L852
Expand Down
92 changes: 88 additions & 4 deletions br/pkg/lightning/backend/kv/sql2kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,11 +322,17 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
},
})
require.NoError(t, err)
pairs, err := encoder.Encode(logger, []types.Datum{
types.NewStringDatum("1"),

strDatumForID := types.NewStringDatum("1")
actualDatum, err := encoder.(*tableKVEncoder).getActualDatum(70, 0, &strDatumForID)
require.NoError(t, err)
require.Equal(t, types.NewFloat64Datum(1.0), actualDatum)

pairsExpect, err := encoder.Encode(logger, []types.Datum{
types.NewFloat64Datum(1.0),
}, 70, []int{0, -1}, "1.csv", 1234)
require.NoError(t, err)
require.Equal(t, pairs, &KvPairs{pairs: []common.KvPair{
require.Equal(t, &KvPairs{pairs: []common.KvPair{
{
Key: []uint8{0x74, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1, 0x5f, 0x72, 0x80, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
Val: []uint8{0x80, 0x0, 0x1, 0x0, 0x0, 0x0, 0x1, 0x8, 0x0, 0xbf, 0xf0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Expand All @@ -337,10 +343,88 @@ func TestEncodeDoubleAutoIncrement(t *testing.T) {
Val: []uint8{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x46},
RowID: 70,
},
}})
}}, pairsExpect)

pairs, err := encoder.Encode(logger, []types.Datum{
types.NewStringDatum("1"),
}, 70, []int{0, -1}, "1.csv", 1234)
require.NoError(t, err)

require.Equal(t, pairsExpect, pairs)
require.Equal(t, tbl.Allocators(encoder.(*tableKVEncoder).se).Get(autoid.AutoIncrementType).Base(), int64(70))
}

func TestEncodeMissingAutoValue(t *testing.T) {
logger := log.Logger{Logger: zap.NewNop()}

var rowID int64 = 70
type testTableInfo struct {
AllocType autoid.AllocatorType
CreateStmt string
}

for _, testTblInfo := range []testTableInfo{
{
AllocType: autoid.AutoIncrementType,
CreateStmt: "create table t (id integer primary key auto_increment);",
},
{
AllocType: autoid.AutoRandomType,
CreateStmt: "create table t (id integer primary key auto_random(3));",
},
} {
tblInfo := mockTableInfo(t, testTblInfo.CreateStmt)
if testTblInfo.AllocType == autoid.AutoRandomType {
// seems parser can't parse auto_random properly.
tblInfo.AutoRandomBits = 3
}
tbl, err := tables.TableFromMeta(NewPanickingAllocators(0), tblInfo)
require.NoError(t, err)

encoder, err := NewTableKVEncoder(tbl, &SessionOptions{
SQLMode: mysql.ModeStrictAllTables,
SysVars: map[string]string{
"tidb_row_format_version": "2",
},
})
require.NoError(t, err)

realRowID := encoder.(*tableKVEncoder).autoIDFn(rowID)

var nullDatum types.Datum
nullDatum.SetNull()

expectIDDatum := types.NewIntDatum(realRowID)
actualIDDatum, err := encoder.(*tableKVEncoder).getActualDatum(rowID, 0, nil)
require.NoError(t, err)
require.Equal(t, expectIDDatum, actualIDDatum)

actualIDDatum, err = encoder.(*tableKVEncoder).getActualDatum(rowID, 0, &nullDatum)
require.NoError(t, err)
require.Equal(t, expectIDDatum, actualIDDatum)

pairsExpect, err := encoder.Encode(logger, []types.Datum{
types.NewIntDatum(realRowID),
}, rowID, []int{0}, "1.csv", 1234)
require.NoError(t, err)

// test insert a NULL value on auto_xxxx column, and it is set to NOT NULL
pairs, err := encoder.Encode(logger, []types.Datum{
nullDatum,
}, rowID, []int{0}, "1.csv", 1234)
require.NoError(t, err)
require.Equalf(t, pairsExpect, pairs, "test table info: %+v", testTblInfo)
require.Equalf(t, rowID, tbl.Allocators(encoder.(*tableKVEncoder).se).Get(testTblInfo.AllocType).Base(), "test table info: %+v", testTblInfo)

// test insert a row without specifying the auto_xxxx column
pairs, err = encoder.Encode(logger, []types.Datum{}, rowID, []int{0}, "1.csv", 1234)
require.NoError(t, err)
require.Equalf(t, pairsExpect, pairs, "test table info: %+v", testTblInfo)
require.Equalf(t, rowID, tbl.Allocators(encoder.(*tableKVEncoder).se).Get(testTblInfo.AllocType).Base(), "test table info: %+v", testTblInfo)

}
}

func mockTableInfo(t *testing.T, createSQL string) *model.TableInfo {
parser := parser.New()
node, err := parser.ParseOneStmt(createSQL, "", "")
Expand Down
2 changes: 2 additions & 0 deletions br/tests/lightning_auto_columns/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[tikv-importer]
backend = "local"
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create schema lightning_auto_cols;
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE t_auto_incr (
id bigint PRIMARY KEY AUTO_INCREMENT,
c char(40) NOT NULL DEFAULT '');
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
INSERT INTO t_auto_incr (id, c) VALUES
(1, 'normal_pk_01');
INSERT INTO t_auto_incr VALUES
(NULL, 'null_pk_02');
INSERT INTO t_auto_incr VALUES
(NULL, 'null_pk_03');
INSERT INTO t_auto_incr (id, c) VALUES
(4, 'normal_pk_04');
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
CREATE TABLE t_auto_random (
id bigint PRIMARY KEY AUTO_RANDOM(3),
c char(40) NOT NULL DEFAULT '');
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
INSERT INTO t_auto_random (id, c) VALUES
(1, 'normal_pk_01');
INSERT INTO t_auto_random VALUES
(NULL, 'null_pk_02');
INSERT INTO t_auto_random VALUES
(NULL, 'null_pk_03');
INSERT INTO t_auto_random (id, c) VALUES
(4, 'normal_pk_04');
38 changes: 38 additions & 0 deletions br/tests/lightning_auto_columns/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#!/bin/sh
#
# Copyright 2020 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

run_sql 'DROP DATABASE IF EXISTS lightning_auto_cols;'
run_lightning

run_sql "SELECT CONCAT_WS(':', id, c) AS row_data FROM lightning_auto_cols.t_auto_incr;"
check_contains "row_data: 1:normal_pk_01"
check_contains "row_data: 2:null_pk_02"
check_contains "row_data: 3:null_pk_03"
check_contains "row_data: 4:normal_pk_04"
run_sql "SELECT COUNT(*) AS row_count FROM lightning_auto_cols.t_auto_incr;"
check_contains "row_count: 4"

run_sql "SELECT CONCAT_WS(':', id, c) AS row_data FROM lightning_auto_cols.t_auto_random;"
check_contains "row_data: 1:normal_pk_01"
check_contains ":null_pk_02"
check_not_contains "row_data: 0:null_pk_02"
check_contains ":null_pk_03"
check_not_contains "row_data: 0:null_pk_03"
check_contains "row_data: 4:normal_pk_04"
run_sql "SELECT COUNT(*) AS row_count FROM lightning_auto_cols.t_auto_random;"
check_contains "row_count: 4"

0 comments on commit b824e35

Please sign in to comment.