Skip to content

Commit

Permalink
Cherry-pick "print more checkpoint (ckp) details" to 1.1 (#14188)
Browse files Browse the repository at this point in the history
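Print more checkpoint details: log structured start, end, and error records (with phase and elapsed cost) for incremental and global checkpoints.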

Approved by: @LeftHandCold, @sukki37
  • Loading branch information
XuPeng-SH authored Jan 16, 2024
1 parent 1da5d88 commit d0ffd5d
Show file tree
Hide file tree
Showing 3 changed files with 266 additions and 38 deletions.
140 changes: 113 additions & 27 deletions pkg/vm/engine/tae/db/checkpoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
"go.uber.org/zap"

"github.com/matrixorigin/matrixone/pkg/util/fault"

Expand Down Expand Up @@ -306,17 +307,11 @@ func (r *runner) onGlobalCheckpointEntries(items ...any) {
}
}
if doCheckpoint {
now := time.Now()
entry, err := r.doGlobalCheckpoint(ctx.end, ctx.ckpLSN, ctx.truncateLSN, ctx.interval)
if err != nil {
logutil.Errorf("Global checkpoint %v failed: %v", entry, err)
continue
}
if err := r.saveCheckpoint(entry.start, entry.end, 0, 0); err != nil {
logutil.Errorf("Global checkpoint %v failed: %v", entry, err)
if _, err := r.doGlobalCheckpoint(
ctx.end, ctx.ckpLSN, ctx.truncateLSN, ctx.interval,
); err != nil {
continue
}
logutil.Infof("%s is done, takes %s", entry.String(), time.Since(now))
}
}
}
Expand Down Expand Up @@ -390,34 +385,80 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) {
if entry == nil || entry.GetState() != ST_Running {
return
}
err := r.doIncrementalCheckpoint(entry)
if err != nil {
logutil.Errorf("Do checkpoint %s: %v", entry.String(), err)
var (
err error
errPhase string
lsnToTruncate uint64
lsn uint64
fatal bool
fields []zap.Field
)
now = time.Now()

logutil.Info(
"Checkpoint-Start",
zap.String("entry", entry.String()),
)

defer func() {
if err != nil {
var logger func(msg string, fields ...zap.Field)
if fatal {
logger = logutil.Fatal
} else {
logger = logutil.Error
}
logger(
"Checkpoint-Error",
zap.String("entry", entry.String()),
zap.Error(err),
zap.String("phase", errPhase),
zap.Duration("cost", time.Since(now)),
)
} else {
fields = append(fields, zap.Duration("cost", time.Since(now)))
fields = append(fields, zap.Uint64("truncate", lsnToTruncate))
fields = append(fields, zap.Uint64("lsn", lsn))
fields = append(fields, zap.Uint64("reserve", r.options.reservedWALEntryCount))
fields = append(fields, zap.String("entry", entry.String()))
logutil.Info(
"Checkpoint-End",
fields...,
)
}
}()

if fields, err = r.doIncrementalCheckpoint(entry); err != nil {
errPhase = "do-ckp"
return
}
lsn := r.source.GetMaxLSN(entry.start, entry.end)
lsnToTruncate := uint64(0)

lsn = r.source.GetMaxLSN(entry.start, entry.end)
if lsn > r.options.reservedWALEntryCount {
lsnToTruncate = lsn - r.options.reservedWALEntryCount
}
entry.SetLSN(lsn, lsnToTruncate)
entry.SetState(ST_Finished)
if err = r.saveCheckpoint(entry.start, entry.end, lsn, lsnToTruncate); err != nil {
logutil.Errorf("Save checkpoint %s: %v", entry.String(), err)

if err = r.saveCheckpoint(
entry.start, entry.end, lsn, lsnToTruncate,
); err != nil {
errPhase = "save-ckp"
return
}

e, err := r.wal.RangeCheckpoint(1, lsnToTruncate)
if err != nil {
panic(err)
var logEntry wal.LogEntry
if logEntry, err = r.wal.RangeCheckpoint(1, lsnToTruncate); err != nil {
errPhase = "wal-ckp"
fatal = true
return
}
if err = e.WaitDone(); err != nil {
panic(err)
if err = logEntry.WaitDone(); err != nil {
errPhase = "wait-wal-ckp-done"
fatal = true
return
}

logutil.Infof("%s is done, takes %s, truncate %d, checkpoint %d, reserve %d",
entry.String(), time.Since(now), lsnToTruncate, lsn, r.options.reservedWALEntryCount)

r.postCheckpointQueue.Enqueue(entry)
r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{
end: entry.end,
Expand Down Expand Up @@ -503,12 +544,13 @@ func (r *runner) saveCheckpoint(start, end types.TS, ckpLSN, truncateLSN uint64)
return
}

func (r *runner) doIncrementalCheckpoint(entry *CheckpointEntry) (err error) {
func (r *runner) doIncrementalCheckpoint(entry *CheckpointEntry) (fields []zap.Field, err error) {
factory := logtail.IncrementalCheckpointDataFactory(entry.start, entry.end, true, false)
data, err := factory(r.catalog)
if err != nil {
return
}
fields = data.ExportStats("")
defer data.Close()
cnLocation, tnLocation, _, err := data.WriteTo(r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize)
if err != nil {
Expand Down Expand Up @@ -541,25 +583,69 @@ func (r *runner) doCheckpointForBackup(entry *CheckpointEntry) (location string,
return
}

func (r *runner) doGlobalCheckpoint(end types.TS, ckpLSN, truncateLSN uint64, interval time.Duration) (entry *CheckpointEntry, err error) {
func (r *runner) doGlobalCheckpoint(
end types.TS, ckpLSN, truncateLSN uint64, interval time.Duration,
) (entry *CheckpointEntry, err error) {
var (
errPhase string
fields []zap.Field
)
now := time.Now()

entry = NewCheckpointEntry(types.TS{}, end.Next(), ET_Global)
entry.ckpLSN = ckpLSN
entry.truncateLSN = truncateLSN

logutil.Info(
"Checkpoint-Start",
zap.String("entry", entry.String()),
)

defer func() {
if err != nil {
logutil.Error(
"Checkpoint-Error",
zap.String("entry", entry.String()),
zap.String("phase", errPhase),
zap.Error(err),
zap.Duration("cost", time.Since(now)),
)
} else {
fields = append(fields, zap.Duration("cost", time.Since(now)))
fields = append(fields, zap.String("entry", entry.String()))
logutil.Info(
"Checkpoint-End",
fields...,
)
}
}()

factory := logtail.GlobalCheckpointDataFactory(entry.end, interval)
data, err := factory(r.catalog)
if err != nil {
errPhase = "collect"
return
}
fields = data.ExportStats("")
defer data.Close()

cnLocation, tnLocation, _, err := data.WriteTo(r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize)
cnLocation, tnLocation, _, err := data.WriteTo(
r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize,
)
if err != nil {
errPhase = "flush"
return
}

entry.SetLocation(cnLocation, tnLocation)
r.tryAddNewGlobalCheckpointEntry(entry)
entry.SetState(ST_Finished)

if err = r.saveCheckpoint(entry.start, entry.end, 0, 0); err != nil {
errPhase = "save"
return
}

perfcounter.Update(r.ctx, func(counter *perfcounter.CounterSet) {
counter.TAE.CheckPoint.DoGlobalCheckPoint.Add(1)
})
Expand Down
70 changes: 60 additions & 10 deletions pkg/vm/engine/tae/db/checkpoint/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/matrixorigin/matrixone/pkg/util/fault"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
"go.uber.org/zap"
)

type TestRunner interface {
Expand Down Expand Up @@ -144,22 +146,62 @@ func (r *runner) ForceFlush(ts types.TS, ctx context.Context, forceDuration time
}

func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error {
now := time.Now()
prev := r.MaxCheckpoint()
if prev != nil && !prev.IsFinished() {
return moerr.NewInternalError(r.ctx, "prev checkpoint not finished")
}
start := types.TS{}

var (
err error
errPhase string
start types.TS
fatal bool
fields []zap.Field
)

if prev != nil {
start = prev.end.Next()
}

entry := NewCheckpointEntry(start, end, ET_Incremental)
logutil.Info(
"Checkpoint-Start-Force",
zap.String("entry", entry.String()),
)

defer func() {
if err != nil {
logger := logutil.Error
if fatal {
logger = logutil.Fatal
}
logger(
"Checkpoint-Error-Force",
zap.String("entry", entry.String()),
zap.String("phase", errPhase),
zap.Error(err),
zap.Duration("cost", time.Since(now)),
)
} else {
fields = append(fields, zap.Duration("cost", time.Since(now)))
fields = append(fields, zap.String("entry", entry.String()))
logutil.Info(
"Checkpoint-End-Force",
fields...,
)
}
}()

r.storage.Lock()
r.storage.entries.Set(entry)
now := time.Now()
r.storage.Unlock()
if err := r.doIncrementalCheckpoint(entry); err != nil {

if fields, err = r.doIncrementalCheckpoint(entry); err != nil {
errPhase = "do-ckp"
return err
}

var lsn, lsnToTruncate uint64
if truncate {
lsn = r.source.GetMaxLSN(entry.start, entry.end)
Expand All @@ -169,20 +211,28 @@ func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error {
entry.ckpLSN = lsn
entry.truncateLSN = lsnToTruncate
}
if err := r.saveCheckpoint(entry.start, entry.end, lsn, lsnToTruncate); err != nil {

if err = r.saveCheckpoint(
entry.start, entry.end, lsn, lsnToTruncate,
); err != nil {
errPhase = "save-ckp"
return err
}
entry.SetState(ST_Finished)

if truncate {
e, err := r.wal.RangeCheckpoint(1, lsnToTruncate)
if err != nil {
panic(err)
var e wal.LogEntry
if e, err = r.wal.RangeCheckpoint(1, lsnToTruncate); err != nil {
errPhase = "wal-ckp"
fatal = true
return err
}
if err = e.WaitDone(); err != nil {
panic(err)
errPhase = "wait-wal-ckp"
fatal = true
return err
}
}
logutil.Infof("%s is done, takes %s", entry.String(), time.Since(now))
return nil
}

Expand All @@ -200,7 +250,7 @@ func (r *runner) ForceCheckpointForBackup(end types.TS) (location string, err er
r.storage.entries.Set(entry)
now := time.Now()
r.storage.Unlock()
if err = r.doIncrementalCheckpoint(entry); err != nil {
if _, err = r.doIncrementalCheckpoint(entry); err != nil {
return
}
var lsn, lsnToTruncate uint64
Expand Down
Loading

0 comments on commit d0ffd5d

Please sign in to comment.