support watching resources with client-go or kubectl
Co-authored-by: duanmeng <duanmeng_yewu@cmss.chinamobile.com>
Co-authored-by: wuyingjun <wuyingjun_yewu@cmss.chinamobile.com>
Co-authored-by: hanweisen <hanweisen_yewu@cmss.chinamobile.com>
Signed-off-by: zhangyongxi <zhangyongxi_yewu@cmss.chinamobile.com>
4 people committed Sep 16, 2022
1 parent 46e794a commit 021c56a
Showing 9 changed files with 379 additions and 8 deletions.
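
With this change, the clusterpedia apiserver answers watch requests the same way a regular kube-apiserver does, so existing tooling works unchanged. A minimal client-go sketch (the kubeconfig path and namespace are illustrative, not part of this commit):

package main

import (
    "context"
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
)

func main() {
    // Assumption: this kubeconfig points at the clusterpedia apiserver.
    config, err := clientcmd.BuildConfigFromFlags("", "/etc/clusterpedia/kubeconfig")
    if err != nil {
        panic(err)
    }
    client, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err)
    }

    // Watch pods aggregated from the imported clusters; with this commit
    // the "watch" verb is served from the memory storage's watch cache.
    watcher, err := client.CoreV1().Pods("default").Watch(context.TODO(), metav1.ListOptions{})
    if err != nil {
        panic(err)
    }
    defer watcher.Stop()

    for event := range watcher.ResultChan() {
        fmt.Printf("%s: %T\n", event.Type, event.Object)
    }
}

The equivalent kubectl check is: kubectl --kubeconfig /etc/clusterpedia/kubeconfig get pods --watch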
1 change: 1 addition & 0 deletions pkg/kubeapiserver/apiserver.go
@@ -35,6 +35,7 @@ func init() {
&metav1.APIGroupList{},
&metav1.APIGroup{},
&metav1.APIResourceList{},
&metav1.WatchEvent{},
)
}

2 changes: 2 additions & 0 deletions pkg/kubeapiserver/resource_handler.go
@@ -118,6 +118,8 @@ func (r *ResourceHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler = handlers.GetResource(storage, reqScope)
case "list":
handler = handlers.ListResource(storage, nil, reqScope, false, r.minRequestTimeout)
case "watch":
handler = handlers.ListResource(storage, storage, reqScope, true, r.minRequestTimeout)
default:
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(gvr.GroupResource(), requestInfo.Verb),
61 changes: 61 additions & 0 deletions pkg/kubeapiserver/resourcerest/storage.go
@@ -9,6 +9,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
genericrequest "k8s.io/apiserver/pkg/endpoints/request"
genericfeatures "k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
@@ -20,8 +21,10 @@ import (
"github.com/clusterpedia-io/api/clusterpedia/v1beta1"
"github.com/clusterpedia-io/clusterpedia/pkg/kubeapiserver/printers"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/negotiation"
"github.com/clusterpedia-io/clusterpedia/pkg/utils/request"
utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch"
)

type RESTStorage struct {
Expand All @@ -38,6 +41,7 @@ type RESTStorage struct {

var _ rest.Lister = &RESTStorage{}
var _ rest.Getter = &RESTStorage{}
var _ rest.Watcher = &RESTStorage{}

func (s *RESTStorage) New() runtime.Object {
return s.NewFunc()
@@ -121,3 +125,60 @@ func (s *RESTStorage) ConvertToTable(ctx context.Context, object runtime.Object,

return printers.NewDefaultTableConvertor(s.DefaultQualifiedResource).ConvertToTable(ctx, object, tableOptions)
}

func (s *RESTStorage) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
resourceversion := options.ResourceVersion
watchRV, err := cache.NewClusterResourceVersionFromString(resourceversion)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}

watcher := cache.NewCacheWatcher(100)
watchCache := s.Storage.GetStorageConfig().WatchCache
watchCache.Lock()
defer watchCache.Unlock()

initEvents, err := watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
if err != nil {
// To match the uncached watch implementation, once we have passed authn/authz/admission,
// and successfully parsed a resource version, other errors must fail with a watch event of type ERROR,
// rather than a directly returned error.
return newErrWatcher(err), nil
}

func() {
watchCache.WatchersLock.Lock()
defer watchCache.WatchersLock.Unlock()

watchCache.WatchersBuffer = append(watchCache.WatchersBuffer, watcher)
}()

go watcher.Process(ctx, initEvents)
return watcher, nil
}

type errWatcher struct {
result chan watch.Event
}

func newErrWatcher(err error) *errWatcher {
errEvent := utilwatch.NewErrorEvent(err)

// Create a watcher with room for a single event, populate it, and close the channel
watcher := &errWatcher{result: make(chan watch.Event, 1)}
watcher.result <- errEvent
close(watcher.result)

return watcher
}

func (c *errWatcher) ResultChan() <-chan watch.Event {
return c.result
}

func (c *errWatcher) Stop() {
// no-op
}
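
The errWatcher mirrors the upstream cacher's pattern: buffer exactly one ERROR event, close the channel, and let the watch stream end after delivering the failure. A standalone sketch of the same pattern (metav1.Status stands in for the error object; utilwatch.NewErrorEvent presumably wraps the error similarly, which this excerpt does not show):

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
)

func main() {
    // One buffered slot: the single ERROR event is delivered, then the
    // closed channel terminates the consumer's range loop.
    result := make(chan watch.Event, 1)
    result <- watch.Event{
        Type:   watch.Error,
        Object: &metav1.Status{Status: metav1.StatusFailure, Message: "resource version too old"},
    }
    close(result)

    for event := range result {
        fmt.Printf("%s: %v\n", event.Type, event.Object)
    }
    // The loop exits after exactly one event.
}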
9 changes: 9 additions & 0 deletions pkg/storage/memorystorage/memory_resource_storage.go
@@ -97,12 +97,21 @@ func (s *ResourceStorage) dispatchEvents() {
utilruntime.HandleError(fmt.Errorf("unable to understand watch event %#v", cwe.Event))
continue
}
s.dispatchEvent(&cwe.Event)
case <-s.stopCh:
return
}
}
}

func (s *ResourceStorage) dispatchEvent(event *watch.Event) {
s.watchCache.WatchersLock.RLock()
defer s.watchCache.WatchersLock.RUnlock()
for _, watcher := range s.watchCache.WatchersBuffer {
watcher.NonblockingAdd(event)
}
}

func (s *ResourceStorage) GetStorageConfig() *storage.ResourceStorageConfig {
return s.storageConfig
}
8 changes: 8 additions & 0 deletions pkg/storage/memorystorage/memory_storage.go
@@ -2,12 +2,14 @@ package memorystorage

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/runtime/schema"

internal "github.com/clusterpedia-io/api/clusterpedia"
"github.com/clusterpedia-io/clusterpedia/pkg/storage"
cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
utilwatch "github.com/clusterpedia-io/clusterpedia/pkg/utils/watch"
)

type StorageFactory struct {
@@ -68,6 +70,9 @@ func (s *StorageFactory) CleanCluster(ctx context.Context, cluster string) error
defer storages.Unlock()
for _, rs := range storages.resourceStorages {
rs.watchCache.DeleteIndexer(cluster)
// If a PediaCluster is deleted from clusterpedia, client-go informers should list and watch again
errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("PediaCluster %s is deleted", cluster))
rs.dispatchEvent(&errorEvent)
}
return nil
}
@@ -77,6 +82,9 @@ func (s *StorageFactory) CleanClusterResource(ctx context.Context, cluster strin
defer storages.Unlock()
if rs, ok := storages.resourceStorages[gvr]; ok {
rs.watchCache.DeleteIndexer(cluster)
// If a GVR is deleted from clusterpedia, client-go informers should list and watch again
errorEvent := utilwatch.NewErrorEvent(fmt.Errorf("GVR %v is deleted", gvr))
rs.dispatchEvent(&errorEvent)
}
return nil
}
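
For a watch client, the ERROR events dispatched above end the stream: client-go's reflector treats them as a watch failure and falls back to a fresh list-and-watch, which is exactly the relist these comments call for. A hedged sketch of handling the event manually (consume is an illustrative helper, not part of this commit):

package main

import (
    "fmt"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"
)

// consume drains a watch stream and reports whether the caller should
// re-list and re-watch.
func consume(w watch.Interface) (relist bool) {
    defer w.Stop()
    for event := range w.ResultChan() {
        switch event.Type {
        case watch.Error:
            // e.g. the "PediaCluster ... is deleted" event sent by CleanCluster
            fmt.Printf("watch failed: %v\n", event.Object)
            return true
        default:
            fmt.Printf("%s: %T\n", event.Type, event.Object)
        }
    }
    return false
}

func main() {
    fw := watch.NewFake()
    go func() {
        fw.Error(&metav1.Status{Message: "PediaCluster cluster-1 is deleted"})
        fw.Stop()
    }()
    fmt.Println("relist:", consume(fw)) // relist: true
}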
140 changes: 140 additions & 0 deletions pkg/storage/memorystorage/watchcache/cache_watcher.go
@@ -0,0 +1,140 @@
package watchcache

import (
"context"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/watch"
)

// CacheWatcher implements watch.Interface
type CacheWatcher struct {
input chan *watch.Event
result chan watch.Event
done chan struct{}
stopped bool
forget func()
}

func NewCacheWatcher(chanSize int) *CacheWatcher {
return &CacheWatcher{
input: make(chan *watch.Event, chanSize),
result: make(chan watch.Event, chanSize),
done: make(chan struct{}),
stopped: false,
forget: func() {},
}
}

// ResultChan implements watch.Interface.
func (c *CacheWatcher) ResultChan() <-chan watch.Event {
return c.result
}

// Stop implements watch.Interface.
func (c *CacheWatcher) Stop() {
c.forget()
}

func (c *CacheWatcher) StopThreadUnsafe() {
if !c.stopped {
c.stopped = true
close(c.done)
close(c.input)
}
}

func (c *CacheWatcher) NonblockingAdd(event *watch.Event) bool {
select {
case c.input <- event:
return true
default:
return false
}
}

// A nil timer means that Add will not block: if the event cannot be sent immediately, the watcher is terminated.
func (c *CacheWatcher) Add(event *watch.Event, timer *time.Timer) bool {
// Try to send the event immediately, without blocking.
if c.NonblockingAdd(event) {
return true
}

closeFunc := func() {
// This means that we couldn't send event to that watcher.
// Since we don't want to block on it infinitely,
// we simply terminate it.
//klog.V(1).Infof("Forcing watcher close due to unresponsiveness: %v", c.objectType.String())
c.forget()
}

if timer == nil {
closeFunc()
return false
}

// OK, block sending, but only until timer fires.
select {
case c.input <- event:
return true
case <-timer.C:
closeFunc()
return false
}
}

func (c *CacheWatcher) sendWatchCacheEvent(event *watch.Event) {
//watchEvent := c.convertToWatchEvent(event)
watchEvent := event
if watchEvent == nil {
// Watcher is not interested in that object.
return
}

// We need to ensure that if we put event X to the c.result, all
// previous events were already put into it before, no matter whether
// c.done is closed or not.
// Thus we cannot simply select from c.done and c.result, as this
// would give us non-determinism.
// At the same time, we don't want to block infinitely on putting
// to c.result, when c.done is already closed.

// This ensures that with c.done already closed, we at most once go
// into the next select after this. With that, no matter which
// statement we choose there, we will deliver only consecutive
// events.
select {
case <-c.done:
return
default:
}

select {
case c.result <- *watchEvent:
case <-c.done:
}
}

// Process first sends the initial events collected from the watch cache to the result channel, then continuously forwards events from the input channel to the result channel.
func (c *CacheWatcher) Process(ctx context.Context, initEvents []*watch.Event) {
defer utilruntime.HandleCrash()

for _, event := range initEvents {
c.sendWatchCacheEvent(event)
}

defer close(c.result)
defer c.Stop()
for {
select {
case event, ok := <-c.input:
if !ok {
return
}
c.sendWatchCacheEvent(event)
case <-ctx.Done():
return
}
}
}
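
Put together, a watcher's life cycle looks roughly like the sketch below: replay the initial events, stream later ones through the input channel, and shut down via context cancellation (a sketch against this package's exported API; the pod object is illustrative):

package main

import (
    "context"
    "fmt"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/watch"

    cache "github.com/clusterpedia-io/clusterpedia/pkg/storage/memorystorage/watchcache"
)

func main() {
    w := cache.NewCacheWatcher(100)

    pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "demo"}}
    initEvents := []*watch.Event{{Type: watch.Added, Object: pod}}

    ctx, cancel := context.WithCancel(context.Background())
    go w.Process(ctx, initEvents) // replays initEvents, then drains the input channel

    // Later events are fanned out with NonblockingAdd, as dispatchEvent does;
    // a full buffer means the event is dropped rather than blocking dispatch.
    w.NonblockingAdd(&watch.Event{Type: watch.Modified, Object: pod})

    for event := range w.ResultChan() {
        fmt.Printf("%s: %T\n", event.Type, event.Object)
        if event.Type == watch.Modified {
            cancel() // Process returns and closes the result channel
        }
    }
}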
@@ -53,6 +53,15 @@ func (crv *ClusterResourceVersion) GetClusterResourceVersion() string {
return base64.RawURLEncoding.EncodeToString(bytes)
}

// GetClusterResourceVersionFromEvent returns a ClusterResourceVersion from a watch event
func GetClusterResourceVersionFromEvent(event *watch.Event) (*ClusterResourceVersion, error) {
accessor, err := meta.Accessor(event.Object)
if err != nil {
return nil, fmt.Errorf("unable to understand watch event %#v", event)
}
return NewClusterResourceVersionFromString(accessor.GetResourceVersion())
}

func (crv *ClusterResourceVersion) IsEqual(another *ClusterResourceVersion) bool {
if len(crv.rvmap) != len(another.rvmap) {
return false
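The resource version string parsed at the top of Watch is opaque to clients and covers every imported cluster. Given the base64.RawURLEncoding call above, a plausible decode looks like this (the JSON payload shape is an assumption; this excerpt does not show how rvmap is marshaled):

package main

import (
    "encoding/base64"
    "encoding/json"
    "fmt"
)

func main() {
    // An assumed example value; real values come from list responses
    // served by the clusterpedia apiserver.
    encoded := base64.RawURLEncoding.EncodeToString([]byte(`{"cluster-1":"12345","cluster-2":"67890"}`))

    raw, err := base64.RawURLEncoding.DecodeString(encoded)
    if err != nil {
        panic(err)
    }
    rvmap := map[string]string{}
    if err := json.Unmarshal(raw, &rvmap); err != nil {
        panic(err)
    }
    fmt.Println(rvmap) // map[cluster-1:12345 cluster-2:67890]
}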