Skip to content

Commit

Permalink
feat(notification): add postgres queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh committed Oct 13, 2022
1 parent f5c3963 commit f4fbc00
Show file tree
Hide file tree
Showing 41 changed files with 1,306 additions and 252 deletions.
108 changes: 108 additions & 0 deletions cli/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package cli

import (
"fmt"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/salt/cmdx"
"github.com/odpf/salt/log"
"github.com/odpf/salt/printer"
"github.com/odpf/siren/config"
"github.com/odpf/siren/core/notification"
"github.com/odpf/siren/plugins/queues"
"github.com/odpf/siren/plugins/queues/postgresq"
"github.com/spf13/cobra"
)

func jobCmd(cmdxConfig *cmdx.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "job <command>",
Aliases: []string{"jobs"},
Short: "Manage siren jobs",
Long: "Jobs management commands.",
Example: heredoc.Doc(`
$ siren job run cleanup_queue
`),
}

cmd.AddCommand(
jobRunCommand(cmdxConfig),
)

return cmd
}

func jobRunCommand(cmdxConfig *cmdx.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "run",
Short: "Trigger a job",
Long: heredoc.Doc(`
Trigger a job
`),
Args: cobra.ExactValidArgs(1),
ValidArgs: []string{
"cleanup_queue",
},
Example: heredoc.Doc(`
$ siren job run cleanup_queue
`),
}

cmd.AddCommand(
jobRunCleanupQueueCommand(),
)

return cmd
}

