Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDC Server support data-dir (#1879) #2070

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
9bc2310
This is an automated cherry-pick of #1879
jinlingchristopher Jun 16, 2021
e62fc5f
fix conflicts.
3AceShowHand Jun 17, 2021
9a68c90
add etcdClient to server, try to fix lack of field.
3AceShowHand Jun 21, 2021
2a3748e
fix server config marshal test.
3AceShowHand Jun 21, 2021
e122742
make check, fix go mod.
3AceShowHand Jun 21, 2021
5fa87d0
fix config test, make ci happy.
3AceShowHand Jun 21, 2021
dce763e
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 21, 2021
6cf3509
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 22, 2021
ed6c501
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 22, 2021
bc8a765
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 22, 2021
15441a8
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 22, 2021
25b0644
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 22, 2021
802009b
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 22, 2021
746efee
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 22, 2021
0f241d4
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
jinlingchristopher Jun 22, 2021
75a5f1d
fix config conflict in marshal config.
3AceShowHand Jun 22, 2021
01b32a6
try to fix tests.
3AceShowHand Jun 22, 2021
2b891b0
fix backend_sorter_test.
3AceShowHand Jun 22, 2021
6683865
Update pkg/config/config.go
amyangfei Jun 23, 2021
bfd16b0
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
amyangfei Jun 23, 2021
df4b94e
revert changes in unified sorter.
3AceShowHand Jun 23, 2021
b010cca
Merge branch 'cherry-pick-1879-to-release-5.0' of https://github.com/…
3AceShowHand Jun 23, 2021
35de946
use owner etcd client to get changefeed related info.
3AceShowHand Jun 23, 2021
1a49ea4
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
ti-chi-bot Jun 23, 2021
f5b62f8
Merge branch 'release-5.0' into cherry-pick-1879-to-release-5.0
ti-chi-bot Jun 23, 2021
3c9032f
Merge branch 'cherry-pick-1879-to-release-5.0' of https://github.com/…
3AceShowHand Jun 23, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions cdc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ type oldProcessor struct {
globalResolvedTs uint64
localResolvedTs uint64
checkpointTs uint64
globalcheckpointTs uint64
globalCheckpointTs uint64
appliedLocalCheckpointTs uint64
flushCheckpointInterval time.Duration

Expand Down Expand Up @@ -241,7 +241,7 @@ func newProcessor(
}

if err == nil {
p.globalcheckpointTs = info.CheckpointTs
p.globalCheckpointTs = info.CheckpointTs
}

for tableID, replicaInfo := range p.status.Tables {
Expand Down Expand Up @@ -679,7 +679,7 @@ func (p *oldProcessor) globalStatusWorker(ctx context.Context) error {
)

updateStatus := func(changefeedStatus *model.ChangeFeedStatus) {
atomic.StoreUint64(&p.globalcheckpointTs, changefeedStatus.CheckpointTs)
atomic.StoreUint64(&p.globalCheckpointTs, changefeedStatus.CheckpointTs)
if lastResolvedTs == changefeedStatus.ResolvedTs &&
lastCheckPointTs == changefeedStatus.CheckpointTs {
return
Expand Down Expand Up @@ -786,15 +786,15 @@ func (p *oldProcessor) addTable(ctx context.Context, tableID int64, replicaInfo
return
}

globalcheckpointTs := atomic.LoadUint64(&p.globalcheckpointTs)
globalCheckpointTs := atomic.LoadUint64(&p.globalCheckpointTs)

if replicaInfo.StartTs < globalcheckpointTs {
// use Warn instead of Panic in case that p.globalcheckpointTs has not been initialized.
if replicaInfo.StartTs < globalCheckpointTs {
// use Warn instead of Panic in case that p.globalCheckpointTs has not been initialized.
// The cdc_state_checker will catch a real inconsistency in integration tests.
log.Warn("addTable: startTs < checkpoint",
util.ZapFieldChangefeed(ctx),
zap.Int64("tableID", tableID),
zap.Uint64("checkpoint", globalcheckpointTs),
zap.Uint64("checkpoint", globalCheckpointTs),
zap.Uint64("startTs", replicaInfo.StartTs))
}

Expand Down
4 changes: 4 additions & 0 deletions cdc/puller/sorter/backend_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,10 @@ func (p *backEndPool) alloc(ctx context.Context) (backEnd, error) {
zap.Int64("table-id", tableID),
zap.String("table-name", tableName))

if err := util.CheckDataDirSatisfied(); err != nil {
return nil, errors.Trace(err)
}

ret, err := newFileBackEnd(fname, &msgPackGenSerde{})
if err != nil {
return nil, errors.Trace(err)
Expand Down
35 changes: 28 additions & 7 deletions cdc/puller/sorter/backend_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"context"
"fmt"
"os"
"path/filepath"
"strconv"
"time"

Expand All @@ -34,10 +35,17 @@ var _ = check.SerialSuites(&backendPoolSuite{})
func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
defer testleak.AfterTest(c)()

err := os.MkdirAll("/tmp/sorter", 0o755)
dataDir := "/tmp/cdc_data"
err := os.MkdirAll(dataDir, 0o755)
c.Assert(err, check.IsNil)

sortDir := filepath.Join(dataDir, config.DefaultSortDir)
err = os.MkdirAll(sortDir, 0o755)
c.Assert(err, check.IsNil)

conf := config.GetDefaultServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sortDir
conf.Sorter.MaxMemoryPressure = 90 // 90%
conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G
config.StoreGlobalServerConfig(conf)
Expand All @@ -48,7 +56,7 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
defer cancel()

backEndPool, err := newBackEndPool("/tmp/sorter", "")
backEndPool, err := newBackEndPool(sortDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)
defer backEndPool.terminate()
Expand Down Expand Up @@ -103,14 +111,20 @@ func (s *backendPoolSuite) TestBasicFunction(c *check.C) {
func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) {
defer testleak.AfterTest(c)()

dir := c.MkDir()
err := os.Chmod(dir, 0o311) // no permission to `ls`
dataDir := c.MkDir()
sortDir := filepath.Join(dataDir, config.DefaultSortDir)
err := os.MkdirAll(sortDir, 0o755)
c.Assert(err, check.IsNil)

err = os.Chmod(sortDir, 0o311) // no permission to `ls`
c.Assert(err, check.IsNil)

conf := config.GetGlobalServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sortDir
conf.Sorter.MaxMemoryPressure = 0 // force using files

backEndPool, err := newBackEndPool(dir, "")
backEndPool, err := newBackEndPool(sortDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)
defer backEndPool.terminate()
Expand All @@ -131,10 +145,17 @@ func (s *backendPoolSuite) TestDirectoryBadPermission(c *check.C) {
func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) {
defer testleak.AfterTest(c)()

err := os.MkdirAll("/tmp/sorter", 0o755)
dataDir := c.MkDir()
err := os.Chmod(dataDir, 0o755)
c.Assert(err, check.IsNil)

sorterDir := filepath.Join(dataDir, config.DefaultSortDir)
err = os.MkdirAll(sorterDir, 0o755)
c.Assert(err, check.IsNil)

conf := config.GetDefaultServerConfig()
conf.DataDir = dataDir
conf.Sorter.SortDir = sorterDir
conf.Sorter.MaxMemoryPressure = 90 // 90%
conf.Sorter.MaxMemoryConsumption = 16 * 1024 * 1024 * 1024 // 16G
config.StoreGlobalServerConfig(conf)
Expand All @@ -143,7 +164,7 @@ func (s *backendPoolSuite) TestCleanUpSelf(c *check.C) {
c.Assert(err, check.IsNil)
defer failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/memoryPressureInjectPoint") //nolint:errcheck

backEndPool, err := newBackEndPool("/tmp/sorter", "")
backEndPool, err := newBackEndPool(sorterDir, "")
c.Assert(err, check.IsNil)
c.Assert(backEndPool, check.NotNil)

Expand Down
47 changes: 34 additions & 13 deletions cdc/puller/sorter/sorter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@ import (
"context"
"math"
"os"
"path/filepath"
"sync/atomic"
"testing"
"time"

"go.uber.org/zap/zapcore"

_ "net/http/pprof"

"github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
Expand All @@ -32,7 +35,6 @@ import (
"github.com/pingcap/ticdc/pkg/util/testleak"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
_ "net/http/pprof"
)

const (
Expand Down Expand Up @@ -62,18 +64,21 @@ func (s *sorterSuite) TestSorterBasic(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 16 * 1024 * 1024 * 1024,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
Expand All @@ -87,18 +92,21 @@ func (s *sorterSuite) TestSorterCancel(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
Expand Down Expand Up @@ -127,6 +135,11 @@ func testSorter(ctx context.Context, c *check.C, sorter puller.EventSorter, coun
log.Panic("Could not enable failpoint", zap.Error(err))
}

c.Assert(failpoint.Enable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied", ""), check.IsNil)
defer func() {
c.Assert(failpoint.Disable("github.com/pingcap/ticdc/pkg/util/InjectCheckDataDirSatisfied"), check.IsNil)
}()

ctx, cancel := context.WithCancel(ctx)
errg, ctx := errgroup.WithContext(ctx)
errg.Go(func() error {
Expand Down Expand Up @@ -284,16 +297,19 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) {
defer UnifiedSorterCleanUp()

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 0, // disable memory sort
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)

// enable the failpoint to simulate delays
Expand All @@ -311,7 +327,7 @@ func (s *sorterSuite) TestSorterCancelRestart(c *check.C) {
}()

for i := 0; i < 5; i++ {
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
err = testSorter(ctx, c, sorter, 100000000, true)
Expand All @@ -328,18 +344,21 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) {
defer log.SetLevel(zapcore.InfoLevel)

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
Expand Down Expand Up @@ -375,8 +394,7 @@ func (s *sorterSuite) TestSorterIOError(c *check.C) {
_ = failpoint.Disable("github.com/pingcap/ticdc/cdc/puller/sorter/InjectErrorBackEndWrite")
}()

// recreate the sorter
sorter, err = NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err = NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

finishedCh = make(chan struct{})
Expand All @@ -402,18 +420,21 @@ func (s *sorterSuite) TestSorterErrorReportCorrect(c *check.C) {
defer log.SetLevel(zapcore.InfoLevel)

conf := config.GetDefaultServerConfig()
conf.DataDir = "/tmp/cdc_data"
sortDir := filepath.Join(conf.DataDir, config.DefaultSortDir)
conf.Sorter = &config.SorterConfig{
NumConcurrentWorker: 8,
ChunkSizeLimit: 1 * 1024 * 1024 * 1024,
MaxMemoryPressure: 60,
MaxMemoryConsumption: 0,
NumWorkerPoolGoroutine: 4,
SortDir: sortDir,
}
config.StoreGlobalServerConfig(conf)

err := os.MkdirAll("/tmp/sorter", 0o755)
err := os.MkdirAll(conf.Sorter.SortDir, 0o755)
c.Assert(err, check.IsNil)
sorter, err := NewUnifiedSorter("/tmp/sorter", "test-cf", "test", 0, "0.0.0.0:0")
sorter, err := NewUnifiedSorter(conf.Sorter.SortDir, "test-cf", "test", 0, "0.0.0.0:0")
c.Assert(err, check.IsNil)

ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
Expand Down
Loading