Skip to content

Commit

Permalink
support disabling automatic task startup when starting a service
Browse files Browse the repository at this point in the history
  • Loading branch information
SimFG committed Dec 2, 2024
1 parent dbd13f5 commit 94fccf0
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 1 deletion.
10 changes: 9 additions & 1 deletion server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ func (e *MetaCDC) ReloadTask() {

metrics.TaskNumVec.Add(taskInfo.TaskID, taskInfo.State)
metrics.TaskStateVec.WithLabelValues(taskInfo.TaskID).Set(float64(taskInfo.State))
if taskInfo.DisableAutoStart {
if taskInfo.State != meta.TaskStatePaused {
_ = e.pauseTaskWithReason(taskInfo.TaskID, "the task is disabled auto start", []meta.TaskState{})
}
continue
}
if err := e.startInternal(taskInfo, taskInfo.State == meta.TaskStateRunning); err != nil {
log.Warn("fail to start the task", zap.Any("task_info", taskInfo), zap.Error(err))
_ = e.pauseTaskWithReason(taskInfo.TaskID, "fail to start task, err: "+err.Error(), []meta.TaskState{})
Expand Down Expand Up @@ -441,6 +447,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
ExcludeCollections: excludeCollectionNames,
WriterCacheConfig: req.BufferConfig,
State: meta.TaskStateInitial,
DisableAutoStart: req.DisableAutoStart,
}
for _, collectionInfo := range req.CollectionInfos {
positions := make(map[string]*meta.PositionInfo, len(collectionInfo.Positions))
Expand Down Expand Up @@ -549,7 +556,7 @@ func (e *MetaCDC) validCreateRequest(req *request.CreateRequest) error {
if isMilvusEmpty && kafkaConnectParam.Address == "" {
return servererror.NewClientError("the downstream address is empty")
} else if !isMilvusEmpty && kafkaConnectParam.Address != "" {
return servererror.NewClientError("dont support milvus and kafka at the same time now")
return servererror.NewClientError("don't support milvus and kafka at the same time now")
}

if !isMilvusEmpty {
Expand Down Expand Up @@ -1214,6 +1221,7 @@ func (e *MetaCDC) pauseTaskWithReason(taskID, reason string, currentStates []met
}
cdcTask.State = meta.TaskStatePaused
cdcTask.Reason = reason
metrics.TaskStateVec.WithLabelValues(cdcTask.TaskID).Set(float64(cdcTask.State))
e.cdcTasks.Unlock()

var uKey string
Expand Down
1 change: 1 addition & 0 deletions server/model/meta/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type TaskInfo struct {
RPCRequestChannelInfo model.ChannelInfo
ExtraInfo model.ExtraInfo
ExcludeCollections []string // it's used for the `*` collection name
DisableAutoStart bool
State TaskState
Reason string
}
Expand Down
1 change: 1 addition & 0 deletions server/model/request/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type CreateRequest struct {
ExtraInfo model.ExtraInfo `json:"extra_info" mapstructure:"extra_info"`
BufferConfig model.BufferConfig `json:"buffer_config" mapstructure:"buffer_config"`
NameMapping []model.NameMapping `json:"name_mapping" mapstructure:"name_mapping"`
DisableAutoStart bool `json:"disable_auto_start" mapstructure:"disable_auto_start"`
// Deprecated
Positions map[string]string `json:"positions" mapstructure:"positions"`
}
Expand Down

0 comments on commit 94fccf0

Please sign in to comment.