Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Billing msg metadata #155

Merged
merged 3 commits into from
Dec 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,11 +826,11 @@ func (ts *BackendTestSuite) TestOutgoingQueue() {
dbMsg.ChannelUUID_, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d")
ts.NotNil(dbMsg)

// serialize our message
msgJSON, err := json.Marshal([]interface{}{dbMsg})
ts.NoError(err)
var err error

err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, string(msgJSON), queue.HighPriority)
msgStrJSON := `[{"org_id":1,"id":10000,"uuid":"00000000-0000-0000-0000-000000000000","direction":"O","status":"F","visibility":"V","high_priority":true,"urn":"","urn_auth":"","text":"test message","attachments":null,"external_id":"ext1","response_to_id":null,"response_to_external_id":"","metadata":"{\"ticketer_id\":1}","channel_id":10,"contact_id":100,"contact_urn_id":1000,"msg_count":1,"error_count":3,"channel_uuid":"dbc126ed-66bc-4e28-b67b-81dc3327c95d","contact_name":"","next_attempt":"2024-11-06T20:45:31.123208Z","created_on":"2024-11-06T20:30:14.898168Z","modified_on":"2024-11-06T20:30:31.122974Z","queued_on":"2024-11-06T20:30:14.898168Z","sent_on":null}]`

err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, msgStrJSON, queue.HighPriority)
ts.NoError(err)

// pop a message off our queue
Expand Down
48 changes: 22 additions & 26 deletions billing/billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/sirupsen/logrus"
)

const (
RoutingKeyCreate = "create"
RoutingKeyUpdate = "status-update"
)

// Message represents a object that is sent to the billing service
//
// {
Expand All @@ -31,10 +36,11 @@ type Message struct {
Text string `json:"text,omitempty"`
Attachments []string `json:"attachments,omitempty"`
QuickReplies []string `json:"quick_replies,omitempty"`
FromTicketer bool `json:"from_ticketer"`
}

// Create a new message
func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, direction, channelType, text string, attachments, quickreplies []string) Message {
func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, direction, channelType, text string, attachments, quickreplies []string, fromTicketer bool) Message {
return Message{
ContactURN: contactURN,
ContactUUID: contactUUID,
Expand All @@ -46,24 +52,25 @@ func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, di
Text: text,
Attachments: attachments,
QuickReplies: quickreplies,
FromTicketer: fromTicketer,
}
}

// Client represents a client interface for billing service
type Client interface {
Send(msg Message) error
SendAsync(msg Message, pre func(), post func())
Send(msg Message, routingKey string) error
SendAsync(msg Message, routingKey string, pre func(), post func())
}

// rabbitmqRetryClient represents struct that implements billing service client interface
type rabbitmqRetryClient struct {
publisher rabbitroutine.Publisher
conn *rabbitroutine.Connector
queueName string
publisher rabbitroutine.Publisher
conn *rabbitroutine.Connector
exchangeName string
}

