Skip to content

Commit

Permalink
Add TLS config to druid client
Browse files Browse the repository at this point in the history
Signed-off-by: Tapajit Chandra Paul <tapajit@appscode.com>
  • Loading branch information
tapojit047 committed Oct 3, 2024
1 parent c1fd832 commit f52aef6
Show file tree
Hide file tree
Showing 16 changed files with 617 additions and 176 deletions.
126 changes: 24 additions & 102 deletions druid/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@ limitations under the License.
package druid

import (
"bytes"
"encoding/json"
"fmt"
"log"
"time"

druidgo "github.com/grafadruid/go-druid"
"github.com/hashicorp/go-retryablehttp"
"github.com/pkg/errors"
"k8s.io/klog/v2"
health "kmodules.xyz/client-go/tools/healthchecker"
Expand Down Expand Up @@ -117,55 +114,15 @@ func (c *Client) CheckDataSourceExistence() (bool, error) {
return false, errors.Wrap(err, "failed to marshal json response")
}
rawMessage := json.RawMessage(jsonData)
response, err := c.SubmitRequest(method, path, rawMessage)
if err != nil {
return false, err
}

exists, err := parseDatasourceExistenceQueryResponse(response)
if err != nil {
return false, errors.Wrap(err, "Failed to parse response of datasource existence request")
}

if err := closeResponse(response); err != nil {
return exists, err
}
return exists, nil
}

func (c *Client) SubmitRequest(method, path string, opts interface{}) (*druidgo.Response, error) {
res, err := c.NewRequest(method, path, opts)
if err != nil {
return nil, errors.Wrap(err, "failed to submit API request")
}
http := retryablehttp.NewClient()

var b []byte
buf := bytes.NewBuffer(b)
http.Logger = log.New(buf, "", 0)

resp, err := http.Do(res)
var result []map[string]interface{}
_, err = c.ExecuteRequest(method, path, rawMessage, &result)
if err != nil {
return nil, err
}
response := &druidgo.Response{Response: resp}
return response, nil
}

func parseDatasourceExistenceQueryResponse(res *druidgo.Response) (bool, error) {
var responseBody []map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&responseBody); err != nil {
return false, errors.Wrap(err, "failed to deserialize the response")
klog.Error("Failed to execute request", err)
return false, err
}
return len(responseBody) != 0, nil
}

func closeResponse(response *druidgo.Response) error {
err := response.Body.Close()
if err != nil {
return errors.Wrap(err, "Failed to close the response body")
}
return nil
return len(result) > 0, nil
}

// CheckDBReadWriteAccess checks read and write access in the DB
Expand Down Expand Up @@ -238,41 +195,25 @@ func (c *Client) GetData() (string, error) {
func (c *Client) runSelectQuery() (string, error) {
method := "POST"
path := "druid/v2/sql"

data := map[string]interface{}{
"query": "SELECT * FROM \"kubedb-datasource\"",
}

jsonData, err := json.Marshal(data)
if err != nil {
return "", errors.Wrap(err, "failed to marshal query json data")
}
rawMessage := json.RawMessage(jsonData)
response, err := c.SubmitRequest(method, path, rawMessage)
if err != nil {
return "", err
}
if response == nil {
return "", errors.New("response body is empty")
}

id, err := parseSelectQueryResponse(response, "id")
var result []map[string]interface{}
_, err = c.ExecuteRequest(method, path, rawMessage, &result)
if err != nil {
return "", errors.Wrap(err, "failed to parse the response body")
}

if err := closeResponse(response); err != nil {
klog.Error("Failed to execute POST query request", err)
return "", err
}
return id.(string), nil
}
id := result[0]["id"]

func parseSelectQueryResponse(res *druidgo.Response, key string) (interface{}, error) {
var responseBody []map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&responseBody); err != nil {
return "", errors.Wrap(err, "failed to deserialize the response")
}
value := responseBody[0][key]
return value, nil
return id.(string), nil
}

func (c *Client) updateCoordinatorsWaitBeforeDeletingConfig(value int32) error {
Expand All @@ -296,11 +237,9 @@ func (c *Client) updateCoordinatorDynamicConfig(data map[string]interface{}) err
}
rawMessage := json.RawMessage(jsonData)

response, err := c.SubmitRequest(method, path, rawMessage)
_, err = c.ExecuteRequest(method, path, rawMessage, nil)
if err != nil {
return err
}
if err := closeResponse(response); err != nil {
klog.Error("Failed to execute coordinator config update request", err)
return err
}
return nil
Expand Down Expand Up @@ -336,33 +275,19 @@ func (c *Client) submitTask(taskType DruidTaskType, dataSource string, data stri
} else {
task = GetKillTaskDefinition()
}

rawMessage := json.RawMessage(task)
method := "POST"
path := "druid/indexer/v1/task"

response, err := c.SubmitRequest(method, path, rawMessage)
if err != nil {
return "", err
}

