Skip to content

Commit

Permalink
feat(notification): add postgres queue
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh committed Oct 21, 2022
1 parent f5c3963 commit a455dd6
Show file tree
Hide file tree
Showing 79 changed files with 2,310 additions and 629 deletions.
148 changes: 148 additions & 0 deletions cli/deps.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package cli

import (
"fmt"

"github.com/odpf/salt/log"
"github.com/odpf/siren/config"
"github.com/odpf/siren/core/alert"
"github.com/odpf/siren/core/namespace"
"github.com/odpf/siren/core/notification"
"github.com/odpf/siren/core/provider"
"github.com/odpf/siren/core/receiver"
"github.com/odpf/siren/core/rule"
"github.com/odpf/siren/core/subscription"
"github.com/odpf/siren/core/template"
"github.com/odpf/siren/internal/api"
"github.com/odpf/siren/internal/store/postgres"
"github.com/odpf/siren/pkg/httpclient"
"github.com/odpf/siren/pkg/retry"
"github.com/odpf/siren/pkg/secret"
"github.com/odpf/siren/plugins/providers/cortex"
"github.com/odpf/siren/plugins/receivers/httpreceiver"
"github.com/odpf/siren/plugins/receivers/pagerduty"
"github.com/odpf/siren/plugins/receivers/slack"
)

type ReceiverClient struct {
SlackClient *slack.Client
PagerDutyClient *pagerduty.Client
HTTPReceiverClient *httpreceiver.Client
}

type ProviderClient struct {
CortexClient *cortex.Client
}

func InitAPIDeps(
logger log.Logger,
cfg config.Config,
pgClient *postgres.Client,
encryptor *secret.Crypto,
queue notification.Queuer,
) (*api.Deps, *ReceiverClient, *ProviderClient, map[string]notification.Notifier, error) {
templateRepository := postgres.NewTemplateRepository(pgClient)
templateService := template.NewService(templateRepository)

alertRepository := postgres.NewAlertRepository(pgClient)
alertHistoryService := alert.NewService(alertRepository)

providerRepository := postgres.NewProviderRepository(pgClient)
providerService := provider.NewService(providerRepository)

namespaceRepository := postgres.NewNamespaceRepository(pgClient)
namespaceService := namespace.NewService(encryptor, namespaceRepository)

cortexClient, err := cortex.NewClient(cortex.Config{Address: cfg.Cortex.Address})
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to init cortex client: %w", err)
}
cortexProviderService := cortex.NewProviderService(cortexClient)

ruleRepository := postgres.NewRuleRepository(pgClient)
ruleService := rule.NewService(
ruleRepository,
templateService,
namespaceService,
map[string]rule.RuleUploader{
provider.TypeCortex: cortexProviderService,
},
)

// plugin receiver services
slackHTTPClient := httpclient.New(cfg.Receivers.Slack.HTTPClient)
slackRetrier := retry.New(cfg.Receivers.Slack.Retry)
slackClient := slack.NewClient(
cfg.Receivers.Slack,
slack.ClientWithHTTPClient(slackHTTPClient),
slack.ClientWithRetrier(slackRetrier),
)
pagerdutyHTTPClient := httpclient.New(cfg.Receivers.Pagerduty.HTTPClient)
pagerdutyRetrier := retry.New(cfg.Receivers.Slack.Retry)
pagerdutyClient := pagerduty.NewClient(
cfg.Receivers.Pagerduty,
pagerduty.ClientWithHTTPClient(pagerdutyHTTPClient),
pagerduty.ClientWithRetrier(pagerdutyRetrier),
)
httpreceiverHTTPClient := httpclient.New(cfg.Receivers.HTTPReceiver.HTTPClient)
httpreceiverRetrier := retry.New(cfg.Receivers.Slack.Retry)
httpreceiverClient := httpreceiver.NewClient(
logger,
cfg.Receivers.HTTPReceiver,
httpreceiver.ClientWithHTTPClient(httpreceiverHTTPClient),
httpreceiver.ClientWithRetrier(httpreceiverRetrier),
)

slackReceiverService := slack.NewReceiverService(slackClient, encryptor)
httpReceiverService := httpreceiver.NewReceiverService()
pagerDutyReceiverService := pagerduty.NewReceiverService()

receiverRepository := postgres.NewReceiverRepository(pgClient)
receiverService := receiver.NewService(
receiverRepository,
map[string]receiver.ConfigResolver{
receiver.TypeSlack: slackReceiverService,
receiver.TypeHTTP: httpReceiverService,
receiver.TypePagerDuty: pagerDutyReceiverService,
},
)

