enhance: Datacoord to support prioritization of compaction tasks (milvus-io#36547)

See milvus-io#36550

This PR makes 2 changes:

1. Introduces a prioritization mechanism: if
`dataCoord.compaction.taskPrioritizer` is set to `level`, compaction
tasks are always executed in the priority order L0 > Mix > Clustering
(see the sketch after this list).
2. `dataCoord.compaction.maxParallelTaskNum` now controls the number of
tasks executing in parallel, rather than the combined count of queued
and executing tasks.
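
As a rough illustration of the `level` ordering, here is a minimal Go sketch. It is not this PR's implementation: the actual prioritizer and `CompactionQueue` live in changed files that are not rendered below, and the helper name `levelScore` is hypothetical; only the `datapb.CompactionType_*` constants and the `CompactionTask` interface are taken from the diff.

// Hypothetical helper: map a compaction task to a priority score; lower runs sooner.
func levelScore(t CompactionTask) int {
	switch t.GetType() {
	case datapb.CompactionType_Level0DeleteCompaction:
		return 0 // L0 delete compactions go first
	case datapb.CompactionType_MixCompaction:
		return 1 // then mix compactions
	case datapb.CompactionType_ClusteringCompaction:
		return 2 // clustering (major) compactions last
	default:
		return 3 // anything unknown goes to the back
	}
}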

---------

Signed-off-by: Ted Xu <ted.xu@zilliz.com>
tedxu authored Oct 9, 2024
1 parent e1b511a commit 5fc7317
Showing 6 changed files with 418 additions and 120 deletions.
1 change: 1 addition & 0 deletions configs/milvus.yaml
@@ -550,6 +550,7 @@ dataCoord:
# This configuration takes effect only when dataCoord.enableCompaction is set as true.
enableAutoCompaction: true
indexBasedCompaction: true
taskPrioritizer: default # compaction task prioritizer, options: [default, level]. Default is FIFO, level is prioritized by level: L0 compactions first, then mix compactions, then major compactions.
rpcTimeout: 10
maxParallelTaskNum: 10
workerMaxParallelTaskNum: 2
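
For illustration only, the new `taskPrioritizer` value could be resolved to an ordering function roughly as sketched below. This is an assumption-level sketch, not the commit's code: the real `getPrioritizer()` referenced in compaction.go further down is defined in another changed file, and both `getPrioritizerFor` and the `levelScore` helper from the sketch above are hypothetical names.

// Hypothetical sketch: turn the config string into a queue-ordering function.
// "default" keeps FIFO order by plan ID; "level" puts L0 before mix before clustering.
func getPrioritizerFor(value string) func(tasks []CompactionTask) {
	switch value {
	case "level":
		return func(tasks []CompactionTask) {
			sort.SliceStable(tasks, func(i, j int) bool {
				return levelScore(tasks[i]) < levelScore(tasks[j])
			})
		}
	default:
		return func(tasks []CompactionTask) {
			sort.SliceStable(tasks, func(i, j int) bool {
				return tasks[i].GetPlanID() < tasks[j].GetPlanID()
			})
		}
	}
}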
162 changes: 63 additions & 99 deletions internal/datacoord/compaction.go
@@ -24,9 +24,7 @@ import (
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
@@ -64,7 +62,6 @@ type compactionPlanContext interface {
var (
errChannelNotWatched = errors.New("channel is not watched")
errChannelInBuffer = errors.New("channel is in buffer")
errCompactionBusy = errors.New("compaction task queue is full")
)

var _ compactionPlanContext = (*compactionPlanHandler)(nil)
@@ -79,8 +76,7 @@ type compactionInfo struct {
}

type compactionPlanHandler struct {
queueGuard lock.RWMutex
queueTasks map[int64]CompactionTask // planID -> task
queueTasks CompactionQueue

executingGuard lock.RWMutex
executingTasks map[int64]CompactionTask // planID -> task
@@ -96,8 +92,6 @@ type compactionPlanHandler struct {
stopCh chan struct{}
stopOnce sync.Once
stopWg sync.WaitGroup

taskNumber *atomic.Int32
}

func (c *compactionPlanHandler) getCompactionInfo(triggerID int64) *compactionInfo {
@@ -168,13 +162,11 @@ func summaryCompactionState(tasks []*datapb.CompactionTask) *compactionInfo {

func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64) int {
cnt := 0
c.queueGuard.RLock()
for _, t := range c.queueTasks {
if t.GetTriggerID() == triggerID {
c.queueTasks.ForEach(func(ct CompactionTask) {
if ct.GetTriggerID() == triggerID {
cnt += 1
}
}
c.queueGuard.RUnlock()
})
c.executingGuard.RLock()
for _, t := range c.executingTasks {
if t.GetTriggerID() == triggerID {
@@ -185,31 +177,24 @@ func (c *compactionPlanHandler) getCompactionTasksNumBySignalID(triggerID int64)
return cnt
}

func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta, allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler,
func newCompactionPlanHandler(cluster Cluster, sessions session.DataNodeManager, cm ChannelManager, meta CompactionMeta,
allocator allocator.Allocator, analyzeScheduler *taskScheduler, handler Handler,
) *compactionPlanHandler {
return &compactionPlanHandler{
queueTasks: make(map[int64]CompactionTask),
queueTasks: *NewCompactionQueue(256, getPrioritizer()), // Higher capacity will have better ordering in priority, but consumes more memory.
chManager: cm,
meta: meta,
sessions: sessions,
allocator: allocator,
stopCh: make(chan struct{}),
cluster: cluster,
executingTasks: make(map[int64]CompactionTask),
taskNumber: atomic.NewInt32(0),
analyzeScheduler: analyzeScheduler,
handler: handler,
}
}

func (c *compactionPlanHandler) schedule() []CompactionTask {
c.queueGuard.RLock()
if len(c.queueTasks) == 0 {
c.queueGuard.RUnlock()
return nil
}
c.queueGuard.RUnlock()

l0ChannelExcludes := typeutil.NewSet[string]()
mixChannelExcludes := typeutil.NewSet[string]()
clusterChannelExcludes := typeutil.NewSet[string]()
@@ -231,42 +216,66 @@ func (c *compactionPlanHandler) schedule() []CompactionTask {
}
c.executingGuard.RUnlock()

var picked []CompactionTask
c.queueGuard.RLock()
defer c.queueGuard.RUnlock()
keys := lo.Keys(c.queueTasks)
sort.SliceStable(keys, func(i, j int) bool {
return keys[i] < keys[j]
})
for _, planID := range keys {
t := c.queueTasks[planID]
excluded := make([]CompactionTask, 0)
defer func() {
// Add back the excluded tasks
for _, t := range excluded {
c.queueTasks.Enqueue(t)
}
}()
selected := make([]CompactionTask, 0)

p := getPrioritizer()
if &c.queueTasks.prioritizer != &p {
c.queueTasks.UpdatePrioritizer(p)
}

c.executingGuard.Lock()
tasksToGo := Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt() - len(c.executingTasks)
c.executingGuard.Unlock()
for len(selected) < tasksToGo && c.queueTasks.Len() > 0 {
t, err := c.queueTasks.Dequeue()
if err != nil {
// Will never go here
return selected
}

switch t.GetType() {
case datapb.CompactionType_Level0DeleteCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) ||
mixChannelExcludes.Contain(t.GetChannel()) {
excluded = append(excluded, t)
continue
}
picked = append(picked, t)
l0ChannelExcludes.Insert(t.GetChannel())
selected = append(selected, t)
case datapb.CompactionType_MixCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) {
excluded = append(excluded, t)
continue
}
picked = append(picked, t)
mixChannelExcludes.Insert(t.GetChannel())
mixLabelExcludes.Insert(t.GetLabel())
selected = append(selected, t)
case datapb.CompactionType_ClusteringCompaction:
if l0ChannelExcludes.Contain(t.GetChannel()) ||
mixLabelExcludes.Contain(t.GetLabel()) ||
clusterLabelExcludes.Contain(t.GetLabel()) {
excluded = append(excluded, t)
continue
}
picked = append(picked, t)
clusterChannelExcludes.Insert(t.GetChannel())
clusterLabelExcludes.Insert(t.GetLabel())
selected = append(selected, t)
}

c.executingGuard.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingGuard.Unlock()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc()
}
return picked
return selected
}

func (c *compactionPlanHandler) start() {
@@ -325,26 +334,6 @@ func (c *compactionPlanHandler) loadMeta() {
}
}

func (c *compactionPlanHandler) doSchedule() {
picked := c.schedule()
if len(picked) > 0 {
c.executingGuard.Lock()
for _, t := range picked {
c.executingTasks[t.GetPlanID()] = t
}
c.executingGuard.Unlock()

c.queueGuard.Lock()
for _, t := range picked {
delete(c.queueTasks, t.GetPlanID())
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Executing).Inc()
}
c.queueGuard.Unlock()

}
}

func (c *compactionPlanHandler) loopSchedule() {
log.Info("compactionPlanHandler start loop schedule")
defer c.stopWg.Done()
@@ -358,7 +347,7 @@ func (c *compactionPlanHandler) loopSchedule() {
return

case <-scheduleTicker.C:
c.doSchedule()
c.schedule()
}
}
}
@@ -484,22 +473,20 @@ func (c *compactionPlanHandler) stop() {
}

func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
c.queueGuard.Lock()
for id, task := range c.queueTasks {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel), zap.Any("id", id), zap.Any("task_channel", task.GetChannel()))
log.Info("removing tasks by channel", zap.String("channel", channel))
c.queueTasks.RemoveAll(func(task CompactionTask) bool {
if task.GetChannel() == channel {
log.Info("Compaction handler removing tasks by channel",
zap.String("channel", channel),
zap.Int64("planID", task.GetPlanID()),
zap.Int64("node", task.GetNodeID()),
)
delete(c.queueTasks, id)
c.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Pending).Dec()
return true
}
}
c.queueGuard.Unlock()
return false
})

c.executingGuard.Lock()
for id, task := range c.executingTasks {
log.Info("Compaction handler removing tasks by channel",
@@ -511,7 +498,6 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
zap.Int64("node", task.GetNodeID()),
)
delete(c.executingTasks, id)
c.taskNumber.Dec()
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", task.GetNodeID()), task.GetType().String(), metrics.Executing).Dec()
}
}
@@ -521,10 +507,7 @@ func (c *compactionPlanHandler) removeTasksByChannel(channel string) {
func (c *compactionPlanHandler) submitTask(t CompactionTask) {
_, span := otel.Tracer(typeutil.DataCoordRole).Start(context.Background(), fmt.Sprintf("Compaction-%s", t.GetType()))
t.SetSpan(span)
c.queueGuard.Lock()
c.queueTasks[t.GetPlanID()] = t
c.queueGuard.Unlock()
c.taskNumber.Add(1)
c.queueTasks.Enqueue(t)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", NullNodeID), t.GetType().String(), metrics.Pending).Inc()
}

@@ -535,26 +518,24 @@ func (c *compactionPlanHandler) restoreTask(t CompactionTask) {
c.executingGuard.Lock()
c.executingTasks[t.GetPlanID()] = t
c.executingGuard.Unlock()
c.taskNumber.Add(1)
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Executing).Inc()
}

// getCompactionTask return compaction
func (c *compactionPlanHandler) getCompactionTask(planID int64) CompactionTask {
c.queueGuard.RLock()
t, ok := c.queueTasks[planID]
if ok {
c.queueGuard.RUnlock()
var t CompactionTask = nil
c.queueTasks.ForEach(func(task CompactionTask) {
if task.GetPlanID() == planID {
t = task
}
})
if t != nil {
return t
}
c.queueGuard.RUnlock()

c.executingGuard.RLock()
t, ok = c.executingTasks[planID]
if ok {
c.executingGuard.RUnlock()
return t
}
c.executingGuard.RUnlock()
defer c.executingGuard.RUnlock()
t = c.executingTasks[planID]
return t
}

@@ -669,7 +650,6 @@ func (c *compactionPlanHandler) checkCompaction() error {
metrics.DataCoordCompactionTaskNum.WithLabelValues(fmt.Sprintf("%d", t.GetNodeID()), t.GetType().String(), metrics.Done).Inc()
}
c.executingGuard.Unlock()
c.taskNumber.Sub(int32(len(finishedTasks)))
return nil
}

@@ -708,23 +688,7 @@ func (c *compactionPlanHandler) pickShardNode(nodeSlots map[int64]int64, t Compa

// isFull return true if the task pool is full
func (c *compactionPlanHandler) isFull() bool {
return c.getTaskCount() >= Params.DataCoordCfg.CompactionMaxParallelTasks.GetAsInt()
}

func (c *compactionPlanHandler) getTaskCount() int {
return int(c.taskNumber.Load())
}

func (c *compactionPlanHandler) getTasksByState(state datapb.CompactionTaskState) []CompactionTask {
c.queueGuard.RLock()
defer c.queueGuard.RUnlock()
tasks := make([]CompactionTask, 0, len(c.queueTasks))
for _, t := range c.queueTasks {
if t.GetState() == state {
tasks = append(tasks, t)
}
}
return tasks
return c.queueTasks.Len() >= c.queueTasks.capacity
}

func (c *compactionPlanHandler) checkDelay(t CompactionTask) {
(diffs of the remaining changed files are not rendered on this page)
