Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add runtime.Runtime to hold config, DB and S3 #104

Merged
merged 1 commit into from
Jul 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 49 additions & 49 deletions archives/archives.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/lib/pq"
"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/gocommon/dates"
"github.com/nyaruka/gocommon/s3x"
"github.com/nyaruka/rp-archiver/runtime"
)

// ArchiveType is the type for the archives
Expand Down Expand Up @@ -97,19 +97,19 @@ const sqlLookupActiveOrgs = `
ORDER BY id`

// GetActiveOrgs returns the active organizations sorted by id
func GetActiveOrgs(ctx context.Context, db *sqlx.DB, conf *Config) ([]Org, error) {
func GetActiveOrgs(ctx context.Context, rt *runtime.Runtime) ([]Org, error) {
ctx, cancel := context.WithTimeout(ctx, time.Minute)
defer cancel()

rows, err := db.QueryxContext(ctx, sqlLookupActiveOrgs)
rows, err := rt.DB.QueryxContext(ctx, sqlLookupActiveOrgs)
if err != nil {
return nil, fmt.Errorf("error fetching active orgs: %w", err)
}
defer rows.Close()

orgs := make([]Org, 0, 10)
for rows.Next() {
org := Org{RetentionPeriod: conf.RetentionPeriod}
org := Org{RetentionPeriod: rt.Config.RetentionPeriod}
err = rows.StructScan(&org)
if err != nil {
return nil, fmt.Errorf("error scanning active org: %w", err)
Expand Down Expand Up @@ -320,7 +320,7 @@ func GetMissingMonthlyArchives(ctx context.Context, db *sqlx.DB, now time.Time,
}

// BuildRollupArchive builds a monthly archive from the files present on S3
func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client *s3x.Service, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error {
func BuildRollupArchive(ctx context.Context, rt *runtime.Runtime, monthlyArchive *Archive, now time.Time, org Org, archiveType ArchiveType) error {
ctx, cancel := context.WithTimeout(ctx, time.Hour)
defer cancel()

Expand All @@ -335,7 +335,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client
}

// grab all the daily archives we need
missingDailies, err := GetMissingDailyArchivesForDateRange(ctx, db, startDate, endDate, org, archiveType)
missingDailies, err := GetMissingDailyArchivesForDateRange(ctx, rt.DB, startDate, endDate, org, archiveType)
if err != nil {
return err
}
Expand All @@ -346,7 +346,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client

// great, we have all the dailies we need, download them
filename := fmt.Sprintf("%s_%d_%s_%d_%02d_", monthlyArchive.ArchiveType, monthlyArchive.Org.ID, monthlyArchive.Period, monthlyArchive.StartDate.Year(), monthlyArchive.StartDate.Month())
file, err := os.CreateTemp(conf.TempDir, filename)
file, err := os.CreateTemp(rt.Config.TempDir, filename)
if err != nil {
return fmt.Errorf("error creating temp file: %s: %w", filename, err)
}
Expand All @@ -357,7 +357,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client

recordCount := 0

dailies, err := GetDailyArchivesForDateRange(ctx, db, org, archiveType, startDate, endDate)
dailies, err := GetDailyArchivesForDateRange(ctx, rt.DB, org, archiveType, startDate, endDate)
if err != nil {
return err
}
Expand All @@ -375,7 +375,7 @@ func BuildRollupArchive(ctx context.Context, db *sqlx.DB, conf *Config, s3Client
continue
}

reader, err := GetS3File(ctx, s3Client, daily.URL)
reader, err := GetS3File(ctx, rt.S3, daily.URL)
if err != nil {
return fmt.Errorf("error reading S3 URL: %s: %w", daily.URL, err)
}
Expand Down Expand Up @@ -549,7 +549,7 @@ func CreateArchiveFile(ctx context.Context, db *sqlx.DB, archive *Archive, archi
}

// UploadArchive uploads the passed archive file to S3
func UploadArchive(ctx context.Context, s3Client *s3x.Service, bucket string, archive *Archive) error {
func UploadArchive(ctx context.Context, rt *runtime.Runtime, archive *Archive) error {
ctx, cancel := context.WithTimeout(ctx, time.Minute*15)
defer cancel()

Expand All @@ -568,7 +568,7 @@ func UploadArchive(ctx context.Context, s3Client *s3x.Service, bucket string, ar
archive.Hash)
}

err := UploadToS3(ctx, s3Client, bucket, archivePath, archive)
err := UploadToS3(ctx, rt.S3, rt.Config.S3Bucket, archivePath, archive)
if err != nil {
return fmt.Errorf("error uploading archive to S3: %w", err)
}
Expand Down Expand Up @@ -675,8 +675,8 @@ func DeleteArchiveFile(archive *Archive) error {
}

// CreateOrgArchives builds all the missing archives for the passed in org
func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
archiveCount, err := GetCurrentArchiveCount(ctx, db, org, archiveType)
func CreateOrgArchives(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, error) {
archiveCount, err := GetCurrentArchiveCount(ctx, rt.DB, org, archiveType)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("error getting current archive count: %w", err)
}
Expand All @@ -685,60 +685,60 @@ func CreateOrgArchives(ctx context.Context, now time.Time, config *Config, db *s

// no existing archives means this might be a backfill, figure out if there are full months we can build first
if archiveCount == 0 {
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
archives, err := GetMissingMonthlyArchives(ctx, rt.DB, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("error getting missing monthly archives: %w", err)
}

// we first create monthly archives
monthliesCreated, monthliesFailed = createArchives(ctx, db, config, s3Client, org, archives)
monthliesCreated, monthliesFailed = createArchives(ctx, rt, org, archives)
}

// then add in daily archives taking into account the monthly that have been built
daily, err := GetMissingDailyArchives(ctx, db, now, org, archiveType)
daily, err := GetMissingDailyArchives(ctx, rt.DB, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("error getting missing daily archives: %w", err)
}

// we then create missing daily archives
dailiesCreated, dailiesFailed = createArchives(ctx, db, config, s3Client, org, daily)
dailiesCreated, dailiesFailed = createArchives(ctx, rt, org, daily)

defer ctx.Done()

return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil
}

func createArchive(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, archive *Archive) error {
err := CreateArchiveFile(ctx, db, archive, config.TempDir)
func createArchive(ctx context.Context, rt *runtime.Runtime, archive *Archive) error {
err := CreateArchiveFile(ctx, rt.DB, archive, rt.Config.TempDir)
if err != nil {
return fmt.Errorf("error writing archive file: %w", err)
}

defer func() {
if !config.KeepFiles {
if !rt.Config.KeepFiles {
err := DeleteArchiveFile(archive)
if err != nil {
slog.Error("error deleting temporary archive file", "error", err)
}
}
}()

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if rt.Config.UploadToS3 {
err = UploadArchive(ctx, rt, archive)
if err != nil {
return fmt.Errorf("error writing archive to s3: %w", err)
}
}

err = WriteArchiveToDB(ctx, db, archive)
err = WriteArchiveToDB(ctx, rt.DB, archive)
if err != nil {
return fmt.Errorf("error writing record to db: %w", err)
}

return nil
}

func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *s3x.Service, org Org, archives []*Archive) ([]*Archive, []*Archive) {
func createArchives(ctx context.Context, rt *runtime.Runtime, org Org, archives []*Archive) ([]*Archive, []*Archive) {
log := slog.With("org_id", org.ID, "org_name", org.Name)

created := make([]*Archive, 0, len(archives))
Expand All @@ -748,7 +748,7 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *
log.With("start_date", archive.StartDate, "end_date", archive.endDate(), "period", archive.Period, "archive_type", archive.ArchiveType).Debug("starting archive")
start := dates.Now()

err := createArchive(ctx, db, config, s3Client, archive)
err := createArchive(ctx, rt, archive)
if err != nil {
log.Error("error creating archive", "error", err)
failed = append(failed, archive)
Expand All @@ -762,14 +762,14 @@ func createArchives(ctx context.Context, db *sqlx.DB, config *Config, s3Client *
}

// RollupOrgArchives rolls up monthly archives from our daily archives
func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
func RollupOrgArchives(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, error) {
ctx, cancel := context.WithTimeout(ctx, time.Hour*3)
defer cancel()

log := slog.With("org_id", org.ID, "org_name", org.Name, "archive_type", archiveType)

// get our missing monthly archives
archives, err := GetMissingMonthlyArchives(ctx, db, now, org, archiveType)
archives, err := GetMissingMonthlyArchives(ctx, rt.DB, now, org, archiveType)
if err != nil {
return nil, nil, err
}
Expand All @@ -782,30 +782,30 @@ func RollupOrgArchives(ctx context.Context, now time.Time, config *Config, db *s
log := log.With("start_date", archive.StartDate)
start := dates.Now()

err = BuildRollupArchive(ctx, db, config, s3Client, archive, now, org, archiveType)
err = BuildRollupArchive(ctx, rt, archive, now, org, archiveType)
if err != nil {
log.Error("error building monthly archive", "error", err)
failed = append(failed, archive)
continue
}

if config.UploadToS3 {
err = UploadArchive(ctx, s3Client, config.S3Bucket, archive)
if rt.Config.UploadToS3 {
err = UploadArchive(ctx, rt, archive)
if err != nil {
log.Error("error writing archive to s3", "error", err)
failed = append(failed, archive)
continue
}
}

err = WriteArchiveToDB(ctx, db, archive)
err = WriteArchiveToDB(ctx, rt.DB, archive)
if err != nil {
log.Error("error writing record to db", "error", err)
failed = append(failed, archive)
continue
}

if !config.KeepFiles {
if !rt.Config.KeepFiles {
err := DeleteArchiveFile(archive)
if err != nil {
log.Error("error deleting temporary file", "error", err)
Expand All @@ -825,9 +825,9 @@ const sqlUpdateArchiveDeleted = `UPDATE archives_archive SET needs_deletion = FA
var deleteTransactionSize = 100

// DeleteArchivedOrgRecords deletes all the records for the given org based on archives already created
func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, error) {
func DeleteArchivedOrgRecords(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, error) {
// get all the archives that haven't yet been deleted
archives, err := GetArchivesNeedingDeletion(ctx, db, org, archiveType)
archives, err := GetArchivesNeedingDeletion(ctx, rt.DB, org, archiveType)
if err != nil {
return nil, fmt.Errorf("error finding archives needing deletion '%s'", archiveType)
}
Expand All @@ -848,15 +848,15 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config

switch a.ArchiveType {
case MessageType:
err = DeleteArchivedMessages(ctx, config, db, s3Client, a)
err = DeleteArchivedMessages(ctx, rt, a)
if err == nil {
err = DeleteBroadcasts(ctx, now, config, db, org)
err = DeleteBroadcasts(ctx, rt, now, org)
}

case RunType:
err = DeleteArchivedRuns(ctx, config, db, s3Client, a)
err = DeleteArchivedRuns(ctx, rt, a)
if err == nil {
err = DeleteFlowStarts(ctx, now, config, db, org)
err = DeleteFlowStarts(ctx, rt, now, org)
}

default:
Expand All @@ -876,11 +876,11 @@ func DeleteArchivedOrgRecords(ctx context.Context, now time.Time, config *Config
}

// ArchiveOrg looks for any missing archives for the passed in org, creating and uploading them as necessary, returning the created archives
func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3Client *s3x.Service, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
func ArchiveOrg(ctx context.Context, rt *runtime.Runtime, now time.Time, org Org, archiveType ArchiveType) ([]*Archive, []*Archive, []*Archive, []*Archive, []*Archive, error) {
log := slog.With("org_id", org.ID, "org_name", org.Name)
start := dates.Now()

dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, err := CreateOrgArchives(ctx, rt, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("error creating archives: %w", err)
}
Expand All @@ -891,7 +891,7 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
log.Info("completed archival for org", "elapsed", elapsed, "records_per_second", rate)
}

rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, now, cfg, db, s3Client, org, archiveType)
rollupsCreated, rollupsFailed, err := RollupOrgArchives(ctx, rt, now, org, archiveType)
if err != nil {
return nil, nil, nil, nil, nil, fmt.Errorf("error rolling up archives: %w", err)
}
Expand All @@ -902,8 +902,8 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3

// finally delete any archives not yet actually archived
var deleted []*Archive
if cfg.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, now, cfg, db, s3Client, org, archiveType)
if rt.Config.Delete {
deleted, err = DeleteArchivedOrgRecords(ctx, rt, now, org, archiveType)
if err != nil {
return dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, nil, fmt.Errorf("error deleting archived records: %w", err)
}
Expand All @@ -913,12 +913,12 @@ func ArchiveOrg(ctx context.Context, now time.Time, cfg *Config, db *sqlx.DB, s3
}

// ArchiveActiveOrgs fetches active orgs and archives messages and runs
func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error {
func ArchiveActiveOrgs(rt *runtime.Runtime) error {
start := dates.Now()

// get our active orgs
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
orgs, err := GetActiveOrgs(ctx, db, cfg)
orgs, err := GetActiveOrgs(ctx, rt)
cancel()

if err != nil {
Expand All @@ -937,8 +937,8 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Hour*12)
log := slog.With("org_id", org.ID, "org_name", org.Name)

if cfg.ArchiveMessages {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, MessageType)
if rt.Config.ArchiveMessages {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, rt, start, org, MessageType)
if err != nil {
log.Error("error archiving org messages", "error", err, "archive_type", MessageType)
}
Expand All @@ -948,8 +948,8 @@ func ArchiveActiveOrgs(db *sqlx.DB, cfg *Config, s3Client *s3x.Service) error {
totalMsgsRollupsCreated += len(monthliesCreated)
totalMsgsRollupsFailed += len(monthliesFailed)
}
if cfg.ArchiveRuns {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, start, cfg, db, s3Client, org, RunType)
if rt.Config.ArchiveRuns {
dailiesCreated, dailiesFailed, monthliesCreated, monthliesFailed, _, err := ArchiveOrg(ctx, rt, start, org, RunType)
if err != nil {
log.Error("error archiving org runs", "error", err, "archive_type", RunType)
}
Expand Down
Loading
Loading