Skip to content

Commit

Permalink
event stream: add events for dynamic host volumes
Browse files Browse the repository at this point in the history
Add a new topic to the event stream for host volumes. We'll emit events when a
dynamic host volume is registered or deregistered, and whenever a node
fingerprints with a changed volume.

Ref: https://hashicorp.atlassian.net/browse/NET-11549
  • Loading branch information
tgross committed Dec 19, 2024
1 parent 7d86532 commit 0ee7051
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 43 deletions.
2 changes: 1 addition & 1 deletion nomad/state/deployment_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func WaitForEvents(t *testing.T, s *StateStore, index uint64, minEvents int, tim
}
maxAttempts--
if maxAttempts == 0 {
require.Failf(t, "reached max attempts waiting for desired event count", "count %d", len(got))
require.Failf(t, "reached max attempts waiting for desired event count", "count %d got: %+v", len(got), got)
}
time.Sleep(10 * time.Millisecond)
}
Expand Down
38 changes: 38 additions & 0 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
structs.HostVolumeRegisterRequestType: structs.TypeHostVolumeRegistered,
structs.HostVolumeDeleteRequestType: structs.TypeHostVolumeDeleted,
}

func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
Expand Down Expand Up @@ -181,6 +183,24 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Service: before,
},
}, true
case TableHostVolumes:
before, ok := change.Before.(*structs.HostVolume)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicHostVolume,
Key: before.ID,
FilterKeys: []string{
before.ID,
before.Name,
before.PluginID,
},
Namespace: before.Namespace,
Payload: &structs.HostVolumeEvent{
Volume: before,
},
}, true
}
return structs.Event{}, false
}
Expand Down Expand Up @@ -358,6 +378,24 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Service: after,
},
}, true
case TableHostVolumes:
after, ok := change.After.(*structs.HostVolume)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicHostVolume,
Key: after.ID,
FilterKeys: []string{
after.ID,
after.Name,
after.PluginID,
},
Namespace: after.Namespace,
Payload: &structs.HostVolumeEvent{
Volume: after,
},
}, true
}

return structs.Event{}, false
Expand Down
43 changes: 43 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,49 @@ func Test_eventsFromChanges_ACLBindingRule(t *testing.T) {
must.Eq(t, bindingRule, receivedDeleteChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule)
}

func TestEvents_HostVolumes(t *testing.T) {

ci.Parallel(t)
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer store.StopEventBroker()

index, err := store.LatestIndex()
must.NoError(t, err)

node := mock.Node()
index++
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node, NodeUpsertWithNodePool))

vol := mock.HostVolume()
vol.NodeID = node.ID
index++
must.NoError(t, store.UpsertHostVolume(index, vol))

node = node.Copy()
node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{vol.Name: {
Name: vol.Name,
Path: "/var/nomad/alloc_mounts" + uuid.Generate(),
}}
index++
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node, NodeUpsertWithNodePool))

index++
must.NoError(t, store.DeleteHostVolume(index, vol.Namespace, vol.ID))

events := WaitForEvents(t, store, 0, 5, 1*time.Second)
must.Len(t, 5, events)
must.Eq(t, "Node", events[0].Topic)
must.Eq(t, "NodeRegistration", events[0].Type)
must.Eq(t, "HostVolume", events[1].Topic)
must.Eq(t, "HostVolumeRegistered", events[1].Type)
must.Eq(t, "Node", events[2].Topic)
must.Eq(t, "NodeRegistration", events[2].Type)
must.Eq(t, "HostVolume", events[3].Topic)
must.Eq(t, "NodeRegistration", events[3].Type)
must.Eq(t, "HostVolume", events[4].Topic)
must.Eq(t, "HostVolumeDeleted", events[4].Type)
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()

