diff --git a/docs/tables/gcp_vertex_ai_endpoint.md b/docs/tables/gcp_vertex_ai_endpoint.md new file mode 100644 index 00000000..3474d58f --- /dev/null +++ b/docs/tables/gcp_vertex_ai_endpoint.md @@ -0,0 +1,68 @@ +# Table: gcp_vertex_ai_endpoint + +AI Platform is a managed service that enables you to easily build machine learning models, that work on any type of data, of any size. + +### Basic info + +```sql +select + name, + display_name, + create_time, + network +from + gcp_vertex_ai_endpoint; +``` + +### List endpoints that are exposed via private service connect + +```sql +select + name, + display_name, + create_time, + enable_private_service_connect +from + gcp_vertex_ai_endpoint +where + enable_private_service_connect; +``` + +### List endpoints created in the last 30 days + +```sql +select + name, + display_name, + network, + create_time, + update_time +from + gcp_vertex_ai_endpoint +where + create_time >= now() - interval '30' day; +``` + +### Get customer-managed key details of endpoints + +```sql +select + name, + create_time, + encryption_spec ->> 'KmsKeyName' as kms_key_name +from + gcp_vertex_ai_endpoint; +``` + +### Get prediction request response config of endpoints + +```sql +select + name, + network, + predict_request_response_logging_config ->> 'Enabled' as enabled, + predict_request_response_logging_config ->> 'SamplingRate' as sampling_rate, + predict_request_response_logging_config ->> 'BigqueryDestination' as bigquery_destination +from + gcp_vertex_ai_endpoint; +``` \ No newline at end of file diff --git a/gcp/plugin.go b/gcp/plugin.go index e39c9222..58e2009d 100644 --- a/gcp/plugin.go +++ b/gcp/plugin.go @@ -126,6 +126,7 @@ func Plugin(ctx context.Context) *plugin.Plugin { "gcp_sql_database_instance_metric_cpu_utilization_hourly": tableGcpSQLDatabaseInstanceMetricCpuUtilizationHourly(ctx), "gcp_storage_bucket": tableGcpStorageBucket(ctx), "gcp_storage_object": tableGcpStorageObject(ctx), + "gcp_vertex_ai_endpoint": tableGcpVertexAIEndpoint(ctx), /* https://github.com/turbot/steampipe/issues/108 "gcp_compute_route": tableGcpComputeRoute(ctx), diff --git a/gcp/service.go b/gcp/service.go index b7ab55ea..8db2fffc 100644 --- a/gcp/service.go +++ b/gcp/service.go @@ -3,7 +3,8 @@ package gcp import ( "context" - "cloud.google.com/go/redis/apiv1" + aiplatform "cloud.google.com/go/aiplatform/apiv1" + redis "cloud.google.com/go/redis/apiv1" "github.com/turbot/steampipe-plugin-sdk/v5/plugin" "google.golang.org/api/accessapproval/v1" "google.golang.org/api/apikeys/v2" @@ -24,6 +25,7 @@ import ( "google.golang.org/api/iam/v1" "google.golang.org/api/logging/v2" "google.golang.org/api/monitoring/v3" + "google.golang.org/api/option" "google.golang.org/api/pubsub/v1" "google.golang.org/api/run/v2" "google.golang.org/api/serviceusage/v1" @@ -54,6 +56,68 @@ func AccessApprovalService(ctx context.Context, d *plugin.QueryData) (*accessapp return svc, nil } +type AIplatfromServiceClients struct { + Endpoint *aiplatform.EndpointClient + Dataset *aiplatform.DatasetClient + Index *aiplatform.IndexClient + Job *aiplatform.JobClient +} + +func AIService(ctx context.Context, d *plugin.QueryData, clientType string) (*AIplatfromServiceClients, error) { + // have we already created and cached the service? + matrixLocation := d.EqualsQualString(matrixKeyLocation) + + // Default to us-central1 for building the supported locations for the resources like Endpoint, Dataset, Index, Job etc... + if matrixLocation == "" { + matrixLocation = "us-central1" + } + + serviceCacheKey := "AIService" + matrixLocation + clientType + if cachedData, ok := d.ConnectionManager.Cache.Get(serviceCacheKey); ok { + return cachedData.(*AIplatfromServiceClients), nil + } + + // To get config arguments from plugin config file + opts := setSessionConfig(ctx, d.Connection) + opts = append(opts, option.WithEndpoint(matrixLocation+"-aiplatform.googleapis.com:443")) + + clients := &AIplatfromServiceClients{} + + switch clientType { + case "Endpoint": + svc, err := aiplatform.NewEndpointClient(ctx, opts...) + if err != nil { + return nil, err + } + clients.Endpoint = svc + return clients, nil + case "Dataset": + svc, err := aiplatform.NewDatasetClient(ctx, opts...) + if err != nil { + return nil, err + } + clients.Dataset = svc + return clients, nil + case "Index": + svc, err := aiplatform.NewIndexClient(ctx, opts...) + if err != nil { + return nil, err + } + clients.Index = svc + return clients, nil + case "Job": + svc, err := aiplatform.NewJobClient(ctx, opts...) + if err != nil { + return nil, err + } + clients.Job = svc + return clients, nil + } + + d.ConnectionManager.Cache.Set(serviceCacheKey, clients) + return clients, nil +} + func APIKeysService(ctx context.Context, d *plugin.QueryData) (*apikeys.Service, error) { // have we already created and cached the service? serviceCacheKey := "APIKeysService" diff --git a/gcp/table_gcp_vertex_ai_endpoint.go b/gcp/table_gcp_vertex_ai_endpoint.go new file mode 100644 index 00000000..ceb25871 --- /dev/null +++ b/gcp/table_gcp_vertex_ai_endpoint.go @@ -0,0 +1,291 @@ +package gcp + +import ( + "context" + "strings" + + "cloud.google.com/go/aiplatform/apiv1/aiplatformpb" + "github.com/turbot/go-kit/types" + "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto" + "github.com/turbot/steampipe-plugin-sdk/v5/plugin/transform" + "google.golang.org/api/iterator" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/turbot/steampipe-plugin-sdk/v5/plugin" +) + +//// TABLE DEFINITION + +func tableGcpVertexAIEndpoint(_ context.Context) *plugin.Table { + return &plugin.Table{ + Name: "gcp_vertex_ai_endpoint", + Description: "GCP Vertex AI Endpoint", + Get: &plugin.GetConfig{ + KeyColumns: plugin.SingleColumn("name"), + Hydrate: getAIPlatformEndpoint, + ShouldIgnoreError: isIgnorableError([]string{"Unimplemented"}), + }, + List: &plugin.ListConfig{ + Hydrate: listAIPlatformEndpoints, + ShouldIgnoreError: isIgnorableError([]string{"Unimplemented"}), + }, + GetMatrixItemFunc: BuildVertexAILocationListByClientType("Endpoint"), + Columns: []*plugin.Column{ + { + Name: "name", + Type: proto.ColumnType_STRING, + Transform: transform.FromField("Name").Transform(lastPathElement), + Description: "The resource name of the Endpoint.", + }, + { + Name: "create_time", + Description: "Timestamp when this Endpoint was created.", + Type: proto.ColumnType_TIMESTAMP, + Transform: transform.FromField("CreateTime").Transform(convertTimestamppbAsTime), + }, + { + Name: "display_name", + Type: proto.ColumnType_STRING, + Description: "Human-readable display name of this key that you can modify.", + }, + { + Name: "description", + Description: "The description of the Endpoint.", + Type: proto.ColumnType_STRING, + }, + { + Name: "enable_private_service_connect", + Description: "If true, expose the Endpoint via private service connect. Only one of the fields, network or enable_private_service_connect, can be set.", + Type: proto.ColumnType_BOOL, + }, + { + Name: "etag", + Description: "Used to perform consistent read-modify-write updates. If not set, a blind 'overwrite' update happens.", + Type: proto.ColumnType_STRING, + }, + { + Name: "model_deployment_monitoring_job", + Description: "Resource name of the Model Monitoring job associated with this Endpoint if monitoring is enabled by JobService.CreateModelDeploymentMonitoringJob.", + Type: proto.ColumnType_STRING, + }, + { + Name: "network", + Description: "The full name of the Google Compute Engine network (https://cloud.google.com//compute/docs/networks-and-firewalls#networks) to which the Endpoint should be peered.", + Type: proto.ColumnType_STRING, + }, + { + Name: "update_time", + Description: "Timestamp when this Endpoint was last updated.", + Type: proto.ColumnType_TIMESTAMP, + Transform: transform.FromField("UpdateTime").Transform(convertTimestamppbAsTime), + }, + + // JSON columns + { + Name: "deployed_models", + Description: "The models deployed in this Endpoint. To add or remove DeployedModels use EndpointService.DeployModel and EndpointService.UndeployModel respectively.", + Type: proto.ColumnType_JSON, + }, + { + Name: "encryption_spec", + Description: "Customer-managed encryption key spec for an Endpoint. If set, this Endpoint and all sub-resources of this Endpoint will be secured by this key.", + Type: proto.ColumnType_JSON, + }, + { + Name: "predict_request_response_logging_config", + Description: "Configures the request-response logging for online prediction.", + Type: proto.ColumnType_JSON, + }, + { + Name: "traffic_split", + Description: "A map from a DeployedModel's ID to the percentage of this Endpoint's traffic that should be forwarded to that DeployedModel.", + Type: proto.ColumnType_JSON, + }, + { + Name: "labels", + Description: "The labels with user-defined metadata to organize your Endpoints.", + Type: proto.ColumnType_JSON, + }, + + // standard steampipe columns + { + Name: "title", + Description: ColumnDescriptionTitle, + Type: proto.ColumnType_STRING, + Transform: transform.FromField("DisplayName"), + }, + { + Name: "tags", + Description: ColumnDescriptionTags, + Type: proto.ColumnType_JSON, + Transform: transform.FromField("Labels"), + }, + { + Name: "akas", + Description: ColumnDescriptionAkas, + Type: proto.ColumnType_JSON, + Transform: transform.FromP(gcpAIPlatformTurbotData, "Akas"), + }, + + // Standard gcp columns + { + Name: "location", + Description: ColumnDescriptionLocation, + Type: proto.ColumnType_STRING, + Transform: transform.FromP(gcpAIPlatformTurbotData, "Location"), + }, + { + Name: "project", + Description: ColumnDescriptionProject, + Type: proto.ColumnType_STRING, + Hydrate: plugin.HydrateFunc(getProject).WithCache(), + Transform: transform.FromValue(), + }, + }, + } +} + +//// LIST FUNCTION + +func listAIPlatformEndpoints(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) { + logger := plugin.Logger(ctx) + + region := d.EqualsQualString("location") + + var location string + matrixLocation := d.EqualsQualString(matrixKeyLocation) + // Since, when the service API is disabled, matrixLocation value will be nil + if matrixLocation != "" { + location = matrixLocation + } + + // Minimize API call as per given location + if region != "" && region != location { + return nil, nil + } + + // Get project details + getProjectCached := plugin.HydrateFunc(getProject).WithCache() + projectId, err := getProjectCached(ctx, d, h) + if err != nil { + logger.Error("gcp_vertex_ai_endpoint.listAIPlatformEndpoints", "cache_error", err) + return nil, err + } + project := projectId.(string) + + // Page size should be in range of [0, 100]. + pageSize := types.Int64(100) + limit := d.QueryContext.Limit + if d.QueryContext.Limit != nil { + if *limit < *pageSize { + pageSize = limit + } + } + + // Create Service Connection + service, err := AIService(ctx, d, "Endpoint") + if err != nil { + logger.Error("gcp_vertex_ai_endpoint.listAIPlatformEndpoints", "service_error", err) + return nil, err + } + + input := &aiplatformpb.ListEndpointsRequest{ + Parent: "projects/" + project + "/locations/" + location, + PageSize: int32(*pageSize), + } + + it := service.Endpoint.ListEndpoints(ctx, input) + + for { + resp, err := it.Next() + if err != nil { + if strings.Contains(err.Error(), "404") { + return nil, nil + } + if err == iterator.Done { + break + } + logger.Error("gcp_vertex_ai_endpoint.listAIPlatformEndpoints", "api_error", err) + return nil, err + } + + d.StreamListItem(ctx, resp) + + // Check if context has been cancelled or if the limit has been hit (if specified) + // if there is a limit, it will return the number of rows required to reach this limit + if d.RowsRemaining(ctx) == 0 { + return nil, nil + } + } + + return nil, nil +} + +//// HYDRATE FUNCTIONS + +func getAIPlatformEndpoint(ctx context.Context, d *plugin.QueryData, h *plugin.HydrateData) (interface{}, error) { + logger := plugin.Logger(ctx) + + matrixLocation := d.EqualsQualString(matrixKeyLocation) + + // Get project details + getProjectCached := plugin.HydrateFunc(getProject).WithCache() + projectId, err := getProjectCached(ctx, d, h) + if err != nil { + logger.Error("gcp_vertex_ai_endpoint.getAIPlatformEndpoint", "cache_error", err) + return nil, err + } + project := projectId.(string) + + name := d.EqualsQualString("name") + + // Validate - name should not be blank + if name == "" { + return nil, nil + } + + // Create Service Connection + service, err := AIService(ctx, d, "Endpoint") + if err != nil { + if strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "NotFound") { + return nil, nil + } + logger.Error("gcp_vertex_ai_endpoint.getAIPlatformEndpoint", "service_error", err) + return nil, err + } + input := &aiplatformpb.GetEndpointRequest{ + Name: "projects/" + project + "/locations/" + matrixLocation + "/endpoints/" + name, + } + op, err := service.Endpoint.GetEndpoint(ctx, input) + if err != nil { + if strings.Contains(err.Error(), "404") || strings.Contains(err.Error(), "NotFound") { + return nil, nil + } + logger.Error("gcp_vertex_ai_endpoint.getAIPlatformEndpoint", "api_error", err) + return nil, err + } + return op, nil +} + +/// TRANSFORM FUNCTIONS + +func gcpAIPlatformTurbotData(ctx context.Context, d *transform.TransformData) (interface{}, error) { + param := d.Param.(string) + AIData := d.HydrateItem.(*aiplatformpb.Endpoint) + akas := []string{"gcp://aiplatform.googleapis.com/" + AIData.Name} + + turbotData := map[string]interface{}{ + "Location": strings.Split(AIData.Name, "/")[3], + "Akas": akas, + } + return turbotData[param], nil +} + +func convertTimestamppbAsTime(ctx context.Context, d *transform.TransformData) (interface{}, error) { + v := d.Value + if v != nil { + timeValue := v.(*timestamppb.Timestamp) + return timeValue.AsTime(), nil + } + return nil, nil +} diff --git a/gcp/vertex_ai_location_list.go b/gcp/vertex_ai_location_list.go new file mode 100644 index 00000000..7d66c688 --- /dev/null +++ b/gcp/vertex_ai_location_list.go @@ -0,0 +1,86 @@ +package gcp + +import ( + "context" + "strings" + + aiplatform "cloud.google.com/go/aiplatform/apiv1" + "github.com/turbot/steampipe-plugin-sdk/v5/plugin" + "google.golang.org/api/iterator" + "google.golang.org/genproto/googleapis/cloud/location" +) + +func BuildVertexAILocationListByClientType(clientType string) func(ctx context.Context, d *plugin.QueryData) []map[string]interface{} { + return BuildVertexAILocationList(clientType) +} + +func BuildVertexAILocationList(clientType string) func(ctx context.Context, d *plugin.QueryData) []map[string]interface{} { + return func(ctx context.Context, d *plugin.QueryData) []map[string]interface{} { + // have we already created and cached the locations? + locationCacheKey := "BuildVertexAILocationList" + clientType + if cachedData, ok := d.ConnectionManager.Cache.Get(locationCacheKey); ok { + plugin.Logger(ctx).Trace("listlocationDetails:", cachedData.([]map[string]interface{})) + return cachedData.([]map[string]interface{}) + } + + // Create Service Connection + service, err := AIService(ctx, d, clientType) + if err != nil { + return nil + } + + // Get project details + projectData, err := activeProject(ctx, d) + if err != nil { + return nil + } + project := projectData.Project + + var resourceLocations []*location.Location + input := &location.ListLocationsRequest{ + Name: "projects/" + project, + } + + switch clientType { + case "Endpoint": + resp := service.Endpoint.ListLocations(ctx, input) + resourceLocations = append(resourceLocations, iterateLocationResponse(resp)...) + case "Dataset": + resp := service.Dataset.ListLocations(ctx, input) + resourceLocations = append(resourceLocations, iterateLocationResponse(resp)...) + case "Index": + resp := service.Index.ListLocations(ctx, input) + resourceLocations = append(resourceLocations, iterateLocationResponse(resp)...) + case "Job": + resp := service.Job.ListLocations(ctx, input) + resourceLocations = append(resourceLocations, iterateLocationResponse(resp)...) + } + + // validate location list + matrix := make([]map[string]interface{}, len(resourceLocations)) + for i, location := range resourceLocations { + matrix[i] = map[string]interface{}{matrixKeyLocation: location.LocationId} + } + d.ConnectionManager.Cache.Set(locationCacheKey, matrix) + return matrix + + } +} + +func iterateLocationResponse(response *aiplatform.LocationIterator) []*location.Location { + var loc []*location.Location + for { + + res, err := response.Next() + if err != nil { + if strings.Contains(err.Error(), "404") { + return nil + } + if err == iterator.Done { + break + } + } + loc = append(loc, res) + } + return loc +} diff --git a/go.mod b/go.mod index 744e2051..9b363b1b 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/turbot/steampipe-plugin-gcp go 1.21 require ( + cloud.google.com/go/aiplatform v1.48.0 github.com/mitchellh/go-homedir v1.1.0 github.com/turbot/go-kit v0.8.0 github.com/turbot/steampipe-plugin-sdk/v5 v5.6.2 diff --git a/go.sum b/go.sum index 20abfdac..acca266b 100644 --- a/go.sum +++ b/go.sum @@ -34,6 +34,8 @@ cloud.google.com/go v0.110.6 h1:8uYAkj3YHTP/1iwReuHPxLSbdcyc+dSBbzFMrVwDR6Q= cloud.google.com/go v0.110.6/go.mod h1:+EYjdK8e5RME/VY/qLCAtuyALQ9q67dvuum8i+H5xsI= cloud.google.com/go/aiplatform v1.22.0/go.mod h1:ig5Nct50bZlzV6NvKaTwmplLLddFx0YReh9WfTO5jKw= cloud.google.com/go/aiplatform v1.24.0/go.mod h1:67UUvRBKG6GTayHKV8DBv2RtR1t93YRu5B1P3x99mYY= +cloud.google.com/go/aiplatform v1.48.0 h1:M5davZWCTzE043rJCn+ZLW6hSxfG1KAx4vJTtas2/ec= +cloud.google.com/go/aiplatform v1.48.0/go.mod h1:Iu2Q7sC7QGhXUeOhAj/oCK9a+ULz1O4AotZiqjQ8MYA= cloud.google.com/go/analytics v0.11.0/go.mod h1:DjEWCu41bVbYcKyvlws9Er60YE4a//bK6mnhWvQeFNI= cloud.google.com/go/analytics v0.12.0/go.mod h1:gkfj9h6XRf9+TS4bmuhPEShsh3hH8PAZzm/41OOhQd4= cloud.google.com/go/area120 v0.5.0/go.mod h1:DE/n4mp+iqVyvxHN41Vf1CR602GiHQjFPusMFW6bGR4=