From 01c064dcdaf7957d9a228114f049d2b8694c93ea Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Thu, 13 Jan 2022 13:13:57 -0800 Subject: [PATCH 1/8] Add access-key column and indices to speed-up common queries --- logsearchapi/server/db-migrations.go | 151 +++++++++++++++++++++++++++ logsearchapi/server/db.go | 4 +- logsearchapi/server/events.go | 27 +++++ logsearchapi/server/server.go | 6 ++ 4 files changed, 187 insertions(+), 1 deletion(-) create mode 100644 logsearchapi/server/db-migrations.go diff --git a/logsearchapi/server/db-migrations.go b/logsearchapi/server/db-migrations.go new file mode 100644 index 00000000000..5d19f514e3a --- /dev/null +++ b/logsearchapi/server/db-migrations.go @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2022, MinIO, Inc. + * + * This code is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License, version 3, + * along with this program. If not, see + * + */ + +package server + +import ( + "context" + "log" + + "github.com/lib/pq" +) + +// dbMigration represents a DB migration using db-client c. +// Note: a migration func should be idempotent. +type dbMigration func(ctx context.Context, c *DBClient) error + +var allMigrations = []dbMigration{ + addAccessKeyColAndIndex, + addAuditLogIndices, + addReqInfoIndices, + + // Add new migrations here below +} + +func (c *DBClient) runMigrations(ctx context.Context) error { + for _, migration := range allMigrations { + if err := migration(ctx, c); err != nil { + return err + } + } + return nil +} + +func duplicateColErr(err error) bool { + if pqerr, ok := err.(*pq.Error); ok && + pqerr.Code == "42701" { + return true + } + return false +} + +func duplicateTblErr(err error) bool { + if pqerr, ok := err.(*pq.Error); ok && + pqerr.Code == "42P07" { + return true + } + return false +} + +func (c *DBClient) runQueries(ctx context.Context, queries []string, ignoreErr func(error) bool) error { + for _, query := range queries { + if _, err := c.ExecContext(ctx, query); err != nil { + if ignoreErr(err) { + continue + } + return err + } + } + return nil +} + +func updateAccessKeyCol(ctx context.Context, c *DBClient) { + updQ := `WITH req AS ( + SELECT log->>'requestID' AS request_id, + COALESCE( + substring( + log->'requestHeader'->>'Authorization', + e'^AWS4-HMAC-SHA256\\s+Credential\\s*=\\s*([^/]+)' + ), + substring( + log->'requestHeader'->>'Authorization', + e'^AWS\\s+([^:]+)' + ) + ) AS access_key + FROM audit_log_events + ORDER BY event_time + LIMIT $1 + OFFSET $2 + ) + UPDATE request_info + SET access_key = req.access_key + FROM req + WHERE request_info.request_id = req.request_id;` + + for off, lim := 0, 1000; ; off += lim { + res, err := c.ExecContext(ctx, updQ, lim, off) + if err != nil { + log.Printf("Failed to update access_key column in request_info: %v", err) + return + } + if rows, err := res.RowsAffected(); err == nil { + if rows == 0 { + break + } + } + } +} + +func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { + queries := []string{ + `alter table request_info add access_key text`, + `create index request_info_access_key_index on request_info (access_key)`, + } + + return c.runQueries(ctx, queries, func(err error) bool { + if duplicateColErr(err) { + go updateAccessKeyCol(ctx, c) + return true + } + if duplicateTblErr(err) { + return true + } + return false + }) +} + +func addAuditLogIndices(ctx context.Context, c *DBClient) error { + queries := []string{ + `create index audit_log_events_log_index on audit_log_events USING btree ((log->>'requestID'))`, + `create index audit_log_events_event_time_index on audit_log_events (event_time desc)`, + } + + return c.runQueries(ctx, queries, duplicateTblErr) +} + +func addReqInfoIndices(ctx context.Context, c *DBClient) error { + queries := []string{ + `create index request_info_api_name_index on request_info (api_name)`, + `create index request_info_bucket_index on request_info (bucket)`, + `create index request_info_object_index on request_info (object)`, + `create index request_info_request_id_index on request_info (request_id)`, + `create index request_info_response_status_index on request_info (response_status)`, + `create index request_info_time_index on request_info (time)`, + } + + return c.runQueries(ctx, queries, duplicateTblErr) +} diff --git a/logsearchapi/server/db.go b/logsearchapi/server/db.go index 179807dadee..ecc7d702d11 100644 --- a/logsearchapi/server/db.go +++ b/logsearchapi/server/db.go @@ -186,6 +186,7 @@ func (c *DBClient) InsertEvent(ctx context.Context, eventBytes []byte) (err erro insertAuditLogEvent QTemplate = `INSERT INTO %s (event_time, log) VALUES ($1, $2);` insertRequestInfo QTemplate = `INSERT INTO %s (time, api_name, + access_key, bucket, object, time_to_response_ns, @@ -196,7 +197,7 @@ func (c *DBClient) InsertEvent(ctx context.Context, eventBytes []byte) (err erro response_status_code, request_content_length, response_content_length) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12);` + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13);` ) // Start a database transaction @@ -232,6 +233,7 @@ func (c *DBClient) InsertEvent(ctx context.Context, eventBytes []byte) (err erro _, err = tx.ExecContext(ctx, insertRequestInfo.build(requestInfoTable.Name), event.Time, event.API.Name, + event.API.AccessKey, event.API.Bucket, event.API.Object, event.API.TimeToResponse, diff --git a/logsearchapi/server/events.go b/logsearchapi/server/events.go index e85779b0f55..7501795d427 100644 --- a/logsearchapi/server/events.go +++ b/logsearchapi/server/events.go @@ -31,6 +31,7 @@ type Entry struct { Trigger string `json:"trigger"` API struct { Name string `json:"name,omitempty"` + AccessKey string `json:"accessKey,omitempty"` Bucket string `json:"bucket,omitempty"` Object string `json:"object,omitempty"` Status string `json:"status,omitempty"` @@ -51,6 +52,7 @@ type Entry struct { // API is struct with same info an Entry.API, but with more strong types. type API struct { Name string `json:"name,omitempty"` + AccessKey string `json:"accessKey,omitempty"` Bucket string `json:"bucket,omitempty"` Object string `json:"object,omitempty"` Status string `json:"status,omitempty"` @@ -130,9 +132,34 @@ func EventFromEntry(e *Entry) (*Event, error) { if err != nil { return nil, err } + + // Parse access key + if authHdr, ok := e.ReqHeader["Authorization"]; ok { + ret.API.AccessKey = parseAccessKey(authHdr) + } return &ret, nil } +func parseAccessKey(authHdr string) string { + v4Prefix := "AWS4-HMAC-SHA256 Credential=" + v2Prefix := "AWS " + var splits []string + switch { + case strings.HasPrefix(authHdr, v4Prefix): + authHdr = strings.TrimPrefix(authHdr, v4Prefix) + splits = strings.Split(authHdr, "/") + + case strings.HasPrefix(authHdr, v2Prefix): + authHdr = strings.TrimPrefix(authHdr, v2Prefix) + splits = strings.Split(authHdr, ":") + } + if len(splits) > 0 { + return splits[0] + } + + return "" +} + func parseJSONEvent(b []byte) (*Event, error) { var entry Entry if err := json.Unmarshal(b, &entry); err != nil { diff --git a/logsearchapi/server/server.go b/logsearchapi/server/server.go index 7292253616d..2726d64e052 100644 --- a/logsearchapi/server/server.go +++ b/logsearchapi/server/server.go @@ -58,6 +58,12 @@ func NewLogSearch(pgConnStr, auditAuthToken string, queryAuthToken string, diskC return nil, fmt.Errorf("Error initializing tables: %v", err) } + // Run migrations on db + err = ls.DBClient.runMigrations(context.Background()) + if err != nil { + return nil, fmt.Errorf("error running migrations: %v", err) + } + // Initialize muxer ls.ServeMux = http.NewServeMux() ls.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {}) From 42329a7a593d888c4f17f49ead60c3ae16d5cf7c Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Wed, 19 Jan 2022 11:41:30 -0800 Subject: [PATCH 2/8] Update only uninitialized access_key rows --- logsearchapi/server/db-migrations.go | 48 ++++++++++++++-------------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/logsearchapi/server/db-migrations.go b/logsearchapi/server/db-migrations.go index 5d19f514e3a..134aeb3f4fa 100644 --- a/logsearchapi/server/db-migrations.go +++ b/logsearchapi/server/db-migrations.go @@ -75,26 +75,24 @@ func (c *DBClient) runQueries(ctx context.Context, queries []string, ignoreErr f func updateAccessKeyCol(ctx context.Context, c *DBClient) { updQ := `WITH req AS ( - SELECT log->>'requestID' AS request_id, - COALESCE( - substring( - log->'requestHeader'->>'Authorization', - e'^AWS4-HMAC-SHA256\\s+Credential\\s*=\\s*([^/]+)' - ), - substring( - log->'requestHeader'->>'Authorization', - e'^AWS\\s+([^:]+)' - ) - ) AS access_key - FROM audit_log_events - ORDER BY event_time - LIMIT $1 - OFFSET $2 - ) - UPDATE request_info - SET access_key = req.access_key - FROM req - WHERE request_info.request_id = req.request_id;` + SELECT log->>'requestID' AS request_id, + COALESCE( + substring( + log->'requestHeader'->>'Authorization', + e'^AWS4-HMAC-SHA256\\s+Credential\\s*=\\s*([^/]+)' + ), + substring(log->'requestHeader'->>'Authorization', e'^AWS\\s+([^:]+)') + ) AS access_key + FROM audit_log_events AS a JOIN request_info AS b ON (a.event_time = b.time) + WHERE b.access_key IS NULL + ORDER BY event_time + LIMIT $1 + OFFSET $2 + ) + UPDATE request_info + SET access_key = req.access_key + FROM req + WHERE request_info.request_id = req.request_id` for off, lim := 0, 1000; ; off += lim { res, err := c.ExecContext(ctx, updQ, lim, off) @@ -102,10 +100,12 @@ func updateAccessKeyCol(ctx context.Context, c *DBClient) { log.Printf("Failed to update access_key column in request_info: %v", err) return } - if rows, err := res.RowsAffected(); err == nil { - if rows == 0 { - break - } + + if rows, err := res.RowsAffected(); err != nil { + log.Printf("Failed to get rows affected: %v", err) + return + } else if rows < 1000 { + break } } } From 395072274af62db10757b7ca7e2a83136a2e1b17 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Wed, 19 Jan 2022 11:57:51 -0800 Subject: [PATCH 3/8] Fix copyright header --- logsearchapi/server/db-migrations.go | 31 ++++++++++++++-------------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/logsearchapi/server/db-migrations.go b/logsearchapi/server/db-migrations.go index 134aeb3f4fa..bfaff0666ba 100644 --- a/logsearchapi/server/db-migrations.go +++ b/logsearchapi/server/db-migrations.go @@ -1,19 +1,18 @@ -/* - * Copyright (C) 2022, MinIO, Inc. - * - * This code is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License, version 3, - * along with this program. If not, see - * - */ +// This file is part of MinIO Operator +// Copyright (c) 2022 MinIO, Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . package server From aded025d561e072df846ed1bc0c21b302b440793 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Mon, 24 Jan 2022 17:06:06 -0800 Subject: [PATCH 4/8] Address review comments - Capitalize SQL keywords - Create index concurrently during DB migration --- logsearchapi/server/db-migrations.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/logsearchapi/server/db-migrations.go b/logsearchapi/server/db-migrations.go index bfaff0666ba..308231c1650 100644 --- a/logsearchapi/server/db-migrations.go +++ b/logsearchapi/server/db-migrations.go @@ -111,8 +111,8 @@ func updateAccessKeyCol(ctx context.Context, c *DBClient) { func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { queries := []string{ - `alter table request_info add access_key text`, - `create index request_info_access_key_index on request_info (access_key)`, + `ALTER table request_info ADD access_key text`, + `CREATE INDEX CONCURRENTLY request_info_access_key_index ON request_info (access_key)`, } return c.runQueries(ctx, queries, func(err error) bool { @@ -129,8 +129,8 @@ func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { func addAuditLogIndices(ctx context.Context, c *DBClient) error { queries := []string{ - `create index audit_log_events_log_index on audit_log_events USING btree ((log->>'requestID'))`, - `create index audit_log_events_event_time_index on audit_log_events (event_time desc)`, + `CREATE INDEX CONCURRENTLY audit_log_events_log_index ON audit_log_events USING btree ((log->>'requestID'))`, + `CREATE INDEX CONCURRENTLY audit_log_events_event_time_index ON audit_log_events (event_time desc)`, } return c.runQueries(ctx, queries, duplicateTblErr) @@ -138,12 +138,12 @@ func addAuditLogIndices(ctx context.Context, c *DBClient) error { func addReqInfoIndices(ctx context.Context, c *DBClient) error { queries := []string{ - `create index request_info_api_name_index on request_info (api_name)`, - `create index request_info_bucket_index on request_info (bucket)`, - `create index request_info_object_index on request_info (object)`, - `create index request_info_request_id_index on request_info (request_id)`, - `create index request_info_response_status_index on request_info (response_status)`, - `create index request_info_time_index on request_info (time)`, + `CREATE INDEX CONCURRENTLY request_info_api_name_index ON request_info (api_name)`, + `CREATE INDEX CONCURRENTLY request_info_bucket_index ON request_info (bucket)`, + `CREATE INDEX CONCURRENTLY request_info_object_index ON request_info (object)`, + `CREATE INDEX CONCURRENTLY request_info_request_id_index ON request_info (request_id)`, + `CREATE INDEX CONCURRENTLY request_info_response_status_index ON request_info (response_status)`, + `CREATE INDEX CONCURRENTLY request_info_time_index ON request_info (time)`, } return c.runQueries(ctx, queries, duplicateTblErr) From 079f3818449b4da1844955df249f4578c50ecbd6 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Mon, 24 Jan 2022 17:17:23 -0800 Subject: [PATCH 5/8] Remove concurrent index creation for partitioned tables --- logsearchapi/server/db-migrations.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/logsearchapi/server/db-migrations.go b/logsearchapi/server/db-migrations.go index 308231c1650..52c40879660 100644 --- a/logsearchapi/server/db-migrations.go +++ b/logsearchapi/server/db-migrations.go @@ -112,7 +112,7 @@ func updateAccessKeyCol(ctx context.Context, c *DBClient) { func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { queries := []string{ `ALTER table request_info ADD access_key text`, - `CREATE INDEX CONCURRENTLY request_info_access_key_index ON request_info (access_key)`, + `CREATE INDEX request_info_access_key_index ON request_info (access_key)`, } return c.runQueries(ctx, queries, func(err error) bool { @@ -129,8 +129,8 @@ func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { func addAuditLogIndices(ctx context.Context, c *DBClient) error { queries := []string{ - `CREATE INDEX CONCURRENTLY audit_log_events_log_index ON audit_log_events USING btree ((log->>'requestID'))`, - `CREATE INDEX CONCURRENTLY audit_log_events_event_time_index ON audit_log_events (event_time desc)`, + `CREATE INDEX audit_log_events_log_index ON audit_log_events USING btree ((log->>'requestID'))`, + `CREATE INDEX audit_log_events_event_time_index ON audit_log_events (event_time desc)`, } return c.runQueries(ctx, queries, duplicateTblErr) @@ -138,12 +138,12 @@ func addAuditLogIndices(ctx context.Context, c *DBClient) error { func addReqInfoIndices(ctx context.Context, c *DBClient) error { queries := []string{ - `CREATE INDEX CONCURRENTLY request_info_api_name_index ON request_info (api_name)`, - `CREATE INDEX CONCURRENTLY request_info_bucket_index ON request_info (bucket)`, - `CREATE INDEX CONCURRENTLY request_info_object_index ON request_info (object)`, - `CREATE INDEX CONCURRENTLY request_info_request_id_index ON request_info (request_id)`, - `CREATE INDEX CONCURRENTLY request_info_response_status_index ON request_info (response_status)`, - `CREATE INDEX CONCURRENTLY request_info_time_index ON request_info (time)`, + `CREATE INDEX request_info_api_name_index ON request_info (api_name)`, + `CREATE INDEX request_info_bucket_index ON request_info (bucket)`, + `CREATE INDEX request_info_object_index ON request_info (object)`, + `CREATE INDEX request_info_request_id_index ON request_info (request_id)`, + `CREATE INDEX request_info_response_status_index ON request_info (response_status)`, + `CREATE INDEX request_info_time_index ON request_info (time)`, } return c.runQueries(ctx, queries, duplicateTblErr) From b6b681b2bc2c3819a22d86ac9ffce5eb3f0b8d09 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Thu, 27 Jan 2022 10:18:15 -0800 Subject: [PATCH 6/8] Fix bug in updateAccessKeyCol --- logsearchapi/server/db-migrations.go | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/logsearchapi/server/db-migrations.go b/logsearchapi/server/db-migrations.go index 52c40879660..146089e1794 100644 --- a/logsearchapi/server/db-migrations.go +++ b/logsearchapi/server/db-migrations.go @@ -86,15 +86,20 @@ func updateAccessKeyCol(ctx context.Context, c *DBClient) { WHERE b.access_key IS NULL ORDER BY event_time LIMIT $1 - OFFSET $2 ) UPDATE request_info SET access_key = req.access_key FROM req WHERE request_info.request_id = req.request_id` - for off, lim := 0, 1000; ; off += lim { - res, err := c.ExecContext(ctx, updQ, lim, off) + for lim := 1000; ; { + select { + case <-ctx.Done(): + return + default: + } + + res, err := c.ExecContext(ctx, updQ, lim) if err != nil { log.Printf("Failed to update access_key column in request_info: %v", err) return @@ -114,10 +119,8 @@ func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { `ALTER table request_info ADD access_key text`, `CREATE INDEX request_info_access_key_index ON request_info (access_key)`, } - - return c.runQueries(ctx, queries, func(err error) bool { + err := c.runQueries(ctx, queries, func(err error) bool { if duplicateColErr(err) { - go updateAccessKeyCol(ctx, c) return true } if duplicateTblErr(err) { @@ -125,6 +128,8 @@ func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { } return false }) + go updateAccessKeyCol(ctx, c) + return err } func addAuditLogIndices(ctx context.Context, c *DBClient) error { From e539fa072d2ee040a45a8b9764aa1f2e01e537f5 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Thu, 27 Jan 2022 14:31:39 -0800 Subject: [PATCH 7/8] Improve index creation such that logsearchapi is ready to receive incoming audit log requests sooner. --- logsearchapi/server/db-migrations.go | 197 ++++++++++++++++++++++++--- logsearchapi/server/server.go | 8 ++ 2 files changed, 186 insertions(+), 19 deletions(-) diff --git a/logsearchapi/server/db-migrations.go b/logsearchapi/server/db-migrations.go index 146089e1794..298118fd65d 100644 --- a/logsearchapi/server/db-migrations.go +++ b/logsearchapi/server/db-migrations.go @@ -18,6 +18,7 @@ package server import ( "context" + "fmt" "log" "github.com/lib/pq" @@ -28,9 +29,7 @@ import ( type dbMigration func(ctx context.Context, c *DBClient) error var allMigrations = []dbMigration{ - addAccessKeyColAndIndex, - addAuditLogIndices, - addReqInfoIndices, + addAccessKeyCol, // Add new migrations here below } @@ -72,6 +71,8 @@ func (c *DBClient) runQueries(ctx context.Context, queries []string, ignoreErr f return nil } +// updateAccessKeyCol updates request_info records which where created before +// the introduction of access_key column. func updateAccessKeyCol(ctx context.Context, c *DBClient) { updQ := `WITH req AS ( SELECT log->>'requestID' AS request_id, @@ -114,10 +115,11 @@ func updateAccessKeyCol(ctx context.Context, c *DBClient) { } } -func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { +// addAccessKeyCol adds a new column access_key, to request_info table to store +// API requests access key/user information wherever applicable. +func addAccessKeyCol(ctx context.Context, c *DBClient) error { queries := []string{ `ALTER table request_info ADD access_key text`, - `CREATE INDEX request_info_access_key_index ON request_info (access_key)`, } err := c.runQueries(ctx, queries, func(err error) bool { if duplicateColErr(err) { @@ -132,24 +134,181 @@ func addAccessKeyColAndIndex(ctx context.Context, c *DBClient) error { return err } -func addAuditLogIndices(ctx context.Context, c *DBClient) error { - queries := []string{ - `CREATE INDEX audit_log_events_log_index ON audit_log_events USING btree ((log->>'requestID'))`, - `CREATE INDEX audit_log_events_event_time_index ON audit_log_events (event_time desc)`, +// CreateIndices creates table indexes for audit_log_events and request_info tables. +// See auditLogIndices, reqInfoIndices functions for actual indices details. +func (c *DBClient) CreateIndices(ctx context.Context) error { + tables := []struct { + t Table + indices []indexOpts + }{ + { + t: auditLogEventsTable, + indices: auditLogIndices(), + }, + { + t: requestInfoTable, + indices: reqInfoIndices(), + }, + } + + for _, table := range tables { + // The following procedure creates indices on all partitions of + // this table. If an index was created on any of its partitions, + // it checks if newer partitions were created meanwhile, so as + // to create indices on those partitions too. + for { + partitions, err := c.getExistingPartitions(ctx, table.t) + if err != nil { + return err + } + + var indexCreated bool + for _, partition := range partitions { + indexed, err := c.CreatePartitionIndices(ctx, table.indices, partition) + if err != nil { + return err + } + indexCreated = indexCreated || indexed + } + if !indexCreated { + break + } + } + + // No more new non-indexed table partitions, creating + // parent table indices. + err := c.CreateParentIndices(ctx, table.indices) + if err != nil { + return err + } } + return nil +} - return c.runQueries(ctx, queries, duplicateTblErr) +// CreatePartitionIndices creates all indices described by optses on partition. +// It returns true if a new index was created on this partition. Note: this +// function ignores the index already exists error. +func (c *DBClient) CreatePartitionIndices(ctx context.Context, optses []indexOpts, partition string) (indexed bool, err error) { + for _, opts := range optses { + q := opts.createPartitionQuery(partition) + _, err := c.ExecContext(ctx, q) + if err == nil { + indexed = true + } + if err != nil && !duplicateTblErr(err) { + return indexed, err + } + } + return indexed, nil } -func addReqInfoIndices(ctx context.Context, c *DBClient) error { - queries := []string{ - `CREATE INDEX request_info_api_name_index ON request_info (api_name)`, - `CREATE INDEX request_info_bucket_index ON request_info (bucket)`, - `CREATE INDEX request_info_object_index ON request_info (object)`, - `CREATE INDEX request_info_request_id_index ON request_info (request_id)`, - `CREATE INDEX request_info_response_status_index ON request_info (response_status)`, - `CREATE INDEX request_info_time_index ON request_info (time)`, +// CreateParentIndices creates all indices specified by optses on the parent table. +func (c *DBClient) CreateParentIndices(ctx context.Context, optses []indexOpts) error { + for _, opts := range optses { + q := opts.createParentQuery() + _, err := c.ExecContext(ctx, q) + if err != nil && !duplicateTblErr(err) { + return err + } + } + return nil +} + +// auditLogIndices is a slice of audit_log_events' table indices specified as +// indexOpt values. +func auditLogIndices() []indexOpts { + return []indexOpts{ + { + tableName: "audit_log_events", + indexSuffix: "log", + col: idxCol{name: `(log->>'requestID')`}, + idxType: "btree", + }, + { + tableName: "audit_log_events", + col: idxCol{ + name: "event_time", + order: colDesc, + }, + }, } +} + +// reqInfoIndices is a slice of request_info's table indices specified as indexOpt values. +func reqInfoIndices() []indexOpts { + var idxOpts []indexOpts + cols := []string{"access_key", "api_name", "bucket", "object", "request_id", "response_status", "time"} + for _, col := range cols { + idxOpts = append(idxOpts, indexOpts{ + tableName: "request_info", + col: idxCol{name: col}, + }) + } + return idxOpts +} + +type colOrder bool + +const ( + colAsc colOrder = false + colDesc colOrder = true +) + +type idxCol struct { + name string + order colOrder +} + +func (col idxCol) colWithOrder() string { + if col.order == colDesc { + return fmt.Sprintf("(%s DESC)", col.name) + } + return fmt.Sprintf("(%s)", col.name) +} - return c.runQueries(ctx, queries, duplicateTblErr) +// indexOpts type is used to specify a table index +type indexOpts struct { + tableName string + indexSuffix string + col idxCol + order colOrder + idxType string +} + +func (opts indexOpts) colWithOrder() string { + return opts.col.colWithOrder() +} + +func (opts indexOpts) createParentQuery() string { + var idxName string + if opts.indexSuffix != "" { + idxName = fmt.Sprintf("%s_%s_index", opts.tableName, opts.indexSuffix) + } else { + idxName = fmt.Sprintf("%s_%s_index", opts.tableName, opts.col.name) + } + + var q string + if opts.idxType != "" { + q = fmt.Sprintf("CREATE INDEX %s ON %s USING %s %s", idxName, opts.tableName, opts.idxType, opts.colWithOrder()) + } else { + q = fmt.Sprintf("CREATE INDEX %s ON %s %s", idxName, opts.tableName, opts.colWithOrder()) + } + return q +} + +func (opts indexOpts) createPartitionQuery(partition string) string { + var idxName string + if opts.indexSuffix != "" { + idxName = fmt.Sprintf("%s_%s_index", partition, opts.indexSuffix) + } else { + idxName = fmt.Sprintf("%s_%s_index", partition, opts.col.name) + } + + var q string + if opts.idxType != "" { + q = fmt.Sprintf("CREATE INDEX CONCURRENTLY %s ON %s USING %s %s", idxName, partition, opts.idxType, opts.colWithOrder()) + } else { + q = fmt.Sprintf("CREATE INDEX CONCURRENTLY %s ON %s %s", idxName, partition, opts.colWithOrder()) + } + return q } diff --git a/logsearchapi/server/server.go b/logsearchapi/server/server.go index 2726d64e052..ccc90443163 100644 --- a/logsearchapi/server/server.go +++ b/logsearchapi/server/server.go @@ -64,6 +64,14 @@ func NewLogSearch(pgConnStr, auditAuthToken string, queryAuthToken string, diskC return nil, fmt.Errorf("error running migrations: %v", err) } + // Create indices on db + go func() { + err := ls.DBClient.CreateIndices(context.Background()) + if err != nil { + log.Printf("Failed to create some indices: %v", err) + } + }() + // Initialize muxer ls.ServeMux = http.NewServeMux() ls.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {}) From 95c0ec255ed7675ccbbbcee78d2c30f94b219549 Mon Sep 17 00:00:00 2001 From: Krishnan Parthasarathi Date: Mon, 31 Jan 2022 18:00:09 -0800 Subject: [PATCH 8/8] Fix lint errors --- logsearchapi/server/db-migrations.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/logsearchapi/server/db-migrations.go b/logsearchapi/server/db-migrations.go index 298118fd65d..dc7ed1fc5be 100644 --- a/logsearchapi/server/db-migrations.go +++ b/logsearchapi/server/db-migrations.go @@ -250,7 +250,6 @@ func reqInfoIndices() []indexOpts { type colOrder bool const ( - colAsc colOrder = false colDesc colOrder = true ) @@ -271,7 +270,6 @@ type indexOpts struct { tableName string indexSuffix string col idxCol - order colOrder idxType string }