Skip to content

Commit

Permalink
model default/canary with two configurations and one route (kubeflow#51)
Browse files Browse the repository at this point in the history
* Model default/canary with two configurations and one route

* Refactor updateStatus and add propagate functions for sub components

* Fix if error patterns

* Merge change for v1beta1 type
  • Loading branch information
yuzisun authored and k8s-ci-robot committed Apr 29, 2019
1 parent ec3b6f9 commit d137986
Show file tree
Hide file tree
Showing 20 changed files with 956 additions and 615 deletions.
24 changes: 22 additions & 2 deletions config/rbac/rbac_role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,27 @@ rules:
- apiGroups:
- serving.knative.dev
resources:
- services
- configurations
verbs:
- get
- list
- watch
- create
- update
- patch
- delete
- apiGroups:
- serving.knative.dev
resources:
- configurations/status
verbs:
- get
- update
- patch
- apiGroups:
- serving.knative.dev
resources:
- routes
verbs:
- get
- list
Expand All @@ -19,7 +39,7 @@ rules:
- apiGroups:
- serving.knative.dev
resources:
- services/status
- routes/status
verbs:
- get
- update
Expand Down
2 changes: 1 addition & 1 deletion docs/samples/canary.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: "serving.kubeflow.org/v1alpha1"
kind: "KFService"
metadata:
name: "myModel"
name: "my-model"
spec:
default:
# 90% of traffic is sent to this model
Expand Down
2 changes: 1 addition & 1 deletion docs/samples/pinned.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: "serving.kubeflow.org/v1alpha1"
kind: "KFService"
metadata:
name: "myModel"
name: "my-model"
spec:
default:
tensorflow:
Expand Down
2 changes: 1 addition & 1 deletion docs/samples/resource-requests.yaml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
apiVersion: "serving.kubeflow.org/v1alpha1"
kind: "KFService"
metadata:
name: "myModel"
name: "my-model"
annotations:
cloud.google.com/gke-accelerator: nvidia-tesla-t4
spec:
Expand Down
71 changes: 71 additions & 0 deletions pkg/apis/serving/v1alpha1/kfservice_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
Copyright 2019 kubeflow.org.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package v1alpha1

import (
duckv1alpha1 "github.com/knative/pkg/apis/duck/v1alpha1"
knservingv1alpha1 "github.com/knative/serving/pkg/apis/serving/v1alpha1"
)

// Condition types reported for a KFService. Each constant is a
// duckv1alpha1.ConditionType; note that the configuration constants are named
// "...ConfigurationsReady" while their string values are singular.
const (
// ServiceConditionRoutesReady is set when the service's underlying
// routes have reported readiness.
ServiceConditionRoutesReady duckv1alpha1.ConditionType = "RoutesReady"
// ServiceConditionDefaultConfigurationsReady is set when the service's underlying
// default configuration has reported readiness.
ServiceConditionDefaultConfigurationsReady duckv1alpha1.ConditionType = "DefaultConfigurationReady"
// ServiceConditionCanaryConfigurationsReady is set when the service's underlying
// canary configuration has reported readiness.
ServiceConditionCanaryConfigurationsReady duckv1alpha1.ConditionType = "CanaryConfigurationReady"
)

// serviceCondSet is the condition set for KFService, built with
// duckv1alpha1.NewLivingConditionSet from the default-configuration,
// canary-configuration and routes readiness condition types.
// NOTE(review): not referenced by any function visible in this file yet —
// presumably intended for the condition propagation marked TODO below; confirm.
var serviceCondSet = duckv1alpha1.NewLivingConditionSet(
ServiceConditionDefaultConfigurationsReady,
ServiceConditionCanaryConfigurationsReady,
ServiceConditionRoutesReady,
)

// PropagateDefaultConfigurationStatus records, on the KFService status, the name
// of the latest revision created by the default Configuration.
//
// dcs must be non-nil; it is dereferenced unconditionally.
func (ss *KFServiceStatus) PropagateDefaultConfigurationStatus(dcs *knservingv1alpha1.ConfigurationStatus) {
	latestRevision := dcs.LatestCreatedRevisionName
	ss.Default.Name = latestRevision
	//TODO @yuzisun populate configuration status conditions
}

// PropagateCanaryConfigurationStatus records, on the KFService status, the name
// of the latest revision created by the canary Configuration.
//
// ccs must be non-nil; it is dereferenced unconditionally.
func (ss *KFServiceStatus) PropagateCanaryConfigurationStatus(ccs *knservingv1alpha1.ConfigurationStatus) {
	latestRevision := ccs.LatestCreatedRevisionName
	ss.Canary.Name = latestRevision
	//TODO @yuzisun populate configuration status conditions
}

// PropagateRouteStatus copies the route's address and per-revision traffic
// percentages onto the KFService status.
//
// rs must be non-nil. Traffic is matched by revision name against the names
// already stored in ss.Default and ss.Canary.
func (ss *KFServiceStatus) PropagateRouteStatus(rs *knservingv1alpha1.RouteStatus) {
	// The internal URI is only overwritten when the route reports an address.
	if addr := rs.Address; addr != nil {
		ss.URI.Internal = addr.Hostname
	}

	// Each traffic target is compared with the default revision first, then
	// with the canary revision; targets for any other revision are ignored.
	for _, target := range rs.Traffic {
		if target.RevisionName == ss.Default.Name {
			ss.Default.Traffic = target.Percent
		} else if target.RevisionName == ss.Canary.Name {
			ss.Canary.Traffic = target.Percent
		}
	}

	//TODO @yuzisun populate route status conditions
}
8 changes: 8 additions & 0 deletions pkg/constants/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,11 @@ func getEnvOrDefault(key string, fallback string) string {
}
return fallback
}

