Skip to content

Commit

Permalink
Add steampipe_connection column to all tables. Closes #246
Browse files Browse the repository at this point in the history
  • Loading branch information
kaidaguerre authored Feb 1, 2022
1 parent 8aed818 commit 6c03aee
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 5 deletions.
7 changes: 6 additions & 1 deletion plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"sync"

"github.com/hashicorp/go-hclog"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/cache"
Expand Down Expand Up @@ -335,7 +336,11 @@ func (p *Plugin) buildSchema() (map[string]*proto.TableSchema, error) {

var tables []string
for tableName, table := range p.TableMap {
schema[tableName] = table.GetSchema()
tableSchema, err := table.GetSchema()
if err != nil {
return nil, err
}
schema[tableName] = tableSchema
tables = append(tables, tableName)
}

Expand Down
9 changes: 9 additions & 0 deletions plugin/query_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package plugin

import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
Expand Down Expand Up @@ -511,10 +512,18 @@ func (d *QueryData) buildRow(ctx context.Context, rowData *RowData, rowChan chan
log.Printf("[WARN] getRow failed with error %v", err)
d.streamError(err)
} else {
// NOTE: add the Steampipecontext data to the row
d.addContextData(row)

rowChan <- row
}
}

func (d *QueryData) addContextData(row *proto.Row) {
jsonValue, _ := json.Marshal(map[string]string{"connection_name": d.Connection.Name})
row.Columns[ContextColumnName] = &proto.Column{Value: &proto.Column_JsonValue{JsonValue: jsonValue}}
}

func (d *QueryData) waitForRowsToComplete(rowWg *sync.WaitGroup, rowChan chan *proto.Row) {
log.Println("[TRACE] wait for rows")
rowWg.Wait()
Expand Down
1 change: 0 additions & 1 deletion plugin/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ func (r *RowData) startAllHydrateCalls(rowDataCtx context.Context, rowQueryData

// wait for all hydrate calls to complete
func (r *RowData) waitForHydrateCallsToComplete(rowDataCtx context.Context) (*proto.Row, error) {

var row *proto.Row

// start a go routine which signals via the wait chan when all calls are complete
Expand Down
22 changes: 19 additions & 3 deletions plugin/table_schema.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,39 @@
package plugin

import (
"fmt"

"github.com/turbot/steampipe-plugin-sdk/grpc/proto"
)

func (t Table) GetSchema() *proto.TableSchema {
const ContextColumnName = "_ctx"

func (t Table) GetSchema() (*proto.TableSchema, error) {
schema := &proto.TableSchema{
Columns: make([]*proto.ColumnDefinition, len(t.Columns)),
Columns: make([]*proto.ColumnDefinition, len(t.Columns)+1),
Description: t.Description,
}

// NOTE: we add a column "_ctx" to all tables.
// This is therefore a reserved column name
// column schema
for i, column := range t.Columns {
if column.Name == ContextColumnName {
return nil, fmt.Errorf("column '%s' is reserved and may not be used within a plugin schema", ContextColumnName)
}
schema.Columns[i] = &proto.ColumnDefinition{
Name: column.Name,
Type: column.Type,
Description: column.Description,
}
}
// add _ctx column
schema.Columns[len(t.Columns)] = &proto.ColumnDefinition{
Name: ContextColumnName,
Type: proto.ColumnType_JSON,
Description: "context data",
}

// key columns
if t.Get != nil && len(t.Get.KeyColumns) > 0 {
schema.GetCallKeyColumnList = t.Get.KeyColumns.ToProtobuf()
Expand All @@ -28,5 +44,5 @@ func (t Table) GetSchema() *proto.TableSchema {
}
}

return schema
return schema, nil
}

0 comments on commit 6c03aee

Please sign in to comment.