Commit
Merge branch 'master' into disaggregated_batch_copr
breezewish authored Jan 4, 2023
2 parents bd76d19 + f483b39 commit 0b25423
Showing 65 changed files with 2,866 additions and 130 deletions.
18 changes: 17 additions & 1 deletion autoid_service/BUILD.bazel
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "autoid_service",
@@ -9,6 +9,7 @@ go_library(
"//config",
"//kv",
"//meta",
"//meta/autoid",
"//metrics",
"//owner",
"//parser/model",
@@ -23,3 +24,18 @@ go_library(
"@org_uber_go_zap//:zap",
],
)

go_test(
name = "autoid_service_test",
srcs = ["autoid_test.go"],
embed = [":autoid_service"],
deps = [
"//parser/model",
"//testkit",
"@com_github_pingcap_kvproto//pkg/autoid",
"@com_github_stretchr_testify//require",
"@io_etcd_go_etcd_tests_v3//integration",
"@org_golang_google_grpc//:grpc",
"@org_golang_google_grpc//credentials/insecure",
],
)
13 changes: 11 additions & 2 deletions autoid_service/autoid.go
@@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
autoid1 "github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/owner"
"github.com/pingcap/tidb/parser/model"
@@ -253,6 +254,7 @@ type Service struct {
func New(selfAddr string, etcdAddr []string, store kv.Storage, tlsConfig *tls.Config) *Service {
cfg := config.GetGlobalConfig()
etcdLogCfg := zap.NewProductionConfig()

cli, err := clientv3.New(clientv3.Config{
LogConfig: &etcdLogCfg,
Endpoints: etcdAddr,
@@ -270,9 +272,12 @@ func New(selfAddr string, etcdAddr []string, store kv.Storage, tlsConfig *tls.Co
if err != nil {
panic(err)
}
return newWithCli(selfAddr, cli, store)
}

func newWithCli(selfAddr string, cli *clientv3.Client, store kv.Storage) *Service {
l := owner.NewOwnerManager(context.Background(), cli, "autoid", selfAddr, autoIDLeaderPath)
-err = l.CampaignOwner()
+err := l.CampaignOwner()
if err != nil {
panic(err)
}
@@ -299,7 +304,7 @@ func (m *mockClient) Rebase(ctx context.Context, in *autoid.RebaseRequest, opts
var global = make(map[string]*mockClient)

// MockForTest is used for testing, the UT test and unistore use this.
-func MockForTest(store kv.Storage) *mockClient {
+func MockForTest(store kv.Storage) autoid.AutoIDAllocClient {
uuid := store.UUID()
ret, ok := global[uuid]
if !ok {
Expand Down Expand Up @@ -515,3 +520,7 @@ func (s *Service) Rebase(ctx context.Context, req *autoid.RebaseRequest) (*autoi
}
return &autoid.RebaseResponse{}, nil
}

func init() {
autoid1.MockForTest = MockForTest
}
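Note: the init function above publishes this package's MockForTest through a
function variable declared in meta/autoid, presumably so that meta/autoid can
reach the service without importing it (autoid_service already imports
meta/autoid, so a direct import back would be a cycle). A minimal single-file
sketch of that hook pattern follows; AllocClient, storeUUID, and mockClient
here are illustrative stand-ins, not TiDB's actual APIs.

package main

import "fmt"

// Role of meta/autoid: declare the hook without importing the service.
type AllocClient interface {
	Alloc(n int64) (min, max int64)
}

// MockForTest stays nil until another package assigns it from an init().
var MockForTest func(storeUUID string) AllocClient

// Role of autoid_service: provide the implementation and wire it in.
type mockClient struct{ cur int64 }

func (m *mockClient) Alloc(n int64) (int64, int64) {
	start := m.cur
	m.cur += n
	return start, m.cur
}

func init() {
	// Mirrors the commit's `autoid1.MockForTest = MockForTest`: assignment
	// happens at init time, so the import edge points only one way.
	MockForTest = func(storeUUID string) AllocClient { return &mockClient{} }
}

func main() {
	cli := MockForTest("store-uuid")
	fmt.Println(cli.Alloc(10)) // prints: 0 10
}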
202 changes: 202 additions & 0 deletions autoid_service/autoid_test.go
@@ -0,0 +1,202 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package autoid

import (
"context"
"fmt"
"math"
"net"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/testkit"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/tests/v3/integration"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type autoIDResp struct {
*autoid.AutoIDResponse
error
*testing.T
}

func (resp autoIDResp) check(min, max int64) {
require.NoError(resp.T, resp.error)
require.Equal(resp.T, resp.AutoIDResponse, &autoid.AutoIDResponse{Min: min, Max: max})
}

func (resp autoIDResp) checkErrmsg() {
require.NoError(resp.T, resp.error)
require.True(resp.T, len(resp.GetErrmsg()) > 0)
}

type rebaseResp struct {
*autoid.RebaseResponse
error
*testing.T
}

func (resp rebaseResp) check(msg string) {
require.NoError(resp.T, resp.error)
require.Equal(resp.T, string(resp.RebaseResponse.GetErrmsg()), msg)
}

func TestAPI(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
cli := MockForTest(store)
tk.MustExec("use test")
tk.MustExec("create table t (id int key auto_increment);")
is := dom.InfoSchema()
dbInfo, ok := is.SchemaByName(model.NewCIStr("test"))
require.True(t, ok)

tbl, err := is.TableByName(model.NewCIStr("test"), model.NewCIStr("t"))
require.NoError(t, err)
tbInfo := tbl.Meta()

ctx := context.Background()
checkCurrValue := func(t *testing.T, cli autoid.AutoIDAllocClient, min, max int64) {
req := &autoid.AutoIDRequest{DbID: dbInfo.ID, TblID: tbInfo.ID, N: 0}
resp, err := cli.AllocAutoID(ctx, req)
require.NoError(t, err)
require.Equal(t, resp, &autoid.AutoIDResponse{Min: min, Max: max})
}
autoIDRequest := func(t *testing.T, cli autoid.AutoIDAllocClient, unsigned bool, n uint64, more ...int64) autoIDResp {
increment := int64(1)
offset := int64(1)
if len(more) >= 1 {
increment = more[0]
}
if len(more) >= 2 {
offset = more[1]
}
req := &autoid.AutoIDRequest{DbID: dbInfo.ID, TblID: tbInfo.ID, IsUnsigned: unsigned, N: n, Increment: increment, Offset: offset}
resp, err := cli.AllocAutoID(ctx, req)
return autoIDResp{resp, err, t}
}
rebaseRequest := func(t *testing.T, cli autoid.AutoIDAllocClient, unsigned bool, n int64, force ...struct{}) rebaseResp {
req := &autoid.RebaseRequest{
DbID: dbInfo.ID,
TblID: tbInfo.ID,
Base: n,
IsUnsigned: unsigned,
Force: len(force) > 0,
}
resp, err := cli.Rebase(ctx, req)
return rebaseResp{resp, err, t}
}
var force = struct{}{}

// basic auto id operation
autoIDRequest(t, cli, false, 1).check(0, 1)
autoIDRequest(t, cli, false, 10).check(1, 11)
checkCurrValue(t, cli, 11, 11)
autoIDRequest(t, cli, false, 128).check(11, 139)
autoIDRequest(t, cli, false, 1, 10, 5).check(139, 145)
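// with increment 10 and offset 5, the first value above base 139 congruent
// to 5 (mod 10) is 145, so allocating one id yields the range (139, 145]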

// basic rebase operation
rebaseRequest(t, cli, false, 666).check("")
autoIDRequest(t, cli, false, 1).check(666, 667)

rebaseRequest(t, cli, false, 6666).check("")
autoIDRequest(t, cli, false, 1).check(6666, 6667)

// rebase will not decrease the value without 'force'
rebaseRequest(t, cli, false, 44).check("")
checkCurrValue(t, cli, 6667, 6667)
rebaseRequest(t, cli, false, 44, force).check("")
checkCurrValue(t, cli, 44, 44)

// rebase to the signed maximum; allocating one more id must return an error
rebaseRequest(t, cli, false, math.MaxInt64, force).check("")
checkCurrValue(t, cli, math.MaxInt64, math.MaxInt64)
autoIDRequest(t, cli, false, 1).checkErrmsg()

rebaseRequest(t, cli, true, 0, force).check("")
checkCurrValue(t, cli, 0, 0)
autoIDRequest(t, cli, true, 1).check(0, 1)
autoIDRequest(t, cli, true, 10).check(1, 11)
autoIDRequest(t, cli, true, 128).check(11, 139)
autoIDRequest(t, cli, true, 1, 10, 5).check(139, 145)

// rebase to MaxInt64; unsigned allocation continues past it by wrapping around
rebaseRequest(t, cli, true, math.MaxInt64).check("")
checkCurrValue(t, cli, math.MaxInt64, math.MaxInt64)
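// unsigned ids reuse the signed int64 field, so the sequence wraps past
// MaxInt64: uint64(math.MinInt64) == uint64(math.MaxInt64) + 1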
autoIDRequest(t, cli, true, 1).check(math.MaxInt64, math.MinInt64)
autoIDRequest(t, cli, true, 1).check(math.MinInt64, math.MinInt64+1)

rebaseRequest(t, cli, true, -1).check("")
checkCurrValue(t, cli, -1, -1)
autoIDRequest(t, cli, true, 1).check(-1, 0)
}

func TestGRPC(t *testing.T) {
integration.BeforeTestExternal(t)
store := testkit.CreateMockStore(t)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
etcdCli := cluster.RandClient()

var addr string
var listener net.Listener
for port := 10080; ; port++ {
var err error
addr = fmt.Sprintf("127.0.0.1:%d", port)
listener, err = net.Listen("tcp", addr)
if err == nil {
break
}
}
defer listener.Close()

service := newWithCli(addr, etcdCli, store)
defer service.Close()

var i int
for !service.leaderShip.IsOwner() {
time.Sleep(100 * time.Millisecond)
i++
if i >= 20 {
break
}
}
require.Less(t, i, 20)

grpcServer := grpc.NewServer()
autoid.RegisterAutoIDAllocServer(grpcServer, service)
go func() {
grpcServer.Serve(listener)
}()
defer grpcServer.Stop()

// dial the address actually bound above, not a hardcoded port
grpcConn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
cli := autoid.NewAutoIDAllocClient(grpcConn)
_, err = cli.AllocAutoID(context.Background(), &autoid.AutoIDRequest{
DbID: 0,
TblID: 0,
N: 1,
Increment: 1,
Offset: 1,
IsUnsigned: false,
})
require.NoError(t, err)
}
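Note: TestGRPC above scans ports upward from 10080 until Listen succeeds. A
common alternative, sketched below assuming only the standard library, is to
bind ":0" so the OS picks a free port and then read the bound address back;
the helper name listenEphemeral is hypothetical, not part of the commit.

package autoid

import (
	"net"
	"testing"
)

// listenEphemeral reserves an OS-chosen free port and returns the listener
// together with its concrete address, so a later grpc.Dial can never
// disagree with the net.Listen.
func listenEphemeral(t *testing.T) (net.Listener, string) {
	t.Helper()
	lis, err := net.Listen("tcp", "127.0.0.1:0") // ":0" = any free port
	if err != nil {
		t.Fatal(err)
	}
	return lis, lis.Addr().String()
}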
1 change: 1 addition & 0 deletions ddl/BUILD.bazel
@@ -202,6 +202,7 @@ go_test(
flaky = True,
shard_count = 50,
deps = [
"//autoid_service",
"//config",
"//ddl/ingest",
"//ddl/placement",
38 changes: 34 additions & 4 deletions ddl/cluster.go
@@ -68,6 +68,7 @@ const (
totalLockedRegionsOffset
startTSOffset
commitTSOffset
ttlJobEnableOffSet
)

func closePDSchedule() error {
@@ -124,6 +125,18 @@ func ValidateFlashbackTS(ctx context.Context, sctx sessionctx.Context, flashBack
return gcutil.ValidateSnapshotWithGCSafePoint(flashBackTS, gcSafePoint)
}

func getTiDBTTLJobEnable(sess sessionctx.Context) (string, error) {
val, err := sess.GetSessionVars().GlobalVarsAccessor.GetGlobalSysVar(variable.TiDBTTLJobEnable)
if err != nil {
return "", errors.Trace(err)
}
return val, nil
}

func setTiDBTTLJobEnable(ctx context.Context, sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBTTLJobEnable, value)
}

func setTiDBEnableAutoAnalyze(ctx context.Context, sess sessionctx.Context, value string) error {
return sess.GetSessionVars().GlobalVarsAccessor.SetGlobalSysVar(ctx, variable.TiDBEnableAutoAnalyze, value)
}
@@ -176,6 +189,9 @@ func checkAndSetFlashbackClusterInfo(sess sessionctx.Context, d *ddlCtx, t *meta
if err = setTiDBSuperReadOnly(d.ctx, sess, variable.On); err != nil {
return err
}
if err = setTiDBTTLJobEnable(d.ctx, sess, variable.Off); err != nil {
return err
}

nowSchemaVersion, err := t.GetSchemaVersion()
if err != nil {
@@ -553,9 +569,9 @@ func (w *worker) onFlashbackCluster(d *ddlCtx, t *meta.Meta, job *model.Job) (ve

var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]interface{}
-var autoAnalyzeValue, readOnlyValue string
+var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabledValue bool
-if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
+if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabledValue, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
@@ -595,6 +611,12 @@
return ver, errors.Trace(err)
}
job.Args[readOnlyOffset] = &readOnlyValue
ttlJobEnableValue, err = getTiDBTTLJobEnable(sess)
if err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}
job.Args[ttlJobEnableOffSet] = &ttlJobEnableValue
job.SchemaState = model.StateDeleteOnly
return ver, nil
// Stage 2, check flashbackTS, close GC and PD schedule.
@@ -694,10 +716,10 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {

var flashbackTS, lockedRegions, startTS, commitTS uint64
var pdScheduleValue map[string]interface{}
-var autoAnalyzeValue, readOnlyValue string
+var autoAnalyzeValue, readOnlyValue, ttlJobEnableValue string
var gcEnabled bool

-if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS); err != nil {
+if err := job.DecodeArgs(&flashbackTS, &pdScheduleValue, &gcEnabled, &autoAnalyzeValue, &readOnlyValue, &lockedRegions, &startTS, &commitTS, &ttlJobEnableValue); err != nil {
return errors.Trace(err)
}
sess, err := w.sessPool.get()
@@ -718,6 +740,14 @@ func finishFlashbackCluster(w *worker, job *model.Job) error {
if err = setTiDBSuperReadOnly(w.ctx, sess, readOnlyValue); err != nil {
return err
}

if job.IsCancelled() {
// only restore `tidb_ttl_job_enable` when flashback failed
if err = setTiDBTTLJobEnable(w.ctx, sess, ttlJobEnableValue); err != nil {
return err
}
}

return setTiDBEnableAutoAnalyze(w.ctx, sess, autoAnalyzeValue)
})
if err != nil {
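Note: the ddl/cluster.go change appends &ttlJobEnableValue as a new trailing
argument to job.DecodeArgs and records it at ttlJobEnableOffSet in job.Args.
The sketch below shows why appending at the end is backward compatible, under
the assumption that job arguments round-trip through a JSON array; decodeArgs
here is a hypothetical stand-in, not TiDB's implementation. A job queued by an
older TiDB simply carries a shorter array, and the extra target keeps its zero
value.

package main

import (
	"encoding/json"
	"fmt"
)

// decodeArgs unmarshals a JSON array into the given pointers. If the array
// is shorter than the target list (a job written before a new argument
// existed), the trailing targets keep their zero values instead of failing.
func decodeArgs(raw []byte, targets ...interface{}) error {
	var elems []json.RawMessage
	if err := json.Unmarshal(raw, &elems); err != nil {
		return err
	}
	if len(elems) > len(targets) {
		elems = elems[:len(targets)]
	}
	for i, e := range elems {
		if err := json.Unmarshal(e, targets[i]); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	// A job encoded before ttlJobEnableValue existed carries fewer args.
	oldJob := []byte(`[123456, "ON"]`)
	var flashbackTS int64
	var readOnlyValue, ttlJobEnableValue string
	if err := decodeArgs(oldJob, &flashbackTS, &readOnlyValue, &ttlJobEnableValue); err != nil {
		panic(err)
	}
	fmt.Println(flashbackTS, readOnlyValue, ttlJobEnableValue == "") // 123456 ON true
}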
(diff truncated: the remaining changed files are not shown)
