Skip to content

Commit

Permalink
Split IsRelationNotFoundError and GetMissingSchemaFromIsRelationNotFo…
Browse files Browse the repository at this point in the history
…undError and move to db_common

rename rate steampipe_rate_limiter -> steampipe_plugin_limiter
rename steampipe_connection_state -> steampipe_connection
fix hcl range for all resources
  • Loading branch information
kaidaguerre committed Sep 19, 2023
1 parent c85b6c3 commit a36dd1b
Show file tree
Hide file tree
Showing 52 changed files with 255 additions and 154 deletions.
5 changes: 1 addition & 4 deletions pkg/connection_sync/wait_for_search_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package connection_sync
import (
"context"

"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_client"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/steampipeconfig"
)
Expand All @@ -24,8 +22,7 @@ func WaitForSearchPathSchemas(ctx context.Context, client db_common.Client, sear

// NOTE: if we failed to load conection state, this must be because we are connected to an older version of the CLI
// just return nil error
_, missingTable, relationNotFound := db_client.IsRelationNotFoundError(err)
if relationNotFound && missingTable == constants.ConnectionStateTable {
if db_common.IsRelationNotFoundError(err) {
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions pkg/constants/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ const (
ServerSettingsTable = "steampipe_server_settings"

// RateLimiterDefinitionTable is the table used to store rate limiters defined in the config
RateLimiterDefinitionTable = "steampipe_rate_limiter"
RateLimiterDefinitionTable = "steampipe_plugin_limiter"
// PluginConfigTable is the table used to store plugin configs
PluginConfigTable = "steampipe_plugin"

// ConnectionStateTable is the table used to store steampipe connection state
ConnectionStateTable = "steampipe_connection_state"
// LegacyConnectionStateTable is the table used to store steampipe connection state
LegacyConnectionStateTable = "steampipe_connection_state"
ConnectionTable = "steampipe_connection"
ConnectionStatePending = "pending"
ConnectionStatePendingIncomplete = "incomplete"
ConnectionStateReady = "ready"
Expand Down Expand Up @@ -94,7 +95,7 @@ const (

// ConnectionStates is a handy array of all states
var ConnectionStates = []string{
ConnectionStateTable,
LegacyConnectionStateTable,
ConnectionStatePending,
ConnectionStateReady,
ConnectionStateUpdating,
Expand Down
6 changes: 3 additions & 3 deletions pkg/db/db_client/db_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func (c *DbClient) closePools() {
func (c *DbClient) loadServerSettings(ctx context.Context) error {
serverSettings, err := serversettings.Load(ctx, c.managementPool)
if err != nil {
if _, _, notFound := IsRelationNotFoundError(err); notFound {
// when connecting to pre-0.21.0 services, the server_settings table will not be available.
if notFound := db_common.IsRelationNotFoundError(err); notFound {
// when connecting to pre-0.21.0 services, the steampipe_server_settings table will not be available.
// this is expected and not an error
// code which uses server_settings should handle this
// code which uses steampipe_server_settings should handle this
log.Printf("[TRACE] could not find %s.%s table. skipping\n", constants.InternalSchema, constants.ServerSettingsTable)
return nil
}
Expand Down
31 changes: 1 addition & 30 deletions pkg/db/db_client/db_client_execute_retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package db_client

import (
"context"
"errors"
"fmt"
"log"
"regexp"
"time"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/sethvargo/go-retry"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe/pkg/constants"
Expand Down Expand Up @@ -47,7 +44,7 @@ func (c *DbClient) startQueryWithRetries(ctx context.Context, session *db_common

log.Println("[TRACE] queryError:", queryError)
// so there is an error - is it "relation not found"?
missingSchema, _, relationNotFound := IsRelationNotFoundError(queryError)
missingSchema, _, relationNotFound := db_common.GetMissingSchemaFromIsRelationNotFoundError(queryError)
if !relationNotFound {
log.Println("[TRACE] queryError not relation not found")
// just return it
Expand Down Expand Up @@ -138,29 +135,3 @@ func (c *DbClient) startQueryWithRetries(ctx context.Context, session *db_common

return res, err
}

func IsRelationNotFoundError(err error) (string, string, bool) {
if err == nil {
return "", "", false
}
var pgErr *pgconn.PgError
ok := errors.As(err, &pgErr)
if !ok || pgErr.Code != "42P01" {
return "", "", false
}

r := regexp.MustCompile(`^relation "(.*)\.(.*)" does not exist$`)
captureGroups := r.FindStringSubmatch(pgErr.Message)
if len(captureGroups) == 3 {

return captureGroups[1], captureGroups[2], true
}

// maybe there is no schema
r = regexp.MustCompile(`^relation "(.*)" does not exist$`)
captureGroups = r.FindStringSubmatch(pgErr.Message)
if len(captureGroups) == 2 {
return "", captureGroups[1], true
}
return "", "", true
}
38 changes: 38 additions & 0 deletions pkg/db/db_common/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package db_common

import (
"errors"
"github.com/jackc/pgx/v5/pgconn"
"regexp"
)

func IsRelationNotFoundError(err error) bool {
_, _, isRelationNotFound := GetMissingSchemaFromIsRelationNotFoundError(err)
return isRelationNotFound
}

func GetMissingSchemaFromIsRelationNotFoundError(err error) (string, string, bool) {
if err == nil {
return "", "", false
}
var pgErr *pgconn.PgError
ok := errors.As(err, &pgErr)
if !ok || pgErr.Code != "42P01" {
return "", "", false
}

r := regexp.MustCompile(`^relation "(.*)\.(.*)" does not exist$`)
captureGroups := r.FindStringSubmatch(pgErr.Message)
if len(captureGroups) == 3 {

return captureGroups[1], captureGroups[2], true
}

// maybe there is no schema
r = regexp.MustCompile(`^relation "(.*)" does not exist$`)
captureGroups = r.FindStringSubmatch(pgErr.Message)
if len(captureGroups) == 2 {
return "", captureGroups[1], true
}
return "", "", true
}
13 changes: 8 additions & 5 deletions pkg/db/db_local/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/jackc/pgx/v5"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_client"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/introspection"
"github.com/turbot/steampipe/pkg/statushooks"
Expand Down Expand Up @@ -121,8 +120,8 @@ INNER JOIN
"glob": true,
}
expectedTables := map[string]bool{
"connection_state": true, // legacy table name
constants.ConnectionStateTable: true,
"connection_state": true, // previous legacy table name
constants.LegacyConnectionStateTable: true,
}

for _, f := range functions {
Expand Down Expand Up @@ -234,18 +233,22 @@ func initializeConnectionStateTable(ctx context.Context, conn *pgx.Conn) error {
connectionStateMap, err := steampipeconfig.LoadConnectionState(ctx, conn)
if err != nil {
// ignore relation not found error
_, _, isRelationNotFound := db_client.IsRelationNotFoundError(err)
if !isRelationNotFound {
if !db_common.IsRelationNotFoundError(err) {
return err
}
return err
}
// if any connections are in a ready state, set them to pending - we need to run refresh connections before we know this connection is still valid
connectionStateMap.SetReadyConnectionsToPending()
// if any connections are not in a ready or error state, set them to pending_incomplete
connectionStateMap.SetNotReadyConnectionsToIncomplete()

// migration: ensure filename and line numbers are set for all connection states
connectionStateMap.PopulateFilename()

// drop the table and recreate
queries := []db_common.QueryWithArgs{
introspection.GetLegacyConnectionStateTableDropSql(),
introspection.GetConnectionStateTableDropSql(),
introspection.GetConnectionStateTableCreateSql(),
introspection.GetConnectionStateTableGrantSql(),
Expand Down
2 changes: 1 addition & 1 deletion pkg/interactive/interactive_client_autocomplete.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (c *InteractiveClient) initialiseSchemaAndTableSuggestions(connectionStateM
var unqualifiedTablesToAdd = make(map[string]struct{})

// add connection state and rate limit
unqualifiedTablesToAdd[constants.ConnectionStateTable] = struct{}{}
unqualifiedTablesToAdd[constants.ConnectionTable] = struct{}{}
unqualifiedTablesToAdd[constants.RateLimiterDefinitionTable] = struct{}{}

// get the first search path connection for each plugin
Expand Down
57 changes: 40 additions & 17 deletions pkg/introspection/connection_state_table_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ import (
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
)

// GetConnectionStateTableDropSql returns the sql to create the conneciton state table
// GetLegacyConnectionStateTableDropSql returns the sql to drop the legacy connection state table
func GetLegacyConnectionStateTableDropSql() db_common.QueryWithArgs {
query := fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s;`, constants.InternalSchema, constants.LegacyConnectionStateTable)
return db_common.QueryWithArgs{Query: query}
}

func GetConnectionStateTableDropSql() db_common.QueryWithArgs {
query := fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s;`, constants.InternalSchema, constants.ConnectionStateTable)
query := fmt.Sprintf(`DROP TABLE IF EXISTS %s.%s;`, constants.InternalSchema, constants.ConnectionTable)
return db_common.QueryWithArgs{Query: query}
}

Expand All @@ -29,8 +34,11 @@ func GetConnectionStateTableCreateSql() db_common.QueryWithArgs {
schema_hash TEXT NULL,
comments_set BOOL DEFAULT FALSE,
connection_mod_time TIMESTAMPTZ,
plugin_mod_time TIMESTAMPTZ
);`, constants.InternalSchema, constants.ConnectionStateTable)
plugin_mod_time TIMESTAMPTZ,
file_name TEXT,
start_line_number INTEGER,
end_line_number INTEGER
);`, constants.InternalSchema, constants.ConnectionTable)
return db_common.QueryWithArgs{Query: query}
}

Expand All @@ -39,7 +47,7 @@ func GetConnectionStateTableGrantSql() db_common.QueryWithArgs {
return db_common.QueryWithArgs{Query: fmt.Sprintf(
`GRANT SELECT ON TABLE %s.%s TO %s;`,
constants.InternalSchema,
constants.ConnectionStateTable,
constants.ConnectionTable,
constants.DatabaseUsersRole,
)}
}
Expand All @@ -53,7 +61,7 @@ SET state = '%s',
WHERE
name = $2
`,
constants.InternalSchema, constants.ConnectionStateTable, constants.ConnectionStateError)
constants.InternalSchema, constants.ConnectionTable, constants.ConnectionStateError)
args := []any{constants.ConnectionStateError, err.Error(), connectionName}
return db_common.QueryWithArgs{Query: query, Args: args}
}
Expand All @@ -69,7 +77,7 @@ WHERE
AND state <> 'disabled'
AND state <> 'error'
`,
constants.InternalSchema, constants.ConnectionStateTable, constants.ConnectionStateError)
constants.InternalSchema, constants.ConnectionTable, constants.ConnectionStateError)
args := []any{err.Error()}
return db_common.QueryWithArgs{Query: query, Args: args}
}
Expand All @@ -89,8 +97,11 @@ func GetUpsertConnectionStateSql(c *steampipeconfig.ConnectionState) db_common.Q
schema_hash,
comments_set,
connection_mod_time,
plugin_mod_time)
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,now(),$12)
plugin_mod_time,
file_name,
start_line_number,
end_line_number)
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,now(),$12,$13,$14,$15)
ON CONFLICT (name)
DO
UPDATE SET
Expand All @@ -105,9 +116,12 @@ DO
schema_hash = $10,
comments_set = $11,
connection_mod_time = now(),
plugin_mod_time = $12
plugin_mod_time = $12,
file_name = $13,
start_line_number = $14,
end_line_number = $15
`, constants.InternalSchema, constants.ConnectionStateTable)
`, constants.InternalSchema, constants.ConnectionTable)
args := []any{
c.ConnectionName,
c.State,
Expand All @@ -121,6 +135,9 @@ DO
c.SchemaHash,
c.CommentsSet,
c.PluginModTime,
c.FileName,
c.StartLineNumber,
c.EndLineNumber,
}
return db_common.QueryWithArgs{Query: query, Args: args}
}
Expand All @@ -138,9 +155,12 @@ func GetNewConnectionStateFromConnectionInsertSql(c *modconfig.Connection) db_co
schema_hash,
comments_set,
connection_mod_time,
plugin_mod_time)
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,now(),now())
`, constants.InternalSchema, constants.ConnectionStateTable)
plugin_mod_time,
file_name,
start_line_number,
end_line_number)
VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,now(),now(),$12,$13,$14)
`, constants.InternalSchema, constants.ConnectionTable)

schemaMode := ""
commentsSet := false
Expand All @@ -157,6 +177,9 @@ VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,now(),now())
schemaMode,
schemaHash,
commentsSet,
c.DeclRange.Filename,
c.DeclRange.Start.Line,
c.DeclRange.End.Line,
}

