Lightning: make pebbleDB block size configurable (pingcap#49514)
mittalrishabh authored Jan 3, 2024
1 parent 999f599 commit 89fc440
Showing 9 changed files with 21 additions and 10 deletions.
br/pkg/lightning/backend/backend.go: 3 additions & 0 deletions
@@ -99,6 +99,9 @@ type LocalEngineConfig struct {
CompactThreshold int64
// compact routine concurrency
CompactConcurrency int
+
+ // blocksize
+ BlockSize int
}

// ExternalEngineConfig is the configuration used for local backend external engine.
br/pkg/lightning/backend/local/engine.go: 7 additions & 7 deletions
@@ -642,7 +642,7 @@ func (e *Engine) ingestSSTLoop() {
}
ingestMetas := metas.metas
if e.config.Compact {
- newMeta, err := e.sstIngester.mergeSSTs(metas.metas, e.sstDir)
+ newMeta, err := e.sstIngester.mergeSSTs(metas.metas, e.sstDir, e.config.BlockSize)
if err != nil {
e.setError(err)
return
@@ -1349,7 +1349,7 @@ func (w *Writer) addSST(ctx context.Context, meta *sstMeta) error {

func (w *Writer) createSSTWriter() (*sstWriter, error) {
path := filepath.Join(w.engine.sstDir, uuid.New().String()+".sst")
- writer, err := newSSTWriter(path)
+ writer, err := newSSTWriter(path, w.engine.config.BlockSize)
if err != nil {
return nil, err
}
@@ -1365,7 +1365,7 @@ type sstWriter struct {
logger log.Logger
}

- func newSSTWriter(path string) (*sstable.Writer, error) {
+ func newSSTWriter(path string, blockSize int) (*sstable.Writer, error) {
f, err := os.Create(path)
if err != nil {
return nil, errors.Trace(err)
@@ -1374,7 +1374,7 @@ func newSSTWriter(path string) (*sstable.Writer, error) {
TablePropertyCollectors: []func() pebble.TablePropertyCollector{
newRangePropertiesCollector,
},
- BlockSize: 16 * 1024,
+ BlockSize: blockSize,
})
return writer, nil
}
@@ -1504,15 +1504,15 @@ func (h *sstIterHeap) Next() ([]byte, []byte, error) {
// sstIngester is a interface used to merge and ingest SST files.
// it's a interface mainly used for test convenience
type sstIngester interface {
- mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
+ mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error)
ingest([]*sstMeta) error
}

type dbSSTIngester struct {
e *Engine
}

- func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error) {
+ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error) {
if len(metas) == 0 {
return nil, errors.New("sst metas is empty")
} else if len(metas) == 1 {
@@ -1561,7 +1561,7 @@ func (i dbSSTIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error)
heap.Init(mergeIter)

name := filepath.Join(dir, fmt.Sprintf("%s.sst", uuid.New()))
- writer, err := newSSTWriter(name)
+ writer, err := newSSTWriter(name, blockSize)
if err != nil {
return nil, errors.Trace(err)
}
br/pkg/lightning/backend/local/engine_mgr.go: 1 addition & 0 deletions
@@ -207,6 +207,7 @@ func (em *engineManager) openEngineDB(engineUUID uuid.UUID, readOnly bool) (*peb
opt.Levels = []pebble.LevelOptions{
{
TargetFileSize: 16 * units.GiB,
+ BlockSize: em.BlockSize,
},
}

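The engine_mgr.go hunk above is where the setting reaches the engine's own Pebble instance: the configured block size is set on the level options used when the engine database is opened. A minimal, self-contained sketch of that wiring using public Pebble APIs; the 16 GiB target file size mirrors the diff, while the directory, the 32 KiB value, and the function name openLocalEngine are illustrative, not the real openEngineDB.

package main

import (
	"log"

	"github.com/cockroachdb/pebble"
)

// openLocalEngine builds one level whose SST block size comes from the
// backend config instead of Pebble's default, then opens the DB with it.
func openLocalEngine(dir string, blockSize int) (*pebble.DB, error) {
	opts := &pebble.Options{
		Levels: []pebble.LevelOptions{
			{
				TargetFileSize: 16 << 30, // 16 GiB, as in the diff (16 * units.GiB)
				BlockSize:      blockSize,
			},
		},
	}
	return pebble.Open(dir, opts)
}

func main() {
	db, err := openLocalEngine("/tmp/engine-example", 32*1024) // e.g. 32 KiB blocks
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()
}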
br/pkg/lightning/backend/local/local.go: 2 additions & 0 deletions
@@ -420,6 +420,7 @@ type BackendConfig struct {
// see DisableAutomaticCompactions of pebble.Options for more details.
// default true.
DisableAutomaticCompactions bool
+ BlockSize int
}

// NewBackendConfig creates a new BackendConfig.
@@ -430,6 +431,7 @@ func NewBackendConfig(cfg *config.Config, maxOpenFiles int, keyspaceName, resour
MaxConnPerStore: cfg.TikvImporter.RangeConcurrency,
ConnCompressType: cfg.TikvImporter.CompressKVPairs,
WorkerConcurrency: cfg.TikvImporter.RangeConcurrency * 2,
+ BlockSize: int(cfg.TikvImporter.BlockSize),
KVWriteBatchSize: int64(cfg.TikvImporter.SendKVSize),
RegionSplitBatchSize: cfg.TikvImporter.RegionSplitBatchSize,
RegionSplitConcurrency: cfg.TikvImporter.RegionSplitConcurrency,
br/pkg/lightning/backend/local/local_test.go: 3 additions & 3 deletions
@@ -448,7 +448,7 @@ func (c *mockSplitClient) GetRegion(ctx context.Context, key []byte) (*split.Reg

type testIngester struct{}

- func (i testIngester) mergeSSTs(metas []*sstMeta, dir string) (*sstMeta, error) {
+ func (i testIngester) mergeSSTs(metas []*sstMeta, dir string, blockSize int) (*sstMeta, error) {
if len(metas) == 0 {
return nil, errors.New("sst metas is empty")
} else if len(metas) == 1 {
@@ -592,7 +592,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {

createSSTWriter := func() (*sstWriter, error) {
path := filepath.Join(f.sstDir, uuid.New().String()+".sst")
- writer, err := newSSTWriter(path)
+ writer, err := newSSTWriter(path, 16*1024)
if err != nil {
return nil, err
}
@@ -614,7 +614,7 @@ func testMergeSSTs(t *testing.T, kvs [][]common.KvPair, meta *sstMeta) {
}

i := dbSSTIngester{e: f}
- newMeta, err := i.mergeSSTs(metas, tmpPath)
+ newMeta, err := i.mergeSSTs(metas, tmpPath, 16*1024)
require.NoError(t, err)

require.Equal(t, meta.totalCount, newMeta.totalCount)
br/pkg/lightning/config/config.go: 2 additions & 0 deletions
@@ -1070,6 +1070,7 @@ type TikvImporter struct {
StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"`
// default is PausePDSchedulerScopeTable to compatible with previous version(>= 6.1)
PausePDSchedulerScope PausePDSchedulerScope `toml:"pause-pd-scheduler-scope" json:"pause-pd-scheduler-scope"`
+ BlockSize ByteSize `toml:"block-size" json:"block-size"`
}

func (t *TikvImporter) adjust() error {
@@ -1457,6 +1458,7 @@ func NewConfig() *Config {
DiskQuota: ByteSize(math.MaxInt64),
DuplicateResolution: DupeResAlgNone,
PausePDSchedulerScope: PausePDSchedulerScopeTable,
+ BlockSize: 16 * 1024,
},
PostRestore: PostRestore{
Checksum: OpLevelRequired,
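The config.go hunks make the block size a user-facing tikv-importer option: a ByteSize field keyed block-size, defaulting to 16 KiB in NewConfig, so behaviour is unchanged unless the key is set. A small sketch of setting and reading it, assuming the TikvImporter struct maps to the [tikv-importer] TOML section and that Config.LoadFromTOML and the human-readable ByteSize form ("32KB") behave as elsewhere in the Lightning config package:

package main

import (
	"fmt"
	"log"

	"github.com/pingcap/tidb/br/pkg/lightning/config"
)

func main() {
	cfg := config.NewConfig()
	// Default comes from NewConfig above: 16 * 1024.
	fmt.Println("default block size:", cfg.TikvImporter.BlockSize)

	// Override via TOML (assumed section name and loader; key name from the diff).
	raw := []byte(`
[tikv-importer]
block-size = "32KB"
`)
	if err := cfg.LoadFromTOML(raw); err != nil {
		log.Fatal(err)
	}
	fmt.Println("configured block size:", cfg.TikvImporter.BlockSize)
}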
br/pkg/lightning/importer/table_import.go: 1 addition & 0 deletions
@@ -449,6 +449,7 @@ func (tr *TableImporter) importEngines(pCtx context.Context, rc *Controller, cp
Compact: threshold > 0,
CompactConcurrency: 4,
CompactThreshold: threshold,
+ BlockSize: int(rc.cfg.TikvImporter.BlockSize),
}
}
// import backend can't reopen engine if engine is closed, so
Expand Down
pkg/ddl/ingest/config.go: 1 addition & 0 deletions
@@ -91,6 +91,7 @@ func generateLocalEngineConfig(id int64, dbName, tbName string) *backend.EngineC
Compact: true,
CompactThreshold: int64(compactMemory),
CompactConcurrency: compactConcurrency,
+ BlockSize: 16 * 1024, // using default for DDL
},
TableInfo: &checkpoints.TidbTableInfo{
ID: id,
pkg/executor/importer/table_import.go: 1 addition & 0 deletions
@@ -461,6 +461,7 @@ func (ti *TableImporter) OpenIndexEngine(ctx context.Context, engineID int32) (*
Compact: threshold > 0,
CompactConcurrency: 4,
CompactThreshold: threshold,
+ BlockSize: 16 * 1024,
}
fullTableName := ti.fullTableName()
// todo: cleanup all engine data on any error since we don't support checkpoint for now
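The pkg/ddl and pkg/executor hunks show that only Lightning reads the new option: the DDL ingest and IMPORT INTO paths keep passing the old 16 KiB value explicitly, so their behaviour does not change. A sketch of that contrast, assuming the br/pkg/lightning/backend import path of this commit and using only LocalEngineConfig fields that appear in the diff (the threshold value is illustrative):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/backend"
)

func main() {
	// DDL ingest / IMPORT INTO: keep the previously hard-coded block size.
	fixed := backend.LocalEngineConfig{
		Compact:            true,
		CompactThreshold:   32 * 1024 * 1024, // illustrative threshold
		CompactConcurrency: 4,
		BlockSize:          16 * 1024, // unchanged default
	}

	// Lightning: derive it from tikv-importer.block-size (see the config sketch above).
	configurable := fixed
	configurable.BlockSize = 32 * 1024 // whatever the user configured

	fmt.Println(fixed.BlockSize, configurable.BlockSize)
}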
