Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry-pick "print more checkpoint details" to 1.1 #14188

Merged
merged 2 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 113 additions & 27 deletions pkg/vm/engine/tae/db/checkpoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/util/fault"

Expand Down Expand Up @@ -306,17 +307,11 @@ func (r *runner) onGlobalCheckpointEntries(items ...any) {
}
}
if doCheckpoint {
now := time.Now()
entry, err := r.doGlobalCheckpoint(ctx.end, ctx.ckpLSN, ctx.truncateLSN, ctx.interval)
if err != nil {
logutil.Errorf("Global checkpoint %v failed: %v", entry, err)
continue
}
if err := r.saveCheckpoint(entry.start, entry.end, 0, 0); err != nil {
logutil.Errorf("Global checkpoint %v failed: %v", entry, err)
if _, err := r.doGlobalCheckpoint(
ctx.end, ctx.ckpLSN, ctx.truncateLSN, ctx.interval,
); err != nil {
continue
}
logutil.Infof("%s is done, takes %s", entry.String(), time.Since(now))
}
}
}
Expand Down Expand Up @@ -390,34 +385,80 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) {
if entry == nil || entry.GetState() != ST_Running {
return
}
err := r.doIncrementalCheckpoint(entry)
if err != nil {
logutil.Errorf("Do checkpoint %s: %v", entry.String(), err)
var (
err error
errPhase string
lsnToTruncate uint64
lsn uint64
fatal bool
fields []zap.Field
)
now = time.Now()

logutil.Info(
"Checkpoint-Start",
zap.String("entry", entry.String()),
)

defer func() {
if err != nil {
var logger func(msg string, fields ...zap.Field)
if fatal {
logger = logutil.Fatal
} else {
logger = logutil.Error
}
logger(
"Checkpoint-Error",
zap.String("entry", entry.String()),
zap.Error(err),
zap.String("phase", errPhase),
zap.Duration("cost", time.Since(now)),
)
} else {
fields = append(fields, zap.Duration("cost", time.Since(now)))
fields = append(fields, zap.Uint64("truncate", lsnToTruncate))
fields = append(fields, zap.Uint64("lsn", lsn))
fields = append(fields, zap.Uint64("reserve", r.options.reservedWALEntryCount))
fields = append(fields, zap.String("entry", entry.String()))
logutil.Info(
"Checkpoint-End",
fields...,
)
}
}()

if fields, err = r.doIncrementalCheckpoint(entry); err != nil {
errPhase = "do-ckp"
return
}
lsn := r.source.GetMaxLSN(entry.start, entry.end)
lsnToTruncate := uint64(0)

lsn = r.source.GetMaxLSN(entry.start, entry.end)
if lsn > r.options.reservedWALEntryCount {
lsnToTruncate = lsn - r.options.reservedWALEntryCount
}
entry.SetLSN(lsn, lsnToTruncate)
entry.SetState(ST_Finished)
if err = r.saveCheckpoint(entry.start, entry.end, lsn, lsnToTruncate); err != nil {
logutil.Errorf("Save checkpoint %s: %v", entry.String(), err)

if err = r.saveCheckpoint(
entry.start, entry.end, lsn, lsnToTruncate,
); err != nil {
errPhase = "save-ckp"
return
}

e, err := r.wal.RangeCheckpoint(1, lsnToTruncate)
if err != nil {
panic(err)
var logEntry wal.LogEntry
if logEntry, err = r.wal.RangeCheckpoint(1, lsnToTruncate); err != nil {
errPhase = "wal-ckp"
fatal = true
return
}
if err = e.WaitDone(); err != nil {
panic(err)
if err = logEntry.WaitDone(); err != nil {
errPhase = "wait-wal-ckp-done"
fatal = true
return
}

logutil.Infof("%s is done, takes %s, truncate %d, checkpoint %d, reserve %d",
entry.String(), time.Since(now), lsnToTruncate, lsn, r.options.reservedWALEntryCount)

r.postCheckpointQueue.Enqueue(entry)
r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{
end: entry.end,
Expand Down Expand Up @@ -503,12 +544,13 @@ func (r *runner) saveCheckpoint(start, end types.TS, ckpLSN, truncateLSN uint64)
return
}

func (r *runner) doIncrementalCheckpoint(entry *CheckpointEntry) (err error) {
func (r *runner) doIncrementalCheckpoint(entry *CheckpointEntry) (fields []zap.Field, err error) {
factory := logtail.IncrementalCheckpointDataFactory(entry.start, entry.end, true)
data, err := factory(r.catalog)
if err != nil {
return
}
fields = data.ExportStats("")
defer data.Close()
cnLocation, tnLocation, _, err := data.WriteTo(r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize)
if err != nil {
Expand Down Expand Up @@ -541,25 +583,69 @@ func (r *runner) doCheckpointForBackup(entry *CheckpointEntry) (location string,
return
}

func (r *runner) doGlobalCheckpoint(end types.TS, ckpLSN, truncateLSN uint64, interval time.Duration) (entry *CheckpointEntry, err error) {
func (r *runner) doGlobalCheckpoint(
end types.TS, ckpLSN, truncateLSN uint64, interval time.Duration,
) (entry *CheckpointEntry, err error) {
var (
errPhase string
fields []zap.Field
)
now := time.Now()

entry = NewCheckpointEntry(types.TS{}, end.Next(), ET_Global)
entry.ckpLSN = ckpLSN
entry.truncateLSN = truncateLSN

logutil.Info(
"Checkpoint-Start",
zap.String("entry", entry.String()),
)

defer func() {
if err != nil {
logutil.Error(
"Checkpoint-Error",
zap.String("entry", entry.String()),
zap.String("phase", errPhase),
zap.Error(err),
zap.Duration("cost", time.Since(now)),
)
} else {
fields = append(fields, zap.Duration("cost", time.Since(now)))
fields = append(fields, zap.String("entry", entry.String()))
logutil.Info(
"Checkpoint-End",
fields...,
)
}
}()

factory := logtail.GlobalCheckpointDataFactory(entry.end, interval)
data, err := factory(r.catalog)
if err != nil {
errPhase = "collect"
return
}
fields = data.ExportStats("")
defer data.Close()

cnLocation, tnLocation, _, err := data.WriteTo(r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize)
cnLocation, tnLocation, _, err := data.WriteTo(
r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize,
)
if err != nil {
errPhase = "flush"
return
}

entry.SetLocation(cnLocation, tnLocation)
r.tryAddNewGlobalCheckpointEntry(entry)
entry.SetState(ST_Finished)

if err = r.saveCheckpoint(entry.start, entry.end, 0, 0); err != nil {
errPhase = "save"
return
}

perfcounter.Update(r.ctx, func(counter *perfcounter.CounterSet) {
counter.TAE.CheckPoint.DoGlobalCheckPoint.Add(1)
})
Expand Down
70 changes: 60 additions & 10 deletions pkg/vm/engine/tae/db/checkpoint/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/util/fault"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
"go.uber.org/zap"
)

type TestRunner interface {
Expand Down Expand Up @@ -144,22 +146,62 @@ func (r *runner) ForceFlush(ts types.TS, ctx context.Context, forceDuration time
}

func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error {
now := time.Now()
prev := r.MaxCheckpoint()
if prev != nil && !prev.IsFinished() {
return moerr.NewInternalError(r.ctx, "prev checkpoint not finished")
}
start := types.TS{}

var (
err error
errPhase string
start types.TS
fatal bool
fields []zap.Field
)

if prev != nil {
start = prev.end.Next()
}

entry := NewCheckpointEntry(start, end, ET_Incremental)
logutil.Info(
"Checkpoint-Start-Force",
zap.String("entry", entry.String()),
)

defer func() {
if err != nil {
logger := logutil.Error
if fatal {
logger = logutil.Fatal
}
logger(
"Checkpoint-Error-Force",
zap.String("entry", entry.String()),
zap.String("phase", errPhase),
zap.Error(err),
zap.Duration("cost", time.Since(now)),
)
} else {
fields = append(fields, zap.Duration("cost", time.Since(now)))
fields = append(fields, zap.String("entry", entry.String()))
logutil.Info(
"Checkpoint-End-Force",
fields...,
)
}
}()

r.storage.Lock()
r.storage.entries.Set(entry)
now := time.Now()
r.storage.Unlock()
if err := r.doIncrementalCheckpoint(entry); err != nil {

if fields, err = r.doIncrementalCheckpoint(entry); err != nil {
errPhase = "do-ckp"
return err
}

var lsn, lsnToTruncate uint64
if truncate {
lsn = r.source.GetMaxLSN(entry.start, entry.end)
Expand All @@ -169,20 +211,28 @@ func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error {
entry.ckpLSN = lsn
entry.truncateLSN = lsnToTruncate
}
if err := r.saveCheckpoint(entry.start, entry.end, lsn, lsnToTruncate); err != nil {

if err = r.saveCheckpoint(
entry.start, entry.end, lsn, lsnToTruncate,
); err != nil {
errPhase = "save-ckp"
return err
}
entry.SetState(ST_Finished)

if truncate {
e, err := r.wal.RangeCheckpoint(1, lsnToTruncate)
if err != nil {
panic(err)
var e wal.LogEntry
if e, err = r.wal.RangeCheckpoint(1, lsnToTruncate); err != nil {
errPhase = "wal-ckp"
fatal = true
return err
}
if err = e.WaitDone(); err != nil {
panic(err)
errPhase = "wait-wal-ckp"
fatal = true
return err
}
}
logutil.Infof("%s is done, takes %s", entry.String(), time.Since(now))
return nil
}

Expand All @@ -200,7 +250,7 @@ func (r *runner) ForceCheckpointForBackup(end types.TS) (location string, err er
r.storage.entries.Set(entry)
now := time.Now()
r.storage.Unlock()
if err = r.doIncrementalCheckpoint(entry); err != nil {
if _, err = r.doIncrementalCheckpoint(entry); err != nil {
return
}
var lsn, lsnToTruncate uint64
Expand Down
Loading
Loading