Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

backend: implement disk quota #493

Merged
merged 41 commits into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
cec3d30
common: copied the GetStorageSize function from DM
kennytm Nov 25, 2020
147851e
common: recognize multierr in IsRetryableError()
kennytm Nov 26, 2020
bd02b59
restore: refactor runPeriodicActions
kennytm Nov 26, 2020
360c491
config: fix test failure on Windows
kennytm Nov 26, 2020
3a605bc
*: implement disk quota
kennytm Nov 23, 2020
6afb02d
backend: split the disk size and mem size metrics
kennytm Nov 27, 2020
657adea
backend,restore: refactor, move the postProcessLock into the backend
kennytm Nov 27, 2020
1d7a411
go.mod1: update
kennytm Nov 27, 2020
95891e1
backend: introduce a conversion factor between memtable and disk size
kennytm Nov 27, 2020
7bf5fbf
backend: force the aes-256-cbc before anyone notices ;)
kennytm Dec 2, 2020
cc65433
tests: split FailAfterWriteRows into two failpoints
kennytm Dec 2, 2020
dad23f5
tests: add disk-quota test
kennytm Dec 2, 2020
2575452
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 8, 2020
626f480
backend/local: replace the semaphore by a regular mutex
kennytm Dec 8, 2020
1ed6164
tests/disk_quota: add that 2-second loop for SET GLOBAL
kennytm Dec 8, 2020
4c6f595
Apply suggestions from code review
kennytm Dec 9, 2020
958531e
config: elaborate the "insufficient disk space" error
kennytm Dec 9, 2020
369b6f4
restore: addressed comments
kennytm Dec 14, 2020
3f5034e
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 14, 2020
3dbcd9a
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 24, 2020
c2abaf6
backend/local: do not reopen if resetting a closed engine
kennytm Dec 24, 2020
6312a4c
local: protect against concurrent flush + reset
kennytm Dec 28, 2020
c03837d
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 28, 2020
0bd3d11
backend/local: seems have to treat everything as isImporting
kennytm Dec 28, 2020
50b6cf4
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 28, 2020
73b7d9a
backend/local: store the sync.Mutex separately from the engines
kennytm Dec 30, 2020
5c5bd68
Merge branch 'master' into kennytm/disk-quota
kennytm Jan 11, 2021
9e78a25
Merge branch 'master' into kennytm/disk-quota
kennytm Jan 11, 2021
e66c73f
*: fix tests
kennytm Jan 12, 2021
cd8c3cc
backend: limit the SST size of the LocalWriters
kennytm Jan 13, 2021
11fa643
backend/local: fix FlushAllEngines being no op
kennytm Jan 15, 2021
8b51097
backend/local: use go.uber.org/atomic instead of sync/atomic
kennytm Jan 18, 2021
3af075d
backend/local: refactor LocalWriter to clean up the logic
kennytm Jan 18, 2021
dc5b5b7
Merge branch 'master' into kennytm/disk-quota
kennytm Jan 19, 2021
66a250c
tests: use a unique sorted-kv-dir per test
kennytm Jan 19, 2021
6dd5ef3
backend/local: fixed some doc comments
kennytm Jan 19, 2021
ca808b2
backend: addressed comments
kennytm Jan 20, 2021
e401ffe
backend/local: just always save meta every time we flush
kennytm Jan 21, 2021
6309d13
Merge branch 'master' into kennytm/disk-quota
kennytm Jan 28, 2021
98e02a7
backend: addressed comments
kennytm Jan 28, 2021
93beae8
Merge branch 'master' into kennytm/disk-quota
ti-srebot Jan 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ require (
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
go.etcd.io/bbolt v1.3.5 // indirect
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20200904194848-62affa334b73
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f // indirect
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f
golang.org/x/text v0.3.4
golang.org/x/tools v0.0.0-20200904185747-39188db58858 // indirect
google.golang.org/grpc v1.27.1
Expand Down
95 changes: 91 additions & 4 deletions lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package backend
import (
"context"
"fmt"
"sort"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -82,6 +83,18 @@ func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {

var engineNamespace = uuid.MustParse("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf")

// EngineFileSize describes the local storage footprint of a single engine, as
// reported by AbstractBackend.EngineFileSizes.
type EngineFileSize struct {
// UUID is the engine's UUID.
UUID uuid.UUID
// DiskSize is the estimated total file size on disk right now.
DiskSize int64
// MemSize is the total memory size used by the engine. This is the
// estimated additional size saved onto disk after calling Flush().
MemSize int64
// IsImporting indicates whether the engine is performing Import().
IsImporting bool
}

// AbstractBackend is the abstract interface behind Backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
Expand Down Expand Up @@ -128,6 +141,28 @@ type AbstractBackend interface {
// - PKIsHandle (true = do not generate _tidb_rowid)
FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)

// FlushEngine ensures all KV pairs written to an open engine have been
// synchronized, such that kill-9'ing Lightning afterwards and resuming from
// checkpoint can recover the exact same content.
//
// This method is only relevant for local backend, and is no-op for all
// other backends.
FlushEngine(engineUUID uuid.UUID) error

// FlushAllEngines performs FlushEngine on all opened engines. This is a
// very expensive operation and should only be used in some rare situation
// (e.g. preparing to resolve a disk quota violation).
FlushAllEngines() error

// EngineFileSizes obtains the size occupied locally by all engines managed
// by this backend. This method is used to compute disk quota.
// It can return nil if the contents are all stored remotely.
EngineFileSizes() []EngineFileSize

// ResetEngine clears all written KV pairs in this opened engine.
ResetEngine(ctx context.Context, engineUUID uuid.UUID) error

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error)
}

Expand Down Expand Up @@ -207,6 +242,61 @@ func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string)
return be.abstract.FetchRemoteTableModels(ctx, schemaName)
}

