Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support plan replayer for ch,rawsql #136

Merged
merged 4 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ch/ddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func (w *Workloader) createTableDDL(ctx context.Context, query string, tableName
}

// createTables creates tables schema.
func (w Workloader) createTables(ctx context.Context) error {
func (w *Workloader) createTables(ctx context.Context) error {
query := `
CREATE TABLE IF NOT EXISTS nation (
N_NATIONKEY BIGINT NOT NULL,
Expand Down
65 changes: 45 additions & 20 deletions ch/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/pingcap/go-tpc/pkg/measurement"
replayer "github.com/pingcap/go-tpc/pkg/plan-replayer"
"github.com/pingcap/go-tpc/pkg/util"
"github.com/pingcap/go-tpc/pkg/workload"
"github.com/pingcap/go-tpc/tpch"
Expand Down Expand Up @@ -37,6 +38,9 @@ type Config struct {
AnalyzeTable analyzeConfig
RefreshConnWait time.Duration

EnablePlanReplayer bool
PlanReplayerConfig replayer.PlanReplayerConfig

// output style
OutputStyle string
}
Expand All @@ -53,11 +57,13 @@ type Workloader struct {

// stats
measurement *measurement.Measurement

PlanReplayerRunner *replayer.PlanReplayerRunner
}

// NewWorkloader new work loader
func NewWorkloader(db *sql.DB, cfg *Config) workload.Workloader {
return Workloader{
return &Workloader{
db: db,
cfg: cfg,
measurement: measurement.NewMeasurement(func(m *measurement.Measurement) {
Expand All @@ -79,12 +85,12 @@ func (w *Workloader) updateState(ctx context.Context) {
}

// Name return workloader name
func (w Workloader) Name() string {
func (w *Workloader) Name() string {
return "ch"
}

// InitThread inits thread
func (w Workloader) InitThread(ctx context.Context, threadID int) context.Context {
func (w *Workloader) InitThread(ctx context.Context, threadID int) context.Context {
s := &chState{
queryIdx: threadID % len(w.cfg.QueryNames),
TpcState: workload.NewTpcState(ctx, w.db),
Expand All @@ -95,13 +101,13 @@ func (w Workloader) InitThread(ctx context.Context, threadID int) context.Contex
}

// CleanupThread cleans up thread
func (w Workloader) CleanupThread(ctx context.Context, threadID int) {
func (w *Workloader) CleanupThread(ctx context.Context, threadID int) {
s := w.getState(ctx)
s.Conn.Close()
}

// Prepare prepares data
func (w Workloader) Prepare(ctx context.Context, threadID int) error {
func (w *Workloader) Prepare(ctx context.Context, threadID int) error {
if threadID != 0 {
return nil
}
Expand Down Expand Up @@ -138,7 +144,7 @@ func (w Workloader) Prepare(ctx context.Context, threadID int) error {
return nil
}

func (w Workloader) prepareView(ctx context.Context) error {
func (w *Workloader) prepareView(ctx context.Context) error {
s := w.getState(ctx)
fmt.Println("creating view revenue1")
if _, err := s.Conn.ExecContext(ctx, `
Expand All @@ -155,7 +161,7 @@ create view revenue1 (supplier_no, total_revenue) as (
return nil
}

func (w Workloader) createTiFlashReplica(ctx context.Context, s *chState) error {
func (w *Workloader) createTiFlashReplica(ctx context.Context, s *chState) error {
for _, tableName := range allTables {
fmt.Printf("creating tiflash replica for %s\n", tableName)
replicaSQL := fmt.Sprintf("ALTER TABLE %s SET TIFLASH REPLICA 1", tableName)
Expand All @@ -166,7 +172,7 @@ func (w Workloader) createTiFlashReplica(ctx context.Context, s *chState) error
return nil
}

func (w Workloader) analyzeTables(ctx context.Context, acfg analyzeConfig) error {
func (w *Workloader) analyzeTables(ctx context.Context, acfg analyzeConfig) error {
s := w.getState(ctx)
if w.cfg.Driver == "mysql" {
for _, tbl := range allTables {
Expand All @@ -190,12 +196,12 @@ func (w Workloader) analyzeTables(ctx context.Context, acfg analyzeConfig) error
}

// CheckPrepare checks prepare
func (w Workloader) CheckPrepare(ctx context.Context, threadID int) error {
func (w *Workloader) CheckPrepare(ctx context.Context, threadID int) error {
return nil
}

// Run runs workload
func (w Workloader) Run(ctx context.Context, threadID int) error {
func (w *Workloader) Run(ctx context.Context, threadID int) error {
s := w.getState(ctx)
defer w.updateState(ctx)

Expand All @@ -209,6 +215,14 @@ func (w Workloader) Run(ctx context.Context, threadID int) error {
queryName := w.cfg.QueryNames[s.queryIdx%len(w.cfg.QueryNames)]
query := queries[queryName]

// only for driver == mysql and EnablePlanReplayer == true
if w.cfg.EnablePlanReplayer && w.cfg.Driver == "mysql" {
err := w.dumpPlanReplayer(ctx, s, query, queryName)
if err != nil {
return err
}
}

start := time.Now()
rows, err := s.Conn.QueryContext(ctx, query)
w.measurement.Measure(queryName, time.Now().Sub(start), err)
Expand All @@ -220,12 +234,12 @@ func (w Workloader) Run(ctx context.Context, threadID int) error {
}

// Cleanup cleans up workloader
func (w Workloader) Cleanup(ctx context.Context, threadID int) error {
func (w *Workloader) Cleanup(ctx context.Context, threadID int) error {
return nil
}

// Check checks data
func (w Workloader) Check(ctx context.Context, threadID int) error {
func (w *Workloader) Check(ctx context.Context, threadID int) error {
return nil
}

Expand Down Expand Up @@ -268,7 +282,7 @@ func chSummary(h *measurement.Histogram) []string {
}
}

func (w Workloader) OutputStats(ifSummaryReport bool) {
func (w *Workloader) OutputStats(ifSummaryReport bool) {
w.measurement.Output(ifSummaryReport, w.cfg.OutputStyle, outputRtMeasurement)
if ifSummaryReport {
var count int64
Expand All @@ -287,18 +301,29 @@ func (w Workloader) OutputStats(ifSummaryReport bool) {
}

// DBName returns the name of test db.
func (w Workloader) DBName() string {
func (w *Workloader) DBName() string {
return w.cfg.DBName
}

func (w Workloader) IsPlanReplayerDumpEnabled() bool {
return false
func (w *Workloader) dumpPlanReplayer(ctx context.Context, s *chState, query, queryName string) error {
query = strings.Replace(query, "/*PLACEHOLDER*/", "plan replayer dump explain", 1)
return w.PlanReplayerRunner.Dump(ctx, s.Conn, query, queryName)
}

func (w Workloader) PreparePlanReplayerDump() error {
return nil
func (w *Workloader) IsPlanReplayerDumpEnabled() bool {
return w.cfg.EnablePlanReplayer
}

func (w Workloader) FinishPlanReplayerDump() error {
return nil
func (w *Workloader) PreparePlanReplayerDump() error {
w.cfg.PlanReplayerConfig.WorkloadName = w.Name()
if w.PlanReplayerRunner == nil {
w.PlanReplayerRunner = &replayer.PlanReplayerRunner{
Config: w.cfg.PlanReplayerConfig,
}
}
return w.PlanReplayerRunner.Prepare()
}

func (w *Workloader) FinishPlanReplayerDump() error {
return w.PlanReplayerRunner.Finish()
}
18 changes: 18 additions & 0 deletions cmd/go-tpc/ch_benchmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,21 @@ func registerCHBenchmark(root *cobra.Command) {
executeCH("run")
},
}
cmdRun.PersistentFlags().BoolVar(&chConfig.EnablePlanReplayer,
"use-plan-replayer",
false,
"Use Plan Replayer to dump stats and variables before running queries")

cmdRun.PersistentFlags().StringVar(&chConfig.PlanReplayerConfig.PlanReplayerDir,
"plan-replayer-dir",
"",
"Dir of Plan Replayer file dumps")

cmdRun.PersistentFlags().StringVar(&chConfig.PlanReplayerConfig.PlanReplayerFileName,
"plan-replayer-file",
"",
"Name of plan Replayer file dumps")

cmdRun.PersistentFlags().IntSliceVar(&tpccConfig.Weight, "weight", []int{45, 43, 4, 4, 4}, "Weight for NewOrder, Payment, OrderStatus, Delivery, StockLevel")
cmd.AddCommand(cmdRun, cmdPrepare)
root.AddCommand(cmd)
Expand All @@ -85,8 +100,11 @@ func executeCH(action string) {
tpccConfig.Threads = threads
tpccConfig.Isolation = isolationLevel
chConfig.OutputStyle = outputStyle
chConfig.Driver = driver
chConfig.DBName = dbName
chConfig.QueryNames = strings.Split(chConfig.RawQueries, ",")
chConfig.PlanReplayerConfig.Host = host
chConfig.PlanReplayerConfig.StatusPort = statusPort

var (
tp, ap workload.Workloader
Expand Down
18 changes: 18 additions & 0 deletions cmd/go-tpc/rawsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,22 @@ func registerRawsql(root *cobra.Command) {
execRawsql("run")
},
}

cmdRun.PersistentFlags().BoolVar(&rawsqlConfig.EnablePlanReplayer,
"use-plan-replayer",
false,
"Use Plan Replayer to dump stats and variables before running queries")

cmdRun.PersistentFlags().StringVar(&rawsqlConfig.PlanReplayerConfig.PlanReplayerDir,
"plan-replayer-dir",
"",
"Dir of Plan Replayer file dumps")

cmdRun.PersistentFlags().StringVar(&rawsqlConfig.PlanReplayerConfig.PlanReplayerFileName,
"plan-replayer-file",
"",
"Name of plan Replayer file dumps")

cmdRun.PersistentFlags().StringVar(&queryFiles,
"query-files",
"",
Expand Down Expand Up @@ -67,6 +83,8 @@ func execRawsql(action string) {
rawsqlConfig.QueryNames = strings.Split(queryFiles, ",")
rawsqlConfig.Queries = make(map[string]string, len(rawsqlConfig.QueryNames))
rawsqlConfig.RefreshWait = refreshConnWait
rawsqlConfig.PlanReplayerConfig.Host = host
rawsqlConfig.PlanReplayerConfig.StatusPort = statusPort

for i, filename := range rawsqlConfig.QueryNames {
queryData, err := ioutil.ReadFile(filename)
Expand Down
34 changes: 17 additions & 17 deletions cmd/go-tpc/tpch.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ func executeTpch(action string) {
os.Exit(1)
}

tpchConfig.Host = host
tpchConfig.StatusPort = statusPort
tpchConfig.PlanReplayerConfig.Host = host
tpchConfig.PlanReplayerConfig.StatusPort = statusPort

tpchConfig.OutputStyle = outputStyle
tpchConfig.Driver = driver
Expand Down Expand Up @@ -63,21 +63,6 @@ func registerTpch(root *cobra.Command) {
false,
"Check output data, only when the scale factor equals 1")

cmd.PersistentFlags().BoolVar(&tpchConfig.EnablePlanReplayer,
"use-plan-replayer",
false,
"Use Plan Replayer to dump stats and variables before running queries")

cmd.PersistentFlags().StringVar(&tpchConfig.PlanReplayerDir,
"plan-replayer-dir",
"",
"Dir of Plan Replayer file dumps")

cmd.PersistentFlags().StringVar(&tpchConfig.PlanReplayerFileName,
"plan-replayer-file",
"",
"Name of plan Replayer file dumps")

var cmdPrepare = &cobra.Command{
Use: "prepare",
Short: "Prepare data for the workload",
Expand Down Expand Up @@ -125,6 +110,21 @@ func registerTpch(root *cobra.Command) {
},
}

cmdRun.PersistentFlags().BoolVar(&tpchConfig.EnablePlanReplayer,
"use-plan-replayer",
false,
"Use Plan Replayer to dump stats and variables before running queries")

cmdRun.PersistentFlags().StringVar(&tpchConfig.PlanReplayerConfig.PlanReplayerDir,
"plan-replayer-dir",
"",
"Dir of Plan Replayer file dumps")

cmdRun.PersistentFlags().StringVar(&tpchConfig.PlanReplayerConfig.PlanReplayerFileName,
"plan-replayer-file",
"",
"Name of plan Replayer file dumps")

var cmdCleanup = &cobra.Command{
Use: "cleanup",
Short: "Cleanup data for the workload",
Expand Down
Loading