Skip to content

Commit

Permalink
Fix stopped checkpoint 1.1 (#13306)
Browse files Browse the repository at this point in the history
sync #13135 #13218

Approved by: @XuPeng-SH, @sukki37
  • Loading branch information
aptend authored Dec 11, 2023
1 parent c6b76ef commit 9585c1d
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 25 deletions.
11 changes: 9 additions & 2 deletions pkg/vm/engine/tae/logtail/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ func (d *dirtyCollector) tryCompactTree(
}

tbl.Stats.RLock()
if tbl.Stats.LastFlush.GreaterEq(to) {
lastFlush := tbl.Stats.LastFlush
if lastFlush.GreaterEq(to) {
tree.Shrink(id)
tbl.Stats.RUnlock()
continue
Expand Down Expand Up @@ -454,7 +455,13 @@ func (d *dirtyCollector) tryCompactTree(
continue
}
if !blk.IsAppendable() {
found, _ := blk.GetBlockData().HasDeleteIntentsPreparedIn(from, to)
newFrom := from
if lastFlush.Greater(newFrom) {
newFrom = lastFlush
}
// sometimes, delchain is no cleared after flushing table tail.
// the reason is still unknown, but here bumping the check from ts to lastFlush is correct anyway.
found, _ := blk.GetBlockData().HasDeleteIntentsPreparedIn(newFrom, to)
if !found {
dirtySeg.Shrink(bid)
continue
Expand Down
21 changes: 2 additions & 19 deletions pkg/vm/engine/tae/rpc/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/gc"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/handle"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/rpchandle"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
Expand Down Expand Up @@ -356,17 +355,6 @@ func (h *Handle) handleRequests(
req,
&db.WriteResp{},
)
if moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry) && (strings.HasPrefix(req.TableName, "bmsql") || strings.HasPrefix(req.TableName, "sbtest")) {
for _, rreq := range txnCtx.reqs {
if crreq, ok := rreq.(*db.WriteReq); ok {
logutil.Infof("[precommit] dup handle write typ: %v, %d-%s, %s txn: %s",
crreq.Type, crreq.TableID,
crreq.TableName, common.MoBatchToString(crreq.Batch, 3),
txn.String(),
)
}
}
}
write++
default:
err = moerr.NewNotSupported(ctx, "unknown txn request type: %T", req)
Expand Down Expand Up @@ -994,23 +982,18 @@ func (h *Handle) HandleWrite(
)
logutil.Debugf("[precommit] write batch: %s", common.DebugMoBatch(req.Batch))
})
var dbase handle.Database
var tb handle.Relation
defer func() {
common.DoIfDebugEnabled(func() {
logutil.Debugf("[precommit] handle write end txn: %s", txn.String())
})
if err != nil && moerr.IsMoErrCode(err, moerr.ErrDuplicateEntry) && (strings.HasPrefix(req.TableName, "bmsql") || strings.HasPrefix(req.TableName, "sbtest")) {
logutil.Infof("[precommit] dup handle catalog on dup %s ", tb.GetMeta().(*catalog2.TableEntry).PPString(common.PPL1, 0, ""))
}
}()

dbase, err = txn.GetDatabaseByID(req.DatabaseId)
dbase, err := txn.GetDatabaseByID(req.DatabaseId)
if err != nil {
return
}

tb, err = dbase.GetRelationByID(req.TableID)
tb, err := dbase.GetRelationByID(req.TableID)
if err != nil {
return
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/tae/txn/txnimpl/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ func (it *blockIt) GetBlock() handle.Block {
}

func buildBlock(table *txnTable, meta *catalog.BlockEntry) handle.Block {
if meta.GetSegment().GetTable().GetDB().IsSystemDB() {
if isSysTableId(meta.GetSegment().GetTable().ID) {
return newSysBlock(table, meta)
}
return newBlock(table, meta)
Expand Down
6 changes: 3 additions & 3 deletions pkg/vm/engine/tae/txn/txnimpl/sysblock.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,9 @@ func FillColumnRow(table *catalog.TableEntry, node *catalog.MVCCNode[*catalog.Ta
}
}

func (blk *txnSysBlock) GetDeltaPersistedTS() types.TS {
return types.TS{}
}
// func (blk *txnSysBlock) GetDeltaPersistedTS() types.TS {
// return types.TS{}.Next()
// }

func (blk *txnSysBlock) getColumnTableVec(
ts types.TS, colIdx int, mp *mpool.MPool,
Expand Down

0 comments on commit 9585c1d

Please sign in to comment.