Skip to content

Commit

Permalink
Introduce the HTTP client
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <ghzpotato@gmail.com>
  • Loading branch information
JmPotato committed Nov 2, 2023
1 parent a1a1eea commit dbc89cb
Show file tree
Hide file tree
Showing 5 changed files with 247 additions and 1 deletion.
2 changes: 1 addition & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type GlobalConfigItem struct {
PayLoad []byte
}

// Client is a PD (Placement Driver) client.
// Client is a PD (Placement Driver) RPC client.
// It should not be used after calling Close().
type Client interface {
// GetClusterID gets the cluster ID from PD.
Expand Down
21 changes: 21 additions & 0 deletions client/http/api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

// The following constants are the paths of PD HTTP APIs.
const (
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
)
17 changes: 17 additions & 0 deletions client/http/backoff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

// TODO: support the customized backoff strategy.
152 changes: 152 additions & 0 deletions client/http/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"go.uber.org/zap"
)

const defaultTimeout = 30 * time.Second

// HTTPClient is a PD (Placement Driver) HTTP client.
type HTTPClient interface {
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
}

var _ HTTPClient = (*httpClient)(nil)

type httpClient struct {
pdAddrs []string
cli *http.Client
}

// NewHTTPClient creates a PD HTTP client with the given PD addresses and TLS config.
func NewHTTPClient(
pdAddrs []string,
tlsConf *tls.Config,
) HTTPClient {
// Normalize the addresses with correct scheme prefix.
for i, addr := range pdAddrs {
if !strings.HasPrefix(addr, "http") {
if tlsConf != nil {
addr = "https://" + addr
} else {
addr = "http://" + addr
}
pdAddrs[i] = addr
}
}
// Init the HTTP client.
// TODO: do we need to make the HTTP client a singleton?
cli := &http.Client{Timeout: defaultTimeout}
if tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = tlsConf
cli.Transport = transport
}

return &httpClient{
pdAddrs: pdAddrs,
cli: cli,
}
}

func (hc *httpClient) pdAddr() string {
// TODO: support the customized PD address selection strategy.
return hc.pdAddrs[0]
}

func (hc *httpClient) request(
ctx context.Context,
name, method, uri string,
res interface{},
) error {
reqURL := fmt.Sprintf("%s%s", hc.pdAddr(), uri)
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", reqURL),
zap.String("method", method),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, method, reqURL, nil)
if err != nil {
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
}
// TODO: integrate the metrics.
resp, err := hc.cli.Do(req)
if err != nil {
log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...)
return errors.Trace(err)
}
defer func() {
err = resp.Body.Close()
if err != nil {
log.Warn("[pd] close http response body failed", append(logFields, zap.Error(err))...)
}
}()

if resp.StatusCode != http.StatusOK {
logFields = append(logFields, zap.String("status", resp.Status))

bs, readErr := io.ReadAll(resp.Body)
if readErr != nil {
logFields = append(logFields, zap.NamedError("read-body-error", err))
} else {
logFields = append(logFields, zap.ByteString("body", bs))
}

log.Error("[pd] request failed with a non-200 status", logFields...)
return errors.Errorf("request pd http api failed with status: '%s'", resp.Status)
}

err = json.NewDecoder(resp.Body).Decode(res)
if err != nil {
return errors.Trace(err)
}
return nil
}

// GetHotReadRegions gets the hot read region statistics info.
func (hc *httpClient) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotReadRegions StoreHotPeersInfos
err := hc.request(ctx, "GetHotReadRegions", http.MethodGet, HotRead, &hotReadRegions)
if err != nil {
return nil, err
}
return &hotReadRegions, nil
}

// GetHotWriteRegions gets the hot write region statistics info.
func (hc *httpClient) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotWriteRegions StoreHotPeersInfos
err := hc.request(ctx, "GetHotWriteRegions", http.MethodGet, HotWrite, &hotWriteRegions)
if err != nil {
return nil, err
}
return &hotWriteRegions, nil
}
56 changes: 56 additions & 0 deletions client/http/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import "time"

// NOTICE: the structures below are copied from the PD API definitions.
// Please make sure the consistency if any change happens to the PD API.

// StoreHotPeersInfos is used to get human-readable description for hot regions.
type StoreHotPeersInfos struct {
AsPeer StoreHotPeersStat `json:"as_peer"`
AsLeader StoreHotPeersStat `json:"as_leader"`
}

// StoreHotPeersStat is used to record the hot region statistics group by store.
type StoreHotPeersStat map[uint64]*HotPeersStat

// HotPeersStat records all hot regions statistics
type HotPeersStat struct {
StoreByteRate float64 `json:"store_bytes"`
StoreKeyRate float64 `json:"store_keys"`
StoreQueryRate float64 `json:"store_query"`
TotalBytesRate float64 `json:"total_flow_bytes"`
TotalKeysRate float64 `json:"total_flow_keys"`
TotalQueryRate float64 `json:"total_flow_query"`
Count int `json:"regions_count"`
Stats []HotPeerStatShow `json:"statistics"`
}

// HotPeerStatShow records the hot region statistics for output
type HotPeerStatShow struct {
StoreID uint64 `json:"store_id"`
Stores []uint64 `json:"stores"`
IsLeader bool `json:"is_leader"`
IsLearner bool `json:"is_learner"`
RegionID uint64 `json:"region_id"`
HotDegree int `json:"hot_degree"`
ByteRate float64 `json:"flow_bytes"`
KeyRate float64 `json:"flow_keys"`
QueryRate float64 `json:"flow_query"`
AntiCount int `json:"anti_count"`
LastUpdateTime time.Time `json:"last_update_time,omitempty"`
}

0 comments on commit dbc89cb

Please sign in to comment.