Skip to content

Commit

Permalink
Log instances of flows stealing URNs
Browse files Browse the repository at this point in the history
  • Loading branch information
rowanseymour committed Sep 30, 2024
1 parent d930b85 commit a8b4b18
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 22 deletions.
7 changes: 6 additions & 1 deletion core/hooks/commit_urn_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package hooks
import (
"context"
"fmt"
"log/slog"

"github.com/nyaruka/mailroom/core/models"
"github.com/nyaruka/mailroom/runtime"
Expand All @@ -23,10 +24,14 @@ func (h *commitURNChangesHook) Apply(ctx context.Context, rt *runtime.Runtime, t
changes = append(changes, sessionChanges[len(sessionChanges)-1].(*models.ContactURNsChanged))
}

err := models.UpdateContactURNs(ctx, tx, oa, changes)
affected, err := models.UpdateContactURNs(ctx, tx, oa, changes)
if err != nil {
return fmt.Errorf("error updating contact urns: %w", err)
}

if len(affected) > 0 {
slog.Error("URN changes affected other contacts", "count", len(affected), "org_id", oa.OrgID())
}

return nil
}
30 changes: 16 additions & 14 deletions core/models/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ func (c *Contact) UpdatePreferredURN(ctx context.Context, db DBorTx, oa *OrgAsse
}

// write our new state to the db
err := UpdateContactURNs(ctx, db, oa, []*ContactURNsChanged{change})
_, err := UpdateContactURNs(ctx, db, oa, []*ContactURNsChanged{change})
if err != nil {
return fmt.Errorf("error updating urns for contact: %w", err)
}
Expand Down Expand Up @@ -1200,7 +1200,7 @@ func UpdateContactLastSeenOn(ctx context.Context, db DBorTx, contactID ContactID
}

// UpdateContactURNs updates the contact urns in our database to match the passed in changes
func UpdateContactURNs(ctx context.Context, db DBorTx, oa *OrgAssets, changes []*ContactURNsChanged) error {
func UpdateContactURNs(ctx context.Context, db DBorTx, oa *OrgAssets, changes []*ContactURNsChanged) ([]*Contact, error) {
// keep track of all our inserts
inserts := make([]any, 0, len(changes))

Expand Down Expand Up @@ -1260,7 +1260,7 @@ func UpdateContactURNs(ctx context.Context, db DBorTx, oa *OrgAssets, changes []
// first update existing URNs
err := BulkQuery(ctx, "updating contact urns", db, sqlUpdateContactURNs, updates)
if err != nil {
return fmt.Errorf("error updating urns: %w", err)
return nil, fmt.Errorf("error updating urns: %w", err)
}

// then detach any URNs that weren't updated (the ones we're not keeping)
Expand All @@ -1271,54 +1271,56 @@ func UpdateContactURNs(ctx context.Context, db DBorTx, oa *OrgAssets, changes []
pq.Array(updatedURNIDs),
)
if err != nil {
return fmt.Errorf("error detaching urns: %w", err)
return nil, fmt.Errorf("error detaching urns: %w", err)
}

var affected []*Contact

if len(inserts) > 0 {
// find the unique ids of the contacts that may be affected by our URN inserts
orphanedIDs, err := queryContactIDs(ctx, db, `SELECT contact_id FROM contacts_contacturn WHERE identity = ANY($1) AND org_id = $2 AND contact_id IS NOT NULL`, pq.Array(identities), oa.OrgID())
if err != nil {
return fmt.Errorf("error finding contacts for URNs: %w", err)
return nil, fmt.Errorf("error finding contacts for URNs: %w", err)
}

// then insert new urns, we do these one by one since we have to deal with conflicts
for _, insert := range inserts {
_, err := db.NamedExecContext(ctx, sqlInsertContactURN, insert)
if err != nil {
return fmt.Errorf("error inserting new urns: %w", err)
return nil, fmt.Errorf("error inserting new urns: %w", err)
}
}

// finally update the contacts who had URNs stolen from them
if len(orphanedIDs) > 0 {
orphans, err := LoadContacts(ctx, db, oa, orphanedIDs)
affected, err = LoadContacts(ctx, db, oa, orphanedIDs)
if err != nil {
return fmt.Errorf("error loading contacts affecting by URN stealing: %w", err)
return nil, fmt.Errorf("error loading contacts affecting by URN stealing: %w", err)
}

// turn them into flow contacts..
flowOrphans := make([]*flows.Contact, len(orphans))
for i, c := range orphans {
flowOrphans := make([]*flows.Contact, len(affected))
for i, c := range affected {
flowOrphans[i], err = c.FlowContact(oa)
if err != nil {
return fmt.Errorf("error creating orphan flow contact: %w", err)
return nil, fmt.Errorf("error creating orphan flow contact: %w", err)
}
}

// and re-calculate their dynamic groups
if err := CalculateDynamicGroups(ctx, db, oa, flowOrphans); err != nil {
return fmt.Errorf("error re-calculating dynamic groups for orphaned contacts: %w", err)
return nil, fmt.Errorf("error re-calculating dynamic groups for orphaned contacts: %w", err)
}

// and mark them as updated
if err := UpdateContactModifiedOn(ctx, db, orphanedIDs); err != nil {
return fmt.Errorf("error updating orphaned contacts: %w", err)
return nil, fmt.Errorf("error updating orphaned contacts: %w", err)
}
}
}

// NOTE: caller needs to update modified on for this contact
return nil
return affected, nil
}

// urnUpdate is our object that represents a single contact URN update
Expand Down
18 changes: 11 additions & 7 deletions core/models/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -602,48 +602,52 @@ func TestUpdateContactURNs(t *testing.T) {
bobURN := urns.URN(fmt.Sprintf("tel:+16055742222?id=%d", testdata.Bob.URNID))

// give Cathy a new higher priority URN
err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{{testdata.Cathy.ID, testdata.Org1.ID, []urns.URN{"tel:+16055700001", cathyURN}}})
_, err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{{testdata.Cathy.ID, testdata.Org1.ID, []urns.URN{"tel:+16055700001", cathyURN}}})
assert.NoError(t, err)

assertContactURNs(testdata.Cathy.ID, []string{"tel:+16055700001", "tel:+16055741111"})

// give Bob a new lower priority URN
err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{{testdata.Bob.ID, testdata.Org1.ID, []urns.URN{bobURN, "tel:+16055700002"}}})
_, err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{{testdata.Bob.ID, testdata.Org1.ID, []urns.URN{bobURN, "tel:+16055700002"}}})
assert.NoError(t, err)