// NewRMQBillingResilientClient creates a new billing service client implementation using RabbitMQ with publish retry and reconnect features
func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, queueName string) (Client, error) {
func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, exchangeName string) (Client, error) {
cconn, err := amqp.Dial(url)
if err != nil {
return nil, err
Expand All @@ -75,17 +82,6 @@ func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int,
return nil, errors.Wrap(err, "failed to open a channel to rabbitmq")
}
defer ch.Close()
_, err = ch.QueueDeclare(
queueName,
false,
false,
false,
false,
nil,
)
if err != nil {
return nil, errors.Wrap(err, "failed to declare a queue for billing publisher")
}

conn := rabbitroutine.NewConnector(rabbitroutine.Config{
ReconnectAttempts: 1000,
Expand Down Expand Up @@ -123,19 +119,19 @@ func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int,
}()

return &rabbitmqRetryClient{
publisher: pub,
conn: conn,
queueName: queueName,
publisher: pub,
conn: conn,
exchangeName: exchangeName,
}, nil
}

func (c *rabbitmqRetryClient) Send(msg Message) error {
func (c *rabbitmqRetryClient) Send(msg Message, routingKey string) error {
msgMarshalled, _ := json.Marshal(msg)
ctx := context.Background()
err := c.publisher.Publish(
ctx,
"",
c.queueName,
c.exchangeName,
routingKey,
amqp.Publishing{
ContentType: "application/json",
Body: msgMarshalled,
Expand All @@ -147,7 +143,7 @@ func (c *rabbitmqRetryClient) Send(msg Message) error {
return nil
}

func (c *rabbitmqRetryClient) SendAsync(msg Message, pre func(), post func()) {
func (c *rabbitmqRetryClient) SendAsync(msg Message, routingKey string, pre func(), post func()) {
go func() {
defer func() {
if r := recover(); r != nil {
Expand All @@ -157,7 +153,7 @@ func (c *rabbitmqRetryClient) SendAsync(msg Message, pre func(), post func()) {
if pre != nil {
pre()
}
err := c.Send(msg)
err := c.Send(msg, routingKey)
if err != nil {
logrus.WithError(err).Error("fail to send msg to billing service")
}
Expand Down
84 changes: 71 additions & 13 deletions billing/billing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,48 @@ import (
"github.com/stretchr/testify/assert"
)

const billingTestQueueName = "testqueue"
const (
billingTestExchangeName = "test-exchange"
billingTestQueueName = "test-queue"
)

func initalizeRMQ(ch *amqp.Channel) {
err := ch.ExchangeDeclare(
billingTestExchangeName,
"topic",
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(err)
}

_, err = ch.QueueDeclare(
billingTestQueueName,
true,
false,
false,
false,
nil,
)
if err != nil {
log.Fatal(errors.Wrap(err, "failed to declare a queue for billing publisher"))
}

err = ch.QueueBind(
billingTestQueueName,
"#",
billingTestExchangeName,
false,
nil,
)
if err != nil {
log.Fatal(err)
}
}

func TestInitialization(t *testing.T) {
connURL := "amqp://localhost:5672/"
Expand All @@ -25,9 +66,13 @@ func TestInitialization(t *testing.T) {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
defer ch.Close()

initalizeRMQ(ch)

defer ch.QueueDelete(billingTestQueueName, false, false, false)
defer ch.ExchangeDelete(billingTestExchangeName, false, false)
defer ch.Close()
defer conn.Close()
}

func TestBillingResilientClient(t *testing.T) {
Expand All @@ -38,8 +83,11 @@ func TestBillingResilientClient(t *testing.T) {
if err != nil {
t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer"))
}
defer ch.Close()
initalizeRMQ(ch)
defer ch.QueueDelete(billingTestQueueName, false, false, false)
defer ch.ExchangeDelete(billingTestExchangeName, false, false)
defer ch.Close()
defer conn.Close()

msgUUID, _ := uuid.NewV4()
msg := NewMessage(
Expand All @@ -53,12 +101,13 @@ func TestBillingResilientClient(t *testing.T) {
"hello",
nil,
nil,
false,
)

billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName)
billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName)
time.Sleep(1 * time.Second)
assert.NoError(t, err)
err = billingClient.Send(msg)
err = billingClient.Send(msg, RoutingKeyCreate)
assert.NoError(t, err)

msgs, err := ch.Consume(
Expand Down Expand Up @@ -102,8 +151,11 @@ func TestBillingResilientClientSendAsync(t *testing.T) {
if err != nil {
t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer"))
}
defer ch.Close()
initalizeRMQ(ch)
defer ch.QueueDelete(billingTestQueueName, false, false, false)
defer ch.ExchangeDelete(billingTestExchangeName, false, false)
defer ch.Close()
defer conn.Close()

msgUUID, _ := uuid.NewV4()
msg := NewMessage(
Expand All @@ -117,14 +169,16 @@ func TestBillingResilientClientSendAsync(t *testing.T) {
"hello",
nil,
nil,
false,
)

billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName)
billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName)
time.Sleep(1 * time.Second)
assert.NoError(t, err)
billingClient.SendAsync(msg, nil, nil)

// billingClient.SendAsync(msg, RoutingKeyCreate, nil, nil)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove comment

err = billingClient.Send(msg, RoutingKeyCreate)
assert.NoError(t, err)

msgs, err := ch.Consume(
billingTestQueueName,
"",
Expand Down Expand Up @@ -166,8 +220,11 @@ func TestBillingResilientClientSendAsyncWithPanic(t *testing.T) {
if err != nil {
t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer"))
}
defer ch.Close()
initalizeRMQ(ch)
defer ch.QueueDelete(billingTestQueueName, false, false, false)
defer ch.ExchangeDelete(billingTestExchangeName, false, false)
defer ch.Close()
defer conn.Close()

msgUUID, _ := uuid.NewV4()
msg := NewMessage(
Expand All @@ -181,13 +238,14 @@ func TestBillingResilientClientSendAsyncWithPanic(t *testing.T) {
"hello",
nil,
nil,
false,
)

billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName)
billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName)
time.Sleep(1 * time.Second)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
billingClient.SendAsync(msg, nil, func() { panic("test panic") })
billingClient.SendAsync(msg, RoutingKeyCreate, nil, func() { panic("test panic") })

assert.NoError(t, err)
msgs, err := ch.Consume(
Expand Down
2 changes: 1 addition & 1 deletion cmd/courier/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func main() {

if config.RabbitmqURL != "" {
billingClient, err := billing.NewRMQBillingResilientClient(
config.RabbitmqURL, config.RabbitmqRetryPubAttempts, config.RabbitmqRetryPubDelay, config.BillingQueueName)
config.RabbitmqURL, config.RabbitmqRetryPubAttempts, config.RabbitmqRetryPubDelay, config.BillingExchangeName)
if err != nil {
logrus.Fatalf("Error creating billing RabbitMQ client: %v", err)
}
Expand Down
4 changes: 2 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Config struct {
RabbitmqURL string `help:"rabbitmq url"`
RabbitmqRetryPubAttempts int `help:"rabbitmq retry attempts"`
RabbitmqRetryPubDelay int `help:"rabbitmq retry delay"`
BillingQueueName string `help:"billing queue name"`
BillingExchangeName string `help:"billing exchange name"`

EmailProxyURL string `help:"email proxy url"`
EmailProxyAuthToken string `help:"email proxy auth token"`
Expand Down Expand Up @@ -91,7 +91,7 @@ func NewConfig() *Config {
WaitMediaChannels: []string{},
RabbitmqRetryPubAttempts: 3,
RabbitmqRetryPubDelay: 1000,
BillingQueueName: "billing-backup",
BillingExchangeName: "msgs.topic",
EmailProxyURL: "http://localhost:9090",
EmailProxyAuthToken: "",
}
Expand Down
2 changes: 1 addition & 1 deletion handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (h *dummyHandler) Initialize(s Server) error {
"amqp://localhost:5672/",
3,
100,
s.Config().BillingQueueName,
s.Config().BillingExchangeName,
)
if err != nil {
logrus.Fatalf("Error creating billing RabbitMQ client: %v", err)
Expand Down
3 changes: 2 additions & 1 deletion handlers/facebookapp/facebookapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -717,8 +717,9 @@ func (h *handler) processCloudWhatsAppPayload(ctx context.Context, channel couri
"",
nil,
nil,
false,
)
h.Server().Billing().SendAsync(billingMsg, nil, nil)
h.Server().Billing().SendAsync(billingMsg, billing.RoutingKeyUpdate, nil, nil)
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/buger/jsonparser"
"github.com/nyaruka/courier/billing"
"github.com/nyaruka/courier/utils"
"github.com/nyaruka/librato"
Expand Down Expand Up @@ -292,8 +293,12 @@ func (w *Sender) sendMessage(msg Msg) {
librato.Gauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration)
}

if status.Status() != MsgErrored && status.Status() != MsgFailed {
sentOk := status.Status() != MsgErrored && status.Status() != MsgFailed
if sentOk && w.foreman.server.Billing() != nil {
if msg.Channel().ChannelType() != "WAC" {
// if ticketer_type is eg: "wenichats" it is a message from ticketer sent by an agent, so must be sent to billing anyway
ticketerType, _ := jsonparser.GetString(msg.Metadata(), "ticketer_type")
fromTicketer := ticketerType != ""
billingMsg := billing.NewMessage(
string(msg.URN().Identity()),
"",
Expand All @@ -305,10 +310,9 @@ func (w *Sender) sendMessage(msg Msg) {
msg.Text(),
msg.Attachments(),
msg.QuickReplies(),
fromTicketer,
)
if w.foreman.server.Billing() != nil {
w.foreman.server.Billing().SendAsync(billingMsg, nil, nil)
}
w.foreman.server.Billing().SendAsync(billingMsg, billing.RoutingKeyCreate, nil, nil)
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion server.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,14 +574,15 @@ func handleBilling(s *server, msg Msg) error {
msg.Text(),
msg.Attachments(),
msg.QuickReplies(),
false,
)
billingMsg.ChannelType = string(msg.Channel().ChannelType())
billingMsg.Text = msg.Text()
billingMsg.Attachments = msg.Attachments()
billingMsg.QuickReplies = msg.QuickReplies()

if s.Billing() != nil {
s.Billing().SendAsync(billingMsg, nil, nil)
s.Billing().SendAsync(billingMsg, billing.RoutingKeyCreate, nil, nil)
}

return nil
Expand Down
Loading