Skip to content

Commit 061cc23

Browse files
committed
update ctx + cleanup + readme
Signed-off-by: Cassandra Coyle <cassie@diagrid.io>
1 parent 16c161c commit 061cc23

File tree

4 files changed

+91
-52
lines changed

4 files changed

+91
-52
lines changed

scheduler-jobs/README.md

+18-7
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
1-
# How-To Run Locally:
1+
# Scheduler Jobs
22

3-
To run locally as a process: `go run scheduler-jobs.go`, assuming you have a scheduler and dapr running accordingly (see below).
3+
## Overview
44

5-
Run Scheduler:
6-
![scheduler_debugger_config.png](scheduler_debugger_config.png)
5+
- schedule 100 oneshot jobs indefinitely (repeat = 1)
6+
- schedule 100 indefinite jobs indefinitely (repeat not set, trigger every 30s)
7+
- schedule repeat-job job indefinitely (repeat = 5, trigger every 1s due immediately)
8+
- indefinitely schedule and delete a create-delete-job job (repeat = 1, trigger every 1s)
79

8-
Run Dapr sidecar:
9-
![sidecar_debugger_config.png](sidecar_debugger_config.png)
10+
## How-To Run Locally:
11+
12+
Run with dapr:
13+
```shell
14+
dapr run \
15+
--app-id scheduler-jobs \
16+
--app-port 8383 \
17+
--dapr-grpc-port 3501 --app-protocol grpc \
18+
--dapr-http-port 3500 --scheduler-host-address=127.0.0.1:50006 --app-channel-address=127.0.0.1 \
19+
-- go run scheduler-jobs.go
20+
```
1021

1122
To run locally as a container:
1223
```shell
1324
docker build -t scheduler-jobs .
14-
docker run -p 3006:3006 --name scheduler-jobs scheduler-jobs # optionally add -d to run in background
25+
docker run -p 8383:8383 --name scheduler-jobs scheduler-jobs # optionally add -d to run in background
1526
# check container is running
1627
docker ps
1728
```

scheduler-jobs/scheduler-jobs.go

+73-45
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ import (
1919
"github.com/dapr/go-sdk/client"
2020
)
2121

22-
const appPort = ":3006"
23-
const daprGRPCPort = "3501"
22+
const appPort = ":8383"
2423

2524
var oneshot atomic.Int64
2625
var indefinite atomic.Int64
@@ -121,8 +120,15 @@ func (s *appServer) OnJobEventAlpha1(ctx context.Context, in *rtv1.JobEventReque
121120
return nil, nil
122121
}
123122

