From ce46f4f37ce8ba4cf2112f0438b41d667208d9b2 Mon Sep 17 00:00:00 2001 From: GreatRiver <14086886+LeftHandCold@users.noreply.github.com> Date: Thu, 7 Nov 2024 20:33:33 +0800 Subject: [PATCH 1/3] Fix file not found for GC (#19850) --- pkg/vm/engine/tae/db/gc/v3/exec_v1.go | 16 +++++++++++++++- pkg/vm/engine/tae/logtail/snapshot.go | 15 ++++++++++----- 2 files changed, 25 insertions(+), 6 deletions(-) diff --git a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go index 93030aa29a57..bdcecbfaabe1 100644 --- a/pkg/vm/engine/tae/db/gc/v3/exec_v1.go +++ b/pkg/vm/engine/tae/db/gc/v3/exec_v1.go @@ -342,11 +342,25 @@ func MakeFinalCanGCSinker( ctx context.Context, bat *batch.Batch, ) error { clear(buffer) + var dropTSs []types.TS + var tableIDs []uint64 + if bat.Vecs[0].Length() > 0 { + dropTSs = vector.MustFixedColNoTypeCheck[types.TS](bat.Vecs[2]) + tableIDs = vector.MustFixedColNoTypeCheck[uint64](bat.Vecs[4]) + } for i := 0; i < bat.Vecs[0].Length(); i++ { buf := bat.Vecs[0].GetRawBytesAt(i) stats := (*objectio.ObjectStats)(unsafe.Pointer(&buf[0])) name := stats.ObjectName().String() - buffer[name] = struct{}{} + dropTS := dropTSs[i] + tableID := tableIDs[i] + if !dropTS.IsEmpty() { + buffer[name] = struct{}{} + continue + } + if !logtail.IsMoTable(tableID) { + buffer[name] = struct{}{} + } } for name := range buffer { *filesToGC = append(*filesToGC, name) diff --git a/pkg/vm/engine/tae/logtail/snapshot.go b/pkg/vm/engine/tae/logtail/snapshot.go index aee230e114d2..3e9d7d334f4f 100644 --- a/pkg/vm/engine/tae/logtail/snapshot.go +++ b/pkg/vm/engine/tae/logtail/snapshot.go @@ -325,7 +325,7 @@ func (sm *SnapshotMeta) copyTablesLocked() map[uint32]map[uint64]*tableInfo { return tables } -func isMoTable(tid uint64) bool { +func IsMoTable(tid uint64) bool { return tid == catalog2.MO_TABLES_ID } @@ -353,7 +353,7 @@ func (sm *SnapshotMeta) updateTableInfo( stats objectio.ObjectStats, createTS types.TS, deleteTS types.TS, ) { - if !isMoTable(tid) { + if !IsMoTable(tid) { return } if !stats.GetAppendable() { @@ -546,7 +546,10 @@ func (sm *SnapshotMeta) updateTableInfo( for pk, tables := range sm.tablePKIndex { if len(tables) > 1 { - panic(fmt.Sprintf("table %v has more than one entry, tables len %d", pk, len(tables))) + logutil.Warn("UpdateSnapTable-Error", + zap.String("table", pk), + zap.Int("len", len(tables)), + ) } if len(tables) == 0 { continue @@ -1140,7 +1143,9 @@ func (sm *SnapshotMeta) RebuildTableInfo(ins *containers.Batch) { continue } if len(sm.tablePKIndex[pk]) > 0 { - panic(fmt.Sprintf("pk %s already exists, table: %d", pk, tid)) + logutil.Warn("RebuildTableInfo-PK-Exists", + zap.String("pk", pk), + zap.Uint64("table", tid)) } sm.tablePKIndex[pk] = make([]*tableInfo, 1) sm.tablePKIndex[pk][0] = table @@ -1197,7 +1202,7 @@ func (sm *SnapshotMeta) RebuildAObjectDel(ins *containers.Batch) { for i := 0; i < ins.Length(); i++ { commitTs := commitTsVec[i] if _, ok := sm.aobjDelTsMap[commitTs]; ok { - panic(fmt.Sprintf("commitTs %v already exists", commitTs.ToString())) + logutil.Warn("RebuildAObjectDel-Exists", zap.Any("commitTs", commitTs)) } sm.aobjDelTsMap[commitTs] = struct{}{} } From d123a93ae83510cc11c6a0a2f69839441ab7b09c Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Fri, 8 Nov 2024 11:57:42 +0800 Subject: [PATCH 2/3] Add metric for gc --- pkg/util/metric/v2/dashboard/grafana_dashboard_task.go | 2 ++ pkg/vm/engine/tae/db/gc/v3/checkpoint.go | 10 ++++++---- pkg/vm/engine/tae/logtail/storage_usage.go | 4 ++-- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/pkg/util/metric/v2/dashboard/grafana_dashboard_task.go b/pkg/util/metric/v2/dashboard/grafana_dashboard_task.go index 225a4bbec4c6..40f55b108232 100644 --- a/pkg/util/metric/v2/dashboard/grafana_dashboard_task.go +++ b/pkg/util/metric/v2/dashboard/grafana_dashboard_task.go @@ -229,6 +229,7 @@ func (c *DashboardCreator) initTaskStorageUsageRow() dashboard.Option { []string{ c.getMetricWithFilter(`mo_task_short_duration_seconds_bucket`, `type="gckp_collect_usage"`), c.getMetricWithFilter(`mo_task_short_duration_seconds_bucket`, `type="ickp_collect_usage"`), + c.getMetricWithFilter(`mo_task_short_duration_seconds_bucket`, `type="compacted_collect_usage"`), c.getMetricWithFilter(`mo_task_short_duration_seconds_bucket`, `type="handle_usage_request"`), c.getMetricWithFilter(`mo_task_short_duration_seconds_bucket`, `type="show_accounts_get_table_stats"`), c.getMetricWithFilter(`mo_task_short_duration_seconds_bucket`, `type="show_accounts_get_storage_usage"`), @@ -237,6 +238,7 @@ func (c *DashboardCreator) initTaskStorageUsageRow() dashboard.Option { []string{ "gckp_collect_usage", "ickp_collect_usage", + "compacted_collect_usage", "handle_usage_request", "show_accounts_get_table_stats", "show_accounts_get_storage_usage", diff --git a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go index 11f5f2a8ee6d..9dd477ac623d 100644 --- a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go @@ -414,7 +414,8 @@ func (c *checkpointCleaner) Replay() (err error) { ckpData, c.mutation.snapshotMeta, accountSnapshots, - pitrs) + pitrs, + 0) logutil.Info("GC-REPLAY-COLLECT-SNAPSHOT-SIZE", zap.String("task", c.TaskNameLocked()), zap.Int("size", len(accountSnapshots)), @@ -711,6 +712,7 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( memoryBuffer *containers.OneSchemaBatchBuffer, accountSnapshots map[uint32][]types.TS, pitrs *logtail.PitrInfo, + gcFileCount int, ) (err error) { // checkpointLowWaterMark is empty only in the following cases: // 1. no incremental and no gloabl checkpoint @@ -810,7 +812,8 @@ func (c *checkpointCleaner) mergeCheckpointFilesLocked( newCheckpointData, c.mutation.snapshotMeta, accountSnapshots, - pitrs) + pitrs, + gcFileCount) if newCheckpoint == nil { panic("MergeCheckpoint new checkpoint is nil") } @@ -1040,7 +1043,7 @@ func (c *checkpointCleaner) tryGCAgainstGCKPLocked( if waterMark.GT(&scanMark) { waterMark = scanMark } - err = c.mergeCheckpointFilesLocked(&waterMark, memoryBuffer, accountSnapshots, pitrs) + err = c.mergeCheckpointFilesLocked(&waterMark, memoryBuffer, accountSnapshots, pitrs, len(filesToGC)) if err != nil { extraErrMsg = fmt.Sprintf("mergeCheckpointFilesLocked %v failed", waterMark.ToString()) } @@ -1073,7 +1076,6 @@ func (c *checkpointCleaner) doGCAgainstGlobalCheckpointLocked( zap.Duration("soft-gc", softCost), zap.Duration("merge-table", mergeCost), zap.Error(err), - zap.Strings("files-to-gc", filesToGC), zap.String("metafile", metafile), zap.String("extra-err-msg", extraErrMsg), ) diff --git a/pkg/vm/engine/tae/logtail/storage_usage.go b/pkg/vm/engine/tae/logtail/storage_usage.go index a24db9aef2f9..9e2cce2d7353 100644 --- a/pkg/vm/engine/tae/logtail/storage_usage.go +++ b/pkg/vm/engine/tae/logtail/storage_usage.go @@ -1176,13 +1176,13 @@ func FillUsageBatOfCompacted( meta *SnapshotMeta, accountSnapshots map[uint32][]types.TS, pitrs *PitrInfo, + gcFileCount int, ) { - now := time.Now() var memoryUsed float64 usage.EnterProcessing() defer func() { v2.TaskStorageUsageCacheMemUsedGauge.Set(memoryUsed) - v2.TaskCompactedCollectUsageDurationHistogram.Observe(time.Since(now).Seconds()) + v2.TaskCompactedCollectUsageDurationHistogram.Observe(float64(gcFileCount)) usage.LeaveProcessing() }() objects := data.GetObjectBatchs() From 0b0f836fcb744595a9b4c06d715cdce51c6a6714 Mon Sep 17 00:00:00 2001 From: GreatRiver Date: Fri, 8 Nov 2024 19:05:05 +0800 Subject: [PATCH 3/3] Update code --- pkg/vm/engine/tae/db/gc/v3/merge.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/vm/engine/tae/db/gc/v3/merge.go b/pkg/vm/engine/tae/db/gc/v3/merge.go index 191031a73dde..1ebf03dc0237 100644 --- a/pkg/vm/engine/tae/db/gc/v3/merge.go +++ b/pkg/vm/engine/tae/db/gc/v3/merge.go @@ -78,7 +78,8 @@ func MergeCheckpoint( // add checkpoint metafile(ckp/mete_ts-ts.ckp...) to deleteFiles deleteFiles = append(deleteFiles, nameMeta) - + // add checkpoint idx file to deleteFiles + deleteFiles = append(deleteFiles, ckpEntry.GetLocation().Name().String()) locations, err = logtail.LoadCheckpointLocations( ctx, sid, ckpEntry.GetTNLocation(), ckpEntry.GetVersion(), fs) if err != nil {