Skip to content

Commit

Permalink
Merge pull request #83 from nyaruka/sql_cleanup
Browse files Browse the repository at this point in the history
Update test database schema and cleanup sql queries
  • Loading branch information
rowanseymour authored Mar 9, 2023
2 parents 9318e1c + cfbd0e7 commit 1a884ab
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 129 deletions.
99 changes: 48 additions & 51 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,18 @@ func (a *Archive) endDate() time.Time {
return endDate
}

const lookupActiveOrgs = `
SELECT o.id, o.name, o.created_on, o.is_anon
FROM orgs_org o
WHERE o.is_active = TRUE order by o.id
`
const sqlLookupActiveOrgs = `
SELECT id, name, created_on, is_anon
FROM orgs_org
WHERE is_active
ORDER BY id`

// GetActiveOrgs returns the active organizations sorted by id
func GetActiveOrgs(ctx context.Context, db *sqlx.DB, conf *Config) ([]Org, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

rows, err := db.QueryxContext(ctx, lookupActiveOrgs)
rows, err := db.QueryxContext(ctx, sqlLookupActiveOrgs)
if err != nil {
return nil, errors.Wrapf(err, "error fetching active orgs")
}
Expand All @@ -121,51 +121,50 @@ func GetActiveOrgs(ctx context.Context, db *sqlx.DB, conf *Config) ([]Org, error
return orgs, nil
}

const lookupOrgArchives = `
SELECT id, org_id, start_date::timestamp with time zone as start_date, period, archive_type, hash, size, record_count, url, rollup_id, needs_deletion
FROM archives_archive WHERE org_id = $1 AND archive_type = $2
ORDER BY start_date asc, period desc
`
const sqlLookupOrgArchives = `
SELECT id, org_id, start_date::timestamp with time zone AS start_date, period, archive_type, hash, size, record_count, url, rollup_id, needs_deletion
FROM archives_archive
WHERE org_id = $1 AND archive_type = $2
ORDER BY start_date ASC, period DESC`

// GetCurrentArchives returns all the current archives for the passed in org and record type
func GetCurrentArchives(ctx context.Context, db *sqlx.DB, org Org, archiveType ArchiveType) ([]*Archive, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

archives := make([]*Archive, 0, 1)
err := db.SelectContext(ctx, &archives, lookupOrgArchives, org.ID, archiveType)
err := db.SelectContext(ctx, &archives, sqlLookupOrgArchives, org.ID, archiveType)
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error selecting current archives for org: %d and type: %s", org.ID, archiveType)
}

return archives, nil
}

const lookupArchivesNeedingDeletion = `
SELECT id, org_id, start_date::timestamp with time zone as start_date, period, archive_type, hash, size, record_count, url, rollup_id, needs_deletion
FROM archives_archive WHERE org_id = $1 AND archive_type = $2 AND needs_deletion = TRUE
ORDER BY start_date asc, period desc
`
const sqlLookupArchivesNeedingDeletion = `
SELECT id, org_id, start_date::timestamp with time zone AS start_date, period, archive_type, hash, size, record_count, url, rollup_id, needs_deletion
FROM archives_archive
WHERE org_id = $1 AND archive_type = $2 AND needs_deletion = TRUE
ORDER BY start_date ASC, period DESC`

// GetArchivesNeedingDeletion returns all the archives which need to be deleted
func GetArchivesNeedingDeletion(ctx context.Context, db *sqlx.DB, org Org, archiveType ArchiveType) ([]*Archive, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

archives := make([]*Archive, 0, 1)
err := db.SelectContext(ctx, &archives, lookupArchivesNeedingDeletion, org.ID, archiveType)
err := db.SelectContext(ctx, &archives, sqlLookupArchivesNeedingDeletion, org.ID, archiveType)
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error selecting archives needing deletion for org: %d and type: %s", org.ID, archiveType)
}

return archives, nil
}