taskID, err := GetValueFromClusterResponse(response, "task")
var result map[string]interface{}
_, err := c.ExecuteRequest(method, path, rawMessage, &result)
if err != nil {
return "", errors.Wrap(err, "failed to parse response of task api request")
}
if err = closeResponse(response); err != nil {
klog.Error("Failed to execute POST ingestion or kill task request", err)
return "", err
}
return fmt.Sprintf("%v", taskID), nil
}

func GetValueFromClusterResponse(res *druidgo.Response, key string) (interface{}, error) {
responseBody := make(map[string]interface{})
if err := json.NewDecoder(res.Body).Decode(&responseBody); err != nil {
return "", errors.Wrap(err, "failed to deserialize the response")
}
value := responseBody[key]
return value, nil
taskID := result["task"]
return taskID.(string), nil
}

func GetIngestionTaskDefinition(dataSource string, data string) string {
Expand Down Expand Up @@ -419,21 +344,18 @@ func GetKillTaskDefinition() string {
func (c *Client) CheckTaskStatus(taskID string) (bool, error) {
method := "GET"
path := fmt.Sprintf("druid/indexer/v1/task/%s/status", taskID)
response, err := c.SubmitRequest(method, path, nil)
if err != nil {
return false, errors.Wrap(err, "failed to check task status")
}

statusRes, err := GetValueFromClusterResponse(response, "status")
var result map[string]interface{}
_, err := c.ExecuteRequest(method, path, nil, &result)
if err != nil {
return false, errors.Wrap(err, "failed to parse respons of task ingestion request")
klog.Error("Failed to execute GET task status request", err)
return false, err
}

statusRes := result["status"]
statusMap := statusRes.(map[string]interface{})
status := statusMap["status"].(string)

if err = closeResponse(response); err != nil {
return false, err
}
return status == "SUCCESS", nil
}

Expand Down
110 changes: 92 additions & 18 deletions druid/kubedb_client_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,19 @@ package druid

import (
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"net/http"

druidgo "github.com/grafadruid/go-druid"
_ "github.com/lib/pq"
core "k8s.io/api/core/v1"
kerr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
dbapi "kubedb.dev/apimachinery/apis/kubedb/v1"
olddbapi "kubedb.dev/apimachinery/apis/kubedb/v1alpha2"
"sigs.k8s.io/controller-runtime/pkg/client"
)
Expand Down Expand Up @@ -74,28 +78,24 @@ func (o *KubeDBClientBuilder) WithNodeRole(nodeRole olddbapi.DruidNodeRoleType)

func (o *KubeDBClientBuilder) GetDruidClient() (*Client, error) {
var druidOpts []druidgo.ClientOption
if !*o.db.Spec.DisableSecurity {
if o.db.Spec.AuthSecret == nil {
klog.Error("AuthSecret not set")
return nil, errors.New("auth-secret is not set")
// Add druid auth credential to the client
if !o.db.Spec.DisableSecurity {
authOpts, err := o.getClientAuthOpts()
if err != nil {
klog.Error(err, "failed to get client auth options")
return nil, err
}
druidOpts = append(druidOpts, *authOpts)
}

authSecret := &core.Secret{}
err := o.kc.Get(o.ctx, types.NamespacedName{
Namespace: o.db.Namespace,
Name: o.db.Spec.AuthSecret.Name,
}, authSecret)
// Add druid ssl configs to the client
if o.db.Spec.EnableSSL {
sslOpts, err := o.getClientSSLConfig()
if err != nil {
if kerr.IsNotFound(err) {
klog.Error(err, "AuthSecret not found")
return nil, errors.New("auth-secret not found")
}
klog.Error(err, "failed to get client ssl options")
return nil, err
}
userName := string(authSecret.Data[core.BasicAuthUsernameKey])
password := string(authSecret.Data[core.BasicAuthPasswordKey])

druidOpts = append(druidOpts, druidgo.WithBasicAuth(userName, password))
druidOpts = append(druidOpts, *sslOpts)
}

druidClient, err := druidgo.NewClient(o.url, druidOpts...)
Expand All @@ -107,8 +107,82 @@ func (o *KubeDBClientBuilder) GetDruidClient() (*Client, error) {
}, nil
}

func (o *KubeDBClientBuilder) getClientAuthOpts() (*druidgo.ClientOption, error) {
if o.db.Spec.AuthSecret == nil {
klog.Error("AuthSecret not set")
return nil, errors.New("auth-secret is not set")
}

authSecret := &core.Secret{}
err := o.kc.Get(o.ctx, types.NamespacedName{
Namespace: o.db.Namespace,
Name: o.db.Spec.AuthSecret.Name,
}, authSecret)
if err != nil {
if kerr.IsNotFound(err) {
klog.Error(err, "AuthSecret not found")
return nil, errors.New("auth-secret not found")
}
return nil, err
}
userName := string(authSecret.Data[core.BasicAuthUsernameKey])
password := string(authSecret.Data[core.BasicAuthPasswordKey])

druidAuthOpts := druidgo.WithBasicAuth(userName, password)
return &druidAuthOpts, nil
}

func (o *KubeDBClientBuilder) getClientSSLConfig() (*druidgo.ClientOption, error) {
certSecret := &core.Secret{}
err := o.kc.Get(o.ctx, types.NamespacedName{
Namespace: o.db.Namespace,
Name: o.db.GetCertSecretName(olddbapi.DruidClientCert),
}, certSecret)
if err != nil {
if kerr.IsNotFound(err) {
klog.Error(err, "Client certificate secret not found")
return nil, errors.New("client certificate secret is not found")
}
klog.Error(err, "Failed to get client certificate Secret")
return nil, err
}

// get tls cert, clientCA and rootCA for tls config
clientCA := x509.NewCertPool()
rootCA := x509.NewCertPool()

crt, err := tls.X509KeyPair(certSecret.Data[core.TLSCertKey], certSecret.Data[core.TLSPrivateKeyKey])
if err != nil {
klog.Error(err, "Failed to parse private key pair")
return nil, err
}
clientCA.AppendCertsFromPEM(certSecret.Data[dbapi.TLSCACertFileName])
rootCA.AppendCertsFromPEM(certSecret.Data[dbapi.TLSCACertFileName])

httpClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{crt},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: clientCA,
RootCAs: rootCA,
MaxVersion: tls.VersionTLS12,
},
},
}
tlsOpts := druidgo.WithHTTPClient(httpClient)
return &tlsOpts, nil
}

// GetNodesAddress returns DNS for the nodes based on type of the node
func (o *KubeDBClientBuilder) GetNodesAddress() string {
baseUrl := fmt.Sprintf("http://%s-0.%s.%s.svc.cluster.local:%d", o.db.PetSetName(o.nodeRole), o.db.GoverningServiceName(), o.db.Namespace, o.db.DruidNodeContainerPort(o.nodeRole))
var scheme string
if o.db.Spec.EnableSSL {
scheme = "https"
} else {
scheme = "http"
}

baseUrl := fmt.Sprintf("%s://%s-0.%s.%s.svc.cluster.local:%d", scheme, o.db.PetSetName(o.nodeRole), o.db.GoverningServiceName(), o.db.Namespace, o.db.DruidNodeContainerPort(o.nodeRole))
return baseUrl
}
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ require (
github.com/go-sql-driver/mysql v1.8.1
github.com/gocql/gocql v1.6.0
github.com/grafadruid/go-druid v0.0.6
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/lib/pq v1.10.7
github.com/michaelklishin/rabbit-hole/v2 v2.16.0
github.com/microsoft/go-mssqldb v1.6.0
Expand Down Expand Up @@ -79,6 +78,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
github.com/imdario/mergo v0.3.16 // indirect
github.com/jcmturner/aescts/v2 v2.0.0 // indirect
Expand Down Expand Up @@ -162,3 +162,5 @@ replace sigs.k8s.io/controller-runtime => github.com/kmodules/controller-runtime
replace github.com/imdario/mergo => github.com/imdario/mergo v0.3.6

replace k8s.io/apiserver => github.com/kmodules/apiserver v0.30.2-0.20240519082755-d7b8c2d9e699

replace kubedb.dev/apimachinery => ../apimachinery
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -804,8 +804,6 @@ kmodules.xyz/monitoring-agent-api v0.30.1 h1:OdYucfl7OblBqVhCyEcOC3HvUrOKtDh8lcM
kmodules.xyz/monitoring-agent-api v0.30.1/go.mod h1:oR3tk5O4koYar4cD9N3AjbBFr9XTwBU3sw9qD2NdNQc=
kmodules.xyz/offshoot-api v0.30.1 h1:TrulAYO+oBsXe9sZZGTmNWIuI8qD2izMpgcTSPvgAmI=
kmodules.xyz/offshoot-api v0.30.1/go.mod h1:T3mpjR6fui0QzOcmQvIuANytW48fe9ytmy/1cgx6D4g=
kubedb.dev/apimachinery v0.48.0 h1:gMGqkBRs81wbmGPQIqMGY/MztKg4MYjZ39d9F656V2c=
kubedb.dev/apimachinery v0.48.0/go.mod h1:TeZW+vt9OLf0Jyb/AZktvOOzf3NV+rFhHDN/zTh1EjA=
kubeops.dev/petset v0.0.7 h1:F77BTRfUqRVO7kNc8q2oFSSviDmYBqni/osXqu0kgJ4=
kubeops.dev/petset v0.0.7/go.mod h1:lt0SZV4ohRy7RiwLNUnMoauG4lCbcRbSqhMg20rdUQg=
lukechampine.com/uint128 v1.1.1/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk=
Expand Down
Loading

0 comments on commit f52aef6

Please sign in to comment.