124-
func scheduleOneshotJobs(daprClient client.Client) {
123+
func scheduleOneshotJobs(ctx context.Context, daprClient client.Client) {
125124
for i := 0; i < 100; i++ {
125+
select {
126+
case <-ctx.Done():
127+
log.Println("context canceled, stopping scheduleOneshotJobs.")
128+
return
129+
default:
130+
}
131+
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
126132
name := "oneshot-job-" + strconv.Itoa(i)
127133
req := &client.Job{
128134
Name: name,
@@ -132,15 +138,23 @@ func scheduleOneshotJobs(daprClient client.Client) {
132138
TTL: "40s",
133139
Data: nil,
134140
}
135-
err := daprClient.ScheduleJobAlpha1(context.Background(), req)
141+
err := daprClient.ScheduleJobAlpha1(jobCtx, req)
142+
cancel()
136143
if err != nil {
137144
log.Printf("Error scheduling oneshot job '%s': %s\n", name, err)
138145
}
139146
}
140147
}
141148

142-
func scheduleIndefiniteJobs(daprClient client.Client) {
149+
func scheduleIndefiniteJobs(ctx context.Context, daprClient client.Client) {
143150
for i := 0; i < 100; i++ {
151+
select {
152+
case <-ctx.Done():
153+
log.Println("context canceled, stopping scheduleOneshotJobs.")
154+
return
155+
default:
156+
}
157+
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
144158
name := "indefinite-job-" + strconv.Itoa(i)
145159
req := &client.Job{
146160
Name: name,
@@ -149,14 +163,15 @@ func scheduleIndefiniteJobs(daprClient client.Client) {
149163
TTL: "40s",
150164
Data: nil,
151165
}
152-
err := daprClient.ScheduleJobAlpha1(context.Background(), req)
166+
err := daprClient.ScheduleJobAlpha1(jobCtx, req)
167+
cancel()
153168
if err != nil {
154169
log.Printf("Error scheduling indefinite job '%s': %s\n", name, err)
155170
}
156171
}
157172
}
158173

159-
func scheduleRepeatedJob(daprClient client.Client) {
174+
func scheduleRepeatedJob(ctx context.Context, daprClient client.Client) {
160175
name := "repeat-job"
161176
req := &client.Job{
162177
Name: name,
@@ -166,13 +181,15 @@ func scheduleRepeatedJob(daprClient client.Client) {
166181
TTL: "10s",
167182
Data: nil,
168183
}
169-
err := daprClient.ScheduleJobAlpha1(context.Background(), req)
184+
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
185+
defer cancel()
186+
err := daprClient.ScheduleJobAlpha1(jobCtx, req)
170187
if err != nil {
171188
log.Printf("Error scheduling repeat job '%s': %s\n", name, err)
172189
}
173190
}
174191

175-
func scheduleSingleJob(daprClient client.Client) {
192+
func scheduleSingleJob(ctx context.Context, daprClient client.Client) {
176193
name := "create-delete-job"
177194
req := &client.Job{
178195
Name: name,
@@ -182,15 +199,19 @@ func scheduleSingleJob(daprClient client.Client) {
182199
TTL: "3s",
183200
Data: nil,
184201
}
185-
err := daprClient.ScheduleJobAlpha1(context.Background(), req)
202+
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
203+
defer cancel()
204+
err := daprClient.ScheduleJobAlpha1(jobCtx, req)
186205
if err != nil {
187206
log.Printf("Error scheduling single job '%s': %s\n", name, err)
188207
}
189208
}
190209

191-
func deleteSingleJob(daprClient client.Client) {
210+
func deleteSingleJob(ctx context.Context, daprClient client.Client) {
192211
name := "create-delete-job"
193-
err := daprClient.DeleteJobAlpha1(context.Background(), name)
212+
jobCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
213+
defer cancel()
214+
err := daprClient.DeleteJobAlpha1(jobCtx, name)
194215
if err != nil {
195216
log.Printf("Error deleting single job '%s': %s\n", name, err)
196217
}
@@ -209,8 +230,7 @@ func main() {
209230
}
210231
}(ctx)
211232

212-
// --dapr-grpc-port=3501
213-
daprClient, err := client.NewClientWithPort(daprGRPCPort)
233+
daprClient, err := client.NewClient()
214234
if err != nil {
215235
log.Fatalf("Error getting dapr client: %v", err)
216236
}
@@ -222,46 +242,54 @@ func main() {
222242
time.Sleep(5 * time.Second)
223243

224244
// Schedule initial batch of jobs
225-
go scheduleOneshotJobs(daprClient)
226-
go scheduleIndefiniteJobs(daprClient)
227-
go scheduleRepeatedJob(daprClient)
245+
go scheduleOneshotJobs(ctx, daprClient)
246+
go scheduleIndefiniteJobs(ctx, daprClient)
247+
go scheduleRepeatedJob(ctx, daprClient)
228248

229249
// Schedule additional oneshot jobs once 100 are triggered
230250
go func(ctx context.Context) {
231-
select {
232-
case <-ctx.Done():
233-
return
234-
case <-oneshotDoneCh:
235-
log.Println("Received input that maxOneshotTriggerCount was reached. Sleeping...")
236-
time.Sleep(10 * time.Second)
237-
log.Println("Scheduling next batch of oneshot jobs...")
238-
go scheduleOneshotJobs(daprClient)
251+
for {
252+
select {
253+
case <-ctx.Done():
254+
log.Println("context canceled, stopping oneshot scheduling goroutine.")
255+
return
256+
case <-oneshotDoneCh:
257+
log.Println("Received input that maxOneshotTriggerCount was reached. Sleeping...")
258+
time.Sleep(10 * time.Second)
259+
log.Println("Scheduling next batch of oneshot jobs...")
260+
go scheduleOneshotJobs(ctx, daprClient)
261+
}
239262
}
240263
}(ctx)
241264

242265
// Schedule additional indefinite jobs once 100 are triggered
243266
go func(ctx context.Context) {
244-
select {
245-
case <-ctx.Done():
246-
return
247-
case <-indefiniteDoneCh:
248-
log.Println("Received input that maxIndefiniteTriggerCount was reached. Sleeping...")
249-
time.Sleep(10 * time.Second)
250-
log.Println("Scheduling next batch of indefinite jobs...")
251-
go scheduleIndefiniteJobs(daprClient)
267+
for {
268+
select {
269+
case <-ctx.Done():
270+
log.Println("context canceled, stopping indefinite scheduling goroutine.")
271+
return
272+
case <-indefiniteDoneCh:
273+
log.Println("Received input that maxIndefiniteTriggerCount was reached. Sleeping...")
274+
time.Sleep(10 * time.Second)
275+
log.Println("Scheduling next batch of indefinite jobs...")
276+
go scheduleIndefiniteJobs(ctx, daprClient)
277+
}
252278
}
253279
}(ctx)
254280

255-
// Schedule job to trigger immediately every second for 1s
281+
// Schedule job to trigger immediately every second for 1s for 5 times max (repeats)
256282
go func(ctx context.Context) {
257-
select {
258-
case <-ctx.Done():
259-
return
260-
case <-repeatDoneCh:
261-
log.Println("Received input that maxRepeatTriggerCount was reached. Sleeping...")
262-
time.Sleep(60 * time.Second)
263-
log.Println("Scheduling next repeated job...")
264-
go scheduleRepeatedJob(daprClient)
283+
for {
284+
select {
285+
case <-ctx.Done():
286+
return
287+
case <-repeatDoneCh:
288+
log.Println("Received input that maxRepeatTriggerCount was reached. Sleeping...")
289+
time.Sleep(60 * time.Second)
290+
log.Println("Scheduling next repeated job...")
291+
go scheduleRepeatedJob(ctx, daprClient)
292+
}
265293
}
266294
}(ctx)
267295

@@ -274,13 +302,13 @@ func main() {
274302
case <-receivedSingleJobDoneCh:
275303
log.Println("Received input that the create-delete-job triggered, now deleting the job...")
276304
// received triggered job, now delete it & set atomic int to 0
277-
deleteSingleJob(daprClient)
305+
deleteSingleJob(ctx, daprClient)
278306
log.Println("Successfully deleted create-delete-job.")
279307
}
280308
}
281309
}(ctx)
282310

283-
go scheduleSingleJob(daprClient)
311+
go scheduleSingleJob(ctx, daprClient)
284312

285313
// Reschedule the create-delete job after deletion, ensure triggers once
286314
go func(ctx context.Context) {
@@ -292,7 +320,7 @@ func main() {
292320
log.Println("Received input that the create-delete-job was deleted. Sleeping for 5 seconds...")
293321
time.Sleep(5 * time.Second)
294322
log.Println("Scheduling create-delete-job...")
295-
scheduleSingleJob(daprClient)
323+
scheduleSingleJob(ctx, daprClient)
296324
log.Println("Successfully scheduled create-delete-job.")
297325
}
298326
}
-116 KB
Binary file not shown.
-149 KB
Binary file not shown.

0 commit comments

Comments
 (0)