Skip to content

Commit

Permalink
store/gc_worker: Use UnsafeDestroyRange instead of DeleteRange in GC (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
MyonKeminta authored and zhangjinpeng87 committed Sep 14, 2018
1 parent 580e857 commit a7f4687
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 74 deletions.
23 changes: 21 additions & 2 deletions ddl/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@ import (
)

const (
loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.gc_delete_range WHERE ts < %v`
deleteRangesTable = `gc_delete_range`
doneDeleteRangesTable = `gc_delete_range_done`
loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.%s WHERE ts < %v`
recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = "%s" WHERE job_id = %d AND element_id = %d AND start_key = "%s"`
deleteDoneRecordSQL = `DELETE FROM mysql.gc_delete_range_done WHERE job_id = %d AND element_id = %d`
)

// DelRangeTask is for run delete-range command in gc_worker.
Expand All @@ -46,7 +49,16 @@ func (t DelRangeTask) Range() ([]byte, []byte) {

// LoadDeleteRanges loads delete range tasks from gc_delete_range table.
func LoadDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRangeTask, _ error) {
sql := fmt.Sprintf(loadDeleteRangeSQL, safePoint)
return loadDeleteRangesFromTable(ctx, deleteRangesTable, safePoint)
}

// LoadDoneDeleteRanges loads deleted ranges from gc_delete_range_done table.
func LoadDoneDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRangeTask, _ error) {
return loadDeleteRangesFromTable(ctx, doneDeleteRangesTable, safePoint)
}

func loadDeleteRangesFromTable(ctx sessionctx.Context, table string, safePoint uint64) (ranges []DelRangeTask, _ error) {
sql := fmt.Sprintf(loadDeleteRangeSQL, table, safePoint)
rss, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if len(rss) > 0 {
defer terror.Call(rss[0].Close)
Expand Down Expand Up @@ -101,6 +113,13 @@ func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error {
return errors.Trace(err)
}

// DeleteDoneRecord removes a record from gc_delete_range_done table.
func DeleteDoneRecord(ctx sessionctx.Context, dr DelRangeTask) error {
sql := fmt.Sprintf(deleteDoneRecordSQL, dr.JobID, dr.ElementID)
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
return errors.Trace(err)
}

// UpdateDeleteRange is only for emulator.
func UpdateDeleteRange(ctx sessionctx.Context, dr DelRangeTask, newStartKey, oldStartKey kv.Key) error {
newStartKeyHex := hex.EncodeToString(newStartKey)
Expand Down
2 changes: 2 additions & 0 deletions store/mockstore/mocktikv/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,8 @@ func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.R
return resp, nil
}
resp.RawScan = handler.handleKvRawScan(r)
case tikvrpc.CmdUnsafeDestroyRange:
panic("unimplemented")
case tikvrpc.CmdCop:
r := req.Cop
if err := handler.checkRequestContext(reqCtx); err != nil {
Expand Down
11 changes: 6 additions & 5 deletions store/tikv/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,12 @@ var MaxCallMsgSize = 1<<31 - 1

// Timeout durations.
const (
dialTimeout = 5 * time.Second
readTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
GCTimeout = 5 * time.Minute
dialTimeout = 5 * time.Second
readTimeoutShort = 20 * time.Second // For requests that read/write several key-values.
ReadTimeoutMedium = 60 * time.Second // For requests that may need scan region.
ReadTimeoutLong = 150 * time.Second // For requests that may need scan region multiple times.
GCTimeout = 5 * time.Minute
UnsafeDestroyRangeTimeout = 5 * time.Minute

grpcInitialWindowSize = 1 << 30
grpcInitialConnWindowSize = 1 << 30
Expand Down
113 changes: 98 additions & 15 deletions store/tikv/gcworker/gc_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (

"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/pd-client"
"github.com/pingcap/tidb/ddl/util"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta"
Expand All @@ -45,6 +47,7 @@ type GCWorker struct {
uuid string
desc string
store tikv.Storage
pdClient pd.Client
gcIsRunning bool
lastFinish time.Time
cancel context.CancelFunc
Expand All @@ -54,7 +57,7 @@ type GCWorker struct {
}

// NewGCWorker creates a GCWorker instance.
func NewGCWorker(store tikv.Storage) (tikv.GCHandler, error) {
func NewGCWorker(store tikv.Storage, pdClient pd.Client) (tikv.GCHandler, error) {
ver, err := store.CurrentVersion()
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -67,6 +70,7 @@ func NewGCWorker(store tikv.Storage) (tikv.GCHandler, error) {
uuid: strconv.FormatUint(ver.Ver, 16),
desc: fmt.Sprintf("host:%s, pid:%d, start at %s", hostName, os.Getpid(), time.Now()),
store: store,
pdClient: pdClient,
gcIsRunning: false,
lastFinish: time.Now(),
done: make(chan error),
Expand Down Expand Up @@ -98,10 +102,11 @@ const (
gcLeaderDescKey = "tikv_gc_leader_desc"
gcLeaderLeaseKey = "tikv_gc_leader_lease"

gcLastRunTimeKey = "tikv_gc_last_run_time"
gcRunIntervalKey = "tikv_gc_run_interval"
gcDefaultRunInterval = time.Minute * 10
gcWaitTime = time.Minute * 1
gcLastRunTimeKey = "tikv_gc_last_run_time"
gcRunIntervalKey = "tikv_gc_run_interval"
gcDefaultRunInterval = time.Minute * 10
gcWaitTime = time.Minute * 1
gcRedoDeleteRangeDelay = 24 * time.Hour

gcLifeTimeKey = "tikv_gc_life_time"
gcDefaultLifeTime = time.Minute * 10
Expand Down Expand Up @@ -333,6 +338,13 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64) {
w.done <- errors.Trace(err)
return
}
err = w.redoDeleteRanges(ctx, safePoint)
if err != nil {
log.Errorf("[gc worker] %s redo-delete range returns an error %v", w.uuid, errors.ErrorStack(err))
metrics.GCJobFailureCounter.WithLabelValues("redo_delete_range").Inc()
w.done <- errors.Trace(err)
return
}
err = w.doGC(ctx, safePoint)
if err != nil {
log.Errorf("[gc worker] %s do GC returns an error %v", w.uuid, errors.ErrorStack(err))
Expand All @@ -344,6 +356,7 @@ func (w *GCWorker) runGCJob(ctx context.Context, safePoint uint64) {
w.done <- nil
}

// `deleteRanges` processes all delete range records whose ts < safePoint in table `gc_delete_range`
func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error {
metrics.GCWorkerCounter.WithLabelValues("delete_range").Inc()

Expand All @@ -356,33 +369,103 @@ func (w *GCWorker) deleteRanges(ctx context.Context, safePoint uint64) error {

log.Infof("[gc worker] %s start delete %v ranges", w.uuid, len(ranges))
startTime := time.Now()
regions := 0
for _, r := range ranges {
startKey, rangeEndKey := r.Range()

deleteRangeTask := tikv.NewDeleteRangeTask(ctx, w.store, startKey, rangeEndKey)
err := deleteRangeTask.Execute()
startKey, endKey := r.Range()

err = w.sendUnsafeDestroyRangeRequest(ctx, startKey, endKey)
if err != nil {
return errors.Trace(err)
}
if deleteRangeTask.IsCanceled() {
return errors.New("[gc worker] gc job canceled")
}

regions += deleteRangeTask.CompletedRegions()
se := createSession(w.store)
err = util.CompleteDeleteRange(se, r)
se.Close()
if err != nil {
return errors.Trace(err)
}
}
log.Infof("[gc worker] %s finish delete %v ranges, regions: %v, cost time: %s", w.uuid, len(ranges), regions, time.Since(startTime))
log.Infof("[gc worker] %s finish delete %v ranges, cost time: %s", w.uuid, len(ranges), time.Since(startTime))
metrics.GCHistogram.WithLabelValues("delete_ranges").Observe(time.Since(startTime).Seconds())
return nil
}

// `redoDeleteRanges` checks all deleted ranges whose ts is at least `lifetime + 24h` ago. See TiKV RFC #2.
func (w *GCWorker) redoDeleteRanges(ctx context.Context, safePoint uint64) error {
metrics.GCWorkerCounter.WithLabelValues("redo_delete_range").Inc()

// We check delete range records that are deleted about 24 hours ago.
redoDeleteRangesTs := safePoint - oracle.ComposeTS(int64(gcRedoDeleteRangeDelay.Seconds())*1000, 0)

se := createSession(w.store)
ranges, err := util.LoadDoneDeleteRanges(se, redoDeleteRangesTs)
se.Close()
if err != nil {
return errors.Trace(err)
}

log.Infof("[gc worker] %s start redo-delete %v ranges", w.uuid, len(ranges))
startTime := time.Now()
for _, r := range ranges {
startKey, endKey := r.Range()

err = w.sendUnsafeDestroyRangeRequest(ctx, startKey, endKey)
if err != nil {
return errors.Trace(err)
}

se := createSession(w.store)
err := util.DeleteDoneRecord(se, r)
se.Close()
if err != nil {
return errors.Trace(err)
}
}
log.Infof("[gc worker] %s finish redo-delete %v ranges, cost time: %s", w.uuid, len(ranges), time.Since(startTime))
metrics.GCHistogram.WithLabelValues("redo_delete_ranges").Observe(time.Since(startTime).Seconds())
return nil
}

func (w *GCWorker) sendUnsafeDestroyRangeRequest(ctx context.Context, startKey []byte, endKey []byte) error {
// Get all stores every time deleting a region. So the store list is less probably to be stale.
stores, err := w.pdClient.GetAllStores(ctx)
if err != nil {
log.Errorf("[gc worker] %s delete ranges: got an error while trying to get store list from pd: %v", w.uuid, errors.ErrorStack(err))
return errors.Trace(err)
}

req := &tikvrpc.Request{
Type: tikvrpc.CmdUnsafeDestroyRange,
UnsafeDestroyRange: &kvrpcpb.UnsafeDestroyRangeRequest{
StartKey: startKey,
EndKey: endKey,
},
}

var wg sync.WaitGroup

for _, store := range stores {
if store.State != metapb.StoreState_Up {
continue
}

address := store.Address
storeID := store.Id
wg.Add(1)
go func() {
defer wg.Done()
_, err1 := w.store.GetTiKVClient().SendRequest(ctx, address, req, tikv.UnsafeDestroyRangeTimeout)
if err1 != nil {
log.Errorf("[gc worker] %s destroy range on store %v failed with error: %v", w.uuid, storeID, errors.ErrorStack(err))
err = err1
}
}()
}

wg.Wait()

return errors.Trace(err)
}

func (w *GCWorker) loadGCConcurrencyWithDefault() (int, error) {
str, err := w.loadValueFromSysTable(gcConcurrencyKey)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/gcworker/gc_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (s *testGCWorkerSuite) SetUpTest(c *C) {
s.store.SetOracle(s.oracle)
s.dom, err = session.BootstrapSession(s.store)
c.Assert(err, IsNil)
gcWorker, err := NewGCWorker(s.store)
gcWorker, err := NewGCWorker(s.store, nil)
c.Assert(err, IsNil)
gcWorker.Start()
gcWorker.Close()
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package tikv
import (
"time"

"github.com/pingcap/pd/pd-client"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -67,4 +68,4 @@ type GCHandler interface {

// NewGCHandlerFunc creates a new GCHandler.
// To enable real GC, we should assign the function to `gcworker.NewGCWorker`.
var NewGCHandlerFunc func(storage Storage) (GCHandler, error)
var NewGCHandlerFunc func(storage Storage, pdClient pd.Client) (GCHandler, error)
2 changes: 1 addition & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func (s *tikvStore) StartGCWorker() error {
return nil
}

gcWorker, err := NewGCHandlerFunc(s)
gcWorker, err := NewGCHandlerFunc(s, s.pdClient)
if err != nil {
return errors.Trace(err)
}
Expand Down
Loading

0 comments on commit a7f4687

Please sign in to comment.