diff --git a/pkg/synchromanager/clustersynchro/cluster_monitor.go b/pkg/synchromanager/clustersynchro/cluster_monitor.go index 914f36a82..1cd013e61 100644 --- a/pkg/synchromanager/clustersynchro/cluster_monitor.go +++ b/pkg/synchromanager/clustersynchro/cluster_monitor.go @@ -8,7 +8,8 @@ import ( apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" "k8s.io/klog/v2" clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2" @@ -36,7 +37,9 @@ func (synchro *ClusterSynchro) checkClusterHealthy() { defer synchro.updateStatus() lastReadyCondition := synchro.healthyCondition.Load().(metav1.Condition) - if ready, err := checkKubeHealthy(synchro.clusterclient); !ready { + ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) + defer cancel() + if ready, err := synchro.healthChecker.Ready(ctx); !ready { // if the last status was not ConditionTrue, stop resource synchros if lastReadyCondition.Status != metav1.ConditionTrue { synchro.stopRunner() @@ -86,17 +89,25 @@ func (synchro *ClusterSynchro) checkClusterHealthy() { synchro.healthyCondition.Store(condition) } -// TODO(iceber): resolve for more detailed error -func checkKubeHealthy(client kubernetes.Interface) (bool, error) { - ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second) - defer cancel() +type healthChecker struct { + client rest.Interface +} - _, err := client.Discovery().RESTClient().Get().AbsPath("/readyz").DoRaw(ctx) - if apierrors.IsNotFound(err) { - _, err = client.Discovery().RESTClient().Get().AbsPath("/healthz").DoRaw(ctx) - } +func newHealthChecker(config *rest.Config) (*healthChecker, error) { + client, err := discovery.NewDiscoveryClientForConfig(config) if err != nil { - return false, err + return nil, err + } + return &healthChecker{ + client: client.RESTClient(), + }, nil +} + +// TODO(iceber): resolve for more detailed error +func (checker *healthChecker) Ready(ctx context.Context) (bool, error) { + _, err := checker.client.Get().AbsPath("/readyz").DoRaw(ctx) + if apierrors.IsNotFound(err) { + _, err = checker.client.Get().AbsPath("/healthz").DoRaw(ctx) } - return true, nil + return err == nil, err } diff --git a/pkg/synchromanager/clustersynchro/cluster_synchro.go b/pkg/synchromanager/clustersynchro/cluster_synchro.go index 203872519..4552194e7 100644 --- a/pkg/synchromanager/clustersynchro/cluster_synchro.go +++ b/pkg/synchromanager/clustersynchro/cluster_synchro.go @@ -3,13 +3,14 @@ package clustersynchro import ( "context" "fmt" + "net" "sync" "sync/atomic" + "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/klog/v2" @@ -18,6 +19,8 @@ import ( "github.com/clusterpedia-io/clusterpedia/pkg/storage" "github.com/clusterpedia-io/clusterpedia/pkg/storageconfig" "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer" + "github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features" + clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature" ) type ClusterSynchro struct { @@ -27,7 +30,7 @@ type ClusterSynchro struct { ClusterStatusUpdater ClusterStatusUpdater storage storage.StorageFactory - clusterclient kubernetes.Interface + healthChecker *healthChecker dynamicDiscovery discovery.DynamicDiscoveryInterface listerWatcherFactory informer.DynamicListerWatcherFactory @@ -64,11 +67,6 @@ type ClusterStatusUpdater interface { type RetryableError error func New(name string, config *rest.Config, storage storage.StorageFactory, updater ClusterStatusUpdater) (*ClusterSynchro, error) { - clusterclient, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, fmt.Errorf("failed to create a cluster client: %w", err) - } - dynamicDiscovery, err := discovery.NewDynamicDiscoveryManager(name, config) if err != nil { return nil, RetryableError(fmt.Errorf("failed to create dynamic discovery manager: %w", err)) @@ -84,13 +82,25 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat return nil, fmt.Errorf("failed to create lister watcher factory: %w", err) } + checkerConfig := *config + if clusterpediafeature.FeatureGate.Enabled(features.HealthCheckerWithStandaloneTCP) { + checkerConfig.Dial = (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext + } + healthChecker, err := newHealthChecker(&checkerConfig) + if err != nil { + return nil, fmt.Errorf("failed to create a cluster health checker: %w", err) + } + synchro := &ClusterSynchro{ name: name, RESTConfig: config, ClusterStatusUpdater: updater, storage: storage, - clusterclient: clusterclient, + healthChecker: healthChecker, dynamicDiscovery: dynamicDiscovery, listerWatcherFactory: listWatchFactory, diff --git a/pkg/synchromanager/features/features.go b/pkg/synchromanager/features/features.go index 5ce56ec0b..e46f85d01 100644 --- a/pkg/synchromanager/features/features.go +++ b/pkg/synchromanager/features/features.go @@ -40,6 +40,11 @@ const ( // owner: @iceber // alpha: v0.3.0 AllowSyncAllResources featuregate.Feature = "AllowSyncAllResources" + + // HealthCheckerWithStandaloneTCP is a feature gate for the cluster health checker to use standalone tcp + // owner: @iceber + // alpha: v0.6.0 + HealthCheckerWithStandaloneTCP featuregate.Feature = "HealthCheckerWithStandaloneTCP" ) func init() { @@ -49,8 +54,9 @@ func init() { // defaultClusterSynchroManagerFeatureGates consists of all known clustersynchro-manager-specific feature keys. // To add a new feature, define a key for it above and add it here. var defaultClusterSynchroManagerFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{ - PruneManagedFields: {Default: true, PreRelease: featuregate.Beta}, - PruneLastAppliedConfiguration: {Default: true, PreRelease: featuregate.Beta}, - AllowSyncAllCustomResources: {Default: false, PreRelease: featuregate.Alpha}, - AllowSyncAllResources: {Default: false, PreRelease: featuregate.Alpha}, + PruneManagedFields: {Default: true, PreRelease: featuregate.Beta}, + PruneLastAppliedConfiguration: {Default: true, PreRelease: featuregate.Beta}, + AllowSyncAllCustomResources: {Default: false, PreRelease: featuregate.Alpha}, + AllowSyncAllResources: {Default: false, PreRelease: featuregate.Alpha}, + HealthCheckerWithStandaloneTCP: {Default: false, PreRelease: featuregate.Alpha}, }