Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 26 additions & 3 deletions logsearchapi/server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,31 @@ func (c *DBClient) checkTableExists(ctx context.Context, table string) (bool, er
return true, nil
}

func (c *DBClient) checkPartitionTableExists(ctx context.Context, table string, givenTime time.Time) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

p := newPartitionTimeRange(givenTime)
partitionTable := fmt.Sprintf("%s_%s", table, p.getPartnameSuffix())
const existsQuery QTemplate = `SELECT 1 FROM %s WHERE false;`
res, _ := c.Query(ctx, existsQuery.build(partitionTable))
if res.Err() != nil {
// check for table does not exist error
if strings.Contains(res.Err().Error(), "(SQLSTATE 42P01)") {
return false, nil
}
return false, res.Err()
}

return true, nil
}

func (c *DBClient) createTablePartition(ctx context.Context, table Table) error {
partTimeRange := newPartitionTimeRange(time.Now())
_, err := c.Exec(ctx, table.getCreatePartitionStatement(partTimeRange))
return err
}

func (c *DBClient) createTableAndPartition(ctx context.Context, table Table) error {
if exists, err := c.checkTableExists(ctx, table.Name); err != nil {
return err
Expand All @@ -120,9 +145,7 @@ func (c *DBClient) createTableAndPartition(ctx context.Context, table Table) err
return err
}

partTimeRange := newPartitionTimeRange(time.Now())
_, err := c.Exec(ctx, table.getCreatePartitionStatement(partTimeRange))
return err
return c.createTablePartition(ctx, table)
}

func (c *DBClient) createTables(ctx context.Context) error {
Expand Down
24 changes: 24 additions & 0 deletions logsearchapi/server/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,27 @@ func (c *DBClient) vacuumData(diskCapacityGBs int) {

}
}

func (c *DBClient) partitionTables() {
checkInterval := 12 * time.Hour
bgCtx := context.Background()
tables := []Table{auditLogEventsTable, requestInfoTable}
for {
// Check if the partition table to store audit logs 12hrs from now exists
halfDayLater := time.Now().Add(12 * time.Hour)
for _, table := range tables {
partitionExists, err := c.checkPartitionTableExists(bgCtx, table.Name, halfDayLater)
log.Printf("Error while checking if partition for %s exists %s", table.Name, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For logging since is is part of k8s start using klog? which provides more k8s friendly logging?

if partitionExists {
continue
}

if err := c.createTablePartition(bgCtx, table); err != nil {
log.Printf("Error while creating partition for %s", table.Name)
}
}

// Check again after 12hrs
time.Sleep(checkInterval)
}
}
1 change: 1 addition & 0 deletions logsearchapi/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func NewLogSearch(pgConnStr, auditAuthToken string, queryAuthToken string, diskC

// Start vacuum thread
go ls.DBClient.vacuumData(ls.DiskCapacityGBs)
go ls.DBClient.partitionTables()

return ls, nil
}
Expand Down