Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[logsearch] Fix vacuuming to keep tables in sync #1279

Merged
merged 1 commit into from
Sep 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
179 changes: 127 additions & 52 deletions logsearchapi/server/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,32 @@ func (p *partitionTimeRange) next() partitionTimeRange {
return newPartitionTimeRange(p.EndDate)
}

// getPartitionTimeRangeForTable derives the partition time range encoded in a
// child table name. The name must end in a date written in the "2006_01_02"
// layout (year_month_day) and must have at least one character before it.
//
// It returns an error when the name is too short to hold a prefix plus the
// date, or when the trailing characters do not parse as a date. The parse
// error is wrapped, so callers can still inspect it with errors.Is/errors.As.
func getPartitionTimeRangeForTable(name string) (partitionTimeRange, error) {
	const layout = "2006_01_02"

	// Work on runes so a multi-byte prefix is never split mid-character.
	// layout is pure ASCII, so len(layout) is also its rune count.
	runes := []rune(name)
	if len(runes) <= len(layout) {
		return partitionTimeRange{}, fmt.Errorf("invalid partition name %q: too short", name)
	}

	// Split out the date part of the table name.
	partSuffix := string(runes[len(runes)-len(layout):])
	startTime, err := time.Parse(layout, partSuffix)
	if err != nil {
		return partitionTimeRange{}, fmt.Errorf("invalid partition name %q: bad time value %q: %w", name, partSuffix, err)
	}
	return newPartitionTimeRange(startTime), nil
}

type childTableInfo struct {
ParentSchema string
Parent string
Expand Down Expand Up @@ -158,33 +184,19 @@ func (c *DBClient) getExistingPartitions(ctx context.Context, t Table) (tableNam
return tableNames, nil
}

// NOTE(review): this span looks like a rendered diff without +/- markers: the
// signature and body of getTablesDiskUsage (map result, 10 second timeout)
// appear interleaved with those of getTableDiskUsage (single int64 result,
// 2 second timeout). It does not compile as written - confirm which lines are
// current against the repository before changing any code here.
func (c *DBClient) getTablesDiskUsage(ctx context.Context) (m map[Table]map[string]uint64, _ error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
// getTableDiskUsage queries pg_total_relation_size for tableName under a
// 2 second timeout and returns the scanned size together with any Scan error.
func (c *DBClient) getTableDiskUsage(ctx context.Context, tableName string) (int64, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

const (
// Query template; the table name is substituted for '%s' via build().
tableSize QTemplate = `SELECT pg_total_relation_size('%s');`
)

// Map-based variant: one inner map of partition name -> size per table.
m = make(map[Table]map[string]uint64, len(allTables))
for _, table := range allTables {
parts, err := c.getExistingPartitions(ctx, table)
if err != nil {
return nil, err
}
cm := make(map[string]uint64, len(parts))
for _, tableName := range parts {
q := tableSize.build(tableName)
row := c.QueryRowContext(ctx, q)
var size uint64
if err := row.Scan(&size); err != nil {
return nil, fmt.Errorf("Unable to query relation size: %v", err)
}
cm[tableName] = size
}
m[table] = cm
}
return m, nil
// Single-table variant: build the query, scan the one result column into
// size, and hand the Scan error back to the caller unchanged.
q := tableSize.build(tableName)
row := c.QueryRowContext(ctx, q)
var size int64
err := row.Scan(&size)
return size, err
}

func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) error {
Expand All @@ -197,15 +209,6 @@ func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) e
return nil
}

// totalDiskUsage adds up every per-partition size stored in m, across all
// tables, and returns the grand total. A nil or empty map yields 0.
func totalDiskUsage(m map[Table]map[string]uint64) (sum uint64) {
	for table := range m {
		for partition := range m[table] {
			sum += m[table][partition]
		}
	}
	return sum
}

