Skip to content

Commit

Permalink
[logsearch] Fix vacuuming to tables in sync
Browse files Browse the repository at this point in the history
- Ensure that child tables of each parent table corresponding to the
same time period are deleted together.

- Ensure that child tables corresponding the current time period are
never deleted due to disk space.

- Print a log when disk space cannot be recovered (i.e. when current tables
are too large) indicating to increase available storage capacity.
  • Loading branch information
donatello committed Sep 7, 2022
1 parent 09ef071 commit a412910
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 50 deletions.
155 changes: 105 additions & 50 deletions logsearchapi/server/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,32 @@ func (p *partitionTimeRange) next() partitionTimeRange {
return newPartitionTimeRange(p.EndDate)
}

func getPartitionTimeRangeForTable(name string) (partitionTimeRange, error) {
fmtStr := []rune("2006_01_02")
runes := []rune(name)

errFn := func(msg string) error {
title := "invalid partition name"
s := ": " + msg
if msg == "" {
s = ""
}
return fmt.Errorf("%s%s", title, s)
}

if len(runes) <= len(fmtStr) {
return partitionTimeRange{}, errFn("too short")
}

// Split out the date part of the table name
partSuffix := string(runes[len(runes)-len(fmtStr):])
startTime, err := time.Parse(string(fmtStr), partSuffix)
if err != nil {
return partitionTimeRange{}, errFn("bad time value: " + partSuffix)
}
return newPartitionTimeRange(startTime), nil
}

type childTableInfo struct {
ParentSchema string
Parent string
Expand Down Expand Up @@ -158,33 +184,19 @@ func (c *DBClient) getExistingPartitions(ctx context.Context, t Table) (tableNam
return tableNames, nil
}

func (c *DBClient) getTablesDiskUsage(ctx context.Context) (m map[Table]map[string]uint64, _ error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
func (c *DBClient) getTableDiskUsage(ctx context.Context, tableName string) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

const (
tableSize QTemplate = `SELECT pg_total_relation_size('%s');`
)

m = make(map[Table]map[string]uint64, len(allTables))
for _, table := range allTables {
parts, err := c.getExistingPartitions(ctx, table)
if err != nil {
return nil, err
}
cm := make(map[string]uint64, len(parts))
for _, tableName := range parts {
q := tableSize.build(tableName)
row := c.QueryRowContext(ctx, q)
var size uint64
if err := row.Scan(&size); err != nil {
return nil, fmt.Errorf("Unable to query relation size: %v", err)
}
cm[tableName] = size
}
m[table] = cm
}
return m, nil
q := tableSize.build(tableName)
row := c.QueryRowContext(ctx, q)
var size int64
err := row.Scan(&size)
return size, err
}

func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) error {
Expand All @@ -197,15 +209,6 @@ func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) e
return nil
}

func totalDiskUsage(m map[Table]map[string]uint64) (sum uint64) {
for _, cm := range m {
for _, v := range cm {
sum += v
}
}
return
}

