Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
*: limit IO concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
lonng committed Dec 26, 2018
1 parent 07232e3 commit e96ca9c
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 55 deletions.
2 changes: 2 additions & 0 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type Lightning struct {
common.LogConfig
TableConcurrency int `toml:"table-concurrency" json:"table-concurrency"`
RegionConcurrency int `toml:"region-concurrency" json:"region-concurrency"`
// IOConcurrency is the size of the "io" worker pool, which bounds how many
// data-file block reads may run at the same time.
// Its JSON key is "io-concurrency"; it previously reused "region-concurrency",
// which collided with the RegionConcurrency field.
IOConcurrency int `toml:"io-concurrency" json:"io-concurrency"`
ProfilePort int `toml:"pprof-port" json:"pprof-port"`
CheckRequirements bool `toml:"check-requirements" json:"check-requirements"`
}
Expand Down Expand Up @@ -130,6 +131,7 @@ func NewConfig() *Config {
App: Lightning{
RegionConcurrency: runtime.NumCPU(),
TableConcurrency: 8,
IOConcurrency: 5,
CheckRequirements: true,
},
TiDB: DBStore{
Expand Down
10 changes: 9 additions & 1 deletion lightning/mydump/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"

"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/pingcap/tidb-lightning/lightning/worker"
)

// ChunkParser is a parser of the data files (the file containing only INSERT
Expand All @@ -32,6 +33,7 @@ type ChunkParser struct {
// cache
remainBuf *bytes.Buffer
appendBuf *bytes.Buffer
ioWorkers *worker.RestoreWorkerPool
}

// Chunk represents a portion of the data file.
Expand All @@ -49,12 +51,13 @@ type Row struct {
}

// NewChunkParser creates a new parser which can read chunks out of a file.
func NewChunkParser(reader io.Reader, blockBufSize int64) *ChunkParser {
// NewChunkParser returns a ChunkParser that reads from reader through a block
// buffer of blockBufSize bytes. The ioWorkers pool is stored on the parser so
// that block reads can acquire a worker from it to limit IO concurrency.
func NewChunkParser(reader io.Reader, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) *ChunkParser {
	parser := new(ChunkParser)
	parser.reader = reader
	parser.ioWorkers = ioWorkers
	parser.blockBuf = make([]byte, blockBufSize)
	// Both caches start out as empty buffers.
	parser.remainBuf = new(bytes.Buffer)
	parser.appendBuf = new(bytes.Buffer)
	return parser
}

Expand Down Expand Up @@ -85,7 +88,12 @@ const (

func (parser *ChunkParser) readBlock() error {
startTime := time.Now()

// limit IO concurrency
w := parser.ioWorkers.Apply()
n, err := parser.reader.Read(parser.blockBuf)
defer parser.ioWorkers.Recycle(w)

switch err {
case io.ErrUnexpectedEOF, io.EOF:
parser.isLastChunk = true
Expand Down
12 changes: 9 additions & 3 deletions lightning/mydump/parser_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
package mydump_test

import (
"context"
"io"
"strings"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/mydump"
"github.com/pingcap/tidb-lightning/lightning/worker"

"github.com/pkg/errors"
)

Expand All @@ -25,7 +28,8 @@ func (s *testMydumpParserSuite) TestReadRow(c *C) {
"insert another_table values (10, 11, 12, '(13)', '(', 14, ')');",
)

parser := mydump.NewChunkParser(reader, config.ReadBlockSize)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)

c.Assert(parser.ReadRow(), IsNil)
c.Assert(parser.LastRow(), DeepEquals, mydump.Row{
Expand Down Expand Up @@ -73,7 +77,8 @@ func (s *testMydumpParserSuite) TestReadChunks(c *C) {
INSERT foo VALUES (29,30,31,32),(33,34,35,36);
`)

parser := mydump.NewChunkParser(reader, config.ReadBlockSize)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)

chunks, err := parser.ReadChunks(32)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -119,7 +124,8 @@ func (s *testMydumpParserSuite) TestNestedRow(c *C) {
("789",CONVERT("[]" USING UTF8MB4));
`)

parser := mydump.NewChunkParser(reader, config.ReadBlockSize)
ioWorkers := worker.NewRestoreWorkerPool(context.Background(), 5, "test")
parser := mydump.NewChunkParser(reader, config.ReadBlockSize, ioWorkers)
chunks, err := parser.ReadChunks(96)

c.Assert(err, IsNil)
Expand Down
68 changes: 17 additions & 51 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/pingcap/tidb-lightning/lightning/metric"
"github.com/pingcap/tidb-lightning/lightning/mydump"
verify "github.com/pingcap/tidb-lightning/lightning/verification"
"github.com/pingcap/tidb-lightning/lightning/worker"

tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/util/hack"
Expand Down Expand Up @@ -95,8 +97,9 @@ type RestoreController struct {
cfg *config.Config
dbMetas []*mydump.MDDatabaseMeta
dbInfos map[string]*TidbDBInfo
tableWorkers *RestoreWorkerPool
regionWorkers *RestoreWorkerPool
tableWorkers *worker.RestoreWorkerPool
regionWorkers *worker.RestoreWorkerPool
ioWorkers *worker.RestoreWorkerPool
importer *kv.Importer
tidbMgr *TiDBManager
postProcessLock sync.Mutex // a simple way to ensure post-processing is not concurrent without using complicated goroutines
Expand Down Expand Up @@ -129,8 +132,9 @@ func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta,
rc := &RestoreController{
cfg: cfg,
dbMetas: dbMetas,
tableWorkers: NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
regionWorkers: NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
tableWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.TableConcurrency, "table"),
regionWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.RegionConcurrency, "region"),
ioWorkers: worker.NewRestoreWorkerPool(ctx, cfg.App.IOConcurrency, "io"),
importer: importer,
tidbMgr: tidbMgr,

Expand Down Expand Up @@ -439,9 +443,9 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
// Note: We still need tableWorkers to control the concurrency of tables. In the future, we will investigate more about
// the difference between restoring tables concurrently and restoring tables one by one.

worker := rc.tableWorkers.Apply()
restoreWorker := rc.tableWorkers.Apply()
wg.Add(1)
go func(w *RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
go func(w *worker.RestoreWorker, t *TableRestore, cp *TableCheckpoint) {
defer wg.Done()

closedEngine, err := t.restore(ctx, rc, cp)
Expand All @@ -465,7 +469,7 @@ func (rc *RestoreController) restoreTables(ctx context.Context) error {
}

err = t.postProcess(ctx, closedEngine, rc, cp)
}(worker, tr, cp)
}(restoreWorker, tr, cp)
}
}

Expand Down Expand Up @@ -548,15 +552,15 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
// 3. load kvs data (into kv deliver server)
// 4. flush kvs data (into tikv node)

cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize)
cr, err := newChunkRestore(chunkIndex, chunk, rc.cfg.Mydumper.ReadBlockSize, rc.ioWorkers)
if err != nil {
return nil, errors.Trace(err)
}
metric.ChunkCounter.WithLabelValues(metric.ChunkStatePending).Inc()

worker := rc.regionWorkers.Apply()
restoreWorker := rc.regionWorkers.Apply()
wg.Add(1)
go func(w *RestoreWorker, cr *chunkRestore) {
go func(w *worker.RestoreWorker, cr *chunkRestore) {
// Restore a chunk.
defer func() {
cr.close()
Expand All @@ -581,7 +585,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T

handled := int(atomic.AddInt32(handledChunksCount, 1))
common.AppLogger.Infof("[%s] handled region count = %d (%s)", t.tableName, handled, common.Percent(handled, len(cp.Chunks)))
}(worker, cr)
}(restoreWorker, cr)
}

wg.Wait()
Expand Down Expand Up @@ -860,56 +864,18 @@ func (rc *RestoreController) getTables() []string {
return tables
}

////////////////////////////////////////////////////////////////

// RestoreWorkerPool is a fixed-size pool of RestoreWorker tokens used to
// bound concurrency: Apply takes a worker out of the pool and Recycle puts
// it back.
type RestoreWorkerPool struct {
limit int // number of workers the pool was created with
workers chan *RestoreWorker // buffered channel holding the currently idle workers
name string // label value used when reporting to metric.IdleWorkersGauge
}

// RestoreWorker is a token handed out by a RestoreWorkerPool.
type RestoreWorker struct {
ID int64 // assigned by NewRestoreWorkerPool, counting from 1
}

// NewRestoreWorkerPool builds a pool labelled name that starts with limit
// idle workers whose IDs run from 1 through limit, and publishes limit as the
// pool's initial idle-worker gauge value. The ctx argument is not used by this
// function.
func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
	pool := &RestoreWorkerPool{
		limit:   limit,
		workers: make(chan *RestoreWorker, limit),
		name:    name,
	}
	// Fill the buffered channel to capacity so every worker starts out idle.
	for id := int64(1); id <= int64(limit); id++ {
		pool.workers <- &RestoreWorker{ID: id}
	}
	metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
	return pool
}

// Apply takes an idle worker out of the pool, blocking until one is
// available, and then sets the pool's idle-worker gauge to the number of
// workers still waiting in the channel.
func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
	acquired := <-pool.workers
	idle := len(pool.workers)
	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(idle))
	return acquired
}
// Recycle puts worker back into the pool and then sets the pool's
// idle-worker gauge to the number of workers waiting in the channel.
func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
	pool.workers <- worker
	idle := len(pool.workers)
	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(idle))
}

