Skip to content

Commit

Permalink
Merge branch 'master' into char
Browse files Browse the repository at this point in the history
  • Loading branch information
AilinKid committed Nov 11, 2021
2 parents 9642fb1 + 58c02d5 commit e56dae4
Show file tree
Hide file tree
Showing 27 changed files with 451 additions and 140 deletions.
23 changes: 21 additions & 2 deletions .github/workflows/compile_br.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ on:
- '!br/docs/**'
- '!br/tests/**'
- '!br/docker/**'
#change trigger policy
pull_request:
types:
- labeled # <--
branches:
- master
- 'release-[0-9].[0-9]*'
Expand All @@ -35,8 +38,25 @@ concurrency:
cancel-in-progress: true

jobs:
compile-windows:
if: github.event_name == 'push' || github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for Windows job
runs-on: windows-latest
steps:
- uses: actions/checkout@v2.1.0

- name: Set up Go
uses: actions/setup-go@v2
with:
go-version: 1.16

- name: Run build
run: make build_tools

compile:
if: github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for ${{ matrix.os }} / ${{ matrix.target}}

runs-on: ${{ matrix.os }}
strategy:
matrix:
Expand All @@ -47,8 +67,6 @@ jobs:
- os: ubuntu-latest
target: aarch64-unknown-linux-gnu

- os: windows-latest
target: x86_64-pc-windows-msvc
steps:
- uses: actions/checkout@v2.1.0

Expand All @@ -61,6 +79,7 @@ jobs:
run: make build_tools

compile-freebsd:
if: github.event_name == 'pull_request' && github.event.label.name == 'action/run-br-cross-platform-build'
name: Compile for FreeBSD job
runs-on: ubuntu-latest
steps:
Expand Down
3 changes: 2 additions & 1 deletion br/pkg/lightning/restore/table_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,8 @@ func (tr *TableRestore) postProcess(
nextStage = checkpoints.CheckpointStatusChecksumSkipped
}

if err == nil {
// Don't call FinishTable when other lightning will calculate checksum.
if err == nil && !hasDupe && needChecksum {
err = metaMgr.FinishTable(ctx)
}

Expand Down
19 changes: 10 additions & 9 deletions br/pkg/restore/systable_restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@ var unRecoverableTable = map[string]struct{}{
"global_variables": {},

// all user related tables cannot be recovered for now.
"columns_priv": {},
"db": {},
"default_roles": {},
"global_grants": {},
"global_priv": {},
"role_edges": {},
"tables_priv": {},
"user": {},

"column_stats_usage": {},
"columns_priv": {},
"db": {},
"default_roles": {},
"global_grants": {},
"global_priv": {},
"role_edges": {},
"tables_priv": {},
"user": {},
"capture_plan_baselines_blacklist": {},
// gc info don't need to recover.
"gc_delete_range": {},
"gc_delete_range_done": {},
Expand Down
9 changes: 9 additions & 0 deletions br/tests/lightning_distributed_import/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[tikv-importer]
backend = 'local'
duplicate-resolution = 'none'

[post-restore]
checksum = "required"

[mydumper.csv]
header = false
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create database distributed_import;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table t(a int primary key, b varchar(255), c double);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
1,a1,1.1
3,b3,3.3
5,c5,5.5
7,d7,7.7
9,e9,9.9
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create database distributed_import;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
create table t(a int primary key, b varchar(255), c double);
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
2,a2,2.2
4,b4,4.4
6,c6,6.6
8,d8,8.8
10,e10,10.10
36 changes: 36 additions & 0 deletions br/tests/lightning_distributed_import/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#!/bin/bash
#
# Copyright 2021 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

LOG_FILE1="$TEST_DIR/lightning-distributed-import1.log"
LOG_FILE2="$TEST_DIR/lightning-distributed-import2.log"

# let lightning run a bit slow to avoid some table in the first lightning finish too fast.
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/restore/SlowDownImport=sleep(50)"

run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted1" \
-d "tests/$TEST_NAME/data1" --log-file "$LOG_FILE1" --config "tests/$TEST_NAME/config.toml" &
pid1="$!"

run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_distributed_import.sorted2" \
-d "tests/$TEST_NAME/data2" --log-file "$LOG_FILE2" --config "tests/$TEST_NAME/config.toml" &
pid2="$!"

wait "$pid1" "$pid2"

run_sql 'select count(*) from distributed_import.t'
check_contains 'count(*): 10'
65 changes: 49 additions & 16 deletions ddl/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -1397,23 +1397,14 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo

// the follow code is a swap function for rules of two partitions
// though partitions has exchanged their ID, swap still take effect
bundles := make([]*placement.Bundle, 0, 2)
ptBundle, ptOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(partDef.ID))
ptOK = ptOK && !ptBundle.IsEmpty()
ntBundle, ntOK := d.infoCache.GetLatest().BundleByName(placement.GroupID(nt.ID))
ntOK = ntOK && !ntBundle.IsEmpty()
if ptOK && ntOK {
bundles = append(bundles, ptBundle.Clone().Reset(placement.RuleIndexPartition, []int64{nt.ID}))
bundles = append(bundles, ntBundle.Clone().Reset(placement.RuleIndexPartition, []int64{partDef.ID}))
} else if ptOK {
bundles = append(bundles, placement.NewBundle(partDef.ID))
bundles = append(bundles, ptBundle.Clone().Reset(placement.RuleIndexPartition, []int64{nt.ID}))
} else if ntOK {
bundles = append(bundles, placement.NewBundle(nt.ID))
bundles = append(bundles, ntBundle.Clone().Reset(placement.RuleIndexPartition, []int64{partDef.ID}))
}
err = infosync.PutRuleBundles(context.TODO(), bundles)

bundles, err := bundlesForExchangeTablePartition(t, job, pt, partDef, nt)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

if err = infosync.PutRuleBundles(context.TODO(), bundles); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Wrapf(err, "failed to notify PD the placement rules")
}
Expand Down Expand Up @@ -1463,6 +1454,48 @@ func (w *worker) onExchangeTablePartition(d *ddlCtx, t *meta.Meta, job *model.Jo
return ver, nil
}

