Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[logsearchapi] Maintain table disk usage within given capacity #354

Merged
merged 1 commit into from
Nov 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion logsearchapi/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ docker run --rm -it -e "POSTGRES_PASSWORD=example" -p 5432:5432 postgres:13-alpi
export LOGSEARCH_PG_CONN_STR="postgres://postgres:example@localhost/postgres"
export LOGSEARCH_AUDIT_AUTH_TOKEN=xxx
export LOGSEARCH_QUERY_AUTH_TOKEN=yyy
export LOGSEARCH_MAX_RETENTION_MONTHS=3
export LOGSEARCH_DISK_CAPACITY_GB=5
go build && ./logsearchapi
```

Expand Down
14 changes: 7 additions & 7 deletions logsearchapi/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ import (
)

const (
pgConnStrEnv = "LOGSEARCH_PG_CONN_STR"
auditAuthTokenEnv = "LOGSEARCH_AUDIT_AUTH_TOKEN"
queryAuthTokenEnv = "LOGSEARCH_QUERY_AUTH_TOKEN"
maxRetentionMonthsEnv = "LOGSEARCH_MAX_RETENTION_MONTHS"
pgConnStrEnv = "LOGSEARCH_PG_CONN_STR"
auditAuthTokenEnv = "LOGSEARCH_AUDIT_AUTH_TOKEN"
queryAuthTokenEnv = "LOGSEARCH_QUERY_AUTH_TOKEN"
diskCapacityEnv = "LOGSEARCH_DISK_CAPACITY_GB"
)

func loadEnv() (*server.LogSearch, error) {
Expand All @@ -49,12 +49,12 @@ func loadEnv() (*server.LogSearch, error) {
if queryAuthToken == "" {
return nil, errors.New(queryAuthTokenEnv + " env variable is required.")
}
maxRetentionMonths, err := strconv.Atoi(os.Getenv(maxRetentionMonthsEnv))
diskCapacity, err := strconv.Atoi(os.Getenv(diskCapacityEnv))
if err != nil {
return nil, errors.New(maxRetentionMonthsEnv + " env variable is required and must be an integer.")
return nil, errors.New(diskCapacityEnv + " env variable is required and must be an integer.")
}

return server.NewLogSearch(pgConnStr, auditAuthToken, queryAuthToken, maxRetentionMonths)
return server.NewLogSearch(pgConnStr, auditAuthToken, queryAuthToken, diskCapacity)
}

func main() {
Expand Down
44 changes: 5 additions & 39 deletions logsearchapi/server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,6 @@ import (
"github.com/jackc/pgx/v4/pgxpool"
)

const (
createTablePartition QTemplate = `CREATE TABLE %s PARTITION OF %s
FOR VALUES FROM ('%s') TO ('%s');`
)

const (
partitionsPerMonth = 4
)

// QTemplate is used to represent queries that involve string substitution as
// well as SQL positional argument substitution.
type QTemplate string
Expand All @@ -58,11 +49,6 @@ func (t *Table) getCreateStatement() string {
return t.CreateStatement.build(t.Name)
}

func (t *Table) getCreatePartitionStatement(partitionNameSuffix, rangeStart, rangeEnd string) string {
partitionName := fmt.Sprintf("%s_%s", t.Name, partitionNameSuffix)
return createTablePartition.build(partitionName, t.Name, rangeStart, rangeEnd)
}

var (
auditLogEventsTable = Table{
Name: "audit_log_events",
Expand All @@ -88,28 +74,10 @@ var (
response_content_length INT8
) PARTITION BY RANGE (time);`,
}
)

