Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update main.go #385

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
151 changes: 74 additions & 77 deletions golang/main.go
Original file line number Diff line number Diff line change
@@ -1,105 +1,102 @@
package main

import (
"context"
"fmt"
"sync"
"time"
)

// Приложение эмулирует получение и обработку неких тасков. Пытается и получать, и обрабатывать в многопоточном режиме.
// Приложение должно генерировать таски 10 сек. Каждые 3 секунды должно выводить в консоль результат всех обработанных к этому моменту тасков (отдельно успешные и отдельно с ошибками).

// ЗАДАНИЕ: сделать из плохого кода хороший и рабочий - as best as you can.
// Важно сохранить логику появления ошибочных тасков.
// Важно оставить асинхронные генерацию и обработку тасков.
// Сделать правильную мультипоточность обработки заданий.
// Обновленный код отправить через pull-request в github
// Как видите, никаких привязок к внешним сервисам нет - полный карт-бланш на модификацию кода.

// A Ttype represents a meaninglessness of our life
type Ttype struct {
id int
cT string // время создания
fT string // время выполнения
taskRESULT []byte
cT string
fT string
taskRESULT string
}

func main() {
taskCreturer := func(a chan Ttype) {
go func() {
for {
ft := time.Now().Format(time.RFC3339)
if time.Now().Nanosecond()%2 > 0 { // вот такое условие появления ошибочных тасков
ft = "Some error occured"
}
a <- Ttype{cT: ft, id: int(time.Now().Unix())} // передаем таск на выполнение
}
}()
}

superChan := make(chan Ttype, 10)

go taskCreturer(superChan)
func taskCreator(ctx context.Context, wg *sync.WaitGroup, tasks chan<- Ttype) {
defer wg.Done()
taskID := 1

task_worker := func(a Ttype) Ttype {
tt, _ := time.Parse(time.RFC3339, a.cT)
if tt.After(time.Now().Add(-20 * time.Second)) {
a.taskRESULT = []byte("task has been successed")
} else {
a.taskRESULT = []byte("something went wrong")
for {
select {
case <-ctx.Done():
return
default:
ft := time.Now().Format(time.RFC3339)
if time.Now().Nanosecond()%2 > 0 {
ft = "Some error occurred"
}
tasks <- Ttype{id: taskID, cT: ft}
taskID++
time.Sleep(500 * time.Millisecond)
}
a.fT = time.Now().Format(time.RFC3339Nano)

time.Sleep(time.Millisecond * 150)

return a
}
}

doneTasks := make(chan Ttype)
undoneTasks := make(chan error)
func taskWorker(wg *sync.WaitGroup, tasks <-chan Ttype, doneTasks chan<- Ttype, undoneTasks chan<- error) {
defer wg.Done()

tasksorter := func(t Ttype) {
if string(t.taskRESULT[14:]) == "successed" {
doneTasks <- t
for task := range tasks {
// Simulate task processing
task.fT = time.Now().Format(time.RFC3339Nano)
parsedTime, err := time.Parse(time.RFC3339, task.cT)
if err != nil || parsedTime.Before(time.Now().Add(-20*time.Second)) {
task.taskRESULT = "something went wrong"
undoneTasks <- fmt.Errorf("Task ID %d: %s", task.id, task.taskRESULT)
} else {
undoneTasks <- fmt.Errorf("Task id %d time %s, error %s", t.id, t.cT, t.taskRESULT)
task.taskRESULT = "task has been successful"
doneTasks <- task
}
time.Sleep(150 * time.Millisecond)
}
}

go func() {
// получение тасков
for t := range superChan {
t = task_worker(t)
go tasksorter(t)
func resultPrinter(ctx context.Context, doneTasks <-chan Ttype, undoneTasks <-chan error) {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
fmt.Println("Processed Tasks:")
for len(doneTasks) > 0 {
task := <-doneTasks
fmt.Printf("Success: Task ID %d, Created at %s, Finished at %s\n", task.id, task.cT, task.fT)
}
for len(undoneTasks) > 0 {
err := <-undoneTasks
fmt.Println("Error:", err)
}
}
close(superChan)
}()
}
}

result := map[int]Ttype{}
err := []error{}
go func() {
for r := range doneTasks {
go func() {
result[r.id] = r
}()
}
for r := range undoneTasks {
go func() {
err = append(err, r)
}()
}
close(doneTasks)
close(undoneTasks)
}()
func main() {
tasks := make(chan Ttype, 10)
doneTasks := make(chan Ttype, 10)
undoneTasks := make(chan error, 10)

time.Sleep(time.Second * 3)
var wg sync.WaitGroup
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

println("Errors:")
for r := range err {
println(r)
}
wg.Add(1)
go taskCreator(ctx, &wg, tasks)

println("Done tasks:")
for r := range result {
println(r)
numWorkers := 5
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go taskWorker(&wg, tasks, doneTasks, undoneTasks)
}

go resultPrinter(ctx, doneTasks, undoneTasks)

wg.Wait()
close(tasks)
close(doneTasks)
close(undoneTasks)
}