Skip to content

Commit

Permalink
Add gRPC and etcdctl equivalents of the /livez and /readyz endpoints
Browse files Browse the repository at this point in the history
Reference:
- etcd-io#17925
- etcd-io@293f087#diff-ab6fb0684315e16355f6ebe0f4b3cf860b9b2ff5a0fe1b4e4308a680b19f1b0c

Signed-off-by: Chun-Hung Tseng <henrybear327@gmail.com>
Co-authored-by: Siyuan Zhang <sizhang@google.com>
  • Loading branch information
henrybear327 and siyuanfoundation committed May 18, 2024
1 parent 8938299 commit eaf9561
Show file tree
Hide file tree
Showing 13 changed files with 309 additions and 10 deletions.
43 changes: 43 additions & 0 deletions api/etcdserverpb/rpc.proto
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,27 @@ service Maintenance {
};
}

// Livez runs the member's livez health check and returns the result.
// It is the gRPC equivalent of the /livez endpoint and is mapped to
// HTTP POST /v3/maintenance/livez through the google.api.http option.
rpc Livez(HealthRequest) returns (HealthResponse) {
option (google.api.http) = {
post: "/v3/maintenance/livez"
body: "*"
};
}

// Readyz runs the member's readyz health check and returns the result.
// It is the gRPC equivalent of the /readyz endpoint and is mapped to
// HTTP POST /v3/maintenance/readyz through the google.api.http option.
rpc Readyz(HealthRequest) returns (HealthResponse) {
option (google.api.http) = {
post: "/v3/maintenance/readyz"
body: "*"
};
}

// Healthz takes a HealthRequest and returns a HealthResponse, and is mapped
// to HTTP POST /v3/maintenance/healthz through the google.api.http option.
// NOTE(review): presumably the gRPC counterpart of a general health
// endpoint; how it differs from Livez/Readyz is not visible here — confirm.
rpc Healthz(HealthRequest) returns (HealthResponse) {
option (google.api.http) = {
post: "/v3/maintenance/healthz"
body: "*"
};
}