return db_common.QueryWithArgs{
Expand All @@ -172,22 +195,22 @@ func GetSetConnectionStateSql(connectionName string, state string) db_common.Que
WHERE
name = $1
`,
constants.InternalSchema, constants.ConnectionStateTable, state,
constants.InternalSchema, constants.ConnectionTable, state,
)
args := []any{connectionName}
return db_common.QueryWithArgs{Query: query, Args: args}
}

func GetDeleteConnectionStateSql(connectionName string) db_common.QueryWithArgs {
query := fmt.Sprintf(`DELETE FROM %s.%s WHERE NAME=$1`, constants.InternalSchema, constants.ConnectionStateTable)
query := fmt.Sprintf(`DELETE FROM %s.%s WHERE NAME=$1`, constants.InternalSchema, constants.ConnectionTable)
args := []any{connectionName}
return db_common.QueryWithArgs{Query: query, Args: args}
}

func GetSetConnectionStateCommentLoadedSql(connectionName string, commentsLoaded bool) db_common.QueryWithArgs {
query := fmt.Sprintf(`UPDATE %s.%s
SET comments_set = $1
WHERE NAME=$2`, constants.InternalSchema, constants.ConnectionStateTable)
WHERE NAME=$2`, constants.InternalSchema, constants.ConnectionTable)
args := []any{commentsLoaded, connectionName}
return db_common.QueryWithArgs{Query: query, Args: args}
}
4 changes: 2 additions & 2 deletions pkg/introspection/rate_limiters_table_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func GetRateLimiterTableCreateSql() db_common.QueryWithArgs {
name TEXT,
plugin TEXT,
plugin_instance TEXT NULL,
source TEXT,
source_type TEXT,
status TEXT,
bucket_size INTEGER,
fill_rate REAL ,
Expand Down Expand Up @@ -44,7 +44,7 @@ func GetRateLimiterTablePopulateSql(settings *modconfig.RateLimiter) db_common.Q
"name",
plugin,
plugin_instance,
source,
source_type,
status,
bucket_size,
fill_rate,
Expand Down
1 change: 0 additions & 1 deletion pkg/pluginmanager_service/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ type PluginManager struct {
}

func NewPluginManager(ctx context.Context, connectionConfig map[string]*sdkproto.ConnectionConfig, pluginConfigs connection.PluginMap, logger hclog.Logger) (*PluginManager, error) {

log.Printf("[INFO] NewPluginManager")
pluginManager := &PluginManager{
logger: logger,
Expand Down
2 changes: 2 additions & 0 deletions pkg/pluginmanager_service/plugin_manager_rate_limiters.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@ func (m *PluginManager) initialiseRateLimiterDefs(ctx context.Context) (e error)
if err != nil {
return err
}
// TODO KAI TACTICAL to force recreation
rateLimiterTableExists = false

if !rateLimiterTableExists {
return m.bootstrapRateLimiterTable(ctx)
Expand Down
Loading

0 comments on commit a36dd1b

Please sign in to comment.