////////////////////////////////////////////////////////////////

// chunkRestore groups a chunk's parser with the chunk's index and checkpoint.
type chunkRestore struct {
parser *mydump.ChunkParser // parser used to read the chunk's data
index int // presumably the chunk's position among its table's chunks — confirm against callers
chunk *ChunkCheckpoint // checkpoint record for the chunk
}

func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64) (*chunkRestore, error) {
func newChunkRestore(index int, chunk *ChunkCheckpoint, blockBufSize int64, ioWorkers *worker.RestoreWorkerPool) (*chunkRestore, error) {
reader, err := os.Open(chunk.Key.Path)
if err != nil {
return nil, errors.Trace(err)
}
parser := mydump.NewChunkParser(reader, blockBufSize)
parser := mydump.NewChunkParser(reader, blockBufSize, ioWorkers)

reader.Seek(chunk.Chunk.Offset, io.SeekStart)
parser.SetPos(chunk.Chunk.Offset, chunk.Chunk.PrevRowIDMax)
Expand Down
45 changes: 45 additions & 0 deletions lightning/worker/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package worker

import (
"context"

"github.com/pingcap/tidb-lightning/lightning/metric"
)

////////////////////////////////////////////////////////////////

// RestoreWorkerPool is a fixed-size pool of RestoreWorker tokens used to
// bound concurrency: Apply takes a worker out of the pool and Recycle puts
// it back.
type RestoreWorkerPool struct {
limit int // number of workers the pool was created with
workers chan *RestoreWorker // buffered channel holding the currently idle workers
name string // label value used when reporting to metric.IdleWorkersGauge
}

// RestoreWorker is a token handed out by a RestoreWorkerPool.
type RestoreWorker struct {
ID int64 // assigned by NewRestoreWorkerPool, counting from 1
}

// NewRestoreWorkerPool builds a pool labelled name that starts with limit
// idle workers whose IDs run from 1 through limit, and publishes limit as the
// pool's initial idle-worker gauge value. The ctx argument is not used by this
// function.
func NewRestoreWorkerPool(ctx context.Context, limit int, name string) *RestoreWorkerPool {
	pool := &RestoreWorkerPool{
		limit:   limit,
		workers: make(chan *RestoreWorker, limit),
		name:    name,
	}
	// Fill the buffered channel to capacity so every worker starts out idle.
	for id := int64(1); id <= int64(limit); id++ {
		pool.workers <- &RestoreWorker{ID: id}
	}
	metric.IdleWorkersGauge.WithLabelValues(name).Set(float64(limit))
	return pool
}

// Apply takes an idle worker out of the pool, blocking until one is
// available, and then sets the pool's idle-worker gauge to the number of
// workers still waiting in the channel.
func (pool *RestoreWorkerPool) Apply() *RestoreWorker {
	acquired := <-pool.workers
	idle := len(pool.workers)
	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(idle))
	return acquired
}
// Recycle puts worker back into the pool and then sets the pool's
// idle-worker gauge to the number of workers waiting in the channel.
func (pool *RestoreWorkerPool) Recycle(worker *RestoreWorker) {
	pool.workers <- worker
	idle := len(pool.workers)
	metric.IdleWorkersGauge.WithLabelValues(pool.name).Set(float64(idle))
}

////////////////////////////////////////////////////////////////

0 comments on commit e96ca9c

Please sign in to comment.