assertContactURNs(testdata.Bob.ID, []string{"tel:+16055742222", "tel:+16055700002"})
assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contacturn WHERE contact_id IS NULL`).Returns(0) // shouldn't be any orphan URNs
assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contacturn`).Returns(numInitialURNs + 2) // but 2 new URNs

// remove a URN from Cathy
err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{{testdata.Cathy.ID, testdata.Org1.ID, []urns.URN{"tel:+16055700001"}}})
_, err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{{testdata.Cathy.ID, testdata.Org1.ID, []urns.URN{"tel:+16055700001"}}})
assert.NoError(t, err)

assertContactURNs(testdata.Cathy.ID, []string{"tel:+16055700001"})
assertdb.Query(t, rt.DB, `SELECT count(*) FROM contacts_contacturn WHERE contact_id IS NULL`).Returns(1) // now orphaned

t1 := time.Now()

// steal a URN from Bob
err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{
// steal a URN from Bob and give to Alexandria
affected, err := models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{
{testdata.Cathy.ID, testdata.Org1.ID, []urns.URN{"tel:+16055700001", "tel:+16055700002"}},
{testdata.Alexandria.ID, testdata.Org1.ID, []urns.URN{"tel:+16055742222"}},
})
assert.NoError(t, err)
assert.Len(t, affected, 1)
assert.Equal(t, testdata.Bob.ID, affected[0].ID())

assertContactURNs(testdata.Cathy.ID, []string{"tel:+16055700001", "tel:+16055700002"})
assertContactURNs(testdata.Alexandria.ID, []string{"tel:+16055742222"})
assertContactURNs(testdata.Bob.ID, []string(nil))
assertModifiedOnUpdated(testdata.Bob.ID, t1)
assertGroups(testdata.Bob.ID, []string{"Active", "No URN"})

// steal the URN back from Cathy whilst simulataneously adding new URN to Cathy and not-changing anything for George
err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{
// steal the URN back from Alexandria whilst simulataneously adding new URN to Cathy and not-changing anything for George
affected, err = models.UpdateContactURNs(ctx, rt.DB, oa, []*models.ContactURNsChanged{
{testdata.Bob.ID, testdata.Org1.ID, []urns.URN{"tel:+16055742222", "tel:+16055700002"}},
{testdata.Cathy.ID, testdata.Org1.ID, []urns.URN{"tel:+16055700001", "tel:+16055700003"}},
{testdata.George.ID, testdata.Org1.ID, []urns.URN{"tel:+16055743333"}},
})
assert.NoError(t, err)
assert.Len(t, affected, 1)
assert.Equal(t, testdata.Alexandria.ID, affected[0].ID())

assertContactURNs(testdata.Cathy.ID, []string{"tel:+16055700001", "tel:+16055700003"})
assertContactURNs(testdata.Bob.ID, []string{"tel:+16055742222", "tel:+16055700002"})
Expand Down

0 comments on commit a8b4b18

Please sign in to comment.