Skip to content

Commit

Permalink
Merge pull request #31 from datainfrahq/refactor
Browse files Browse the repository at this point in the history
Refactor Schema, Tables and Tenant controller
  • Loading branch information
AdheipSingh authored May 1, 2023
2 parents 3d8ea58 + 75aac4c commit 70691a3
Show file tree
Hide file tree
Showing 5 changed files with 350 additions and 169 deletions.
52 changes: 0 additions & 52 deletions config/rbac/role.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -113,55 +113,3 @@ rules:
- get
- patch
- update
- apiGroups:
- datainfra.io
resources:
- pinottables
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- datainfra.io
resources:
- pinottables/finalizers
verbs:
- update
- apiGroups:
- datainfra.io
resources:
- pinottables/status
verbs:
- get
- patch
- update
- apiGroups:
- datainfra.io
resources:
- pinottenants
verbs:
- create
- delete
- get
- list
- patch
- update
- watch
- apiGroups:
- datainfra.io
resources:
- pinottenants/finalizers
verbs:
- update
- apiGroups:
- datainfra.io
resources:
- pinottenants/status
verbs:
- get
- patch
- update
92 changes: 72 additions & 20 deletions internal/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,17 @@ package http

import (
"bytes"
"encoding/json"
"io/ioutil"
"net/http"
)

// PinotHTTP interface
type PinotHTTP interface {
Do() HttpResponse
Do() *Response
}

// HTTP client
type Client struct {
Method string
URL string
Expand All @@ -34,37 +37,58 @@ type Client struct {
Auth Auth
}

func NewHTTPClient(method, url string, client http.Client, body []byte, auth Auth) PinotHTTP {
newClient := &Client{
Method: method,
URL: url,
HTTPClient: client,
Body: body,
Auth: auth,
}

return newClient
}

// Auth mechanisms supported by pinot control plane to authenticate
// with pinot clusters
type Auth struct {
BasicAuth BasicAuth
}

// BasicAuth
type BasicAuth struct {
UserName string
Password string
}
type HttpResponse struct {
RespBody []byte
Err error
StatusCode int

// Pinot API error Response
// ex: {"code":404,"error":"Schema not found"}
type PinotErrorResponse struct {
Code int `json:"code"`
Error string `json:"error"`
}

func NewHTTPClient(method, url string, client http.Client, body []byte, auth Auth) PinotHTTP {
newClient := &Client{
Method: method,
URL: url,
HTTPClient: client,
Body: body,
Auth: auth,
}
// Pinot API success Response
// ex: {"unrecognizedProperties":{},"status":"airlineStats successfully added"}
type PinotSuccessResponse struct {
UnrecognizedProperties interface{} `json:"unrecognizedProperties"`
Status string `json:"status"`
}

return newClient
// Response passed to controller
type Response struct {
Err error
StatusCode int
PinotErrorResponse
PinotSuccessResponse
}

func (c *Client) Do() HttpResponse {
// Initiate HTTP call to pinot
func (c *Client) Do() *Response {

req, err := http.NewRequest(c.Method, c.URL, bytes.NewBuffer(c.Body))
if err != nil {
return HttpResponse{Err: err}
return &Response{Err: err}
}

if c.Auth.BasicAuth != (BasicAuth{}) {
Expand All @@ -74,16 +98,44 @@ func (c *Client) Do() HttpResponse {
req.Header.Add("Content-Type", "application/json")
resp, err := c.HTTPClient.Do(req)
if err != nil {
return HttpResponse{Err: err}
return &Response{Err: err}
}

defer resp.Body.Close()

responseBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return HttpResponse{Err: err}
return &Response{Err: err}
}

return HttpResponse{RespBody: responseBody, Err: nil, StatusCode: resp.StatusCode}

// GET /schemas returns 404 when schema not found with code and error as resp.
// GET /tenants returns 404 when tenant not found with code and error as resp
// GET /tables returns 200 when table not found with an empty response.
if string(responseBody) != "{}" {
if resp.StatusCode == 200 {
var pinotSuccess PinotSuccessResponse
if err := json.Unmarshal(responseBody, &pinotSuccess); err != nil {
return &Response{Err: err}
}
return &Response{StatusCode: resp.StatusCode, PinotSuccessResponse: pinotSuccess}
} else {
var pinotErr PinotErrorResponse
if err := json.Unmarshal(responseBody, &pinotErr); err != nil {
return &Response{StatusCode: resp.StatusCode, Err: err}
}
return &Response{StatusCode: resp.StatusCode, PinotErrorResponse: pinotErr}
}
} else {
if resp.StatusCode == 200 {
// resp is empty with 200 status code
// for tables API force 404
return &Response{StatusCode: 404}
} else {
var pinotErr PinotErrorResponse
if err := json.Unmarshal(responseBody, &pinotErr); err != nil {
return &Response{StatusCode: resp.StatusCode, Err: err}
}
return &Response{StatusCode: resp.StatusCode, PinotErrorResponse: pinotErr}
}
}
}
135 changes: 100 additions & 35 deletions internal/schema_controller/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,15 +107,24 @@ func (r *PinotSchemaReconciler) do(ctx context.Context, schema *v1beta1.PinotSch
[]byte{},
internalHTTP.Auth{BasicAuth: basicAuth},
)
resp := http.Do()
if resp.Err != nil {
build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp.RespBody)), PinotSchemaControllerDeleteFail)
return err
respDeleteSchema := http.Do()
if respDeleteSchema.Err != nil {
return respDeleteSchema.Err
}
if resp.StatusCode != 200 {
build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp.RespBody)), PinotSchemaControllerDeleteFail)
if respDeleteSchema.StatusCode != 200 {
build.Recorder.GenericEvent(
schema,
v1.EventTypeWarning,
fmt.Sprintf("Resp [%s]", string(respDeleteSchema.PinotErrorResponse.Error)),
PinotSchemaControllerDeleteFail,
)
} else {
build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp.RespBody)), PinotSchemaControllerDeleteSuccess)
build.Recorder.GenericEvent(
schema,
v1.EventTypeNormal,
fmt.Sprintf("Resp [%s]", string(respDeleteSchema.PinotSuccessResponse.Status)),
PinotSchemaControllerDeleteSuccess,
)
}