func calculateHiLoWaterMarks(totalCap uint64) (hi, lo float64) {
const (
highWaterMarkPercent = 90
Expand All @@ -214,69 +217,141 @@ func calculateHiLoWaterMarks(totalCap uint64) (hi, lo float64) {
return highWaterMarkPercent * float64(totalCap) / 100, lowWaterMarkPercent * float64(totalCap) / 100
}

// getEarliestPartitionStartTime finds the earliest start time among the
// partitions currently selected for each parent table. For the i-th entry of
// allTables, the selected partition is tables[table][indices[i]].
//
// Instead of panicking on an out-of-range index, it returns an error when
// indices is shorter than allTables or when a table has no partition at its
// index (for example, a parent table with no partitions at all). An error is
// also returned when a partition name cannot be parsed. If allTables is empty,
// the zero time.Time is returned with a nil error.
func getEarliestPartitionStartTime(tables map[Table][]string, indices []int) (time.Time, error) {
	if len(indices) < len(allTables) {
		return time.Time{}, fmt.Errorf("partition indices cover %d tables, expected %d", len(indices), len(allTables))
	}

	var earliest time.Time
	for i, table := range allTables {
		parts := tables[table]
		idx := indices[i]
		if idx < 0 || idx >= len(parts) {
			return time.Time{}, fmt.Errorf("no partition at index %d for table %v (%d partitions known)", idx, table, len(parts))
		}

		pt, err := getPartitionTimeRangeForTable(parts[idx])
		if err != nil {
			return time.Time{}, err
		}
		// The first table seeds the minimum; later ones only lower it.
		if i == 0 || pt.StartDate.Before(earliest) {
			earliest = pt.StartDate
		}
	}
	return earliest, nil
}

// maintainLowWatermarkUsage gathers the existing partitions of every table in
// allTables together with their disk usage. If the total is above the high
// water mark derived from diskCapacityGBs, it deletes child tables, oldest
// start time first, until usage falls below the low water mark or only
// partitions for the current time period remain.
//
// NOTE(review): this span looks like a rendered diff without +/- markers, so
// lines from two versions of the function appear interleaved (for example
// `tables` and `totalUsage` are declared twice, and both the single `index`
// and the per-table `indices` bookkeeping are present). It does not compile as
// written - confirm which lines are current before changing any code here.
func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGBs int) (err error) {
tables := make(map[Table][]string)
tables := make(map[Table][]string, len(allTables))
du := make(map[Table]map[string]int64, len(allTables))
var totalUsage int64
for _, table := range allTables {
// Find partitions for the parent table `table`.
tables[table], err = c.getExistingPartitions(ctx, table)
if err != nil {
return err
}
}
du, err := c.getTablesDiskUsage(ctx)
if err != nil {
return err

// Query disk usage of each partition in pg
m := make(map[string]int64, len(tables[table]))
for _, partition := range tables[table] {
size, err := c.getTableDiskUsage(ctx, partition)
if err != nil {
return err
}
m[partition] = size
totalUsage += size
}
du[table] = m

}

totalUsage := totalDiskUsage(du)
// Convert the configured capacity from GB to bytes (1 GB = 1024^3 bytes).
diskCap := uint64(diskCapacityGBs) * 1024 * 1024 * 1024
hi, lo := calculateHiLoWaterMarks(diskCap)

// Nothing to do while usage is at or below the high water mark.
if float64(totalUsage) <= hi {
return nil
}

// Print out disk usage after deletes - we defer this call because we could
// exit this func due to an error.
defer func() {
log.Printf("Current tables disk usage: %.1f GB", float64(totalUsage)/float64(1024*1024*1024))
}()

// Delete oldest child tables in each parent table, until usage is below
// `lo`.
var index int
//
// NOTE: Existing partitions for each parent table may not be in sync wrt
// the time periods they correspond to, due to previous errors in deleting
// from the db. So we keep track of the indices of the child tables for each
// parent table to ensure we only delete the oldest tables.
indices := make([]int, len(allTables))
for float64(totalUsage) >= lo {
var recoveredSpace uint64
for _, table := range allTables {
if index >= len(tables[table]) {
break
earliestStartTime, err := getEarliestPartitionStartTime(tables, indices)
if err != nil {
return err
}

// Quit without deleting the current partition even if we are over the
// highwater mark!
currentPartStartTime := newPartitionTimeRange(time.Now()).StartDate
if earliestStartTime.Equal(currentPartStartTime) {
// ctinfo pairs a candidate table name with its recorded size, only
// for the warning logged below.
type ctinfo struct {
Name string
Size int64
}
var ct []ctinfo
var total int64
for i, table := range allTables {
name := tables[table][indices[i]]
ct = append(ct, ctinfo{name, du[table][name]})
total += du[table][name]
}
tableName := tables[table][index]
err = c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached")

log.Printf("WARNING: highwater mark reached: no non-current tables exist to delete!"+
" Please increase the value of "+DiskCapacityEnv+" and ensure disk capacity for PostgreSQL!"+
" Candidate tables and sizes: %v (total usage: %d)", ct, total)
break
}

// Delete all child tables with the same StartTime = earliestStartTime
for i, table := range allTables {
pt, err := getPartitionTimeRangeForTable(tables[table][indices[i]])
if err != nil {
return err
}
recoveredSpace += du[table][tableName]

if pt.StartDate.Equal(earliestStartTime) {
tableName := tables[table][indices[i]]
err := c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached")
if err != nil {
return err
}
// Advance this table's cursor past the deleted partition and
// subtract its recorded size from the running total.
indices[i]++
totalUsage -= du[table][tableName]
}
}
totalUsage -= recoveredSpace
index++
}
log.Printf("Current tables disk usage: %.1f GB", float64(totalUsage)/float64(1024*1024*1024))
return nil
}

// vacuumData should be called in a new go routine.
func (c *DBClient) vacuumData(ctx context.Context, diskCapacityGBs int) {
var (
normalInterval = 1 * time.Hour
retryInterval = 2 * time.Minute
)
normalInterval := 1 * time.Hour
retryInterval := 2 * time.Minute
timer := time.NewTimer(normalInterval)
defer timer.Stop()

for {
select {
case <-timer.C:
timer.Reset(retryInterval) // timer fired, reset it right here.

err := c.maintainLowWatermarkUsage(ctx, diskCapacityGBs)
if err != nil {
log.Printf("Error maintaining high-water mark disk usage: %v (retrying in %s)", err, retryInterval)
timer.Reset(retryInterval)
continue
}
timer.Reset(normalInterval)

case <-ctx.Done():
log.Println("Vacuum thread exiting.")
Expand Down
19 changes: 19 additions & 0 deletions logsearchapi/server/partitions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,22 @@ func TestPartitionTimeRangeNextPrev(t *testing.T) {
}
}
}

// TestPartitionNameParsing checks that a table name built from a partition's
// name suffix parses back into the same partition time range, over 1000
// randomly generated times.
func TestPartitionNameParsing(t *testing.T) {
	rand.Seed(time.Now().UnixNano())

	for i := 0; i < 1000; i++ {
		r := randomTime()
		p1 := newPartitionTimeRange(r)

		name := "table_" + p1.getPartnameSuffix()

		res, err := getPartitionTimeRangeForTable(name)
		if err != nil {
			t.Errorf("Test %d: r=%v name=%q unexpected err: %v", i, r, name, err)
			// res is meaningless when parsing failed; skip the comparison so
			// a single failure is not reported twice.
			continue
		}
		if !res.isSame(&p1) {
			t.Errorf("Test %d: r=%v, expected: %v got %v", i, r, p1.String(), res.String())
		}
	}
}