Skip to content

Commit

Permalink
Add telemetry instrumentation for delegated identity API and add late…
Browse files Browse the repository at this point in the history
…ncy telemetry util (#4399)

* Add telemetry instrumentation for delegated identity API and add latency telemetry util

Signed-off-by: chiragk25 <chirag.d.kapadia@gmail.com>
  • Loading branch information
chiragk25 authored Aug 9, 2023
1 parent ceb07d8 commit 49ead4e
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 56 deletions.
57 changes: 30 additions & 27 deletions doc/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,36 @@ The following metrics are emitted:

## SPIRE Agent

| Type | Keys | Labels | Description |
|--------------|--------------------------------------------|------------------------------|---------------------------------------------------------------------------------------|
| Call Counter | `rpc`, `<service>`, `<method>` | | Call counters over the [SPIRE Agent RPCs](<https://github.com/spiffe/spire-api-sdk>). |
| Call Counter | `agent_key_manager`, `generate_key_pair` | | The KeyManager is generating a key pair. |
| Call Counter | `agent_key_manager`, `fetch_private_key` | | The KeyManager is fetching a private key. |
| Call Counter | `agent_key_manager`, `store_private_key` | | The KeyManager is storing a private key. |
| Call Counter | `agent_svid`, `rotate` | | The Agent's SVID is being rotated. |
| Sample | `cache_manager`, `expiring_svids` | | The number of expiring SVIDs that the Cache Manager has. |
| Sample | `cache_manager`, `outdated_svids` | | The number of outdated SVIDs that the Cache Manager has. |
| Counter | `lru_cache_entry_add` | | The number of entries added to the LRU cache. |
| Counter | `lru_cache_entry_remove` | | The number of entries removed from the LRU cache. |
| Counter | `lru_cache_entry_update` | | The number of entries updated in the LRU cache. |
| Call Counter | `manager`, `sync`, `fetch_entries_updates` | | The Sync Manager is fetching entries updates. |
| Call Counter | `manager`, `sync`, `fetch_svids_updates` | | The Sync Manager is fetching SVIDs updates. |
| Call Counter | `node`, `attestor`, `new_svid` | | The Node Attestor is calling to get an SVID. |
| Gauge | `lru_cache_record_map_size` | | The total number of entries in the LRU cache records map. |
| Counter | `sds_api`, `connections` | | The SDS API has successfully established a connection. |
| Gauge | `sds_api`, `connections` | | The number of active connection that the SDS API has. |
| Gauge | `lru_cache_svid_map_size` | | The total number of SVIDs in the LRU cache SVID map. |
| Counter | `workload_api`, `bundles_update`, `jwt` | | The Workload API has successfully updated a JWT bundle. |
| Counter | `workload_api`, `connection` | | The Workload API has successfully established a new connection. |
| Gauge | `workload_api`, `connections` | | The number of active connections that the Workload API has. |
| Sample | `workload_api`, `discovered_selectors` | | The number of selectors discovered during a workload attestation process. |
| Call Counter | `workload_api`, `workload_attestation` | | The Workload API is performing a workload attestation. |
| Call Counter | `workload_api`, `workload_attestor` | `attestor` | The Workload API is invoking a given attestor. |
| Gauge | `started` | `version`, `trust_domain_id` | Information about the Agent. |
| Gauge | `uptime_in_ms` | | The uptime of the Agent in milliseconds. |
| Type | Keys | Labels | Description |
|--------------|--------------------------------------------------------------------------|------------------------------|---------------------------------------------------------------------------------------|
| Call Counter | `rpc`, `<service>`, `<method>` | | Call counters over the [SPIRE Agent RPCs](<https://github.com/spiffe/spire-api-sdk>). |
| Call Counter | `agent_key_manager`, `generate_key_pair` | | The KeyManager is generating a key pair. |
| Call Counter | `agent_key_manager`, `fetch_private_key` | | The KeyManager is fetching a private key. |
| Call Counter | `agent_key_manager`, `store_private_key` | | The KeyManager is storing a private key. |
| Call Counter | `agent_svid`, `rotate` | | The Agent's SVID is being rotated. |
| Sample | `cache_manager`, `expiring_svids` | | The number of expiring SVIDs that the Cache Manager has. |
| Sample | `cache_manager`, `outdated_svids` | | The number of outdated SVIDs that the Cache Manager has. |
| Counter | `lru_cache_entry_add` | | The number of entries added to the LRU cache. |
| Counter | `lru_cache_entry_remove` | | The number of entries removed from the LRU cache. |
| Counter | `lru_cache_entry_update` | | The number of entries updated in the LRU cache. |
| Call Counter | `manager`, `sync`, `fetch_entries_updates` | | The Sync Manager is fetching entries updates. |
| Call Counter | `manager`, `sync`, `fetch_svids_updates` | | The Sync Manager is fetching SVIDs updates. |
| Call Counter | `node`, `attestor`, `new_svid` | | The Node Attestor is calling to get an SVID. |
| Gauge | `lru_cache_record_map_size` | | The total number of entries in the LRU cache records map. |
| Counter | `sds_api`, `connections` | | The SDS API has successfully established a connection. |
| Gauge | `sds_api`, `connections` | | The number of active connection that the SDS API has. |
| Gauge | `lru_cache_svid_map_size` | | The total number of SVIDs in the LRU cache SVID map. |
| Counter | `workload_api`, `bundles_update`, `jwt` | | The Workload API has successfully updated a JWT bundle. |
| Counter | `workload_api`, `connection` | | The Workload API has successfully established a new connection. |
| Gauge | `workload_api`, `connections` | | The number of active connections that the Workload API has. |
| Sample | `workload_api`, `discovered_selectors` | | The number of selectors discovered during a workload attestation process. |
| Call Counter | `workload_api`, `workload_attestation` | | The Workload API is performing a workload attestation. |
| Call Counter | `workload_api`, `workload_attestor` | `attestor` | The Workload API is invoking a given attestor. |
| Gauge | `started` | `version`, `trust_domain_id` | Information about the Agent. |
| Gauge | `uptime_in_ms` | | The uptime of the Agent in milliseconds. |
| Counter | `delegated_identity_api`, `connection` | | The Delegated Identity API has successfully established a connection. |
| Gauge | `delegated_identity_api`, `connections` | | The number of active connection that the Delegated Identity API has. |
| Latency | `delegated_identity_api`, `subscribe_x509_svid` `first_x509_svid_update` | | The latency fetching first X.509-SVID in Delegated Identity API. |

Note: These are the keys and labels that SPIRE emits, but the format of the
metric once ingested could vary depending on the metric collector. For example,
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (a *Agent) Run(ctx context.Context) error {
}

if a.c.AdminBindAddress != nil {
adminEndpoints := a.newAdminEndpoints(manager, workloadAttestor, a.c.AuthorizedDelegates)
adminEndpoints := a.newAdminEndpoints(metrics, manager, workloadAttestor, a.c.AuthorizedDelegates)
tasks = append(tasks, adminEndpoints.ListenAndServe)
}

Expand Down Expand Up @@ -276,11 +276,12 @@ func (a *Agent) newEndpoints(metrics telemetry.Metrics, mgr manager.Manager, att
})
}

func (a *Agent) newAdminEndpoints(mgr manager.Manager, attestor workload_attestor.Attestor, authorizedDelegates []string) admin_api.Server {
func (a *Agent) newAdminEndpoints(metrics telemetry.Metrics, mgr manager.Manager, attestor workload_attestor.Attestor, authorizedDelegates []string) admin_api.Server {
config := &admin_api.Config{
BindAddr: a.c.AdminBindAddress,
Manager: mgr,
Log: a.c.Log.WithField(telemetry.SubsystemName, telemetry.DebugAPI),
Log: a.c.Log,
Metrics: metrics,
TrustDomain: a.c.TrustDomain,
Uptime: uptime.Uptime,
Attestor: attestor,
Expand Down
3 changes: 3 additions & 0 deletions pkg/agent/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
attestor "github.com/spiffe/spire/pkg/agent/attestor/workload"
"github.com/spiffe/spire/pkg/agent/manager"
"github.com/spiffe/spire/pkg/common/peertracker"
"github.com/spiffe/spire/pkg/common/telemetry"
)

type Config struct {
Expand All @@ -18,6 +19,8 @@ type Config struct {

Log logrus.FieldLogger

Metrics telemetry.Metrics

// Agent trust domain
TrustDomain spiffeid.TrustDomain

Expand Down
32 changes: 32 additions & 0 deletions pkg/agent/api/delegatedidentity/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spiffe/spire/pkg/common/bundleutil"
"github.com/spiffe/spire/pkg/common/idutil"
"github.com/spiffe/spire/pkg/common/telemetry"
"github.com/spiffe/spire/pkg/common/telemetry/agent/adminapi"
"github.com/spiffe/spire/pkg/common/x509util"
"github.com/spiffe/spire/pkg/server/api"
"github.com/spiffe/spire/proto/spire/common"
Expand All @@ -39,6 +40,7 @@ type attestor interface {

type Config struct {
Log logrus.FieldLogger
Metrics telemetry.Metrics
Manager manager.Manager
Attestor workloadattestor.Attestor
AuthorizedDelegates []string
Expand All @@ -54,6 +56,7 @@ func New(config Config) *Service {
return &Service{
manager: config.Manager,
attestor: endpoints.PeerTrackerAttestor{Attestor: config.Attestor},
metrics: config.Metrics,
authorizedDelegates: AuthorizedDelegates,
}
}
Expand All @@ -64,6 +67,7 @@ type Service struct {

manager manager.Manager
attestor attestor
metrics telemetry.Metrics

// SPIFFE IDs of delegates that are authorized to use this API
authorizedDelegates map[string]bool
Expand All @@ -82,6 +86,7 @@ func (s *Service) isCallerAuthorized(ctx context.Context, log logrus.FieldLogger
}
}

log = log.WithField("delegate_selectors", callerSelectors)
entries := s.manager.MatchingRegistrationEntries(callerSelectors)
numRegisteredEntries := len(entries)

Expand All @@ -92,6 +97,7 @@ func (s *Service) isCallerAuthorized(ctx context.Context, log logrus.FieldLogger

for _, entry := range entries {
if _, ok := s.authorizedDelegates[entry.SpiffeId]; ok {
log.WithField("delegate_id", entry.SpiffeId).Debug("Caller authorized as delegate")
return callerSelectors, nil
}
}
Expand All @@ -106,8 +112,10 @@ func (s *Service) isCallerAuthorized(ctx context.Context, log logrus.FieldLogger
}

func (s *Service) SubscribeToX509SVIDs(req *delegatedidentityv1.SubscribeToX509SVIDsRequest, stream delegatedidentityv1.DelegatedIdentity_SubscribeToX509SVIDsServer) error {
latency := adminapi.StartFirstX509SVIDUpdateLatency(s.metrics)
ctx := stream.Context()
log := rpccontext.Logger(ctx)
var receivedFirstUpdate bool

cachedSelectors, err := s.isCallerAuthorized(ctx, log, nil)
if err != nil {
Expand All @@ -120,6 +128,11 @@ func (s *Service) SubscribeToX509SVIDs(req *delegatedidentityv1.SubscribeToX509S
return status.Error(codes.InvalidArgument, "could not parse provided selectors")
}

log.WithFields(logrus.Fields{
"delegate_selectors": cachedSelectors,
"request_selectors": selectors,
}).Info("Subscribing to cache changes")

subscriber, err := s.manager.SubscribeToCacheChanges(ctx, selectors)
if err != nil {
log.WithError(err).Error("Subscribe to cache changes failed")
Expand All @@ -130,6 +143,12 @@ func (s *Service) SubscribeToX509SVIDs(req *delegatedidentityv1.SubscribeToX509S
for {
select {
case update := <-subscriber.Updates():
if !receivedFirstUpdate {
// emit latency metric for first update.
latency.Measure()
receivedFirstUpdate = true
}

if _, err := s.isCallerAuthorized(ctx, log, cachedSelectors); err != nil {
return err
}
Expand All @@ -155,6 +174,19 @@ func sendX509SVIDResponse(update *cache.WorkloadUpdate, stream delegatedidentity
return err
}

log = log.WithField(telemetry.Count, len(resp.X509Svids))

// log details on each SVID
// a response has already been sent so nothing is
// blocked on this logic
for i, svid := range resp.X509Svids {
ttl := time.Until(update.Identities[i].SVID[0].NotAfter)
log.WithFields(logrus.Fields{
telemetry.SPIFFEID: svid.X509Svid.Id.String(),
telemetry.TTL: ttl.Seconds(),
}).Debug("Fetched X.509 SVID for delegated identity")
}

return nil
}

Expand Down
Loading

0 comments on commit 49ead4e

Please sign in to comment.