diff --git a/pkg/vm/engine/tae/db/checkpoint/runner.go b/pkg/vm/engine/tae/db/checkpoint/runner.go
index b5ece2d43d0f..a8724d701db1 100644
--- a/pkg/vm/engine/tae/db/checkpoint/runner.go
+++ b/pkg/vm/engine/tae/db/checkpoint/runner.go
@@ -26,6 +26,7 @@ import (
 	"time"
 
 	v2 "github.com/matrixorigin/matrixone/pkg/util/metric/v2"
+	"go.uber.org/zap"
 
 	"github.com/matrixorigin/matrixone/pkg/util/fault"
 
@@ -306,17 +307,11 @@ func (r *runner) onGlobalCheckpointEntries(items ...any) {
 			}
 		}
 		if doCheckpoint {
-			now := time.Now()
-			entry, err := r.doGlobalCheckpoint(ctx.end, ctx.ckpLSN, ctx.truncateLSN, ctx.interval)
-			if err != nil {
-				logutil.Errorf("Global checkpoint %v failed: %v", entry, err)
-				continue
-			}
-			if err := r.saveCheckpoint(entry.start, entry.end, 0, 0); err != nil {
-				logutil.Errorf("Global checkpoint %v failed: %v", entry, err)
+			if _, err := r.doGlobalCheckpoint(
+				ctx.end, ctx.ckpLSN, ctx.truncateLSN, ctx.interval,
+			); err != nil {
 				continue
 			}
-			logutil.Infof("%s is done, takes %s", entry.String(), time.Since(now))
 		}
 	}
 }
@@ -390,34 +385,80 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) {
 	if entry == nil || entry.GetState() != ST_Running {
 		return
 	}
-	err := r.doIncrementalCheckpoint(entry)
-	if err != nil {
-		logutil.Errorf("Do checkpoint %s: %v", entry.String(), err)
+	var (
+		err           error
+		errPhase      string
+		lsnToTruncate uint64
+		lsn           uint64
+		fatal         bool
+		fields        []zap.Field
+	)
+	now = time.Now()
+
+	logutil.Info(
+		"Checkpoint-Start",
+		zap.String("entry", entry.String()),
+	)
+
+	defer func() {
+		if err != nil {
+			var logger func(msg string, fields ...zap.Field)
+			if fatal {
+				logger = logutil.Fatal
+			} else {
+				logger = logutil.Error
+			}
+			logger(
+				"Checkpoint-Error",
+				zap.String("entry", entry.String()),
+				zap.Error(err),
+				zap.String("phase", errPhase),
+				zap.Duration("cost", time.Since(now)),
+			)
+		} else {
+			fields = append(fields, zap.Duration("cost", time.Since(now)))
+			fields = append(fields, zap.Uint64("truncate", lsnToTruncate))
+			fields = append(fields, zap.Uint64("lsn", lsn))
+			fields = append(fields, zap.Uint64("reserve", r.options.reservedWALEntryCount))
+			fields = append(fields, zap.String("entry", entry.String()))
+			logutil.Info(
+				"Checkpoint-End",
+				fields...,
+			)
+		}
+	}()
+
+	if fields, err = r.doIncrementalCheckpoint(entry); err != nil {
+		errPhase = "do-ckp"
 		return
 	}
-	lsn := r.source.GetMaxLSN(entry.start, entry.end)
-	lsnToTruncate := uint64(0)
+
+	lsn = r.source.GetMaxLSN(entry.start, entry.end)
 	if lsn > r.options.reservedWALEntryCount {
 		lsnToTruncate = lsn - r.options.reservedWALEntryCount
 	}
 	entry.SetLSN(lsn, lsnToTruncate)
 	entry.SetState(ST_Finished)
-	if err = r.saveCheckpoint(entry.start, entry.end, lsn, lsnToTruncate); err != nil {
-		logutil.Errorf("Save checkpoint %s: %v", entry.String(), err)
+
+	if err = r.saveCheckpoint(
+		entry.start, entry.end, lsn, lsnToTruncate,
+	); err != nil {
+		errPhase = "save-ckp"
 		return
 	}
-	e, err := r.wal.RangeCheckpoint(1, lsnToTruncate)
-	if err != nil {
-		panic(err)
+
+	var logEntry wal.LogEntry
+	if logEntry, err = r.wal.RangeCheckpoint(1, lsnToTruncate); err != nil {
+		errPhase = "wal-ckp"
+		fatal = true
+		return
 	}
-	if err = e.WaitDone(); err != nil {
-		panic(err)
+	if err = logEntry.WaitDone(); err != nil {
+		errPhase = "wait-wal-ckp-done"
+		fatal = true
+		return
 	}
-	logutil.Infof("%s is done, takes %s, truncate %d, checkpoint %d, reserve %d",
-		entry.String(), time.Since(now), lsnToTruncate, lsn, r.options.reservedWALEntryCount)
-
 	r.postCheckpointQueue.Enqueue(entry)
 	r.globalCheckpointQueue.Enqueue(&globalCheckpointContext{
 		end: entry.end,
@@ -503,12 +544,13 @@ func (r *runner) saveCheckpoint(start, end types.TS, ckpLSN, truncateLSN uint64)
 	return
 }
 
-func (r *runner) doIncrementalCheckpoint(entry *CheckpointEntry) (err error) {
+func (r *runner) doIncrementalCheckpoint(entry *CheckpointEntry) (fields []zap.Field, err error) {
 	factory := logtail.IncrementalCheckpointDataFactory(entry.start, entry.end, true, false)
 	data, err := factory(r.catalog)
 	if err != nil {
 		return
 	}
+	fields = data.ExportStats("")
 	defer data.Close()
 	cnLocation, tnLocation, _, err := data.WriteTo(r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize)
 	if err != nil {
 		return
 	}
@@ -541,25 +583,69 @@ func (r *runner) doCheckpointForBackup(entry *CheckpointEntry) (location string,
 	return
 }
 
-func (r *runner) doGlobalCheckpoint(end types.TS, ckpLSN, truncateLSN uint64, interval time.Duration) (entry *CheckpointEntry, err error) {
+func (r *runner) doGlobalCheckpoint(
+	end types.TS, ckpLSN, truncateLSN uint64, interval time.Duration,
+) (entry *CheckpointEntry, err error) {
+	var (
+		errPhase string
+		fields   []zap.Field
+	)
+	now := time.Now()
+
 	entry = NewCheckpointEntry(types.TS{}, end.Next(), ET_Global)
 	entry.ckpLSN = ckpLSN
 	entry.truncateLSN = truncateLSN
+
+	logutil.Info(
+		"Checkpoint-Start",
+		zap.String("entry", entry.String()),
+	)
+
+	defer func() {
+		if err != nil {
+			logutil.Error(
+				"Checkpoint-Error",
+				zap.String("entry", entry.String()),
+				zap.String("phase", errPhase),
+				zap.Error(err),
+				zap.Duration("cost", time.Since(now)),
+			)
+		} else {
+			fields = append(fields, zap.Duration("cost", time.Since(now)))
+			fields = append(fields, zap.String("entry", entry.String()))
+			logutil.Info(
+				"Checkpoint-End",
+				fields...,
+			)
+		}
+	}()
+
 	factory := logtail.GlobalCheckpointDataFactory(entry.end, interval)
 	data, err := factory(r.catalog)
 	if err != nil {
+		errPhase = "collect"
 		return
 	}
+	fields = data.ExportStats("")
 	defer data.Close()
-	cnLocation, tnLocation, _, err := data.WriteTo(r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize)
+
+	cnLocation, tnLocation, _, err := data.WriteTo(
+		r.rt.Fs.Service, r.options.checkpointBlockRows, r.options.checkpointSize,
+	)
 	if err != nil {
+		errPhase = "flush"
 		return
 	}
+
 	entry.SetLocation(cnLocation, tnLocation)
 	r.tryAddNewGlobalCheckpointEntry(entry)
 	entry.SetState(ST_Finished)
+	if err = r.saveCheckpoint(entry.start, entry.end, 0, 0); err != nil {
+		errPhase = "save"
+		return
+	}
+
 	perfcounter.Update(r.ctx, func(counter *perfcounter.CounterSet) {
 		counter.TAE.CheckPoint.DoGlobalCheckPoint.Add(1)
 	})
diff --git a/pkg/vm/engine/tae/db/checkpoint/testutils.go b/pkg/vm/engine/tae/db/checkpoint/testutils.go
index 58b9e5018a4f..bb5c50fcf0af 100644
--- a/pkg/vm/engine/tae/db/checkpoint/testutils.go
+++ b/pkg/vm/engine/tae/db/checkpoint/testutils.go
@@ -24,6 +24,8 @@ import (
 	"github.com/matrixorigin/matrixone/pkg/util/fault"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
 	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
+	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal"
+	"go.uber.org/zap"
 )
 
 type TestRunner interface {
@@ -144,22 +146,62 @@ func (r *runner) ForceFlush(ts types.TS, ctx context.Context, forceDuration time
 }
 
 func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error {
+	now := time.Now()
 	prev := r.MaxCheckpoint()
 	if prev != nil && !prev.IsFinished() {
 		return moerr.NewInternalError(r.ctx, "prev checkpoint not finished")
 	}
-	start := types.TS{}
+
+	var (
+		err      error
+		errPhase string
+		start    types.TS
+		fatal    bool
+		fields   []zap.Field
+	)
+
 	if prev != nil {
 		start = prev.end.Next()
 	}
+
 	entry := NewCheckpointEntry(start, end, ET_Incremental)
+	logutil.Info(
+		"Checkpoint-Start-Force",
+		zap.String("entry", entry.String()),
+	)
+
+	defer func() {
+		if err != nil {
+			logger := logutil.Error
+			if fatal {
+				logger = logutil.Fatal
+			}
+			logger(
+				"Checkpoint-Error-Force",
+				zap.String("entry", entry.String()),
+				zap.String("phase", errPhase),
+				zap.Error(err),
+				zap.Duration("cost", time.Since(now)),
+			)
+		} else {
+			fields = append(fields, zap.Duration("cost", time.Since(now)))
+			fields = append(fields, zap.String("entry", entry.String()))
+			logutil.Info(
+				"Checkpoint-End-Force",
+				fields...,
+			)
+		}
+	}()
+
 	r.storage.Lock()
 	r.storage.entries.Set(entry)
-	now := time.Now()
 	r.storage.Unlock()
-	if err := r.doIncrementalCheckpoint(entry); err != nil {
+
+	if fields, err = r.doIncrementalCheckpoint(entry); err != nil {
+		errPhase = "do-ckp"
 		return err
 	}
+
 	var lsn, lsnToTruncate uint64
 	if truncate {
 		lsn = r.source.GetMaxLSN(entry.start, entry.end)
@@ -169,20 +211,28 @@ func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error {
 		entry.ckpLSN = lsn
 		entry.truncateLSN = lsnToTruncate
 	}
-	if err := r.saveCheckpoint(entry.start, entry.end, lsn, lsnToTruncate); err != nil {
+
+	if err = r.saveCheckpoint(
+		entry.start, entry.end, lsn, lsnToTruncate,
+	); err != nil {
+		errPhase = "save-ckp"
 		return err
 	}
 	entry.SetState(ST_Finished)
+
 	if truncate {
-		e, err := r.wal.RangeCheckpoint(1, lsnToTruncate)
-		if err != nil {
-			panic(err)
+		var e wal.LogEntry
+		if e, err = r.wal.RangeCheckpoint(1, lsnToTruncate); err != nil {
+			errPhase = "wal-ckp"
+			fatal = true
+			return err
 		}
 		if err = e.WaitDone(); err != nil {
-			panic(err)
+			errPhase = "wait-wal-ckp"
+			fatal = true
+			return err
 		}
 	}
-	logutil.Infof("%s is done, takes %s", entry.String(), time.Since(now))
 	return nil
 }
 
@@ -200,7 +250,7 @@ func (r *runner) ForceCheckpointForBackup(end types.TS) (location string, err er
 	r.storage.entries.Set(entry)
 	now := time.Now()
 	r.storage.Unlock()
-	if err = r.doIncrementalCheckpoint(entry); err != nil {
+	if _, err = r.doIncrementalCheckpoint(entry); err != nil {
 		return
 	}
 	var lsn, lsnToTruncate uint64
diff --git a/pkg/vm/engine/tae/logtail/utils.go b/pkg/vm/engine/tae/logtail/utils.go
index 589240912fa6..d26e8b0b556b 100644
--- a/pkg/vm/engine/tae/logtail/utils.go
+++ b/pkg/vm/engine/tae/logtail/utils.go
@@ -21,6 +21,7 @@ import (
 	"time"
 
 	"github.com/matrixorigin/matrixone/pkg/fileservice"
+	"go.uber.org/zap"
 
 	pkgcatalog "github.com/matrixorigin/matrixone/pkg/catalog"
 	"github.com/matrixorigin/matrixone/pkg/common/moerr"
@@ -423,6 +424,76 @@ func init() {
 	checkpointDataSchemas_Curr = checkpointDataSchemas_V11
 }
 
+func IDXString(idx uint16) string {
+	switch idx {
+	case MetaIDX:
+		return "MetaIDX"
+	case DBInsertIDX:
+		return "DBInsertIDX"
+	case DBInsertTxnIDX:
+		return "DBInsertTxnIDX"
+	case DBDeleteIDX:
+		return "DBDeleteIDX"
+	case DBDeleteTxnIDX:
+		return "DBDeleteTxnIDX"
+	case TBLInsertIDX:
+		return "TBLInsertIDX"
+	case TBLInsertTxnIDX:
+		return "TBLInsertTxnIDX"
+	case TBLDeleteIDX:
+		return "TBLDeleteIDX"
+	case TBLDeleteTxnIDX:
+		return "TBLDeleteTxnIDX"
+	case TBLColInsertIDX:
+		return "TBLColInsertIDX"
+	case TBLColDeleteIDX:
+		return "TBLColDeleteIDX"
+	case SEGInsertIDX:
+		return "SEGInsertIDX"
+	case SEGInsertTxnIDX:
+		return "SEGInsertTxnIDX"
+	case SEGDeleteIDX:
+		return "SEGDeleteIDX"
+	case SEGDeleteTxnIDX:
+		return "SEGDeleteTxnIDX"
+	case BLKMetaInsertIDX:
+		return "BLKMetaInsertIDX"
+	case BLKMetaInsertTxnIDX:
+		return "BLKMetaInsertTxnIDX"
+	case BLKMetaDeleteIDX:
+		return "BLKMetaDeleteIDX"
+	case BLKMetaDeleteTxnIDX:
+		return "BLKMetaDeleteTxnIDX"
+
+	case BLKTNMetaInsertIDX:
+		return "BLKTNMetaInsertIDX"
+	case BLKTNMetaInsertTxnIDX:
+		return "BLKTNMetaInsertTxnIDX"
+	case BLKTNMetaDeleteIDX:
+		return "BLKTNMetaDeleteIDX"
+	case BLKTNMetaDeleteTxnIDX:
+		return "BLKTNMetaDeleteTxnIDX"
+
+	case BLKCNMetaInsertIDX:
+		return "BLKCNMetaInsertIDX"
+
+	case TNMetaIDX:
+		return "TNMetaIDX"
+
+	case StorageUsageInsIDX:
+		return "StorageUsageInsIDX"
+
+	case ObjectInfoIDX:
+		return "ObjectInfoIDX"
+	case TNObjectInfoIDX:
+		return "TNObjectInfoIDX"
+	case StorageUsageDelIDX:
+		return "StorageUsageDelIDX"
+	default:
+		return fmt.Sprintf("UnknownIDX(%d)", idx)
+	}
+}
+
 func registerCheckpointDataReferVersion(version uint32, schemas []*catalog.Schema) {
 	var checkpointDataRefer [MaxIDX]*checkpointDataItem
 	for idx, schema := range schemas {
@@ -476,7 +547,8 @@ func BackupCheckpointDataFactory(start, end types.TS) func(c *catalog.Catalog) (
 
 func GlobalCheckpointDataFactory(
 	end types.TS,
-	versionInterval time.Duration) func(c *catalog.Catalog) (*CheckpointData, error) {
+	versionInterval time.Duration,
+) func(c *catalog.Catalog) (*CheckpointData, error) {
 	return func(c *catalog.Catalog) (data *CheckpointData, err error) {
 		collector := NewGlobalCollector(end, versionInterval)
 		defer collector.Close()
@@ -2488,6 +2560,26 @@ func (data *CheckpointData) readAll(
 	return
 }
 
+func (data *CheckpointData) ExportStats(prefix string) []zap.Field {
+	fields := make([]zap.Field, 0, len(data.bats)+2)
+	totalSize := 0
+	totalRow := 0
+	for idx := range data.bats {
+		if data.bats[idx] == nil || data.bats[idx].Length() == 0 {
+			continue
+		}
+		size := data.bats[idx].Allocated()
+		rows := data.bats[idx].Length()
+		totalSize += size
+		totalRow += rows
+		fields = append(fields, zap.Int(fmt.Sprintf("%s%s-Size", prefix, IDXString(uint16(idx))), size))
+		fields = append(fields, zap.Int(fmt.Sprintf("%s%s-Row", prefix, IDXString(uint16(idx))), rows))
+	}
+	fields = append(fields, zap.Int(fmt.Sprintf("%stotalSize", prefix), totalSize))
+	fields = append(fields, zap.Int(fmt.Sprintf("%stotalRow", prefix), totalRow))
+	return fields
+}
+
 func (data *CheckpointData) Close() {
 	for idx := range data.bats {
 		if data.bats[idx] != nil {
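
Reviewer note: the core pattern this patch introduces is a `Checkpoint-Start` log line, a single deferred block that emits either `Checkpoint-Error` (escalating to `Fatal` for unrecoverable WAL failures) or `Checkpoint-End` with accumulated stats fields, and phase tags set at each early return. Below is a minimal, self-contained sketch of that pattern using `go.uber.org/zap` directly; `runCheckpoint`, `doWork`, and the sample field values are illustrative stand-ins, not the runner's real API.

```go
// Sketch of the Start/End/Error logging pattern, assuming only zap.
package main

import (
	"time"

	"go.uber.org/zap"
)

func doWork() error { return nil } // stand-in for doIncrementalCheckpoint

func runCheckpoint(log *zap.Logger, entry string) (err error) {
	var (
		errPhase string
		fatal    bool
		fields   []zap.Field
	)
	now := time.Now()
	log.Info("Checkpoint-Start", zap.String("entry", entry))

	// One deferred block decides between the error and success lines, so
	// every early return is covered and the cost is always measured.
	defer func() {
		if err != nil {
			logger := log.Error
			if fatal {
				logger = log.Fatal // exits the process, like logutil.Fatal
			}
			logger(
				"Checkpoint-Error",
				zap.String("entry", entry),
				zap.String("phase", errPhase),
				zap.Error(err),
				zap.Duration("cost", time.Since(now)),
			)
			return
		}
		fields = append(fields, zap.Duration("cost", time.Since(now)))
		log.Info("Checkpoint-End", fields...)
	}()

	// Each phase only tags errPhase (plus fatal for unrecoverable WAL
	// errors) and returns; the deferred block does all the logging.
	if err = doWork(); err != nil {
		errPhase = "do-ckp"
		return
	}
	fields = append(fields, zap.Int("rows", 128)) // stats gathered on the way
	return nil
}

func main() {
	log, _ := zap.NewProduction()
	defer func() { _ = log.Sync() }()
	_ = runCheckpoint(log, "entry[0-42]")
}
```

One consequence of this design worth noting in review: because the success path logs `fields...` collected from `ExportStats`, every checkpoint now records per-batch sizes and row counts without extra plumbing at each return site.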
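The `ExportStats` helper itself follows a simple shape: one `-Size`/`-Row` field pair per non-empty batch, named via `IDXString`, plus running totals. A hedged, standalone approximation is below; `fakeBatch` and its fields are simplified stand-ins for the `containers.Batch` API (`Allocated()`/`Length()`) used in the real code.

```go
// Sketch of the ExportStats aggregation over checkpoint batches.
package main

import (
	"fmt"

	"go.uber.org/zap"
)

type fakeBatch struct {
	allocated int // stand-in for containers.Batch.Allocated()
	length    int // stand-in for containers.Batch.Length()
}

func exportStats(prefix string, names []string, bats []*fakeBatch) []zap.Field {
	// Capacity hint: two fields per batch plus the two totals.
	fields := make([]zap.Field, 0, 2*len(bats)+2)
	totalSize, totalRow := 0, 0
	for i, b := range bats {
		if b == nil || b.length == 0 {
			continue // skip empty batches, as the diff does
		}
		totalSize += b.allocated
		totalRow += b.length
		fields = append(fields,
			zap.Int(fmt.Sprintf("%s%s-Size", prefix, names[i]), b.allocated),
			zap.Int(fmt.Sprintf("%s%s-Row", prefix, names[i]), b.length),
		)
	}
	return append(fields,
		zap.Int(prefix+"totalSize", totalSize),
		zap.Int(prefix+"totalRow", totalRow),
	)
}

func main() {
	log, _ := zap.NewProduction()
	defer func() { _ = log.Sync() }()
	bats := []*fakeBatch{{allocated: 4096, length: 128}, nil, {}}
	names := []string{"MetaIDX", "DBInsertIDX", "DBDeleteIDX"}
	log.Info("Checkpoint-End", exportStats("", names, bats)...)
}
```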