func getPartitionRange(t time.Time) (time.Time, time.Time) {
// Zero out the time and use UTC
t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
daysInMonth := t.AddDate(0, 1, -t.Day()).Day()
quot := daysInMonth / partitionsPerMonth
remDays := daysInMonth % partitionsPerMonth
rangeStart := t.AddDate(0, 0, 1-t.Day())
for {
rangeDays := quot
if remDays > 0 {
rangeDays++
remDays--
}
rangeEnd := rangeStart.AddDate(0, 0, rangeDays)
if t.Before(rangeEnd) {
return rangeStart, rangeEnd
}
rangeStart = rangeEnd
}
}
// Allows iterating on all tables
allTables = []Table{auditLogEventsTable, requestInfoTable}
)

// DBClient is a client object that makes requests to the DB.
type DBClient struct {
Expand Down Expand Up @@ -152,10 +120,8 @@ func (c *DBClient) createTableAndPartition(ctx context.Context, table Table) err
return err
}

start, end := getPartitionRange(time.Now())
partSuffix := start.Format("2006_01_02")
rangeStart, rangeEnd := start.Format("2006-01-02"), end.Format("2006-01-02")
_, err := c.Exec(ctx, table.getCreatePartitionStatement(partSuffix, rangeStart, rangeEnd))
partTimeRange := newPartitionTimeRange(time.Now())
_, err := c.Exec(ctx, table.getCreatePartitionStatement(partTimeRange))
return err
}

Expand Down
244 changes: 244 additions & 0 deletions logsearchapi/server/partitions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// +build go1.13

/*
* Copyright (C) 2020, MinIO, Inc.
*
* This code is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License, version 3,
* along with this program. If not, see <http://www.gnu.org/licenses/>
*
*/

package server

import (
"context"
"fmt"
"log"
"time"

"github.com/georgysavva/scany/pgxscan"
)

const (
createTablePartition QTemplate = `CREATE TABLE %s PARTITION OF %s
FOR VALUES FROM ('%s') TO ('%s');`
)

const (
partitionsPerMonth = 4
)

func (t *Table) getCreatePartitionStatement(p partitionTimeRange) string {
partitionName := fmt.Sprintf("%s_%s", t.Name, p.getPartnameSuffix())
start, end := p.getRangeArgs()
return createTablePartition.build(partitionName, t.Name, start, end)
}

type partitionTimeRange struct {
GivenTime time.Time
StartDate, EndDate time.Time
}

// newPartitionTimeRange computes the partitionTimeRange including the
// givenTime.
func newPartitionTimeRange(givenTime time.Time) partitionTimeRange {
// Zero out the time and use UTC
t := givenTime
t = time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC)
lastDateOfMonth := t.AddDate(0, 1, -t.Day())
daysInMonth := lastDateOfMonth.Day()
quot := daysInMonth / partitionsPerMonth
remDays := daysInMonth % partitionsPerMonth
rangeStart := t.AddDate(0, 0, 1-t.Day())
var rangeEnd time.Time
for {
rangeDays := quot
if remDays > 0 {
rangeDays++
remDays--
}
rangeEnd = rangeStart.AddDate(0, 0, rangeDays)
if t.Before(rangeEnd) {
break
}
rangeStart = rangeEnd
}
return partitionTimeRange{
GivenTime: givenTime,
StartDate: rangeStart,
EndDate: rangeEnd,
}
}

func (p *partitionTimeRange) getPartnameSuffix() string {
return p.StartDate.Format("2006_01_02")
}

func (p *partitionTimeRange) getRangeArgs() (string, string) {
return p.StartDate.Format("2006_01_02"), p.EndDate.Format("2006_01_02")
}

type childTableInfo struct {
ParentSchema string
Parent string
ChildSchema string
Child string
}

