Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

clustersynchro: handle each of the pages in the resource list stage #591

Merged
merged 6 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/binding-apiserver/app/binding_apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/generated/clientset/versioned"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
"github.com/clusterpedia-io/clusterpedia/pkg/version/verflag"
)
Expand Down Expand Up @@ -56,7 +57,7 @@ func NewClusterPediaServerCommand(ctx context.Context) *cobra.Command {
return err
}

synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory, nil)
synchromanager := synchromanager.NewManager(crdclient, config.StorageFactory, clustersynchro.ClusterSyncConfig{})
go synchromanager.Run(1, ctx.Done())

server, err := completedConfig.New()
Expand Down
3 changes: 2 additions & 1 deletion cmd/clustersynchro-manager/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
kubestatemetrics "github.com/clusterpedia-io/clusterpedia/pkg/kube_state_metrics"
metrics "github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
)

type Config struct {
Expand All @@ -19,8 +20,8 @@ type Config struct {
WorkerNumber int
MetricsServerConfig metrics.Config
KubeMetricsServerConfig *kubestatemetrics.ServerConfig
MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
StorageFactory storage.StorageFactory
ClusterSyncConfig clustersynchro.ClusterSyncConfig

LeaderElection componentbaseconfig.LeaderElectionConfiguration
ClientConnection componentbaseconfig.ClientConnectionConfiguration
Expand Down
13 changes: 11 additions & 2 deletions cmd/clustersynchro-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/clusterpedia-io/clusterpedia/pkg/metrics"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
storageoptions "github.com/clusterpedia-io/clusterpedia/pkg/storage/options"
"github.com/clusterpedia-io/clusterpedia/pkg/synchromanager/clustersynchro"
)

const (
Expand All @@ -44,7 +45,8 @@ type Options struct {
Metrics *metrics.Options
KubeStateMetrics *kubestatemetrics.Options

WorkerNumber int // WorkerNumber is the number of worker goroutines
WorkerNumber int // WorkerNumber is the number of worker goroutines
PageSizeForResourceSync int64
}

func NewClusterSynchroManagerOptions() (*Options, error) {
Expand Down Expand Up @@ -89,6 +91,9 @@ func (o *Options) Flags() cliflag.NamedFlagSets {
genericfs.Int32Var(&o.ClientConnection.Burst, "kube-api-burst", o.ClientConnection.Burst, "Burst to use while talking with kubernetes apiserver.")
genericfs.IntVar(&o.WorkerNumber, "worker-number", o.WorkerNumber, "The number of worker goroutines.")

syncfs := fss.FlagSet("resource sync")
syncfs.Int64Var(&o.PageSizeForResourceSync, "page-size", o.PageSizeForResourceSync, "The requested chunk size of initial and resync watch lists for resource sync")

options.BindLeaderElectionFlags(&o.LeaderElection, genericfs)

fs := fss.FlagSet("misc")
Expand Down Expand Up @@ -165,7 +170,11 @@ func (o *Options) Config() (*config.Config, error) {

MetricsServerConfig: metricsConfig,
KubeMetricsServerConfig: kubeStateMetricsServerConfig,
MetricsStoreBuilder: metricsStoreBuilder,

ClusterSyncConfig: clustersynchro.ClusterSyncConfig{
MetricsStoreBuilder: metricsStoreBuilder,
PageSizeForResourceSync: o.PageSizeForResourceSync,
},

LeaderElection: o.LeaderElection,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion cmd/clustersynchro-manager/app/synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func NewClusterSynchroManagerCommand(ctx context.Context) *cobra.Command {
}

func Run(ctx context.Context, c *config.Config) error {
synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.MetricsStoreBuilder)
synchromanager := synchromanager.NewManager(c.CRDClient, c.StorageFactory, c.ClusterSyncConfig)

go func() {
metrics.RunServer(c.MetricsServerConfig)
Expand Down
11 changes: 11 additions & 0 deletions hack/verify-vendor.sh
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,14 @@ else
echo "the file 'reflector.go' in vendor has been changed, please update the 'cache/.reflector.go' and 'reflector.go' in the pkg/synchromanager/clustersynchro/informer"
exit 1
fi

pager=0
diff vendor/k8s.io/client-go/tools/pager/pager.go pkg/synchromanager/clustersynchro/informer/pager/.pager.go.copy || pager=$?

if [[ $pager -eq 0 ]]
then
echo "'pager.go' is up to date."
else
echo "the file 'pager.go' in vendor has been changed, please update the '.pager.go.copy' and 'pager.go' in the pkg/synchromanager/clustersynchro/informer/pager"
exit 1
fi
36 changes: 21 additions & 15 deletions pkg/synchromanager/clustersynchro/cluster_synchro.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,19 @@ import (
clusterpediafeature "github.com/clusterpedia-io/clusterpedia/pkg/utils/feature"
)

type ClusterSyncConfig struct {
MetricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
PageSizeForResourceSync int64
}

type ClusterSynchro struct {
name string

RESTConfig *rest.Config
ClusterStatusUpdater ClusterStatusUpdater

storage storage.StorageFactory
metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder
syncConfig ClusterSyncConfig
healthChecker *healthChecker
dynamicDiscovery discovery.DynamicDiscoveryInterface
listerWatcherFactory informer.DynamicListerWatcherFactory
Expand Down Expand Up @@ -69,7 +74,7 @@ type ClusterStatusUpdater interface {

type RetryableError error

func New(name string, config *rest.Config, storage storage.StorageFactory, metricsStoreBuilder *kubestatemetrics.MetricsStoreBuilder, updater ClusterStatusUpdater) (*ClusterSynchro, error) {
func New(name string, config *rest.Config, storage storage.StorageFactory, updater ClusterStatusUpdater, syncConfig ClusterSyncConfig) (*ClusterSynchro, error) {
dynamicDiscovery, err := discovery.NewDynamicDiscoveryManager(name, config)
if err != nil {
return nil, RetryableError(fmt.Errorf("failed to create dynamic discovery manager: %w", err))
Expand Down Expand Up @@ -103,6 +108,7 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
ClusterStatusUpdater: updater,
storage: storage,

syncConfig: syncConfig,
healthChecker: healthChecker,
dynamicDiscovery: dynamicDiscovery,
listerWatcherFactory: listWatchFactory,
Expand All @@ -115,8 +121,6 @@ func New(name string, config *rest.Config, storage storage.StorageFactory, metri
stopRunnerCh: make(chan struct{}),

storageResourceVersions: make(map[schema.GroupVersionResource]map[string]interface{}),

metricsStoreBuilder: metricsStoreBuilder,
}

var refresherOnce sync.Once
Expand Down Expand Up @@ -352,18 +356,20 @@ func (s *ClusterSynchro) refreshSyncResources() {
}

var metricsStore *kubestatemetrics.MetricsStore
if s.metricsStoreBuilder != nil {
metricsStore = s.metricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
if s.syncConfig.MetricsStoreBuilder != nil {
metricsStore = s.syncConfig.MetricsStoreBuilder.GetMetricStore(s.name, config.syncResource)
}
synchro := newResourceSynchro(
s.name,
config.syncResource,
config.kind,
s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
rvs,
config.convertor,
resourceStorage,
metricsStore,
synchro := newResourceSynchro(s.name,
ResourceSynchroConfig{
GroupVersionResource: config.syncResource,
Kind: config.kind,
ListerWatcher: s.listerWatcherFactory.ForResource(metav1.NamespaceAll, config.syncResource),
ObjectConvertor: config.convertor,
ResourceStorage: resourceStorage,
MetricsStore: metricsStore,
ResourceVersions: rvs,
PageSizeForInformer: s.syncConfig.PageSizeForResourceSync,
},
)
s.waitGroup.StartWithChannel(s.closer, synchro.Run)
s.storageResourceSynchros.Store(storageGVR, synchro)
Expand Down
13 changes: 12 additions & 1 deletion pkg/synchromanager/clustersynchro/informer/named_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ type Config struct {

// WatchListPageSize is the requested chunk size of initial and relist watch lists.
WatchListPageSize int64

// StreamHandle of paginated list, resources within a pager will be processed
// as soon as possible instead of waiting until all resources are pulled before calling the ResourceHandler.
StreamHandleForPaginatedList bool

// Force paging, Reflector will sometimes use APIServer's cache,
// even if paging is specified APIServer will return all resources for performance,
// then it will skip Reflector's streaming memory optimization.
ForcePaginatedList bool
}

type controller struct {
Expand Down Expand Up @@ -86,6 +95,8 @@ func (c *controller) Run(stopCh <-chan struct{}) {
}
r.ShouldResync = c.config.ShouldResync
r.WatchListPageSize = c.config.WatchListPageSize
r.ForcePaginatedList = c.config.ForcePaginatedList
r.StreamHandleForPaginatedList = c.config.StreamHandleForPaginatedList

c.reflectorMutex.Lock()
c.reflector = r
Expand Down Expand Up @@ -120,7 +131,7 @@ func (c *controller) HasSynced() bool {
if c.queue == nil {
return false
}
return c.queue.HasSynced()
return c.queue.HasSynced() && c.reflector.HasInitializedSynced()
}

func (c *controller) LastSyncResourceVersion() string {
Expand Down
Loading
Loading