Skip to content

Commit

Permalink
Add telemetry instrumentation for delegated identity API and add late…
Browse files Browse the repository at this point in the history
…ncy telemetry util
  • Loading branch information
chiragk25 committed Aug 1, 2023
1 parent a5050ef commit 7e11efa
Show file tree
Hide file tree
Showing 10 changed files with 247 additions and 53 deletions.
59 changes: 31 additions & 28 deletions doc/telemetry.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,33 +58,36 @@ The following metrics are emitted:

## SPIRE Agent

| Type | Keys | Labels | Description |
|--------------|--------------------------------------------|------------------------------|---------------------------------------------------------------------------------------|
| Call Counter | `rpc`, `<service>`, `<method>` | | Call counters over the [SPIRE Agent RPCs](<https://github.com/spiffe/spire-api-sdk>). |
| Call Counter | `agent_key_manager`, `generate_key_pair` | | The KeyManager is generating a key pair. |
| Call Counter | `agent_key_manager`, `fetch_private_key` | | The KeyManager is fetching a private key. |
| Call Counter | `agent_key_manager`, `store_private_key` | | The KeyManager is storing a private key. |
| Call Counter | `agent_svid`, `rotate` | | The Agent's SVID is being rotated. |
| Sample | `cache_manager`, `expiring_svids` | | The number of expiring SVIDs that the Cache Manager has. |
| Sample | `cache_manager`, `outdated_svids` | | The number of outdated SVIDs that the Cache Manager has. |
| Counter | `lru_cache_entry_add` | | The number of entries added to the LRU cache. |
| Counter | `lru_cache_entry_remove` | | The number of entries removed from the LRU cache. |
| Counter | `lru_cache_entry_update` | | The number of entries updated in the LRU cache. |
| Call Counter | `manager`, `sync`, `fetch_entries_updates` | | The Sync Manager is fetching entries updates. |
| Call Counter | `manager`, `sync`, `fetch_svids_updates` | | The Sync Manager is fetching SVIDs updates. |
| Call Counter | `node`, `attestor`, `new_svid` | | The Node Attestor is calling to get an SVID. |
| Gauge | `lru_cache_record_map_size` | | The total number of entries in the LRU cache records map. |
| Counter | `sds_api`, `connections` | | The SDS API has successfully established a connection. |
| Gauge | `sds_api`, `connections` | | The number of active connection that the SDS API has. |
| Gauge | `lru_cache_svid_map_size` | | The total number of SVIDs in the LRU cache SVID map. |
| Counter | `workload_api`, `bundles_update`, `jwt` | | The Workload API has successfully updated a JWT bundle. |
| Counter | `workload_api`, `connection` | | The Workload API has successfully established a new connection. |
| Gauge | `workload_api`, `connections` | | The number of active connections that the Workload API has. |
| Sample | `workload_api`, `discovered_selectors` | | The number of selectors discovered during a workload attestation process. |
| Call Counter | `workload_api`, `workload_attestation` | | The Workload API is performing a workload attestation. |
| Call Counter | `workload_api`, `workload_attestor` | `attestor` | The Workload API is invoking a given attestor. |
| Gauge | `started` | `version`, `trust_domain_id` | Information about the Agent. |
| Gauge | `uptime_in_ms` | | The uptime of the Agent in milliseconds. |
| Type | Keys | Labels | Description |
|--------------|--------------------------------------------------------------------------|------------------------------|---------------------------------------------------------------------------------------|
| Call Counter | `rpc`, `<service>`, `<method>` | | Call counters over the [SPIRE Agent RPCs](<https://github.com/spiffe/spire-api-sdk>). |
| Call Counter | `agent_key_manager`, `generate_key_pair` | | The KeyManager is generating a key pair. |
| Call Counter | `agent_key_manager`, `fetch_private_key` | | The KeyManager is fetching a private key. |
| Call Counter | `agent_key_manager`, `store_private_key` | | The KeyManager is storing a private key. |
| Call Counter | `agent_svid`, `rotate` | | The Agent's SVID is being rotated. |
| Sample | `cache_manager`, `expiring_svids` | | The number of expiring SVIDs that the Cache Manager has. |
| Sample | `cache_manager`, `outdated_svids` | | The number of outdated SVIDs that the Cache Manager has. |
| Counter | `lru_cache_entry_add` | | The number of entries added to the LRU cache. |
| Counter | `lru_cache_entry_remove` | | The number of entries removed from the LRU cache. |
| Counter | `lru_cache_entry_update` | | The number of entries updated in the LRU cache. |
| Call Counter | `manager`, `sync`, `fetch_entries_updates` | | The Sync Manager is fetching entries updates. |
| Call Counter | `manager`, `sync`, `fetch_svids_updates` | | The Sync Manager is fetching SVIDs updates. |
| Call Counter | `node`, `attestor`, `new_svid` | | The Node Attestor is calling to get an SVID. |
| Gauge | `lru_cache_record_map_size` | | The total number of entries in the LRU cache records map. |
| Counter | `sds_api`, `connections` | | The SDS API has successfully established a connection. |
| Gauge | `sds_api`, `connections` | | The number of active connections that the SDS API has. |
| Gauge | `lru_cache_svid_map_size` | | The total number of SVIDs in the LRU cache SVID map. |
| Counter | `workload_api`, `bundles_update`, `jwt` | | The Workload API has successfully updated a JWT bundle. |
| Counter | `workload_api`, `connection` | | The Workload API has successfully established a new connection. |
| Gauge | `workload_api`, `connections` | | The number of active connections that the Workload API has. |
| Sample | `workload_api`, `discovered_selectors` | | The number of selectors discovered during a workload attestation process. |
| Call Counter | `workload_api`, `workload_attestation` | | The Workload API is performing a workload attestation. |
| Call Counter | `workload_api`, `workload_attestor` | `attestor` | The Workload API is invoking a given attestor. |
| Gauge | `started` | `version`, `trust_domain_id` | Information about the Agent. |
| Gauge | `uptime_in_ms` | | The uptime of the Agent in milliseconds. |
| Counter | `delegated_identity_api`, `connections` | | The Delegated Identity API has successfully established a connection. |
| Gauge | `delegated_identity_api`, `connections` | | The number of active connections that the Delegated Identity API has. |
| Latency | `delegated_identity_api`, `subscribe_x509_svid`, `first_x509_svid_update` | | The latency of fetching the first X.509-SVID in the Delegated Identity API. |

Note: These are the keys and labels that SPIRE emits, but the format of the
metric once ingested could vary depending on the metric collector. For example,
Expand Down Expand Up @@ -114,4 +117,4 @@ would produce the following metrics:
```text
spire_server.rpc.agent.v1.agent.attest_agent:1|c|#status:OK
spire_server.rpc.agent.v1.agent.attest_agent.elapsed_time:1.045773|ms|#status:OK
```
```
3 changes: 3 additions & 0 deletions pkg/agent/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
attestor "github.com/spiffe/spire/pkg/agent/attestor/workload"
"github.com/spiffe/spire/pkg/agent/manager"
"github.com/spiffe/spire/pkg/common/peertracker"
"github.com/spiffe/spire/pkg/common/telemetry"
)

type Config struct {
Expand All @@ -18,6 +19,8 @@ type Config struct {

Log logrus.FieldLogger

Metrics telemetry.Metrics

// Agent trust domain
TrustDomain spiffeid.TrustDomain

Expand Down
34 changes: 34 additions & 0 deletions pkg/agent/api/delegatedidentity/v1/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/spiffe/spire/pkg/common/bundleutil"
"github.com/spiffe/spire/pkg/common/idutil"
"github.com/spiffe/spire/pkg/common/telemetry"
"github.com/spiffe/spire/pkg/common/telemetry/agent/adminapi"
"github.com/spiffe/spire/pkg/common/x509util"
"github.com/spiffe/spire/pkg/server/api"
"github.com/spiffe/spire/proto/spire/common"
Expand All @@ -39,6 +40,7 @@ type attestor interface {

type Config struct {
Log logrus.FieldLogger
Metrics telemetry.Metrics
Manager manager.Manager
Attestor workloadattestor.Attestor
AuthorizedDelegates []string
Expand All @@ -54,6 +56,7 @@ func New(config Config) *Service {
return &Service{
manager: config.Manager,
attestor: endpoints.PeerTrackerAttestor{Attestor: config.Attestor},
metrics: config.Metrics,
authorizedDelegates: AuthorizedDelegates,
}
}
Expand All @@ -64,6 +67,7 @@ type Service struct {

manager manager.Manager
attestor attestor
metrics telemetry.Metrics

// SPIFFE IDs of delegates that are authorized to use this API
authorizedDelegates map[string]bool
Expand Down Expand Up @@ -92,6 +96,10 @@ func (s *Service) isCallerAuthorized(ctx context.Context, log logrus.FieldLogger

for _, entry := range entries {
if _, ok := s.authorizedDelegates[entry.SpiffeId]; ok {
log.WithFields(logrus.Fields{
"delegate_id": entry.SpiffeId,
"delegate_selectors": callerSelectors,
}).Debug("Caller authorized as delegate")
return callerSelectors, nil
}
}
Expand All @@ -108,6 +116,8 @@ func (s *Service) isCallerAuthorized(ctx context.Context, log logrus.FieldLogger
func (s *Service) SubscribeToX509SVIDs(req *delegatedidentityv1.SubscribeToX509SVIDsRequest, stream delegatedidentityv1.DelegatedIdentity_SubscribeToX509SVIDsServer) error {
ctx := stream.Context()
log := rpccontext.Logger(ctx)
latency := adminapi.StartFirstUpdateLatency(s.metrics)
var receivedFirstUpdate bool

cachedSelectors, err := s.isCallerAuthorized(ctx, log, nil)
if err != nil {
Expand All @@ -120,6 +130,11 @@ func (s *Service) SubscribeToX509SVIDs(req *delegatedidentityv1.SubscribeToX509S
return status.Error(codes.InvalidArgument, "could not parse provided selectors")
}

log.WithFields(logrus.Fields{
"delegate_selectors": cachedSelectors,
"request_selectors": selectors,
}).Debug("Subscribing to cache changes")

subscriber, err := s.manager.SubscribeToCacheChanges(ctx, selectors)
if err != nil {
log.WithError(err).Error("Subscribe to cache changes failed")
Expand All @@ -130,6 +145,12 @@ func (s *Service) SubscribeToX509SVIDs(req *delegatedidentityv1.SubscribeToX509S
for {
select {
case update := <-subscriber.Updates():
if !receivedFirstUpdate {
// emit latency metric for first update.
latency.Measure()
receivedFirstUpdate = true
}

if _, err := s.isCallerAuthorized(ctx, log, cachedSelectors); err != nil {
return err
}
Expand All @@ -155,6 +176,19 @@ func sendX509SVIDResponse(update *cache.WorkloadUpdate, stream delegatedidentity
return err
}

log = log.WithField(telemetry.Count, len(resp.X509Svids))

// log details on each SVID
// a response has already been sent so nothing is
// blocked on this logic
for i, svid := range resp.X509Svids {
ttl := time.Until(update.Identities[i].SVID[0].NotAfter)
log.WithFields(logrus.Fields{
telemetry.SPIFFEID: svid.X509Svid.Id.String(),
telemetry.TTL: ttl.Seconds(),
}).Debug("Fetched X.509 SVID")
}

return nil
}

Expand Down
50 changes: 37 additions & 13 deletions pkg/agent/api/delegatedidentity/v1/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto"
"crypto/x509"
"errors"
"github.com/spiffe/spire/test/fakes/fakemetrics"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -68,15 +69,16 @@ func TestSubscribeToX509SVIDs(t *testing.T) {
identities[1].Entry.Hint = "external"

for _, tt := range []struct {
testName string
identities []cache.Identity
updates []*cache.WorkloadUpdate
authSpiffeID []string
expectCode codes.Code
expectMsg string
attestErr error
managerErr error
expectResp *delegatedidentityv1.SubscribeToX509SVIDsResponse
testName string
identities []cache.Identity
updates []*cache.WorkloadUpdate
authSpiffeID []string
expectCode codes.Code
expectMsg string
attestErr error
managerErr error
expectMetrics []fakemetrics.MetricItem
expectResp *delegatedidentityv1.SubscribeToX509SVIDsResponse
}{
{
testName: "attest error",
Expand Down Expand Up @@ -128,6 +130,7 @@ func TestSubscribeToX509SVIDs(t *testing.T) {
},
},
},
expectMetrics: generateSubscribeToX509SVIDMetrics(),
},
{
testName: "workload update with two identities",
Expand Down Expand Up @@ -164,15 +167,17 @@ func TestSubscribeToX509SVIDs(t *testing.T) {
},
},
},
expectMetrics: generateSubscribeToX509SVIDMetrics(),
},
{
testName: "no workload update",
authSpiffeID: []string{"spiffe://example.org/one"},
identities: []cache.Identity{
identities[0],
},
updates: []*cache.WorkloadUpdate{{}},
expectResp: &delegatedidentityv1.SubscribeToX509SVIDsResponse{},
updates: []*cache.WorkloadUpdate{{}},
expectResp: &delegatedidentityv1.SubscribeToX509SVIDsResponse{},
expectMetrics: generateSubscribeToX509SVIDMetrics(),
},
{
testName: "workload update without identity.SVID",
Expand All @@ -185,8 +190,9 @@ func TestSubscribeToX509SVIDs(t *testing.T) {
identityFromX509SVIDWithoutSVID(x509SVID1),
}},
},
expectCode: codes.Internal,
expectMsg: "could not serialize response",
expectCode: codes.Internal,
expectMsg: "could not serialize response",
expectMetrics: generateSubscribeToX509SVIDMetrics(),
},
{
testName: "workload update with identity and federated bundles",
Expand Down Expand Up @@ -217,6 +223,7 @@ func TestSubscribeToX509SVIDs(t *testing.T) {
},
FederatesWith: []string{federatedBundle1.TrustDomain().IDString()},
},
expectMetrics: generateSubscribeToX509SVIDMetrics(),
},
{
testName: "workload update with identity and two federated bundles",
Expand Down Expand Up @@ -249,17 +256,20 @@ func TestSubscribeToX509SVIDs(t *testing.T) {
FederatesWith: []string{federatedBundle1.TrustDomain().IDString(),
federatedBundle2.TrustDomain().IDString()},
},
expectMetrics: generateSubscribeToX509SVIDMetrics(),
},
} {
tt := tt
t.Run(tt.testName, func(t *testing.T) {
metrics := fakemetrics.New()
params := testParams{
CA: ca,
Identities: tt.identities,
Updates: tt.updates,
AuthSpiffeID: tt.authSpiffeID,
AttestErr: tt.attestErr,
ManagerErr: tt.managerErr,
Metrics: metrics,
}
runTest(t, params,
func(ctx context.Context, client delegatedidentityv1.DelegatedIdentityClient) {
Expand All @@ -275,6 +285,7 @@ func TestSubscribeToX509SVIDs(t *testing.T) {

spiretest.RequireGRPCStatus(t, err, tt.expectCode, tt.expectMsg)
spiretest.RequireProtoEqual(t, tt.expectResp, resp)
require.Equal(t, tt.expectMetrics, metrics.AllMetrics())
})
})
}
Expand Down Expand Up @@ -672,6 +683,7 @@ type testParams struct {
AuthSpiffeID []string
AttestErr error
ManagerErr error
Metrics *fakemetrics.FakeMetrics
}

func runTest(t *testing.T, params testParams, fn func(ctx context.Context, client delegatedidentityv1.DelegatedIdentityClient)) {
Expand All @@ -691,6 +703,7 @@ func runTest(t *testing.T, params testParams, fn func(ctx context.Context, clien
service := New(Config{
Log: log,
Manager: manager,
Metrics: params.Metrics,
AuthorizedDelegates: params.AuthSpiffeID,
})

Expand Down Expand Up @@ -854,3 +867,14 @@ func newTestCache() *cache.Cache {
log, _ := test.NewNullLogger()
return cache.New(log, trustDomain1, bundle1, telemetry.Blackhole{})
}

// generateSubscribeToX509SVIDMetrics builds the metric list that the
// SubscribeToX509SVIDs test cases compare against the fake metrics sink:
// a single measure-since-with-labels item keyed by the Delegated Identity
// API first-update latency key, with a zero value and an empty label set.
func generateSubscribeToX509SVIDMetrics() []fakemetrics.MetricItem {
	latencyKey := []string{
		telemetry.DelegatedIdentityAPI,
		telemetry.SubscribeX509SVIDs,
		telemetry.FirstX509SVIDUpdate,
		telemetry.ElapsedTime,
	}

	firstUpdateLatency := fakemetrics.MetricItem{
		Type: fakemetrics.MeasureSinceWithLabelsType,
		Key:  latencyKey,
		Val:  0,
		// Deliberately an empty, non-nil slice rather than nil, so the
		// value is identical to what the original literal produced.
		Labels: []telemetry.Label{},
	}

	return []fakemetrics.MetricItem{firstUpdateLatency}
}
Loading

0 comments on commit 7e11efa

Please sign in to comment.