// getExistingPartitions returns child tables of the given table in
// lexicographical order.
func (c *DBClient) getExistingPartitions(ctx context.Context, t Table) (tableNames []string, _ error) {
const (
listPartitions QTemplate = `SELECT nmsp_parent.nspname AS parent_schema,
parent.relname AS parent,
nmsp_child.nspname AS child_schema,
child.relname AS child
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname='%s'
ORDER BY child.relname ASC;`
)

q := listPartitions.build(t.Name)
rows, _ := c.Query(ctx, q)
var childTables []childTableInfo
if err := pgxscan.ScanAll(&childTables, rows); err != nil {
return nil, fmt.Errorf("Error accessing db: %v", err)
}
for _, ct := range childTables {
tableNames = append(tableNames, ct.Child)
}

return tableNames, nil
}

func (c *DBClient) getTablesDiskUsage(ctx context.Context) (m map[Table]map[string]uint64, _ error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

const (
tableSize QTemplate = `SELECT pg_total_relation_size('%s');`
)

m = make(map[Table]map[string]uint64, len(allTables))
for _, table := range allTables {
parts, err := c.getExistingPartitions(ctx, table)
if err != nil {
return nil, err
}
cm := make(map[string]uint64, len(parts))
for _, tableName := range parts {
q := tableSize.build(tableName)
row := c.QueryRow(ctx, q)
var size uint64
if err := row.Scan(&size); err != nil {
return nil, fmt.Errorf("Unable to query relation size: %v", err)
}
cm[tableName] = size
}
m[table] = cm
}
return m, nil
}

func (c *DBClient) deleteChildTable(ctx context.Context, table, reason string) error {
q := fmt.Sprintf("DROP TABLE %s;", table)
_, err := c.Exec(ctx, q)
if err != nil {
return fmt.Errorf("Table deletion error for %s: %v (attempted for reason: %s)", table, err, reason)
}
log.Printf("Deleted table `%s` (%s)", table, reason)
return nil
}

func totalDiskUsage(m map[Table]map[string]uint64) (sum uint64) {
for _, cm := range m {
for _, v := range cm {
sum += v
}
}
return
}

func calculateHiLoWaterMarks(totalCap uint64) (hi, lo float64) {
const (
highWaterMarkPercent = 90
lowWaterMarkPercent = 70
)
return highWaterMarkPercent * float64(totalCap) / 100, lowWaterMarkPercent * float64(totalCap) / 100
}

func (c *DBClient) maintainLowWatermarkUsage(ctx context.Context, diskCapacityGBs int) (err error) {
tables := make(map[Table][]string)
for _, table := range allTables {
tables[table], err = c.getExistingPartitions(ctx, table)
if err != nil {
return err
}
}
du, err := c.getTablesDiskUsage(ctx)
if err != nil {
return err
}

totalUsage := totalDiskUsage(du)
diskCap := uint64(diskCapacityGBs) * 1024 * 1024 * 1024
hi, lo := calculateHiLoWaterMarks(diskCap)
var index int
if float64(totalUsage) <= hi {
return nil
}

// Delete oldest child tables in each parent table, until usage is below
// `lo`.
for float64(totalUsage) >= lo {
var recoveredSpace uint64
for _, table := range allTables {
if index >= len(tables[table]) {
break
}
tableName := tables[table][index]
err = c.deleteChildTable(ctx, tableName, "disk usage high-water mark reached")
if err != nil {
return err
}
recoveredSpace += du[table][tableName]
}
totalUsage -= recoveredSpace
index++
}
log.Printf("Current tables disk usage: %.1f GB", float64(totalUsage)/float64(1024*1024*1024))
return nil
}

// vacuumData should be called in a new go routine.
func (c *DBClient) vacuumData(diskCapacityGBs int) {
var (
normalInterval time.Duration = 1 * time.Hour
retryInterval time.Duration = 2 * time.Minute
)
timer := time.NewTimer(normalInterval)
for range timer.C {
err := c.maintainLowWatermarkUsage(context.Background(), diskCapacityGBs)
if err != nil {
log.Printf("Error maintaining high-water mark disk usage: %v (retrying in %s)", err, retryInterval)
timer.Reset(retryInterval)
continue
}

timer.Reset(normalInterval)

}
}
Loading