Skip to content

Commit

Permalink
modify service task fields
Browse files Browse the repository at this point in the history
  • Loading branch information
chinaboard committed Oct 4, 2023
1 parent 0bd386c commit db21ff5
Show file tree
Hide file tree
Showing 9 changed files with 73 additions and 51 deletions.
18 changes: 7 additions & 11 deletions cmd/cli/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,13 @@ func main() {
json.Unmarshal([]byte(env), &environ)
logrus.Debugln("env", environ)
}
brewingTask := &model.Task{
ForcePull: false,
Env: environ,
Command: []string{
"brewing-worker",
"-whisperEndpoint",
fmt.Sprintf("%s://%s", cfg.WhisperEndpointSchema, cfg.WhisperEndpoint),
"-videoUrl",
videoUrl,
},
}
brewingTask := model.NewTask(environ, []string{
"brewing-worker",
"-whisperEndpoint",
fmt.Sprintf("%s://%s", cfg.WhisperEndpointSchema, cfg.WhisperEndpoint),
"-videoUrl",
videoUrl,
})

d, err := dispatcher.NewTaskDispatcher()
if err != nil {
Expand Down
21 changes: 12 additions & 9 deletions collection/asrcollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)

type AsrCollection struct {
Expand All @@ -21,32 +22,34 @@ func NewAsrCollection(databaseName string) (*AsrCollection, error) {
}, nil
}

