Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge delta loc 1.2 dev #17328

Merged
merged 6 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ fmt:

.PHONY: install-static-check-tools
install-static-check-tools:
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | bash -s -- -b $(GOPATH)/bin v1.55.2
@curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | bash -s -- -b $(GOPATH)/bin v1.59.1
@go install github.com/matrixorigin/linter/cmd/molint@latest
@go install github.com/apache/skywalking-eyes/cmd/license-eye@v0.4.0

Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1172,7 +1172,7 @@ func ExecuteBlockFilter(
blk.EntryState = obj.EntryState
blk.CommitTs = obj.CommitTS
if obj.HasDeltaLoc {
deltaLoc, commitTs, ok := snapshot.GetBockDeltaLoc(blk.BlockID)
deltaLoc, commitTs, ok := snapshot.GetBlockDeltaLoc(blk.BlockID)
if ok {
blk.DeltaLoc = deltaLoc
blk.CommitTs = commitTs
Expand Down
2 changes: 1 addition & 1 deletion pkg/vm/engine/disttae/logtailreplay/blocks_iter.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (p *PartitionState) GetChangedObjsBetween(
return
}

func (p *PartitionState) GetBockDeltaLoc(bid types.Blockid) (objectio.ObjectLocation, types.TS, bool) {
func (p *PartitionState) GetBlockDeltaLoc(bid types.Blockid) (objectio.ObjectLocation, types.TS, bool) {
iter := p.blockDeltas.Copy().Iter()
defer iter.Release()

Expand Down
4 changes: 0 additions & 4 deletions pkg/vm/engine/disttae/logtailreplay/object_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"slices"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/logutil"
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/compute"
)

Expand Down Expand Up @@ -65,7 +64,6 @@ func (o *overlap) Filter(objs []ObjectInfo) []ObjectInfo {
}
o.t = objs[0].SortKeyZoneMap().GetType()
for _, obj := range objs {
obj := obj
o.intervals = append(o.intervals, entryInterval{
min: obj.SortKeyZoneMap().GetMin(),
max: obj.SortKeyZoneMap().GetMax(),
Expand All @@ -81,8 +79,6 @@ func (o *overlap) Filter(objs []ObjectInfo) []ObjectInfo {

set := entrySet{entries: make([]ObjectInfo, 0), maxValue: minValue(o.t)}
for _, interval := range o.intervals {
interval := interval
logutil.Infof("Mergeblocks %v %v", interval.min, interval.max)
if len(set.entries) == 0 || compute.CompareGeneric(set.maxValue, interval.min, o.t) > 0 {
set.add(o.t, interval)
} else if len(set.entries) == 1 {
Expand Down
3 changes: 1 addition & 2 deletions pkg/vm/engine/disttae/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,6 @@ func newCNMergeTask(
blkCnts := make([]int, len(targets))
blkIters := make([]*StatsBlkIter, len(targets))
for i, objInfo := range targets {
objInfo := objInfo
blkCnts[i] = int(objInfo.BlkCnt())

loc := objInfo.ObjectLocation()
Expand Down Expand Up @@ -170,7 +169,7 @@ func (t *cnMergeTask) LoadNextBatch(ctx context.Context, objIdx uint32) (*batch.
blk.EntryState = obj.EntryState
blk.CommitTs = obj.CommitTS
if obj.HasDeltaLoc {
deltaLoc, commitTs, ok := t.state.GetBockDeltaLoc(blk.BlockID)
deltaLoc, commitTs, ok := t.state.GetBlockDeltaLoc(blk.BlockID)
if ok {
blk.DeltaLoc = deltaLoc
blk.CommitTs = commitTs
Expand Down
6 changes: 3 additions & 3 deletions pkg/vm/engine/disttae/txn_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -849,7 +849,7 @@ func (tbl *txnTable) rangesOnePart(
blk.EntryState = obj.EntryState
blk.CommitTs = obj.CommitTS
if obj.HasDeltaLoc {
deltaLoc, commitTs, ok := state.GetBockDeltaLoc(blk.BlockID)
deltaLoc, commitTs, ok := state.GetBlockDeltaLoc(blk.BlockID)
if ok {
blk.DeltaLoc = deltaLoc
blk.CommitTs = commitTs
Expand Down Expand Up @@ -2154,7 +2154,7 @@ func (tbl *txnTable) PKPersistedBetween(
blk.EntryState = obj.EntryState
blk.CommitTs = obj.CommitTS
if obj.HasDeltaLoc {
deltaLoc, commitTs, ok := p.GetBockDeltaLoc(blk.BlockID)
deltaLoc, commitTs, ok := p.GetBlockDeltaLoc(blk.BlockID)
if ok {
blk.DeltaLoc = deltaLoc
blk.CommitTs = commitTs
Expand Down Expand Up @@ -2326,7 +2326,7 @@ func (tbl *txnTable) transferDeletes(
SegmentID: *obj.ObjectShortName().Segmentid(),
}
if obj.HasDeltaLoc {
deltaLoc, commitTs, ok := state.GetBockDeltaLoc(blkInfo.BlockID)
deltaLoc, commitTs, ok := state.GetBlockDeltaLoc(blkInfo.BlockID)
if ok {
blkInfo.DeltaLoc = deltaLoc
blkInfo.CommitTs = commitTs
Expand Down
12 changes: 6 additions & 6 deletions pkg/vm/engine/tae/db/checkpoint/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,9 @@ type tableAndSize struct {
// A: A checkpoint runner organizes and manages all checkpoint-related behaviors. It roughly
// does the following things:
// - Manage the life cycle of all checkpoints and provide some query interfaces.
// - A cron job periodically collects and analyzes dirty blocks, and flushes eligibl dirty
// - A cron job periodically collects and analyzes dirty blocks, and flushes eligible dirty
// blocks to the remote storage
// - The cron job peridically test whether a new checkpoint can be created. If it is not
// - The cron job periodically tests whether a new checkpoint can be created. If it is not
// satisfied, it will wait for next trigger. Otherwise, it will start the process of
// creating a checkpoint.

Expand Down Expand Up @@ -155,11 +155,11 @@ type tableAndSize struct {
// 8. Schedule to remove stale checkpoint meta objects

// Q: How to boot from the checkpoints?
// A: When a meta version is created, it contains all information of the previouse version. So we always
// A: When a meta version is created, it contains all information of the previous version. So we always
//
// delete the stale versions when a new version is created. Over time, the number of objects under
// `ckp/` is small.
// 1. List all meta objects under `ckp/`. Get the latest meta object and read all checkpoint informations
// 1. List all meta objects under `ckp/`. Get the latest meta object and read all checkpoint information
// from the meta object.
// 2. Apply the latest global checkpoint
// 3. Apply the incremental checkpoint start from the version right after the global checkpoint to the
Expand Down Expand Up @@ -200,7 +200,7 @@ type runner struct {

ctx context.Context

// logtail sourcer
// logtail source
source logtail.Collector
catalog *catalog.Catalog
rt *dbutils.Runtime
Expand Down Expand Up @@ -404,7 +404,7 @@ func (r *runner) gcCheckpointEntries(ts types.TS) {
func (r *runner) onIncrementalCheckpointEntries(items ...any) {
now := time.Now()
entry := r.MaxCheckpoint()
// In some unit tests, ckp is managed manually, and ckp deletiton (CleanPenddingCheckpoint)
// In some unit tests, ckp is managed manually, and ckp deletion (CleanPendingCheckpoint)
// can be called when the queue still has unexecuted task.
// Add `entry == nil` here as protective codes
if entry == nil || entry.GetState() != ST_Running {
Expand Down
7 changes: 1 addition & 6 deletions pkg/vm/engine/tae/db/merge/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,12 @@ func (e *MergeExecutor) OnExecDone(v any) {
atomic.AddInt64(&e.activeEstimateBytes, -int64(stat.estBytes))
}

func (e *MergeExecutor) ExecuteFor(entry *catalog.TableEntry, policy Policy) {
func (e *MergeExecutor) ExecuteFor(entry *catalog.TableEntry, mobjs []*catalog.ObjectEntry, kind TaskHostKind) {
if e.roundMergeRows*36 /*28 * 1.3 */ > e.transPageLimit/8 {
return
}
e.tableName = fmt.Sprintf("%v-%v", entry.ID, entry.GetLastestSchema().Name)

mobjs, kind := policy.Revise(e.CPUPercent(), int64(e.MemAvailBytes()))
if len(mobjs) < 2 {
return
}

if ActiveCNObj.CheckOverlapOnCNActive(mobjs) {
return
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/vm/engine/tae/db/merge/mod.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
)

var StopMerge atomic.Bool
var DisableDeltaLocMerge atomic.Bool

type CNMergeScheduler interface {
SendMergeTask(ctx context.Context, task *api.MergeTaskEntry) error
Expand Down Expand Up @@ -199,8 +200,8 @@ const (
)

type Policy interface {
OnObject(obj *catalog.ObjectEntry)
Revise(cpu, mem int64) ([]*catalog.ObjectEntry, TaskHostKind)
OnObject(obj *catalog.ObjectEntry, force bool)
Revise(cpu, mem int64, littleFirst bool) ([]*catalog.ObjectEntry, TaskHostKind)
ResetForTable(*catalog.TableEntry)
SetConfig(*catalog.TableEntry, func() txnif.AsyncTxn, any)
GetConfig(*catalog.TableEntry) any
Expand Down
39 changes: 20 additions & 19 deletions pkg/vm/engine/tae/db/merge/policyBasic.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ package merge

import (
"bytes"
"cmp"
"context"
"fmt"
"sort"
"slices"
"sync"

"github.com/matrixorigin/matrixone/pkg/logutil"
Expand Down Expand Up @@ -131,9 +132,7 @@ func (o *customConfigProvider) String() string {
for k := range o.configs {
keys = append(keys, k)
}
sort.Slice(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
slices.SortFunc(keys, func(a, b uint64) int { return cmp.Compare(a, b) })
buf := bytes.Buffer{}
buf.WriteString("customConfigProvider: ")
for _, k := range keys {
Expand Down Expand Up @@ -172,12 +171,12 @@ func NewBasicPolicy() Policy {
}

// impl Policy for Basic
func (o *basic) OnObject(obj *catalog.ObjectEntry) {
func (o *basic) OnObject(obj *catalog.ObjectEntry, force bool) {
rowsLeftOnObj := obj.GetRemainingRows()
osize := obj.GetOriginSize()

iscandidate := func() bool {
// objext with a lot of holes
isCandidate := func() bool {
// object with a lot of holes
if rowsLeftOnObj < obj.GetRows()/2 {
return true
}
Expand All @@ -192,7 +191,7 @@ func (o *basic) OnObject(obj *catalog.ObjectEntry) {
return false
}

if iscandidate() {
if force || isCandidate() {
o.objHeap.pushWithCap(&mItem[*catalog.ObjectEntry]{
row: rowsLeftOnObj,
entry: obj,
Expand Down Expand Up @@ -237,11 +236,17 @@ func (o *basic) GetConfig(tbl *catalog.TableEntry) any {
return r
}

func (o *basic) Revise(cpu, mem int64) ([]*catalog.ObjectEntry, TaskHostKind) {
func (o *basic) Revise(cpu, mem int64, littleFirst bool) ([]*catalog.ObjectEntry, TaskHostKind) {
objs := o.objHeap.finish()
sort.Slice(objs, func(i, j int) bool {
return objs[i].GetRemainingRows() < objs[j].GetRemainingRows()
})
if littleFirst {
slices.SortFunc(objs, func(a, b *catalog.ObjectEntry) int {
return cmp.Compare(a.GetRemainingRows(), b.GetRemainingRows())
})
} else {
slices.SortFunc(objs, func(a, b *catalog.ObjectEntry) int {
return -cmp.Compare(a.GetRemainingRows(), b.GetRemainingRows())
})
}

isStandalone := common.IsStandaloneBoost.Load()
mergeOnDNIfStandalone := !common.ShouldStandaloneCNTakeOver.Load()
Expand Down Expand Up @@ -327,16 +332,12 @@ func (o *basic) controlMem(objs []*catalog.ObjectEntry, mem int64) []*catalog.Ob
}

needPopout := func(ss []*catalog.ObjectEntry) bool {
osize, esize, _ := estimateMergeConsume(ss)
if esize > int(2*mem/3) {
return true
}

if len(ss) <= 2 {
return false
}
// make object averaged size
return osize > int(o.config.MaxOsizeMergedObj)

_, esize, _ := estimateMergeConsume(ss)
return esize > int(2*mem/3)
}
for needPopout(objs) {
objs = objs[:len(objs)-1]
Expand Down
Loading
Loading