Skip to content

Commit

Permalink
event stream: add events for CSI volumes and plugins
Browse files Browse the repository at this point in the history
Adds new topics to the event stream for CSI volumes and CSI plugins. We'll emit
event when either is created or deleted, and when CSI volumes are claimed.
  • Loading branch information
tgross committed Dec 19, 2024
1 parent 7d86532 commit a0f3b46
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 75 deletions.
3 changes: 3 additions & 0 deletions .changelog/24724.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
csi: Added CSI volume and plugin events to the event stream
```
71 changes: 71 additions & 0 deletions nomad/state/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ var MsgTypeEvents = map[structs.MessageType]string{
structs.ServiceRegistrationUpsertRequestType: structs.TypeServiceRegistration,
structs.ServiceRegistrationDeleteByIDRequestType: structs.TypeServiceDeregistration,
structs.ServiceRegistrationDeleteByNodeIDRequestType: structs.TypeServiceDeregistration,
structs.CSIVolumeRegisterRequestType: structs.TypeCSIVolumeRegistered,
structs.CSIVolumeDeregisterRequestType: structs.TypeCSIVolumeDeregistered,
structs.CSIVolumeClaimRequestType: structs.TypeCSIVolumeClaim,
}

func eventsFromChanges(tx ReadTxn, changes Changes) *structs.Events {
Expand Down Expand Up @@ -181,6 +184,40 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Service: before,
},
}, true
case TableCSIVolumes:
before, ok := change.Before.(*structs.CSIVolume)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIVolume,
Key: before.ID,
FilterKeys: []string{
before.ID,
before.Name,
before.PluginID,
},
Namespace: before.Namespace,
Payload: &structs.CSIVolumeEvent{
Volume: before,
},
}, true
case TableCSIPlugins:
// note: there is no CSIPlugin event type, because CSI plugins don't
// have their own write RPCs; they are always created/removed via
// node updates
before, ok := change.Before.(*structs.CSIPlugin)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIPlugin,
Key: before.ID,
FilterKeys: []string{before.ID},
Payload: &structs.CSIPluginEvent{
Plugin: before,
},
}, true
}
return structs.Event{}, false
}
Expand Down Expand Up @@ -358,6 +395,40 @@ func eventFromChange(change memdb.Change) (structs.Event, bool) {
Service: after,
},
}, true
case TableCSIVolumes:
after, ok := change.After.(*structs.CSIVolume)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIVolume,
Key: after.ID,
FilterKeys: []string{
after.ID,
after.Name,
after.PluginID,
},
Namespace: after.Namespace,
Payload: &structs.CSIVolumeEvent{
Volume: after,
},
}, true
case TableCSIPlugins:
// note: there is no CSIPlugin event type, because CSI plugins don't
// have their own write RPCs; they are always created/removed via
// node updates
after, ok := change.After.(*structs.CSIPlugin)
if !ok {
return structs.Event{}, false
}
return structs.Event{
Topic: structs.TopicCSIPlugin,
Key: after.ID,
FilterKeys: []string{after.ID},
Payload: &structs.CSIPluginEvent{
Plugin: after,
},
}, true
}

return structs.Event{}, false
Expand Down
78 changes: 78 additions & 0 deletions nomad/state/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,84 @@ func Test_eventsFromChanges_ACLBindingRule(t *testing.T) {
must.Eq(t, bindingRule, receivedDeleteChange.Events[0].Payload.(*structs.ACLBindingRuleEvent).ACLBindingRule)
}

func TestEvents_CSIVolumes(t *testing.T) {
ci.Parallel(t)
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer store.StopEventBroker()

index, err := store.LatestIndex()
must.NoError(t, err)

plugin := mock.CSIPlugin()
vol := mock.CSIVolume(plugin)

index++
must.NoError(t, store.UpsertCSIVolume(index, []*structs.CSIVolume{vol}))

alloc := mock.Alloc()
index++
store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc})

claim := &structs.CSIVolumeClaim{
AllocationID: alloc.ID,
NodeID: uuid.Generate(),
Mode: structs.CSIVolumeClaimGC,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateReadyToFree,
}
index++
must.NoError(t, store.CSIVolumeClaim(index, time.Now().UnixNano(), vol.Namespace, vol.ID, claim))

index++
must.NoError(t, store.CSIVolumeDeregister(index, vol.Namespace, []string{vol.ID}, false))

events := WaitForEvents(t, store, 0, 3, 1*time.Second)
must.Len(t, 3, events)
must.Eq(t, "CSIVolume", events[0].Topic)
must.Eq(t, "CSIVolumeRegistered", events[0].Type)
must.Eq(t, "CSIVolume", events[1].Topic)
must.Eq(t, "CSIVolumeClaim", events[1].Type)
must.Eq(t, "CSIVolume", events[2].Topic)
must.Eq(t, "CSIVolumeDeregistered", events[2].Type)

}

func TestEvents_CSIPlugins(t *testing.T) {
ci.Parallel(t)
store := TestStateStoreCfg(t, TestStateStorePublisher(t))
defer store.StopEventBroker()

index, err := store.LatestIndex()
must.NoError(t, err)

node := mock.Node()
plugin := mock.CSIPlugin()

index++
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node))

node = node.Copy()
node.CSINodePlugins = map[string]*structs.CSIInfo{
plugin.ID: {
PluginID: plugin.ID,
Healthy: true,
UpdateTime: time.Now(),
},
}
index++
must.NoError(t, store.UpsertNode(structs.NodeRegisterRequestType, index, node))

events := WaitForEvents(t, store, 0, 3, 1*time.Second)
must.Len(t, 3, events)
must.Eq(t, "Node", events[0].Topic)
must.Eq(t, "NodeRegistration", events[0].Type)
must.Eq(t, "Node", events[1].Topic)
must.Eq(t, "NodeRegistration", events[1].Type)
must.Eq(t, "CSIPlugin", events[2].Topic)
must.Eq(t, "NodeRegistration", events[2].Type)
}

func requireNodeRegistrationEventEqual(t *testing.T, want, got structs.Event) {
t.Helper()

Expand Down
6 changes: 4 additions & 2 deletions nomad/state/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ const (
TableAllocs = "allocs"
TableJobSubmission = "job_submission"
TableHostVolumes = "host_volumes"
TableCSIVolumes = "csi_volumes"
TableCSIPlugins = "csi_plugins"
)

const (
Expand Down Expand Up @@ -1150,7 +1152,7 @@ func clusterMetaTableSchema() *memdb.TableSchema {
// CSIVolumes are identified by id globally, and searchable by driver
func csiVolumeTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "csi_volumes",
Name: TableCSIVolumes,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Expand Down Expand Up @@ -1182,7 +1184,7 @@ func csiVolumeTableSchema() *memdb.TableSchema {
// CSIPlugins are identified by id globally, and searchable by driver
func csiPluginTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{
Name: "csi_plugins",
Name: TableCSIPlugins,
Indexes: map[string]*memdb.IndexSchema{
"id": {
Name: "id",
Expand Down
Loading

0 comments on commit a0f3b46

Please sign in to comment.