From 6271dfe4995a373604995b65c31a4b1ead3fd53e Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Tue, 24 Nov 2020 16:00:03 -0800 Subject: [PATCH] Add go routine to create partition tables periodically --- logsearchapi/server/db.go | 29 ++++++++++++++++++++++++++--- logsearchapi/server/partitions.go | 24 ++++++++++++++++++++++++ logsearchapi/server/server.go | 1 + 3 files changed, 51 insertions(+), 3 deletions(-) diff --git a/logsearchapi/server/db.go b/logsearchapi/server/db.go index 03ae9d3fd57..31035d8dc1e 100644 --- a/logsearchapi/server/db.go +++ b/logsearchapi/server/db.go @@ -109,6 +109,31 @@ func (c *DBClient) checkTableExists(ctx context.Context, table string) (bool, er return true, nil } +func (c *DBClient) checkPartitionTableExists(ctx context.Context, table string, givenTime time.Time) (bool, error) { + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + + p := newPartitionTimeRange(givenTime) + partitionTable := fmt.Sprintf("%s_%s", table, p.getPartnameSuffix()) + const existsQuery QTemplate = `SELECT 1 FROM %s WHERE false;` + res, _ := c.Query(ctx, existsQuery.build(partitionTable)) + if res.Err() != nil { + // check for table does not exist error + if strings.Contains(res.Err().Error(), "(SQLSTATE 42P01)") { + return false, nil + } + return false, res.Err() + } + + return true, nil +} + +func (c *DBClient) createTablePartition(ctx context.Context, table Table) error { + partTimeRange := newPartitionTimeRange(time.Now()) + _, err := c.Exec(ctx, table.getCreatePartitionStatement(partTimeRange)) + return err +} + func (c *DBClient) createTableAndPartition(ctx context.Context, table Table) error { if exists, err := c.checkTableExists(ctx, table.Name); err != nil { return err @@ -120,9 +145,7 @@ func (c *DBClient) createTableAndPartition(ctx context.Context, table Table) err return err } - partTimeRange := newPartitionTimeRange(time.Now()) - _, err := c.Exec(ctx, table.getCreatePartitionStatement(partTimeRange)) - return err + return c.createTablePartition(ctx, table) } func (c *DBClient) createTables(ctx context.Context) error { diff --git a/logsearchapi/server/partitions.go b/logsearchapi/server/partitions.go index 6a91c8e50c9..013bfd88ed4 100644 --- a/logsearchapi/server/partitions.go +++ b/logsearchapi/server/partitions.go @@ -242,3 +242,27 @@ func (c *DBClient) vacuumData(diskCapacityGBs int) { } } + +func (c *DBClient) partitionTables() { + checkInterval := 12 * time.Hour + bgCtx := context.Background() + tables := []Table{auditLogEventsTable, requestInfoTable} + for { + // Check if the partition table to store audit logs 12hrs from now exists + halfDayLater := time.Now().Add(12 * time.Hour) + for _, table := range tables { + partitionExists, err := c.checkPartitionTableExists(bgCtx, table.Name, halfDayLater) + log.Printf("Error while checking if partition for %s exists %s", table.Name, err) + if partitionExists { + continue + } + + if err := c.createTablePartition(bgCtx, table); err != nil { + log.Printf("Error while creating partition for %s", table.Name) + } + } + + // Check again after 12hrs + time.Sleep(checkInterval) + } +} diff --git a/logsearchapi/server/server.go b/logsearchapi/server/server.go index 6ffedacf6f1..b88f8e9791c 100644 --- a/logsearchapi/server/server.go +++ b/logsearchapi/server/server.go @@ -69,6 +69,7 @@ func NewLogSearch(pgConnStr, auditAuthToken string, queryAuthToken string, diskC // Start vacuum thread go ls.DBClient.vacuumData(ls.DiskCapacityGBs) + go ls.DBClient.partitionTables() return ls, nil }