diff --git a/backends/rapidpro/backend_test.go b/backends/rapidpro/backend_test.go index 9effe45a2..9d884a7fe 100644 --- a/backends/rapidpro/backend_test.go +++ b/backends/rapidpro/backend_test.go @@ -826,11 +826,11 @@ func (ts *BackendTestSuite) TestOutgoingQueue() { dbMsg.ChannelUUID_, _ = courier.NewChannelUUID("dbc126ed-66bc-4e28-b67b-81dc3327c95d") ts.NotNil(dbMsg) - // serialize our message - msgJSON, err := json.Marshal([]interface{}{dbMsg}) - ts.NoError(err) + var err error - err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, string(msgJSON), queue.HighPriority) + msgStrJSON := `[{"org_id":1,"id":10000,"uuid":"00000000-0000-0000-0000-000000000000","direction":"O","status":"F","visibility":"V","high_priority":true,"urn":"","urn_auth":"","text":"test message","attachments":null,"external_id":"ext1","response_to_id":null,"response_to_external_id":"","metadata":"{\"ticketer_id\":1}","channel_id":10,"contact_id":100,"contact_urn_id":1000,"msg_count":1,"error_count":3,"channel_uuid":"dbc126ed-66bc-4e28-b67b-81dc3327c95d","contact_name":"","next_attempt":"2024-11-06T20:45:31.123208Z","created_on":"2024-11-06T20:30:14.898168Z","modified_on":"2024-11-06T20:30:31.122974Z","queued_on":"2024-11-06T20:30:14.898168Z","sent_on":null}]` + + err = queue.PushOntoQueue(r, msgQueueName, "dbc126ed-66bc-4e28-b67b-81dc3327c95d", 10, msgStrJSON, queue.HighPriority) ts.NoError(err) // pop a message off our queue diff --git a/billing/billing.go b/billing/billing.go index 4afba00f7..2e26b0cad 100644 --- a/billing/billing.go +++ b/billing/billing.go @@ -12,6 +12,11 @@ import ( "github.com/sirupsen/logrus" ) +const ( + RoutingKeyCreate = "create" + RoutingKeyUpdate = "status-update" +) + // Message represents a object that is sent to the billing service // // { @@ -31,10 +36,11 @@ type Message struct { Text string `json:"text,omitempty"` Attachments []string `json:"attachments,omitempty"` QuickReplies []string `json:"quick_replies,omitempty"` + FromTicketer bool `json:"from_ticketer"` } // Create a new message -func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, direction, channelType, text string, attachments, quickreplies []string) Message { +func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, direction, channelType, text string, attachments, quickreplies []string, fromTicketer bool) Message { return Message{ ContactURN: contactURN, ContactUUID: contactUUID, @@ -46,24 +52,25 @@ func NewMessage(contactURN, contactUUID, channelUUID, messageID, messageDate, di Text: text, Attachments: attachments, QuickReplies: quickreplies, + FromTicketer: fromTicketer, } } // Client represents a client interface for billing service type Client interface { - Send(msg Message) error - SendAsync(msg Message, pre func(), post func()) + Send(msg Message, routingKey string) error + SendAsync(msg Message, routingKey string, pre func(), post func()) } // rabbitmqRetryClient represents struct that implements billing service client interface type rabbitmqRetryClient struct { - publisher rabbitroutine.Publisher - conn *rabbitroutine.Connector - queueName string + publisher rabbitroutine.Publisher + conn *rabbitroutine.Connector + exchangeName string } // NewRMQBillingResilientClient creates a new billing service client implementation using RabbitMQ with publish retry and reconnect features -func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, queueName string) (Client, error) { +func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, exchangeName string) (Client, error) { cconn, err := amqp.Dial(url) if err != nil { return nil, err @@ -75,17 +82,6 @@ func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, return nil, errors.Wrap(err, "failed to open a channel to rabbitmq") } defer ch.Close() - _, err = ch.QueueDeclare( - queueName, - false, - false, - false, - false, - nil, - ) - if err != nil { - return nil, errors.Wrap(err, "failed to declare a queue for billing publisher") - } conn := rabbitroutine.NewConnector(rabbitroutine.Config{ ReconnectAttempts: 1000, @@ -123,19 +119,19 @@ func NewRMQBillingResilientClient(url string, retryAttempts int, retryDelay int, }() return &rabbitmqRetryClient{ - publisher: pub, - conn: conn, - queueName: queueName, + publisher: pub, + conn: conn, + exchangeName: exchangeName, }, nil } -func (c *rabbitmqRetryClient) Send(msg Message) error { +func (c *rabbitmqRetryClient) Send(msg Message, routingKey string) error { msgMarshalled, _ := json.Marshal(msg) ctx := context.Background() err := c.publisher.Publish( ctx, - "", - c.queueName, + c.exchangeName, + routingKey, amqp.Publishing{ ContentType: "application/json", Body: msgMarshalled, @@ -147,7 +143,7 @@ func (c *rabbitmqRetryClient) Send(msg Message) error { return nil } -func (c *rabbitmqRetryClient) SendAsync(msg Message, pre func(), post func()) { +func (c *rabbitmqRetryClient) SendAsync(msg Message, routingKey string, pre func(), post func()) { go func() { defer func() { if r := recover(); r != nil { @@ -157,7 +153,7 @@ func (c *rabbitmqRetryClient) SendAsync(msg Message, pre func(), post func()) { if pre != nil { pre() } - err := c.Send(msg) + err := c.Send(msg, routingKey) if err != nil { logrus.WithError(err).Error("fail to send msg to billing service") } diff --git a/billing/billing_test.go b/billing/billing_test.go index 72df5baa5..587af42dc 100644 --- a/billing/billing_test.go +++ b/billing/billing_test.go @@ -13,7 +13,48 @@ import ( "github.com/stretchr/testify/assert" ) -const billingTestQueueName = "testqueue" +const ( + billingTestExchangeName = "test-exchange" + billingTestQueueName = "test-queue" +) + +func initalizeRMQ(ch *amqp.Channel) { + err := ch.ExchangeDeclare( + billingTestExchangeName, + "topic", + true, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatal(err) + } + + _, err = ch.QueueDeclare( + billingTestQueueName, + true, + false, + false, + false, + nil, + ) + if err != nil { + log.Fatal(errors.Wrap(err, "failed to declare a queue for billing publisher")) + } + + err = ch.QueueBind( + billingTestQueueName, + "#", + billingTestExchangeName, + false, + nil, + ) + if err != nil { + log.Fatal(err) + } +} func TestInitialization(t *testing.T) { connURL := "amqp://localhost:5672/" @@ -25,9 +66,13 @@ func TestInitialization(t *testing.T) { if err != nil { log.Fatal(err) } - defer conn.Close() - defer ch.Close() + + initalizeRMQ(ch) + defer ch.QueueDelete(billingTestQueueName, false, false, false) + defer ch.ExchangeDelete(billingTestExchangeName, false, false) + defer ch.Close() + defer conn.Close() } func TestBillingResilientClient(t *testing.T) { @@ -38,8 +83,11 @@ func TestBillingResilientClient(t *testing.T) { if err != nil { t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer")) } - defer ch.Close() + initalizeRMQ(ch) defer ch.QueueDelete(billingTestQueueName, false, false, false) + defer ch.ExchangeDelete(billingTestExchangeName, false, false) + defer ch.Close() + defer conn.Close() msgUUID, _ := uuid.NewV4() msg := NewMessage( @@ -53,12 +101,13 @@ func TestBillingResilientClient(t *testing.T) { "hello", nil, nil, + false, ) - billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) + billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName) time.Sleep(1 * time.Second) assert.NoError(t, err) - err = billingClient.Send(msg) + err = billingClient.Send(msg, RoutingKeyCreate) assert.NoError(t, err) msgs, err := ch.Consume( @@ -102,8 +151,11 @@ func TestBillingResilientClientSendAsync(t *testing.T) { if err != nil { t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer")) } - defer ch.Close() + initalizeRMQ(ch) defer ch.QueueDelete(billingTestQueueName, false, false, false) + defer ch.ExchangeDelete(billingTestExchangeName, false, false) + defer ch.Close() + defer conn.Close() msgUUID, _ := uuid.NewV4() msg := NewMessage( @@ -117,14 +169,16 @@ func TestBillingResilientClientSendAsync(t *testing.T) { "hello", nil, nil, + false, ) - billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) + billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName) time.Sleep(1 * time.Second) assert.NoError(t, err) - billingClient.SendAsync(msg, nil, nil) - + // billingClient.SendAsync(msg, RoutingKeyCreate, nil, nil) + err = billingClient.Send(msg, RoutingKeyCreate) assert.NoError(t, err) + msgs, err := ch.Consume( billingTestQueueName, "", @@ -166,8 +220,11 @@ func TestBillingResilientClientSendAsyncWithPanic(t *testing.T) { if err != nil { t.Fatal(errors.Wrap(err, "failed to declare a channel for consumer")) } - defer ch.Close() + initalizeRMQ(ch) defer ch.QueueDelete(billingTestQueueName, false, false, false) + defer ch.ExchangeDelete(billingTestExchangeName, false, false) + defer ch.Close() + defer conn.Close() msgUUID, _ := uuid.NewV4() msg := NewMessage( @@ -181,13 +238,14 @@ func TestBillingResilientClientSendAsyncWithPanic(t *testing.T) { "hello", nil, nil, + false, ) - billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestQueueName) + billingClient, err := NewRMQBillingResilientClient(connURL, 3, 1000, billingTestExchangeName) time.Sleep(1 * time.Second) assert.NoError(t, err) time.Sleep(1 * time.Second) - billingClient.SendAsync(msg, nil, func() { panic("test panic") }) + billingClient.SendAsync(msg, RoutingKeyCreate, nil, func() { panic("test panic") }) assert.NoError(t, err) msgs, err := ch.Consume( diff --git a/cmd/courier/main.go b/cmd/courier/main.go index ed2419328..ad23b9a38 100644 --- a/cmd/courier/main.go +++ b/cmd/courier/main.go @@ -123,7 +123,7 @@ func main() { if config.RabbitmqURL != "" { billingClient, err := billing.NewRMQBillingResilientClient( - config.RabbitmqURL, config.RabbitmqRetryPubAttempts, config.RabbitmqRetryPubDelay, config.BillingQueueName) + config.RabbitmqURL, config.RabbitmqRetryPubAttempts, config.RabbitmqRetryPubDelay, config.BillingExchangeName) if err != nil { logrus.Fatalf("Error creating billing RabbitMQ client: %v", err) } diff --git a/config.go b/config.go index a74a477ba..208d024d1 100644 --- a/config.go +++ b/config.go @@ -56,7 +56,7 @@ type Config struct { RabbitmqURL string `help:"rabbitmq url"` RabbitmqRetryPubAttempts int `help:"rabbitmq retry attempts"` RabbitmqRetryPubDelay int `help:"rabbitmq retry delay"` - BillingQueueName string `help:"billing queue name"` + BillingExchangeName string `help:"billing exchange name"` EmailProxyURL string `help:"email proxy url"` EmailProxyAuthToken string `help:"email proxy auth token"` @@ -91,7 +91,7 @@ func NewConfig() *Config { WaitMediaChannels: []string{}, RabbitmqRetryPubAttempts: 3, RabbitmqRetryPubDelay: 1000, - BillingQueueName: "billing-backup", + BillingExchangeName: "msgs.topic", EmailProxyURL: "http://localhost:9090", EmailProxyAuthToken: "", } diff --git a/handler_test.go b/handler_test.go index b61667ad1..09c9b405b 100644 --- a/handler_test.go +++ b/handler_test.go @@ -45,7 +45,7 @@ func (h *dummyHandler) Initialize(s Server) error { "amqp://localhost:5672/", 3, 100, - s.Config().BillingQueueName, + s.Config().BillingExchangeName, ) if err != nil { logrus.Fatalf("Error creating billing RabbitMQ client: %v", err) diff --git a/handlers/facebookapp/facebookapp.go b/handlers/facebookapp/facebookapp.go index 1d4bc6aa1..f1b1fb494 100644 --- a/handlers/facebookapp/facebookapp.go +++ b/handlers/facebookapp/facebookapp.go @@ -717,8 +717,9 @@ func (h *handler) processCloudWhatsAppPayload(ctx context.Context, channel couri "", nil, nil, + false, ) - h.Server().Billing().SendAsync(billingMsg, nil, nil) + h.Server().Billing().SendAsync(billingMsg, billing.RoutingKeyUpdate, nil, nil) } } } diff --git a/sender.go b/sender.go index 9b8d3f889..115875d73 100644 --- a/sender.go +++ b/sender.go @@ -5,6 +5,7 @@ import ( "fmt" "time" + "github.com/buger/jsonparser" "github.com/nyaruka/courier/billing" "github.com/nyaruka/courier/utils" "github.com/nyaruka/librato" @@ -292,8 +293,12 @@ func (w *Sender) sendMessage(msg Msg) { librato.Gauge(fmt.Sprintf("courier.msg_send_%s", msg.Channel().ChannelType()), secondDuration) } - if status.Status() != MsgErrored && status.Status() != MsgFailed { + sentOk := status.Status() != MsgErrored && status.Status() != MsgFailed + if sentOk && w.foreman.server.Billing() != nil { if msg.Channel().ChannelType() != "WAC" { + // if ticketer_type is eg: "wenichats" it is a message from ticketer sent by an agent, so must be sent to billing anyway + ticketerType, _ := jsonparser.GetString(msg.Metadata(), "ticketer_type") + fromTicketer := ticketerType != "" billingMsg := billing.NewMessage( string(msg.URN().Identity()), "", @@ -305,10 +310,9 @@ func (w *Sender) sendMessage(msg Msg) { msg.Text(), msg.Attachments(), msg.QuickReplies(), + fromTicketer, ) - if w.foreman.server.Billing() != nil { - w.foreman.server.Billing().SendAsync(billingMsg, nil, nil) - } + w.foreman.server.Billing().SendAsync(billingMsg, billing.RoutingKeyCreate, nil, nil) } } } diff --git a/server.go b/server.go index 84f065c40..4e1a4464c 100644 --- a/server.go +++ b/server.go @@ -574,6 +574,7 @@ func handleBilling(s *server, msg Msg) error { msg.Text(), msg.Attachments(), msg.QuickReplies(), + false, ) billingMsg.ChannelType = string(msg.Channel().ChannelType()) billingMsg.Text = msg.Text() @@ -581,7 +582,7 @@ func handleBilling(s *server, msg Msg) error { billingMsg.QuickReplies = msg.QuickReplies() if s.Billing() != nil { - s.Billing().SendAsync(billingMsg, nil, nil) + s.Billing().SendAsync(billingMsg, billing.RoutingKeyCreate, nil, nil) } return nil diff --git a/server_test.go b/server_test.go index 5eacc8d03..aed965226 100644 --- a/server_test.go +++ b/server_test.go @@ -25,7 +25,7 @@ func TestServer(t *testing.T) { "amqp://localhost:5672/", config.RabbitmqRetryPubAttempts, config.RabbitmqRetryPubDelay, - config.BillingQueueName, + config.BillingExchangeName, ) if err != nil { logrus.Fatalf("Error creating billing RabbitMQ client: %v", err)