Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update/v7.1.4 #9

Open
wants to merge 22 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
c0346ba
Add chunkIDs util function
rowanseymour Nov 29, 2021
80dc347
Use more errors.Wrap
rowanseymour Nov 29, 2021
b45faeb
Merge pull request #54 from nyaruka/simplify
rowanseymour Nov 30, 2021
e83191b
Remove msgs_msg.response_to_id
rowanseymour Dec 7, 2021
f786965
Merge pull request #56 from nyaruka/no_response_to
rowanseymour Dec 9, 2021
8cb5fa9
Update CHANGELOG.md for v7.1.0
rowanseymour Dec 9, 2021
d245215
Add deploy/ to .gitignore
rowanseymour Dec 20, 2021
db981d0
Remove references to flowrun.parent_id which is no longer set by mail…
rowanseymour Dec 20, 2021
a5e1527
Merge pull request #57 from nyaruka/no_parent_id
rowanseymour Dec 20, 2021
d875b55
Update CHANGELOG.md for v7.1.1
rowanseymour Dec 20, 2021
b6a3f97
No longer include events in run archives
rowanseymour Jan 6, 2022
cbaebfe
Merge pull request #58 from nyaruka/remove_events
rowanseymour Jan 6, 2022
1acd9b3
Use run status instead of is_active and exit_type
rowanseymour Jan 6, 2022
68a8b45
Merge pull request #59 from nyaruka/use_run_status
rowanseymour Jan 6, 2022
4662b07
Update CHANGELOG.md for v7.1.2
rowanseymour Jan 6, 2022
2628094
Remove deletion of recent runs as these are no longer created
rowanseymour Jan 12, 2022
94fa776
Merge pull request #60 from nyaruka/no_more_recent_runs
rowanseymour Jan 13, 2022
c205797
Update CHANGELOG.md for v7.1.3
rowanseymour Jan 13, 2022
39630b2
Record flow on msgs
rowanseymour Jan 17, 2022
0138f6b
Merge pull request #61 from nyaruka/msg_flow
rowanseymour Jan 17, 2022
4ce0680
Update CHANGELOG.md for v7.1.4
rowanseymour Jan 17, 2022
cbfe998
Merge branch 'main' of https://github.com/nyaruka/rp-archiver into up…
Robi9 Jan 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@ dist/

# Output of the go coverage tool, specifically when used with LiteIDE
*.out
fabric
deploy
fabfile.py
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,24 @@
v7.1.4
----------
* Record flow on msgs

v7.1.3
----------
* Remove deletion of recent runs as these are no longer created

v7.1.2
----------
* Use run status instead of is_active and exit_type
* No longer include events in run archives

v7.1.1
----------
* Remove references to flowrun.parent_id which is no longer set by mailroom

v7.1.0
----------
* Remove msgs_msg.response_to_id

v7.0.0
----------
* Test on PG12 and 13
Expand Down
36 changes: 18 additions & 18 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@ func TestCreateMsgArchive(t *testing.T) {

// should have three records, second will have attachments
assert.Equal(t, 3, task.RecordCount)
assert.Equal(t, int64(483), task.Size)
assert.Equal(t, int64(528), task.Size)
assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), task.StartDate)
assert.Equal(t, "6fe9265860425cf1f9757ba3d91b1a05", task.Hash)
assert.Equal(t, "b3bf00bf1234ea47f14ffd0171a8ead0", task.Hash)
assertArchiveFile(t, task, "messages1.jsonl")