const lookupCountOrgArchives = `
const sqlCountOrgArchives = `
SELECT count(id)
FROM archives_archive
WHERE org_id = $1 AND archive_type = $2
`
FROM archives_archive
WHERE org_id = $1 AND archive_type = $2`

// GetCurrentArchiveCount returns the archive count for the passed in org and record type
func GetCurrentArchiveCount(ctx context.Context, db *sqlx.DB, org Org, archiveType ArchiveType) (int, error) {
Expand All @@ -174,7 +173,7 @@ func GetCurrentArchiveCount(ctx context.Context, db *sqlx.DB, org Org, archiveTy

var archiveCount int

err := db.GetContext(ctx, &archiveCount, lookupCountOrgArchives, org.ID, archiveType)
err := db.GetContext(ctx, &archiveCount, sqlCountOrgArchives, org.ID, archiveType)
if err != nil {
return 0, errors.Wrapf(err, "error querying archive count for org: %d and type: %s", org.ID, archiveType)
}
Expand All @@ -183,12 +182,11 @@ func GetCurrentArchiveCount(ctx context.Context, db *sqlx.DB, org Org, archiveTy
}

// between is inclusive on both sides
const lookupOrgDailyArchivesForDateRange = `
SELECT id, start_date::timestamp with time zone as start_date, period, archive_type, hash, size, record_count, url, rollup_id
FROM archives_archive
WHERE org_id = $1 AND archive_type = $2 AND period = $3 AND start_date BETWEEN $4 AND $5
ORDER BY start_date asc
`
const sqlLookupOrgDailyArchivesForDateRange = `
SELECT id, start_date::timestamp with time zone AS start_date, period, archive_type, hash, size, record_count, url, rollup_id
FROM archives_archive
WHERE org_id = $1 AND archive_type = $2 AND period = $3 AND start_date BETWEEN $4 AND $5
ORDER BY start_date ASC`

// GetDailyArchivesForDateRange returns all the daily archives for the passed in org and record type within the given date range (inclusive on both ends)
func GetDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, org Org, archiveType ArchiveType, startDate time.Time, endDate time.Time) ([]*Archive, error) {
Expand All @@ -197,7 +195,7 @@ func GetDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, org Org, arc

existingArchives := make([]*Archive, 0, 1)

err := db.SelectContext(ctx, &existingArchives, lookupOrgDailyArchivesForDateRange, org.ID, archiveType, DayPeriod, startDate, endDate)
err := db.SelectContext(ctx, &existingArchives, sqlLookupOrgDailyArchivesForDateRange, org.ID, archiveType, DayPeriod, startDate, endDate)
if err != nil && err != sql.ErrNoRows {
return nil, errors.Wrapf(err, "error selecting daily archives for org: %d and type: %s", org.ID, archiveType)
}
Expand All @@ -218,19 +216,21 @@ func GetMissingDailyArchives(ctx context.Context, db *sqlx.DB, now time.Time, or
return GetMissingDailyArchivesForDateRange(ctx, db, startDate, endDate, org, archiveType)
}

const lookupMissingDailyArchive = `
const sqlLookupMissingDailyArchive = `
WITH month_days(missing_day) AS (
select GENERATE_SERIES($1::timestamp with time zone, $2::timestamp with time zone, '1 day')::date
), curr_archives AS (
SELECT start_date FROM archives_archive WHERE org_id = $3 AND period = $4 AND archive_type=$5
UNION DISTINCT
-- also get the overlapping days for the monthly rolled up archives
SELECT GENERATE_SERIES(start_date, (start_date + '1 month'::interval) - '1 second'::interval, '1 day')::date AS start_date
FROM archives_archive WHERE org_id = $3 AND period = 'M' AND archive_type=$5
FROM archives_archive
WHERE org_id = $3 AND period = 'M' AND archive_type = $5
)
SELECT missing_day::timestamp WITH TIME ZONE FROM month_days LEFT JOIN curr_archives ON curr_archives.start_date = month_days.missing_day
WHERE curr_archives.start_date IS NULL
`
SELECT missing_day::timestamp with time zone
FROM month_days
LEFT JOIN curr_archives ON curr_archives.start_date = month_days.missing_day
WHERE curr_archives.start_date IS NULL`

// GetMissingDailyArchivesForDateRange returns all the missing daily archives between the two passed in dates
func GetMissingDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, startDate time.Time, endDate time.Time, org Org, archiveType ArchiveType) ([]*Archive, error) {
Expand All @@ -239,7 +239,7 @@ func GetMissingDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, start

missing := make([]*Archive, 0, 1)

rows, err := db.QueryxContext(ctx, lookupMissingDailyArchive, startDate, endDate, org.ID, DayPeriod, archiveType)
rows, err := db.QueryxContext(ctx, sqlLookupMissingDailyArchive, startDate, endDate, org.ID, DayPeriod, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting missing daily archives for org: %d and type: %s", org.ID, archiveType)
}
Expand Down Expand Up @@ -268,14 +268,16 @@ func GetMissingDailyArchivesForDateRange(ctx context.Context, db *sqlx.DB, start

// startDate is truncated to the first of the month
// endDate for range is not inclusive so we must deduct 1 second
const lookupMissingMonthlyArchive = `
const sqlLookupMissingMonthlyArchive = `
WITH month_days(missing_month) AS (
SELECT generate_series(date_trunc('month', $1::timestamp with time zone), $2::timestamp with time zone - '1 second'::interval, '1 month')::date
), curr_archives AS (
SELECT start_date FROM archives_archive WHERE org_id = $3 and period = $4 and archive_type=$5
SELECT start_date FROM archives_archive WHERE org_id = $3 and period = $4 and archive_type = $5
)
SELECT missing_month::timestamp with time zone from month_days LEFT JOIN curr_archives ON curr_archives.start_date = month_days.missing_month
WHERE curr_archives.start_date IS NULL
SELECT missing_month::timestamp with time zone
FROM month_days
LEFT JOIN curr_archives ON curr_archives.start_date = month_days.missing_month
WHERE curr_archives.start_date IS NULL
`

// GetMissingMonthlyArchives gets which monthly archives are currently missing for this org
Expand All @@ -291,7 +293,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time,

missing := make([]*Archive, 0, 1)

rows, err := db.QueryxContext(ctx, lookupMissingMonthlyArchive, startDate, endDate, org.ID, MonthPeriod, archiveType)
rows, err := db.QueryxContext(ctx, sqlLookupMissingMonthlyArchive, startDate, endDate, org.ID, MonthPeriod, archiveType)
if err != nil {
return nil, errors.Wrapf(err, "error getting missing monthly archive for org: %d and type: %s", org.ID, archiveType)
}
Expand Down Expand Up @@ -593,11 +595,10 @@ func UploadArchive(ctx context.Context, s3Client s3iface.S3API, bucket string, a
return nil
}

const insertArchive = `
const sqlInsertArchive = `
INSERT INTO archives_archive(archive_type, org_id, created_on, start_date, period, record_count, size, hash, url, needs_deletion, build_time, rollup_id)
VALUES(:archive_type, :org_id, :created_on, :start_date, :period, :record_count, :size, :hash, :url, :needs_deletion, :build_time, :rollup_id)
RETURNING id
`
VALUES(:archive_type, :org_id, :created_on, :start_date, :period, :record_count, :size, :hash, :url, :needs_deletion, :build_time, :rollup_id)
RETURNING id`

// WriteArchiveToDB writes an archive to the database
func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error {
Expand All @@ -612,7 +613,7 @@ func WriteArchiveToDB(ctx context.Context, db *sqlx.DB, archive *Archive) error
return errors.Wrapf(err, "error starting transaction")
}

rows, err := tx.NamedQuery(insertArchive, archive)
rows, err := tx.NamedQuery(sqlInsertArchive, archive)
if err != nil {
tx.Rollback()
return errors.Wrapf(err, "error inserting archive")
Expand Down Expand Up @@ -827,11 +828,7 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
return created, failed, nil
}

const setArchiveDeleted = `
UPDATE archives_archive
SET needs_deletion = FALSE, deleted_on = $2
WHERE id = $1
`
const sqlUpdateArchiveDeleted = `UPDATE archives_archive SET needs_deletion = FALSE, deleted_on = $2 WHERE id = $1`

var deleteTransactionSize = 100

Expand Down
11 changes: 6 additions & 5 deletions archives/archives_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,26 @@ func TestGetMissingDayArchives(t *testing.T) {

orgs, err := GetActiveOrgs(ctx, db, config)
assert.NoError(t, err)
assert.Len(t, orgs, 3)

now := time.Date(2018, 1, 8, 12, 30, 0, 0, time.UTC)

// org 1 is too new, no tasks
tasks, err := GetMissingDailyArchives(ctx, db, now, orgs[0], MessageType)
assert.NoError(t, err)
assert.Equal(t, 0, len(tasks))
assert.Len(t, tasks, 0)

// org 2 should have some
tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[1], MessageType)
assert.NoError(t, err)
assert.Equal(t, 61, len(tasks))
assert.Len(t, tasks, 61)
assert.Equal(t, time.Date(2017, 8, 10, 0, 0, 0, 0, time.UTC), tasks[0].StartDate)
assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), tasks[60].StartDate)

// org 3 is the same as 2, but two of the tasks have already been built
tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[2], MessageType)
assert.NoError(t, err)
assert.Equal(t, 31, len(tasks))
assert.Len(t, tasks, 31)
assert.Equal(t, time.Date(2017, 8, 11, 0, 0, 0, 0, time.UTC), tasks[0].StartDate)
assert.Equal(t, time.Date(2017, 10, 1, 0, 0, 0, 0, time.UTC), tasks[21].StartDate)
assert.Equal(t, time.Date(2017, 10, 10, 0, 0, 0, 0, time.UTC), tasks[30].StartDate)
Expand All @@ -67,13 +68,13 @@ func TestGetMissingDayArchives(t *testing.T) {
orgs[2].RetentionPeriod = 200
tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[2], MessageType)
assert.NoError(t, err)
assert.Equal(t, 0, len(tasks))
assert.Len(t, tasks, 0)

// org 1 again, but lowering the archive period so we have tasks
orgs[0].RetentionPeriod = 2
tasks, err = GetMissingDailyArchives(ctx, db, now, orgs[0], MessageType)
assert.NoError(t, err)
assert.Equal(t, 58, len(tasks))
assert.Len(t, tasks, 58)
assert.Equal(t, time.Date(2017, 11, 10, 0, 0, 0, 0, time.UTC), tasks[0].StartDate)
assert.Equal(t, time.Date(2017, 12, 1, 0, 0, 0, 0, time.UTC), tasks[21].StartDate)
assert.Equal(t, time.Date(2017, 12, 10, 0, 0, 0, 0, time.UTC), tasks[30].StartDate)
Expand Down
2 changes: 1 addition & 1 deletion archives/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func DeleteArchivedMessages(ctx context.Context, config *Config, db *sqlx.DB, s3
deletedOn := dates.Now()

// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
_, err = db.ExecContext(outer, sqlUpdateArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return errors.Wrap(err, "error setting archive as deleted")
}
Expand Down
2 changes: 1 addition & 1 deletion archives/runs.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func DeleteArchivedRuns(ctx context.Context, config *Config, db *sqlx.DB, s3Clie
deletedOn := dates.Now()

// all went well! mark our archive as no longer needing deletion
_, err = db.ExecContext(outer, setArchiveDeleted, archive.ID, deletedOn)
_, err = db.ExecContext(outer, sqlUpdateArchiveDeleted, archive.ID, deletedOn)
if err != nil {
return errors.Wrap(err, "error setting archive as deleted")
}
Expand Down
Loading

0 comments on commit 1a884ab

Please sign in to comment.