Skip to content

Commit

Permalink
Implement replay
Browse files Browse the repository at this point in the history
  • Loading branch information
TszKitLo40 committed Dec 25, 2021
1 parent 1e3e541 commit b634505
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 38 deletions.
33 changes: 17 additions & 16 deletions server/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,23 @@ import (
)

const (
pDBName = "db"
pHexKey = "hexKey"
pIndexName = "index"
pHandle = "handle"
pRegionID = "regionID"
pStartTS = "startTS"
pTableName = "table"
pTableID = "tableID"
pColumnID = "colID"
pColumnTp = "colTp"
pColumnFlag = "colFlag"
pColumnLen = "colLen"
pRowBin = "rowBin"
pSnapshot = "snapshot"
pFileName = "filename"
pStatus = "status"
pDBName = "db"
pHexKey = "hexKey"
pIndexName = "index"
pHandle = "handle"
pRegionID = "regionID"
pStartTS = "startTS"
pTableName = "table"
pTableID = "tableID"
pColumnID = "colID"
pColumnTp = "colTp"
pColumnFlag = "colFlag"
pColumnLen = "colLen"
pRowBin = "rowBin"
pSnapshot = "snapshot"
pFileName = "filename"
pRecordStatus = "recordStatus"
pReplayStatus = "replayStatus"
)

// For query string
Expand Down
22 changes: 17 additions & 5 deletions server/sql_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package server

import (
"fmt"
"github.com/pingcap/tidb/util/logutil"
"net/http"
"strconv"

"github.com/pingcap/tidb/util/logutil"

"github.com/gorilla/mux"
"github.com/pingcap/tidb/config"
)
Expand All @@ -43,7 +44,7 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
var err error
cfg := config.GetGlobalConfig()
params := mux.Vars(req)
if status, ok := params[pStatus]; ok {
if status, ok := params[pRecordStatus]; ok {
if status == "on" {
// set replay meta TS first.
cfg.ReplayMetaTS, err = strconv.ParseInt(params[pStartTS], 10, 64)
Expand All @@ -53,10 +54,10 @@ func (h SQLRecorderHandler) ServeHTTP(w http.ResponseWriter, req *http.Request)
return
}
cfg.EnableReplaySQL.Store(true)
logutil.InitReplay(params[pFileName])
logutil.InitRecord(params[pFileName])
} else {
cfg.EnableReplaySQL.Store(false)
logutil.StopReplay()
logutil.StopRecord()
}
w.WriteHeader(http.StatusOK)
return
Expand All @@ -73,5 +74,16 @@ func (s *Server) newSQLReplayHandler() *SQLReplayHandler {
}

func (h SQLReplayHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {

params := mux.Vars(req)
if status, ok := params[pReplayStatus]; ok {
if status == "on" {
logutil.StartReplay(params[pFileName])
} else {
logutil.StopReplay()
}
w.WriteHeader(http.StatusOK)
return
} else {
w.WriteHeader(http.StatusBadRequest)
}
}
35 changes: 18 additions & 17 deletions util/logutil/replay_log.go → util/logutil/record_logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,48 +2,49 @@ package logutil

import (
"bufio"
"github.com/pingcap/tidb/metrics"
"go.uber.org/atomic"
"os"
"sync"

"github.com/pingcap/tidb/metrics"
"go.uber.org/atomic"
)

// ReplayLogger is used to log replay record
var ReplayLogger *replayLogger
// RecordLogger is used to log replay record
var RecordLogger *recordLogger

// InitReplay initialize logger
func InitReplay(filename string) {
ReplayLogger = newReplayLogger(filename)
// InitRecord initialize logger
func InitRecord(filename string) {
RecordLogger = newRecordLogger(filename)
}

// StopReplay stops goroutine
func StopReplay() {
ReplayLogger.stopLogWorker()
// StopRecord stops goroutine
func StopRecord() {
RecordLogger.stopLogWorker()
}

func PutRecordOrDrop(record string) {
select {
case ReplayLogger.recordChan <- record:
case RecordLogger.recordChan <- record:
default:
metrics.ReplayDropCounter.Inc()
}
}

// replayLogger receives record from channel, and log or drop them as needed
type replayLogger struct {
// recordLogger receives record from channel, and log or drop them as needed
type recordLogger struct {
a atomic.String
writer *bufio.Writer
recordChan chan string
close chan struct{}
wg sync.WaitGroup
}

func newReplayLogger(filename string) *replayLogger {
func newRecordLogger(filename string) *recordLogger {
f, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, os.ModePerm)
if err != nil {
return nil
}
l := &replayLogger{
l := &recordLogger{
writer: bufio.NewWriter(f),
recordChan: make(chan string, 10000),
close: make(chan struct{}),
Expand All @@ -54,7 +55,7 @@ func newReplayLogger(filename string) *replayLogger {
}

// startLogWorker starts a log flushing worker that flushes log periodically or when batch is full
func (re *replayLogger) startLogWorker() {
func (re *recordLogger) startLogWorker() {
for {
select {
case str := <-re.recordChan:
Expand All @@ -72,7 +73,7 @@ func (re *replayLogger) startLogWorker() {
}
}

func (re *replayLogger) stopLogWorker() {
func (re *recordLogger) stopLogWorker() {
close(re.close)
re.wg.Wait()
}
95 changes: 95 additions & 0 deletions util/logutil/record_replayer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package logutil

import (
"bufio"
"fmt"
"os"
"strings"
)

// RecordReplayer is used to replay sql
var RecordReplayer *recordReplayer

// StartReplay starts replay
func StartReplay(filename string) {
RecordReplayer = newRecordPlayer(filename)
RecordReplayer.start()
}

// StopReplay stops replay
func StopReplay() {
RecordReplayer.close <- struct{}{}
}

func newRecordPlayer(filename string) *recordReplayer {
r := &recordReplayer{
fileName: filename,
close: make(chan struct{}),
}
return r
}

type recordReplayer struct {
close chan struct{}
fileName string
scanner *bufio.Scanner
}

func (r *recordReplayer) start() {
f, err := os.OpenFile(r.fileName, os.O_RDONLY, os.ModePerm)
defer f.Close()
if err != nil {
fmt.Printf("Open file error %s\n", err.Error())
return
}

r.scanner = bufio.NewScanner(f)
txns := make(map[string][]string)
for r.scanner.Scan() {
select {
case <-r.close:
break
default:
}
text := r.scanner.Text()
s := strings.Split(text, " ")
if len(s) < 2 {
fmt.Printf("invalid sql log %v\n", s)
continue
}
inTxn := len(s) == 3
if inTxn {
txnID := s[0]
sql := s[2]
txn := txns[txnID]
txn = append(txn, sql)
if sql == strings.ToLower("commit") {
tr := &txnReplayer{
close: make(chan struct{}),
sqls: txn,
}
go tr.replay()
} else {
// todo
// replay single sql
}
}
}
}

type txnReplayer struct {
close chan struct{}
sqls []string
}

func (tr *txnReplayer) replay() {
for _, sql := range tr.sqls {
select {
case <-tr.close:
break
default:
// todo
// replay single sql
}
}
}

0 comments on commit b634505

Please sign in to comment.