// DefaultConfigurationName returns name with the "-default" suffix appended.
func DefaultConfigurationName(name string) string {
	const suffix = "-default"
	return name + suffix
}

// CanaryConfigurationName returns name with the "-canary" suffix appended.
func CanaryConfigurationName(name string) string {
	const suffix = "-canary"
	return name + suffix
}
110 changes: 66 additions & 44 deletions pkg/controller/kfservice/kfservice_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ package service

import (
"context"
"k8s.io/apimachinery/pkg/types"

"github.com/kubeflow/kfserving/pkg/reconciler/ksvc"
"github.com/kubeflow/kfserving/pkg/reconciler/ksvc/resources"
v1 "k8s.io/api/core/v1"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/client-go/tools/record"

Expand Down Expand Up @@ -77,17 +78,22 @@ func add(mgr manager.Manager, r reconcile.Reconciler) error {
}

// Watch for changes to KFService
err = c.Watch(&source.Kind{Type: &kfservingv1alpha1.KFService{}}, &handler.EnqueueRequestForObject{})
if err != nil {
if err = c.Watch(&source.Kind{Type: &kfservingv1alpha1.KFService{}}, &handler.EnqueueRequestForObject{}); err != nil {
return err
}

// Watch for changes to Knative Service
err = c.Watch(&source.Kind{Type: &knservingv1alpha1.Service{}}, &handler.EnqueueRequestForOwner{
kfservingController := &handler.EnqueueRequestForOwner{
IsController: true,
OwnerType: &kfservingv1alpha1.KFService{},
})
if err != nil {
}

// Watch for changes to Knative Configuration
if err = c.Watch(&source.Kind{Type: &knservingv1alpha1.Configuration{}}, kfservingController); err != nil {
return err
}

// Watch for changes to Knative Route
if err = c.Watch(&source.Kind{Type: &knservingv1alpha1.Route{}}, kfservingController); err != nil {
return err
}

Expand All @@ -105,15 +111,16 @@ type ReconcileService struct {

// Reconcile reads the state of the cluster for a Service object and makes changes based on the state read
// and what is in the Service.Spec
// +kubebuilder:rbac:groups=serving.knative.dev,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=serving.knative.dev,resources=services/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=serving.knative.dev,resources=configurations,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=serving.knative.dev,resources=configurations/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=serving.knative.dev,resources=routes,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=serving.knative.dev,resources=routes/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=serving.kubeflow.org,resources=kfservices,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=serving.kubeflow.org,resources=kfservices/status,verbs=get;update;patch
func (r *ReconcileService) Reconcile(request reconcile.Request) (reconcile.Result, error) {
// Fetch the KFService instance
kfsvc := &kfservingv1alpha1.KFService{}
err := r.Get(context.TODO(), request.NamespacedName, kfsvc)
if err != nil {
if err := r.Get(context.TODO(), request.NamespacedName, kfsvc); err != nil {
if errors.IsNotFound(err) {
// Object not found, return. Created objects are automatically garbage collected.
// For additional cleanup logic use finalizers.
Expand All @@ -123,64 +130,79 @@ func (r *ReconcileService) Reconcile(request reconcile.Request) (reconcile.Resul
return reconcile.Result{}, err
}

desiredService, err := resources.CreateKnativeService(kfsvc)
serviceReconciler := ksvc.NewServiceReconciler(r.Client)
// Reconcile configurations
desiredDefault, desiredCanary := resources.CreateKnativeConfiguration(kfsvc)

if err := controllerutil.SetControllerReference(kfsvc, desiredDefault, r.scheme); err != nil {
return reconcile.Result{}, err
}

if desiredCanary != nil {
if err := controllerutil.SetControllerReference(kfsvc, desiredCanary, r.scheme); err != nil {
return reconcile.Result{}, err
}
}

defaultConfiguration, err := serviceReconciler.ReconcileConfiguarion(context.TODO(), desiredDefault)
if err != nil {
log.Error(err, "Failed to create desired Knative Serving Service", "name", kfsvc.Name)
log.Error(err, "Failed to reconcile default model spec", "name", desiredDefault.Name)
r.Recorder.Eventf(kfsvc, v1.EventTypeWarning, "InternalError", err.Error())
return reconcile.Result{}, err
}
if err := controllerutil.SetControllerReference(kfsvc, desiredService, r.scheme); err != nil {
return reconcile.Result{}, err
kfsvc.Status.PropagateDefaultConfigurationStatus(&defaultConfiguration.Status)

if desiredCanary != nil {
canaryConfiguration, err := serviceReconciler.ReconcileConfiguarion(context.TODO(), desiredCanary)
if err != nil {
log.Error(err, "Failed to reconcile canary model spec", "name", desiredCanary.Name)
r.Recorder.Eventf(kfsvc, v1.EventTypeWarning, "InternalError", err.Error())
return reconcile.Result{}, err
}
kfsvc.Status.PropagateCanaryConfigurationStatus(&canaryConfiguration.Status)
}

serviceReconciler := ksvc.NewServiceReconciler(r.Client)

ksvc, err := serviceReconciler.Reconcile(context.TODO(), desiredService)
// Reconcile route
desiredRoute := resources.CreateKnativeRoute(kfsvc)
if err := controllerutil.SetControllerReference(kfsvc, desiredRoute, r.scheme); err != nil {
return reconcile.Result{}, err
}
route, err := serviceReconciler.ReconcileRoute(context.TODO(), desiredRoute)
if err != nil {
log.Error(err, "Failed to reconcile Knative Serving Service", "name", desiredService.Name)
log.Error(err, "Failed to reconcile route", "name", desiredRoute.Name)
r.Recorder.Eventf(kfsvc, v1.EventTypeWarning, "InternalError", err.Error())
return reconcile.Result{}, err
}
kfsvc.Status.PropagateRouteStatus(&route.Status)

if err = r.updateStatus(kfsvc, ksvc); err != nil {
if err = r.updateStatus(kfsvc); err != nil {
r.Recorder.Eventf(kfsvc, v1.EventTypeWarning, "InternalError", err.Error())
}
return reconcile.Result{}, nil
}

func (r *ReconcileService) updateStatus(before *kfservingv1alpha1.KFService, ksvc *knservingv1alpha1.Service) error {
after := before.DeepCopy()
if ksvc.Status.Address != nil {
after.Status.URI.Internal = ksvc.Status.Address.Hostname
}
if before.Spec.Canary == nil || before.Spec.Canary.TrafficPercent == 0 {
after.Status.Default.Name = ksvc.Status.LatestCreatedRevisionName
after.Status.Canary.Name = ""
} else {
after.Status.Canary.Name = ksvc.Status.LatestCreatedRevisionName
}
for _, traffic := range ksvc.Status.Traffic {
switch traffic.RevisionName {
case after.Status.Default.Name:
after.Status.Default.Traffic = traffic.Percent
case after.Status.Canary.Name:
after.Status.Canary.Traffic = traffic.Percent
default:
func (r *ReconcileService) updateStatus(desiredService *kfservingv1alpha1.KFService) error {
existing := &kfservingv1alpha1.KFService{}
namespacedName := types.NamespacedName{Name: desiredService.Name, Namespace: desiredService.Namespace}
if err := r.Get(context.TODO(), namespacedName, existing); err != nil {
if errors.IsNotFound(err) {
return err
}
return err
}
if equality.Semantic.DeepEqual(before.Status, after.Status) {
if equality.Semantic.DeepEqual(existing.Status, desiredService.Status) {
// If we didn't change anything then don't call updateStatus.
// This is important because the copy we loaded from the informer's
// cache may be stale and we don't want to overwrite a prior update
// to status with this stale state.
} else if err := r.Update(context.TODO(), after); err != nil {
log.Error(err, "Failed to update kfserving service status")
r.Recorder.Eventf(after, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for kfserving service %q: %v", after.Name, err)
} else if err := r.Update(context.TODO(), desiredService); err != nil {
log.Error(err, "Failed to update KFService status")
r.Recorder.Eventf(desiredService, v1.EventTypeWarning, "UpdateFailed",
"Failed to update status for KFService %q: %v", desiredService.Name, err)
return err
} else if err == nil {
// If there was a difference and there was no error.
r.Recorder.Eventf(after, v1.EventTypeNormal, "Updated", "Updated Service %q", after.GetName())
r.Recorder.Eventf(desiredService, v1.EventTypeNormal, "Updated", "Updated Service %q", desiredService.GetName())
}

return nil
Expand Down
5 changes: 3 additions & 2 deletions pkg/controller/kfservice/kfservice_controller_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,16 @@ var cfg *rest.Config

func TestMain(m *testing.M) {
t := &envtest.Environment{
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds")},
CRDDirectoryPaths: []string{filepath.Join("..", "..", "..", "config", "crds"),
filepath.Join("..", "..", "..", "test", "crds")},
}
err := apis.AddToScheme(scheme.Scheme)
if err != nil {
log.Error(err, "failed to add to scheme")
}
err = knservingv1alpha1.SchemeBuilder.AddToScheme(scheme.Scheme)
if err != nil {
log.Error(err, "failed to add kn service to scheme")
log.Error(err, "failed to add knative serving to scheme")
}
if cfg, err = t.Start(); err != nil {
stdlog.Fatal(err)
Expand Down
Loading

0 comments on commit d137986

Please sign in to comment.