From 09989f16e171dac4c11b65344117a57c1a9c4b31 Mon Sep 17 00:00:00 2001
From: Aditya Manthramurthy
Date: Tue, 6 Sep 2022 17:57:25 -0700
Subject: [PATCH] [logsearch] Fix vacuuming to keep tables in sync

- Ensure that child tables of each parent table corresponding to the same
  time period are deleted together.
- Ensure that child tables corresponding to the current time period are
  never deleted due to disk space pressure.
- Print a log when disk space cannot be recovered (i.e. when the current
  tables are too large), indicating that available storage capacity should
  be increased.
---
 logsearchapi/server/partitions.go      | 179 ++++++++++++++++++-------
 logsearchapi/server/partitions_test.go |  19 +++
 2 files changed, 146 insertions(+), 52 deletions(-)

diff --git a/logsearchapi/server/partitions.go b/logsearchapi/server/partitions.go
index b21c06e4316..bc721967fa2 100644
--- a/logsearchapi/server/partitions.go
+++ b/logsearchapi/server/partitions.go
@@ -117,6 +117,32 @@ func (p *partitionTimeRange) next() partitionTimeRange {
 	return newPartitionTimeRange(p.EndDate)
 }
 
+func getPartitionTimeRangeForTable(name string) (partitionTimeRange, error) {
+	fmtStr := []rune("2006_01_02")
+	runes := []rune(name)
+
+	errFn := func(msg string) error {
+		title := "invalid partition name"
+		s := ": " + msg
+		if msg == "" {
+			s = ""
+		}
+		return fmt.Errorf("%s%s", title, s)
+	}
+
+	if len(runes) <= len(fmtStr) {
+		return partitionTimeRange{}, errFn("too short")
+	}
+
+	// Split out the date part of the table name
+	partSuffix := string(runes[len(runes)-len(fmtStr):])
+	startTime, err := time.Parse(string(fmtStr), partSuffix)
+	if err != nil {
+		return partitionTimeRange{}, errFn("bad time value: " + partSuffix)
+	}
+	return newPartitionTimeRange(startTime), nil
+}
+
 type childTableInfo struct {
 	ParentSchema string
 	Parent       string
@@ -158,33 +184,19 @@ func (c *DBClient) getExistingPartitions(ctx context.Context, t Table) (tableNam
 	return tableNames, nil
 }
 
-func (c *DBClient) getTablesDiskUsage(ctx context.Context) (m map[Table]map[string]uint64, _ error) {
-	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+func (c *DBClient) getTableDiskUsage(ctx context.Context, tableName string) (int64, error) {
+	ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
 	defer cancel()
 
 	const (
 		tableSize QTemplate = `SELECT pg_total_relation_size('%s');`
 	)
 
-	m = make(map[Table]map[string]uint64, len(allTables))
-	for _, table := range allTables {
-		parts, err := c.getExistingPartitions(ctx, table)
-		if err != nil {
-			return nil, err
-		}
-		cm := make(map[string]uint64, len(parts))
-		for _, tableName := range parts {
-			q := tableSize.build(tableName)
-			row := c.QueryRowContext(ctx, q)
-			var size uint64
-			if err := row.Scan(&size); err != nil {
-				return nil, fmt.Errorf("Unable to query relation size: %v", err)
-			}
-			cm[tableName] = size
-		}
-		m[table] = cm
-	}
-	return m, nil
+	q := tableSize.build(tableName)
+	row := c.QueryRowContext(ctx, q)
+	var size int64
+	err := row.Scan(&size)
+	return size, err
 }
 
 func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) error {
@@ -197,15 +209,6 @@ func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) e
 	return nil
 }
 
-func totalDiskUsage(m map[Table]map[string]uint64) (sum uint64) {
-	for _, cm := range m {
-		for _, v := range cm {
-			sum += v
-		}
-	}
-	return
-}
-
 func calculateHiLoWaterMarks(totalCap uint64) (hi, lo float64) {
 	const (
 		highWaterMarkPercent = 90
@@ -214,20 +217,53 @@
 	return highWaterMarkPercent * float64(totalCap) / 100, lowWaterMarkPercent * float64(totalCap) / 100
 }
 
+// getEarliestPartitionStartTime - finds the earliest start time of all existing
+// table partitions - this is the minimum start time over the first existing
+// partitions for each parent table.
+func getEarliestPartitionStartTime(tables map[Table][]string, indices []int) (time.Time, error) {
+	var earliestStartTime time.Time
+	isSet := false
+	for i, table := range allTables {
+		pt, err := getPartitionTimeRangeForTable(tables[table][indices[i]])
+		if err != nil {
+			return time.Time{}, err
+		}
+		if !isSet {
+			earliestStartTime = pt.StartDate
+			isSet = true
+		}
+		if earliestStartTime.After(pt.StartDate) {
+			earliestStartTime = pt.StartDate
+		}
+	}
+	return earliestStartTime, nil
+}
+
 func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGBs int) (err error) {
-	tables := make(map[Table][]string)
+	tables := make(map[Table][]string, len(allTables))
+	du := make(map[Table]map[string]int64, len(allTables))
+	var totalUsage int64
 	for _, table := range allTables {
+		// Find partitions for the parent table `table`.
 		tables[table], err = c.getExistingPartitions(ctx, table)
 		if err != nil {
 			return err
 		}
-	}
 
-	du, err := c.getTablesDiskUsage(ctx)
-	if err != nil {
-		return err
+
+		// Query disk usage of each partition in pg
+		m := make(map[string]int64, len(tables[table]))
+		for _, partition := range tables[table] {
+			size, err := c.getTableDiskUsage(ctx, partition)
+			if err != nil {
+				return err
+			}
+			m[partition] = size
+			totalUsage += size
+		}
+		du[table] = m
+
 	}
 
-	totalUsage := totalDiskUsage(du)
 	diskCap := uint64(diskCapacityGBs) * 1024 * 1024 * 1024
 	hi, lo := calculateHiLoWaterMarks(diskCap)
@@ -235,48 +271,87 @@ func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGB
 		return nil
 	}
 
+	// Print out disk usage after deletes - we defer this call because we could
+	// exit this func due to an error.
+	defer func() {
+		log.Printf("Current tables disk usage: %.1f GB", float64(totalUsage)/float64(1024*1024*1024))
+	}()
+
 	// Delete oldest child tables in each parent table, until usage is below
 	// `lo`.
-	var index int
+	//
+	// NOTE: Existing partitions for each parent table may not be in sync wrt
+	// the time periods they correspond to, due to previous errors in deleting
+	// from the db. So we keep track of the indices of the child tables for each
+	// parent table to ensure we only delete the oldest tables.
+	indices := make([]int, len(allTables))
	for float64(totalUsage) >= lo {
-		var recoveredSpace uint64
-		for _, table := range allTables {
-			if index >= len(tables[table]) {
-				break
+		earliestStartTime, err := getEarliestPartitionStartTime(tables, indices)
+		if err != nil {
+			return err
+		}
+
+		// Quit without deleting the current partition even if we are over the
+		// highwater mark!
+ currentPartStartTime := newPartitionTimeRange(time.Now()).StartDate + if earliestStartTime.Equal(currentPartStartTime) { + type ctinfo struct { + Name string + Size int64 + } + var ct []ctinfo + var total int64 + for i, table := range allTables { + name := tables[table][indices[i]] + ct = append(ct, ctinfo{name, du[table][name]}) + total += du[table][name] } - tableName := tables[table][index] - err = c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached") + + log.Printf("WARNING: highwater mark reached: no non-current tables exist to delete!"+ + " Please increase the value of "+DiskCapacityEnv+" and ensure disk capacity for PostgreSQL!"+ + " Candidate tables and sizes: %v (total usage: %d)", ct, total) + break + } + + // Delete all child tables with the same StartTime = earliestStartTime + for i, table := range allTables { + pt, err := getPartitionTimeRangeForTable(tables[table][indices[i]]) if err != nil { return err } - recoveredSpace += du[table][tableName] + + if pt.StartDate.Equal(earliestStartTime) { + tableName := tables[table][indices[i]] + err := c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached") + if err != nil { + return err + } + indices[i]++ + totalUsage -= du[table][tableName] + } } - totalUsage -= recoveredSpace - index++ } - log.Printf("Current tables disk usage: %.1f GB", float64(totalUsage)/float64(1024*1024*1024)) return nil } // vacuumData should be called in a new go routine. func (c *DBClient) vacuumData(ctx context.Context, diskCapacityGBs int) { - var ( - normalInterval = 1 * time.Hour - retryInterval = 2 * time.Minute - ) + normalInterval := 1 * time.Hour + retryInterval := 2 * time.Minute timer := time.NewTimer(normalInterval) defer timer.Stop() for { select { case <-timer.C: - timer.Reset(retryInterval) // timer fired, reset it right here. err := c.maintainLowWatermarkUsage(ctx, diskCapacityGBs) if err != nil { log.Printf("Error maintaining high-water mark disk usage: %v (retrying in %s)", err, retryInterval) + timer.Reset(retryInterval) continue } + timer.Reset(normalInterval) case <-ctx.Done(): log.Println("Vacuum thread exiting.") diff --git a/logsearchapi/server/partitions_test.go b/logsearchapi/server/partitions_test.go index cc6006f1189..b1893102aed 100644 --- a/logsearchapi/server/partitions_test.go +++ b/logsearchapi/server/partitions_test.go @@ -131,3 +131,22 @@ func TestPartitionTimeRangeNextPrev(t *testing.T) { } } } + +func TestPartitionNameParsing(t *testing.T) { + rand.Seed(time.Now().UnixNano()) + + for i := 0; i < 1000; i++ { + r := randomTime() + p1 := newPartitionTimeRange(r) + + name := "table_" + p1.getPartnameSuffix() + + res, err := getPartitionTimeRangeForTable(name) + if err != nil { + t.Errorf("Test %d: r=%v unexpected err: %v", i, r, err) + } + if !res.isSame(&p1) { + t.Errorf("Test %d: r=%v, expected: %v got %v", i, r, p1.String(), res.String()) + } + } +}
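
Note for reviewers: below is a minimal standalone sketch (not part of the patch) of the deletion policy that maintainLowWatermarkUsage now follows. The parent table names, the example child table names, and the assumption that a partition covers a single day are made up for illustration only; the real code works against allTables, partitionTimeRange, and the per-parent indices shown in the diff above.

package main

import (
	"fmt"
	"time"
)

// Hypothetical parent tables; the real list is allTables in partitions.go.
var parentTables = []string{"request_info", "audit_log_events"}

// parseStartDate mirrors getPartitionTimeRangeForTable: the last ten runes of a
// child table name are expected to be a "2006_01_02" date suffix marking the
// start of the partition's time range.
func parseStartDate(name string) (time.Time, error) {
	const layout = "2006_01_02"
	runes := []rune(name)
	if len(runes) <= len(layout) {
		return time.Time{}, fmt.Errorf("invalid partition name %q: too short", name)
	}
	return time.Parse(layout, string(runes[len(runes)-len(layout):]))
}

func main() {
	// Oldest remaining child table per parent table. These can drift out of
	// sync if an earlier delete round failed partway through, which is why the
	// patch tracks one index per parent instead of a single shared index.
	oldest := map[string]string{
		"request_info":     "request_info_2022_08_01",
		"audit_log_events": "audit_log_events_2022_08_08",
	}

	// Step 1: earliest start date across all parents, as in
	// getEarliestPartitionStartTime.
	var earliest time.Time
	for _, t := range parentTables {
		d, err := parseStartDate(oldest[t])
		if err != nil {
			panic(err)
		}
		if earliest.IsZero() || d.Before(earliest) {
			earliest = d
		}
	}

	// Step 2: never delete the current period, no matter how full the disk is.
	// For illustration we pretend a partition covers one day; the real code
	// compares against newPartitionTimeRange(time.Now()).StartDate.
	current := time.Now().UTC().Truncate(24 * time.Hour)
	if earliest.Equal(current) {
		fmt.Println("only current partitions remain: log a warning and stop")
		return
	}

	// Step 3: drop only the child tables whose start date equals `earliest`.
	// Parents whose oldest partition is newer are skipped this round, so an
	// out-of-sync parent catches up instead of losing newer data.
	for _, t := range parentTables {
		if d, _ := parseStartDate(oldest[t]); d.Equal(earliest) {
			fmt.Printf("would drop %s\n", oldest[t])
		}
	}
}

Deleting by earliest start date rather than by a single shared index lets a parent table that fell behind in a previous failed round catch up without dropping newer data, and the current-period guard ensures that running out of disk never costs the newest logs.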