-
Notifications
You must be signed in to change notification settings - Fork 125
/
parse_alert.go
375 lines (349 loc) · 11.3 KB
/
parse_alert.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
package rillv1
import (
"encoding/json"
"errors"
"fmt"
"net/mail"
"strings"
"time"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime/drivers/slack"
"github.com/rilldata/rill/runtime/pkg/pbutil"
"google.golang.org/protobuf/types/known/structpb"
)
// AlertYAML is the raw structure of an Alert resource defined in YAML (does not include common fields).
// It is decoded by parseAlert, which validates the values and copies them into the resource's AlertSpec.
type AlertYAML struct {
commonYAML `yaml:",inline"` // Not accessed here, only setting it so we can use KnownFields for YAML parsing
// Title is copied to the alert spec unchanged.
Title string `yaml:"title"`
// Refresh is the refresh schedule; it is parsed with parseScheduleYAML.
Refresh *ScheduleYAML `yaml:"refresh"`
// Watermark is matched case-insensitively; an empty value behaves like "trigger_time".
Watermark string `yaml:"watermark"` // options: "trigger_time", "inherit"
// Intervals configures interval-based checking.
Intervals struct {
// Duration must be a standard ISO 8601 duration (validated with validateISO8601) when set.
Duration string `yaml:"duration"`
// Limit is stored in the alert spec as an int32.
Limit uint `yaml:"limit"`
CheckUnclosed bool `yaml:"check_unclosed"`
} `yaml:"intervals"`
// Timeout is parsed with parseDuration and stored in the alert spec as whole seconds.
Timeout string `yaml:"timeout"`
// Data is the resolver definition. When it is nil, the legacy Query property is used instead.
Data *DataYAML `yaml:"data"`
// For selects the identity to run the Data resolver as. At most one of its fields may be set.
For struct {
UserID string `yaml:"user_id"`
// UserEmail must be parseable by mail.ParseAddress.
UserEmail string `yaml:"user_email"`
Attributes map[string]any `yaml:"attributes"`
} `yaml:"for"`
// Query is only consulted when Data is not set. It is mapped to the "legacy_metrics" resolver.
Query struct { // legacy query
// Name is required.
Name string `yaml:"name"`
// Args is serialized to JSON and used only when ArgsJSON is empty.
Args map[string]any `yaml:"args"`
// ArgsJSON must be valid JSON when set; it takes precedence over Args.
ArgsJSON string `yaml:"args_json"`
// For is the legacy counterpart of the top-level For property. At most one of its fields may be set.
For struct {
UserID string `yaml:"user_id"`
UserEmail string `yaml:"user_email"`
Attributes map[string]any `yaml:"attributes"`
} `yaml:"for"`
} `yaml:"query"`
// Notification triggers. When unset, parseAlert defaults to on_recover=false, on_fail=true, on_error=false, renotify=false.
OnRecover *bool `yaml:"on_recover"`
OnFail *bool `yaml:"on_fail"`
OnError *bool `yaml:"on_error"`
Renotify *bool `yaml:"renotify"`
// RenotifyAfter is parsed with parseDuration; it is only applied to the spec when Renotify is set.
RenotifyAfter string `yaml:"renotify_after"`
// Notify lists the notification targets.
Notify struct {
// Email adds an "email" notifier when at least one recipient is given.
Email struct {
// Recipients must each be parseable by mail.ParseAddress.
Recipients []string `yaml:"recipients"`
} `yaml:"email"`
// Slack adds a "slack" notifier when any of users, channels or webhooks is non-empty.
Slack struct {
Users []string `yaml:"users"`
Channels []string `yaml:"channels"`
Webhooks []string `yaml:"webhooks"`
} `yaml:"slack"`
} `yaml:"notify"`
// Annotations is copied to the alert spec unchanged.
Annotations map[string]string `yaml:"annotations"`
// Backwards compatibility
// Email is the legacy notification block. When email.recipients is non-empty, the settings in this
// block are used instead of the top-level notification properties and the Notify block.
// Setting both email.recipients and notify.email.recipients is an error.
Email struct {
Recipients []string `yaml:"recipients"`
OnRecover *bool `yaml:"on_recover"`
OnFail *bool `yaml:"on_fail"`
OnError *bool `yaml:"on_error"`
Renotify *bool `yaml:"renotify"`
RenotifyAfter string `yaml:"renotify_after"`
} `yaml:"email"`
}
// parseAlert parses an alert definition and adds the resulting resource to p.Resources.
//
// The node's YAML is decoded into an AlertYAML and every property is validated (and every
// protobuf struct is built) before the resource is registered with p.insertResource.
// Once the resource has been inserted, the function only assigns fields and never fails.
//
// It returns an error if the YAML cannot be decoded, if the node carries SQL or an explicit
// connector, or if any property has an invalid value.
func (p *Parser) parseAlert(node *Node) error {
	// Parse YAML
	tmp := &AlertYAML{}
	err := p.decodeNodeYAML(node, true, tmp)
	if err != nil {
		return err
	}

	// Validate SQL or connector isn't set
	if node.SQL != "" {
		return fmt.Errorf("alerts cannot have SQL")
	}
	if !node.ConnectorInferred && node.Connector != "" {
		return fmt.Errorf("alerts cannot have a connector")
	}

	// Parse refresh schedule
	schedule, err := parseScheduleYAML(tmp.Refresh)
	if err != nil {
		return err
	}

	// Parse watermark (an empty value behaves like "trigger_time")
	watermarkInherit := false
	switch strings.ToLower(tmp.Watermark) {
	case "", "trigger_time":
		// Do nothing
	case "inherit":
		watermarkInherit = true
	default:
		return fmt.Errorf(`invalid value %q for property "watermark"`, tmp.Watermark)
	}

	// Validate the interval duration as a standard ISO8601 duration (without Rill extensions) with only one component
	if tmp.Intervals.Duration != "" {
		err := validateISO8601(tmp.Intervals.Duration, true, true)
		if err != nil {
			return fmt.Errorf(`invalid value %q for property "intervals.duration"`, tmp.Intervals.Duration)
		}
	}

	// The limit is stored as an int32 in the spec; reject values that would wrap around when converted.
	const maxIntervalsLimit = 1<<31 - 1 // same value as math.MaxInt32
	if tmp.Intervals.Limit > maxIntervalsLimit {
		return fmt.Errorf(`invalid value %d for property "intervals.limit"`, tmp.Intervals.Limit)
	}

	// Parse timeout
	var timeout time.Duration
	if tmp.Timeout != "" {
		timeout, err = parseDuration(tmp.Timeout)
		if err != nil {
			return err
		}
	}

	// Data and query
	var resolver string
	var resolverProps *structpb.Struct
	var queryForUserID, queryForUserEmail string
	var queryForAttributes *structpb.Struct
	if tmp.Data != nil {
		var refs []ResourceName
		resolver, resolverProps, refs, err = p.parseDataYAML(tmp.Data)
		if err != nil {
			return fmt.Errorf(`failed to parse "data": %w`, err)
		}
		node.Refs = append(node.Refs, refs...)

		// Query for: validate only one of user_id, user_email, or attributes is set
		queryForUserID, queryForUserEmail, queryForAttributes, err = parseAlertQueryFor("for", tmp.For.UserID, tmp.For.UserEmail, tmp.For.Attributes)
		if err != nil {
			return err
		}
	} else {
		// Legacy query: "data" is not set, so fall back to the "query" property.

		// Query name
		if tmp.Query.Name == "" {
			return fmt.Errorf(`invalid value %q for property "query.name"`, tmp.Query.Name)
		}

		// Query args
		if tmp.Query.ArgsJSON != "" {
			// Validate JSON
			if !json.Valid([]byte(tmp.Query.ArgsJSON)) {
				return errors.New(`failed to parse "query.args_json" as JSON`)
			}
		} else {
			// Fall back to query.args if query.args_json is not set
			data, err := json.Marshal(tmp.Query.Args)
			if err != nil {
				return fmt.Errorf(`failed to serialize "query.args" to JSON: %w`, err)
			}
			tmp.Query.ArgsJSON = string(data)
		}
		// Defensive check: json.Marshal encodes a nil map as "null", so this is not expected to trigger.
		if tmp.Query.ArgsJSON == "" {
			return errors.New(`missing query args (must set either "query.args" or "query.args_json")`)
		}

		// Query for: validate only one of user_id, user_email, or attributes is set
		queryForUserID, queryForUserEmail, queryForAttributes, err = parseAlertQueryFor("query.for", tmp.Query.For.UserID, tmp.Query.For.UserEmail, tmp.Query.For.Attributes)
		if err != nil {
			return err
		}

		resolver = "legacy_metrics"
		resolverProps, err = structpb.NewStruct(map[string]any{
			"query_name":      tmp.Query.Name,
			"query_args_json": tmp.Query.ArgsJSON,
		})
		if err != nil {
			return fmt.Errorf("encountered invalid property type: %w", err)
		}
	}

	// Notification settings: the legacy "email" block and "notify.email" are mutually exclusive.
	if len(tmp.Email.Recipients) > 0 && len(tmp.Notify.Email.Recipients) > 0 {
		return errors.New(`cannot set both "email.recipients" and "notify.email.recipients"`)
	}
	isLegacyNotify := len(tmp.Email.Recipients) > 0

	// Select where the notification settings are read from.
	recipients := tmp.Notify.Email.Recipients
	onRecover, onFail, onError, renotify := tmp.OnRecover, tmp.OnFail, tmp.OnError, tmp.Renotify
	renotifyAfterRaw, renotifyAfterProp := tmp.RenotifyAfter, "renotify_after"
	if isLegacyNotify {
		// Backwards compatibility
		recipients = tmp.Email.Recipients
		onRecover, onFail, onError, renotify = tmp.Email.OnRecover, tmp.Email.OnFail, tmp.Email.OnError, tmp.Email.Renotify
		renotifyAfterRaw, renotifyAfterProp = tmp.Email.RenotifyAfter, "email.renotify_after"
	}

	// Validate email recipients
	for _, email := range recipients {
		if _, err := mail.ParseAddress(email); err != nil {
			return fmt.Errorf("invalid recipient email address %q", email)
		}
	}

	// Validate renotify_after
	var renotifyAfter time.Duration
	if renotifyAfterRaw != "" {
		renotifyAfter, err = parseDuration(renotifyAfterRaw)
		if err != nil {
			return fmt.Errorf(`invalid value for property %q: %w`, renotifyAfterProp, err)
		}
	}

	// Build the notifiers up front: this can fail, and no error may be returned after insertResource.
	notifiers, err := buildAlertNotifiers(tmp, isLegacyNotify)
	if err != nil {
		return err
	}

	// Track alert
	r, err := p.insertResource(ResourceKindAlert, node.Name, node.Paths, node.Refs...)
	if err != nil {
		return err
	}
	// NOTE: After calling insertResource, an error must not be returned. Any validation should be done before calling it.

	r.AlertSpec.Title = tmp.Title
	if schedule != nil {
		r.AlertSpec.RefreshSchedule = schedule
	}
	r.AlertSpec.WatermarkInherit = watermarkInherit
	r.AlertSpec.IntervalsIsoDuration = tmp.Intervals.Duration
	r.AlertSpec.IntervalsLimit = int32(tmp.Intervals.Limit) // range checked above
	r.AlertSpec.IntervalsCheckUnclosed = tmp.Intervals.CheckUnclosed
	if timeout != 0 {
		r.AlertSpec.TimeoutSeconds = uint32(timeout.Seconds())
	}

	r.AlertSpec.Resolver = resolver
	r.AlertSpec.ResolverProperties = resolverProps

	// Note: have already validated that at most one of the cases match
	switch {
	case queryForUserID != "":
		r.AlertSpec.QueryFor = &runtimev1.AlertSpec_QueryForUserId{QueryForUserId: queryForUserID}
	case queryForUserEmail != "":
		r.AlertSpec.QueryFor = &runtimev1.AlertSpec_QueryForUserEmail{QueryForUserEmail: queryForUserEmail}
	case queryForAttributes != nil:
		r.AlertSpec.QueryFor = &runtimev1.AlertSpec_QueryForAttributes{QueryForAttributes: queryForAttributes}
	}

	// Notification default settings
	r.AlertSpec.NotifyOnRecover = false
	r.AlertSpec.NotifyOnFail = true
	r.AlertSpec.NotifyOnError = false
	r.AlertSpec.Renotify = false

	// Override notification defaults
	if onRecover != nil {
		r.AlertSpec.NotifyOnRecover = *onRecover
	}
	if onFail != nil {
		r.AlertSpec.NotifyOnFail = *onFail
	}
	if onError != nil {
		r.AlertSpec.NotifyOnError = *onError
	}
	if renotify != nil {
		// renotify_after is only applied when renotify is explicitly set.
		r.AlertSpec.Renotify = *renotify
		r.AlertSpec.RenotifyAfterSeconds = uint32(renotifyAfter.Seconds())
	}

	// Notifiers: the legacy block replaces the list, the current format appends to it.
	if isLegacyNotify {
		r.AlertSpec.Notifiers = notifiers
	} else {
		r.AlertSpec.Notifiers = append(r.AlertSpec.Notifiers, notifiers...)
	}

	r.AlertSpec.Annotations = tmp.Annotations

	return nil
}

