Skip to content

Commit 7d7e412

Browse files
committed
Merge branch 'main' into remove_topups
2 parents 945df1b + 76a7ddb commit 7d7e412

File tree

17 files changed

+160
-117
lines changed

17 files changed

+160
-117
lines changed

CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,11 @@
1+
v7.5.15
2+
----------
3+
* Update to latest goflow which changes expirations for dial waits
4+
* Add support for time limits on dial waits
5+
* Rework message events to use channel UUID and include channel type
6+
* Simplify getting active call count
7+
* Allow incoming call triggers to take message flows
8+
19
v7.5.14
210
----------
311
* Switch to new ivr_call table

core/ivr/ivr.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@ type Service interface {
6868
HangupCall(externalID string) (*httpx.Trace, error)
6969

7070
WriteSessionResponse(ctx context.Context, rt *runtime.Runtime, channel *models.Channel, call *models.Call, session *models.Session, number urns.URN, resumeURL string, req *http.Request, w http.ResponseWriter) error
71-
71+
WriteHangupResponse(w http.ResponseWriter) error
7272
WriteErrorResponse(w http.ResponseWriter, err error) error
73-
7473
WriteEmptyResponse(w http.ResponseWriter, msg string) error
7574

7675
ResumeForRequest(r *http.Request) (Resume, error)

core/tasks/handler/cron.go

+15-14
Original file line numberDiff line numberDiff line change
@@ -105,22 +105,23 @@ func RetryPendingMsgs(ctx context.Context, rt *runtime.Runtime) error {
105105

106106
const unhandledMsgsQuery = `
107107
SELECT org_id, contact_id, msg_id, ROW_TO_JSON(r) FROM (SELECT
108-
m.contact_id as contact_id,
109-
m.org_id as org_id,
110-
m.channel_id as channel_id,
111-
m.id as msg_id,
112-
m.uuid as msg_uuid,
113-
m.external_id as msg_external_id,
114-
u.identity as urn,
115-
m.contact_urn_id as urn_id,
116-
m.text as text,
117-
m.attachments as attachments
108+
m.contact_id AS contact_id,
109+
m.org_id AS org_id,
110+
c.id AS channel_id,
111+
c.uuid AS channel_uuid,
112+
c.channel_type AS channel_type,
113+
m.id AS msg_id,
114+
m.uuid AS msg_uuid,
115+
m.external_id AS msg_external_id,
116+
u.identity AS urn,
117+
m.contact_urn_id AS urn_id,
118+
m.text AS text,
119+
m.attachments AS attachments
118120
FROM
119121
msgs_msg m
120-
JOIN contacts_contacturn as u ON m.contact_urn_id = u.id
122+
INNER JOIN channels_channel c ON c.id = m.channel_id
123+
INNER JOIN contacts_contacturn u ON u.id = m.contact_urn_id
121124
WHERE
122-
m.direction = 'I' AND
123-
m.status = 'P' AND
124-
m.created_on < now() - INTERVAL '5 min'
125+
m.direction = 'I' AND m.status = 'P' AND m.created_on < now() - INTERVAL '5 min'
125126
) r;
126127
`

core/tasks/handler/handler_test.go

+36-34
Original file line numberDiff line numberDiff line change
@@ -300,14 +300,15 @@ func TestMsgEvents(t *testing.T) {
300300

301301
makeMsgTask := func(org *testdata.Org, channel *testdata.Channel, contact *testdata.Contact, text string) *queue.Task {
302302
return &queue.Task{Type: handler.MsgEventType, OrgID: int(org.ID), Task: jsonx.MustMarshal(&handler.MsgEvent{
303-
ContactID: contact.ID,
304-
OrgID: org.ID,
305-
ChannelID: channel.ID,
306-
MsgID: dbMsg.ID(),
307-
MsgUUID: dbMsg.UUID(),
308-
URN: contact.URN,
309-
URNID: contact.URNID,
310-
Text: text,
303+
ContactID: contact.ID,
304+
OrgID: org.ID,
305+
ChannelUUID: channel.UUID,
306+
ChannelType: channel.Type,
307+
MsgID: dbMsg.ID(),
308+
MsgUUID: dbMsg.UUID(),
309+
URN: contact.URN,
310+
URNID: contact.URNID,
311+
Text: text,
311312
})}
312313
}
313314

@@ -587,47 +588,47 @@ func TestTimedEvents(t *testing.T) {
587588
Contact *testdata.Contact
588589
Message string
589590
Response string
590-
ChannelID models.ChannelID
591-
OrgID models.OrgID
591+
Channel *testdata.Channel
592+
Org *testdata.Org
592593
}{
593594
// 0: start the flow
594-
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel.ID, testdata.Org1.ID},
595+
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel, testdata.Org1},
595596

596597
// 1: this expiration does nothing because the times don't match
597-
{handler.ExpirationEventType, testdata.Cathy, "bad", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
598+
{handler.ExpirationEventType, testdata.Cathy, "bad", "", testdata.TwitterChannel, testdata.Org1},
598599

599600
// 2: this checks that the flow wasn't expired
600-
{handler.MsgEventType, testdata.Cathy, "red", "Good choice, I like Red too! What is your favorite beer?", testdata.TwitterChannel.ID, testdata.Org1.ID},
601+
{handler.MsgEventType, testdata.Cathy, "red", "Good choice, I like Red too! What is your favorite beer?", testdata.TwitterChannel, testdata.Org1},
601602

602603
// 3: this expiration will actually take
603-
{handler.ExpirationEventType, testdata.Cathy, "good", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
604+
{handler.ExpirationEventType, testdata.Cathy, "good", "", testdata.TwitterChannel, testdata.Org1},
604605

605606
// 4: we won't get a response as we will be out of the flow
606-
{handler.MsgEventType, testdata.Cathy, "mutzig", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
607+
{handler.MsgEventType, testdata.Cathy, "mutzig", "", testdata.TwitterChannel, testdata.Org1},
607608

608609
// 5: start the parent expiration flow
609-
{handler.MsgEventType, testdata.Cathy, "parent", "Child", testdata.TwitterChannel.ID, testdata.Org1.ID},
610+
{handler.MsgEventType, testdata.Cathy, "parent", "Child", testdata.TwitterChannel, testdata.Org1},
610611

611612
// 6: respond, should bring us out
612-
{handler.MsgEventType, testdata.Cathy, "hi", "Completed", testdata.TwitterChannel.ID, testdata.Org1.ID},
613+
{handler.MsgEventType, testdata.Cathy, "hi", "Completed", testdata.TwitterChannel, testdata.Org1},
613614

614615
// 7: expiring our child should be a no op
615-
{handler.ExpirationEventType, testdata.Cathy, "child", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
616+
{handler.ExpirationEventType, testdata.Cathy, "child", "", testdata.TwitterChannel, testdata.Org1},
616617

617618
// 8: respond one last time, should be done
618-
{handler.MsgEventType, testdata.Cathy, "done", "Ended", testdata.TwitterChannel.ID, testdata.Org1.ID},
619+
{handler.MsgEventType, testdata.Cathy, "done", "Ended", testdata.TwitterChannel, testdata.Org1},
619620

620621
// 9: start our favorite flow again
621-
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel.ID, testdata.Org1.ID},
622+
{handler.MsgEventType, testdata.Cathy, "start", "What is your favorite color?", testdata.TwitterChannel, testdata.Org1},
622623

623624
// 10: timeout on the color question
624-
{handler.TimeoutEventType, testdata.Cathy, "", "Sorry you can't participate right now, I'll try again later.", testdata.TwitterChannel.ID, testdata.Org1.ID},
625+
{handler.TimeoutEventType, testdata.Cathy, "", "Sorry you can't participate right now, I'll try again later.", testdata.TwitterChannel, testdata.Org1},
625626

626627
// 11: start the pick a number flow
627-
{handler.MsgEventType, testdata.Cathy, "pick", "Pick a number between 1-10.", testdata.TwitterChannel.ID, testdata.Org1.ID},
628+
{handler.MsgEventType, testdata.Cathy, "pick", "Pick a number between 1-10.", testdata.TwitterChannel, testdata.Org1},
628629

629630
// 12: try to resume with timeout even tho flow doesn't have one set
630-
{handler.TimeoutEventType, testdata.Cathy, "", "", testdata.TwitterChannel.ID, testdata.Org1.ID},
631+
{handler.TimeoutEventType, testdata.Cathy, "", "", testdata.TwitterChannel, testdata.Org1},
631632
}
632633

633634
last := time.Now()
@@ -642,16 +643,17 @@ func TestTimedEvents(t *testing.T) {
642643
if tc.EventType == handler.MsgEventType {
643644
task = &queue.Task{
644645
Type: tc.EventType,
645-
OrgID: int(tc.OrgID),
646+
OrgID: int(tc.Org.ID),
646647
Task: jsonx.MustMarshal(&handler.MsgEvent{
647-
ContactID: tc.Contact.ID,
648-
OrgID: tc.OrgID,
649-
ChannelID: tc.ChannelID,
650-
MsgID: flows.MsgID(1),
651-
MsgUUID: flows.MsgUUID(uuids.New()),
652-
URN: tc.Contact.URN,
653-
URNID: tc.Contact.URNID,
654-
Text: tc.Message,
648+
ContactID: tc.Contact.ID,
649+
OrgID: tc.Org.ID,
650+
ChannelUUID: tc.Channel.UUID,
651+
ChannelType: tc.Channel.Type,
652+
MsgID: flows.MsgID(1),
653+
MsgUUID: flows.MsgUUID(uuids.New()),
654+
URN: tc.Contact.URN,
655+
URNID: tc.Contact.URNID,
656+
Text: tc.Message,
655657
}),
656658
}
657659
} else if tc.EventType == handler.ExpirationEventType {
@@ -666,15 +668,15 @@ func TestTimedEvents(t *testing.T) {
666668
expiration = time.Now().Add(time.Hour * 24)
667669
}
668670

669-
task = handler.NewExpirationTask(tc.OrgID, tc.Contact.ID, sessionID, expiration)
671+
task = handler.NewExpirationTask(tc.Org.ID, tc.Contact.ID, sessionID, expiration)
670672

671673
} else if tc.EventType == handler.TimeoutEventType {
672674
timeoutOn := time.Now().Round(time.Millisecond) // so that there's no difference between this and what we read from the db
673675

674676
// usually courier will set timeout_on after sending the last message
675677
db.MustExec(`UPDATE flows_flowsession SET timeout_on = $2 WHERE id = $1`, sessionID, timeoutOn)
676678

677-
task = handler.NewTimeoutTask(tc.OrgID, tc.Contact.ID, sessionID, timeoutOn)
679+
task = handler.NewTimeoutTask(tc.Org.ID, tc.Contact.ID, sessionID, timeoutOn)
678680
}
679681

680682
err := handler.QueueHandleTask(rc, tc.Contact.ID, task)

core/tasks/handler/worker.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/nyaruka/gocommon/analytics"
1212
"github.com/nyaruka/gocommon/dbutil"
1313
"github.com/nyaruka/gocommon/urns"
14+
"github.com/nyaruka/goflow/assets"
1415
"github.com/nyaruka/goflow/excellent/types"
1516
"github.com/nyaruka/goflow/flows"
1617
"github.com/nyaruka/goflow/flows/engine"
@@ -382,7 +383,7 @@ func HandleChannelEvent(ctx context.Context, rt *runtime.Runtime, eventType mode
382383
return nil, errors.Wrapf(err, "error loading flow for trigger")
383384
}
384385

385-
// if this is an IVR flow, we need to trigger that start (which happens in a different queue)
386+
// if this is an IVR flow and we don't have a call, trigger that asynchronously
386387
if flow.FlowType() == models.FlowTypeVoice && call == nil {
387388
err = runner.TriggerIVRFlow(ctx, rt, oa.OrgID(), flow.ID(), []models.ContactID{modelContact.ID()}, nil)
388389
if err != nil {
@@ -428,7 +429,7 @@ func HandleChannelEvent(ctx context.Context, rt *runtime.Runtime, eventType mode
428429
// if we have a channel connection we set the connection on the session before our event hooks fire
429430
// so that IVR messages can be created with the right connection reference
430431
var hook models.SessionCommitHook
431-
if call != nil {
432+
if flow.FlowType() == models.FlowTypeVoice && call != nil {
432433
hook = func(ctx context.Context, tx *sqlx.Tx, rp *redis.Pool, oa *models.OrgAssets, sessions []*models.Session) error {
433434
for _, session := range sessions {
434435
session.SetCall(call)
@@ -498,7 +499,7 @@ func handleMsgEvent(ctx context.Context, rt *runtime.Runtime, event *MsgEvent) e
498499
modelContact := contacts[0]
499500

500501
// load the channel for this message
501-
channel := oa.ChannelByID(event.ChannelID)
502+
channel := oa.ChannelByUUID(event.ChannelUUID)
502503

503504
// if we have URNs make sure the message URN is our highest priority (this is usually a noop)
504505
if len(modelContact.URNs()) > 0 {
@@ -792,7 +793,8 @@ type TimedEvent struct {
792793
type MsgEvent struct {
793794
ContactID models.ContactID `json:"contact_id"`
794795
OrgID models.OrgID `json:"org_id"`
795-
ChannelID models.ChannelID `json:"channel_id"`
796+
ChannelUUID assets.ChannelUUID `json:"channel_uuid"`
797+
ChannelType models.ChannelType `json:"channel_type"`
796798
MsgID flows.MsgID `json:"msg_id"`
797799
MsgUUID flows.MsgUUID `json:"msg_uuid"`
798800
MsgExternalID null.String `json:"msg_external_id"`

core/tasks/ivr/worker_test.go

+4
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@ func (s *MockService) WriteSessionResponse(ctx context.Context, rt *runtime.Runt
9292
return nil
9393
}
9494

95+
func (s *MockService) WriteHangupResponse(w http.ResponseWriter) error {
96+
return nil
97+
}
98+
9599
func (s *MockService) WriteErrorResponse(w http.ResponseWriter, err error) error {
96100
return nil
97101
}

go.mod

+2-2
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ require (
1515
github.com/jmoiron/sqlx v1.3.5
1616
github.com/lib/pq v1.10.6
1717
github.com/nyaruka/ezconf v0.2.1
18-
github.com/nyaruka/gocommon v1.30.0
19-
github.com/nyaruka/goflow v0.171.0
18+
github.com/nyaruka/gocommon v1.30.2
19+
github.com/nyaruka/goflow v0.172.2
2020
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d
2121
github.com/nyaruka/null v1.2.0
2222
github.com/nyaruka/redisx v0.2.2

go.sum

+4-4
Original file line numberDiff line numberDiff line change
@@ -218,10 +218,10 @@ github.com/naoina/toml v0.1.1 h1:PT/lllxVVN0gzzSqSlHEmP8MJB4MY2U7STGxiouV4X8=
218218
github.com/naoina/toml v0.1.1/go.mod h1:NBIhNtsFMo3G2szEBne+bO4gS192HuIYRqfvOWb4i1E=
219219
github.com/nyaruka/ezconf v0.2.1 h1:TDXWoqjqYya1uhou1mAJZg7rgFYL98EB0Tb3+BWtUh0=
220220
github.com/nyaruka/ezconf v0.2.1/go.mod h1:ey182kYkw2MIi4XiWe1FR/mzI33WCmTWuceDYYxgnQw=
221-
github.com/nyaruka/gocommon v1.30.0 h1:394CU1fxGXSs+mrd5x8nXnqw2epT9P0ui/9MtSE2ycQ=
222-
github.com/nyaruka/gocommon v1.30.0/go.mod h1:PApT/06fP5Tzs4/kbkJ+rVoyOc9Lbqm1lR0ow8Vqzp0=
223-
github.com/nyaruka/goflow v0.171.0 h1:2wjKClK8Z5kjZUaTSDGg+OihFgHf5PkylbvZM3FFFLI=
224-
github.com/nyaruka/goflow v0.171.0/go.mod h1:BELqxxpx4dqpBvaqM2TSXyq3rdJvX4RPlSAyuevjL+4=
221+
github.com/nyaruka/gocommon v1.30.2 h1:tWpq0jOfwKLCkN8LdE8ZGs22l2OYpTQAvdjAW6scl/Y=
222+
github.com/nyaruka/gocommon v1.30.2/go.mod h1:PApT/06fP5Tzs4/kbkJ+rVoyOc9Lbqm1lR0ow8Vqzp0=
223+
github.com/nyaruka/goflow v0.172.2 h1:wGpVeuiNHzP7Qx8Q7p8HalDq/RrIIpdylrD15rBYbTg=
224+
github.com/nyaruka/goflow v0.172.2/go.mod h1:BELqxxpx4dqpBvaqM2TSXyq3rdJvX4RPlSAyuevjL+4=
225225
github.com/nyaruka/librato v1.0.0 h1:Vznj9WCeC1yZXbBYyYp40KnbmXLbEkjKmHesV/v2SR0=
226226
github.com/nyaruka/librato v1.0.0/go.mod h1:pkRNLFhFurOz0QqBz6/DuTFhHHxAubWxs4Jx+J7yUgg=
227227
github.com/nyaruka/logrus_sentry v0.8.2-0.20190129182604-c2962b80ba7d h1:hyp9u36KIwbTCo2JAJ+TuJcJBc+UZzEig7RI/S5Dvkc=

services/ivr/twiml/client.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ type Redirect struct {
2424
}
2525

2626
type Dial struct {
27-
XMLName string `xml:"Dial"`
28-
Number string `xml:",chardata"`
29-
Action string `xml:"action,attr"`
30-
Timeout int `xml:"timeout,attr,omitempty"`
27+
XMLName string `xml:"Dial"`
28+
Number string `xml:",chardata"`
29+
Action string `xml:"action,attr"`
30+
Timeout int `xml:"timeout,attr,omitempty"`
31+
TimeLimit int `xml:"timeLimit,attr,omitempty"`
3132
}
3233

3334
type Gather struct {

services/ivr/twiml/service.go

+22-14
Original file line numberDiff line numberDiff line change
@@ -376,29 +376,37 @@ func (s *service) WriteSessionResponse(ctx context.Context, rt *runtime.Runtime,
376376
return nil
377377
}
378378

379+
func (s *service) WriteHangupResponse(w http.ResponseWriter) error {
380+
return s.writeResponse(w, &Response{
381+
Commands: []any{Hangup{}},
382+
})
383+
}
384+
379385
// WriteErrorResponse writes an error / unavailable response
380386
func (s *service) WriteErrorResponse(w http.ResponseWriter, err error) error {
381-
r := &Response{Message: strings.Replace(err.Error(), "--", "__", -1)}
382-
r.Commands = append(r.Commands, Say{Text: ivr.ErrorMessage})
383-
r.Commands = append(r.Commands, Hangup{})
384-
385-
body, err := xml.Marshal(r)
386-
if err != nil {
387-
return err
388-
}
389-
_, err = w.Write([]byte(xml.Header + string(body)))
390-
return err
387+
return s.writeResponse(w, &Response{
388+
Message: strings.Replace(err.Error(), "--", "__", -1),
389+
Commands: []any{
390+
Say{Text: ivr.ErrorMessage},
391+
Hangup{},
392+
},
393+
})
391394
}
392395

393396
// WriteEmptyResponse writes an empty (but valid) response
394397
func (s *service) WriteEmptyResponse(w http.ResponseWriter, msg string) error {
395-
r := &Response{Message: strings.Replace(msg, "--", "__", -1)}
398+
return s.writeResponse(w, &Response{
399+
Message: strings.Replace(msg, "--", "__", -1),
400+
})
401+
}
396402

397-
body, err := xml.Marshal(r)
403+
func (s *service) writeResponse(w http.ResponseWriter, resp *Response) error {
404+
marshalled, err := xml.Marshal(resp)
398405
if err != nil {
399406
return err
400407
}
401-
_, err = w.Write([]byte(xml.Header + string(body)))
408+
w.Write([]byte(xml.Header))
409+
_, err = w.Write(marshalled)
402410
return err
403411
}
404412

@@ -496,7 +504,7 @@ func ResponseForSprint(cfg *runtime.Config, number urns.URN, resumeURL string, e
496504

497505
case *events.DialWaitEvent:
498506
hasWait = true
499-
dial := Dial{Action: resumeURL + "&wait_type=dial", Number: event.URN.Path()}
507+
dial := Dial{Action: resumeURL + "&wait_type=dial", Number: event.URN.Path(), Timeout: event.DialLimitSeconds, TimeLimit: event.CallLimitSeconds}
500508
commands = append(commands, dial)
501509
r.Commands = commands
502510
}

services/ivr/twiml/service_test.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,9 @@ func TestResponseForSprint(t *testing.T) {
9898
},
9999
{
100100
[]flows.Event{
101-
events.NewDialWait(urns.URN(`tel:+1234567890`), &expiresOn),
101+
events.NewDialWait(urns.URN(`tel:+1234567890`), 60, 7200, &expiresOn),
102102
},
103-
`<Response><Dial action="http://temba.io/resume?session=1&amp;wait_type=dial">+1234567890</Dial></Response>`,
103+
`<Response><Dial action="http://temba.io/resume?session=1&amp;wait_type=dial" timeout="60" timeLimit="7200">+1234567890</Dial></Response>`,
104104
},
105105
}
106106

services/ivr/vonage/client.go

+8-6
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,18 @@ type CallRequest struct {
2424

2525
NCCO []NCCO `json:"ncco,omitempty"`
2626
MachineDetection string `json:"machine_detection"`
27+
LengthTimer int `json:"length_timer,omitempty"`
2728
RingingTimer int `json:"ringing_timer,omitempty"`
2829
}
2930

3031
// CallResponse is the response from creating a new call
31-
// {
32-
// "uuid": "63f61863-4a51-4f6b-86e1-46edebcf9356",
33-
// "status": "started",
34-
// "direction": "outbound",
35-
// "conversation_uuid": "CON-f972836a-550f-45fa-956c-12a2ab5b7d22"
36-
// }
32+
//
33+
// {
34+
// "uuid": "63f61863-4a51-4f6b-86e1-46edebcf9356",
35+
// "status": "started",
36+
// "direction": "outbound",
37+
// "conversation_uuid": "CON-f972836a-550f-45fa-956c-12a2ab5b7d22"
38+
// }
3739
type CallResponse struct {
3840
UUID string `json:"uuid"`
3941
Status string `json:"status"`

0 commit comments

Comments
 (0)