-
Notifications
You must be signed in to change notification settings - Fork 19
/
kafka.go
82 lines (68 loc) · 2.11 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// Copyright 2020 The Moov Authors
// Use of this source code is governed by an Apache License
// license that can be found in the LICENSE file.
package notify
import (
"context"
"fmt"
"github.com/moov-io/achgateway/internal/kafka"
"github.com/moov-io/achgateway/internal/service"
"github.com/moov-io/achgateway/pkg/compliance"
"github.com/moov-io/achgateway/pkg/models"
"github.com/moov-io/base/log"
"gocloud.dev/pubsub"
)
type Kafka struct {
publisher *pubsub.Topic
cfg *service.KafkaConfig
}
// NewKafka opens a topic with kafka.OpenTopic using cfg and returns a Kafka
// notifier that publishes to it. Any error from opening the topic is returned
// unchanged along with a nil notifier.
func NewKafka(logger log.Logger, cfg *service.KafkaConfig) (*Kafka, error) {
	topic, err := kafka.OpenTopic(logger, cfg)
	if err != nil {
		return nil, err
	}

	notifier := &Kafka{
		cfg:       cfg,
		publisher: topic,
	}
	return notifier, nil
}
// event is the payload built by marshalKafkaMessage and published by send
// (wrapped in a models.Event). Field order here determines the order of keys
// in the serialized JSON, so keep it stable.
type event struct {
// Direction is copied from Message.Direction.
Direction Direction `json:"direction"`
// FileName is copied from Message.Filename.
FileName string `json:"fileName"`
// Entries is the result of countEntries on the message's file.
Entries int `json:"entries"`
// DebitTotal is convertDollar applied to the file control's
// TotalDebitEntryDollarAmountInFile.
DebitTotal string `json:"debitTotal"`
// CreditTotal is convertDollar applied to the file control's
// TotalCreditEntryDollarAmountInFile.
CreditTotal string `json:"creditTotal"`
// Hostname is copied from Message.Hostname.
Hostname string `json:"hostname"`
// UploadStatus is the status passed by the caller (success from Info,
// failed from Critical).
UploadStatus uploadStatus `json:"uploadStatus"`
}
// Info builds an event with the success upload status from msg and publishes
// it. It returns whatever error send reports.
func (s *Kafka) Info(ctx context.Context, msg *Message) error {
	return s.send(ctx, marshalKafkaMessage(success, msg))
}
// Critical builds an event with the failed upload status from msg and
// publishes it. It returns whatever error send reports.
func (s *Kafka) Critical(ctx context.Context, msg *Message) error {
	return s.send(ctx, marshalKafkaMessage(failed, msg))
}
// marshalKafkaMessage converts msg into the event payload, tagging it with
// the given upload status. The entry count and the debit/credit totals are
// derived from msg.File via countEntries and convertDollar.
func marshalKafkaMessage(status uploadStatus, msg *Message) event {
	var out event

	// Derived values first, in the same order the helpers were originally called.
	out.Entries = countEntries(msg.File)
	out.DebitTotal = convertDollar(msg.File.Control.TotalDebitEntryDollarAmountInFile)
	out.CreditTotal = convertDollar(msg.File.Control.TotalCreditEntryDollarAmountInFile)

	// Values copied straight from the caller's arguments.
	out.UploadStatus = status
	out.Direction = msg.Direction
	out.FileName = msg.Filename
	out.Hostname = msg.Hostname

	return out
}
// send wraps evt in a models.Event, runs it through compliance.Protect with
// the configured transform, and publishes the resulting bytes to the topic.
//
// A failure from compliance.Protect is wrapped with %w so callers can inspect
// the underlying cause with errors.Is / errors.As. An error from the
// publisher is returned unchanged.
func (s *Kafka) send(ctx context.Context, evt event) error {
	bs, err := compliance.Protect(s.cfg.Transform, models.Event{
		// NOTE(review): Type is deliberately left as the empty string, as in
		// the original code — confirm consumers do not rely on a type name.
		Type:  "",
		Event: evt,
	})
	if err != nil {
		return fmt.Errorf("unable to protect notifier kafka event: %w", err)
	}

	return s.publisher.Send(ctx, &pubsub.Message{
		Body: bs,
	})
}