func jobRunCleanupQueueCommand() *cobra.Command {
var configFile string

cmd := &cobra.Command{
Use: "cleanup_queue",
Short: "Cleanup stale messages in queue",
Long: heredoc.Doc(`
Cleaning up all published messages in queue with last updated
more than specific threshold (default 7 days) from now() and
(Optional) cleaning up all pending messages in queue with last updated
more than specific threshold (default 7 days) from now().
`),
Example: heredoc.Doc(`
$ siren job run cleanup_queue
`),
RunE: func(cmd *cobra.Command, args []string) error {
cfg, err := config.Load(configFile)
if err != nil {
return err
}

var queue notification.Queuer
switch cfg.Notification.Queue.Kind {
case queues.KindPostgres:
queue, err = postgresq.New(log.NewZap(), cfg.DB)
if err != nil {
return err
}
default:
printer.Info("Cleanup queue job only works for postgres queue")
return nil
}

spinner := printer.Spin("")
defer spinner.Stop()
printer.Info("Running job cleanup_queue(%s)", string(cfg.Notification.Queue.Kind))
if err := queue.Cleanup(cmd.Context(), queues.FilterCleanup{}); err != nil {
return err
}
spinner.Stop()
printer.Success(fmt.Sprintf("Job cleanup_queue(%s) finished", string(cfg.Notification.Queue.Kind)))
printer.Space()
printer.SuccessIcon()

return nil
},
}

cmd.Flags().StringVarP(&configFile, "config", "c", "config.yaml", "Config file path")
return cmd
}
1 change: 1 addition & 0 deletions cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func New() *cobra.Command {
rootCmd.AddCommand(templatesCmd(cmdxConfig))
rootCmd.AddCommand(rulesCmd(cmdxConfig))
rootCmd.AddCommand(alertsCmd(cmdxConfig))
rootCmd.AddCommand(jobCmd(cmdxConfig))

// Help topics
cmdx.SetHelp(rootCmd)
Expand Down
59 changes: 52 additions & 7 deletions cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,12 @@ import (
"github.com/odpf/siren/pkg/retry"
"github.com/odpf/siren/pkg/secret"
"github.com/odpf/siren/pkg/telemetry"
"github.com/odpf/siren/pkg/worker.go"
"github.com/odpf/siren/pkg/zaputil"
"github.com/odpf/siren/plugins/providers/cortex"
"github.com/odpf/siren/plugins/queues"
"github.com/odpf/siren/plugins/queues/inmemory"
"github.com/odpf/siren/plugins/queues/postgresq"
"github.com/odpf/siren/plugins/receivers/httpreceiver"
"github.com/odpf/siren/plugins/receivers/pagerduty"
"github.com/odpf/siren/plugins/receivers/slack"
Expand Down Expand Up @@ -152,7 +155,7 @@ func StartServer(ctx context.Context, cfg config.Config) error {
return err
}

encryptor, err := secret.New(cfg.EncryptionKey)
encryptor, err := secret.New(cfg.Service.EncryptionKey)
if err != nil {
return fmt.Errorf("cannot initialize encryptor: %w", err)
}
Expand Down Expand Up @@ -242,8 +245,21 @@ func StartServer(ctx context.Context, cfg config.Config) error {
receiver.TypeHTTP: httpreceiverNotificationService,
}

// for dev purpose only, should not be used in production
queue := inmemory.New(logger, 50)
var queue, dlq notification.Queuer
switch cfg.Notification.Queue.Kind {
case queues.KindPostgres:
queue, err = postgresq.New(logger, cfg.DB)
if err != nil {
return err
}
dlq, err = postgresq.New(logger, cfg.DB, postgresq.WithStrategy(queues.StrategyDLQ))
if err != nil {
return err
}
default:
queue = inmemory.New(logger, 50)
dlq = inmemory.New(logger, 10)
}
notificationService := notification.NewService(logger, queue, receiverService, subscriptionService, notifierRegistry)

apiDeps := &api.Deps{
Expand All @@ -258,12 +274,38 @@ func StartServer(ctx context.Context, cfg config.Config) error {
}

// run worker
notificationHandler := notification.NewHandler(logger, queue, notifierRegistry)

cancelWorkerChan := make(chan struct{})
wg := &sync.WaitGroup{}
wg.Add(1)
go notificationHandler.RunHandler(ctx, wg, cancelWorkerChan)

if len(cfg.Notification.MessageHandlers) == 0 {
cfg.Notification.MessageHandlers = make([]notification.HandlerConfig, 1)
}
if len(cfg.Notification.DLQHandlers) == 0 {
cfg.Notification.DLQHandlers = make([]notification.HandlerConfig, 1)
}

for _, handlerCfg := range cfg.Notification.MessageHandlers {
wg.Add(1)
workerTicker := worker.NewTicker(logger, worker.WithTickerDuration(handlerCfg.PollDuration))
notificationHandler := notification.NewHandler(handlerCfg, logger, queue, notifierRegistry,
notification.HandlerWithIdentifier(workerTicker.GetID()))
go func() {
workerTicker.Run(ctx, wg, cancelWorkerChan, func(ctx context.Context, runningAt time.Time) error {
return notificationHandler.Process(ctx, runningAt)
})
}()
}
for _, handlerCfg := range cfg.Notification.DLQHandlers {
wg.Add(1)
workerDLQTicker := worker.NewTicker(logger, worker.WithTickerDuration(handlerCfg.PollDuration))
notificationDLQHandler := notification.NewHandler(handlerCfg, logger, dlq, notifierRegistry,
notification.HandlerWithIdentifier("dlq"+workerDLQTicker.GetID()))
go func() {
workerDLQTicker.Run(ctx, wg, cancelWorkerChan, func(ctx context.Context, runningAt time.Time) error {
return notificationDLQHandler.Process(ctx, runningAt)
})
}()
}

err = server.RunServer(
ctx,
Expand All @@ -287,6 +329,9 @@ func StartServer(ctx context.Context, cfg config.Config) error {
if err := queue.Stop(timeoutCtx); err != nil {
logger.Error("error stopping queue", "error", err)
}
if err := dlq.Stop(timeoutCtx); err != nil {
logger.Error("error stopping dlq", "error", err)
}

return err
}
Expand Down
19 changes: 10 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/odpf/salt/config"
"github.com/odpf/salt/db"
"github.com/odpf/siren/core/notification"
"github.com/odpf/siren/internal/server"
"github.com/odpf/siren/pkg/errors"
"github.com/odpf/siren/pkg/telemetry"
Expand All @@ -30,17 +31,17 @@ func Load(configFile string) (Config, error) {
}

type Log struct {
Level string `yaml:"level" mapstructure:"level" default:"info"`
GCPCompatible bool `yaml:"gcp_compatible" mapstructure:"gcp_compatible" default:"true"`
Level string `mapstructure:"level" default:"info"`
GCPCompatible bool `mapstructure:"gcp_compatible" default:"true"`
}

// Config contains the application configuration
type Config struct {
DB db.Config `mapstructure:"db"`
Cortex cortex.Config `mapstructure:"cortex"`
NewRelic telemetry.NewRelicConfig `mapstructure:"newrelic"`
Service server.Config `mapstructure:"service"`
Log Log `mapstructure:"log"`
EncryptionKey string `mapstructure:"encryption_key"`
Receivers receivers.Config `mapstructure:"receivers"`
DB db.Config `mapstructure:"db"`
Cortex cortex.Config `mapstructure:"cortex"`
NewRelic telemetry.NewRelicConfig `mapstructure:"newrelic"`
Service server.Config `mapstructure:"service"`
Log Log `mapstructure:"log"`
Receivers receivers.Config `mapstructure:"receivers"`
Notification notification.Config `mapstructure:"notification"`
}
12 changes: 10 additions & 2 deletions config/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,20 @@ newrelic:
service:
host: localhost
port: 8080
encryption_key: ____STRING_OF_32_CHARACTERS_____
encryption_key: ____STRING_OF_32_CHARACTERS_____
receivers:
slack:
api_host:
retry:
max_retry: 4
wait_duration: 20ms
http_client:
timeout_ms: 500
timeout_ms: 500
notification:
queue:
kind: postgres
message_handlers:
- receiver_types: slack,http,pagerduty
poll_duration: 5s
batch_size: 1
dlq_handlers:
19 changes: 19 additions & 0 deletions core/notification/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package notification

import (
"time"

"github.com/odpf/siren/plugins/queues"
)

type Config struct {
Queue queues.Config `mapstructure:"queue"`
MessageHandlers []HandlerConfig `mapstructure:"message_handlers"`
DLQHandlers []HandlerConfig `mapstructure:"dlq_handlers"`
}

type HandlerConfig struct {
PollDuration time.Duration `mapstructure:"poll_duration"`
ReceiverTypes []string `mapstructure:"receiver_types"`
BatchSize int `mapstructure:"batch_size"`
}
Loading

0 comments on commit f4fbc00

Please sign in to comment.