DeleteArchiveFile(task)
Expand All @@ -163,8 +163,8 @@ func TestCreateMsgArchive(t *testing.T) {

// should have one record
assert.Equal(t, 1, task.RecordCount)
assert.Equal(t, int64(290), task.Size)
assert.Equal(t, "a719c7ec64c516a6e159d26a70cb4225", task.Hash)
assert.Equal(t, int64(294), task.Size)
assert.Equal(t, "bd163ead077774425aa559e30d48ca87", task.Hash)
assertArchiveFile(t, task, "messages2.jsonl")

DeleteArchiveFile(task)
Expand Down Expand Up @@ -218,8 +218,8 @@ func TestCreateRunArchive(t *testing.T) {

// should have two records
assert.Equal(t, 2, task.RecordCount)
assert.Equal(t, int64(642), task.Size)
assert.Equal(t, "f793f863f5e060b9d67c5688a555da6a", task.Hash)
assert.Equal(t, int64(472), task.Size)
assert.Equal(t, "734d437e1c66d09e033d698c732178f8", task.Hash)
assertArchiveFile(t, task, "runs1.jsonl")

DeleteArchiveFile(task)
Expand All @@ -238,8 +238,8 @@ func TestCreateRunArchive(t *testing.T) {

// should have one record
assert.Equal(t, 1, task.RecordCount)
assert.Equal(t, int64(497), task.Size)
assert.Equal(t, "074de71dfb619c78dbac5b6709dd66c2", task.Hash)
assert.Equal(t, int64(490), task.Size)
assert.Equal(t, "c2138e3c3009a9c09fc55482903d93e4", task.Hash)
assertArchiveFile(t, task, "runs2.jsonl")

DeleteArchiveFile(task)
Expand Down Expand Up @@ -341,14 +341,14 @@ func TestArchiveOrgMessages(t *testing.T) {
assert.Equal(t, time.Date(2017, 8, 12, 0, 0, 0, 0, time.UTC), created[2].StartDate)
assert.Equal(t, DayPeriod, created[2].Period)
assert.Equal(t, 3, created[2].RecordCount)
assert.Equal(t, int64(483), created[2].Size)
assert.Equal(t, "6fe9265860425cf1f9757ba3d91b1a05", created[2].Hash)
assert.Equal(t, int64(528), created[2].Size)
assert.Equal(t, "b3bf00bf1234ea47f14ffd0171a8ead0", created[2].Hash)

assert.Equal(t, time.Date(2017, 8, 13, 0, 0, 0, 0, time.UTC), created[3].StartDate)
assert.Equal(t, DayPeriod, created[3].Period)
assert.Equal(t, 1, created[3].RecordCount)
assert.Equal(t, int64(306), created[3].Size)
assert.Equal(t, "7ece4401d3afac9c08a913398f213ffa", created[3].Hash)
assert.Equal(t, int64(312), created[3].Size)
assert.Equal(t, "32e61b1431217b59fca0170f637d78a3", created[3].Hash)

assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[60].StartDate)
assert.Equal(t, DayPeriod, created[60].Period)
Expand All @@ -359,8 +359,8 @@ func TestArchiveOrgMessages(t *testing.T) {
assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[61].StartDate)
assert.Equal(t, MonthPeriod, created[61].Period)
assert.Equal(t, 4, created[61].RecordCount)
assert.Equal(t, int64(509), created[61].Size)
assert.Equal(t, "9e40be76913bf58655b70ee96dcac25d", created[61].Hash)
assert.Equal(t, int64(553), created[61].Size)
assert.Equal(t, "156e45e29b6587cb85ccf75e03800b00", created[61].Hash)

assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[62].StartDate)
assert.Equal(t, MonthPeriod, created[62].Period)
Expand Down Expand Up @@ -469,8 +469,8 @@ func TestArchiveOrgRuns(t *testing.T) {
assert.Equal(t, time.Date(2017, 8, 1, 0, 0, 0, 0, time.UTC), created[0].StartDate)
assert.Equal(t, MonthPeriod, created[0].Period)
assert.Equal(t, 1, created[0].RecordCount)
assert.Equal(t, int64(497), created[0].Size)
assert.Equal(t, "074de71dfb619c78dbac5b6709dd66c2", created[0].Hash)
assert.Equal(t, int64(490), created[0].Size)
assert.Equal(t, "c2138e3c3009a9c09fc55482903d93e4", created[0].Hash)

assert.Equal(t, time.Date(2017, 9, 1, 0, 0, 0, 0, time.UTC), created[1].StartDate)
assert.Equal(t, MonthPeriod, created[1].Period)
Expand All @@ -487,8 +487,8 @@ func TestArchiveOrgRuns(t *testing.T) {
assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), created[11].StartDate)
assert.Equal(t, DayPeriod, created[11].Period)
assert.Equal(t, 2, created[11].RecordCount)
assert.Equal(t, int64(2002), created[11].Size)
assert.Equal(t, "b75d6ee33ce26b786f1b341e875ecd62", created[11].Hash)
assert.Equal(t, int64(1984), created[11].Size)
assert.Equal(t, "869cc00ad4cca0371d07c88d8cf2bf26", created[11].Hash)

assert.Equal(t, 12, len(deleted))

Expand Down
55 changes: 16 additions & 39 deletions archives/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ SELECT rec.visibility, row_to_json(rec) FROM (
row_to_json(contact) as contact,
CASE WHEN oo.is_anon = False THEN ccu.identity ELSE null END as urn,
row_to_json(channel) as channel,
row_to_json(flow) as flow,
CASE WHEN direction = 'I' THEN 'in'
WHEN direction = 'O' THEN 'out'
ELSE NULL
Expand Down Expand Up @@ -61,6 +62,7 @@ SELECT rec.visibility, row_to_json(rec) FROM (
JOIN LATERAL (select uuid, name from contacts_contact cc where cc.id = mm.contact_id) as contact ON True
LEFT JOIN contacts_contacturn ccu ON mm.contact_urn_id = ccu.id
LEFT JOIN LATERAL (select uuid, name from channels_channel ch where ch.id = mm.channel_id) as channel ON True
LEFT JOIN LATERAL (select uuid, name from flows_flow f where f.id = mm.flow_id) as flow ON True
LEFT JOIN LATERAL (select coalesce(jsonb_agg(label_row), '[]'::jsonb) as data from (select uuid, name from msgs_label ml INNER JOIN msgs_msg_labels mml ON ml.id = mml.label_id AND mml.msg_id = mm.id) as label_row) as labels_agg ON True

WHERE mm.org_id = $1 AND mm.created_on >= $2 AND mm.created_on < $3
Expand Down Expand Up @@ -123,12 +125,6 @@ DELETE FROM msgs_msg_labels
WHERE msg_id IN(?)
`

const unlinkResponses = `
UPDATE msgs_msg
SET response_to_id = NULL
WHERE response_to_id IN(?)
`

const deleteMessages = `
DELETE FROM msgs_msg
WHERE id IN(?)
Expand Down Expand Up @@ -189,75 +185,58 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
}
rows.Close()

log.WithFields(logrus.Fields{
"msg_count": len(msgIDs),
}).Debug("found messages")
log.WithField("msg_count", len(msgIDs)).Debug("found messages")

// verify we don't see more messages than there are in our archive (fewer is ok)
if visibleCount > archive.RecordCount {
return fmt.Errorf("more messages in the database: %d than in archive: %d", visibleCount, archive.RecordCount)
}

// ok, delete our messages in batches, we do this in transactions as it spans a few different queries
for startIdx := 0; startIdx < len(msgIDs); startIdx += deleteTransactionSize {
for _, idBatch := range chunkIDs(msgIDs, deleteTransactionSize) {
// no single batch should take more than a few minutes
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

start := time.Now()

endIdx := startIdx + deleteTransactionSize
if endIdx > len(msgIDs) {
endIdx = len(msgIDs)
}
batchIDs := msgIDs[startIdx:endIdx]

// start our transaction
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return err
}

// first update our delete_reason
err = executeInQuery(ctx, tx, setMessageDeleteReason, batchIDs)
err = executeInQuery(ctx, tx, setMessageDeleteReason, idBatch)
if err != nil {
return fmt.Errorf("error updating delete reason: %s", err.Error())
return errors.Wrap(err, "error updating delete reason")
}

// now delete any channel logs
err = executeInQuery(ctx, tx, deleteMessageLogs, batchIDs)
err = executeInQuery(ctx, tx, deleteMessageLogs, idBatch)
if err != nil {
return fmt.Errorf("error removing channel logs: %s", err.Error())
return errors.Wrap(err, "error removing channel logs")
}

// then any labels
err = executeInQuery(ctx, tx, deleteMessageLabels, batchIDs)
err = executeInQuery(ctx, tx, deleteMessageLabels, idBatch)
if err != nil {
return fmt.Errorf("error removing message labels: %s", err.Error())
}

// unlink any responses
err = executeInQuery(ctx, tx, unlinkResponses, batchIDs)
if err != nil {
return fmt.Errorf("error unlinking responses: %s", err.Error())
return errors.Wrap(err, "error removing message labels")
}

// finally, delete our messages
err = executeInQuery(ctx, tx, deleteMessages, batchIDs)
err = executeInQuery(ctx, tx, deleteMessages, idBatch)
if err != nil {
return fmt.Errorf("error deleting messages: %s", err.Error())
return errors.Wrap(err, "error deleting messages")
}

// commit our transaction
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing message delete transaction: %s", err.Error())
return errors.Wrap(err, "error committing message delete transaction")
}

log.WithFields(logrus.Fields{
"elapsed": time.Since(start),
"count": len(batchIDs),
}).Debug("deleted batch of messages")
log.WithField("elapsed", time.Since(start)).WithField("count", len(idBatch)).Debug("deleted batch of messages")

cancel()
}
Expand All @@ -270,14 +249,12 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return fmt.Errorf("error setting archive as deleted: %s", err.Error())
return errors.Wrap(err, "error setting archive as deleted")
}
archive.NeedsDeletion = false
archive.DeletedOn = &deletedOn

logrus.WithFields(logrus.Fields{
"elapsed": time.Since(start),
}).Info("completed deleting messages")
logrus.WithField("elapsed", time.Since(start)).Info("completed deleting messages")

return nil
}
Expand Down
Loading