Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

backend: implement disk quota #493

Merged
merged 41 commits into from
Jan 29, 2021
Merged
Show file tree
Hide file tree
Changes from 38 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
cec3d30
common: copied the GetStorageSize function from DM
kennytm Nov 25, 2020
147851e
common: recognize multierr in IsRetryableError()
kennytm Nov 26, 2020
bd02b59
restore: refactor runPeriodicActions
kennytm Nov 26, 2020
360c491
config: fix test failure on Windows
kennytm Nov 26, 2020
3a605bc
*: implement disk quota
kennytm Nov 23, 2020
6afb02d
backend: split the disk size and mem size metrics
kennytm Nov 27, 2020
657adea
backend,restore: refactor, move the postProcessLock into the backend
kennytm Nov 27, 2020
1d7a411
go.mod1: update
kennytm Nov 27, 2020
95891e1
backend: introduce a conversion factor between memtable and disk size
kennytm Nov 27, 2020
7bf5fbf
backend: force the aes-256-cbc before anyone notices ;)
kennytm Dec 2, 2020
cc65433
tests: split FailAfterWriteRows into two failpoints
kennytm Dec 2, 2020
dad23f5
tests: add disk-quota test
kennytm Dec 2, 2020
2575452
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 8, 2020
626f480
backend/local: replace the semaphore by a regular mutex
kennytm Dec 8, 2020
1ed6164
tests/disk_quota: add that 2-second loop for SET GLOBAL
kennytm Dec 8, 2020
4c6f595
Apply suggestions from code review
kennytm Dec 9, 2020
958531e
config: elaborate the "insufficient disk space" error
kennytm Dec 9, 2020
369b6f4
restore: addressed comments
kennytm Dec 14, 2020
3f5034e
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 14, 2020
3dbcd9a
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 24, 2020
c2abaf6
backend/local: do not reopen if resetting a closed engine
kennytm Dec 24, 2020
6312a4c
local: protect against concurrent flush + reset
kennytm Dec 28, 2020
c03837d
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 28, 2020
0bd3d11
backend/local: seems have to treat everything as isImporting
kennytm Dec 28, 2020
50b6cf4
Merge branch 'master' into kennytm/disk-quota
kennytm Dec 28, 2020
73b7d9a
backend/local: store the sync.Mutex separately from the engines
kennytm Dec 30, 2020
5c5bd68
Merge branch 'master' into kennytm/disk-quota
kennytm Jan 11, 2021
9e78a25
Merge branch 'master' into kennytm/disk-quota
kennytm Jan 11, 2021
e66c73f
*: fix tests
kennytm Jan 12, 2021
cd8c3cc
backend: limit the SST size of the LocalWriters
kennytm Jan 13, 2021
11fa643
backend/local: fix FlushAllEngines being no op
kennytm Jan 15, 2021
8b51097
backend/local: use go.uber.org/atomic instead of sync/atomic
kennytm Jan 18, 2021
3af075d
backend/local: refactor LocalWriter to clean up the logic
kennytm Jan 18, 2021
dc5b5b7
Merge branch 'master' into kennytm/disk-quota
kennytm Jan 19, 2021
66a250c
tests: use a unique sorted-kv-dir per test
kennytm Jan 19, 2021
6dd5ef3
backend/local: fixed some doc comments
kennytm Jan 19, 2021
ca808b2
backend: addressed comments
kennytm Jan 20, 2021
e401ffe
backend/local: just always save meta every time we flush
kennytm Jan 21, 2021
6309d13
Merge branch 'master' into kennytm/disk-quota
kennytm Jan 28, 2021
98e02a7
backend: addressed comments
kennytm Jan 28, 2021
93beae8
Merge branch 'master' into kennytm/disk-quota
ti-srebot Jan 29, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod1
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,12 @@ require (
github.com/xitongsys/parquet-go v1.5.5-0.20201110004701-b09c49d6d457
github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0
go.etcd.io/bbolt v1.3.5 // indirect
go.uber.org/atomic v1.7.0
go.uber.org/multierr v1.6.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20200904194848-62affa334b73
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f // indirect
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f
golang.org/x/text v0.3.4
golang.org/x/tools v0.0.0-20200904185747-39188db58858 // indirect
google.golang.org/grpc v1.27.1
Expand Down
11 changes: 11 additions & 0 deletions go.sum1
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/cheggaaa/pb v2.0.7+incompatible h1:gLKifR1UkZ/kLkda5gC0K6c8g+jU2sINPtBeOiNlMhU=
github.com/cheggaaa/pb v2.0.7+incompatible/go.mod h1:pQciLPpbU0oxA0h+VJYYLxO+XeDQb5pZijXscXHm81s=
github.com/cheggaaa/pb/v3 v3.0.4/go.mod h1:7rgWxLrAUcFMkvJuv09+DYi7mMUYi8nO9iOWcvGJPfw=
github.com/cheggaaa/pb/v3 v3.0.5 h1:lmZOti7CraK9RSjzExsY53+WWfub9Qv13B5m4ptEoPE=
github.com/cheggaaa/pb/v3 v3.0.5/go.mod h1:X1L61/+36nz9bjIsrDU52qHKOQukUQe2Ge+YvGuquCw=
github.com/cheynewallace/tabby v1.1.0/go.mod h1:Pba/6cUL8uYqvOc9RkyvFbHGrQ9wShyrn6/S/1OYVys=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
Expand Down Expand Up @@ -206,6 +207,7 @@ github.com/coreos/go-systemd v0.0.0-20181031085051-9002847aa142/go.mod h1:F5haX7
github.com/coreos/go-systemd v0.0.0-20190212144455-93d5ec2c7f76/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e h1:Wf6HqHfScWJN9/ZjdUKyjop4mf3Qdd+1TvvltAvM3m8=
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f h1:JOrtw2xFKzlg+cbHpyrpLDmnN1HqhBfnX7WDiW7eG2c=
github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
Expand Down Expand Up @@ -500,13 +502,15 @@ github.com/gostaticanalysis/analysisutil v0.0.3/go.mod h1:eEOZF4jCKGi+aprrirO9e7
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4 h1:z53tR0945TRRQO/fLEVPI6SMv7ZflF0TEaTAoU7tOzg=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0 h1:THDBEeQ9xZ8JEaCLyLQqXMMdRqNr0QAUJTIkQAUtFjg=
github.com/grpc-ecosystem/go-grpc-middleware v1.1.0/go.mod h1:f5nM7jw/oeRSadq3xCzHAvxcr8HZnzsqU6ILg/0NiiE=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.8.1/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.12.1/go.mod h1:8XEsbTttt/W+VvjtQhLACqCisSPWTxCZ7sBRjU6iH9c=
github.com/grpc-ecosystem/grpc-gateway v1.14.3 h1:OCJlWkOUoTnl0neNGlf4fUm3TmbEtguw7vR+nGtnDjY=
github.com/grpc-ecosystem/grpc-gateway v1.14.3/go.mod h1:6CwZWGDSPRJidgKAtJVvND6soZe6fT7iteq8wDPdhb0=
github.com/gtank/cryptopasta v0.0.0-20170601214702-1f550f6f2f69/go.mod h1:YLEMZOtU+AZ7dhN9T/IpGhXVGly2bvkJQ+zxj3WeVQo=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
Expand Down Expand Up @@ -655,6 +659,7 @@ github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVc
github.com/mattn/go-colorable v0.1.4 h1:snbPLB8fVfU9iwbbo30TPtbLRzwWu6aJS6Xh4eaaviA=
github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.7 h1:bQGKb3vps/j0E9GfJQ03JyhRuxsvdAanXlT9BTw3mdw=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
Expand All @@ -664,6 +669,7 @@ github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2y
github.com/mattn/go-isatty v0.0.10/go.mod h1:qgIWMr58cqv1PHHyhnkY9lrL7etaEgOFcMEpPG5Rm84=
github.com/mattn/go-isatty v0.0.11 h1:FxPOTFNqGkuDUGi3H/qkUbQO4ZiBa2brKq5r0l8TGeM=
github.com/mattn/go-isatty v0.0.11/go.mod h1:PhnuNfih5lzO57/f3n+odYbM4JtupLOxQOAqxQCu2WE=
github.com/mattn/go-isatty v0.0.12 h1:wuysRhFDzyxgEmMf5xjvJ2M9dZoWAXNNr5LSBS7uHXY=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
Expand Down Expand Up @@ -918,6 +924,7 @@ github.com/remyoudompheng/bigfft v0.0.0-20170806203942-52369c62f446/go.mod h1:uY
github.com/remyoudompheng/bigfft v0.0.0-20190512091148-babf20351dd7/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237 h1:HQagqIiBmr8YXawX/le3+O26N+vPPC1PtjaF3mwnook=
github.com/remyoudompheng/bigfft v0.0.0-20190728182440-6a916e37a237/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0 h1:OdAsTTz6OkFY5QxjkYwrChwuRruF69c169dPK26NUlk=
github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo=
github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
Expand Down Expand Up @@ -1030,6 +1037,7 @@ github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d h1:4J9HCZVpvDmj2t
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/syndtr/goleveldb v1.0.1-0.20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285 h1:uSDYjYejelKyceA6DiCsngFof9jAyeaSyX9XC5a1a7Q=
github.com/syndtr/goleveldb v1.0.1-0.20190625010220-02440ea7a285/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM=
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8=
Expand Down Expand Up @@ -1061,6 +1069,7 @@ github.com/twmb/murmur3 v1.1.3 h1:D83U0XYKcHRYwYIpBKf3Pks91Z0Byda/9SJ8B6EMRcA=
github.com/twmb/murmur3 v1.1.3/go.mod h1:Qq/R7NUyOfr65zD+6Q5IHKsJLwP7exErjN6lyyq3OSQ=
github.com/uber-go/atomic v1.3.2 h1:Azu9lPBWRNKzYXSIwRfgRuDuS0YKsK4NFhiQv98gkxo=
github.com/uber-go/atomic v1.3.2/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o=
github.com/uber-go/atomic v1.4.0/go.mod h1:/Ct5t2lcmbJ4OSe/waGBoaVvVqtO0bmtfVNex1PFV8g=
github.com/uber/jaeger-client-go v2.15.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-client-go v2.16.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
Expand Down Expand Up @@ -1234,6 +1243,7 @@ golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHl
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f h1:J5lckAjkw6qYlOZNj90mLYNTEKDvWeuc1yieZ8qUzUE=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRuDixDT3tpyyb+LUpUlRWLxfhWrs=
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b h1:Wh+f8QHJXR411sJR8/vRBTZ7YapZaRvUcLFFJhusH0k=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
Expand Down Expand Up @@ -1623,6 +1633,7 @@ sigs.k8s.io/yaml v1.2.0/go.mod h1:yfXDCHCao9+ENCvLSE62v9VSji2MKu5jeNfTrofGhJc=
software.sslmate.com/src/go-pkcs12 v0.0.0-20200619203921-c9ed90bd32dc/go.mod h1:/xvNRWUqm0+/ZMiF4EX00vrSCMsE4/NHb+Pt3freEeQ=
sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4 h1:VO9oZbbkvTwqLimlQt15QNdOOBArT2dw/bvzsMZBiqQ=
sourcegraph.com/sourcegraph/appdash v0.0.0-20180531100431-4c381bd170b4/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 h1:ucqkfpjg9WzSUubAO62csmucvxl4/JeW3F4I4909XkM=
sourcegraph.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0/go.mod h1:hI742Nqp5OhwiqlzhgfbWU4mW4yO10fP+LoT9WOswdU=
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 h1:e1sMhtVq9AfcEy8AXNb8eSg6gbzfdpYhoNqnPJa+GzI=
sourcegraph.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67/go.mod h1:L5q+DGLGOQFpo1snNEkLOJT2d1YTW66rWNzatr3He1k=
105 changes: 96 additions & 9 deletions lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package backend
import (
"context"
"fmt"
"sort"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -82,6 +83,18 @@ func MakeUUID(tableName string, engineID int32) (string, uuid.UUID) {

var engineNamespace = uuid.MustParse("d68d6abe-c59e-45d6-ade8-e2b0ceb7bedf")

type EngineFileSize struct {
// UUID is the engine's UUID.
UUID uuid.UUID
// DiskSize is the estimated total file size on disk right now.
DiskSize int64
// MemSize is the total memory size used by the engine. This is the
// estimated additional size saved onto disk after calling Flush().
MemSize int64
// IsImporting indicates whether the engine performing Import().
IsImporting bool
}

// AbstractBackend is the abstract interface behind Backend.
// Implementations of this interface must be goroutine safe: you can share an
// instance and execute any method anywhere.
Expand Down Expand Up @@ -128,7 +141,29 @@ type AbstractBackend interface {
// - PKIsHandle (true = do not generate _tidb_rowid)
FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)

LocalWriter(ctx context.Context, engineUUID uuid.UUID) (EngineWriter, error)
// FlushEngine ensures all KV pairs written to an open engine has been
// synchronized, such that kill-9'ing Lightning afterwards and resuming from
// checkpoint can recover the exact same content.
//
// This method is only relevant for local backend, and is no-op for all
// other backends.
FlushEngine(ctx context.Context, engineUUID uuid.UUID) error

// FlushAllEngines performs FlushEngine on all opened engines. This is a
// very expensive operation and should only be used in some rare situation
// (e.g. preparing to resolve a disk quota violation).
FlushAllEngines(ctx context.Context) error

// EngineFileSizes obtains the size occupied locally of all engines managed
// by this backend. This method is used to compute disk quota.
// It can return nil if the content are all stored remotely.
EngineFileSizes() []EngineFileSize

// ResetEngine clears all written KV pairs in this opened engine.
ResetEngine(ctx context.Context, engineUUID uuid.UUID) error

// LocalWriter obtains a thread-local EngineWriter for writing rows into the given engine.
LocalWriter(ctx context.Context, engineUUID uuid.UUID, maxCacheSize int64) (EngineWriter, error)
}

func fetchRemoteTableModelsFromTLS(ctx context.Context, tls *common.TLS, schema string) ([]*model.TableInfo, error) {
Expand Down Expand Up @@ -207,6 +242,61 @@ func (be Backend) FetchRemoteTableModels(ctx context.Context, schemaName string)
return be.abstract.FetchRemoteTableModels(ctx, schemaName)
}

func (be Backend) FlushAll(ctx context.Context) error {
return be.abstract.FlushAllEngines(ctx)
}

// CheckDiskQuota verifies if the total engine file size is below the given
// quota. If the quota is exceeded, this method returns an array of engines,
// which after importing can decrease the total size below quota.
func (be Backend) CheckDiskQuota(quota int64) (
largeEngines []uuid.UUID,
inProgressLargeEngines int,
totalDiskSize int64,
totalMemSize int64,
) {
sizes := be.abstract.EngineFileSizes()
sort.Slice(sizes, func(i, j int) bool {
a, b := &sizes[i], &sizes[j]
if a.IsImporting != b.IsImporting {
return a.IsImporting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it mean? Do you want to sort all importing engine before other engines?

Copy link
Collaborator Author

@kennytm kennytm Dec 1, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We can't import engines that are already importing, but their size do count towards the disk quota.

}
return a.DiskSize+a.MemSize < b.DiskSize+b.MemSize
})
for _, size := range sizes {
totalDiskSize += size.DiskSize
totalMemSize += size.MemSize
if totalDiskSize+totalMemSize > quota {
if size.IsImporting {
inProgressLargeEngines++
} else {
largeEngines = append(largeEngines, size.UUID)
}
}
}
return
}

// UnsafeImportAndReset forces the backend to import the content of an engine
// into the target and then reset the engine to empty. This method will not
// close the engine. Make sure the engine is flushed manually before calling
// this method.
func (be Backend) UnsafeImportAndReset(ctx context.Context, engineUUID uuid.UUID) error {
// DO NOT call be.abstract.CloseEngine()! The engine should still be writable after
// calling UnsafeImportAndReset().
closedEngine := ClosedEngine{
engine: engine{
backend: be.abstract,
logger: makeLogger("<import-and-reset>", engineUUID),
uuid: engineUUID,
},
}
if err := closedEngine.Import(ctx); err != nil {
return err
}
return be.abstract.ResetEngine(ctx, engineUUID)
}

// OpenEngine opens an engine with the given table name and engine ID.
func (be Backend) OpenEngine(ctx context.Context, tableName string, engineID int32) (*OpenedEngine, error) {
tag, engineUUID := MakeUUID(tableName, engineID)
Expand Down Expand Up @@ -251,16 +341,13 @@ func (engine *OpenedEngine) Close(ctx context.Context) (*ClosedEngine, error) {
}

// Flush current written data for local backend
func (engine *OpenedEngine) Flush() error {
if l, ok := engine.backend.(*local); ok {
return l.Flush(engine.uuid)
}
return nil
func (engine *OpenedEngine) Flush(ctx context.Context) error {
return engine.backend.FlushEngine(ctx, engine.uuid)
}

// WriteRows writes a collection of encoded rows into the engine.
func (engine *OpenedEngine) WriteRows(ctx context.Context, columnNames []string, rows Rows) error {
writer, err := engine.backend.LocalWriter(ctx, engine.uuid)
writer, err := engine.backend.LocalWriter(ctx, engine.uuid, LocalMemoryTableSize)
if err != nil {
return err
}
Expand All @@ -271,8 +358,8 @@ func (engine *OpenedEngine) WriteRows(ctx context.Context, columnNames []string,
return writer.Close()
}

func (engine *OpenedEngine) LocalWriter(ctx context.Context) (*LocalEngineWriter, error) {
w, err := engine.backend.LocalWriter(ctx, engine.uuid)
func (engine *OpenedEngine) LocalWriter(ctx context.Context, maxCacheSize int64) (*LocalEngineWriter, error) {
w, err := engine.backend.LocalWriter(ctx, engine.uuid, maxCacheSize)
if err != nil {
return nil, err
}
Expand Down
82 changes: 78 additions & 4 deletions lightning/backend/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (s *backendSuite) TestWriteEngine(c *C) {
Return(nil)

mockWriter := mock.NewMockEngineWriter(s.controller)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes()
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(mockWriter, nil).AnyTimes()
mockWriter.EXPECT().
AppendRows(ctx, "`db`.`table`", []string{"c1", "c2"}, gomock.Any(), rows1).
Return(nil)
Expand Down Expand Up @@ -153,7 +153,7 @@ func (s *backendSuite) TestWriteToEngineWithNothing(c *C) {
s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil)
writer.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), emptyRows).Return(nil)
writer.EXPECT().Close().Return(nil)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(writer, nil)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(writer, nil)

engine, err := s.backend.OpenEngine(ctx, "`db`.`table`", 1)
c.Assert(err, IsNil)
Expand Down Expand Up @@ -183,7 +183,7 @@ func (s *backendSuite) TestWriteEngineFailed(c *C) {

s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil)
mockWriter := mock.NewMockEngineWriter(s.controller)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes()
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(mockWriter, nil).AnyTimes()
mockWriter.EXPECT().
AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.Annotate(context.Canceled, "fake unrecoverable write error"))
Expand All @@ -204,7 +204,7 @@ func (s *backendSuite) TestWriteBatchSendFailedWithRetry(c *C) {

s.mockBackend.EXPECT().OpenEngine(ctx, gomock.Any()).Return(nil)
mockWriter := mock.NewMockEngineWriter(s.controller)
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any()).Return(mockWriter, nil).AnyTimes()
s.mockBackend.EXPECT().LocalWriter(ctx, gomock.Any(), int64(kv.LocalMemoryTableSize)).Return(mockWriter, nil).AnyTimes()
mockWriter.EXPECT().AppendRows(ctx, gomock.Any(), gomock.Any(), gomock.Any(), rows).
Return(errors.New("fake recoverable write batch error")).
MinTimes(1)
Expand Down Expand Up @@ -304,3 +304,77 @@ func (s *backendSuite) TestNewEncoder(c *C) {
c.Assert(realEncoder, Equals, encoder)
c.Assert(err, IsNil)
}

func (s *backendSuite) TestCheckDiskQuota(c *C) {
s.setUpTest(c)
defer s.tearDownTest()

uuid1 := uuid.MustParse("11111111-1111-1111-1111-111111111111")
uuid3 := uuid.MustParse("33333333-3333-3333-3333-333333333333")
uuid5 := uuid.MustParse("55555555-5555-5555-5555-555555555555")
uuid7 := uuid.MustParse("77777777-7777-7777-7777-777777777777")
uuid9 := uuid.MustParse("99999999-9999-9999-9999-999999999999")

fileSizes := []kv.EngineFileSize{
{
UUID: uuid1,
DiskSize: 1000,
MemSize: 0,
IsImporting: false,
},
{
UUID: uuid3,
DiskSize: 2000,
MemSize: 1000,
IsImporting: true,
},
{
UUID: uuid5,
DiskSize: 1500,
MemSize: 3500,
IsImporting: false,
},
{
UUID: uuid7,
DiskSize: 0,
MemSize: 7000,
IsImporting: true,
},
{
UUID: uuid9,
DiskSize: 4500,
MemSize: 4500,
IsImporting: false,
},
}

s.mockBackend.EXPECT().EngineFileSizes().Return(fileSizes).Times(4)

// No quota exceeded
le, iple, ds, ms := s.backend.CheckDiskQuota(30000)
c.Assert(le, HasLen, 0)
c.Assert(iple, Equals, 0)
c.Assert(ds, Equals, int64(9000))
c.Assert(ms, Equals, int64(16000))

// Quota exceeded, the largest one is out
le, iple, ds, ms = s.backend.CheckDiskQuota(20000)
c.Assert(le, DeepEquals, []uuid.UUID{uuid9})
c.Assert(iple, Equals, 0)
c.Assert(ds, Equals, int64(9000))
c.Assert(ms, Equals, int64(16000))

// Quota exceeded, the importing one should be ranked least priority
le, iple, ds, ms = s.backend.CheckDiskQuota(12000)
c.Assert(le, DeepEquals, []uuid.UUID{uuid5, uuid9})
c.Assert(iple, Equals, 0)
c.Assert(ds, Equals, int64(9000))
c.Assert(ms, Equals, int64(16000))

// Quota exceeded, the importing ones should not be visible
le, iple, ds, ms = s.backend.CheckDiskQuota(5000)
c.Assert(le, DeepEquals, []uuid.UUID{uuid1, uuid5, uuid9})
c.Assert(iple, Equals, 1)
c.Assert(ds, Equals, int64(9000))
c.Assert(ms, Equals, int64(16000))
}
Loading