func calculateHiLoWaterMarks(totalCap uint64) (hi, lo float64) {
const (
highWaterMarkPercent = 90
Expand All @@ -215,19 +218,28 @@ func calculateHiLoWaterMarks(totalCap uint64) (hi, lo float64) {
}

func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGBs int) (err error) {
tables := make(map[Table][]string)
tables := make(map[Table][]string, len(allTables))
du := make(map[Table]map[string]int64, len(allTables))
var totalUsage int64
for _, table := range allTables {
tables[table], err = c.getExistingPartitions(ctx, table)
if err != nil {
return err
}
}
du, err := c.getTablesDiskUsage(ctx)
if err != nil {
return err

m := make(map[string]int64, len(tables[table]))
for _, partition := range tables[table] {
size, err := c.getTableDiskUsage(ctx, partition)
if err != nil {
return err
}
m[partition] = size
totalUsage += size
}
du[table] = m

}

totalUsage := totalDiskUsage(du)
diskCap := uint64(diskCapacityGBs) * 1024 * 1024 * 1024
hi, lo := calculateHiLoWaterMarks(diskCap)

Expand All @@ -237,46 +249,89 @@ func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGB

// Delete oldest child tables in each parent table, until usage is below
// `lo`.
var index int
//
// NOTE: Existing partitions for each parent table may not be in sync wrt
// the time periods they correspond to, due to previous errors in deleting
// from the db. So we keep track of the indices of the child tables for each
// parent table to ensure we only delete the oldest tables.
indices := make([]int, len(allTables))
for float64(totalUsage) >= lo {
var recoveredSpace uint64
for _, table := range allTables {
if index >= len(tables[table]) {
break
var recoveredSpace int64

// Find the minStartTime of the first existing partition of each of the
// parent tables.
currentPartStartTime := newPartitionTimeRange(time.Now()).StartDate
var minStartTime time.Time
isSet := false
for i, table := range allTables {
pt, err := getPartitionTimeRangeForTable(tables[table][indices[i]])
if err != nil {
return err
}
if !isSet {
minStartTime = pt.StartDate
}
if minStartTime.After(pt.StartDate) {
minStartTime = pt.StartDate
}
}

// Quit without deleting the current partition even if we are over the
// highwater mark!
if minStartTime.Equal(currentPartStartTime) {
var candidateTables []string
for i, table := range allTables {
candidateTables = append(candidateTables, tables[table][indices[i]])
}
tableName := tables[table][index]
err = c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached")

log.Printf("WARNING: highwater mark reached: no non-current tables exist to delete!"+
" Please increase the value of "+DiskCapacityEnv+" and ensure disk capacity for PostgreSQL!"+
" Candidate tables are: %v", candidateTables)
break
}

// Delete all child tables with the same StartTime = minStartTime
for i, table := range allTables {
pt, err := getPartitionTimeRangeForTable(tables[table][indices[i]])
if err != nil {
return err
}
recoveredSpace += du[table][tableName]

if pt.StartDate.Equal(minStartTime) {
tableName := tables[table][indices[i]]
err := c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached")
if err != nil {
return err
}
indices[i] += 1
recoveredSpace += du[table][tableName]
}
}

totalUsage -= recoveredSpace
index++
}
log.Printf("Current tables disk usage: %.1f GB", float64(totalUsage)/float64(1024*1024*1024))
return nil
}

// vacuumData should be called in a new go routine.
func (c *DBClient) vacuumData(ctx context.Context, diskCapacityGBs int) {
var (
normalInterval = 1 * time.Hour
retryInterval = 2 * time.Minute
)
normalInterval := 1 * time.Hour
retryInterval := 2 * time.Minute
timer := time.NewTimer(normalInterval)
defer timer.Stop()

for {
select {
case <-timer.C:
timer.Reset(retryInterval) // timer fired, reset it right here.

err := c.maintainLowWatermarkUsage(ctx, diskCapacityGBs)
if err != nil {
log.Printf("Error maintaining high-water mark disk usage: %v (retrying in %s)", err, retryInterval)
timer.Reset(retryInterval)
continue
}
timer.Reset(normalInterval)

case <-ctx.Done():
log.Println("Vacuum thread exiting.")
Expand Down
19 changes: 19 additions & 0 deletions logsearchapi/server/partitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,22 @@ func TestPartitionTimeRangeNextPrev(t *testing.T) {
}
}
}

func TestPartitionNameParsing(t *testing.T) {
rand.Seed(time.Now().UnixNano())

for i := 0; i < 1000; i++ {
r := randomTime()
p1 := newPartitionTimeRange(r)

name := "table_" + p1.getPartnameSuffix()

res, err := getPartitionTimeRangeForTable(name)
if err != nil {
t.Errorf("Test %d: r=%v unexpected err: %v", i, r, err)
}
if !res.isSame(&p1) {
t.Errorf("Test %d: r=%v, expected: %v got %v", i, r, p1.String(), res.String())
}
}
}

0 comments on commit a412910

Please sign in to comment.