From 9585c1dbb9fb24604b328a2301bc8be4a0670baa Mon Sep 17 00:00:00 2001 From: aptend <49832303+aptend@users.noreply.github.com> Date: Mon, 11 Dec 2023 08:44:01 +0800 Subject: [PATCH] Fix stopped checkpoint 1.1 (#13306) sync #13135 #13218 Approved by: @XuPeng-SH, @sukki37 --- pkg/vm/engine/tae/logtail/collector.go | 11 +++++++++-- pkg/vm/engine/tae/rpc/handle.go | 21 ++------------------- pkg/vm/engine/tae/txn/txnimpl/block.go | 2 +- pkg/vm/engine/tae/txn/txnimpl/sysblock.go | 6 +++--- 4 files changed, 15 insertions(+), 25 deletions(-) diff --git a/pkg/vm/engine/tae/logtail/collector.go b/pkg/vm/engine/tae/logtail/collector.go index e6c3c2a75710..9db672705db6 100644 --- a/pkg/vm/engine/tae/logtail/collector.go +++ b/pkg/vm/engine/tae/logtail/collector.go @@ -407,7 +407,8 @@ func (d *dirtyCollector) tryCompactTree( } tbl.Stats.RLock() - if tbl.Stats.LastFlush.GreaterEq(to) { + lastFlush := tbl.Stats.LastFlush + if lastFlush.GreaterEq(to) { tree.Shrink(id) tbl.Stats.RUnlock() continue @@ -454,7 +455,13 @@ func (d *dirtyCollector) tryCompactTree( continue } if !blk.IsAppendable() { - found, _ := blk.GetBlockData().HasDeleteIntentsPreparedIn(from, to) + newFrom := from + if lastFlush.Greater(newFrom) { + newFrom = lastFlush + } + // sometimes, delchain is no cleared after flushing table tail. + // the reason is still unknown, but here bumping the check from ts to lastFlush is correct anyway. + found, _ := blk.GetBlockData().HasDeleteIntentsPreparedIn(newFrom, to) if !found { dirtySeg.Shrink(bid) continue diff --git a/pkg/vm/engine/tae/rpc/handle.go b/pkg/vm/engine/tae/rpc/handle.go index 2ab9fdc65bea..7f851831d075 100644 --- a/pkg/vm/engine/tae/rpc/handle.go +++ b/pkg/vm/engine/tae/rpc/handle.go @@ -46,7 +46,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/gc" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/handle" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/rpchandle" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" @@ -356,17 +355,6 @@ func (h *Handle) handleRequests( req, &db.WriteResp{}, ) - if moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry) && (strings.HasPrefix(req.TableName, "bmsql") || strings.HasPrefix(req.TableName, "sbtest")) { - for _, rreq := range txnCtx.reqs { - if crreq, ok := rreq.(*db.WriteReq); ok { - logutil.Infof("[precommit] dup handle write typ: %v, %d-%s, %s txn: %s", - crreq.Type, crreq.TableID, - crreq.TableName, common.MoBatchToString(crreq.Batch, 3), - txn.String(), - ) - } - } - } write++ default: err = moerr.NewNotSupported(ctx, "unknown txn request type: %T", req) @@ -994,23 +982,18 @@ func (h *Handle) HandleWrite( ) logutil.Debugf("[precommit] write batch: %s", common.DebugMoBatch(req.Batch)) }) - var dbase handle.Database - var tb handle.Relation defer func() { common.DoIfDebugEnabled(func() { logutil.Debugf("[precommit] handle write end txn: %s", txn.String()) }) - if err != nil && moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry) && (strings.HasPrefix(req.TableName, "bmsql") || strings.HasPrefix(req.TableName, "sbtest")) { - logutil.Infof("[precommit] dup handle catalog on dup %s ", tb.GetMeta().(*catalog2.TableEntry).PPString(common.PPL1, 0, "")) - } }() - dbase, err = txn.GetDatabaseByID(req.DatabaseId) + dbase, err := txn.GetDatabaseByID(req.DatabaseId) if err != nil { return } - tb, err = dbase.GetRelationByID(req.TableID) + tb, err := dbase.GetRelationByID(req.TableID) if err != nil { return } diff --git a/pkg/vm/engine/tae/txn/txnimpl/block.go b/pkg/vm/engine/tae/txn/txnimpl/block.go index 97669488ad5e..840bb66d6447 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/block.go +++ b/pkg/vm/engine/tae/txn/txnimpl/block.go @@ -136,7 +136,7 @@ func (it *blockIt) GetBlock() handle.Block { } func buildBlock(table *txnTable, meta *catalog.BlockEntry) handle.Block { - if meta.GetSegment().GetTable().GetDB().IsSystemDB() { + if isSysTableId(meta.GetSegment().GetTable().ID) { return newSysBlock(table, meta) } return newBlock(table, meta) diff --git a/pkg/vm/engine/tae/txn/txnimpl/sysblock.go b/pkg/vm/engine/tae/txn/txnimpl/sysblock.go index 9c0191b7d9cd..d5208034955c 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/sysblock.go +++ b/pkg/vm/engine/tae/txn/txnimpl/sysblock.go @@ -218,9 +218,9 @@ func FillColumnRow(table *catalog.TableEntry, node *catalog.MVCCNode[*catalog.Ta } } -func (blk *txnSysBlock) GetDeltaPersistedTS() types.TS { - return types.TS{} -} +// func (blk *txnSysBlock) GetDeltaPersistedTS() types.TS { +// return types.TS{}.Next() +// } func (blk *txnSysBlock) getColumnTableVec( ts types.TS, colIdx int, mp *mpool.MPool,