forked from neoxelox/kit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
enqueuer.go
122 lines (100 loc) · 2.71 KB
/
enqueuer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
package kit
import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"runtime"
"time"
"github.com/hibiken/asynq"
)
const (
// _ENQUEUER_REDIS_DSN is the fmt layout ("host:port") that NewEnqueuer fills
// with EnqueuerConfig.CacheHost and EnqueuerConfig.CachePort to build the
// Redis address handed to asynq.
_ENQUEUER_REDIS_DSN = "%s:%d"
)
// Fallback values NewEnqueuer applies to the EnqueuerConfig fields that are
// left nil.
var (
// Ten connections per GOMAXPROCS, computed once at package initialization.
// Passing a value below 1 to runtime.GOMAXPROCS only reads the current
// setting; it does not change it.
_ENQUEUER_DEFAULT_MAX_CONNS = 10 * runtime.GOMAXPROCS(-1)
// Default for EnqueuerConfig.CacheReadTimeout.
_ENQUEUER_DEFAULT_READ_TIMEOUT = 30 * time.Second
// Default for EnqueuerConfig.CacheWriteTimeout.
_ENQUEUER_DEFAULT_WRITE_TIMEOUT = 30 * time.Second
// Default for EnqueuerConfig.CacheDialTimeout.
_ENQUEUER_DEFAULT_DIAL_TIMEOUT = 30 * time.Second
)
// EnqueuerConfig holds the Redis connection settings consumed by NewEnqueuer.
// Pointer fields are optional: when nil, NewEnqueuer substitutes the matching
// _ENQUEUER_DEFAULT_* value.
type EnqueuerConfig struct {
// CacheHost is the host part of the Redis address.
CacheHost string
// CachePort is the port part of the Redis address.
CachePort int
// CacheSSLMode, when true, makes NewEnqueuer attach a TLS configuration
// with a minimum version of TLS 1.2; when false no TLS configuration is set.
CacheSSLMode bool
// CachePassword is forwarded as the Redis password.
CachePassword string
// CacheMaxConns is forwarded as the Redis connection pool size.
CacheMaxConns *int
// CacheReadTimeout is forwarded as the Redis read timeout.
CacheReadTimeout *time.Duration
// CacheWriteTimeout is forwarded as the Redis write timeout.
CacheWriteTimeout *time.Duration
// CacheDialTimeout is forwarded as the Redis dial timeout.
CacheDialTimeout *time.Duration
}
// Enqueuer submits tasks through an asynq client and reports what it does via
// an Observer. Build it with NewEnqueuer.
type Enqueuer struct {
// config is the configuration received by NewEnqueuer, with defaults already
// filled in for the optional fields.
config EnqueuerConfig
// observer receives the informational log messages emitted by Enqueue and Close.
observer Observer
// client is the asynq client used to enqueue tasks; it is stored by value
// (NewEnqueuer dereferences the pointer returned by asynq.NewClient).
client asynq.Client
}
// NewEnqueuer creates an Enqueuer whose asynq client targets the Redis
// instance described by config.
//
// Optional (nil) fields of config are replaced by the package defaults before
// the client is built, and the resulting configuration is kept on the returned
// Enqueuer. config is received by value, so the caller's struct itself is not
// modified. When config.CacheSSLMode is true the client is given a TLS
// configuration requiring at least TLS 1.2.
func NewEnqueuer(observer Observer, config EnqueuerConfig) *Enqueuer {
	// Fill in every optional setting the caller left unset.
	if config.CacheMaxConns == nil {
		config.CacheMaxConns = ptr(_ENQUEUER_DEFAULT_MAX_CONNS)
	}
	if config.CacheReadTimeout == nil {
		config.CacheReadTimeout = ptr(_ENQUEUER_DEFAULT_READ_TIMEOUT)
	}
	if config.CacheWriteTimeout == nil {
		config.CacheWriteTimeout = ptr(_ENQUEUER_DEFAULT_WRITE_TIMEOUT)
	}
	if config.CacheDialTimeout == nil {
		config.CacheDialTimeout = ptr(_ENQUEUER_DEFAULT_DIAL_TIMEOUT)
	}

	opts := asynq.RedisClientOpt{
		Addr:         fmt.Sprintf(_ENQUEUER_REDIS_DSN, config.CacheHost, config.CachePort),
		Password:     config.CachePassword,
		DialTimeout:  *config.CacheDialTimeout,
		ReadTimeout:  *config.CacheReadTimeout,
		WriteTimeout: *config.CacheWriteTimeout,
		PoolSize:     *config.CacheMaxConns,
	}

	// TLSConfig stays nil unless SSL mode was requested.
	if config.CacheSSLMode {
		opts.TLSConfig = &tls.Config{MinVersion: tls.VersionTLS12}
	}

	client := asynq.NewClient(opts)

	return &Enqueuer{
		config:   config,
		observer: observer,
		client:   *client,
	}
}
// Enqueue submits a task of type task whose payload is the JSON encoding of
// params, forwarding any asynq options to the client.
//
// On success it logs the task type, queue and id reported by asynq and
// returns nil. If params cannot be marshaled or the client rejects the task,
// the underlying error is returned wrapped in ErrEnqueuerGeneric.
func (self *Enqueuer) Enqueue(ctx context.Context, task string, params any, options ...asynq.Option) error {
	body, err := json.Marshal(params)
	if err != nil {
		return ErrEnqueuerGeneric().Wrap(err)
	}

	job := asynq.NewTask(task, body)

	result, err := self.client.EnqueueContext(ctx, job, options...)
	if err != nil {
		return ErrEnqueuerGeneric().Wrap(err)
	}

	self.observer.Infof(ctx, "Enqueued task %s on queue %s with id %s", result.Type, result.Queue, result.ID)

	return nil
}
// Close shuts down the underlying asynq client, running the shutdown through
// Utils.Deadline with ctx (presumably so that it is bounded by ctx's
// deadline; confirm against Utils.Deadline).
//
// It returns nil on success, ErrEnqueuerTimedOut when Utils.Deadline reports
// ErrDeadlineExceeded, and an ErrEnqueuerGeneric wrapping the failure
// otherwise.
func (self *Enqueuer) Close(ctx context.Context) error {
	// The channel argument supplied by Utils.Deadline is not needed here.
	shutdown := func(_ <-chan struct{}) error {
		self.observer.Info(ctx, "Closing enqueuer")

		if closeErr := self.client.Close(); closeErr != nil {
			return ErrEnqueuerGeneric().WrapAs(closeErr)
		}

		self.observer.Info(ctx, "Closed enqueuer")

		return nil
	}

	err := Utils.Deadline(ctx, shutdown)
	if err == nil {
		return nil
	}

	if ErrDeadlineExceeded().Is(err) {
		return ErrEnqueuerTimedOut()
	}

	return ErrEnqueuerGeneric().Wrap(err)
}