This repository has been archived by the owner on Feb 21, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjobs-executor.go
95 lines (82 loc) · 2.5 KB
/
jobs-executor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package main
import (
"log"
"time"
)
// MIN_START_DELAY_IN_SECONDS is the base start delay, in seconds, given to
// jobs loaded from the database when the executor starts; Run adds i/5 on top
// of it for the i-th loaded job.
const MIN_START_DELAY_IN_SECONDS = 1
// JobsExecutor owns the state needed to run per-club job monitoring: the
// collaborators handed to each ClubMonitor, the channels through which jobs
// are added and removed, and the repository the initial jobs are read from.
type JobsExecutor struct {
	// strava is forwarded to every ClubMonitor created by the executor.
	strava *Strava

	// commonInterval and unlockedInterval are forwarded unchanged to
	// NewClubMonitor.
	commonInterval, unlockedInterval time.Duration

	// jobs is not set by NewJobsExecutor and not read anywhere in the
	// visible part of this file.
	jobs []*JobDetails

	// activitiesPoster is forwarded to every ClubMonitor.
	activitiesPoster *ActivitiesPoster

	// teamsUnlockInfo is forwarded to every ClubMonitor.
	teamsUnlockInfo *TeamsUnlockInfo

	// addedJobs and removedJobs carry requests from AddJob and RemoveJob to
	// the Run loop.
	addedJobs, removedJobs chan *JobDetails

	// repo is used to list the persisted job details when Run starts.
	repo *Repository
}
// NewJobsExecutor builds a JobsExecutor from its collaborators and the two
// intervals. The add/remove channels are each created with a buffer of 10.
// The returned executor does nothing until Run is called.
func NewJobsExecutor(strava *Strava, commonInterval, unlockedInterval time.Duration, activitiesPoster *ActivitiesPoster, teamsUnlockInfo *TeamsUnlockInfo, repo *Repository) *JobsExecutor {
	executor := new(JobsExecutor)

	// Collaborators and settings supplied by the caller.
	executor.strava = strava
	executor.commonInterval = commonInterval
	executor.unlockedInterval = unlockedInterval
	executor.activitiesPoster = activitiesPoster
	executor.teamsUnlockInfo = teamsUnlockInfo
	executor.repo = repo

	// Request queues consumed by Run.
	executor.addedJobs = make(chan *JobDetails, 10)
	executor.removedJobs = make(chan *JobDetails, 10)

	return executor
}
// AddJob hands job to the Run loop by sending it on the addedJobs channel.
// The send blocks while that channel's buffer is full.
func (e *JobsExecutor) AddJob(job *JobDetails) {
	queue := e.addedJobs
	queue <- job
}
// RemoveJob asks the Run loop to stop monitoring job by sending it on the
// removedJobs channel. The send blocks while that channel's buffer is full.
func (e *JobsExecutor) RemoveJob(job *JobDetails) {
	queue := e.removedJobs
	queue <- job
}
// Run starts monitoring for every job stored in the database and then serves
// add/remove requests forever; it never returns.
//
// Stored jobs are started with a staggered delay: the i-th job gets
// MIN_START_DELAY_IN_SECONDS + i/5 seconds (integer division), so each group
// of five consecutive jobs shares one delay value. The delay only matters
// when the job causes a new ClubMonitor to be created.
//
// NOTE(review): there is no way to stop this loop — confirm it is meant to
// live for the whole process.
// NOTE(review): an added job may be handed to a monitor whose "empty"
// notification is still waiting in the channel — confirm ClubMonitor copes.
func (e *JobsExecutor) Run() {
	// Active monitors keyed by club id; only this goroutine touches the map.
	monitors := map[string]*ClubMonitor{}
	// Club ids are received here and the matching monitor is then dropped.
	drained := make(chan string, 10)

	stored := e.loadExistingJobs()
	for idx := range stored {
		delaySeconds := MIN_START_DELAY_IN_SECONDS + idx/5
		e.startJobMonitoring(monitors, stored[idx], time.Duration(delaySeconds)*time.Second, drained)
	}

	for {
		select {
		case added := <-e.addedJobs:
			// Jobs added at runtime start without any delay.
			e.startJobMonitoring(monitors, added, 0, drained)
		case removed := <-e.removedJobs:
			monitor, found := monitors[removed.ClubId]
			if !found {
				// No monitor for this club: nothing to remove.
				continue
			}
			monitor.RemoveJob(removed)
		case id := <-drained:
			delete(monitors, id)
		}
	}
}
// startJobMonitoring attaches job to the monitor of its club. If clubMonitors
// already has an entry for job.ClubId the job is added to that monitor and
// startDelay is ignored. Otherwise a new ClubMonitor is created with
// startDelay and emptyMonitors, stored in clubMonitors, and started in its
// own goroutine.
func (e *JobsExecutor) startJobMonitoring(clubMonitors map[string]*ClubMonitor, job *JobDetails, startDelay time.Duration, emptyMonitors chan<- string) {
	if existing, found := clubMonitors[job.ClubId]; found {
		existing.AddJob(job)
		return
	}

	created := NewClubMonitor(e.strava, job.ClubId, job, e.commonInterval, e.unlockedInterval, startDelay, emptyMonitors, e.activitiesPoster, e.teamsUnlockInfo)
	clubMonitors[job.ClubId] = created
	go created.Run()
}
// loadExistingJobs returns all job details stored in the database.
//
// If the repository reports an error, the error is logged and nil is
// returned: a result that comes with a non-nil error is not trustworthy, so
// it is never handed to the caller. Ranging over the nil result is a no-op.
func (e *JobsExecutor) loadExistingJobs() []*JobDetails {
	jobs, err := e.repo.JobDetails.ListAll()
	if err != nil {
		log.Printf("Can't load job details from the database: %v\n", err)
		return nil
	}
	return jobs
}