func bundlesForExchangeTablePartition(t *meta.Meta, job *model.Job, pt *model.TableInfo, newPar *model.PartitionDefinition, nt *model.TableInfo) ([]*placement.Bundle, error) {
bundles := make([]*placement.Bundle, 0, 3)

ptBundle, err := newBundleFromTblInfo(t, job, pt)
if err != nil {
return nil, errors.Trace(err)
}
if ptBundle != nil {
bundles = append(bundles, ptBundle)
}

parBundle, err := newBundleFromPartitionDef(t, job, *newPar)
if err != nil {
return nil, errors.Trace(err)
}
if parBundle != nil {
bundles = append(bundles, parBundle)
}

ntBundle, err := newBundleFromTblInfo(t, job, nt)
if err != nil {
return nil, errors.Trace(err)
}
if ntBundle != nil {
bundles = append(bundles, ntBundle)
}

if parBundle == nil && ntBundle != nil {
// newPar.ID is the ID of old table to exchange, so ntBundle != nil means it has some old placement settings.
// We should remove it in this situation
bundles = append(bundles, placement.NewBundle(newPar.ID))
}

if parBundle != nil && ntBundle == nil {
// nt.ID is the ID of old partition to exchange, so parBundle != nil means it has some old placement settings.
// We should remove it in this situation
bundles = append(bundles, placement.NewBundle(nt.ID))
}

return bundles, nil
}

func checkExchangePartitionRecordValidation(w *worker, pt *model.TableInfo, index int, schemaName, tableName model.CIStr) error {
var sql string
var paramList []interface{}
Expand Down
158 changes: 158 additions & 0 deletions ddl/placement_policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1386,3 +1386,161 @@ func (s *testDBSuite6) TestTruncateTablePartitionWithPlacement(c *C) {
" PARTITION `p3` VALUES LESS THAN (100000) /*T![placement] PRIMARY_REGION=\"r2\" REGIONS=\"r2\" */\n" +
")"))
}

func (s *testDBSuite6) TestExchangePartitionWithPlacement(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("set @@tidb_enable_exchange_partition=1")
tk.MustExec("use test")
tk.MustExec("drop table if exists t1, t2, tp")
tk.MustExec("drop placement policy if exists p1")
tk.MustExec("drop placement policy if exists p2")

tk.MustExec("create placement policy p1 primary_region='r1' regions='r1'")
defer tk.MustExec("drop placement policy p1")

tk.MustExec("create placement policy p2 primary_region='r2' regions='r2'")
defer tk.MustExec("drop placement policy p2")

policy1, ok := tk.Se.GetInfoSchema().(infoschema.InfoSchema).PolicyByName(model.NewCIStr("p1"))
c.Assert(ok, IsTrue)

policy2, ok := tk.Se.GetInfoSchema().(infoschema.InfoSchema).PolicyByName(model.NewCIStr("p2"))
c.Assert(ok, IsTrue)

tk.MustExec(`CREATE TABLE t1 (id INT) placement policy p1`)
defer tk.MustExec("drop table t1")

tk.MustExec(`CREATE TABLE t2 (id INT)`)
defer tk.MustExec("drop table t2")

t1, err := tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
t1ID := t1.Meta().ID

t2, err := tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
t2ID := t2.Meta().ID

tk.MustExec(`CREATE TABLE tp (id INT) primary_region="r1" regions="r1" PARTITION BY RANGE (id) (
PARTITION p0 VALUES LESS THAN (100),
PARTITION p1 VALUES LESS THAN (1000) placement policy p2,
PARTITION p2 VALUES LESS THAN (10000) primary_region="r1" regions="r1,r2"
);`)
defer tk.MustExec("drop table tp")

tp, err := tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp"))
c.Assert(err, IsNil)
tpID := tp.Meta().ID
par0ID := tp.Meta().Partition.Definitions[0].ID
par1ID := tp.Meta().Partition.Definitions[1].ID
par2ID := tp.Meta().Partition.Definitions[2].ID

// exchange par0, t1
tk.MustExec("alter table tp exchange partition p0 with table t1")
tk.MustQuery("show create table t1").Check(testkit.Rows("" +
"t1 CREATE TABLE `t1` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */"))
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1\" */\n" +
"PARTITION BY RANGE ( `id` ) (\n" +
" PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
")"))
tp, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp"))
c.Assert(err, IsNil)
c.Assert(tp.Meta().ID, Equals, tpID)
c.Assert(tp.Meta().Partition.Definitions[0].ID, Equals, t1ID)
c.Assert(tp.Meta().Partition.Definitions[0].DirectPlacementOpts, IsNil)
c.Assert(tp.Meta().Partition.Definitions[0].PlacementPolicyRef, IsNil)
t1, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
c.Assert(t1.Meta().ID, Equals, par0ID)
c.Assert(t1.Meta().DirectPlacementOpts, IsNil)
c.Assert(t1.Meta().PlacementPolicyRef.ID, Equals, policy1.ID)

