Skip to content

Commit

Permalink
👋my geek time
Browse files Browse the repository at this point in the history
  • Loading branch information
zkep committed Jan 1, 2025
1 parent 21205db commit e5ec321
Show file tree
Hide file tree
Showing 2 changed files with 151 additions and 123 deletions.
63 changes: 27 additions & 36 deletions internal/service/audio_visual.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,48 +171,39 @@ func Video(ctx context.Context, dir, fileName string, req *PlayMeta) (string, er
_ = os.RemoveAll(destDir)
return concatPath, nil
}
batch := global.GPool.NewBatch()
for _, t := range req.Parts {
partURL, destName := t.Src, t.Dest
if partStat, _ := global.Storage.Stat(destName); partStat != nil && partStat.Size() > 0 {
continue
}
batch.Queue(func(pctx context.Context) (any, error) {
global.LOG.Info("video part start", zap.String("part", path.Base(destName)))
err = zhttp.NewRequest().
Before(func(r *http.Request) {
r.Header.Set("Accept", "application/json, text/plain, */*")
r.Header.Set("User-Agent", zhttp.RandomUserAgent())
}).
After(func(r *http.Response) error {
if zhttp.IsHTTPSuccessStatus(r.StatusCode) {
if partStat, err1 := global.Storage.Put(destName, r.Body); err1 != nil {
return err1
} else if partStat.Size() <= 0 {
return fmt.Errorf("[%s] is empty", destName)
}
global.LOG.Info("video part end", zap.String("part", path.Base(destName)))
return nil
}
if zhttp.IsHTTPStatusSleep(r.StatusCode) {
time.Sleep(time.Second * 10)
}
if zhttp.IsHTTPStatusRetryable(r.StatusCode) {
return fmt.Errorf("http status: %s, %s", r.Status, r.Request.URL.String())
err = zhttp.NewRequest().
Before(func(r *http.Request) {
r.Header.Set("Accept", "application/json, text/plain, */*")
r.Header.Set("User-Agent", zhttp.RandomUserAgent())
}).
After(func(r *http.Response) error {
if zhttp.IsHTTPSuccessStatus(r.StatusCode) {
if partStat, err1 := global.Storage.Put(destName, r.Body); err1 != nil {
return err1
} else if partStat.Size() <= 0 {
return fmt.Errorf("[%s] is empty", destName)
}
return zhttp.BreakRetryError(fmt.Errorf(
"break http status: %s,%s", r.Status, r.Request.URL.String()))
}).
DoWithRetry(pctx, http.MethodGet, partURL, nil)
if err != nil {
return "", err
}
return nil, nil
})
}

if _, err = batch.Wait(retryCtx); err != nil {
return "", err
global.LOG.Info("video part end", zap.String("part", path.Base(destName)))
return nil
}
if zhttp.IsHTTPStatusSleep(r.StatusCode) {
time.Sleep(time.Second * 10)
}
if zhttp.IsHTTPStatusRetryable(r.StatusCode) {
return fmt.Errorf("http status: %s, %s", r.Status, r.Request.URL.String())
}
return zhttp.BreakRetryError(fmt.Errorf(
"break http status: %s,%s", r.Status, r.Request.URL.String()))
}).
DoWithRetry(retryCtx, http.MethodGet, partURL, nil)
if err != nil {
return "", err
}
}

ffmpeg_command := []string{
Expand Down
211 changes: 124 additions & 87 deletions internal/task/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func TaskHandler(ctx context.Context, t time.Time) error {
func iterators(ctx context.Context, loaded bool) error {
timeCtx, timeCancel := context.WithTimeout(ctx, time.Hour)
defer timeCancel()
hasMore, page, psize := true, 1, 5
hasMore, page, psize := true, 1, 10
orderTasks, batchTasks := make([]*model.Task, 0, psize), make([]*model.Task, 0, psize)
for hasMore {
var ls []*model.Task
tx := global.DB.Model(&model.Task{})
Expand All @@ -65,7 +66,33 @@ func iterators(ctx context.Context, loaded bool) error {
ls = ls[:psize]
}
page++
orderTasks = orderTasks[:0]
batchTasks = batchTasks[:0]
for _, value := range ls {
if len(value.RewriteHls) == 0 {
orderTasks = append(orderTasks, value)
} else {
batchTasks = append(batchTasks, value)
}
}

batch := global.GPool.NewBatch()
for _, value := range batchTasks {
x := value
batch.Queue(func(pctx context.Context) (any, error) {
err := worker(pctx, x)
if err != nil {
global.LOG.Error("task handler worker", zap.Error(err), zap.String("taskid", x.TaskId))
}
return nil, err
})
}
if _, err := batch.Wait(timeCtx); err != nil {
global.LOG.Error("task handler wait", zap.Error(err))
return err
}

for _, value := range orderTasks {
x := value
if err := worker(timeCtx, x); err != nil {
global.LOG.Error("task handler worker", zap.Error(err), zap.String("taskid", x.TaskId))
Expand All @@ -79,102 +106,112 @@ func iterators(ctx context.Context, loaded bool) error {
func worker(ctx context.Context, x *model.Task) error {
switch x.TaskType {
case service.TASK_TYPE_PRODUCT:
var count int64
return doProduct(ctx, x)
case service.TASK_TYPE_ARTICLE:
return doArticle(ctx, x)
}
return nil
}

func doProduct(_ context.Context, x *model.Task) error {
var count int64
if err := global.DB.Model(&model.Task{}).
Where("task_pid = ?", x.TaskId).
Where("status <= ?", service.TASK_STATUS_RUNNING).
Count(&count).Error; err != nil {
global.LOG.Error("worker", zap.Error(err), zap.String("taskId", x.TaskId))
return err
}
status := service.TASK_STATUS_FINISHED
if count > 0 {
global.LOG.Info("worker sub task",
zap.Int64("pending", count), zap.String("taskId", x.TaskId))
status = service.TASK_STATUS_PENDING
}
var statistics task.TaskStatistics
if err := json.Unmarshal(x.Statistics, &statistics); err != nil {
global.LOG.Error("worker Unmarshal", zap.Error(err), zap.String("taskId", x.TaskId))
}
if statistics.Items == nil {
statistics.Items = make(map[int]int, 5)
}
for _, item := range service.ALLStatus {
var itemCount int64
if err := global.DB.Model(&model.Task{}).
Where("task_pid = ?", x.TaskId).
Where("status <= ?", service.TASK_STATUS_RUNNING).
Count(&count).Error; err != nil {
global.LOG.Error("worker", zap.Error(err), zap.String("taskId", x.TaskId))
return err
}
status := service.TASK_STATUS_FINISHED
if count > 0 {
global.LOG.Info("worker sub task",
zap.Int64("pending", count), zap.String("taskId", x.TaskId))
status = service.TASK_STATUS_PENDING
}
var statistics task.TaskStatistics
if err := json.Unmarshal(x.Statistics, &statistics); err != nil {
global.LOG.Error("worker Unmarshal", zap.Error(err), zap.String("taskId", x.TaskId))
}
if statistics.Items == nil {
statistics.Items = make(map[int]int, 5)
}
for _, item := range service.ALLStatus {
var itemCount int64
if err := global.DB.Model(&model.Task{}).
Where("task_pid = ?", x.TaskId).
Where("status = ?", item).
Count(&itemCount).Error; err != nil {
global.LOG.Error("worker count", zap.Error(err), zap.String("taskId", x.TaskId))
}
statistics.Items[item] = int(itemCount)
}
raw, _ := json.Marshal(statistics)
m := model.Task{
Id: x.Id,
Status: int32(status),
Statistics: raw,
UpdatedAt: time.Now().Unix(),
}
if status == service.TASK_STATUS_FINISHED {
dir := global.Storage.GetKey(x.TaskId, false)
message := task.TaskMessage{Object: dir}
m.Message, _ = json.Marshal(message)
Where("status = ?", item).
Count(&itemCount).Error; err != nil {
global.LOG.Error("worker count", zap.Error(err), zap.String("taskId", x.TaskId))
}
if err := global.DB.Where(&model.Task{Id: x.Id}).Updates(&m).Error; err != nil {
global.LOG.Error("worker Updates", zap.Error(err), zap.String("taskId", x.TaskId))
statistics.Items[item] = int(itemCount)
}
raw, _ := json.Marshal(statistics)
m := model.Task{
Id: x.Id,
Status: int32(status),
Statistics: raw,
UpdatedAt: time.Now().Unix(),
}
if status == service.TASK_STATUS_FINISHED {
dir := global.Storage.GetKey(x.TaskId, false)
message := task.TaskMessage{Object: dir}
m.Message, _ = json.Marshal(message)
}
if err := global.DB.Where(&model.Task{Id: x.Id}).Updates(&m).Error; err != nil {
global.LOG.Error("worker Updates", zap.Error(err), zap.String("taskId", x.TaskId))
return err
}
return nil
}

func doArticle(ctx context.Context, x *model.Task) error {
m := model.Task{
Id: x.Id,
Status: service.TASK_STATUS_RUNNING,
UpdatedAt: time.Now().Unix(),
}
if len(x.RewriteHls) == 0 {
aid, err := strconv.ParseInt(x.OtherId, 10, 64)
if err != nil {
return err
}
case service.TASK_TYPE_ARTICLE:
m := model.Task{
Id: x.Id,
Status: service.TASK_STATUS_RUNNING,
UpdatedAt: time.Now().Unix(),
}
if len(x.RewriteHls) == 0 {
aid, err := strconv.ParseInt(x.OtherId, 10, 64)
if err != nil {
return err
}
var u model.User
if err = global.DB.Where(&model.User{RoleId: user.AdminRoleId}).First(&u).Error; err != nil {
return err
}
if u.AccessToken == "" {
return errors.New("no access token, please refresh geektime cookie")
}
article, err1 := service.GetArticleInfo(ctx, u.Uid, u.AccessToken, geek.ArticlesInfoRequest{Id: aid})
if err1 != nil {
return err1
}
var info geek.ArticleInfoRaw
if err = json.Unmarshal(article.Raw, &info); err != nil {
return err
}
m.Raw = info.Data
x.Raw = info.Data
}
if err := global.DB.Where(&model.Task{Id: x.Id}).Updates(m).Error; err != nil {
global.LOG.Error("worker Updates", zap.Error(err), zap.String("taskId", x.TaskId))
var u model.User
if err = global.DB.Where(&model.User{RoleId: user.AdminRoleId}).First(&u).Error; err != nil {
return err
}
status := service.TASK_STATUS_FINISHED
if err := service.Download(ctx, x); err != nil {
global.LOG.Error("worker download", zap.Error(err), zap.String("taskId", x.TaskId))
status = service.TASK_STATUS_ERROR
message := task.TaskMessage{Text: err.Error()}
x.Message, _ = json.Marshal(message)
if u.AccessToken == "" {
return errors.New("no access token, please refresh geektime cookie")
}
article, err1 := service.GetArticleInfo(ctx, u.Uid, u.AccessToken, geek.ArticlesInfoRequest{Id: aid})
if err1 != nil {
return err1
}
m.Ciphertext = x.Ciphertext
m.RewriteHls = x.RewriteHls
m.Message = x.Message
m.Status = int32(status)
m.UpdatedAt = time.Now().Unix()
if err := global.DB.Where(&model.Task{Id: x.Id}).Updates(&m).Error; err != nil {
global.LOG.Error("worker Updates", zap.Error(err), zap.String("taskId", x.TaskId))
var info geek.ArticleInfoRaw
if err = json.Unmarshal(article.Raw, &info); err != nil {
return err
}
m.Raw = info.Data
x.Raw = info.Data
}
if err := global.DB.Where(&model.Task{Id: x.Id}).Updates(m).Error; err != nil {
global.LOG.Error("worker Updates", zap.Error(err), zap.String("taskId", x.TaskId))
return err
}
status := service.TASK_STATUS_FINISHED
if err := service.Download(ctx, x); err != nil {
global.LOG.Error("worker download", zap.Error(err), zap.String("taskId", x.TaskId))
status = service.TASK_STATUS_ERROR
message := task.TaskMessage{Text: err.Error()}
x.Message, _ = json.Marshal(message)
}
m.Ciphertext = x.Ciphertext
m.RewriteHls = x.RewriteHls
m.Message = x.Message
m.Status = int32(status)
m.UpdatedAt = time.Now().Unix()
if err := global.DB.Where(&model.Task{Id: x.Id}).Updates(&m).Error; err != nil {
global.LOG.Error("worker Updates", zap.Error(err), zap.String("taskId", x.TaskId))
return err
}
return nil
}

0 comments on commit e5ec321

Please sign in to comment.