From e46459e28a623e1bbc46d3d84e091570ccb1328b Mon Sep 17 00:00:00 2001 From: Alexey Date: Mon, 19 Aug 2024 10:42:32 +0400 Subject: [PATCH] Update main.go --- golang/main.go | 181 +++++++++++++++++++++++++++---------------------- 1 file changed, 101 insertions(+), 80 deletions(-) diff --git a/golang/main.go b/golang/main.go index 536c63dd..0054f5b1 100644 --- a/golang/main.go +++ b/golang/main.go @@ -2,104 +2,125 @@ package main import ( "fmt" + "strconv" + "strings" + "sync" "time" ) -// Приложение эмулирует получение и обработку неких тасков. Пытается и получать, и обрабатывать в многопоточном режиме. -// Приложение должно генерировать таски 10 сек. Каждые 3 секунды должно выводить в консоль результат всех обработанных к этому моменту тасков (отдельно успешные и отдельно с ошибками). - -// ЗАДАНИЕ: сделать из плохого кода хороший и рабочий - as best as you can. -// Важно сохранить логику появления ошибочных тасков. -// Важно оставить асинхронные генерацию и обработку тасков. -// Сделать правильную мультипоточность обработки заданий. -// Обновленный код отправить через pull-request в github -// Как видите, никаких привязок к внешним сервисам нет - полный карт-бланш на модификацию кода. - -// A Ttype represents a meaninglessness of our life -type Ttype struct { - id int - cT string // время создания - fT string // время выполнения - taskRESULT []byte +var ( + taskChan = make(chan Task, 10) + doneTasks = make(chan Task, 10) + undoneTasks = make(chan Task, 10) + wg sync.WaitGroup + allDoneTasks []Task + allUndoneTasks []Task +) + +const ( + PRINT_INTERVAL = 3 * time.Second + TASK_CREATION_TIMEOUT = 10 * time.Second +) + +type Task struct { + id int + creationTime time.Time + processionTime time.Time + succeed bool + err error } -func main() { - taskCreturer := func(a chan Ttype) { - go func() { - for { - ft := time.Now().Format(time.RFC3339) - if time.Now().Nanosecond()%2 > 0 { // вот такое условие появления ошибочных тасков - ft = "Some error occured" - } - a <- Ttype{cT: ft, id: int(time.Now().Unix())} // передаем таск на выполнение +// Генерирует таски 10 секунд и закрывает taskChan +func taskCreator(taskChan chan<- Task) { + defer close(taskChan) + defer wg.Done() + + // Запускаем таймер + timer := time.NewTimer(TASK_CREATION_TIMEOUT) + // id теперь просто идущие подряд числа + for id := 1; ; id++ { + select { + // Если прошло 10 секунд, выходим из функции + case <-timer.C: + return + default: + currentTime := time.Now() + task := Task{ + id: id, + creationTime: currentTime, } - }() - } - - superChan := make(chan Ttype, 10) + // Удаляем из числа наносекунд незначащие нули справа + nanoSecStr := strings.TrimRight(strconv.Itoa(currentTime.Nanosecond()), "0") + nanoSec, _ := strconv.Atoi(nanoSecStr) + if nanoSec%2 > 0 { + task.err = fmt.Errorf("random error occurred") + } + // Задержка для замедления генерации + time.Sleep(time.Second) - go taskCreturer(superChan) + taskChan <- task + } + } +} - task_worker := func(a Ttype) Ttype { - tt, _ := time.Parse(time.RFC3339, a.cT) - if tt.After(time.Now().Add(-20 * time.Second)) { - a.taskRESULT = []byte("task has been successed") +// Обрабатаывает таски, отправляет в соответсвующий канал и завершает работу при закрытии taskChan +func taskWorker(taskChan <-chan Task, doneTasks, undoneTasks chan<- Task, quit chan<- struct{}) { + defer wg.Done() + // Сообщаем о завершении работы + defer close(quit) + + for task := range taskChan { + task.processionTime = time.Now() + if task.err == nil { + task.succeed = true + doneTasks <- task } else { - a.taskRESULT = []byte("something went wrong") + task.succeed = false + undoneTasks <- task } - a.fT = time.Now().Format(time.RFC3339Nano) - time.Sleep(time.Millisecond * 150) - - return a } +} - doneTasks := make(chan Ttype) - undoneTasks := make(chan error) +// Выводит обработанные таски каждые *PRINT_INTERVAL* секунд и завершает работу при закрытии taskChan +func taskSorter(doneTasks, undoneTasks <-chan Task, quit <-chan struct{}) { + defer wg.Done() + + ticker := time.NewTicker(PRINT_INTERVAL) + defer ticker.Stop() + for { + select { + case <-quit: + return + case task := <-doneTasks: + allDoneTasks = append(allDoneTasks, task) + case task := <-undoneTasks: + allUndoneTasks = append(allUndoneTasks, task) + case <-ticker.C: + fmt.Printf("Succeed tasks at %s\n", time.Now().Format("15:04:05")) + taskPrinter(allDoneTasks) + fmt.Printf("Unsucceed tasks at %s\n", time.Now().Format("15:04:05")) + taskPrinter(allUndoneTasks) + fmt.Println("----------------------------------------------") - tasksorter := func(t Ttype) { - if string(t.taskRESULT[14:]) == "successed" { - doneTasks <- t - } else { - undoneTasks <- fmt.Errorf("Task id %d time %s, error %s", t.id, t.cT, t.taskRESULT) } } - go func() { - // получение тасков - for t := range superChan { - t = task_worker(t) - go tasksorter(t) - } - close(superChan) - }() - - result := map[int]Ttype{} - err := []error{} - go func() { - for r := range doneTasks { - go func() { - result[r.id] = r - }() - } - for r := range undoneTasks { - go func() { - err = append(err, r) - }() - } - close(doneTasks) - close(undoneTasks) - }() - - time.Sleep(time.Second * 3) +} - println("Errors:") - for r := range err { - println(r) +func taskPrinter(tasks []Task) { + for _, task := range tasks { + fmt.Printf("id: %d, created: %s, processed: %s\n", task.id, task.creationTime.Format("15:04:05"), task.processionTime.Format("15:04:05")) } +} - println("Done tasks:") - for r := range result { - println(r) - } +func main() { + // Канал, который сообщит о завершении работы + quit := make(chan struct{}) + + wg.Add(3) + go taskCreator(taskChan) + go taskWorker(taskChan, doneTasks, undoneTasks, quit) + go taskSorter(doneTasks, undoneTasks, quit) + wg.Wait() }