// parseAlertQueryFor validates that at most one of user ID, user email and attributes is set and returns them,
// with the attributes converted to a protobuf struct. prop is the YAML property path used in error messages.
func parseAlertQueryFor(prop, userID, userEmail string, attributes map[string]any) (string, string, *structpb.Struct, error) {
	n := 0
	if userID != "" {
		n++
	}
	if userEmail != "" {
		n++
		if _, err := mail.ParseAddress(userEmail); err != nil {
			return "", "", nil, fmt.Errorf(`invalid value %q for property "%s.user_email"`, userEmail, prop)
		}
	}
	var attrs *structpb.Struct
	if len(attributes) > 0 {
		n++
		var err error
		attrs, err = structpb.NewStruct(attributes)
		if err != nil {
			return "", "", nil, fmt.Errorf(`failed to serialize property "%s.attributes": %w`, prop, err)
		}
	}
	if n > 1 {
		return "", "", nil, fmt.Errorf(`only one of "%[1]s.user_id", "%[1]s.user_email", or "%[1]s.attributes" may be set`, prop)
	}
	return userID, userEmail, attrs, nil
}

// buildAlertNotifiers builds the notifiers configured in tmp, reading the legacy "email" block when legacy is true
// and the "notify" block otherwise.
func buildAlertNotifiers(tmp *AlertYAML, legacy bool) ([]*runtimev1.Notifier, error) {
	if legacy {
		// Backwards compatibility: only an email notifier is supported in the legacy format.
		props, err := structpb.NewStruct(map[string]any{
			"recipients": pbutil.ToSliceAny(tmp.Email.Recipients),
		})
		if err != nil {
			return nil, fmt.Errorf("encountered invalid property type: %w", err)
		}
		return []*runtimev1.Notifier{{Connector: "email", Properties: props}}, nil
	}

	var notifiers []*runtimev1.Notifier

	// Email settings
	if len(tmp.Notify.Email.Recipients) > 0 {
		props, err := structpb.NewStruct(map[string]any{
			"recipients": pbutil.ToSliceAny(tmp.Notify.Email.Recipients),
		})
		if err != nil {
			return nil, fmt.Errorf("encountered invalid property type: %w", err)
		}
		notifiers = append(notifiers, &runtimev1.Notifier{
			Connector:  "email",
			Properties: props,
		})
	}

	// Slack settings
	if len(tmp.Notify.Slack.Channels) > 0 || len(tmp.Notify.Slack.Users) > 0 || len(tmp.Notify.Slack.Webhooks) > 0 {
		props, err := structpb.NewStruct(slack.EncodeProps(tmp.Notify.Slack.Users, tmp.Notify.Slack.Channels, tmp.Notify.Slack.Webhooks))
		if err != nil {
			return nil, fmt.Errorf("encountered invalid property type: %w", err)
		}
		notifiers = append(notifiers, &runtimev1.Notifier{
			Connector:  "slack",
			Properties: props,
		})
	}

	return notifiers, nil
}