Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/release-5.4' into pr/35770
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao committed Sep 9, 2022
2 parents f5b8122 + 20872f0 commit a780ace
Show file tree
Hide file tree
Showing 57 changed files with 1,351 additions and 127 deletions.
1 change: 1 addition & 0 deletions .github/licenserc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ header:
- '.github/'
- 'parser/'
- 'dumpling/'
- '**/*.sql'
comment: on-failure
7 changes: 5 additions & 2 deletions br/pkg/lightning/common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"syscall"
"time"
Expand Down Expand Up @@ -53,8 +55,9 @@ type MySQLConnectParam struct {
}

func (param *MySQLConnectParam) ToDSN() string {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/?charset=utf8mb4&sql_mode='%s'&maxAllowedPacket=%d&tls=%s",
param.User, param.Password, param.Host, param.Port,
hostPort := net.JoinHostPort(param.Host, strconv.Itoa(param.Port))
dsn := fmt.Sprintf("%s:%s@tcp(%s)/?charset=utf8mb4&sql_mode='%s'&maxAllowedPacket=%d&tls=%s",
param.User, param.Password, hostPort,
param.SQLMode, param.MaxAllowedPacket, param.TLS)

for k, v := range param.Vars {
Expand Down
3 changes: 3 additions & 0 deletions br/pkg/lightning/common/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ func (s *utilSuite) TestToDSN(c *C) {
},
}
c.Assert(param.ToDSN(), Equals, "root:123456@tcp(127.0.0.1:4000)/?charset=utf8mb4&sql_mode='strict'&maxAllowedPacket=1234&tls=cluster&tidb_distsql_scan_concurrency='1'")

param.Host = "::1"
c.Assert(param.ToDSN(), Equals, "root:123456@tcp([::1]:4000)/?charset=utf8mb4&sql_mode='strict'&maxAllowedPacket=1234&tls=cluster&tidb_distsql_scan_concurrency='1'")
}

