Skip to content

Commit

Permalink
*: support the TiFlash replica of table (#12453)
Browse files Browse the repository at this point in the history
  • Loading branch information
crazycs520 authored Oct 29, 2019
1 parent 9ed376a commit a8ed950
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 0 deletions.
21 changes: 21 additions & 0 deletions ddl/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3136,6 +3136,27 @@ func (s *testDBSuite1) TestModifyColumnCharset(c *C) {

}

func (s *testDBSuite1) TestSetTableFlashReplica(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use test_db")
s.mustExec(c, "drop table if exists t_flash;")
s.tk.MustExec("create table t_flash(a int, b int)")
defer s.mustExec(c, "drop table t_flash;")

t := s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica, IsNil)

s.tk.MustExec("alter table t_flash set tiflash replica 2 location labels 'a','b';")
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica, NotNil)
c.Assert(t.Meta().TiFlashReplica.Count, Equals, uint64(2))
c.Assert(strings.Join(t.Meta().TiFlashReplica.LocationLabels, ","), Equals, strings.Join([]string{"a", "b"}, ","))

s.tk.MustExec("alter table t_flash set tiflash replica 0")
t = s.testGetTable(c, "t_flash")
c.Assert(t.Meta().TiFlashReplica, IsNil)
}

func (s *testDBSuite4) TestAlterShardRowIDBits(c *C) {
c.Assert(failpoint.Enable("github.com/pingcap/tidb/meta/autoid/mockAutoIDChange", `return(true)`), IsNil)
defer func() {
Expand Down
1 change: 1 addition & 0 deletions ddl/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ type DDL interface {
LockTables(ctx sessionctx.Context, stmt *ast.LockTablesStmt) error
UnlockTables(ctx sessionctx.Context, lockedTables []model.TableLockTpInfo) error
CleanupTableLock(ctx sessionctx.Context, tables []*ast.TableName) error
UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error

// GetLease returns current schema lease time.
GetLease() time.Duration
Expand Down
67 changes: 67 additions & 0 deletions ddl/ddl_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1998,6 +1998,8 @@ func (d *ddl) AlterTable(ctx sessionctx.Context, ident ast.Ident, specs []*ast.A
return errors.Trace(err)
}
}
case ast.AlterTableSetTiFlashReplica:
err = d.AlterTableSetTiFlashReplica(ctx, ident, spec.TiFlashReplica)
default:
// Nothing to do now.
}
Expand Down Expand Up @@ -2988,6 +2990,71 @@ func (d *ddl) AlterTableCharsetAndCollate(ctx sessionctx.Context, ident ast.Iden
return errors.Trace(err)
}

// AlterTableSetTiFlashReplica sets the TiFlash replicas info.
func (d *ddl) AlterTableSetTiFlashReplica(ctx sessionctx.Context, ident ast.Ident, replicaInfo *ast.TiFlashReplicaSpec) error {
schema, tb, err := d.getSchemaAndTableByIdent(ctx, ident)
if err != nil {
return errors.Trace(err)
}

tbReplicaInfo := tb.Meta().TiFlashReplica
if tbReplicaInfo != nil && tbReplicaInfo.Count == replicaInfo.Count &&
len(tbReplicaInfo.LocationLabels) == len(replicaInfo.Labels) {
changed := false
for i, lable := range tbReplicaInfo.LocationLabels {
if replicaInfo.Labels[i] != lable {
changed = true
break
}
}
if !changed {
return nil
}
}

job := &model.Job{
SchemaID: schema.ID,
TableID: tb.Meta().ID,
SchemaName: schema.Name.L,
Type: model.ActionSetTiFlashReplica,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{*replicaInfo},
}
err = d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

// UpdateTableReplicaInfo updates the table flash replica infos.
func (d *ddl) UpdateTableReplicaInfo(ctx sessionctx.Context, tid int64, available bool) error {
is := d.infoHandle.Get()
tb, ok := is.TableByID(tid)
if !ok {
return infoschema.ErrTableNotExists.GenWithStack("Table which ID = %d does not exist.", tid)
}

if tb.Meta().TiFlashReplica == nil || (tb.Meta().TiFlashReplica.Available == available) {
return nil
}

db, ok := is.SchemaByTable(tb.Meta())
if !ok {
return infoschema.ErrDatabaseNotExists.GenWithStack("Database of table `%s` does not exist.", tb.Meta().Name)
}

job := &model.Job{
SchemaID: db.ID,
TableID: tb.Meta().ID,
SchemaName: db.Name.L,
Type: model.ActionUpdateTiFlashReplicaStatus,
BinlogInfo: &model.HistoryInfo{},
Args: []interface{}{available},
}
err := d.doDDLJob(ctx, job)
err = d.callHookOnChanged(err)
return errors.Trace(err)
}

// checkAlterTableCharset uses to check is it possible to change the charset of table.
// This function returns 2 variable:
// doNothing: if doNothing is true, means no need to change any more, because the target charset is same with the charset of table.
Expand Down
4 changes: 4 additions & 0 deletions ddl/ddl_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -557,6 +557,10 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
ver, err = onLockTables(t, job)
case model.ActionUnlockTable:
ver, err = onUnlockTables(t, job)
case model.ActionSetTiFlashReplica:
ver, err = onSetTableFlashReplica(t, job)
case model.ActionUpdateTiFlashReplicaStatus:
ver, err = onUpdateFlashReplicaStatus(t, job)
default:
// Invalid job, cancel it.
job.State = model.JobStateCancelled
Expand Down
57 changes: 57 additions & 0 deletions ddl/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/charset"
"github.com/pingcap/parser/model"
field_types "github.com/pingcap/parser/types"
Expand Down Expand Up @@ -676,6 +677,62 @@ func onModifyTableCharsetAndCollate(t *meta.Meta, job *model.Job) (ver int64, _
return ver, nil
}

func onSetTableFlashReplica(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var replicaInfo ast.TiFlashReplicaSpec
if err := job.DecodeArgs(&replicaInfo); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}

if replicaInfo.Count > 0 {
tblInfo.TiFlashReplica = &model.TiFlashReplicaInfo{
Count: replicaInfo.Count,
LocationLabels: replicaInfo.Labels,
}
} else {
tblInfo.TiFlashReplica = nil
}

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

func onUpdateFlashReplicaStatus(t *meta.Meta, job *model.Job) (ver int64, _ error) {
var available bool
if err := job.DecodeArgs(&available); err != nil {
job.State = model.JobStateCancelled
return ver, errors.Trace(err)
}

tblInfo, err := getTableInfoAndCancelFaultJob(t, job, job.SchemaID)
if err != nil {
return ver, errors.Trace(err)
}
if tblInfo.TiFlashReplica == nil || (tblInfo.TiFlashReplica.Available == available) {
return ver, nil
}

if tblInfo.TiFlashReplica != nil {
tblInfo.TiFlashReplica.Available = available
}

ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
if err != nil {
return ver, errors.Trace(err)
}
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
return ver, nil
}

func checkTableNotExists(d *ddlCtx, t *meta.Meta, schemaID int64, tableName string) error {
// d.infoHandle maybe nil in some test.
if d.infoHandle == nil || !d.infoHandle.IsValid() {
Expand Down
81 changes: 81 additions & 0 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,10 @@ type dbTableHandler struct {
*tikvHandlerTool
}

type flashReplicaHandler struct {
*tikvHandlerTool
}

// regionHandler is the common field for http handler. It contains
// some common functions for all handlers.
type regionHandler struct {
Expand Down Expand Up @@ -668,6 +672,83 @@ func (h binlogRecover) ServeHTTP(w http.ResponseWriter, req *http.Request) {
binloginfo.DisableSkipBinlogFlag()
}

type tableFlashReplicaInfo struct {
// Modifying the field name needs to negotiate with TiFlash colleague.
ID int64 `json:"id"`
ReplicaCount uint64 `json:"replica_count"`
LocationLabels []string `json:"location_labels"`
Available bool `json:"available"`
}

func (h flashReplicaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if req.Method == http.MethodPost {
h.handleStatusReport(w, req)
return
}
schema, err := h.schema()
if err != nil {
writeError(w, err)
return
}
replicaInfos := make([]*tableFlashReplicaInfo, 0)
allDBs := schema.AllSchemas()
for _, db := range allDBs {
tables := schema.SchemaTables(db.Name)
for _, tbl := range tables {
tblInfo := tbl.Meta()
if tblInfo.TiFlashReplica == nil {
continue
}
replicaInfos = append(replicaInfos, &tableFlashReplicaInfo{
ID: tblInfo.ID,
ReplicaCount: tblInfo.TiFlashReplica.Count,
LocationLabels: tblInfo.TiFlashReplica.LocationLabels,
Available: tblInfo.TiFlashReplica.Available,
})
}
}
writeData(w, replicaInfos)
}

type tableFlashReplicaStatus struct {
// Modifying the field name needs to negotiate with TiFlash colleague.
ID int64 `json:"id"`
RegionCount uint64 `json:"region_count"`
FlashRegionCount uint64 `json:"flash_region_count"`
}

// checkTableFlashReplicaAvailable uses to check the available status of table flash replica.
func (tf *tableFlashReplicaStatus) checkTableFlashReplicaAvailable() bool {
return tf.FlashRegionCount == tf.RegionCount
}

func (h flashReplicaHandler) handleStatusReport(w http.ResponseWriter, req *http.Request) {
var status tableFlashReplicaStatus
err := json.NewDecoder(req.Body).Decode(&status)
if err != nil {
writeError(w, err)
return
}
do, err := session.GetDomain(h.Store.(kv.Storage))
if err != nil {
writeError(w, err)
return
}
s, err := session.CreateSession(h.Store.(kv.Storage))
if err != nil {
writeError(w, err)
return
}
err = do.DDL().UpdateTableReplicaInfo(s, status.ID, status.checkTableFlashReplicaAvailable())
if err != nil {
writeError(w, err)
}
logutil.BgLogger().Info("handle flash replica report", zap.Int64("table ID", status.ID), zap.Uint64("region count",
status.RegionCount),
zap.Uint64("flash region count", status.FlashRegionCount),
zap.Error(err))
}

// ServeHTTP handles request of list a database or table's schemas.
func (h schemaHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
schema, err := h.schema()
Expand Down
72 changes: 72 additions & 0 deletions server/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package server

import (
"bytes"
"database/sql"
"encoding/base64"
"encoding/json"
Expand All @@ -23,6 +24,7 @@ import (
"net/http"
"net/url"
"sort"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -393,6 +395,76 @@ func (ts *HTTPHandlerTestSuite) TestGetMVCCNotFound(c *C) {
c.Assert(data.Value.Info.Values, IsNil)
}

func (ts *HTTPHandlerTestSuite) TestTiFlashReplica(c *C) {
ts.startServer(c)
ts.prepareData(c)
defer ts.stopServer(c)
resp, err := http.Get("http://127.0.0.1:10090/tiflash/replica")
c.Assert(err, IsNil)
decoder := json.NewDecoder(resp.Body)
var data []tableFlashReplicaInfo
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(len(data), Equals, 0)

db, err := sql.Open("mysql", getDSN())
c.Assert(err, IsNil, Commentf("Error connecting"))
defer db.Close()
dbt := &DBTest{c, db}

dbt.mustExec("use tidb")
dbt.mustExec("alter table test set tiflash replica 2 location labels 'a','b';")

resp, err = http.Get("http://127.0.0.1:10090/tiflash/replica")
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(len(data), Equals, 1)
c.Assert(data[0].ReplicaCount, Equals, uint64(2))
c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b")
c.Assert(data[0].Available, Equals, false)

resp, err = http.Post("http://127.0.0.1:10090/tiflash/replica", "application/json", bytes.NewBuffer([]byte(`{"id":84,"region_count":3,"flash_region_count":3}`)))
c.Assert(err, IsNil)
c.Assert(resp, NotNil)
body, err := ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(string(body), Equals, "[schema:1146]Table which ID = 84 does not exist.")

t, err := ts.domain.InfoSchema().TableByName(model.NewCIStr("tidb"), model.NewCIStr("test"))
c.Assert(err, IsNil)
req := fmt.Sprintf(`{"id":%d,"region_count":3,"flash_region_count":3}`, t.Meta().ID)
resp, err = http.Post("http://127.0.0.1:10090/tiflash/replica", "application/json", bytes.NewBuffer([]byte(req)))
c.Assert(err, IsNil)
c.Assert(resp, NotNil)
body, err = ioutil.ReadAll(resp.Body)
c.Assert(err, IsNil)
c.Assert(string(body), Equals, "")

resp, err = http.Get("http://127.0.0.1:10090/tiflash/replica")
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(len(data), Equals, 1)
c.Assert(data[0].ReplicaCount, Equals, uint64(2))
c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b")
c.Assert(data[0].Available, Equals, true) // The status should be true now.

// Should not take effect.
dbt.mustExec("alter table test set tiflash replica 2 location labels 'a','b';")
resp, err = http.Get("http://127.0.0.1:10090/tiflash/replica")
c.Assert(err, IsNil)
decoder = json.NewDecoder(resp.Body)
err = decoder.Decode(&data)
c.Assert(err, IsNil)
c.Assert(len(data), Equals, 1)
c.Assert(data[0].ReplicaCount, Equals, uint64(2))
c.Assert(strings.Join(data[0].LocationLabels, ","), Equals, "a,b")
c.Assert(data[0].Available, Equals, true) // The status should be true now.
}

func (ts *HTTPHandlerTestSuite) TestDecodeColumnValue(c *C) {
ts.startServer(c)
ts.prepareData(c)
Expand Down
2 changes: 2 additions & 0 deletions server/http_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func (s *Server) startHTTPServer() {
router.Handle("/info/all", allServerInfoHandler{tikvHandlerTool}).Name("InfoALL")
// HTTP path for get db and table info that is related to the tableID.
router.Handle("/db-table/{tableID}", dbTableHandler{tikvHandlerTool})
// HTTP path for get table tiflash replica info.
router.Handle("/tiflash/replica", flashReplicaHandler{tikvHandlerTool})

if s.cfg.Store == "tikv" {
// HTTP path for tikv.
Expand Down

0 comments on commit a8ed950

Please sign in to comment.