// FlushAll flushes every opened engine by delegating to the underlying
// backend's FlushAllEngines, and returns whatever error it reports.
func (be Backend) FlushAll() error {
	err := be.abstract.FlushAllEngines()
	return err
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
//
// Return values:
//   - largeEngines: UUIDs of the engines that are not importing and whose
//     inclusion brings or keeps the running total (disk + memory) above quota.
//   - inProgressLargeEngines: the number of importing engines whose inclusion
//     brings or keeps the running total above quota.
//   - totalDiskSize, totalMemSize: the sums of DiskSize and MemSize over every
//     engine reported by the backend, whether or not the quota is exceeded.
func (be Backend) CheckDiskQuota(quota int64) (
largeEngines []uuid.UUID,
inProgressLargeEngines int,
totalDiskSize int64,
totalMemSize int64,
) {
sizes := be.abstract.EngineFileSizes()
// Order importing engines first, then by ascending DiskSize+MemSize.
// Engines that are already importing cannot be imported again, but their
// size still counts towards the quota, so they are accumulated before any
// import candidate. Note that this sorts the backend's slice in place.
sort.Slice(sizes, func(i, j int) bool {
a, b := &sizes[i], &sizes[j]
if a.IsImporting != b.IsImporting {
return a.IsImporting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean? Do you want to sort all importing engine before other engines?

Copy link
Collaborator Author

@kennytm kennytm Dec 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can't import engines that are already importing, but their size do count towards the disk quota.

}
return a.DiskSize+a.MemSize < b.DiskSize+b.MemSize
})
// Walk the engines in sorted order, keeping running totals. Once the running
// total exceeds the quota, every engine from that point on is reported:
// importing ones are only counted, the others are returned by UUID.
for _, size := range sizes {
totalDiskSize += size.DiskSize
totalMemSize += size.MemSize
if totalDiskSize+totalMemSize > quota {
if size.IsImporting {
inProgressLargeEngines++
} else {
largeEngines = append(largeEngines, size.UUID)
}
}
}
// Naked return: all results are the named values accumulated above.
return
}

// UnsafeImportAndReset imports the current content of the given engine into
// the target and then resets the engine to empty. The engine is never closed
// by this method, so the caller is responsible for flushing it manually
// beforehand.
//
// It returns the import error if the import fails (the engine is then left
// untouched), otherwise the result of resetting the engine.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID) error {
	// be.abstract.CloseEngine() is deliberately NOT called here: the engine
	// must remain writable once UnsafeImportAndReset() returns. A ClosedEngine
	// value is fabricated only to reach the Import() method.
	inner := engine{
		backend: be.abstract,
		logger:  makeLogger("<import-and-reset>", engineUUID),
		uuid:    engineUUID,
	}
	importable := ClosedEngine{engine: inner}

	err := importable.Import(ctx)
	if err == nil {
		err = be.abstract.ResetEngine(ctx, engineUUID)
	}
	return err
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
Expand Down Expand Up @@ -252,10 +342,7 @@ func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) {

// Flush ensures the data written to this engine so far has been synchronized,
// by delegating to the backend's FlushEngine. According to the AbstractBackend
// contract this is only relevant for the local backend and is a no-op for all
// other backends, so no type assertion on the concrete backend is needed.
func (engine *OpenedEngine) Flush() error {
	return engine.backend.FlushEngine(engine.uuid)
}

// WriteRows writes a collection of encoded rows into the engine.
Expand Down
74 changes: 74 additions & 0 deletions lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,77 @@ func (s *backendSuite) TestNewEncoder(c *C) {
c.Assert(realEncoder, Equals, encoder)
c.Assert(err, IsNil)
}

// TestCheckDiskQuota runs Backend.CheckDiskQuota against one fixed set of
// engine sizes under progressively tighter quotas, and checks which engines
// are reported as import candidates, how many importing engines are counted,
// and that the disk/memory totals never depend on the quota.
func (s *backendSuite) TestCheckDiskQuota(c *C) {
	s.setUpTest(c)
	defer s.tearDownTest()

	var (
		uuid1 = uuid.MustParse("11111111-1111-1111-1111-111111111111")
		uuid3 = uuid.MustParse("33333333-3333-3333-3333-333333333333")
		uuid5 = uuid.MustParse("55555555-5555-5555-5555-555555555555")
		uuid7 = uuid.MustParse("77777777-7777-7777-7777-777777777777")
		uuid9 = uuid.MustParse("99999999-9999-9999-9999-999999999999")
	)

	fileSizes := []kv.EngineFileSize{
		{UUID: uuid1, DiskSize: 1000, MemSize: 0, IsImporting: false},
		{UUID: uuid3, DiskSize: 2000, MemSize: 1000, IsImporting: true},
		{UUID: uuid5, DiskSize: 1500, MemSize: 3500, IsImporting: false},
		{UUID: uuid7, DiskSize: 0, MemSize: 7000, IsImporting: true},
		{UUID: uuid9, DiskSize: 4500, MemSize: 4500, IsImporting: false},
	}

	// The totals cover every engine, so they are the same for every quota.
	const (
		wantDiskSize int64 = 9000
		wantMemSize  int64 = 16000
	)

	cases := []struct {
		quota      int64
		large      []uuid.UUID
		inProgress int
	}{
		// No quota exceeded
		{quota: 30000, large: nil, inProgress: 0},
		// Quota exceeded, the largest one is out
		{quota: 20000, large: []uuid.UUID{uuid9}, inProgress: 0},
		// Quota exceeded, the importing one should be ranked least priority
		{quota: 12000, large: []uuid.UUID{uuid5, uuid9}, inProgress: 0},
		// Quota exceeded, the importing ones should not be visible
		{quota: 5000, large: []uuid.UUID{uuid1, uuid5, uuid9}, inProgress: 1},
	}

	// CheckDiskQuota queries the backend exactly once per invocation.
	s.mockBackend.EXPECT().EngineFileSizes().Return(fileSizes).Times(len(cases))

	for _, tc := range cases {
		le, iple, ds, ms := s.backend.CheckDiskQuota(tc.quota)
		if len(tc.large) == 0 {
			c.Assert(le, HasLen, 0)
		} else {
			c.Assert(le, DeepEquals, tc.large)
		}
		c.Assert(iple, Equals, tc.inProgress)
		c.Assert(ds, Equals, wantDiskSize)
		c.Assert(ms, Equals, wantMemSize)
	}
}
22 changes: 22 additions & 0 deletions lightning/backend/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type importer struct {
tls *common.TLS

mutationPool sync.Pool
// importLock is a backend-global lock to ensure only one ImportEngine() is
// running at any given time.
importLock sync.Mutex
kennytm marked this conversation as resolved.
Show resolved Hide resolved
}

// NewImporter creates a new connection to tikv-importer. A single connection
Expand Down Expand Up @@ -147,6 +150,9 @@ func (importer *importer) Flush(_ context.Context, _ uuid.UUID) error {
}

func (importer *importer) ImportEngine(ctx context.Context, engineUUID uuid.UUID) error {
importer.importLock.Lock()
defer importer.importLock.Unlock()

req := &kv.ImportEngineRequest{
Uuid: engineUUID[:],
PdAddr: importer.pdAddr,
Expand Down Expand Up @@ -355,6 +361,22 @@ func (importer *importer) FetchRemoteTableModels(ctx context.Context, schema str
return fetchRemoteTableModelsFromTLS(ctx, importer.tls, schema)
}

// EngineFileSizes implements AbstractBackend. The importer backend reports no
// local engine sizes and always returns nil.
func (importer *importer) EngineFileSizes() []EngineFileSize {
	// A nil slice means there is nothing to count towards the disk quota.
	return nil
}

// FlushEngine implements AbstractBackend. It is a no-op for the importer
// backend: the engine UUID is ignored and nil is always returned.
func (importer *importer) FlushEngine(_ uuid.UUID) error {
	return nil
}

// FlushAllEngines implements AbstractBackend. It is a no-op for the importer
// backend and always returns nil.
func (importer *importer) FlushAllEngines() error {
	// Nothing to flush in this backend.
	return nil
}

// ResetEngine implements AbstractBackend. Resetting an engine is not supported
// by the importer backend, so this method ignores its arguments and always
// returns an error.
func (importer *importer) ResetEngine(_ context.Context, _ uuid.UUID) error {
	return errors.New("cannot reset an engine in importer backend")
}

// LocalWriter implements AbstractBackend. It returns a new ImporterWriter bound
// to this importer and the given engine UUID. The context is not used, and the
// returned error is always nil.
func (importer *importer) LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error) {
	writer := &ImporterWriter{
		importer:   importer,
		engineUUID: engineUUID,
	}
	return writer, nil
}
Expand Down
Loading