// remove our finalizer from the list and update it.
Expand Down Expand Up @@ -150,14 +159,15 @@ func (r *PinotSchemaReconciler) CreateOrUpdate(
[]byte{},
auth,
)
resp := getHttp.Do()
if resp.Err != nil {
return controllerutil.OperationResultNone, resp.Err

respGetSchema := getHttp.Do()
if respGetSchema.Err != nil {
return controllerutil.OperationResultNone, respGetSchema.Err
}

// if not found create schema
// else check for updates
if resp.StatusCode == 404 {
if respGetSchema.PinotErrorResponse.Code == 404 {

// create schema
postHttp := internalHTTP.NewHTTPClient(
Expand All @@ -167,30 +177,58 @@ func (r *PinotSchemaReconciler) CreateOrUpdate(
[]byte(schema.Spec.PinotSchemaJson),
auth,
)
respS := postHttp.Do()
if respS.Err != nil {
return controllerutil.OperationResultNone, err

respCreatechema := postHttp.Do()
if respCreatechema.Err != nil {
return controllerutil.OperationResultNone, respCreatechema.Err
}
// if respS 200, patch status and emit event
// patch status will store the current state
if respS.StatusCode == 200 {
result, err := r.makePatchPinotSchemaStatus(schema, PinotSchemaControllerCreateSuccess, string(respS.RespBody), v1.ConditionTrue, v1beta1.PinotSchemaCreateSuccess)

if respCreatechema.StatusCode == 200 {
result, err := r.makePatchPinotSchemaStatus(
schema,
PinotSchemaControllerCreateSuccess,
string(respCreatechema.PinotSuccessResponse.Status),
v1.ConditionTrue,
v1beta1.PinotSchemaCreateSuccess,
)
if err != nil {
return controllerutil.OperationResultNone, err
}
build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(respS.RespBody)), PinotSchemaControllerCreateSuccess)
build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s], Result [%s]", string(respS.RespBody), result), PinotSchemaControllerPatchStatusSuccess)
build.Recorder.GenericEvent(
schema,
v1.EventTypeNormal,
fmt.Sprintf("Resp [%s]", string(respCreatechema.PinotSuccessResponse.Status)),
PinotSchemaControllerCreateSuccess,
)
build.Recorder.GenericEvent(
schema,
v1.EventTypeNormal,
fmt.Sprintf("Resp [%s], Result [%s]", string(respCreatechema.PinotSuccessResponse.Status), result),
PinotSchemaControllerPatchStatusSuccess)
return controllerutil.OperationResultCreated, nil

} else {
result, err := r.makePatchPinotSchemaStatus(schema, PinotSchemaControllerCreateFail, string(respS.RespBody), v1.ConditionTrue, v1beta1.PinotSchemaCreateFail)
_, err := r.makePatchPinotSchemaStatus(
schema,
PinotSchemaControllerCreateFail,
string(respCreatechema.PinotErrorResponse.Error),
v1.ConditionTrue,
v1beta1.PinotSchemaCreateFail,
)
if err != nil {
return controllerutil.OperationResultNone, err
}
build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(respS.RespBody)), PinotSchemaControllerCreateFail)
build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s], Result [%s]", string(respS.RespBody), result), PinotSchemaControllerPatchStatusSuccess)
build.Recorder.GenericEvent(
schema,
v1.EventTypeWarning,
fmt.Sprintf("Resp [%s], Status", string(respCreatechema.PinotErrorResponse.Error)),
PinotSchemaControllerCreateFail,
)
return controllerutil.OperationResultCreated, nil

}
} else if resp.StatusCode == 200 { // schema exists, check for updates
} else if respGetSchema.StatusCode == 200 {

ok, err := utils.IsEqualJson(schema.Status.CurrentSchemasJson, schema.Spec.PinotSchemaJson)
if err != nil {
return controllerutil.OperationResultNone, err
Expand All @@ -205,26 +243,53 @@ func (r *PinotSchemaReconciler) CreateOrUpdate(
[]byte(schema.Spec.PinotSchemaJson),
auth,
)
resp := postHttp.Do()
if resp.Err != nil {
respUpdateSchema := postHttp.Do()
if respUpdateSchema.Err != nil {
return controllerutil.OperationResultNone, err
}
if resp.StatusCode == 200 {
if respUpdateSchema.PinotSuccessResponse != (internalHTTP.PinotSuccessResponse{}) {
// patch status to store the current valid schema json
result, err := r.makePatchPinotSchemaStatus(schema, PinotSchemaControllerUpdateSuccess, string(resp.RespBody), v1.ConditionTrue, v1beta1.PinotSchemaUpdateSuccess)
result, err := r.makePatchPinotSchemaStatus(
schema,
PinotSchemaControllerUpdateSuccess,
string(respUpdateSchema.PinotSuccessResponse.Status),
v1.ConditionTrue,
v1beta1.PinotSchemaUpdateSuccess,
)
if err != nil {
return controllerutil.OperationResultNone, err
}
build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s]", string(resp.RespBody)), PinotSchemaControllerUpdateSuccess)
build.Recorder.GenericEvent(schema, v1.EventTypeNormal, fmt.Sprintf("Resp [%s], Result [%s]", string(resp.RespBody), result), PinotSchemaControllerPatchStatusSuccess)
build.Recorder.GenericEvent(
schema,
v1.EventTypeNormal,
fmt.Sprintf("Resp [%s]", string(respUpdateSchema.PinotSuccessResponse.Status)),
PinotSchemaControllerUpdateSuccess,
)
build.Recorder.GenericEvent(
schema,
v1.EventTypeNormal,
fmt.Sprintf("Resp [%s], Result [%s]", string(respUpdateSchema.PinotSuccessResponse.Status), result),
PinotSchemaControllerPatchStatusSuccess)
return controllerutil.OperationResultUpdated, nil
} else {

} else if respUpdateSchema.PinotErrorResponse != (internalHTTP.PinotErrorResponse{}) {
// patch status with failure and emit events
_, err := r.makePatchPinotSchemaStatus(schema, PinotSchemaControllerUpdateFail, string(resp.RespBody), v1.ConditionTrue, v1beta1.PinotSchemaUpdateFail)
_, err := r.makePatchPinotSchemaStatus(
schema,
PinotSchemaControllerUpdateFail,
string(respUpdateSchema.PinotErrorResponse.Error),
v1.ConditionTrue,
v1beta1.PinotSchemaUpdateFail,
)
if err != nil {
return controllerutil.OperationResultNone, err
}
build.Recorder.GenericEvent(schema, v1.EventTypeWarning, fmt.Sprintf("Resp [%s]", string(resp.RespBody)), PinotSchemaControllerUpdateFail)
build.Recorder.GenericEvent(
schema,
v1.EventTypeWarning,
fmt.Sprintf("Resp [%s], StatusCode [%d]", string(respUpdateSchema.PinotErrorResponse.Error), respUpdateSchema.PinotErrorResponse.Code),
PinotSchemaControllerUpdateFail,
)
return controllerutil.OperationResultNone, err
}
}
Expand Down Expand Up @@ -302,9 +367,9 @@ func (r *PinotSchemaReconciler) getControllerSvcUrl(namespace, pinotClusterName
svcName = svcList.Items[0].Name
}

newName := "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort
_ = "http://" + svcName + "." + namespace + ".svc.cluster.local:" + PinotControllerPort

return newName, nil
return "http://localhost:9000", nil
}

func (r *PinotSchemaReconciler) getAuthCreds(ctx context.Context, schema *v1beta1.PinotSchema) (internalHTTP.BasicAuth, error) {
Expand Down
Loading

0 comments on commit 70691a3

Please sign in to comment.