func (s *utilSuite) TestIsContextCanceledError(c *C) {
Expand Down
7 changes: 3 additions & 4 deletions br/pkg/lightning/mydump/parquet_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,10 @@ func NewParquetParser(

columns := make([]string, 0, len(reader.Footer.Schema)-1)
columnMetas := make([]*parquet.SchemaElement, 0, len(reader.Footer.Schema)-1)
for _, c := range reader.SchemaHandler.SchemaElements {
for i, c := range reader.SchemaHandler.SchemaElements {
if c.GetNumChildren() == 0 {
// NOTE: the SchemaElement.Name is capitalized, SchemaHandler.Infos.ExName is the raw column name
// though in this context, there is no difference between these two fields
columns = append(columns, strings.ToLower(c.Name))
// we need to use the raw name, SchemaElement.Name might be prefixed with PARGO_PERFIX_
columns = append(columns, strings.ToLower(reader.SchemaHandler.GetExName(i)))
// transfer old ConvertedType to LogicalType
columnMeta := c
if c.ConvertedType != nil && c.LogicalType == nil {
Expand Down
Binary file not shown.
8 changes: 8 additions & 0 deletions br/tests/lightning_parquet/db.sql
Original file line number Diff line number Diff line change
Expand Up @@ -217,3 +217,11 @@ CREATE TABLE `warehouse` (
PRIMARY KEY (`w_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
/*!40101 SET character_set_client = @saved_cs_client */;

DROP TABLE IF EXISTS `special_col_name`;
CREATE TABLE `special_col_name` (
`c1` varchar(128) DEFAULT NULL,
`_c2` timestamp NULL DEFAULT NULL,
`123_c3` timestamp NULL DEFAULT NULL,
`中_c4` timestamp NULL DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;
1 change: 1 addition & 0 deletions br/tests/lightning_parquet/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ for BACKEND in local importer tidb; do
check_row_count orders 100
check_row_count stock 50
check_row_count warehouse 1
check_row_count special_col_name 1

run_sql 'select sum(c_id) from test.customer;'
check_contains "sum(c_id): 210"
Expand Down
8 changes: 8 additions & 0 deletions ddl/db_partition_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,14 @@ create table log_message_1 (
"partition p1 values less than ('G'));",
ddl.ErrRangeNotIncreasing,
},
{
"create table t(d datetime)" +
"partition by range columns (d) (" +
"partition p0 values less than ('2022-01-01')," +
"partition p1 values less than (MAXVALUE), " +
"partition p2 values less than (MAXVALUE));",
ddl.ErrRangeNotIncreasing,
},
{
"CREATE TABLE t1(c0 INT) PARTITION BY HASH((NOT c0)) PARTITIONS 2;",
ddl.ErrPartitionFunctionIsNotAllowed,
Expand Down
2 changes: 1 addition & 1 deletion ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2319,7 +2319,7 @@ func checkTwoRangeColumns(ctx sessionctx.Context, curr, prev *model.PartitionDef
}
for i := 0; i < len(pi.Columns); i++ {
// Special handling for MAXVALUE.
if strings.EqualFold(curr.LessThan[i], partitionMaxValue) {
if strings.EqualFold(curr.LessThan[i], partitionMaxValue) && !strings.EqualFold(prev.LessThan[i], partitionMaxValue) {
// If current is maxvalue, it certainly >= previous.
return true, nil
}
Expand Down
10 changes: 8 additions & 2 deletions ddl/sequence.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,11 @@ func onAlterSequence(t *meta.Meta, job *model.Job) (ver int64, _ error) {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
shouldUpdateVer := !reflect.DeepEqual(*tblInfo.Sequence, copySequenceInfo) || restart
same := reflect.DeepEqual(*tblInfo.Sequence, copySequenceInfo)
if same && !restart {
job.State = model.JobStateDone
return ver, errors.Trace(err)
}
tblInfo.Sequence = &copySequenceInfo

// Restart the sequence value.
Expand All @@ -276,7 +280,9 @@ func onAlterSequence(t *meta.Meta, job *model.Job) (ver int64, _ error) {
}

// Store the sequence info into kv.
ver, err = updateVersionAndTableInfo(t, job, tblInfo, shouldUpdateVer)
// Set shouldUpdateVer always to be true even altering doesn't take effect, since some tools like drainer won't take
// care of SchemaVersion=0.
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
Expand Down
21 changes: 8 additions & 13 deletions domain/infosync/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,8 @@ func GlobalInfoSyncerInit(ctx context.Context, id string, serverIDGetter func()
if err != nil {
return nil, err
}
if etcdCli != nil {
is.labelRuleManager = initLabelRuleManager(etcdCli.Endpoints())
is.placementManager = initPlacementManager(etcdCli.Endpoints())
} else {
is.labelRuleManager = initLabelRuleManager([]string{})
is.placementManager = initPlacementManager([]string{})
}
is.labelRuleManager = initLabelRuleManager(etcdCli)
is.placementManager = initPlacementManager(etcdCli)
setGlobalInfoSyncer(is)
return is, nil
}
Expand All @@ -214,18 +209,18 @@ func (is *InfoSyncer) GetSessionManager() util2.SessionManager {
return is.manager
}

func initLabelRuleManager(addrs []string) LabelRuleManager {
if len(addrs) == 0 {
func initLabelRuleManager(etcdCli *clientv3.Client) LabelRuleManager {
if etcdCli == nil {
return &mockLabelManager{labelRules: map[string][]byte{}}
}
return &PDLabelManager{addrs: addrs}
return &PDLabelManager{etcdCli: etcdCli}
}

func initPlacementManager(addrs []string) PlacementManager {
if len(addrs) == 0 {
func initPlacementManager(etcdCli *clientv3.Client) PlacementManager {
if etcdCli == nil {
return &mockPlacementManager{}
}
return &PDPlacementManager{addrs: addrs}
return &PDPlacementManager{etcdCli: etcdCli}
}

// GetServerInfo gets self server static information.
Expand Down
11 changes: 6 additions & 5 deletions domain/infosync/label_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/ddl/label"
"github.com/pingcap/tidb/util/pdapi"
"go.etcd.io/etcd/clientv3"
)

// LabelRuleManager manages label rules
Expand All @@ -35,7 +36,7 @@ type LabelRuleManager interface {

// PDLabelManager manages rules with pd
type PDLabelManager struct {
addrs []string
etcdCli *clientv3.Client
}

// PutLabelRule implements PutLabelRule
Expand All @@ -44,7 +45,7 @@ func (lm *PDLabelManager) PutLabelRule(ctx context.Context, rule *label.Rule) er
if err != nil {
return err
}
_, err = doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r))
_, err = doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rule"), "POST", bytes.NewReader(r))
return err
}

Expand All @@ -55,14 +56,14 @@ func (lm *PDLabelManager) UpdateLabelRules(ctx context.Context, patch *label.Rul
return err
}

_, err = doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r))
_, err = doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules"), "PATCH", bytes.NewReader(r))
return err
}

// GetAllLabelRules implements GetAllLabelRules
func (lm *PDLabelManager) GetAllLabelRules(ctx context.Context) ([]*label.Rule, error) {
var rules []*label.Rule
res, err := doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules"), "GET", nil)
res, err := doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules"), "GET", nil)

if err == nil && res != nil {
err = json.Unmarshal(res, &rules)
Expand All @@ -78,7 +79,7 @@ func (lm *PDLabelManager) GetLabelRules(ctx context.Context, ruleIDs []string) (
}

rules := []*label.Rule{}
res, err := doRequest(ctx, lm.addrs, path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids))
res, err := doRequest(ctx, lm.etcdCli.Endpoints(), path.Join(pdapi.Config, "region-label", "rules", "ids"), "GET", bytes.NewReader(ids))

if err == nil && res != nil {
err = json.Unmarshal(res, &rules)
Expand Down
9 changes: 5 additions & 4 deletions domain/infosync/placement_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/pingcap/tidb/ddl/placement"
"github.com/pingcap/tidb/util/pdapi"
"go.etcd.io/etcd/clientv3"
)

// PlacementManager manages placement settings
Expand All @@ -37,13 +38,13 @@ type PlacementManager interface {

// PDPlacementManager manages placement with pd
type PDPlacementManager struct {
addrs []string
etcdCli *clientv3.Client
}

// GetRuleBundle is used to get one specific rule bundle from PD.
func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*placement.Bundle, error) {
bundle := &placement.Bundle{ID: name}
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule", name), "GET", nil)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule", name), "GET", nil)
if err == nil && res != nil {
err = json.Unmarshal(res, bundle)
}
Expand All @@ -53,7 +54,7 @@ func (m *PDPlacementManager) GetRuleBundle(ctx context.Context, name string) (*p
// GetAllRuleBundles is used to get all rule bundles from PD. It is used to load full rules from PD while fullload infoschema.
func (m *PDPlacementManager) GetAllRuleBundles(ctx context.Context) ([]*placement.Bundle, error) {
var bundles []*placement.Bundle
res, err := doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule"), "GET", nil)
res, err := doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule"), "GET", nil)
if err == nil && res != nil {
err = json.Unmarshal(res, &bundles)
}
Expand All @@ -71,7 +72,7 @@ func (m *PDPlacementManager) PutRuleBundles(ctx context.Context, bundles []*plac
return err
}

_, err = doRequest(ctx, m.addrs, path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b))
_, err = doRequest(ctx, m.etcdCli.Endpoints(), path.Join(pdapi.Config, "placement-rule")+"?partial=true", "POST", bytes.NewReader(b))
return err
}

Expand Down
6 changes: 4 additions & 2 deletions dumpling/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"net"
"strconv"
"strings"
"text/template"
Expand Down Expand Up @@ -194,8 +195,9 @@ func (conf *Config) String() string {
func (conf *Config) GetDSN(db string) string {
// maxAllowedPacket=0 can be used to automatically fetch the max_allowed_packet variable from server on every connection.
// https://github.com/go-sql-driver/mysql#maxallowedpacket
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?collation=utf8mb4_general_ci&readTimeout=%s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",
conf.User, conf.Password, conf.Host, conf.Port, db, conf.ReadTimeout)
hostPort := net.JoinHostPort(conf.Host, strconv.Itoa(conf.Port))
dsn := fmt.Sprintf("%s:%s@tcp(%s)/%s?collation=utf8mb4_general_ci&readTimeout=%s&writeTimeout=30s&interpolateParams=true&maxAllowedPacket=0",
conf.User, conf.Password, hostPort, db, conf.ReadTimeout)
if len(conf.Security.CAPath) > 0 {
dsn += "&tls=dumpling-tls-target"
}
Expand Down
21 changes: 21 additions & 0 deletions executor/analyze_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package executor_test
import (
"context"
"fmt"
"io/ioutil"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -2584,3 +2585,23 @@ func TestAnalyzeColumnsErrorAndWarning(t *testing.T) {
}(val)
}
}

func TestAnalyzePartitionTableForFloat(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@tidb_partition_prune_mode='dynamic'")
tk.MustExec("use test")
tk.MustExec("CREATE TABLE t1 ( id bigint(20) unsigned NOT NULL AUTO_INCREMENT, num float(9,8) DEFAULT NULL, PRIMARY KEY (id) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin PARTITION BY HASH (id) PARTITIONS 128;")
// To reproduce the error we meet in https://github.com/pingcap/tidb/issues/35910, we should use the data provided in this issue
b, err := ioutil.ReadFile("testdata/analyze_test_data.sql")
require.NoError(t, err)
sqls := strings.Split(string(b), ";")
for _, sql := range sqls {
if len(sql) < 1 {
continue
}
tk.MustExec(sql)
}
tk.MustExec("analyze table t1")
}
1 change: 1 addition & 0 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func newExecutorBuilder(ctx sessionctx.Context, is infoschema.InfoSchema, ti *Te
ctx: ctx,
is: is,
Ti: ti,
snapshotTSCached: isStaleness,
snapshotTS: snapshotTS,
isStaleness: isStaleness,
readReplicaScope: replicaReadScope,
Expand Down
5 changes: 4 additions & 1 deletion executor/index_lookup_hash_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,7 +626,10 @@ func (iw *indexHashJoinInnerWorker) handleTask(ctx context.Context, task *indexH
if task.keepOuterOrder {
if err != nil {
joinResult.err = err
resultCh <- joinResult
select {
case <-ctx.Done():
case resultCh <- joinResult:
}
}
close(resultCh)
}
Expand Down
18 changes: 15 additions & 3 deletions executor/insert.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr
oldRow = append(oldRow, extraCols...)
}

err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, e.OnDuplicate)
err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, e.OnDuplicate, idxInBatch)
if e.ctx.GetSessionVars().StmtCtx.DupKeyAsWarning && kv.ErrKeyExists.Equal(err) {
e.ctx.GetSessionVars().StmtCtx.AppendWarning(err)
return nil
Expand Down Expand Up @@ -375,7 +375,7 @@ func (e *InsertExec) initEvalBuffer4Dup() {

// doDupRowUpdate updates the duplicate row.
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum,
cols []*expression.Assignment) error {
cols []*expression.Assignment, idxInBatch int) error {
assignFlag := make([]bool, len(e.Table.WritableCols()))
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values
e.curInsertVals.SetDatums(newRow...)
Expand All @@ -389,6 +389,8 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo

// Update old row when the key is duplicated.
e.evalBuffer4Dup.SetDatums(e.row4Update...)
sc := e.ctx.GetSessionVars().StmtCtx
warnCnt := int(sc.WarningCount())
for _, col := range cols {
if col.LazyErr != nil {
return col.LazyErr
Expand All @@ -397,10 +399,20 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo
if err1 != nil {
return err1
}
e.row4Update[col.Col.Index], err1 = table.CastValue(e.ctx, val, col.Col.ToInfo(), false, false)
c := col.Col.ToInfo()
c.Name = col.ColName
e.row4Update[col.Col.Index], err1 = table.CastValue(e.ctx, val, c, false, false)
if err1 != nil {
return err1
}
if newWarnings := sc.TruncateWarnings(warnCnt); len(newWarnings) > 0 {
for k := range newWarnings {
// Use `idxInBatch` here for simplicity, since the offset of the batch is unknown under the current context.
newWarnings[k].Err = completeInsertErr(c, &val, idxInBatch, newWarnings[k].Err)
}
sc.AppendWarnings(newWarnings)
warnCnt += len(newWarnings)
}
e.evalBuffer4Dup.SetDatum(col.Col.Index, e.row4Update[col.Col.Index])
assignFlag[col.Col.Index] = true
}
Expand Down
Loading

0 comments on commit a780ace

Please sign in to comment.