func (ac *AsrCollection) Add(uniqueId string, asrModel any) error {
func (ac *AsrCollection) Add(asrModel any) error {
opt := options.Update().SetUpsert(true)

av := asrModel.(*model.AsrResponse)
av.UpdateAt = time.Now()

filter := bson.M{
"uniqueId": uniqueId,
"uniqueId": av.UniqueId,
}

update := bson.M{
"$set": asrModel,
"$set": av,
}

_, err := ac.mc.
UpdateOne(nil, filter, update, opt)
_, err := ac.mc.UpdateOne(nil, filter, update, opt)

return err
}

func (ac *AsrCollection) Update(uniqueId string, asrModel any) error {
return ac.Add(uniqueId, asrModel)
func (ac *AsrCollection) Update(asrModel any) error {
return ac.Add(asrModel)
}

func (ac *AsrCollection) Get(uniqueId string) (any, error) {
filter := bson.M{
"uniqueId": uniqueId,
}
var v model.AsrReponse
var v model.AsrResponse
err := ac.mc.FindOne(nil, filter).Decode(&v)
return &v, err
}
Expand All @@ -56,7 +59,7 @@ func (ac *AsrCollection) List() (any, error) {
if err != nil {
return nil, err
}
var result []model.AsrReponse
var result []model.AsrResponse
err = cursor.All(nil, &result)
if err != nil {
return nil, err
Expand Down
21 changes: 12 additions & 9 deletions collection/taskcollection.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ import (
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"time"
)

type Collection interface {
Add(string, any) error
Update(string, any) error
Add(any) error
Update(any) error
Get(string) (any, error)
List() (any, error)
Del(string) error
Expand All @@ -27,25 +28,27 @@ func NewTaskCollection(databaseName string) (*TaskCollection, error) {
return &TaskCollection{mc: r.Database(databaseName).Collection("task")}, nil
}

func (tc *TaskCollection) Add(uniqueId string, task any) error {
func (tc *TaskCollection) Add(task any) error {
opt := options.Update().SetUpsert(true)

tv := task.(*model.Task)
tv.UpdateAt = time.Now()

filter := bson.M{
"uniqueId": uniqueId,
"uniqueId": tv.UniqueId,
}

update := bson.M{
"$set": task,
"$set": tv,
}

_, err := tc.mc.
UpdateOne(nil, filter, update, opt)
_, err := tc.mc.UpdateOne(nil, filter, update, opt)

return err
}

func (tc *TaskCollection) Update(uniqueId string, job any) error {
return tc.Add(uniqueId, job)
func (tc *TaskCollection) Update(job any) error {
return tc.Add(job)
}

func (tc *TaskCollection) Get(uniqueId string) (any, error) {
Expand Down
6 changes: 3 additions & 3 deletions controller/asrrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (j *asrRepo) Add(c *gin.Context) {
})
return
}
asr, err := model.ConvertToAsrReponse(t.(*model.Task).Stdout)
asr, err := model.ConvertToAsrResponse(t.(*model.Task).Stdout)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{
"msg": err.Error(),
Expand Down Expand Up @@ -76,7 +76,7 @@ func (j *asrRepo) Run(c *gin.Context) {
if e != nil {
msg = e.Error()
}
asr := r.(*model.AsrReponse)
asr := r.(*model.AsrResponse)
notify.Send(taskId, msg, "brewing", asr.BarkToken, "")
}()

Expand Down Expand Up @@ -116,7 +116,7 @@ func (j *asrRepo) Summary(c *gin.Context) {
url = s
}
}
asr := r.(*model.AsrReponse)
asr := r.(*model.AsrResponse)
content := asr.Pretty

if c.Query("raw") != "" {
Expand Down
2 changes: 1 addition & 1 deletion controller/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func runner(repo *asrRepo, task *model.Task) error {

jump:
task = v.(*model.Task)
asr, err := model.ConvertToAsrReponse(task.Stdout)
asr, err := model.ConvertToAsrResponse(task.Stdout)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions dispatcher/openaidispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ func NewOpenaiDispatcher() (Dispatcher, error) {
}

func (od *OpenaiDispatcher) Add(taskAny any) error {
task := taskAny.(*model.AsrReponse)
return od.ac.Add(task.UniqueId, task)
task := taskAny.(*model.AsrResponse)
return od.ac.Add(task)
}

func (od *OpenaiDispatcher) Run(taskAny any) error {
task := taskAny.(*model.AsrReponse)
task := taskAny.(*model.AsrResponse)
var errors []string
parts := ai.Service.SplitContent(task.MakeContent())
c, e := ai.Service.SummaryParallel(parts)
Expand All @@ -49,7 +49,7 @@ func (od *OpenaiDispatcher) Run(taskAny any) error {
results = append(results, strings.Split(cc, "\n")...)
}
task.Pretty = results
return od.ac.Update(task.UniqueId, task)
return od.ac.Update(task)
}

func (od OpenaiDispatcher) Del(id string) error {
Expand Down
12 changes: 6 additions & 6 deletions dispatcher/taskdispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewTaskDispatcher() (Dispatcher, error) {

func (dd *TaskDispatcher) Add(taskAny any) error {
task := taskAny.(*model.Task)
return dd.tc.Add(task.UniqueId, task)
return dd.tc.Add(task)
}

func (dd *TaskDispatcher) Run(taskAny any) error {
Expand All @@ -50,7 +50,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error {
_, _, err := dd.cli.ImageInspectWithRaw(ctx, imageName)
if task.ForcePull || client.IsErrNotFound(err) {
task.Status = "ImagePull"
if err = dd.tc.Update(task.UniqueId, task); err != nil {
if err = dd.tc.Update(task); err != nil {
return err
}

Expand All @@ -71,7 +71,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error {

task.Status = "ContainerCreate"
logrus.Debugln(task.UniqueId, "ContainerCreate")
if err = dd.tc.Update(task.UniqueId, task); err != nil {
if err = dd.tc.Update(task); err != nil {
return err
}
resp, err := dd.cli.ContainerCreate(ctx, &container.Config{
Expand All @@ -85,7 +85,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error {
}

task.ContainerId = resp.ID
if err = dd.tc.Update(task.UniqueId, task); err != nil {
if err = dd.tc.Update(task); err != nil {
return err
}

Expand All @@ -97,7 +97,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error {

task.Status = "ContainerWait"
logrus.Debugln(task.UniqueId, resp.ID, "ContainerWait")
if err = dd.tc.Update(task.UniqueId, task); err != nil {
if err = dd.tc.Update(task); err != nil {
return err
}

Expand Down Expand Up @@ -134,7 +134,7 @@ func (dd *TaskDispatcher) Run(taskAny any) error {
task.ExitCode = inspection.State.ExitCode

logrus.Debugln(task.UniqueId, resp.ID, "ContainerInspect")
if err = dd.tc.Update(task.UniqueId, task); err != nil {
if err = dd.tc.Update(task); err != nil {
return err
}

Expand Down
24 changes: 16 additions & 8 deletions model/asr.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,28 @@ package model
import (
"encoding/json"
"github.com/chinaboard/brewing/pkg/whisper"
"time"
)

type AsrReponse struct {
type AsrResponse struct {
UniqueId string `json:"uniqueId" bson:"uniqueId"`
whisper.AsrResp
Content []string `json:"content" bson:"content"`
Errors []string `json:"errors" bson:"errors"`
Pretty []string `json:"pretty" bson:"pretty"`
BarkToken string `json:"barkToken" bson:"barkToken"`
Name string `json:"name" bson:"name"`
Content []string `json:"content" bson:"content"`
Errors []string `json:"errors" bson:"errors"`
Pretty []string `json:"pretty" bson:"pretty"`
BarkToken string `json:"barkToken" bson:"barkToken"`
Name string `json:"name" bson:"name"`
CreateAt time.Time `json:"createAt" bson:"createAt"`
UpdateAt time.Time `bson:"updateAt" bson:"updateAt"`
}

func ConvertToAsrReponse(data string) (*AsrReponse, error) {
var v AsrReponse
func ConvertToAsrResponse(data string) (*AsrResponse, error) {
var v AsrResponse
err := json.Unmarshal([]byte(data), &v)
if err != nil {
return nil, err
}
v.CreateAt = time.Now()
v.UpdateAt = v.CreateAt
return &v, err
}
12 changes: 12 additions & 0 deletions model/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package model
import (
"crypto/sha1"
"fmt"
"time"
)

type Task struct {
Expand All @@ -26,6 +27,9 @@ type Task struct {
ContainerId string `json:"containerId" bson:"containerId"`

BarkToken string `json:"barkToken" bson:"barkToken"`

CreateAt time.Time `json:"createAt" bson:"createAt"`
UpdateAt time.Time `bson:"updateAt" bson:"updateAt"`
}

func (b *Task) Hash() string {
Expand All @@ -35,3 +39,11 @@ func (b *Task) Hash() string {
bs := h.Sum(nil)
return fmt.Sprintf("%x", bs)
}

func NewTask(env, command []string) *Task {
return &Task{
Env: env,
Command: command,
CreateAt: time.Now(),
}
}

0 comments on commit db21ff5

Please sign in to comment.