Skip to content

Commit

Permalink
feat(notification): add idempotency in notification api
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh committed Dec 16, 2022
1 parent ac9f5fc commit 77988f6
Show file tree
Hide file tree
Showing 34 changed files with 836 additions and 188 deletions.
3 changes: 2 additions & 1 deletion cli/deps.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ func InitAPIDeps(
encryptor *secret.Crypto,
queue notification.Queuer,
) (*api.Deps, map[string]notification.Notifier, error) {
idempotencyRepository := postgres.NewIdempotencyRepository(pgClient)
templateRepository := postgres.NewTemplateRepository(pgClient)
templateService := template.NewService(templateRepository)

Expand Down Expand Up @@ -90,7 +91,7 @@ func InitAPIDeps(
receiver.TypeFile: filePluginService,
}

notificationService := notification.NewService(logger, queue, receiverService, subscriptionService, notifierRegistry)
notificationService := notification.NewService(logger, queue, idempotencyRepository, receiverService, subscriptionService, notifierRegistry)

return &api.Deps{
TemplateService: templateService,
Expand Down
3 changes: 2 additions & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/odpf/siren/config"
"github.com/odpf/siren/core/notification"
"github.com/odpf/siren/internal/server"
"github.com/odpf/siren/internal/store/postgres"
"github.com/odpf/siren/pkg/pgc"
"github.com/odpf/siren/pkg/secret"
"github.com/odpf/siren/pkg/telemetry"
Expand Down Expand Up @@ -113,7 +114,7 @@ func serverMigrateCommand() *cobra.Command {
return err
}

if err := pgc.Migrate(cfg.DB); err != nil {
if err := postgres.Migrate(cfg.DB); err != nil {
return err
}
printer.Success("Migration done")
Expand Down
14 changes: 14 additions & 0 deletions core/idempotency/idempotency.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package idempotency

import (
"time"
)

type Idempotency struct {
ID uint64
Scope string
Key string
Success bool
CreatedAt time.Time
UpdatedAt time.Time
}
14 changes: 8 additions & 6 deletions core/notification/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,20 @@ func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error

message.MarkPending(time.Now())

telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessagePending,
telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessageCounter,
tag.Upsert(telemetry.TagMessageStatus, message.Status.String()),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

newConfig, err := notifier.PostHookQueueTransformConfigs(ctx, message.Configs)
if err != nil {
message.MarkFailed(time.Now(), false, err)

telemetry.IncrementInt64Counter(ctx, telemetry.MetricReceiverPostHookQueueFailed,
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))
telemetry.IncrementInt64Counter(ctx, telemetry.MetricReceiverHookFailed,
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType),
tag.Upsert(telemetry.TagHookCondition, telemetry.HookConditionPostHookQueue),
)

telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessageFailed,
telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessageCounter,
tag.Upsert(telemetry.TagMessageStatus, message.Status.String()),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

Expand All @@ -141,7 +143,7 @@ func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error
if retryable, err := notifier.Send(ctx, message); err != nil {
message.MarkFailed(time.Now(), retryable, err)

telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessageFailed,
telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessageCounter,
tag.Upsert(telemetry.TagMessageStatus, message.Status.String()),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

Expand All @@ -153,7 +155,7 @@ func (h *Handler) MessageHandler(ctx context.Context, messages []Message) error

message.MarkPublished(time.Now())

telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessagePublished,
telemetry.IncrementInt64Counter(ctx, telemetry.MetricNotificationMessageCounter,
tag.Upsert(telemetry.TagMessageStatus, message.Status.String()),
tag.Upsert(telemetry.TagReceiverType, message.ReceiverType))

Expand Down
125 changes: 125 additions & 0 deletions core/notification/mocks/idempotency_repository.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions core/notification/notification.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package notification

import (
"context"
"time"

"github.com/odpf/siren/core/idempotency"
"github.com/odpf/siren/pkg/errors"
)

//go:generate mockery --name=IdempotencyRepository -r --case underscore --with-expecter --structname IdempotencyRepository --filename idempotency_repository.go --output=./mocks
type IdempotencyRepository interface {
InsertOnConflictReturning(context.Context, string, string) (*idempotency.Idempotency, error)
UpdateSuccess(context.Context, uint64, bool) error
}

// Notification is a model of notification
type Notification struct {
ID string
Expand Down
Loading

0 comments on commit 77988f6

Please sign in to comment.