Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

load less CKP entries when replay #14221

Merged
Merged
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 70 additions & 45 deletions pkg/vm/engine/tae/db/checkpoint/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,16 +93,16 @@ func (r *runner) Replay(dataFactory catalog.DataFactory) (
defer bat.Close()
colNames := CheckpointSchema.Attrs()
colTypes := CheckpointSchema.Types()
var CheckpointVersion int
var checkpointVersion int
// in version 1, checkpoint metadata doesn't contain 'version'.
vecLen := len(bats[0].Vecs)
logutil.Infof("checkpoint version: %d, list and load duration: %v", vecLen, time.Since(t0))
if vecLen < CheckpointSchemaColumnCountV1 {
CheckpointVersion = 1
checkpointVersion = 1
} else if vecLen < CheckpointSchemaColumnCountV2 {
CheckpointVersion = 2
checkpointVersion = 2
} else {
CheckpointVersion = 3
checkpointVersion = 3
}
for i := range bats[0].Vecs {
if len(bats) == 0 {
Expand All @@ -119,57 +119,24 @@ func (r *runner) Replay(dataFactory catalog.DataFactory) (
readDuration += time.Since(t0)
datas := make([]*logtail.CheckpointData, bat.Length())

entries := make([]*CheckpointEntry, bat.Length())
entries, maxGlobalEnd := replayCheckpointEntries(bat, checkpointVersion)
emptyFile := make([]*CheckpointEntry, 0)
var emptyFileMu sync.RWMutex
closecbs := make([]func(), 0)
var readCount, applyCount, totalCount int
totalCount = len(entries)
readfn := func(i int, readType uint16) {
start := bat.GetVectorByName(CheckpointAttr_StartTS).Get(i).(types.TS)
end := bat.GetVectorByName(CheckpointAttr_EndTS).Get(i).(types.TS)
cnLoc := objectio.Location(bat.GetVectorByName(CheckpointAttr_MetaLocation).Get(i).([]byte))
typ := ET_Global
if CheckpointVersion > 2 {
typ = EntryType(bat.GetVectorByName(CheckpointAttr_Type).Get(i).(int8))
} else {
isIncremental := bat.GetVectorByName(CheckpointAttr_EntryType).Get(i).(bool)
if isIncremental {
typ = ET_Incremental
}
}
var version uint32
if CheckpointVersion == 1 {
version = logtail.CheckpointVersion1
} else {
version = bat.GetVectorByName(CheckpointAttr_Version).Get(i).(uint32)
}
var tnLoc objectio.Location
if version <= logtail.CheckpointVersion4 {
tnLoc = cnLoc
} else {
tnLoc = objectio.Location(bat.GetVectorByName(CheckpointAttr_AllLocations).Get(i).([]byte))
}
var ckpLSN, truncateLSN uint64
if version >= logtail.CheckpointVersion7 {
ckpLSN = bat.GetVectorByName(CheckpointAttr_CheckpointLSN).Get(i).(uint64)
truncateLSN = bat.GetVectorByName(CheckpointAttr_TruncateLSN).Get(i).(uint64)
}
checkpointEntry := &CheckpointEntry{
start: start,
end: end,
cnLocation: cnLoc,
tnLocation: tnLoc,
state: ST_Finished,
entryType: typ,
version: version,
ckpLSN: ckpLSN,
truncateLSN: truncateLSN,
checkpointEntry := entries[i]
if checkpointEntry.end.Less(maxGlobalEnd) {
return
}
var err2 error
if readType == PrefetchData {
if err2 = checkpointEntry.Prefetch(ctx, r.rt.Fs, datas[i]); err2 != nil {
logutil.Warnf("read %v failed: %v", checkpointEntry.String(), err2)
}
} else if readType == PrefetchMetaIdx {
readCount++
datas[i], err = checkpointEntry.PrefetchMetaIdx(ctx, r.rt.Fs)
if err != nil {
return
Expand Down Expand Up @@ -245,6 +212,7 @@ func (r *runner) Replay(dataFactory catalog.DataFactory) (
if maxGlobal != nil {
logutil.Infof("replay checkpoint %v", maxGlobal)
err = datas[globalIdx].ApplyReplayTo(r.catalog, dataFactory)
applyCount++
if err != nil {
return
}
Expand Down Expand Up @@ -282,6 +250,7 @@ func (r *runner) Replay(dataFactory catalog.DataFactory) (
}
logutil.Infof("replay checkpoint %v", checkpointEntry)
err = datas[i].ApplyReplayTo(r.catalog, dataFactory)
applyCount++
if err != nil {
return
}
Expand Down Expand Up @@ -313,7 +282,10 @@ func (r *runner) Replay(dataFactory catalog.DataFactory) (
logutil.Info("open-tae", common.OperationField("replay"),
common.OperandField("checkpoint"),
common.AnyField("apply cost", applyDuration),
common.AnyField("read cost", readDuration))
common.AnyField("read cost", readDuration),
common.AnyField("total count", totalCount),
common.AnyField("read count", readCount),
common.AnyField("apply count", applyCount))
r.source.Init(maxTs)
return
}
Expand Down Expand Up @@ -394,3 +366,56 @@ func MergeCkpMeta(ctx context.Context, fs fileservice.FileService, cnLocation, t
_, err = writer.WriteEnd(ctx)
return name, err
}

func replayCheckpointEntries(bat *containers.Batch, checkpointVersion int) (entries []*CheckpointEntry, maxGlobalEnd types.TS) {
entries = make([]*CheckpointEntry, bat.Length())
for i := 0; i < bat.Length(); i++ {
start := bat.GetVectorByName(CheckpointAttr_StartTS).Get(i).(types.TS)
end := bat.GetVectorByName(CheckpointAttr_EndTS).Get(i).(types.TS)
cnLoc := objectio.Location(bat.GetVectorByName(CheckpointAttr_MetaLocation).Get(i).([]byte))
typ := ET_Global
if checkpointVersion > 2 {
typ = EntryType(bat.GetVectorByName(CheckpointAttr_Type).Get(i).(int8))
} else {
isIncremental := bat.GetVectorByName(CheckpointAttr_EntryType).Get(i).(bool)
if isIncremental {
typ = ET_Incremental
}
}
var version uint32
if checkpointVersion == 1 {
version = logtail.CheckpointVersion1
} else {
version = bat.GetVectorByName(CheckpointAttr_Version).Get(i).(uint32)
}
var tnLoc objectio.Location
if version <= logtail.CheckpointVersion4 {
tnLoc = cnLoc
} else {
tnLoc = objectio.Location(bat.GetVectorByName(CheckpointAttr_AllLocations).Get(i).([]byte))
}
var ckpLSN, truncateLSN uint64
if version >= logtail.CheckpointVersion7 {
ckpLSN = bat.GetVectorByName(CheckpointAttr_CheckpointLSN).Get(i).(uint64)
truncateLSN = bat.GetVectorByName(CheckpointAttr_TruncateLSN).Get(i).(uint64)
}
checkpointEntry := &CheckpointEntry{
start: start,
end: end,
cnLocation: cnLoc,
tnLocation: tnLoc,
state: ST_Finished,
entryType: typ,
version: version,
ckpLSN: ckpLSN,
truncateLSN: truncateLSN,
}
entries[i] = checkpointEntry
if typ == ET_Global {
if end.Greater(maxGlobalEnd) {
maxGlobalEnd = end
}
}
}
return
}
Loading