Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update main.go #396

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
181 changes: 101 additions & 80 deletions golang/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,104 +2,125 @@ package main

import (
"fmt"
"strconv"
"strings"
"sync"
"time"
)

// Приложение эмулирует получение и обработку неких тасков. Пытается и получать, и обрабатывать в многопоточном режиме.
// Приложение должно генерировать таски 10 сек. Каждые 3 секунды должно выводить в консоль результат всех обработанных к этому моменту тасков (отдельно успешные и отдельно с ошибками).

// ЗАДАНИЕ: сделать из плохого кода хороший и рабочий - as best as you can.
// Важно сохранить логику появления ошибочных тасков.
// Важно оставить асинхронные генерацию и обработку тасков.
// Сделать правильную мультипоточность обработки заданий.
// Обновленный код отправить через pull-request в github
// Как видите, никаких привязок к внешним сервисам нет - полный карт-бланш на модификацию кода.

// A Ttype represents a meaninglessness of our life
type Ttype struct {
id int
cT string // время создания
fT string // время выполнения
taskRESULT []byte
var (
taskChan = make(chan Task, 10)
doneTasks = make(chan Task, 10)
undoneTasks = make(chan Task, 10)
wg sync.WaitGroup
allDoneTasks []Task
allUndoneTasks []Task
)

const (
PRINT_INTERVAL = 3 * time.Second
TASK_CREATION_TIMEOUT = 10 * time.Second
)

type Task struct {
id int
creationTime time.Time
processionTime time.Time
succeed bool
err error
}

func main() {
taskCreturer := func(a chan Ttype) {
go func() {
for {
ft := time.Now().Format(time.RFC3339)
if time.Now().Nanosecond()%2 > 0 { // вот такое условие появления ошибочных тасков
ft = "Some error occured"
}
a <- Ttype{cT: ft, id: int(time.Now().Unix())} // передаем таск на выполнение
// Генерирует таски 10 секунд и закрывает taskChan
func taskCreator(taskChan chan<- Task) {
defer close(taskChan)
defer wg.Done()

// Запускаем таймер
timer := time.NewTimer(TASK_CREATION_TIMEOUT)
// id теперь просто идущие подряд числа
for id := 1; ; id++ {
select {
// Если прошло 10 секунд, выходим из функции
case <-timer.C:
return
default:
currentTime := time.Now()
task := Task{
id: id,
creationTime: currentTime,
}
}()
}

superChan := make(chan Ttype, 10)
// Удаляем из числа наносекунд незначащие нули справа
nanoSecStr := strings.TrimRight(strconv.Itoa(currentTime.Nanosecond()), "0")
nanoSec, _ := strconv.Atoi(nanoSecStr)
if nanoSec%2 > 0 {
task.err = fmt.Errorf("random error occurred")
}
// Задержка для замедления генерации
time.Sleep(time.Second)

go taskCreturer(superChan)
taskChan <- task
}
}
}

task_worker := func(a Ttype) Ttype {
tt, _ := time.Parse(time.RFC3339, a.cT)
if tt.After(time.Now().Add(-20 * time.Second)) {
a.taskRESULT = []byte("task has been successed")
// Обрабатаывает таски, отправляет в соответсвующий канал и завершает работу при закрытии taskChan
func taskWorker(taskChan <-chan Task, doneTasks, undoneTasks chan<- Task, quit chan<- struct{}) {
defer wg.Done()
// Сообщаем о завершении работы
defer close(quit)

for task := range taskChan {
task.processionTime = time.Now()
if task.err == nil {
task.succeed = true
doneTasks <- task
} else {
a.taskRESULT = []byte("something went wrong")
task.succeed = false
undoneTasks <- task
}
a.fT = time.Now().Format(time.RFC3339Nano)

time.Sleep(time.Millisecond * 150)

return a
}
}

doneTasks := make(chan Ttype)
undoneTasks := make(chan error)
// Выводит обработанные таски каждые *PRINT_INTERVAL* секунд и завершает работу при закрытии taskChan
func taskSorter(doneTasks, undoneTasks <-chan Task, quit <-chan struct{}) {
defer wg.Done()

ticker := time.NewTicker(PRINT_INTERVAL)
defer ticker.Stop()
for {
select {
case <-quit:
return
case task := <-doneTasks:
allDoneTasks = append(allDoneTasks, task)
case task := <-undoneTasks:
allUndoneTasks = append(allUndoneTasks, task)
case <-ticker.C:
fmt.Printf("Succeed tasks at %s\n", time.Now().Format("15:04:05"))
taskPrinter(allDoneTasks)
fmt.Printf("Unsucceed tasks at %s\n", time.Now().Format("15:04:05"))
taskPrinter(allUndoneTasks)
fmt.Println("----------------------------------------------")

tasksorter := func(t Ttype) {
if string(t.taskRESULT[14:]) == "successed" {
doneTasks <- t
} else {
undoneTasks <- fmt.Errorf("Task id %d time %s, error %s", t.id, t.cT, t.taskRESULT)
}
}

go func() {
// получение тасков
for t := range superChan {
t = task_worker(t)
go tasksorter(t)
}
close(superChan)
}()

result := map[int]Ttype{}
err := []error{}
go func() {
for r := range doneTasks {
go func() {
result[r.id] = r
}()
}
for r := range undoneTasks {
go func() {
err = append(err, r)
}()
}
close(doneTasks)
close(undoneTasks)
}()

time.Sleep(time.Second * 3)
}

println("Errors:")
for r := range err {
println(r)
func taskPrinter(tasks []Task) {
for _, task := range tasks {
fmt.Printf("id: %d, created: %s, processed: %s\n", task.id, task.creationTime.Format("15:04:05"), task.processionTime.Format("15:04:05"))
}
}

println("Done tasks:")
for r := range result {
println(r)
}
func main() {
// Канал, который сообщит о завершении работы
quit := make(chan struct{})

wg.Add(3)
go taskCreator(taskChan)
go taskWorker(taskChan, doneTasks, undoneTasks, quit)
go taskSorter(doneTasks, undoneTasks, quit)
wg.Wait()
}