Expand Down
4 changes: 2 additions & 2 deletions nomad/state/state_store_host_volumes.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (s *StateStore) HostVolumeByID(ws memdb.WatchSet, ns, id string, withAllocs

// UpsertHostVolume upserts a host volume
func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(structs.HostVolumeRegisterRequestType, index)
defer txn.Abort()

if exists, err := s.namespaceExists(txn, vol.Namespace); err != nil {
Expand Down Expand Up @@ -117,7 +117,7 @@ func (s *StateStore) UpsertHostVolume(index uint64, vol *structs.HostVolume) err

// DeleteHostVolume deletes a host volume
func (s *StateStore) DeleteHostVolume(index uint64, ns string, id string) error {
txn := s.db.WriteTxn(index)
txn := s.db.WriteTxnMsgT(structs.HostVolumeDeleteRequestType, index)
defer txn.Abort()

obj, err := txn.First(TableHostVolumes, indexID, ns, id)
Expand Down
4 changes: 4 additions & 0 deletions nomad/stream/event_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,10 @@ func aclAllowsSubscription(aclObj *acl.ACL, subReq *SubscribeRequest) bool {
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityReadJob); !ok {
return false
}
case structs.TopicHostVolume:
if ok := aclObj.AllowNsOp(subReq.Namespace, acl.NamespaceCapabilityHostVolumeRead); !ok {
return false
}
case structs.TopicNode:
if ok := aclObj.AllowNodeRead(); !ok {
return false
Expand Down
9 changes: 9 additions & 0 deletions nomad/structs/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
TopicACLAuthMethod Topic = "ACLAuthMethod"
TopicACLBindingRule Topic = "ACLBindingRule"
TopicService Topic = "Service"
TopicHostVolume Topic = "HostVolume"
TopicAll Topic = "*"

TypeNodeRegistration = "NodeRegistration"
Expand Down Expand Up @@ -63,6 +64,8 @@ const (
TypeACLBindingRuleDeleted = "ACLBindingRuleDeleted"
TypeServiceRegistration = "ServiceRegistration"
TypeServiceDeregistration = "ServiceDeregistration"
TypeHostVolumeRegistered = "HostVolumeRegistered"
TypeHostVolumeDeleted = "HostVolumeDeleted"
)

// Event represents a change in Nomads state.
Expand Down Expand Up @@ -188,3 +191,9 @@ type ACLAuthMethodEvent struct {
type ACLBindingRuleEvent struct {
ACLBindingRule *ACLBindingRule
}

// HostVolumeEvent holds a newly updated or deleted dynamic host volume to be
// used as an event in the event stream
type HostVolumeEvent struct {
Volume *HostVolume
}
85 changes: 45 additions & 40 deletions website/content/api-docs/events.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@ the nature of this endpoint individual topics require specific policies.
Note that if you do not include a `topic` parameter all topics will be included
by default, requiring a management token.

| Topic | ACL Required |
| ------------ | -------------------- |
| `*` | `management` |
| `ACLToken` | `management` |
| `ACLPolicy` | `management` |
| `ACLRole` | `management` |
| `Job` | `namespace:read-job` |
| `Allocation` | `namespace:read-job` |
| `Deployment` | `namespace:read-job` |
| `Evaluation` | `namespace:read-job` |
| `Node` | `node:read` |
| `NodePool` | `management` |
| `Service` | `namespace:read-job` |
| Topic | ACL Required |
|--------------|------------------------------|
| `*` | `management` |
| `ACLPolicy` | `management` |
| `ACLRole` | `management` |
| `ACLToken` | `management` |
| `Allocation` | `namespace:read-job` |
| `Deployment` | `namespace:read-job` |
| `Evaluation` | `namespace:read-job` |
| `HostVolume` | `namespace:host-volume-read` |
| `Job` | `namespace:read-job` |
| `NodePool` | `management` |
| `Node` | `node:read` |
| `Service` | `namespace:read-job` |

### Parameters

Expand All @@ -65,50 +66,54 @@ by default, requiring a management token.

### Event Topics

| Topic | Output |
| ---------- | ------------------------------- |
| ACLToken | ACLToken |
| ACLPolicy | ACLPolicy |
| ACLRoles | ACLRole |
| Allocation | Allocation (no job information) |
| Job | Job |
| Evaluation | Evaluation |
| Deployment | Deployment |
| Node | Node |
| NodeDrain | Node |
| NodePool | NodePool |
| Service | Service Registrations |
| Topic | Output |
|------------|----------------------------------------|
| ACLPolicy | ACLPolicy |
| ACLRoles | ACLRole |
| ACLToken | ACLToken |
| Allocation | Allocation (no job information) |
| Deployment | Deployment |
| Evaluation | Evaluation |
| HostVolume | HostVolume (dynamic host volumes only) |
| Job | Job |
| Node | Node |
| NodeDrain | Node |
| NodePool | NodePool |
| Service | Service Registrations |

### Event Types

| Type |
| ----------------------------- |
| ACLTokenUpserted |
| ACLTokenDeleted |
| ACLPolicyUpserted |
|-------------------------------|
| ACLPolicyDeleted |
| ACLRoleUpserted |
| ACLPolicyUpserted |
| ACLRoleDeleted |
| ACLRoleUpserted |
| ACLTokenDeleted |
| ACLTokenUpserted |
| AllocationCreated |
| AllocationUpdated |
| AllocationUpdateDesiredStatus |
| DeploymentStatusUpdate |
| DeploymentPromotion |
| AllocationUpdated |
| DeploymentAllocHealth |
| DeploymentPromotion |
| DeploymentStatusUpdate |
| EvaluationUpdated |
| JobRegistered |
| JobDeregistered |
| HostVolumeDeleted |
| HostVolumeRegistered |
| JobBatchDeregistered |
| NodeRegistration |
| JobDeregistered |
| JobRegistered |
| NodeDeregistration |
| NodeEligibility |
| NodeDrain |
| NodeEligibility |
| NodeEvent |
| NodePoolUpserted |
| NodePoolDeleted |
| NodePoolUpserted |
| NodeRegistration |
| PlanResult |
| ServiceRegistration |
| ServiceDeregistration |
| ServiceRegistration |


### Sample Request

Expand Down

0 comments on commit 0ee7051

Please sign in to comment.