subscriptionRepository := postgres.NewSubscriptionRepository(pgClient)
subscriptionService := subscription.NewService(
subscriptionRepository,
namespaceService,
receiverService,
subscription.RegisterProviderPlugin(provider.TypeCortex, cortexProviderService),
)

// notification
slackNotificationService := slack.NewNotificationService(slackClient, encryptor)
pagerdutyNotificationService := pagerduty.NewNotificationService(pagerdutyClient)
httpreceiverNotificationService := httpreceiver.NewNotificationService(httpreceiverClient)

notifierRegistry := map[string]notification.Notifier{
receiver.TypeSlack: slackNotificationService,
receiver.TypePagerDuty: pagerdutyNotificationService,
receiver.TypeHTTP: httpreceiverNotificationService,
}

notificationService := notification.NewService(logger, queue, receiverService, subscriptionService, notifierRegistry)

return &api.Deps{
TemplateService: templateService,
RuleService: ruleService,
AlertService: alertHistoryService,
ProviderService: providerService,
NamespaceService: namespaceService,
ReceiverService: receiverService,
SubscriptionService: subscriptionService,
NotificationService: notificationService,
}, &ReceiverClient{
SlackClient: slackClient,
PagerDutyClient: pagerdutyClient,
HTTPReceiverClient: httpreceiverClient,
}, &ProviderClient{
CortexClient: cortexClient,
}, notifierRegistry,
nil
}
108 changes: 108 additions & 0 deletions cli/job.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package cli

import (
"fmt"

"github.com/MakeNowJust/heredoc"
"github.com/odpf/salt/cmdx"
"github.com/odpf/salt/log"
"github.com/odpf/salt/printer"
"github.com/odpf/siren/config"
"github.com/odpf/siren/core/notification"
"github.com/odpf/siren/plugins/queues"
"github.com/odpf/siren/plugins/queues/postgresq"
"github.com/spf13/cobra"
)

func jobCmd(cmdxConfig *cmdx.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "job <command>",
Aliases: []string{"jobs"},
Short: "Manage siren jobs",
Long: "Jobs management commands.",
Example: heredoc.Doc(`
$ siren job run cleanup_queue
`),
}

cmd.AddCommand(
jobRunCommand(cmdxConfig),
)

return cmd
}

func jobRunCommand(cmdxConfig *cmdx.Config) *cobra.Command {
cmd := &cobra.Command{
Use: "run",
Short: "Trigger a job",
Long: heredoc.Doc(`
Trigger a job
`),
Args: cobra.ExactValidArgs(1),
ValidArgs: []string{
"cleanup_queue",
},
Example: heredoc.Doc(`
$ siren job run cleanup_queue
`),
}

cmd.AddCommand(
jobRunCleanupQueueCommand(),
)

return cmd
}

func jobRunCleanupQueueCommand() *cobra.Command {
var configFile string

cmd := &cobra.Command{
Use: "cleanup_queue",
Short: "Cleanup stale messages in queue",
Long: heredoc.Doc(`
Cleaning up all published messages in queue with last updated
more than specific threshold (default 7 days) from now() and
(Optional) cleaning up all pending messages in queue with last updated
more than specific threshold (default 7 days) from now().
`),
Example: heredoc.Doc(`
$ siren job run cleanup_queue
`),
RunE: func(cmd *cobra.Command, args []string) error {
cfg, err := config.Load(configFile)
if err != nil {
return err
}

var queue notification.Queuer
switch cfg.Notification.Queue.Kind {
case queues.KindPostgres:
queue, err = postgresq.New(log.NewZap(), cfg.DB)
if err != nil {
return err
}
default:
printer.Info("Cleanup queue job only works for postgres queue")
return nil
}

spinner := printer.Spin("")
defer spinner.Stop()
printer.Info("Running job cleanup_queue(%s)", cfg.Notification.Queue.Kind.String())
if err := queue.Cleanup(cmd.Context(), queues.FilterCleanup{}); err != nil {
return err
}
spinner.Stop()
printer.Success(fmt.Sprintf("Job cleanup_queue(%s) finished", cfg.Notification.Queue.Kind.String()))
printer.Space()
printer.SuccessIcon()

return nil
},
}

cmd.Flags().StringVarP(&configFile, "config", "c", "config.yaml", "Config file path")
return cmd
}
2 changes: 2 additions & 0 deletions cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ func New() *cobra.Command {
rootCmd.AddCommand(templatesCmd(cmdxConfig))
rootCmd.AddCommand(rulesCmd(cmdxConfig))
rootCmd.AddCommand(alertsCmd(cmdxConfig))
rootCmd.AddCommand(jobCmd(cmdxConfig))
rootCmd.AddCommand(workerCmd())

// Help topics
cmdx.SetHelp(rootCmd)
Expand Down
Loading

0 comments on commit a455dd6

Please sign in to comment.