Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support disabling automatic task startup when starting a service #165

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion server/cdc_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,12 @@ func (e *MetaCDC) ReloadTask() {

metrics.TaskNumVec.Add(taskInfo.TaskID, taskInfo.State)
metrics.TaskStateVec.WithLabelValues(taskInfo.TaskID).Set(float64(taskInfo.State))
if taskInfo.DisableAutoStart {
if taskInfo.State != meta.TaskStatePaused {
_ = e.pauseTaskWithReason(taskInfo.TaskID, "the task is disabled auto start", []meta.TaskState{})
}
continue
}
if err := e.startInternal(taskInfo, taskInfo.State == meta.TaskStateRunning); err != nil {
log.Warn("fail to start the task", zap.Any("task_info", taskInfo), zap.Error(err))
_ = e.pauseTaskWithReason(taskInfo.TaskID, "fail to start task, err: "+err.Error(), []meta.TaskState{})
Expand Down Expand Up @@ -441,6 +447,7 @@ func (e *MetaCDC) Create(req *request.CreateRequest) (resp *request.CreateRespon
ExcludeCollections: excludeCollectionNames,
WriterCacheConfig: req.BufferConfig,
State: meta.TaskStateInitial,
DisableAutoStart: req.DisableAutoStart,
}
for _, collectionInfo := range req.CollectionInfos {
positions := make(map[string]*meta.PositionInfo, len(collectionInfo.Positions))
Expand Down Expand Up @@ -549,7 +556,7 @@ func (e *MetaCDC) validCreateRequest(req *request.CreateRequest) error {
if isMilvusEmpty && kafkaConnectParam.Address == "" {
return servererror.NewClientError("the downstream address is empty")
} else if !isMilvusEmpty && kafkaConnectParam.Address != "" {
return servererror.NewClientError("dont support milvus and kafka at the same time now")
return servererror.NewClientError("don't support milvus and kafka at the same time now")
}

if !isMilvusEmpty {
Expand Down Expand Up @@ -1214,6 +1221,7 @@ func (e *MetaCDC) pauseTaskWithReason(taskID, reason string, currentStates []met
}
cdcTask.State = meta.TaskStatePaused
cdcTask.Reason = reason
metrics.TaskStateVec.WithLabelValues(cdcTask.TaskID).Set(float64(cdcTask.State))
e.cdcTasks.Unlock()

var uKey string
Expand Down
1 change: 1 addition & 0 deletions server/model/meta/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ type TaskInfo struct {
RPCRequestChannelInfo model.ChannelInfo
ExtraInfo model.ExtraInfo
ExcludeCollections []string // it's used for the `*` collection name
DisableAutoStart bool
State TaskState
Reason string
}
Expand Down
1 change: 1 addition & 0 deletions server/model/request/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type CreateRequest struct {
ExtraInfo model.ExtraInfo `json:"extra_info" mapstructure:"extra_info"`
BufferConfig model.BufferConfig `json:"buffer_config" mapstructure:"buffer_config"`
NameMapping []model.NameMapping `json:"name_mapping" mapstructure:"name_mapping"`
DisableAutoStart bool `json:"disable_auto_start" mapstructure:"disable_auto_start"`
// Deprecated
Positions map[string]string `json:"positions" mapstructure:"positions"`
}
Expand Down
Loading