// exchange par0, t2
tk.MustExec("alter table tp exchange partition p0 with table t2")
tk.MustQuery("show create table t2").Check(testkit.Rows("" +
"t2 CREATE TABLE `t2` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1\" */\n" +
"PARTITION BY RANGE ( `id` ) (\n" +
" PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
")"))
tp, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp"))
c.Assert(err, IsNil)
c.Assert(tp.Meta().ID, Equals, tpID)
c.Assert(tp.Meta().Partition.Definitions[0].ID, Equals, t2ID)
c.Assert(tp.Meta().Partition.Definitions[0].DirectPlacementOpts, IsNil)
c.Assert(tp.Meta().Partition.Definitions[0].PlacementPolicyRef, IsNil)
t2, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
c.Assert(t2.Meta().ID, Equals, t1ID)
c.Assert(t2.Meta().DirectPlacementOpts, IsNil)
c.Assert(t2.Meta().PlacementPolicyRef, IsNil)

// exchange par1, t1
tk.MustExec("alter table tp exchange partition p1 with table t1")
tk.MustQuery("show create table t1").Check(testkit.Rows("" +
"t1 CREATE TABLE `t1` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PLACEMENT POLICY=`p1` */"))
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1\" */\n" +
"PARTITION BY RANGE ( `id` ) (\n" +
" PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
")"))
tp, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp"))
c.Assert(err, IsNil)
c.Assert(tp.Meta().ID, Equals, tpID)
c.Assert(tp.Meta().Partition.Definitions[1].ID, Equals, par0ID)
c.Assert(tp.Meta().Partition.Definitions[1].DirectPlacementOpts, IsNil)
c.Assert(tp.Meta().Partition.Definitions[1].PlacementPolicyRef.ID, Equals, policy2.ID)
t1, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t1"))
c.Assert(err, IsNil)
c.Assert(t1.Meta().ID, Equals, par1ID)
c.Assert(t1.Meta().DirectPlacementOpts, IsNil)
c.Assert(t1.Meta().PlacementPolicyRef.ID, Equals, policy1.ID)

// exchange par2, t2
tk.MustExec("alter table tp exchange partition p2 with table t2")
tk.MustQuery("show create table t2").Check(testkit.Rows("" +
"t2 CREATE TABLE `t2` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"))
tk.MustQuery("show create table tp").Check(testkit.Rows("" +
"tp CREATE TABLE `tp` (\n" +
" `id` int(11) DEFAULT NULL\n" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1\" */\n" +
"PARTITION BY RANGE ( `id` ) (\n" +
" PARTITION `p0` VALUES LESS THAN (100),\n" +
" PARTITION `p1` VALUES LESS THAN (1000) /*T![placement] PLACEMENT POLICY=`p2` */,\n" +
" PARTITION `p2` VALUES LESS THAN (10000) /*T![placement] PRIMARY_REGION=\"r1\" REGIONS=\"r1,r2\" */\n" +
")"))
tp, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("tp"))
c.Assert(err, IsNil)
c.Assert(tp.Meta().ID, Equals, tpID)
c.Assert(tp.Meta().Partition.Definitions[2].ID, Equals, t1ID)
c.Assert(tp.Meta().Partition.Definitions[2].DirectPlacementOpts.PrimaryRegion, Equals, "r1")
c.Assert(tp.Meta().Partition.Definitions[2].DirectPlacementOpts.Regions, Equals, "r1,r2")
c.Assert(tp.Meta().Partition.Definitions[2].PlacementPolicyRef, IsNil)
t2, err = tk.Se.GetInfoSchema().(infoschema.InfoSchema).TableByName(model.NewCIStr("test"), model.NewCIStr("t2"))
c.Assert(err, IsNil)
c.Assert(t2.Meta().ID, Equals, par2ID)
c.Assert(t2.Meta().DirectPlacementOpts, IsNil)
c.Assert(t2.Meta().PlacementPolicyRef, IsNil)
}
Loading

0 comments on commit e56dae4

Please sign in to comment.