Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Node/Driver labeling mechanism #2

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (plugin *TestPlugin) CanSupport(spec *volume.Spec) bool {
return true
}

func (plugin *TestPlugin) IsMigratedToCSI() bool {
func (plugin *TestPlugin) IsMigratableToCSI() bool {
davidz627 marked this conversation as resolved.
Show resolved Hide resolved
return false
}

Expand Down
1 change: 1 addition & 0 deletions pkg/volume/csi/nodeinfomanager/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go_library(
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//vendor/github.com/container-storage-interface/spec/lib/go/csi/v0:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/kubernetes-csi/kubernetes-csi-migration-library/plugins:go_default_library",
],
)

Expand Down
36 changes: 30 additions & 6 deletions pkg/volume/csi/nodeinfomanager/nodeinfomanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"

csiPlugins "github.com/kubernetes-csi/kubernetes-csi-migration-library/plugins"
)

const (
Expand All @@ -45,6 +47,12 @@ const (

var nodeKind = v1.SchemeGroupVersion.WithKind("Node")

// migratedDrivers maps a CSI driver name to a predicate that reports whether
// migration to that driver is currently switched on through feature gates.
var migratedDrivers = map[string]func() bool{
	// GCE PD needs both the general CSIMigration gate and the
	// plugin-specific CSIMigrationGCE gate to be enabled.
	csiPlugins.GCEPDDriverName: func() bool {
		if !utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
			return false
		}
		return utilfeature.DefaultFeatureGate.Enabled(features.CSIMigrationGCE)
	},
}

// nodeInfoManager contains necessary common dependencies to update node info on both
// the Node and CSINodeInfo objects.
type nodeInfoManager struct {
Expand Down Expand Up @@ -370,6 +378,15 @@ func (nim *nodeInfoManager) createNodeInfoObject(
return err // do not wrap error
}

isMigratable := false
if driverIsMigratableFunc, ok := migratedDrivers[driverName]; ok {
isMigratable = driverIsMigratableFunc()
glog.V(4).Infof("CSI Driver %v found in migrated driver list, and migration status is %v", driverName, isMigratable)

} else {
glog.V(4).Infof("CSI Driver %v not found in migrated driver map", driverName)
}

nodeInfo := &csiv1alpha1.CSINodeInfo{
ObjectMeta: metav1.ObjectMeta{
Name: string(nim.nodeName),
Expand All @@ -384,9 +401,10 @@ func (nim *nodeInfoManager) createNodeInfoObject(
},
CSIDrivers: []csiv1alpha1.CSIDriverInfo{
{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys,
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys,
IsDriverMigratableOnNode: isMigratable,
},
},
}
Expand Down Expand Up @@ -429,11 +447,17 @@ func (nim *nodeInfoManager) updateNodeInfoObject(
}
}

isMigratable := false
if driverIsMigratableFunc, ok := migratedDrivers[driverName]; ok {
isMigratable = driverIsMigratableFunc()
davidz627 marked this conversation as resolved.
Show resolved Hide resolved
}

// Append new driver
driverInfo := csiv1alpha1.CSIDriverInfo{
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
Driver: driverName,
NodeID: driverNodeID,
TopologyKeys: topologyKeys.List(),
IsDriverMigratableOnNode: isMigratable,
}
newDriverInfos = append(newDriverInfos, driverInfo)
nodeInfo.CSIDrivers = newDriverInfos
Expand Down
6 changes: 4 additions & 2 deletions pkg/volume/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ func (pm *VolumePluginMgr) IsPluginMigratableBySpec(spec *Spec) (bool, error) {
}

if len(matches) == 0 {
return false, fmt.Errorf("no volume plugin matched")
// Not a known plugin (flex) in which case it is not migratable
return false, nil
}
if len(matches) > 1 {
return false, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ","))
Expand All @@ -571,7 +572,8 @@ func (pm *VolumePluginMgr) IsPluginMigratableByName(name string) (bool, error) {
}
}
if len(matches) == 0 {
return false, fmt.Errorf("no volume plugin matched")
// Not a known plugin (flex) in which case it is not migratable
return false, nil
}
if len(matches) > 1 {
return false, fmt.Errorf("multiple volume plugins matched: %s", strings.Join(matchedPluginNames, ","))
Expand Down
4 changes: 4 additions & 0 deletions pkg/volume/util/operationexecutor/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,13 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/informers/externalversions:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/listers/csi/v1alpha1:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/github.com/kubernetes-csi/kubernetes-csi-migration-library:go_default_library",
],
Expand Down
155 changes: 136 additions & 19 deletions pkg/volume/util/operationexecutor/operation_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
csiapi "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
csiapiinformer "k8s.io/csi-api/pkg/client/informers/externalversions"
csiinformerlisters "k8s.io/csi-api/pkg/client/listers/csi/v1alpha1"
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
"k8s.io/kubernetes/pkg/features"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
Expand Down Expand Up @@ -65,6 +69,16 @@ type operationGenerator struct {

// blkUtil provides volume path related operations for block volume
blkUtil volumepathhandler.BlockVolumePathHandler

// csiNodeInfoLister is an informer for the CSINodeInfo CRs
// It currently can only be used on the attach/detach controller
// and only when the CSINodeInfo CRD is installed
csiNodeInfoLister csiinformerlisters.CSINodeInfoLister

// csiNodeInfoHasSynced returns true if the informer is running and is synced
// It currently can only be used on the attach/detach controller
// and only when the CSINodeInfo CRD is installed
csiNodeInfoHasSynced func() bool
}

