From 78a487ba1c2e6d03e0455c83aa143de8cb7b3af0 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Thu, 4 May 2023 07:15:07 +0530 Subject: [PATCH 1/2] segment reload --- README.md | 6 + api/v1beta1/pinottable_types.go | 1 + api/v1beta1/zz_generated.deepcopy.go | 5 + .../crd/bases/datainfra.io_pinottables.yaml | 5 + examples/00-pinot-basic/pinottable-basic.yaml | 1 + .../crds/datainfra.io_pinottables.yaml | 5 + internal/schema_controller/reconciler.go | 26 ++-- internal/table_controller/paths.go | 28 ++++ .../table_controller/pinottable_controller.go | 126 ++++++++++++++++++ internal/table_controller/reconciler.go | 29 +--- .../pinottenant_controller.go | 1 + internal/tenant_controller/reconciler.go | 27 +--- internal/utils/constants.go | 25 ++++ internal/utils/patch.go | 16 +++ internal/utils/utils.go | 13 +- 15 files changed, 249 insertions(+), 65 deletions(-) create mode 100644 internal/table_controller/paths.go create mode 100644 internal/utils/constants.go diff --git a/README.md b/README.md index fd55910..f1fe5e8 100644 --- a/README.md +++ b/README.md @@ -30,6 +30,12 @@ Based on Kubernetes operators, this control plane for apache pinot is responsibl - Schema Management - Tenant Management (experimental) +## Getting Started With Helm +``` +helm repo add datainfra https://charts.datainfra.io +helm upgrade --install pinot-control-plane datainfra/pinot-control-plane +``` + ## :books: Documentation - [Getting Started With Heterogeneous Pinot Clusters](./examples/01-pinot-hetero/) diff --git a/api/v1beta1/pinottable_types.go b/api/v1beta1/pinottable_types.go index 163b8a3..ff75a33 100644 --- a/api/v1beta1/pinottable_types.go +++ b/api/v1beta1/pinottable_types.go @@ -51,6 +51,7 @@ type PinotTableStatus struct { Message string `json:"message,omitempty"` LastUpdateTime metav1.Time `json:"lastUpdateTime,omitempty"` CurrentTableJson string `json:"currentTable.json"` + ReloadStatus []string `json:"reloadStatus"` } // +kubebuilder:object:root=true diff --git a/api/v1beta1/zz_generated.deepcopy.go b/api/v1beta1/zz_generated.deepcopy.go index 5c83721..009f45b 100644 --- a/api/v1beta1/zz_generated.deepcopy.go +++ b/api/v1beta1/zz_generated.deepcopy.go @@ -523,6 +523,11 @@ func (in *PinotTableSpec) DeepCopy() *PinotTableSpec { func (in *PinotTableStatus) DeepCopyInto(out *PinotTableStatus) { *out = *in in.LastUpdateTime.DeepCopyInto(&out.LastUpdateTime) + if in.ReloadStatus != nil { + in, out := &in.ReloadStatus, &out.ReloadStatus + *out = make([]string, len(*in)) + copy(*out, *in) + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PinotTableStatus. diff --git a/config/crd/bases/datainfra.io_pinottables.yaml b/config/crd/bases/datainfra.io_pinottables.yaml index 9d95936..f5c53bb 100644 --- a/config/crd/bases/datainfra.io_pinottables.yaml +++ b/config/crd/bases/datainfra.io_pinottables.yaml @@ -73,12 +73,17 @@ spec: type: string reason: type: string + reloadStatus: + items: + type: string + type: array status: type: string type: type: string required: - currentTable.json + - reloadStatus type: object type: object served: true diff --git a/examples/00-pinot-basic/pinottable-basic.yaml b/examples/00-pinot-basic/pinottable-basic.yaml index 0eb0f95..79b9708 100644 --- a/examples/00-pinot-basic/pinottable-basic.yaml +++ b/examples/00-pinot-basic/pinottable-basic.yaml @@ -6,6 +6,7 @@ spec: pinotCluster: pinot-basic pinotSchema: airlinestats pinotTableType: REALTIME + segmentReload: true tables.json: |- { "tableName": "airlineStats", diff --git a/helm/pinot-control-plane/templates/crds/datainfra.io_pinottables.yaml b/helm/pinot-control-plane/templates/crds/datainfra.io_pinottables.yaml index 9d95936..f5c53bb 100644 --- a/helm/pinot-control-plane/templates/crds/datainfra.io_pinottables.yaml +++ b/helm/pinot-control-plane/templates/crds/datainfra.io_pinottables.yaml @@ -73,12 +73,17 @@ spec: type: string reason: type: string + reloadStatus: + items: + type: string + type: array status: type: string type: type: string required: - currentTable.json + - reloadStatus type: object type: object served: true diff --git a/internal/schema_controller/reconciler.go b/internal/schema_controller/reconciler.go index c66e4a4..29c4000 100644 --- a/internal/schema_controller/reconciler.go +++ b/internal/schema_controller/reconciler.go @@ -17,7 +17,6 @@ package schemacontroller import ( "context" - "encoding/json" "fmt" "net/http" "time" @@ -57,6 +56,10 @@ const ( PinotControllerPort = "9000" ) +const ( + schemaName = "schemaName" +) + func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSchema) error { build := builder.NewBuilder( @@ -96,7 +99,7 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch return err } - schemaName, err := getSchemaName(schema.Spec.PinotSchemaJson) + schemaName, err := utils.GetValueFromJson(schema.Spec.PinotSchemaJson, schemaName) if err != nil { return err } @@ -148,7 +151,7 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( ) (controllerutil.OperationResult, error) { // get schema name - schemaName, err := getSchemaName(schema.Spec.PinotSchemaJson) + schemaName, err := utils.GetValueFromJson(schema.Spec.PinotSchemaJson, schemaName) if err != nil { return controllerutil.OperationResultNone, err } @@ -315,7 +318,7 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( build.Recorder.GenericEvent( schema, v1.EventTypeWarning, - fmt.Sprintf("Resp [%s]", string(respGetSchema.ResponseBody)), + fmt.Sprintf("Resp [%s]", string(respUpdateSchema.ResponseBody)), PinotSchemaControllerUpdateFail, ) return controllerutil.OperationResultNone, err @@ -328,17 +331,6 @@ func (r *PinotSchemaReconciler) CreateOrUpdate( return controllerutil.OperationResultNone, nil } -func getSchemaName(schemaJson string) (string, error) { - var err error - - schema := make(map[string]json.RawMessage) - if err = json.Unmarshal([]byte(schemaJson), &schema); err != nil { - return "", err - } - - return utils.TrimQuote(string(schema["schemaName"])), nil -} - func makeControllerCreateSchemaPath(svcName string) string { return svcName + "/schemas" } func makeControllerGetUpdateDeleteSchemaPath(svcName, schemaName string) string { @@ -388,9 +380,9 @@ func (r *PinotSchemaReconciler) getControllerSvcUrl(namespace, pinotClusterName svcName = svcList.Items[0].Name } - newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort + _ = "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort - return newName, nil + return "http://localhost:9000", nil } func (r *PinotSchemaReconciler) getAuthCreds(ctx context.Context, schema *v1beta1.PinotSchema) (internalHTTP.BasicAuth, error) { diff --git a/internal/table_controller/paths.go b/internal/table_controller/paths.go new file mode 100644 index 0000000..1392731 --- /dev/null +++ b/internal/table_controller/paths.go @@ -0,0 +1,28 @@ +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package tablecontroller + +func makeControllerCreateTablePath(svcName string) string { + return svcName + "/tables" +} + +func makeControllerGetUpdateDeleteTablePath(svcName, tableName string) string { + return svcName + "/tables/" + tableName +} + +func makeControllerReloadTable(svcName, tableName string) string { + return svcName + "/segments/" + tableName + "/reload" +} diff --git a/internal/table_controller/pinottable_controller.go b/internal/table_controller/pinottable_controller.go index e185dba..dc1b485 100644 --- a/internal/table_controller/pinottable_controller.go +++ b/internal/table_controller/pinottable_controller.go @@ -18,18 +18,30 @@ package tablecontroller import ( "context" + "net/http" "os" "time" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/source" "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" datainfraiov1beta1 "github.com/datainfrahq/pinot-control-plane-k8s/api/v1beta1" + internalHTTP "github.com/datainfrahq/pinot-control-plane-k8s/internal/http" + schemacontroller "github.com/datainfrahq/pinot-control-plane-k8s/internal/schema_controller" + + "github.com/datainfrahq/pinot-control-plane-k8s/internal/utils" "github.com/go-logr/logr" ) @@ -80,8 +92,122 @@ func (r *PinotTableReconciler) Reconcile(ctx context.Context, req ctrl.Request) // SetupWithManager sets up the controller with the Manager. func (r *PinotTableReconciler) SetupWithManager(mgr ctrl.Manager) error { + + p := predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + + if !e.ObjectNew.GetDeletionTimestamp().IsZero() { + return false + } + r.Log.Info("Update Event Recieved, Wait for couple of seconds for schema controller to update status - table controller") + + time.Sleep(time.Second * time.Duration(2)) + + schema := v1beta1.PinotSchema{} + + if err := r.Client.Get(context.TODO(), types.NamespacedName{ + Namespace: e.ObjectNew.GetNamespace(), + Name: e.ObjectNew.GetName(), + }, &schema); err != nil { + r.Log.Error(err, "Error getting schema - table controller") + return false + } + + if time.Now().After(schema.Status.LastUpdateTime.Time) { + listOpts := []client.ListOption{ + client.InNamespace(e.ObjectNew.GetNamespace()), + } + + tableList := v1beta1.PinotTableList{} + if err := r.Client.List(context.TODO(), &tableList, listOpts...); err != nil { + return false + } + + for _, table := range tableList.Items { + if table.Spec.SegmentReload { + if schema.Status.Message == schemacontroller.PinotSchemaControllerUpdateSuccess { + svcName, err := r.getControllerSvcUrl(table.Namespace, table.Spec.PinotCluster) + if err != nil { + r.Log.Error(err, "Error getting serviceName - table controller") + return false + } + + segmentsConfig, err := utils.GetValueFromJson(table.Spec.PinotTablesJson, utils.SegmentsConfig) + if err != nil { + r.Log.Error(err, "Error getting schemaName - table controller") + return false + } + schemaNameinTable, err := utils.GetValueFromJson(segmentsConfig, utils.SchemaName) + if err != nil { + r.Log.Error(err, "Error getting schemaName in table - table controller") + return false + } + + schemaNameinEvent, err := utils.GetValueFromJson(schema.Spec.PinotSchemaJson, utils.SchemaName) + if err != nil { + r.Log.Error(err, "Error getting schemaName in Update Event - table controller") + return false + } + + getTableName, err := utils.GetValueFromJson(table.Spec.PinotTablesJson, utils.TableName) + if err != nil { + r.Log.Error(err, "Error getting tableName in Update Event - table controller") + return false + } + + basicAuth, err := r.getAuthCreds(context.TODO(), &table) + if err != nil { + r.Log.Error(err, "Error getting authCreds in Update Event - table controller") + return false + } + + if schemaNameinTable == schemaNameinEvent { + postHttp := internalHTTP.NewHTTPClient( + http.MethodPost, + makeControllerReloadTable(svcName, getTableName), + http.Client{}, + []byte{}, + internalHTTP.Auth{BasicAuth: basicAuth}, + ) + + reloadHttp, err := postHttp.Do() + if err != nil { + r.Log.Error(err, "Error getting reloading segments in Update Event - table controller") + return false + } + r.Recorder.Event(&table, v1.EventTypeNormal, reloadHttp.ResponseBody, PinotTableReloadAllSegments) + + if _, _, err := utils.PatchStatus(context.Background(), r.Client, &table, func(obj client.Object) client.Object { + in := obj.(*v1beta1.PinotTable) + in.Status.ReloadStatus = append(in.Status.ReloadStatus, reloadHttp.ResponseBody) + return in + }); err != nil { + r.Log.Error(err, "Error patching reloading segments in Update Event - table controller") + return false + } + + } + + } + } else { + r.Log.Info("No suitable condition found for reload segments - update controller") + return false + } + + } + } + + return false + }, + } + return ctrl.NewControllerManagedBy(mgr). For(&datainfraiov1beta1.PinotTable{}). + Watches( + &source.Kind{Type: &v1beta1.PinotSchema{}}, + &handler.EnqueueRequestForObject{}, + builder.WithPredicates(p), + ). WithEventFilter( GenericPredicates{}, ). diff --git a/internal/table_controller/reconciler.go b/internal/table_controller/reconciler.go index b147f74..e74314c 100644 --- a/internal/table_controller/reconciler.go +++ b/internal/table_controller/reconciler.go @@ -18,7 +18,6 @@ package tablecontroller import ( "context" - "encoding/json" "fmt" "net/http" "time" @@ -46,6 +45,7 @@ const ( PinotTableControllerUpdateFail = "PinotTableControllerUpdateFail" PinotTableControllerDeleteSuccess = "PinotTableControllerDeleteSuccess" PinotTableControllerDeleteFail = "PinotTableControllerDeleteFail" + PinotTableReloadAllSegments = "PinotTableReloadAllSegments" PinotTableControllerFinalizer = "pinottable.datainfra.io/finalizer" ) @@ -73,6 +73,7 @@ func (r *PinotTableReconciler) do(ctx context.Context, table *v1beta1.PinotTable if err != nil { return err } + _, err = r.CreateOrUpdate(table, svcName, *build, internalHTTP.Auth{BasicAuth: basicAuth}) if err != nil { return err @@ -95,7 +96,7 @@ func (r *PinotTableReconciler) do(ctx context.Context, table *v1beta1.PinotTable return err } - tenantName, err := getTableName(table.Spec.PinotTablesJson) + tenantName, err := utils.GetValueFromJson(table.Spec.PinotTablesJson, utils.TableName) if err != nil { return err } @@ -145,7 +146,7 @@ func (r *PinotTableReconciler) CreateOrUpdate( ) (controllerutil.OperationResult, error) { // get table name - tableName, err := getTableName(table.Spec.PinotTablesJson) + tableName, err := utils.GetValueFromJson(table.Spec.PinotTablesJson, utils.TableName) if err != nil { return controllerutil.OperationResultNone, err } @@ -314,23 +315,6 @@ func (r *PinotTableReconciler) CreateOrUpdate( return controllerutil.OperationResultNone, nil } -func getTableName(tablesJson string) (string, error) { - var err error - table := make(map[string]json.RawMessage) - - if err = json.Unmarshal([]byte(tablesJson), &table); err != nil { - return "", err - } - - return utils.TrimQuote(string(table["tableName"])), nil -} - -func makeControllerCreateTablePath(svcName string) string { return svcName + "/tables" } - -func makeControllerGetUpdateDeleteTablePath(svcName, tableName string) string { - return svcName + "/tables/" + tableName -} - func (r *PinotTableReconciler) getControllerSvcUrl(namespace, pinotClusterName string) (string, error) { listOpts := []client.ListOption{ client.InNamespace(namespace), @@ -349,8 +333,8 @@ func (r *PinotTableReconciler) getControllerSvcUrl(namespace, pinotClusterName s svcName = svcList.Items[0].Name } - newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort - return newName, nil + _ = "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort + return "http://localhost:9000", nil } func (r *PinotTableReconciler) makePatchPinotTableStatus( @@ -370,6 +354,7 @@ func (r *PinotTableReconciler) makePatchPinotTableStatus( in.Status.Reason = reason in.Status.Status = status in.Status.Type = pinotTableConditionType + in.Status.ReloadStatus = []string{} return in }); err != nil { return controllerutil.OperationResultNone, err diff --git a/internal/tenant_controller/pinottenant_controller.go b/internal/tenant_controller/pinottenant_controller.go index ee44bd5..76579f6 100644 --- a/internal/tenant_controller/pinottenant_controller.go +++ b/internal/tenant_controller/pinottenant_controller.go @@ -58,6 +58,7 @@ func NewPinotTenantReconciler(mgr ctrl.Manager) *PinotTenantReconciler { // +kubebuilder:rbac:groups=datainfra.io,resources=pinottenants,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=datainfra.io,resources=pinottenants/status,verbs=get;update;patch // +kubebuilder:rbac:groups=datainfra.io,resources=pinottenants/finalizers,verbs=update +// +kubebuilder:rbac:groups=datainfra.io,resources=pinotschemas/finalizers,verbs=get;list;watch // +kubebuilder:rbac:groups="",resources=secret,verbs=get func (r *PinotTenantReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { logr := log.FromContext(ctx) diff --git a/internal/tenant_controller/reconciler.go b/internal/tenant_controller/reconciler.go index dac0301..4344df9 100644 --- a/internal/tenant_controller/reconciler.go +++ b/internal/tenant_controller/reconciler.go @@ -17,7 +17,6 @@ package tenantcontroller import ( "context" - "encoding/json" "fmt" "net/http" @@ -96,7 +95,7 @@ func (r *PinotTenantReconciler) do(ctx context.Context, tenant *v1beta1.PinotTen return err } - tenantName, err := getTenantName(tenant.Spec.PinotTenantsJson) + tenantName, err := utils.GetValueFromJson(tenant.Spec.PinotTenantsJson, utils.TenantName) if err != nil { return err } @@ -138,28 +137,6 @@ func (r *PinotTenantReconciler) do(ctx context.Context, tenant *v1beta1.PinotTen return nil } -func getTenantName(tenantsJson string) (string, error) { - var err error - - schema := make(map[string]json.RawMessage) - if err = json.Unmarshal([]byte(tenantsJson), &schema); err != nil { - return "", err - } - - return utils.TrimQuote(string(schema["tenantName"])), nil -} - -func getRespCode(resp []byte) string { - var err error - - respMap := make(map[string]json.RawMessage) - if err = json.Unmarshal(resp, &respMap); err != nil { - return "" - } - - return utils.TrimQuote(string(respMap["code"])) -} - func makeControllerCreateUpdateTenantPath(svcName string) string { return svcName + "/tenants" } func makeControllerGetTenantPath(svcName, tenantName string) string { @@ -201,7 +178,7 @@ func (r *PinotTenantReconciler) CreateOrUpdate( ) (controllerutil.OperationResult, error) { // get tenant name - tenantName, err := getTenantName(tenant.Spec.PinotTenantsJson) + tenantName, err := utils.GetValueFromJson(tenant.Spec.PinotTenantsJson, utils.TenantName) if err != nil { return controllerutil.OperationResultNone, err } diff --git a/internal/utils/constants.go b/internal/utils/constants.go new file mode 100644 index 0000000..d0610cc --- /dev/null +++ b/internal/utils/constants.go @@ -0,0 +1,25 @@ +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +// common constants shared across schema, table and tenant controller +const ( + TableName = "tableName" + SegmentsConfig = "segmentsConfig" + SchemaName = "schemaName" + TenantName = "tenantName" +) diff --git a/internal/utils/patch.go b/internal/utils/patch.go index bfde8a7..bf08eb6 100644 --- a/internal/utils/patch.go +++ b/internal/utils/patch.go @@ -1,3 +1,19 @@ +/* +DataInfra Pinot Control Plane (C) 2023 - 2024 DataInfra. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + package utils import ( diff --git a/internal/utils/utils.go b/internal/utils/utils.go index 415b4c2..0f43b95 100644 --- a/internal/utils/utils.go +++ b/internal/utils/utils.go @@ -22,7 +22,7 @@ import ( "reflect" ) -func TrimQuote(s string) string { +func trimQuote(s string) string { if len(s) > 0 && s[0] == '"' { s = s[1:] } @@ -48,3 +48,14 @@ func IsEqualJson(s1, s2 string) (bool, error) { return reflect.DeepEqual(o1, o2), nil } + +func GetValueFromJson(jsonObject, key string) (string, error) { + var err error + + mapJsonObject := make(map[string]json.RawMessage) + if err = json.Unmarshal([]byte(jsonObject), &mapJsonObject); err != nil { + return "", err + } + + return trimQuote(string(mapJsonObject[key])), nil +} From 2243e8406842d9cc968d8f3554caaa616c26afd1 Mon Sep 17 00:00:00 2001 From: AdheipSingh Date: Thu, 4 May 2023 17:05:51 +0530 Subject: [PATCH 2/2] release v0.0.9 --- helm/pinot-control-plane/Chart.yaml | 4 ++-- internal/schema_controller/pinotschema_controller.go | 2 +- internal/schema_controller/reconciler.go | 4 ++-- internal/table_controller/pinottable_controller.go | 2 +- internal/table_controller/reconciler.go | 4 ++-- internal/table_controller/schema_watcher.go | 11 ----------- internal/tenant_controller/pinottenant_controller.go | 2 +- 7 files changed, 9 insertions(+), 20 deletions(-) delete mode 100644 internal/table_controller/schema_watcher.go diff --git a/helm/pinot-control-plane/Chart.yaml b/helm/pinot-control-plane/Chart.yaml index 015f592..f9cb4fd 100644 --- a/helm/pinot-control-plane/Chart.yaml +++ b/helm/pinot-control-plane/Chart.yaml @@ -2,5 +2,5 @@ apiVersion: v2 name: pinot-control-plane description: A Helm chart for Kubernetes type: application -version: 0.0.8 -appVersion: "v0.0.8" +version: 0.0.9 +appVersion: "v0.0.9" diff --git a/internal/schema_controller/pinotschema_controller.go b/internal/schema_controller/pinotschema_controller.go index ecbd1fc..576a90b 100644 --- a/internal/schema_controller/pinotschema_controller.go +++ b/internal/schema_controller/pinotschema_controller.go @@ -44,7 +44,7 @@ type PinotSchemaReconciler struct { } func NewPinotSchemaReconciler(mgr ctrl.Manager) *PinotSchemaReconciler { - initLogger := ctrl.Log.WithName("controllers").WithName("pinot") + initLogger := ctrl.Log.WithName("controllers").WithName("pinot-schema") return &PinotSchemaReconciler{ Client: mgr.GetClient(), Log: initLogger, diff --git a/internal/schema_controller/reconciler.go b/internal/schema_controller/reconciler.go index 29c4000..07241f3 100644 --- a/internal/schema_controller/reconciler.go +++ b/internal/schema_controller/reconciler.go @@ -380,9 +380,9 @@ func (r *PinotSchemaReconciler) getControllerSvcUrl(namespace, pinotClusterName svcName = svcList.Items[0].Name } - _ = "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort + newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort - return "http://localhost:9000", nil + return newName, nil } func (r *PinotSchemaReconciler) getAuthCreds(ctx context.Context, schema *v1beta1.PinotSchema) (internalHTTP.BasicAuth, error) { diff --git a/internal/table_controller/pinottable_controller.go b/internal/table_controller/pinottable_controller.go index dc1b485..7476535 100644 --- a/internal/table_controller/pinottable_controller.go +++ b/internal/table_controller/pinottable_controller.go @@ -56,7 +56,7 @@ type PinotTableReconciler struct { } func NewPinotTableReconciler(mgr ctrl.Manager) *PinotTableReconciler { - initLogger := ctrl.Log.WithName("controllers").WithName("pinot") + initLogger := ctrl.Log.WithName("controllers").WithName("pinot-table") return &PinotTableReconciler{ Client: mgr.GetClient(), Log: initLogger, diff --git a/internal/table_controller/reconciler.go b/internal/table_controller/reconciler.go index e74314c..0885ef9 100644 --- a/internal/table_controller/reconciler.go +++ b/internal/table_controller/reconciler.go @@ -333,8 +333,8 @@ func (r *PinotTableReconciler) getControllerSvcUrl(namespace, pinotClusterName s svcName = svcList.Items[0].Name } - _ = "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort - return "http://localhost:9000", nil + newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort + return newName, nil } func (r *PinotTableReconciler) makePatchPinotTableStatus( diff --git a/internal/table_controller/schema_watcher.go b/internal/table_controller/schema_watcher.go deleted file mode 100644 index dd907fb..0000000 --- a/internal/table_controller/schema_watcher.go +++ /dev/null @@ -1,11 +0,0 @@ -package tablecontroller - -// import ( -// schemacontroller "github.com/datainfrahq/pinot-control-plane-k8s/internal/schema_controller" -// "sigs.k8s.io/controller-runtime/pkg/reconcile" -// ) - -// func (r *schemacontroller.PinotSchemaReconciler) reloadSegments() []reconcile.Request { -// r. -// return []reconcile.Request{} -// } diff --git a/internal/tenant_controller/pinottenant_controller.go b/internal/tenant_controller/pinottenant_controller.go index 76579f6..1c5c79e 100644 --- a/internal/tenant_controller/pinottenant_controller.go +++ b/internal/tenant_controller/pinottenant_controller.go @@ -45,7 +45,7 @@ type PinotTenantReconciler struct { } func NewPinotTenantReconciler(mgr ctrl.Manager) *PinotTenantReconciler { - initLogger := ctrl.Log.WithName("controllers").WithName("pinot") + initLogger := ctrl.Log.WithName("controllers").WithName("pinot-tenant") return &PinotTenantReconciler{ Client: mgr.GetClient(), Log: initLogger,