// Status gets the status of the member.
rpc Status(StatusRequest) returns (StatusResponse) {
option (google.api.http) = {
Expand Down Expand Up @@ -1130,6 +1151,28 @@ message AlarmResponse {
repeated AlarmMember alarms = 2;
}

// HealthRequest is the request message of the Livez, Readyz and Healthz RPCs.
// It optionally selects which health checks are taken into account.
message HealthRequest {
// StringArray wraps a repeated string so that a list of names can be used as
// a oneof member (a oneof cannot contain a repeated field directly).
message StringArray {
repeated string values = 1;
}
// NOTE(review): annotated as "3.0" although this message is added together
// with the new RPCs — confirm the etcd version that should be recorded here.
option (versionpb.etcd_version_msg) = "3.0";
// healthCheckSelector holds at most one of exclude or allowlist.
oneof healthCheckSelector {
// List of health checks to exclude.
StringArray exclude = 1;
// List of health checks to allowlist.
// Cannot be used with exclude.
StringArray allowlist = 2;
}
}

// HealthResponse is the response message of the Livez, Readyz and Healthz RPCs.
message HealthResponse {
// NOTE(review): annotated as "3.0" although this message is added together
// with the new RPCs — confirm the etcd version that should be recorded here.
option (versionpb.etcd_version_msg) = "3.0";

ResponseHeader header = 1;
// ok is the overall result of the health check; etcdctl reports it as the
// endpoint's health.
bool ok = 2;
// reason is a free-form explanation of the result; etcdctl surfaces it as
// the debug string of the endpoint.
string reason = 3;
}

message DowngradeRequest {
option (versionpb.etcd_version_msg) = "3.5";

Expand Down
12 changes: 12 additions & 0 deletions client/v3/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,18 @@ func (mm mockMaintenance) AlarmDisarm(ctx context.Context, m *AlarmMember) (*Ala
return nil, nil
}

// Livez is a test stub: it does no work and returns a nil response and a nil error.
func (mm mockMaintenance) Livez(ctx context.Context) (*HealthResponse, error) {
return nil, nil
}

// Readyz is a test stub: it does no work and returns a nil response and a nil error.
func (mm mockMaintenance) Readyz(ctx context.Context) (*HealthResponse, error) {
return nil, nil
}

// Healthz is a test stub: it does no work and returns a nil response and a nil error.
func (mm mockMaintenance) Healthz(ctx context.Context) (*HealthResponse, error) {
return nil, nil
}

// Defragment is a test stub: it ignores endpoint and returns a nil response and a nil error.
func (mm mockMaintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
return nil, nil
}
Expand Down
32 changes: 32 additions & 0 deletions client/v3/maintenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type (
DefragmentResponse pb.DefragmentResponse
AlarmResponse pb.AlarmResponse
AlarmMember pb.AlarmMember
HealthResponse pb.HealthResponse
StatusResponse pb.StatusResponse
HashKVResponse pb.HashKVResponse
MoveLeaderResponse pb.MoveLeaderResponse
Expand All @@ -51,6 +52,10 @@ type Maintenance interface {
// AlarmDisarm disarms a given alarm.
AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)

// Livez calls the Livez RPC of the Maintenance service and returns its result.
Livez(ctx context.Context) (*HealthResponse, error)
// Readyz calls the Readyz RPC of the Maintenance service and returns its result.
Readyz(ctx context.Context) (*HealthResponse, error)
// Healthz calls the Healthz RPC of the Maintenance service and returns its result.
Healthz(ctx context.Context) (*HealthResponse, error)

// Defragment releases wasted space from internal fragmentation on a given etcd member.
// Defragment is only needed when deleting a large number of keys and want to reclaim
// the resources.
Expand Down Expand Up @@ -158,6 +163,33 @@ func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
return nil, toErr(ctx, err)
}

// Livez sends an empty HealthRequest to the remote Livez RPC using the
// client's call options. On success the response is returned as a
// client-side HealthResponse; on failure the error is converted with toErr.
func (m *maintenance) Livez(ctx context.Context) (*HealthResponse, error) {
	resp, err := m.remote.Livez(ctx, &pb.HealthRequest{}, m.callOpts...)
	if err != nil {
		return nil, toErr(ctx, err)
	}
	return (*HealthResponse)(resp), nil
}

// Readyz sends an empty HealthRequest to the remote Readyz RPC using the
// client's call options. On success the response is returned as a
// client-side HealthResponse; on failure the error is converted with toErr.
func (m *maintenance) Readyz(ctx context.Context) (*HealthResponse, error) {
	resp, err := m.remote.Readyz(ctx, &pb.HealthRequest{}, m.callOpts...)
	if err != nil {
		return nil, toErr(ctx, err)
	}
	return (*HealthResponse)(resp), nil
}

// Healthz sends an empty HealthRequest to the remote Healthz RPC using the
// client's call options. On success the response is returned as a
// client-side HealthResponse; on failure the error is converted with toErr.
func (m *maintenance) Healthz(ctx context.Context) (*HealthResponse, error) {
	resp, err := m.remote.Healthz(ctx, &pb.HealthRequest{}, m.callOpts...)
	if err != nil {
		return nil, toErr(ctx, err)
	}
	return (*HealthResponse)(resp), nil
}

func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
req := &pb.AlarmRequest{
Action: pb.AlarmRequest_DEACTIVATE,
Expand Down
12 changes: 12 additions & 0 deletions client/v3/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ func (rmc *retryMaintenanceClient) Alarm(ctx context.Context, in *pb.AlarmReques
return rmc.mc.Alarm(ctx, in, append(opts, withRepeatablePolicy())...)
}

// Livez forwards the request to the wrapped maintenance client, appending
// withRepeatablePolicy() to the caller-supplied call options.
func (rmc *retryMaintenanceClient) Livez(ctx context.Context, in *pb.HealthRequest, opts ...grpc.CallOption) (resp *pb.HealthResponse, err error) {
	callOpts := append(opts, withRepeatablePolicy())
	return rmc.mc.Livez(ctx, in, callOpts...)
}

// Readyz forwards the request to the wrapped maintenance client, appending
// withRepeatablePolicy() to the caller-supplied call options.
func (rmc *retryMaintenanceClient) Readyz(ctx context.Context, in *pb.HealthRequest, opts ...grpc.CallOption) (resp *pb.HealthResponse, err error) {
	callOpts := append(opts, withRepeatablePolicy())
	return rmc.mc.Readyz(ctx, in, callOpts...)
}

// Healthz forwards the request to the wrapped maintenance client, appending
// withRepeatablePolicy() to the caller-supplied call options.
func (rmc *retryMaintenanceClient) Healthz(ctx context.Context, in *pb.HealthRequest, opts ...grpc.CallOption) (resp *pb.HealthResponse, err error) {
	callOpts := append(opts, withRepeatablePolicy())
	return rmc.mc.Healthz(ctx, in, callOpts...)
}

// Status forwards the request to the wrapped maintenance client, appending
// withRepeatablePolicy() to the caller-supplied call options.
func (rmc *retryMaintenanceClient) Status(ctx context.Context, in *pb.StatusRequest, opts ...grpc.CallOption) (resp *pb.StatusResponse, err error) {
	callOpts := append(opts, withRepeatablePolicy())
	return rmc.mc.Status(ctx, in, callOpts...)
}
Expand Down
116 changes: 112 additions & 4 deletions etcdctl/ctlv3/command/ep_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func NewEndpointCommand() *cobra.Command {
ec.AddCommand(newEpHealthCommand())
ec.AddCommand(newEpStatusCommand())
ec.AddCommand(newEpHashKVCommand())
ec.AddCommand(newEpLivezCommand())
ec.AddCommand(newEpReadyzCommand())

return ec
}
Expand All @@ -59,6 +61,26 @@ func newEpHealthCommand() *cobra.Command {
return cmd
}

// newEpLivezCommand builds the "livez" subcommand, whose Run function is
// epHealthCheckCommandFunc configured for the livez check.
func newEpLivezCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "livez",
		Short: "Checks the livez of endpoints specified in `--endpoints` flag",
		Run:   epHealthCheckCommandFunc(true),
	}
}

// newEpReadyzCommand builds the "readyz" subcommand, whose Run function is
// epHealthCheckCommandFunc configured for the readyz check.
func newEpReadyzCommand() *cobra.Command {
	return &cobra.Command{
		Use:   "readyz",
		Short: "Checks the readyz of endpoints specified in `--endpoints` flag",
		Run:   epHealthCheckCommandFunc(false),
	}
}

func newEpStatusCommand() *cobra.Command {
return &cobra.Command{
Use: "status",
Expand All @@ -81,10 +103,11 @@ func newEpHashKVCommand() *cobra.Command {
}

// epHealth is the per-endpoint result produced by the endpoint health,
// livez and readyz commands and handed to the printers.
type epHealth struct {
Ep string `json:"endpoint"`
Health bool `json:"health"`
Took string `json:"took"`
Error string `json:"error,omitempty"`
Ep string `json:"endpoint"`
Health bool `json:"health"`
Took string `json:"took"`
Error string `json:"error,omitempty"`
// DebugString carries the Reason of a Livez/Readyz response; it is omitted
// from JSON output when empty.
DebugString string `json:"debugString,omitempty"`
}

// epHealthCommandFunc executes the "endpoint-health" command.
Expand Down Expand Up @@ -185,6 +208,91 @@ func epHealthCommandFunc(cmd *cobra.Command, args []string) {
}
}

// epHealthCheckCommandFunc returns the Run function shared by the "livez" and
// "readyz" endpoint subcommands. When isLivez is true every endpoint is probed
// with the Livez RPC, otherwise with the Readyz RPC.
//
// One client is created per endpoint and all endpoints are probed
// concurrently. The collected results are passed to
// display.EndpointHealthCheck; if any endpoint reported a failed check or
// could not be queried, cobrautl.ExitWithError is called with an
// "unhealthy cluster" error.
func epHealthCheckCommandFunc(isLivez bool) func(*cobra.Command, []string) {
	return func(cmd *cobra.Command, args []string) {
		lg, err := logutil.CreateDefaultZapLogger(zap.InfoLevel)
		if err != nil {
			cobrautl.ExitWithError(cobrautl.ExitError, err)
		}
		flags.SetPflagsFromEnv(lg, "ETCDCTL", cmd.InheritedFlags())
		initDisplayFromCmd(cmd)

		sec := secureCfgFromCmd(cmd)
		dt := dialTimeoutFromCmd(cmd)
		ka := keepAliveTimeFromCmd(cmd)
		kat := keepAliveTimeoutFromCmd(cmd)
		auth := authCfgFromCmd(cmd)

		// Build one single-endpoint config per endpoint so that each member is
		// checked individually.
		var cfgs []*clientv3.Config
		for _, ep := range endpointsFromCluster(cmd) {
			cfg, err := clientv3.NewClientConfig(&clientv3.ConfigSpec{
				Endpoints:        []string{ep},
				DialTimeout:      dt,
				KeepAliveTime:    ka,
				KeepAliveTimeout: kat,
				Secure:           sec,
				Auth:             auth,
			}, lg)
			if err != nil {
				cobrautl.ExitWithError(cobrautl.ExitBadArgs, err)
			}
			cfgs = append(cfgs, cfg)
		}

		var wg sync.WaitGroup
		// Buffered to len(cfgs): every goroutine sends exactly one result, so
		// no send blocks even though the channel is only drained after Wait.
		hch := make(chan epHealth, len(cfgs))
		for _, cfg := range cfgs {
			wg.Add(1)
			go func(cfg *clientv3.Config) {
				defer wg.Done()
				ep := cfg.Endpoints[0]
				cfg.Logger = lg.Named("client")
				cli, err := clientv3.New(*cfg)
				if err != nil {
					hch <- epHealth{Ep: ep, Health: false, Error: err.Error()}
					return
				}
				// Close the per-endpoint client once the probe has finished so
				// its resources are not leaked.
				defer cli.Close()

				st := time.Now()
				ctx, cancel := commandCtx(cmd)
				var resp *clientv3.HealthResponse
				if isLivez {
					resp, err = cli.Livez(ctx)
				} else {
					resp, err = cli.Readyz(ctx)
				}
				var eh epHealth
				if err != nil {
					eh = epHealth{Ep: ep, Error: err.Error(), Took: time.Since(st).String()}
				} else {
					eh = epHealth{Ep: ep, Health: resp.Ok, Took: time.Since(st).String(), DebugString: resp.Reason}
				}
				cancel()
				hch <- eh
			}(cfg)
		}

		wg.Wait()
		close(hch)

		errs := false
		var healthList []epHealth
		for h := range hch {
			healthList = append(healthList, h)
			if !h.Health {
				errs = true
			}
		}
		path := "/readyz"
		if isLivez {
			path = "/livez"
		}
		display.EndpointHealthCheck(healthList, path, true)
		if errs {
			cobrautl.ExitWithError(cobrautl.ExitError, fmt.Errorf("unhealthy cluster"))
		}
	}
}

type epStatus struct {
Ep string `json:"Endpoint"`
Resp *clientv3.StatusResponse `json:"Status"`
Expand Down
8 changes: 5 additions & 3 deletions etcdctl/ctlv3/command/printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type printer interface {
MemberList(v3.MemberListResponse)

EndpointHealth([]epHealth)
// EndpointHealthCheck prints the per-endpoint results of a livez/readyz
// check. path names the check (e.g. "/livez"); verbose asks for each
// endpoint's debug string as well. Implementations may ignore both.
EndpointHealthCheck(hs []epHealth, path string, verbose bool)
EndpointStatus([]epStatus)
EndpointHashKV([]epHashKV)
MoveLeader(leader, target uint64, r v3.MoveLeaderResponse)
Expand Down Expand Up @@ -165,9 +166,10 @@ func newPrinterUnsupported(n string) printer {
return &printerUnsupported{printerRPC{nil, f}}
}

func (p *printerUnsupported) EndpointHealth([]epHealth) { p.p(nil) }
func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) }
func (p *printerUnsupported) EndpointHashKV([]epHashKV) { p.p(nil) }
// The endpoint printers below ignore their arguments and only call p.p(nil).
// NOTE(review): presumably p.p reports that the chosen output format does not
// support the command — confirm against newPrinterUnsupported.
func (p *printerUnsupported) EndpointHealth([]epHealth) { p.p(nil) }
func (p *printerUnsupported) EndpointHealthCheck(hs []epHealth, path string, verbose bool) { p.p(nil) }
func (p *printerUnsupported) EndpointStatus([]epStatus) { p.p(nil) }
func (p *printerUnsupported) EndpointHashKV([]epHashKV) { p.p(nil) }

// MoveLeader and DowngradeValidate ignore their arguments and only call p.p(nil).
func (p *printerUnsupported) MoveLeader(leader, target uint64, r v3.MoveLeaderResponse) { p.p(nil) }
func (p *printerUnsupported) DowngradeValidate(r v3.DowngradeResponse) { p.p(nil) }
Expand Down
7 changes: 4 additions & 3 deletions etcdctl/ctlv3/command/printer_json.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ func newJSONPrinter(isHex bool) printer {
}
}

func (p *jsonPrinter) EndpointHealth(r []epHealth) { printJSON(r) }
func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) }
func (p *jsonPrinter) EndpointHashKV(r []epHashKV) { printJSON(r) }
// The endpoint printers below pass their result slice to printJSON unchanged.
// EndpointHealthCheck does not use path or verbose.
func (p *jsonPrinter) EndpointHealth(r []epHealth) { printJSON(r) }
func (p *jsonPrinter) EndpointHealthCheck(r []epHealth, path string, verbose bool) { printJSON(r) }
func (p *jsonPrinter) EndpointStatus(r []epStatus) { printJSON(r) }
func (p *jsonPrinter) EndpointHashKV(r []epHashKV) { printJSON(r) }

