diff --git a/manager/csi/manager.go b/manager/csi/manager.go index c639c9df99..70baa82d3a 100644 --- a/manager/csi/manager.go +++ b/manager/csi/manager.go @@ -463,28 +463,53 @@ func (vm *Manager) deleteVolume(ctx context.Context, v *api.Volume) error { // leak. It's acceptable for now because we expect neither exceptionally long // lived managers nor exceptionally high plugin churn. func (vm *Manager) getPlugin(name string) (Plugin, error) { - // if the plugin already exists, we can just return it. - if p, ok := vm.plugins[name]; ok { - return p, nil - } - - // otherwise, we need to load the plugin. - pc, err := vm.pg.Get(name, DockerCSIPluginCap) - if err != nil { - return nil, err - } - - if pc == nil { - return nil, errors.New("driver \"" + name + "\" not found") - } - - pa, ok := pc.(mobyplugin.AddrPlugin) - if !ok { - return nil, errors.New("plugin for driver \"" + name + "\" does not implement PluginAddr") - } - - p := vm.newPlugin(pa, vm.provider) - vm.plugins[name] = p - - return p, nil + // normalize driver name by stripping any tag (e.g. ":latest") + canon := name + if i := strings.IndexRune(name, ':'); i >= 0 { + canon = name[:i] + } + + // Fast path: exact key or canonical key already loaded + if p, ok := vm.plugins[name]; ok { + return p, nil + } + if p, ok := vm.plugins[canon]; ok { + // also alias the original name to it for future lookups + vm.plugins[name] = p + return p, nil + } + + // Try plugin getter with full name first + pc, err := vm.pg.Get(name, DockerCSIPluginCap) + if err != nil { + // retry using canonical name if different + if canon != name { + pc2, err2 := vm.pg.Get(canon, DockerCSIPluginCap) + if err2 == nil && pc2 != nil { + pc = pc2 + } + } + if pc == nil { + return nil, err + } + } + + if pc == nil { + return nil, errors.New("driver \"" + name + "\" not found") + } + + pa, ok := pc.(mobyplugin.AddrPlugin) + if !ok { + return nil, errors.New("plugin for driver \"" + name + "\" does not implement PluginAddr") + } + + // create plugin instance once + p := vm.newPlugin(pa, vm.provider) + + // store under canonical, plugin-reported, and requested names + vm.plugins[canon] = p + vm.plugins[pa.Name()] = p + vm.plugins[name] = p + + return p, nil } diff --git a/manager/csi/manager_test.go b/manager/csi/manager_test.go index 6cd2b73e3e..ba3db118dc 100644 --- a/manager/csi/manager_test.go +++ b/manager/csi/manager_test.go @@ -755,4 +755,58 @@ var _ = Describe("Manager", func() { Expect(vm.pendingVolumes.Outstanding()).To(Equal(1)) }) }) + + Describe("plugin name canonicalization", func() { + It("should reuse the same plugin instance for tagged and untagged names", func() { + pluginGetter.Plugins["plug1"] = &testutils.FakePlugin{ + PluginName: "plug1", + PluginAddr: &net.UnixAddr{ + Net: "unix", + Name: "unix:///whatever.sock", + }, + } + + node := &api.Node{ + ID: "nodeA", + Description: &api.NodeDescription{ + CSIInfo: []*api.NodeCSIInfo{{ + PluginName: "plug1", // node reports untagged name + NodeID: "plug1NodeA", + }}, + }, + } + + volume := &api.Volume{ + ID: "volumeA", + Spec: api.VolumeSpec{ + Annotations: api.Annotations{Name: "volumeA"}, + Driver: &api.Driver{ + Name: "plug1:latest", // volume uses tagged name + }, + }, + VolumeInfo: &api.VolumeInfo{ + VolumeContext: map[string]string{}, + VolumeID: "plug1VolA", + }, + PublishStatus: []*api.VolumePublishStatus{{ + NodeID: "nodeA", + State: api.VolumePublishStatus_PENDING_PUBLISH, + }}, + } + + err := s.Update(func(tx store.Tx) error { + Expect(store.CreateNode(tx, node)).To(Succeed()) + Expect(store.CreateVolume(tx, volume)).To(Succeed()) + return nil + }) + Expect(err).ToNot(HaveOccurred()) + + vm.init(context.Background()) + vm.processVolume(ctx, volume.ID, 0) + + Expect(pluginMaker.plugins).To(HaveKey("plug1")) + // verify that publish succeeded and reused same fakePlugin instance + Expect(pluginMaker.plugins["plug1"].volumesPublished[volume.ID]).To(ContainElement("nodeA")) + }) + }) })