diff --git a/api/v1beta1/pinotschema_types.go b/api/v1beta1/pinotschema_types.go index ef961e3..2f29523 100644 --- a/api/v1beta1/pinotschema_types.go +++ b/api/v1beta1/pinotschema_types.go @@ -29,23 +29,14 @@ type PinotSchemaSpec struct { PinotSchemaJson string `json:"schema.json"` } -type PinotSchemaConditionType string - -const ( - PinotSchemaCreateSuccess PinotSchemaConditionType = "PinotSchemaCreateSuccess" - PinotSchemaUpdateSuccess PinotSchemaConditionType = "PinotSchemaUpdateSuccess" - PinotSchemaCreateFail PinotSchemaConditionType = "PinotSchemaCreateFail" - PinotSchemaUpdateFail PinotSchemaConditionType = "PinotSchemaUpdateFail" -) - // PinotSchemaStatus defines the observed state of PinotSchema type PinotSchemaStatus struct { - Type PinotSchemaConditionType `json:"type,omitempty"` - Status v1.ConditionStatus `json:"status,omitempty"` - Reason string `json:"reason,omitempty"` - Message string `json:"message,omitempty"` - LastUpdateTime string `json:"lastUpdateTime,omitempty"` - CurrentSchemasJson string `json:"currentSchemas.json"` + Type string `json:"type,omitempty"` + Status v1.ConditionStatus `json:"status,omitempty"` + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` + LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` + CurrentSchemasJson string `json:"currentSchemas.json"` } // +kubebuilder:object:root=true diff --git a/api/v1beta1/pinottable_types.go b/api/v1beta1/pinottable_types.go index f5955f3..163b8a3 100644 --- a/api/v1beta1/pinottable_types.go +++ b/api/v1beta1/pinottable_types.go @@ -39,25 +39,18 @@ type PinotTableSpec struct { PinotTableType PinotTableType `json:"pinotTableType"` // +required PinotTablesJson string `json:"tables.json"` + // +optional + SegmentReload bool `json:"segmentReload"` } -type PinotTableConditionType string - -const ( - PinotTableCreateSuccess PinotSchemaConditionType = "PinotTableCreateSuccess" - PinotTableUpdateSuccess PinotSchemaConditionType = "PinotTableUpdateSuccess" - PinotTableCreateFail PinotSchemaConditionType = "PinotTableCreateFail" - PinotTableUpdateFail PinotSchemaConditionType = "PinotTableUpdateFail" -) - // PinotTableStatus defines the observed state of PinotTable type PinotTableStatus struct { - Type PinotTableConditionType `json:"type,omitempty"` - Status v1.ConditionStatus `json:"status,omitempty"` - Reason string `json:"reason,omitempty"` - Message string `json:"message,omitempty"` - LastUpdateTime string `json:"lastUpdateTime,omitempty"` - CurrentTableJson string `json:"currentTable.json"` + Type string `json:"type,omitempty"` + Status v1.ConditionStatus `json:"status,omitempty"` + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` + LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` + CurrentTableJson string `json:"currentTable.json"` } // +kubebuilder:object:root=true diff --git a/api/v1beta1/pinottenant_types.go b/api/v1beta1/pinottenant_types.go index a5c0f6d..5b53999 100644 --- a/api/v1beta1/pinottenant_types.go +++ b/api/v1beta1/pinottenant_types.go @@ -38,23 +38,14 @@ type PinotTenantSpec struct { PinotTenantsJson string `json:"tenants.json"` } -type PinotTenantConditionType string - -const ( - PinotTenantCreateSuccess PinotTenantConditionType = "PinotTenantCreateSuccess" - PinotTenantUpdateSuccess PinotTenantConditionType = "PinotTenantUpdateSuccess" - PinotTenantCreateFail PinotTenantConditionType = "PinotTenantCreateFail" - PinotTenantUpdateFail PinotTenantConditionType = "PinotTenantUpdateFail" -) - // PinotTenantStatus defines the observed state of PinotTenant type PinotTenantStatus struct { - Type PinotTenantConditionType `json:"type,omitempty"` - Status v1.ConditionStatus `json:"status,omitempty"` - Reason string `json:"reason,omitempty"` - Message string `json:"message,omitempty"` - LastUpdateTime string `json:"lastUpdateTime,omitempty"` - CurrentTenantsJson string `json:"currentTenants.json"` + Type string `json:"type,omitempty"` + Status v1.ConditionStatus `json:"status,omitempty"` + Reason string `json:"reason,omitempty"` + Message string `json:"message,omitempty"` + LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` + CurrentTenantsJson string `json:"currentTenants.json"` } //+kubebuilder:object:root=true diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index b472187..5c83721 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -302,7 +302,7 @@ func (in *PinotSchema) DeepCopyInto(out *PinotSchema) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotSchema. @@ -373,6 +373,7 @@ func (in *PinotSchemaSpec) DeepCopy() *PinotSchemaSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PinotSchemaStatus) DeepCopyInto(out *PinotSchemaStatus) { *out = *in + in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotSchemaStatus. @@ -450,7 +451,7 @@ func (in *PinotTable) DeepCopyInto(out *PinotTable) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTable. @@ -521,6 +522,7 @@ func (in *PinotTableSpec) DeepCopy() *PinotTableSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PinotTableStatus) DeepCopyInto(out *PinotTableStatus) { *out = *in + in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTableStatus. @@ -539,7 +541,7 @@ func (in *PinotTenant) DeepCopyInto(out *PinotTenant) { out.TypeMeta = in.TypeMeta in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) out.Spec = in.Spec - out.Status = in.Status + in.Status.DeepCopyInto(&out.Status) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTenant. @@ -610,6 +612,7 @@ func (in *PinotTenantSpec) DeepCopy() *PinotTenantSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PinotTenantStatus) DeepCopyInto(out *PinotTenantStatus) { *out = *in + in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTenantStatus. diff --git a/config/crd/bases/datainfra.io_pinotschemas.yaml b/config/crd/bases/datainfra.io_pinotschemas.yaml index 39a7375..cbee2b9 100644 --- a/config/crd/bases/datainfra.io_pinotschemas.yaml +++ b/config/crd/bases/datainfra.io_pinotschemas.yaml @@ -56,6 +56,7 @@ spec: currentSchemas.json: type: string lastUpdateTime: + format: date-time type: string message: type: string diff --git a/config/crd/bases/datainfra.io_pinottables.yaml b/config/crd/bases/datainfra.io_pinottables.yaml index 6aad82f..9d95936 100644 --- a/config/crd/bases/datainfra.io_pinottables.yaml +++ b/config/crd/bases/datainfra.io_pinottables.yaml @@ -51,6 +51,8 @@ spec: type: string pinotTableType: type: string + segmentReload: + type: boolean tables.json: type: string required: @@ -65,6 +67,7 @@ spec: currentTable.json: type: string lastUpdateTime: + format: date-time type: string message: type: string diff --git a/config/crd/bases/datainfra.io_pinottenants.yaml b/config/crd/bases/datainfra.io_pinottenants.yaml index 5379be1..901abb4 100644 --- a/config/crd/bases/datainfra.io_pinottenants.yaml +++ b/config/crd/bases/datainfra.io_pinottenants.yaml @@ -52,6 +52,7 @@ spec: currentTenants.json: type: string lastUpdateTime: + format: date-time type: string message: type: string diff --git a/internal/http/http.go b/internal/http/http.go index 4ebcb33..cba41a2 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -18,14 +18,13 @@ package http import ( "bytes" - "encoding/json" "io/ioutil" "net/http" ) // PinotHTTP interface type PinotHTTP interface { - Do() *Response + Do() (*Response, error) } // HTTP client @@ -61,34 +60,22 @@ type BasicAuth struct { Password string } -// Pinot API error Response -// ex: {"code":404,"error":"Schema not found"} -type PinotErrorResponse struct { - Code int `json:"code"` - Error string `json:"error"` -} - -// Pinot API success Response -// ex: {"unrecognizedProperties":{},"status":"airlineStats successfully added"} -type PinotSuccessResponse struct { - UnrecognizedProperties interface{} `json:"unrecognizedProperties"` - Status string `json:"status"` -} - // Response passed to controller type Response struct { - Err error - StatusCode int - PinotErrorResponse - PinotSuccessResponse + ResponseBody string + StatusCode int } -// Initiate HTTP call to pinot -func (c *Client) Do() *Response { +// GET /schemas returns 404 when schema not found with code and error as resp. +// GET /tenants returns 404 when tenant not found with code and error as resp +// GET /tables returns 200 when table not found with an empty response. + +// Do method to be used schema and tenant controller. +func (c *Client) Do() (*Response, error) { req, err := http.NewRequest(c.Method, c.URL, bytes.NewBuffer(c.Body)) if err != nil { - return &Response{Err: err} + return nil, err } if c.Auth.BasicAuth != (BasicAuth{}) { @@ -98,44 +85,15 @@ func (c *Client) Do() *Response { req.Header.Add("Content-Type", "application/json") resp, err := c.HTTPClient.Do(req) if err != nil { - return &Response{Err: err} + return nil, err } defer resp.Body.Close() responseBody, err := ioutil.ReadAll(resp.Body) if err != nil { - return &Response{Err: err} + return nil, err } - // GET /schemas returns 404 when schema not found with code and error as resp. - // GET /tenants returns 404 when tenant not found with code and error as resp - // GET /tables returns 200 when table not found with an empty response. - if string(responseBody) != "{}" { - if resp.StatusCode == 200 { - var pinotSuccess PinotSuccessResponse - if err := json.Unmarshal(responseBody, &pinotSuccess); err != nil { - return &Response{Err: err} - } - return &Response{StatusCode: resp.StatusCode, PinotSuccessResponse: pinotSuccess} - } else { - var pinotErr PinotErrorResponse - if err := json.Unmarshal(responseBody, &pinotErr); err != nil { - return &Response{StatusCode: resp.StatusCode, Err: err} - } - return &Response{StatusCode: resp.StatusCode, PinotErrorResponse: pinotErr} - } - } else { - if resp.StatusCode == 200 { - // resp is empty with 200 status code - // for tables API force 404 - return &Response{StatusCode: 404} - } else { - var pinotErr PinotErrorResponse - if err := json.Unmarshal(responseBody, &pinotErr); err != nil { - return &Response{StatusCode: resp.StatusCode, Err: err} - } - return &Response{StatusCode: resp.StatusCode, PinotErrorResponse: pinotErr} - } - } + return &Response{ResponseBody: string(responseBody), StatusCode: resp.StatusCode}, nil } diff --git a/internal/schema_controller/pinotschema_controller.go b/internal/schema_controller/pinotschema_controller.go index 541e913..ecbd1fc 100644 --- a/internal/schema_controller/pinotschema_controller.go +++ b/internal/schema_controller/pinotschema_controller.go @@ -26,7 +26,6 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" @@ -61,8 +60,6 @@ func NewPinotSchemaReconciler(mgr ctrl.Manager) *PinotSchemaReconciler { // +kubebuilder:rbac:groups="",resources=secret,verbs=get func (r *PinotSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - logr := log.FromContext(ctx) - pinotSchemaCR := &v1beta1.PinotSchema{} err := r.Get(context.TODO(), req.NamespacedName, pinotSchemaCR) if err != nil { @@ -73,7 +70,6 @@ func (r *PinotSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) } if err := r.do(ctx, pinotSchemaCR); err != nil { - logr.Error(err, err.Error()) return ctrl.Result{}, err } else { return ctrl.Result{RequeueAfter: r.ReconcileWait}, nil @@ -86,8 +82,8 @@ func (r *PinotSchemaReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&datainfraiov1beta1.PinotSchema{}). WithEventFilter(predicate.Or( GenericPredicates{}, + predicate.ResourceVersionChangedPredicate{}, predicate.GenerationChangedPredicate{}, - predicate.LabelChangedPredicate{}, )). Complete(r) } diff --git a/internal/schema_controller/reconciler.go b/internal/schema_controller/reconciler.go index bc3366c..c66e4a4 100644 --- a/internal/schema_controller/reconciler.go +++ b/internal/schema_controller/reconciler.go @@ -84,8 +84,8 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch // registering our finalizer. if !controllerutil.ContainsFinalizer(schema, PinotSchemaControllerFinalizer) { controllerutil.AddFinalizer(schema, PinotSchemaControllerFinalizer) - if err := r.Update(ctx, schema); err != nil { - return err + if err := r.Update(ctx, schema.DeepCopyObject().(*v1beta1.PinotSchema)); err != nil { + return nil } } } else { @@ -107,30 +107,30 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch []byte{}, internalHTTP.Auth{BasicAuth: basicAuth}, ) - respDeleteSchema := http.Do() - if respDeleteSchema.Err != nil { - return respDeleteSchema.Err + respDeleteSchema, err := http.Do() + if err != nil { + return err } if respDeleteSchema.StatusCode != 200 { build.Recorder.GenericEvent( schema, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s]", string(respDeleteSchema.PinotErrorResponse.Error)), + fmt.Sprintf("Resp [%s]", string(respDeleteSchema.ResponseBody)), PinotSchemaControllerDeleteFail, ) } else { build.Recorder.GenericEvent( schema, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respDeleteSchema.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respDeleteSchema.ResponseBody)), PinotSchemaControllerDeleteSuccess, ) } // remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(schema, PinotSchemaControllerFinalizer) - if err := r.Update(ctx, schema); err != nil { - return err + if err := r.Update(ctx, schema.DeepCopyObject().(*v1beta1.PinotSchema)); err != nil { + return nil } } } @@ -138,6 +138,8 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch return nil } +// Get Schema if does not exist create +// if exists check for update func (r *PinotSchemaReconciler) CreateOrUpdate( schema *v1beta1.PinotSchema, svcName string, @@ -160,14 +162,14 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( auth, ) - respGetSchema := getHttp.Do() - if respGetSchema.Err != nil { - return controllerutil.OperationResultNone, respGetSchema.Err + respGetSchema, err := getHttp.Do() + if err != nil { + return controllerutil.OperationResultNone, err } // if not found create schema // else check for updates - if respGetSchema.PinotErrorResponse.Code == 404 { + if respGetSchema.StatusCode == 404 { // create schema postHttp := internalHTTP.NewHTTPClient( @@ -178,18 +180,18 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( auth, ) - respCreatechema := postHttp.Do() - if respCreatechema.Err != nil { - return controllerutil.OperationResultNone, respCreatechema.Err + respCreatechema, err := postHttp.Do() + if err != nil { + return controllerutil.OperationResultNone, err } if respCreatechema.StatusCode == 200 { result, err := r.makePatchPinotSchemaStatus( schema, PinotSchemaControllerCreateSuccess, - string(respCreatechema.PinotSuccessResponse.Status), + string(respCreatechema.ResponseBody), v1.ConditionTrue, - v1beta1.PinotSchemaCreateSuccess, + PinotSchemaControllerCreateSuccess, ) if err != nil { return controllerutil.OperationResultNone, err @@ -197,13 +199,13 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( build.Recorder.GenericEvent( schema, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respCreatechema.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respCreatechema.ResponseBody)), PinotSchemaControllerCreateSuccess, ) build.Recorder.GenericEvent( schema, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s], Result [%s]", string(respCreatechema.PinotSuccessResponse.Status), result), + fmt.Sprintf("Resp [%s], Result [%s]", string(respCreatechema.ResponseBody), result), PinotSchemaControllerPatchStatusSuccess) return controllerutil.OperationResultCreated, nil @@ -211,9 +213,9 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( _, err := r.makePatchPinotSchemaStatus( schema, PinotSchemaControllerCreateFail, - string(respCreatechema.PinotErrorResponse.Error), + string(respCreatechema.ResponseBody), v1.ConditionTrue, - v1beta1.PinotSchemaCreateFail, + PinotSchemaControllerCreateFail, ) if err != nil { return controllerutil.OperationResultNone, err @@ -221,7 +223,7 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( build.Recorder.GenericEvent( schema, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s], Status", string(respCreatechema.PinotErrorResponse.Error)), + fmt.Sprintf("Resp [%s], Status", string(respCreatechema.ResponseBody)), PinotSchemaControllerCreateFail, ) return controllerutil.OperationResultCreated, nil @@ -229,6 +231,29 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( } } else if respGetSchema.StatusCode == 200 { + // at times of mis-match of state, where resource exists on pinot, but + // on creation status wasn't updated. + // get the current state ie schema and patch the status + if schema.Status.CurrentSchemasJson == "" { + build.Recorder.GenericEvent( + schema, + v1.EventTypeWarning, + fmt.Sprintf("Schema Exists on Pinot, but status is not updated"), + PinotSchemaControllerUpdateFail, + ) + + _, err := r.makePatchPinotSchemaStatus( + schema, + PinotSchemaControllerCreateSuccess, + string(respGetSchema.ResponseBody), + v1.ConditionTrue, + PinotSchemaControllerUpdateSuccess, + ) + if err != nil { + return controllerutil.OperationResultNone, err + } + } + ok, err := utils.IsEqualJson(schema.Status.CurrentSchemasJson, schema.Spec.PinotSchemaJson) if err != nil { return controllerutil.OperationResultNone, err @@ -236,6 +261,7 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( // if desiredstate and currentstate not the same then update if !ok { + postHttp := internalHTTP.NewHTTPClient( http.MethodPut, makeControllerGetUpdateDeleteSchemaPath(svcName, schemaName), @@ -243,18 +269,20 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( []byte(schema.Spec.PinotSchemaJson), auth, ) - respUpdateSchema := postHttp.Do() - if respUpdateSchema.Err != nil { + + respUpdateSchema, err := postHttp.Do() + if err != nil { return controllerutil.OperationResultNone, err } - if respUpdateSchema.PinotSuccessResponse != (internalHTTP.PinotSuccessResponse{}) { + + if respUpdateSchema.StatusCode == 200 { // patch status to store the current valid schema json result, err := r.makePatchPinotSchemaStatus( schema, PinotSchemaControllerUpdateSuccess, - string(respUpdateSchema.PinotSuccessResponse.Status), + string(respUpdateSchema.ResponseBody), v1.ConditionTrue, - v1beta1.PinotSchemaUpdateSuccess, + PinotSchemaControllerUpdateSuccess, ) if err != nil { return controllerutil.OperationResultNone, err @@ -262,24 +290,24 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( build.Recorder.GenericEvent( schema, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respUpdateSchema.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respUpdateSchema.ResponseBody)), PinotSchemaControllerUpdateSuccess, ) build.Recorder.GenericEvent( schema, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s], Result [%s]", string(respUpdateSchema.PinotSuccessResponse.Status), result), + fmt.Sprintf("Resp [%s], Result [%s]", string(respUpdateSchema.ResponseBody), result), PinotSchemaControllerPatchStatusSuccess) - return controllerutil.OperationResultUpdated, nil - } else if respUpdateSchema.PinotErrorResponse != (internalHTTP.PinotErrorResponse{}) { + return controllerutil.OperationResultUpdated, nil + } else { // patch status with failure and emit events _, err := r.makePatchPinotSchemaStatus( schema, PinotSchemaControllerUpdateFail, - string(respUpdateSchema.PinotErrorResponse.Error), + string(respGetSchema.ResponseBody), v1.ConditionTrue, - v1beta1.PinotSchemaUpdateFail, + PinotSchemaControllerUpdateFail, ) if err != nil { return controllerutil.OperationResultNone, err @@ -287,12 +315,14 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( build.Recorder.GenericEvent( schema, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s], StatusCode [%d]", string(respUpdateSchema.PinotErrorResponse.Error), respUpdateSchema.PinotErrorResponse.Code), + fmt.Sprintf("Resp [%s]", string(respGetSchema.ResponseBody)), PinotSchemaControllerUpdateFail, ) return controllerutil.OperationResultNone, err + } } + } return controllerutil.OperationResultNone, nil @@ -320,29 +350,20 @@ func (r *PinotSchemaReconciler) makePatchPinotSchemaStatus( msg string, reason string, status v1.ConditionStatus, - pinotSchemaConditionType v1beta1.PinotSchemaConditionType, + pinotSchemaConditionType string, ) (controllerutil.OperationResult, error) { - updatedPinotSchemaStatus := v1beta1.PinotSchemaStatus{} - - updatedPinotSchemaStatus.CurrentSchemasJson = schema.Spec.PinotSchemaJson - updatedPinotSchemaStatus.LastUpdateTime = time.Now().Format(metav1.RFC3339Micro) - updatedPinotSchemaStatus.Message = msg - updatedPinotSchemaStatus.Reason = reason - updatedPinotSchemaStatus.Status = status - updatedPinotSchemaStatus.Type = pinotSchemaConditionType - - patchBytes, err := json.Marshal(map[string]v1beta1.PinotSchemaStatus{"status": updatedPinotSchemaStatus}) - if err != nil { - return controllerutil.OperationResultNone, err - } - if err := r.Client.Status().Patch( - context.Background(), - schema, - client.RawPatch(types.MergePatchType, - patchBytes, - )); err != nil { + if _, _, err := utils.PatchStatus(context.Background(), r.Client, schema, func(obj client.Object) client.Object { + in := obj.(*v1beta1.PinotSchema) + in.Status.CurrentSchemasJson = schema.Spec.PinotSchemaJson + in.Status.LastUpdateTime = metav1.Time{Time: time.Now()} + in.Status.Message = msg + in.Status.Reason = reason + in.Status.Status = status + in.Status.Type = pinotSchemaConditionType + return in + }); err != nil { return controllerutil.OperationResultNone, err } @@ -367,9 +388,9 @@ func (r *PinotSchemaReconciler) getControllerSvcUrl(namespace, pinotClusterName svcName = svcList.Items[0].Name } - _ = "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort + newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort - return "http://localhost:9000", nil + return newName, nil } func (r *PinotSchemaReconciler) getAuthCreds(ctx context.Context, schema *v1beta1.PinotSchema) (internalHTTP.BasicAuth, error) { diff --git a/internal/table_controller/pinottable_controller.go b/internal/table_controller/pinottable_controller.go index e717296..e185dba 100644 --- a/internal/table_controller/pinottable_controller.go +++ b/internal/table_controller/pinottable_controller.go @@ -27,7 +27,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" - "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" datainfraiov1beta1 "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" @@ -83,11 +82,9 @@ func (r *PinotTableReconciler) Reconcile(ctx context.Context, req ctrl.Request) func (r *PinotTableReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For(&datainfraiov1beta1.PinotTable{}). - WithEventFilter(predicate.Or( + WithEventFilter( GenericPredicates{}, - predicate.GenerationChangedPredicate{}, - predicate.LabelChangedPredicate{}, - )). + ). Complete(r) } diff --git a/internal/table_controller/reconciler.go b/internal/table_controller/reconciler.go index f8e21c1..b147f74 100644 --- a/internal/table_controller/reconciler.go +++ b/internal/table_controller/reconciler.go @@ -1,18 +1,18 @@ -// /* -// DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// */ +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package tablecontroller @@ -77,6 +77,7 @@ func (r *PinotTableReconciler) do(ctx context.Context, table *v1beta1.PinotTable if err != nil { return err } + if table.ObjectMeta.DeletionTimestamp.IsZero() { // The object is not being deleted, so if it does not have our finalizer, // then lets add the finalizer and update the object. This is equivalent @@ -84,7 +85,7 @@ func (r *PinotTableReconciler) do(ctx context.Context, table *v1beta1.PinotTable if !controllerutil.ContainsFinalizer(table, PinotTableControllerFinalizer) { controllerutil.AddFinalizer(table, PinotTableControllerFinalizer) if err := r.Update(ctx, table); err != nil { - return err + return nil } } } else { @@ -104,22 +105,22 @@ func (r *PinotTableReconciler) do(ctx context.Context, table *v1beta1.PinotTable http.Client{}, []byte{}, internalHTTP.Auth{BasicAuth: basicAuth}, ) - respDeleteTable := http.Do() - if respDeleteTable.Err != nil { - return respDeleteTable.Err + respDeleteTable, err := http.Do() + if err != nil { + return err } if respDeleteTable.StatusCode != 200 { build.Recorder.GenericEvent( table, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s]", string(respDeleteTable.PinotErrorResponse.Error)), + fmt.Sprintf("Resp [%s]", string(respDeleteTable.ResponseBody)), PinotTableControllerDeleteFail, ) } else { build.Recorder.GenericEvent( table, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respDeleteTable.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respDeleteTable.ResponseBody)), PinotTableControllerDeleteSuccess, ) } @@ -127,13 +128,15 @@ func (r *PinotTableReconciler) do(ctx context.Context, table *v1beta1.PinotTable // remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(table, PinotTableControllerFinalizer) if err := r.Update(ctx, table); err != nil { - return err + return nil } } } return nil } +// Get table if does not exist create +// if exists check for update func (r *PinotTableReconciler) CreateOrUpdate( table *v1beta1.PinotTable, svcName string, @@ -155,14 +158,13 @@ func (r *PinotTableReconciler) CreateOrUpdate( auth, ) - respGetTable := getHttp.Do() - if respGetTable.Err != nil { + respGetTable, err := getHttp.Do() + if err != nil { return controllerutil.OperationResultNone, err } - fmt.Println(respGetTable) - // if not found create table - if respGetTable.StatusCode == 404 { + // get - an empty response + if respGetTable.ResponseBody == "{}" { postHttp := internalHTTP.NewHTTPClient( http.MethodPost, @@ -171,16 +173,20 @@ func (r *PinotTableReconciler) CreateOrUpdate( []byte(table.Spec.PinotTablesJson), auth, ) - respCreateTable := postHttp.Do() - if respCreateTable.Err != nil { + // create table + respCreateTable, err := postHttp.Do() + if err != nil { return controllerutil.OperationResultNone, err } + // create success if respCreateTable.StatusCode == 200 { - result, err := r.makePatchPinotTableStatus( + + // patch resource + _, err := r.makePatchPinotTableStatus( table, PinotTableControllerCreateSuccess, - string(respCreateTable.PinotSuccessResponse.Status), + string(respCreateTable.ResponseBody), v1.ConditionTrue, PinotTableControllerCreateSuccess, ) @@ -190,7 +196,7 @@ func (r *PinotTableReconciler) CreateOrUpdate( build.Recorder.GenericEvent( table, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s], Result [%s]", string(respCreateTable.PinotSuccessResponse.Status), result), + fmt.Sprintf("Resp [%s]", string(respCreateTable.ResponseBody)), PinotTableControllerCreateSuccess, ) return controllerutil.OperationResultCreated, nil @@ -198,23 +204,45 @@ func (r *PinotTableReconciler) CreateOrUpdate( } else { _, err := r.makePatchPinotTableStatus( table, - PinotTableControllerCreateFail, - string(respCreateTable.PinotErrorResponse.Error), + PinotTableControllerCreateSuccess, + string(respCreateTable.ResponseBody), v1.ConditionTrue, - PinotTableControllerCreateFail, + PinotTableControllerCreateSuccess, ) if err != nil { return controllerutil.OperationResultNone, err } + build.Recorder.GenericEvent( table, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s]", string(respCreateTable.PinotErrorResponse.Error)), + fmt.Sprintf("Resp [%s]", string(respCreateTable.ResponseBody)), PinotTableControllerCreateFail, ) return controllerutil.OperationResultNone, nil } - } else if respGetTable.StatusCode == 200 { + } else if respGetTable.ResponseBody != "{}" { + + if table.Status.CurrentTableJson == "" { + build.Recorder.GenericEvent( + table, + v1.EventTypeWarning, + fmt.Sprintf("Table Exists on Pinot, but status is not updated"), + PinotTableControllerUpdateFail, + ) + + _, err := r.makePatchPinotTableStatus( + table, + PinotTableControllerCreateSuccess, + string(respGetTable.ResponseBody), + v1.ConditionTrue, + PinotTableControllerCreateSuccess, + ) + if err != nil { + return controllerutil.OperationResultNone, err + } + } + ok, err := utils.IsEqualJson( table.Status.CurrentTableJson, table.Spec.PinotTablesJson, @@ -231,16 +259,16 @@ func (r *PinotTableReconciler) CreateOrUpdate( []byte(table.Spec.PinotTablesJson), auth, ) - respUpdateTable := postHttp.Do() - if respUpdateTable.Err != nil { - return controllerutil.OperationResultNone, respUpdateTable.Err + respUpdateTable, err := postHttp.Do() + if err != nil { + return controllerutil.OperationResultNone, err } if respUpdateTable.StatusCode == 200 { _, err := r.makePatchPinotTableStatus( table, PinotTableControllerUpdateSuccess, - string(respUpdateTable.PinotSuccessResponse.Status), + string(respUpdateTable.ResponseBody), v1.ConditionTrue, PinotTableControllerUpdateSuccess, ) @@ -250,21 +278,22 @@ func (r *PinotTableReconciler) CreateOrUpdate( build.Recorder.GenericEvent( table, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respUpdateTable.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respUpdateTable.ResponseBody)), PinotTableControllerUpdateSuccess, ) build.Recorder.GenericEvent( table, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respUpdateTable.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respUpdateTable.ResponseBody)), PinotTableControllerPatchStatusSuccess) + return controllerutil.OperationResultUpdated, nil } else { // patch status with failure and emit events _, err := r.makePatchPinotTableStatus( table, PinotTableControllerUpdateFail, - string(respUpdateTable.PinotErrorResponse.Error), + string(respUpdateTable.ResponseBody), v1.ConditionTrue, PinotTableControllerUpdateFail, ) @@ -274,10 +303,10 @@ func (r *PinotTableReconciler) CreateOrUpdate( build.Recorder.GenericEvent( table, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s]", string(respUpdateTable.PinotErrorResponse.Error)), + fmt.Sprintf("Resp [%s]", string(respUpdateTable.ResponseBody)), PinotTableControllerUpdateFail, ) - return controllerutil.OperationResultNone, respUpdateTable.Err + return controllerutil.OperationResultNone, err } } } @@ -320,8 +349,8 @@ func (r *PinotTableReconciler) getControllerSvcUrl(namespace, pinotClusterName s svcName = svcList.Items[0].Name } - _ = "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort - return "http://localhost:9000", nil + newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort + return newName, nil } func (r *PinotTableReconciler) makePatchPinotTableStatus( @@ -329,29 +358,20 @@ func (r *PinotTableReconciler) makePatchPinotTableStatus( msg string, reason string, status v1.ConditionStatus, - pinotTableConditionType v1beta1.PinotTableConditionType, + pinotTableConditionType string, ) (controllerutil.OperationResult, error) { - updatedPinotTableStatus := v1beta1.PinotTableStatus{} - - updatedPinotTableStatus.CurrentTableJson = table.Spec.PinotTablesJson - updatedPinotTableStatus.LastUpdateTime = time.Now().Format(metav1.RFC3339Micro) - updatedPinotTableStatus.Message = msg - updatedPinotTableStatus.Reason = reason - updatedPinotTableStatus.Status = status - updatedPinotTableStatus.Type = pinotTableConditionType - - patchBytes, err := json.Marshal(map[string]v1beta1.PinotTableStatus{"status": updatedPinotTableStatus}) - if err != nil { - return controllerutil.OperationResultNone, err - } - if err := r.Client.Status().Patch( - context.Background(), - table, - client.RawPatch(types.MergePatchType, - patchBytes, - )); err != nil { + if _, _, err := utils.PatchStatus(context.Background(), r.Client, table, func(obj client.Object) client.Object { + in := obj.(*v1beta1.PinotTable) + in.Status.CurrentTableJson = table.Spec.PinotTablesJson + in.Status.LastUpdateTime = metav1.Time{Time: time.Now()} + in.Status.Message = msg + in.Status.Reason = reason + in.Status.Status = status + in.Status.Type = pinotTableConditionType + return in + }); err != nil { return controllerutil.OperationResultNone, err } diff --git a/internal/table_controller/schema_watcher.go b/internal/table_controller/schema_watcher.go new file mode 100644 index 0000000..dd907fb --- /dev/null +++ b/internal/table_controller/schema_watcher.go @@ -0,0 +1,11 @@ +package tablecontroller + +// import ( +// schemacontroller "github.com/datainfrahq/pinot-control-plane-k8s/internal/schema_controller" +// "sigs.k8s.io/controller-runtime/pkg/reconcile" +// ) + +// func (r *schemacontroller.PinotSchemaReconciler) reloadSegments() []reconcile.Request { +// r. +// return []reconcile.Request{} +// } diff --git a/internal/tenant_controller/pinottenant_controller.go b/internal/tenant_controller/pinottenant_controller.go index 8a5c477..ee44bd5 100644 --- a/internal/tenant_controller/pinottenant_controller.go +++ b/internal/tenant_controller/pinottenant_controller.go @@ -1,18 +1,18 @@ -// /* -// DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// */ +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package tenantcontroller diff --git a/internal/tenant_controller/reconciler.go b/internal/tenant_controller/reconciler.go index 1663f9d..dac0301 100644 --- a/internal/tenant_controller/reconciler.go +++ b/internal/tenant_controller/reconciler.go @@ -1,18 +1,18 @@ -// /* -// DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at -// http://www.apache.org/licenses/LICENSE-2.0 + http://www.apache.org/licenses/LICENSE-2.0 -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. -// */ +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ package tenantcontroller import ( @@ -84,7 +84,7 @@ func (r *PinotTenantReconciler) do(ctx context.Context, tenant *v1beta1.PinotTen if !controllerutil.ContainsFinalizer(tenant, PinotTenantControllerFinalizer) { controllerutil.AddFinalizer(tenant, PinotTenantControllerFinalizer) if err := r.Update(ctx, tenant); err != nil { - return err + return nil } } } else { @@ -108,22 +108,22 @@ func (r *PinotTenantReconciler) do(ctx context.Context, tenant *v1beta1.PinotTen []byte{}, internalHTTP.Auth{BasicAuth: basicAuth}, ) - respDeleteTenant := http.Do() - if respDeleteTenant.Err != nil { - return respDeleteTenant.Err + respDeleteTenant, err := http.Do() + if err != nil { + return err } if respDeleteTenant.StatusCode != 200 { build.Recorder.GenericEvent( tenant, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s]", string(respDeleteTenant.PinotErrorResponse.Error)), + fmt.Sprintf("Resp [%s]", string(respDeleteTenant.ResponseBody)), PinotTenantControllerDeleteFail, ) } else { build.Recorder.GenericEvent( tenant, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respDeleteTenant.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respDeleteTenant.ResponseBody)), PinotTenantControllerDeleteSuccess, ) } @@ -131,7 +131,7 @@ func (r *PinotTenantReconciler) do(ctx context.Context, tenant *v1beta1.PinotTen // remove our finalizer from the list and update it. controllerutil.RemoveFinalizer(tenant, PinotTenantControllerFinalizer) if err := r.Update(ctx, tenant); err != nil { - return err + return nil } } } @@ -214,9 +214,9 @@ func (r *PinotTenantReconciler) CreateOrUpdate( []byte{}, auth, ) - respGetTenant := getHttp.Do() - if respGetTenant.Err != nil { - return controllerutil.OperationResultNone, respGetTenant.Err + respGetTenant, err := getHttp.Do() + if err != nil { + return controllerutil.OperationResultNone, err } // if not found create tenant @@ -229,25 +229,25 @@ func (r *PinotTenantReconciler) CreateOrUpdate( []byte(tenant.Spec.PinotTenantsJson), auth, ) - respCreateTenant := postHttp.Do() - if respCreateTenant.Err != nil { - return controllerutil.OperationResultNone, respCreateTenant.Err + respCreateTenant, err := postHttp.Do() + if err != nil { + return controllerutil.OperationResultNone, err } if respCreateTenant.StatusCode == 200 { _, err := r.makePatchPinotTenantStatus( tenant, PinotTenantControllerCreateSuccess, - string(respCreateTenant.PinotSuccessResponse.Status), + string(respCreateTenant.ResponseBody), v1.ConditionTrue, PinotTenantControllerCreateSuccess, ) if err != nil { - return controllerutil.OperationResultNone, respCreateTenant.Err + return controllerutil.OperationResultNone, err } build.Recorder.GenericEvent( tenant, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respCreateTenant.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respCreateTenant.ResponseBody)), PinotTenantControllerCreateSuccess, ) return controllerutil.OperationResultCreated, nil @@ -255,7 +255,7 @@ func (r *PinotTenantReconciler) CreateOrUpdate( _, err := r.makePatchPinotTenantStatus( tenant, PinotTenantControllerCreateFail, - string(respCreateTenant.PinotErrorResponse.Error), + string(respCreateTenant.ResponseBody), v1.ConditionTrue, PinotTenantControllerCreateFail, ) @@ -264,7 +264,7 @@ func (r *PinotTenantReconciler) CreateOrUpdate( } build.Recorder.GenericEvent( tenant, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s]", string(respCreateTenant.PinotErrorResponse.Error)), + fmt.Sprintf("Resp [%s]", string(respCreateTenant.ResponseBody)), PinotTenantControllerCreateFail, ) return controllerutil.OperationResultNone, nil @@ -284,22 +284,22 @@ func (r *PinotTenantReconciler) CreateOrUpdate( []byte(tenant.Spec.PinotTenantsJson), auth, ) - respUpdateTenant := postHttp.Do() - if respUpdateTenant.Err != nil { - return controllerutil.OperationResultNone, respUpdateTenant.Err + respUpdateTenant, err := postHttp.Do() + if err != nil { + return controllerutil.OperationResultNone, err } if respUpdateTenant.StatusCode == 200 { build.Recorder.GenericEvent( tenant, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respUpdateTenant.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respUpdateTenant.ResponseBody)), PinotTenantControllerUpdateSuccess, ) _, err := r.makePatchPinotTenantStatus( tenant, PinotTenantControllerUpdateSuccess, - string(respUpdateTenant.PinotSuccessResponse.Status), + string(respUpdateTenant.ResponseBody), v1.ConditionTrue, PinotTenantControllerUpdateSuccess, ) @@ -309,7 +309,7 @@ func (r *PinotTenantReconciler) CreateOrUpdate( build.Recorder.GenericEvent( tenant, v1.EventTypeNormal, - fmt.Sprintf("Resp [%s]", string(respUpdateTenant.PinotSuccessResponse.Status)), + fmt.Sprintf("Resp [%s]", string(respUpdateTenant.ResponseBody)), PinotTenantControllerPatchStatusSuccess, ) return controllerutil.OperationResultUpdated, nil @@ -318,7 +318,7 @@ func (r *PinotTenantReconciler) CreateOrUpdate( build.Recorder.GenericEvent( tenant, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s]", string(respUpdateTenant.PinotErrorResponse.Error)), + fmt.Sprintf("Resp [%s]", string(respUpdateTenant.ResponseBody)), PinotTenantControllerUpdateFail, ) return controllerutil.OperationResultNone, err @@ -328,36 +328,25 @@ func (r *PinotTenantReconciler) CreateOrUpdate( } return controllerutil.OperationResultNone, nil } - func (r *PinotTenantReconciler) makePatchPinotTenantStatus( tenant *v1beta1.PinotTenant, msg string, reason string, status v1.ConditionStatus, - pinotTenantConditionType v1beta1.PinotTenantConditionType, + pinotTenantConditionType string, ) (controllerutil.OperationResult, error) { - updatedPinotTenantStatus := v1beta1.PinotTenantStatus{} - - updatedPinotTenantStatus.CurrentTenantsJson = tenant.Spec.PinotTenantsJson - updatedPinotTenantStatus.LastUpdateTime = time.Now().Format(metav1.RFC3339Micro) - updatedPinotTenantStatus.Message = msg - updatedPinotTenantStatus.Reason = reason - updatedPinotTenantStatus.Status = status - updatedPinotTenantStatus.Type = pinotTenantConditionType - - patchBytes, err := json.Marshal(map[string]v1beta1.PinotTenantStatus{ - "status": updatedPinotTenantStatus}) - if err != nil { - return controllerutil.OperationResultNone, err - } - if err := r.Client.Status().Patch( - context.TODO(), - tenant, - client.RawPatch(types.MergePatchType, - patchBytes, - )); err != nil { + if _, _, err := utils.PatchStatus(context.Background(), r.Client, tenant, func(obj client.Object) client.Object { + in := obj.(*v1beta1.PinotTenant) + in.Status.CurrentTenantsJson = tenant.Spec.PinotTenantsJson + in.Status.LastUpdateTime = metav1.Time{Time: time.Now()} + in.Status.Message = msg + in.Status.Reason = reason + in.Status.Status = status + in.Status.Type = pinotTenantConditionType + return in + }); err != nil { return controllerutil.OperationResultNone, err } diff --git a/internal/utils/patch.go b/internal/utils/patch.go new file mode 100644 index 0000000..bfde8a7 --- /dev/null +++ b/internal/utils/patch.go @@ -0,0 +1,43 @@ +package utils + +import ( + "context" + + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type VerbType string + +type ( + TransformStatusFunc func(obj client.Object) client.Object +) + +const ( + VerbPatched VerbType = "Patched" + VerbUnchanged VerbType = "Unchanged" +) + +func PatchStatus(ctx context.Context, c client.Client, obj client.Object, transform TransformStatusFunc, opts ...client.SubResourcePatchOption) (client.Object, VerbType, error) { + key := types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + } + err := c.Get(ctx, key, obj) + if err != nil { + return nil, VerbUnchanged, err + } + + // The body of the request was in an unknown format - + // accepted media types include: + // - application/json-patch+json, + // - application/merge-patch+json, + // - application/apply-patch+yaml + patch := client.MergeFrom(obj) + obj = transform(obj.DeepCopyObject().(client.Object)) + err = c.Status().Patch(ctx, obj, patch, opts...) + if err != nil { + return nil, VerbUnchanged, err + } + return obj, VerbPatched, nil +} diff --git a/pinot-control-plane-secret b/pinot-control-plane-secret new file mode 100644 index 0000000..e034f25 --- /dev/null +++ b/pinot-control-plane-secret @@ -0,0 +1,2 @@ +CONTROL_PLANE_USERNAME=controlplane +CONTROL_PLANE_PASSWORD=controlplane