func (p *jsonPrinter) MemberList(r clientv3.MemberListResponse) {
if p.isHex {
Expand Down
13 changes: 13 additions & 0 deletions etcdctl/ctlv3/command/printer_simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,19 @@ func (s *simplePrinter) EndpointHealth(hs []epHealth) {
}
}

// EndpointHealthCheck prints one line per endpoint with the outcome of the
// health check named by path (e.g. "/livez"). Endpoints whose Health is true
// are reported as OK; all others are reported as not OK together with their
// error. When verbose is true, the endpoint's DebugString is printed on the
// following line.
func (s *simplePrinter) EndpointHealthCheck(hs []epHealth, path string, verbose bool) {
	for _, h := range hs {
		if h.Health {
			fmt.Printf("%s: %s is OK: took = %v\n", h.Ep, path, h.Took)
		} else {
			// A failed check must not be reported as "OK".
			fmt.Printf("%s: %s is not OK: took = %v, Error = %v\n", h.Ep, path, h.Took, h.Error)
		}
		if verbose {
			fmt.Printf("%s\n", h.DebugString)
		}
	}
}

func (s *simplePrinter) EndpointStatus(statusList []epStatus) {
_, rows := makeEndpointStatusTable(statusList)
for _, row := range rows {
Expand Down
3 changes: 3 additions & 0 deletions etcdctl/ctlv3/command/printer_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ func (tp *tablePrinter) MemberList(r v3.MemberListResponse) {
table.SetAlignment(tablewriter.ALIGN_RIGHT)
table.Render()
}
// EndpointHealthCheck renders the results with the same table as
// EndpointHealth; path and verbose are not used.
func (tp *tablePrinter) EndpointHealthCheck(r []epHealth, path string, verbose bool) {
tp.EndpointHealth(r)
}
func (tp *tablePrinter) EndpointHealth(r []epHealth) {
hdr, rows := makeEndpointHealthTable(r)
table := tablewriter.NewWriter(os.Stdout)
Expand Down
7 changes: 7 additions & 0 deletions scripts/etcd_version_annotations.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,13 @@ etcdserverpb.HashRequest: "3.0"
etcdserverpb.HashResponse: "3.0"
etcdserverpb.HashResponse.hash: ""
etcdserverpb.HashResponse.header: ""
etcdserverpb.HealthRequest: "3.0"
etcdserverpb.HealthRequest.allowlist: ""
etcdserverpb.HealthRequest.exclude: ""
etcdserverpb.HealthResponse: "3.0"
etcdserverpb.HealthResponse.header: ""
etcdserverpb.HealthResponse.ok: ""
etcdserverpb.HealthResponse.reason: ""
etcdserverpb.InternalAuthenticateRequest: "3.0"
etcdserverpb.InternalAuthenticateRequest.name: ""
etcdserverpb.InternalAuthenticateRequest.password: ""
Expand Down
Loading

0 comments on commit eaf9561

Please sign in to comment.