diff --git a/cmd/cli/main.go b/cmd/cli/main.go index 6797ebf..6ef4644 100644 --- a/cmd/cli/main.go +++ b/cmd/cli/main.go @@ -48,17 +48,13 @@ func main() { json.Unmarshal([]byte(env), &environ) logrus.Debugln("env", environ) } - brewingTask := &model.Task{ - ForcePull: false, - Env: environ, - Command: []string{ - "brewing-worker", - "-whisperEndpoint", - fmt.Sprintf("%s://%s", cfg.WhisperEndpointSchema, cfg.WhisperEndpoint), - "-videoUrl", - videoUrl, - }, - } + brewingTask := model.NewTask(environ, []string{ + "brewing-worker", + "-whisperEndpoint", + fmt.Sprintf("%s://%s", cfg.WhisperEndpointSchema, cfg.WhisperEndpoint), + "-videoUrl", + videoUrl, + }) d, err := dispatcher.NewTaskDispatcher() if err != nil { diff --git a/collection/asrcollection.go b/collection/asrcollection.go index e41ca57..4bbd587 100644 --- a/collection/asrcollection.go +++ b/collection/asrcollection.go @@ -5,6 +5,7 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "time" ) type AsrCollection struct { @@ -21,32 +22,34 @@ func NewAsrCollection(databaseName string) (*AsrCollection, error) { }, nil } -func (ac *AsrCollection) Add(uniqueId string, asrModel any) error { +func (ac *AsrCollection) Add(asrModel any) error { opt := options.Update().SetUpsert(true) + av := asrModel.(*model.AsrResponse) + av.UpdateAt = time.Now() + filter := bson.M{ - "uniqueId": uniqueId, + "uniqueId": av.UniqueId, } update := bson.M{ - "$set": asrModel, + "$set": av, } - _, err := ac.mc. - UpdateOne(nil, filter, update, opt) + _, err := ac.mc.UpdateOne(nil, filter, update, opt) return err } -func (ac *AsrCollection) Update(uniqueId string, asrModel any) error { - return ac.Add(uniqueId, asrModel) +func (ac *AsrCollection) Update(asrModel any) error { + return ac.Add(asrModel) } func (ac *AsrCollection) Get(uniqueId string) (any, error) { filter := bson.M{ "uniqueId": uniqueId, } - var v model.AsrReponse + var v model.AsrResponse err := ac.mc.FindOne(nil, filter).Decode(&v) return &v, err } @@ -56,7 +59,7 @@ func (ac *AsrCollection) List() (any, error) { if err != nil { return nil, err } - var result []model.AsrReponse + var result []model.AsrResponse err = cursor.All(nil, &result) if err != nil { return nil, err diff --git a/collection/taskcollection.go b/collection/taskcollection.go index 019eafe..143ee27 100644 --- a/collection/taskcollection.go +++ b/collection/taskcollection.go @@ -5,11 +5,12 @@ import ( "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" + "time" ) type Collection interface { - Add(string, any) error - Update(string, any) error + Add(any) error + Update(any) error Get(string) (any, error) List() (any, error) Del(string) error @@ -27,25 +28,27 @@ func NewTaskCollection(databaseName string) (*TaskCollection, error) { return &TaskCollection{mc: r.Database(databaseName).Collection("task")}, nil } -func (tc *TaskCollection) Add(uniqueId string, task any) error { +func (tc *TaskCollection) Add(task any) error { opt := options.Update().SetUpsert(true) + tv := task.(*model.Task) + tv.UpdateAt = time.Now() + filter := bson.M{ - "uniqueId": uniqueId, + "uniqueId": tv.UniqueId, } update := bson.M{ - "$set": task, + "$set": tv, } - _, err := tc.mc. - UpdateOne(nil, filter, update, opt) + _, err := tc.mc.UpdateOne(nil, filter, update, opt) return err } -func (tc *TaskCollection) Update(uniqueId string, job any) error { - return tc.Add(uniqueId, job) +func (tc *TaskCollection) Update(job any) error { + return tc.Add(job) } func (tc *TaskCollection) Get(uniqueId string) (any, error) { diff --git a/controller/asrrepo.go b/controller/asrrepo.go index 2bbc609..34e4743 100644 --- a/controller/asrrepo.go +++ b/controller/asrrepo.go @@ -40,7 +40,7 @@ func (j *asrRepo) Add(c *gin.Context) { }) return } - asr, err := model.ConvertToAsrReponse(t.(*model.Task).Stdout) + asr, err := model.ConvertToAsrResponse(t.(*model.Task).Stdout) if err != nil { c.JSON(http.StatusInternalServerError, gin.H{ "msg": err.Error(), @@ -76,7 +76,7 @@ func (j *asrRepo) Run(c *gin.Context) { if e != nil { msg = e.Error() } - asr := r.(*model.AsrReponse) + asr := r.(*model.AsrResponse) notify.Send(taskId, msg, "brewing", asr.BarkToken, "") }() @@ -116,7 +116,7 @@ func (j *asrRepo) Summary(c *gin.Context) { url = s } } - asr := r.(*model.AsrReponse) + asr := r.(*model.AsrResponse) content := asr.Pretty if c.Query("raw") != "" { diff --git a/controller/chain.go b/controller/chain.go index 4d729c1..0798ad8 100644 --- a/controller/chain.go +++ b/controller/chain.go @@ -90,7 +90,7 @@ func runner(repo *asrRepo, task *model.Task) error { jump: task = v.(*model.Task) - asr, err := model.ConvertToAsrReponse(task.Stdout) + asr, err := model.ConvertToAsrResponse(task.Stdout) if err != nil { return err } diff --git a/dispatcher/openaidispatcher.go b/dispatcher/openaidispatcher.go index 1dc8e1a..3f846b3 100644 --- a/dispatcher/openaidispatcher.go +++ b/dispatcher/openaidispatcher.go @@ -25,12 +25,12 @@ func NewOpenaiDispatcher() (Dispatcher, error) { } func (od *OpenaiDispatcher) Add(taskAny any) error { - task := taskAny.(*model.AsrReponse) - return od.ac.Add(task.UniqueId, task) + task := taskAny.(*model.AsrResponse) + return od.ac.Add(task) } func (od *OpenaiDispatcher) Run(taskAny any) error { - task := taskAny.(*model.AsrReponse) + task := taskAny.(*model.AsrResponse) var errors []string parts := ai.Service.SplitContent(task.MakeContent()) c, e := ai.Service.SummaryParallel(parts) @@ -49,7 +49,7 @@ func (od *OpenaiDispatcher) Run(taskAny any) error { results = append(results, strings.Split(cc, "\n")...) } task.Pretty = results - return od.ac.Update(task.UniqueId, task) + return od.ac.Update(task) } func (od OpenaiDispatcher) Del(id string) error { diff --git a/dispatcher/taskdispatcher.go b/dispatcher/taskdispatcher.go index d75ce0d..7da8c28 100644 --- a/dispatcher/taskdispatcher.go +++ b/dispatcher/taskdispatcher.go @@ -40,7 +40,7 @@ func NewTaskDispatcher() (Dispatcher, error) { func (dd *TaskDispatcher) Add(taskAny any) error { task := taskAny.(*model.Task) - return dd.tc.Add(task.UniqueId, task) + return dd.tc.Add(task) } func (dd *TaskDispatcher) Run(taskAny any) error { @@ -50,7 +50,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error { _, _, err := dd.cli.ImageInspectWithRaw(ctx, imageName) if task.ForcePull || client.IsErrNotFound(err) { task.Status = "ImagePull" - if err = dd.tc.Update(task.UniqueId, task); err != nil { + if err = dd.tc.Update(task); err != nil { return err } @@ -71,7 +71,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error { task.Status = "ContainerCreate" logrus.Debugln(task.UniqueId, "ContainerCreate") - if err = dd.tc.Update(task.UniqueId, task); err != nil { + if err = dd.tc.Update(task); err != nil { return err } resp, err := dd.cli.ContainerCreate(ctx, &container.Config{ @@ -85,7 +85,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error { } task.ContainerId = resp.ID - if err = dd.tc.Update(task.UniqueId, task); err != nil { + if err = dd.tc.Update(task); err != nil { return err } @@ -97,7 +97,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error { task.Status = "ContainerWait" logrus.Debugln(task.UniqueId, resp.ID, "ContainerWait") - if err = dd.tc.Update(task.UniqueId, task); err != nil { + if err = dd.tc.Update(task); err != nil { return err } @@ -134,7 +134,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error { task.ExitCode = inspection.State.ExitCode logrus.Debugln(task.UniqueId, resp.ID, "ContainerInspect") - if err = dd.tc.Update(task.UniqueId, task); err != nil { + if err = dd.tc.Update(task); err != nil { return err } diff --git a/model/asr.go b/model/asr.go index b8f261d..3b30a59 100644 --- a/model/asr.go +++ b/model/asr.go @@ -3,20 +3,28 @@ package model import ( "encoding/json" "github.com/chinaboard/brewing/pkg/whisper" + "time" ) -type AsrReponse struct { +type AsrResponse struct { UniqueId string `json:"uniqueId" bson:"uniqueId"` whisper.AsrResp - Content []string `json:"content" bson:"content"` - Errors []string `json:"errors" bson:"errors"` - Pretty []string `json:"pretty" bson:"pretty"` - BarkToken string `json:"barkToken" bson:"barkToken"` - Name string `json:"name" bson:"name"` + Content []string `json:"content" bson:"content"` + Errors []string `json:"errors" bson:"errors"` + Pretty []string `json:"pretty" bson:"pretty"` + BarkToken string `json:"barkToken" bson:"barkToken"` + Name string `json:"name" bson:"name"` + CreateAt time.Time `json:"createAt" bson:"createAt"` + UpdateAt time.Time `bson:"updateAt" bson:"updateAt"` } -func ConvertToAsrReponse(data string) (*AsrReponse, error) { - var v AsrReponse +func ConvertToAsrResponse(data string) (*AsrResponse, error) { + var v AsrResponse err := json.Unmarshal([]byte(data), &v) + if err != nil { + return nil, err + } + v.CreateAt = time.Now() + v.UpdateAt = v.CreateAt return &v, err } diff --git a/model/task.go b/model/task.go index b105c2e..e05c683 100644 --- a/model/task.go +++ b/model/task.go @@ -3,6 +3,7 @@ package model import ( "crypto/sha1" "fmt" + "time" ) type Task struct { @@ -26,6 +27,9 @@ type Task struct { ContainerId string `json:"containerId" bson:"containerId"` BarkToken string `json:"barkToken" bson:"barkToken"` + + CreateAt time.Time `json:"createAt" bson:"createAt"` + UpdateAt time.Time `bson:"updateAt" bson:"updateAt"` } func (b *Task) Hash() string { @@ -35,3 +39,11 @@ func (b *Task) Hash() string { bs := h.Sum(nil) return fmt.Sprintf("%x", bs) } + +func NewTask(env, command []string) *Task { + return &Task{ + Env: env, + Command: command, + CreateAt: time.Now(), + } +}