Skip to content

Commit

Permalink
use standalone tcp for health checker
Browse files Browse the repository at this point in the history
Signed-off-by: Iceber Gu <wei.cai-nat@daocloud.io>
  • Loading branch information
Iceber committed Nov 17, 2022
1 parent a969442 commit d08c4b3
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 24 deletions.
35 changes: 23 additions & 12 deletions pkg/synchromanager/clustersynchro/cluster_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

clusterv1alpha2 "github.com/clusterpedia-io/api/cluster/v1alpha2"
Expand Down Expand Up @@ -36,7 +37,9 @@ func (synchro *ClusterSynchro) checkClusterHealthy() {
defer synchro.updateStatus()
lastReadyCondition := synchro.healthyCondition.Load().(metav1.Condition)

if ready, err := checkKubeHealthy(synchro.clusterclient); !ready {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
if ready, err := synchro.healthChecker.Ready(ctx); !ready {
// if the last status was not ConditionTrue, stop resource synchros
if lastReadyCondition.Status != metav1.ConditionTrue {
synchro.stopRunner()
Expand Down Expand Up @@ -86,17 +89,25 @@ func (synchro *ClusterSynchro) checkClusterHealthy() {
synchro.healthyCondition.Store(condition)
}

// TODO(iceber): resolve for more detailed error
func checkKubeHealthy(client kubernetes.Interface) (bool, error) {
ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
type healthChecker struct {
client rest.Interface
}

_, err := client.Discovery().RESTClient().Get().AbsPath("/readyz").DoRaw(ctx)
if apierrors.IsNotFound(err) {
_, err = client.Discovery().RESTClient().Get().AbsPath("/healthz").DoRaw(ctx)
}
func newHealthChecker(config *rest.Config) (*healthChecker, error) {
client, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return false, err
return nil, err
}
return &healthChecker{
client: client.RESTClient(),
}, nil
}

// TODO(iceber): resolve for more detailed error
func (checker *healthChecker) Ready(ctx context.Context) (bool, error) {
_, err := checker.client.Get().AbsPath("/readyz").DoRaw(ctx)
if apierrors.IsNotFound(err) {
_, err = checker.client.Get().AbsPath("/healthz").DoRaw(ctx)
}
return true, nil
return err == nil, err
}
26 changes: 18 additions & 8 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@ package clustersynchro
import (
"context"
"fmt"
"net"
"sync"
"sync/atomic"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"

Expand All @@ -18,6 +19,8 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/storageconfig"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro/informer"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/features"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ClusterSynchro struct {
Expand All @@ -27,7 +30,7 @@ type ClusterSynchro struct {
ClusterStatusUpdater ClusterStatusUpdater

storage storage.StorageFactory
clusterclient kubernetes.Interface
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory

Expand Down Expand Up @@ -64,11 +67,6 @@ type ClusterStatusUpdater interface {
type RetryableError error

func New(name string, config *rest.Config, storage storage.StorageFactory, updater ClusterStatusUpdater) (*ClusterSynchro, error) {
clusterclient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("failed to create a cluster client: %w", err)
}

dynamicDiscovery, err := discovery.NewDynamicDiscoveryManager(name, config)
if err != nil {
return nil, RetryableError(fmt.Errorf("failed to create dynamic discovery manager: %w", err))
Expand All @@ -84,13 +82,25 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, updat
return nil, fmt.Errorf("failed to create lister watcher factory: %w", err)
}

checkerConfig := *config
if clusterpediafeature.FeatureGate.Enabled(features.HealthCheckerWithStandaloneTCP) {
checkerConfig.Dial = (&net.Dialer{
Timeout: 30 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext
}
healthChecker, err := newHealthChecker(&checkerConfig)
if err != nil {
return nil, fmt.Errorf("failed to create a cluster health checker: %w", err)
}

synchro := &ClusterSynchro{
name: name,
RESTConfig: config,
ClusterStatusUpdater: updater,
storage: storage,

clusterclient: clusterclient,
healthChecker: healthChecker,
dynamicDiscovery: dynamicDiscovery,
listerWatcherFactory: listWatchFactory,

Expand Down
14 changes: 10 additions & 4 deletions pkg/synchromanager/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ const (
// owner: @iceber
// alpha: v0.3.0
AllowSyncAllResources featuregate.Feature = "AllowSyncAllResources"

// HealthCheckerWithStandaloneTCP is a feature gate for the cluster health checker to use standalone tcp
// owner: @iceber
// alpha: v0.6.0
HealthCheckerWithStandaloneTCP featuregate.Feature = "HealthCheckerWithStandaloneTCP"
)

func init() {
Expand All @@ -49,8 +54,9 @@ func init() {
// defaultClusterSynchroManagerFeatureGates consists of all known clustersynchro-manager-specific feature keys.
// To add a new feature, define a key for it above and add it here.
var defaultClusterSynchroManagerFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
PruneManagedFields: {Default: true, PreRelease: featuregate.Beta},
PruneLastAppliedConfiguration: {Default: true, PreRelease: featuregate.Beta},
AllowSyncAllCustomResources: {Default: false, PreRelease: featuregate.Alpha},
AllowSyncAllResources: {Default: false, PreRelease: featuregate.Alpha},
PruneManagedFields: {Default: true, PreRelease: featuregate.Beta},
PruneLastAppliedConfiguration: {Default: true, PreRelease: featuregate.Beta},
AllowSyncAllCustomResources: {Default: false, PreRelease: featuregate.Alpha},
AllowSyncAllResources: {Default: false, PreRelease: featuregate.Alpha},
HealthCheckerWithStandaloneTCP: {Default: false, PreRelease: featuregate.Alpha},
}

0 comments on commit d08c4b3

Please sign in to comment.