Skip to content

Commit

Permalink
Add go routine to create partition tables periodically
Browse files Browse the repository at this point in the history
  • Loading branch information
krisis committed Nov 25, 2020
1 parent fc90c2d commit 6271dfe
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 3 deletions.
29 changes: 26 additions & 3 deletions logsearchapi/server/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,31 @@ func (c *DBClient) checkTableExists(ctx context.Context, table string) (bool, er
return true, nil
}

func (c *DBClient) checkPartitionTableExists(ctx context.Context, table string, givenTime time.Time) (bool, error) {
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()

p := newPartitionTimeRange(givenTime)
partitionTable := fmt.Sprintf("%s_%s", table, p.getPartnameSuffix())
const existsQuery QTemplate = `SELECT 1 FROM %s WHERE false;`
res, _ := c.Query(ctx, existsQuery.build(partitionTable))
if res.Err() != nil {
// check for table does not exist error
if strings.Contains(res.Err().Error(), "(SQLSTATE 42P01)") {
return false, nil
}
return false, res.Err()
}

return true, nil
}

func (c *DBClient) createTablePartition(ctx context.Context, table Table) error {
partTimeRange := newPartitionTimeRange(time.Now())
_, err := c.Exec(ctx, table.getCreatePartitionStatement(partTimeRange))
return err
}

func (c *DBClient) createTableAndPartition(ctx context.Context, table Table) error {
if exists, err := c.checkTableExists(ctx, table.Name); err != nil {
return err
Expand All @@ -120,9 +145,7 @@ func (c *DBClient) createTableAndPartition(ctx context.Context, table Table) err
return err
}

partTimeRange := newPartitionTimeRange(time.Now())
_, err := c.Exec(ctx, table.getCreatePartitionStatement(partTimeRange))
return err
return c.createTablePartition(ctx, table)
}

func (c *DBClient) createTables(ctx context.Context) error {
Expand Down
24 changes: 24 additions & 0 deletions logsearchapi/server/partitions.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,3 +242,27 @@ func (c *DBClient) vacuumData(diskCapacityGBs int) {

}
}

func (c *DBClient) partitionTables() {
checkInterval := 12 * time.Hour
bgCtx := context.Background()
tables := []Table{auditLogEventsTable, requestInfoTable}
for {
// Check if the partition table to store audit logs 12hrs from now exists
halfDayLater := time.Now().Add(12 * time.Hour)
for _, table := range tables {
partitionExists, err := c.checkPartitionTableExists(bgCtx, table.Name, halfDayLater)
log.Printf("Error while checking if partition for %s exists %s", table.Name, err)
if partitionExists {
continue
}

if err := c.createTablePartition(bgCtx, table); err != nil {
log.Printf("Error while creating partition for %s", table.Name)
}
}

// Check again after 12hrs
time.Sleep(checkInterval)
}
}
1 change: 1 addition & 0 deletions logsearchapi/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func NewLogSearch(pgConnStr, auditAuthToken string, queryAuthToken string, diskC

// Start vacuum thread
go ls.DBClient.vacuumData(ls.DiskCapacityGBs)
go ls.DBClient.partitionTables()

return ls, nil
}
Expand Down

0 comments on commit 6271dfe

Please sign in to comment.