// NewOperationGenerator returns an instance of operationGenerator
Expand All @@ -73,14 +87,29 @@ func NewOperationGenerator(kubeClient clientset.Interface,
recorder record.EventRecorder,
checkNodeCapabilitiesBeforeMount bool,
blkUtil volumepathhandler.BlockVolumePathHandler) OperationGenerator {

return &operationGenerator{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
recorder: recorder,
og := &operationGenerator{
kubeClient: kubeClient,
volumePluginMgr: volumePluginMgr,
recorder: recorder,
checkNodeCapabilitiesBeforeMount: checkNodeCapabilitiesBeforeMount,
blkUtil: blkUtil,

blkUtil: blkUtil,
}

if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
csiKubeClient := volumePluginMgr.Host.GetCSIClient()
if csiKubeClient == nil {
glog.Warningf("The client for CSI Custom Resources is not available, skipping informer initialization")
} else {
factory := csiapiinformer.NewSharedInformerFactory(csiKubeClient, 1*time.Minute)
csiNodeInfos := factory.Csi().V1alpha1().CSINodeInfos()
og.csiNodeInfoHasSynced = csiNodeInfos.Informer().HasSynced
og.csiNodeInfoLister = csiNodeInfos.Lister()
go factory.Start(wait.NeverStop)
}
}

return og
}

// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
Expand Down Expand Up @@ -300,8 +329,13 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
}

originalSpec := volumeToAttach.VolumeSpec
// isMigrated will check both CSIMigration and the plugin specific feature gate
if isMigrated(og.volumePluginMgr, volumeToAttach.VolumeSpec) {

// isMigratable will check both CSIMigration and the plugin specific feature gate and Node Migration
im, err := isMigratable(og, volumeToAttach.VolumeSpec, volumeToAttach.VolumeName, volumeToAttach.NodeName)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToAttach.GenerateErrorDetailed("AttachVolume.IsMigratable failed", err)
}
if im {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
Expand Down Expand Up @@ -392,17 +426,21 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(

if volumeToDetach.VolumeSpec != nil {
// Get attacher plugin
// isMigrated will check both CSIMigration and the plugin specific feature gate
if isMigrated(og.volumePluginMgr, volumeToDetach.VolumeSpec) {
// isMigratable will check both CSIMigration and the plugin specific feature gate
im, err := isMigratable(og, volumeToDetach.VolumeSpec, volumeToDetach.VolumeName, volumeToDetach.NodeName)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.IsMigratable failed", err)
}
if im {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.FindAttachablePluginBySpec failed", err)
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.FindAttachablePluginBySpec failed", err)
}

csiSpec, err := translateSpec(volumeToDetach.VolumeSpec)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("AttachVolume.TranslateSpec failed", err)
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.TranslateSpec failed", err)
}

volumeToDetach.VolumeSpec = csiSpec
Expand Down Expand Up @@ -430,8 +468,12 @@ func (og *operationGenerator) GenerateDetachVolumeFunc(

// TODO(dyzz): This case can't distinguish between PV and In-line which is necessary because
// if it was PV it may have been migrated, but the same plugin with in-line may not have been.
// Suggestions welcome...
if csiMigration.IsMigratedByName(pluginName) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) {
// fixing this depends on CSI in-line volumes implementation: PR #68232
inm, err := isNodeMigratable(og, pluginName, volumeToDetach.NodeName)
if err != nil {
return volumetypes.GeneratedOperations{}, volumeToDetach.GenerateErrorDetailed("DetachVolume.IsNodeMigratable failed", err)
}
if csiMigration.IsMigratedByName(pluginName) && utilfeature.DefaultFeatureGate.Enabled(features.CSIMigration) && inm {
// The volume represented by this spec is CSI and thus should be migrated
attachableVolumePlugin, err = og.volumePluginMgr.FindAttachablePluginByName(csi.CSIPluginName)
if err != nil || attachableVolumePlugin == nil {
Expand Down Expand Up @@ -1504,14 +1546,89 @@ func isDeviceOpened(deviceToDetach AttachedVolume, mounter mount.Interface) (boo
return deviceOpened, nil
}

func isMigrated(vpm *volume.VolumePluginMgr, spec *volume.Spec) bool {
func isNodeMigratable(og *operationGenerator, pluginName string, nodeName types.NodeName) (bool, error) {
// TODO(dyzz): Write a test that runs with feature flags on, tries to do an inline volume with a migratable volume but without
// driver installed. Should get the correct error message. Then clean up. Try to do a migratable volume but WITH driver installed,
// should succeed (even better if we can check that the driver did something).
var err error
var nodeInfo *csiapi.CSINodeInfo

pm, err := og.volumePluginMgr.IsPluginMigratableByName(pluginName)
if err != nil {
return false, err
}

if !pm {
// Feature flags aren't on so migration as a whole is just turned off
return false, nil
}

if !utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
return false, fmt.Errorf("failed to check if node migrated, CSINodeInfo feature gate not enabled")
}

if og.csiNodeInfoHasSynced() {
nodeInfo, err = og.csiNodeInfoLister.Get(string(nodeName))
if err != nil {
return false, fmt.Errorf("failed to get CSI node info for node %v from informer: %v", string(nodeName), err)
}
} else {
glog.Warningf("CSINodeInfo informer not synced, please check that CSINodeInfo CRD is installed. If so, this warning should not appear again after ~1 minute")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is still a crazy race here: the CRD gets installed, the informer is (slowly) syncing and it gets an old CSINodeInfo. The user/kubelet changes the CSINodeInfo, the informer is not synced yet, and the volume plugin does an API GET at this line. It gets the new CSINodeInfo.

Now the informer finishes syncing and it still has the old CSINodeInfo, with the new one enqueued (or waiting somewhere in the API server). A second call to this function does informer.Get(), which returns the old value. It seems the plugin goes back in time.

In this particular case, the informer has the old CSINodeInfo for a very short time (depending on API server connectivity), so perhaps it does not really matter...

Can we just return an error here and let kubelet / the A/D controller retry after a while? Frankly, I think that the whole CSI volume plugin should return errors in all calls until the informers are synced.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little confused about the race you described. Are you saying that even if the informer is synced it may have stale API objects? I was under the impression that a synced informer means that any GET to the informer is guaranteed to return the freshest object; otherwise, what's the point of having an inconsistent cache?

The only two options I see here in the current code are:

  1. Informer synced -> get from informer (freshest object)
  2. Informer not yet synced -> get from API Server (freshest object)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, informers can return old data. An informer does List + Watch, and the watch can be delayed by the network. That's OK. The informer will let you know when it gets new data.

A GET from the API server can return old data too - by the time your client parses the data there may already be newer data on the server. In this case, you don't get an event when it changes.

In this particular case, a GET from the API server could be faster than the initial List + Watch and return something newer than the informer. Under "normal" conditions the informer syncs very quickly, so this discussion is more or less academic.

// Fallback to a GET
nodeInfo, err = og.volumePluginMgr.Host.GetCSIClient().CsiV1alpha1().CSINodeInfos().Get(string(nodeName), metav1.GetOptions{})
if err != nil {
return false, fmt.Errorf("failed to get CSI node info for node %v: %v", string(nodeName), err)
}
}

driverName, err := csiMigration.GetCSINameFromIntreeName(pluginName)
if err != nil {
return false, err
}

for _, driver := range nodeInfo.CSIDrivers {
if driver.Driver == driverName {
return driver.IsDriverMigratableOnNode, nil
}
}
// The plugin is migrated but the driver is not installed
return false, fmt.Errorf("plugin %v is migratable but driver %v is not installed. Please install the driver and retry", pluginName, driverName)
}

func isMigratable(og *operationGenerator, spec *volume.Spec, uniqueVolumeName v1.UniqueVolumeName, nodeName types.NodeName) (bool, error) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to isVolumeMigratable? Or isVolumePluginMigratable? It's confusing to see isMigratable and isNodeMigratable.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am doing a large-scale fix so that all naming is consistent with #2 (comment).

thanks for pointing this out.

// IsPluginMigratableBySpec tests whether feature flags are on
pm, err := og.volumePluginMgr.IsPluginMigratableBySpec(spec)
if err != nil {
return false, err
}

if !pm {
// Feature flags aren't on so migration as a whole is just turned off
return false, nil
}

pluginName, _, err := util.SplitUniqueName(uniqueVolumeName)
if err != nil {
return false, fmt.Errorf("failed to split unique name %v: %v", uniqueVolumeName, err)
}
driverName, err := csiMigration.GetCSINameFromIntreeName(pluginName)
if err != nil {
return false, err
}

if csiMigration.IsPVMigrated(spec.PersistentVolume) || csiMigration.IsInlineMigrated(spec.Volume) {
migratable, err := vpm.IsPluginMigratableBySpec(spec)
if err == nil && migratable {
return true
inm, err := isNodeMigratable(og, pluginName, nodeName)
if err != nil {
return false, fmt.Errorf("failed to check if driver migrated on node: %v", err)
}
if inm {
return true, nil
}
}
return false
// If feature flags are on but we're not migratable for some other reason
// it is an error and a good error about installing the driver should be thrown
return false, fmt.Errorf("plugin %v is migratable but driver %v is not installed. Please install the driver and retry", pluginName, driverName)

}

func translateSpec(spec *volume.Spec) (*volume.Spec, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
if utilfeature.DefaultFeatureGate.Enabled(features.CSIDriverRegistry) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csidrivers").RuleOrDie())
}
if utilfeature.DefaultFeatureGate.Enabled(features.CSINodeInfo) {
role.Rules = append(role.Rules, rbacv1helpers.NewRule("get", "watch", "list").Groups("csi.storage.k8s.io").Resources("csinodeinfos").RuleOrDie())
}
}

return role
Expand Down
6 changes: 6 additions & 0 deletions staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ type CSIDriverInfo struct {
// determine which labels it should retrieve from the node object and pass
// back to the driver.
TopologyKeys []string `json:"topologyKeys"`

// IsDriverMigratableOnNode is a boolean representing whether the node supports
// migration to CSI for this driver. It is used by storage controllers on
// the control plane to determine whether to use a migrated version of the
// plugin for this driver on this node.
IsDriverMigratableOnNode bool `json:"isDriverMigratableOnNode"`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. should you regenerate the clients?

  2. I'd use DriverMigrated (or VolumePluginMigrated to emphasize what's migrated)

  • OnNode is implied by CSINodeInfo. The other field is not TopologyKeysOnNode, but TopologyKeys.
  • We don't use Is prefix for booleans, like AttachRequired or ReadOnly.
  3. Again, "Migratable" vs "Migrated". Does this field say that now CSI must be used (= driver is migrated, don't attach the old way) or may be used (= migratable, attach any way you want, kubelet will handle that)?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I think the naming has been a little inconsistent.

Going forwards I will refer to anything related to feature flags as "Migrated"
whereas the fact that translation library exists is "Migratable"

The difference is that feature flags are something the admin sets to opt in to migration, so that state is "Migrated". Whether the logic to do this exists (i.e. we have translation logic for this plugin) is a separate concept, and that is "Migratable".

Please flag any incorrect usage in code reviews

@ddebroy @leakingtapan

}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down