From baa72b6aa477507388ad7122c19ff0e03ebc77fe Mon Sep 17 00:00:00 2001 From: Mike Beaumont Date: Tue, 10 Oct 2023 19:08:06 +0200 Subject: [PATCH] feat(kuma-cp): introduce zone health checks (#7821) ## Explanation Client: * Send health check periodically Server: * When we get a health check, mark it in the corresponding `ZoneInsight` * Introduce new component `ZoneWatch` that listens for `ZoneOpenedStream` and marks the `(tenantID, zone)` for watching * Periodically check all watched `(tenantID, zone)` and if the time of last health check in `ZoneInsight` is too late, send `ZoneWentOffline` * All opened streams also listen for `ZoneWentOffline` events and the handler returns if said event is received, ending the stream We store the info in `ZoneInsight` because: All instances need to potentially kill streams but not every instance will receive a health check from connected zones ## Tests The need for `time.Sleep` in the tests comes about because it happens asynchronously that: 1) `ZoneWatch` subscribes to `ZoneOpenedStream` events in `Start` in reality, this is guaranteed to happen before ZoneOpenedStream events are sent by the fact that we only send them in response to new gRPC streams being opened 2) `ZoneOpenedStream` is witnessed by `ZoneWatch` the test adds a `time.Sleep` because we only want to update the health check time once and in particular _after_ the zone starts being watched and then check that it's disconnected in reality, a zone will continually send its health check ping so it _eventually_ will be updated after the initial seen last time. Signed-off-by: Mike Beaumont --- api/mesh/v1alpha1/kds.pb.go | 282 ++++++++++-------- api/mesh/v1alpha1/kds.proto | 7 +- .../raw/protos/ZoneHealthCheckResponse.json | 8 + pkg/config/loader_test.go | 7 + pkg/config/multizone/kds.go | 24 ++ pkg/events/interfaces.go | 13 + pkg/kds/features.go | 6 + pkg/kds/global/components.go | 22 ++ pkg/kds/mux/client.go | 45 +++ pkg/kds/mux/zone_sync.go | 49 ++- pkg/kds/mux/zone_watch.go | 143 +++++++++ pkg/kds/mux/zone_watch_test.go | 167 +++++++++++ pkg/kds/service/server.go | 115 +++++-- 13 files changed, 720 insertions(+), 168 deletions(-) create mode 100644 pkg/kds/mux/zone_watch.go create mode 100644 pkg/kds/mux/zone_watch_test.go diff --git a/api/mesh/v1alpha1/kds.pb.go b/api/mesh/v1alpha1/kds.pb.go index 3d6ec34f29d5..2f9842b24340 100644 --- a/api/mesh/v1alpha1/kds.pb.go +++ b/api/mesh/v1alpha1/kds.pb.go @@ -11,6 +11,7 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" anypb "google.golang.org/protobuf/types/known/anypb" + durationpb "google.golang.org/protobuf/types/known/durationpb" reflect "reflect" sync "sync" ) @@ -119,6 +120,10 @@ type ZoneHealthCheckResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields + + // The the interval that the global control plane + // expects between health check pings + Interval *durationpb.Duration `protobuf:"bytes,1,opt,name=interval,proto3" json:"interval,omitempty"` } func (x *ZoneHealthCheckResponse) Reset() { @@ -153,6 +158,13 @@ func (*ZoneHealthCheckResponse) Descriptor() ([]byte, []int) { return file_api_mesh_v1alpha1_kds_proto_rawDescGZIP(), []int{2} } +func (x *ZoneHealthCheckResponse) GetInterval() *durationpb.Duration { + if x != nil { + return x.Interval + } + return nil +} + // XDSConfigRequest is a request for XDS Config Dump that is executed on Zone // CP. type XDSConfigRequest struct { @@ -747,7 +759,9 @@ var file_api_mesh_v1alpha1_kds_proto_rawDesc = []byte{ 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f, 0x76, 0x33, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, - 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xbd, 0x01, 0x0a, 0x0c, 0x4b, 0x75, 0x6d, + 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, + 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xbd, 0x01, 0x0a, 0x0c, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x39, 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x75, 0x6d, @@ -761,114 +775,118 @@ var file_api_mesh_v1alpha1_kds_proto_rawDesc = []byte{ 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x18, 0x0a, 0x16, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x22, 0x19, 0x0a, 0x17, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa0, 0x01, - 0x0a, 0x10, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, - 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, - 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, - 0x22, 0x6e, 0x0a, 0x11, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, - 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x18, 0x0a, 0x06, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, - 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x22, 0x9c, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, - 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, - 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, - 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, - 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, - 0x68, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, - 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, - 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x16, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, - 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x42, - 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x9f, 0x01, 0x0a, 0x0f, 0x43, 0x6c, - 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, - 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, - 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, - 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, - 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, - 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x71, 0x0a, 0x10, 0x43, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, - 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, - 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, - 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1c, 0x0a, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, - 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, - 0x74, 0x65, 0x72, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x32, 0x8e, - 0x01, 0x0a, 0x14, 0x4b, 0x75, 0x6d, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, - 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x76, 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, 0x61, - 0x6d, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x2c, - 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, - 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, - 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x65, - 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, - 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, - 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, - 0x98, 0x03, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x4b, 0x44, 0x53, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x12, 0x63, 0x0a, 0x10, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x58, 0x44, - 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, - 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, - 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, - 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, + 0x73, 0x74, 0x22, 0x50, 0x0a, 0x17, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x35, 0x0a, + 0x08, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, + 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x69, 0x6e, 0x74, 0x65, + 0x72, 0x76, 0x61, 0x6c, 0x22, 0xa0, 0x01, 0x0a, 0x10, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, + 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, + 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x6e, 0x0a, 0x11, 0x58, 0x44, 0x53, 0x43, 0x6f, + 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, + 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, + 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, + 0x72, 0x6f, 0x72, 0x12, 0x18, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, + 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x9c, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, + 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, + 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, + 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x68, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x16, + 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, + 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x22, 0x9f, 0x01, 0x0a, 0x0f, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, + 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, + 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, + 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, + 0x73, 0x68, 0x22, 0x71, 0x0a, 0x10, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1c, 0x0a, + 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, + 0x00, 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, + 0x65, 0x73, 0x75, 0x6c, 0x74, 0x32, 0x8e, 0x01, 0x0a, 0x14, 0x4b, 0x75, 0x6d, 0x61, 0x44, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x76, + 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, + 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x2c, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, + 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, + 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, + 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, 0x98, 0x03, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, + 0x6c, 0x4b, 0x44, 0x53, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x63, 0x0a, 0x10, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, + 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x56, 0x0a, 0x0b, 0x53, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x21, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, - 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, - 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x20, 0x2e, 0x6b, 0x75, - 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, - 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, - 0x01, 0x12, 0x5f, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6c, 0x75, 0x73, 0x74, - 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, - 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x23, 0x2e, 0x6b, 0x75, 0x6d, 0x61, - 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, - 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, - 0x30, 0x01, 0x12, 0x66, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, - 0x6b, 0x12, 0x2a, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, - 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, - 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, - 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, - 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, - 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x4b, - 0x44, 0x53, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, - 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x54, 0x6f, 0x5a, 0x6f, 0x6e, 0x65, 0x53, 0x79, 0x6e, - 0x63, 0x12, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, - 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, - 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, - 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x7d, 0x0a, 0x10, - 0x5a, 0x6f, 0x6e, 0x65, 0x54, 0x6f, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x53, 0x79, 0x6e, 0x63, - 0x12, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, - 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, - 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, - 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x68, 0x71, - 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, - 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, + 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, 0x53, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, + 0x12, 0x56, 0x0a, 0x0b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, + 0x21, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, + 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x1a, 0x20, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, + 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5f, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x6b, 0x75, 0x6d, + 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, + 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x1a, 0x23, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, + 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x66, 0x0a, 0x0b, 0x48, 0x65, 0x61, + 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x2a, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, + 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, + 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, + 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, + 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x4b, 0x44, 0x53, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x54, 0x6f, + 0x5a, 0x6f, 0x6e, 0x65, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, + 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, + 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, + 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x32, 0x2e, 0x65, 0x6e, + 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, + 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, + 0x01, 0x30, 0x01, 0x12, 0x7d, 0x0a, 0x10, 0x5a, 0x6f, 0x6e, 0x65, 0x54, 0x6f, 0x47, 0x6c, 0x6f, + 0x62, 0x61, 0x6c, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, + 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, + 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, + 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x31, 0x2e, 0x65, 0x6e, + 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, + 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, + 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, + 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x68, 0x71, 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -896,33 +914,35 @@ var file_api_mesh_v1alpha1_kds_proto_goTypes = []interface{}{ (*ClustersResponse)(nil), // 8: kuma.mesh.v1alpha1.ClustersResponse (*KumaResource_Meta)(nil), // 9: kuma.mesh.v1alpha1.KumaResource.Meta (*anypb.Any)(nil), // 10: google.protobuf.Any - (*v3.DiscoveryRequest)(nil), // 11: envoy.service.discovery.v3.DiscoveryRequest - (*v3.DeltaDiscoveryRequest)(nil), // 12: envoy.service.discovery.v3.DeltaDiscoveryRequest - (*v3.DeltaDiscoveryResponse)(nil), // 13: envoy.service.discovery.v3.DeltaDiscoveryResponse - (*v3.DiscoveryResponse)(nil), // 14: envoy.service.discovery.v3.DiscoveryResponse + (*durationpb.Duration)(nil), // 11: google.protobuf.Duration + (*v3.DiscoveryRequest)(nil), // 12: envoy.service.discovery.v3.DiscoveryRequest + (*v3.DeltaDiscoveryRequest)(nil), // 13: envoy.service.discovery.v3.DeltaDiscoveryRequest + (*v3.DeltaDiscoveryResponse)(nil), // 14: envoy.service.discovery.v3.DeltaDiscoveryResponse + (*v3.DiscoveryResponse)(nil), // 15: envoy.service.discovery.v3.DiscoveryResponse } var file_api_mesh_v1alpha1_kds_proto_depIdxs = []int32{ 9, // 0: kuma.mesh.v1alpha1.KumaResource.meta:type_name -> kuma.mesh.v1alpha1.KumaResource.Meta 10, // 1: kuma.mesh.v1alpha1.KumaResource.spec:type_name -> google.protobuf.Any - 11, // 2: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:input_type -> envoy.service.discovery.v3.DiscoveryRequest - 4, // 3: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:input_type -> kuma.mesh.v1alpha1.XDSConfigResponse - 6, // 4: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:input_type -> kuma.mesh.v1alpha1.StatsResponse - 8, // 5: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:input_type -> kuma.mesh.v1alpha1.ClustersResponse - 1, // 6: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:input_type -> kuma.mesh.v1alpha1.ZoneHealthCheckRequest - 12, // 7: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest - 13, // 8: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse - 14, // 9: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:output_type -> envoy.service.discovery.v3.DiscoveryResponse - 3, // 10: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:output_type -> kuma.mesh.v1alpha1.XDSConfigRequest - 5, // 11: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:output_type -> kuma.mesh.v1alpha1.StatsRequest - 7, // 12: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:output_type -> kuma.mesh.v1alpha1.ClustersRequest - 2, // 13: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:output_type -> kuma.mesh.v1alpha1.ZoneHealthCheckResponse - 13, // 14: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse - 12, // 15: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest - 9, // [9:16] is the sub-list for method output_type - 2, // [2:9] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name + 11, // 2: kuma.mesh.v1alpha1.ZoneHealthCheckResponse.interval:type_name -> google.protobuf.Duration + 12, // 3: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:input_type -> envoy.service.discovery.v3.DiscoveryRequest + 4, // 4: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:input_type -> kuma.mesh.v1alpha1.XDSConfigResponse + 6, // 5: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:input_type -> kuma.mesh.v1alpha1.StatsResponse + 8, // 6: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:input_type -> kuma.mesh.v1alpha1.ClustersResponse + 1, // 7: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:input_type -> kuma.mesh.v1alpha1.ZoneHealthCheckRequest + 13, // 8: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest + 14, // 9: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse + 15, // 10: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:output_type -> envoy.service.discovery.v3.DiscoveryResponse + 3, // 11: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:output_type -> kuma.mesh.v1alpha1.XDSConfigRequest + 5, // 12: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:output_type -> kuma.mesh.v1alpha1.StatsRequest + 7, // 13: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:output_type -> kuma.mesh.v1alpha1.ClustersRequest + 2, // 14: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:output_type -> kuma.mesh.v1alpha1.ZoneHealthCheckResponse + 14, // 15: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse + 13, // 16: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest + 10, // [10:17] is the sub-list for method output_type + 3, // [3:10] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_api_mesh_v1alpha1_kds_proto_init() } diff --git a/api/mesh/v1alpha1/kds.proto b/api/mesh/v1alpha1/kds.proto index afa93ade3f60..0aceeacbe167 100644 --- a/api/mesh/v1alpha1/kds.proto +++ b/api/mesh/v1alpha1/kds.proto @@ -6,6 +6,7 @@ option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1"; import "envoy/service/discovery/v3/discovery.proto"; import "google/protobuf/any.proto"; +import "google/protobuf/duration.proto"; service KumaDiscoveryService { rpc StreamKumaResources(stream envoy.service.discovery.v3.DiscoveryRequest) @@ -22,10 +23,12 @@ message KumaResource { google.protobuf.Any spec = 2; } -message ZoneHealthCheckRequest { -} +message ZoneHealthCheckRequest {} message ZoneHealthCheckResponse { + // The the interval that the global control plane + // expects between health check pings + google.protobuf.Duration interval = 1; } service GlobalKDSService { diff --git a/docs/generated/raw/protos/ZoneHealthCheckResponse.json b/docs/generated/raw/protos/ZoneHealthCheckResponse.json index 20ba797786ab..d523b0a8c0fc 100644 --- a/docs/generated/raw/protos/ZoneHealthCheckResponse.json +++ b/docs/generated/raw/protos/ZoneHealthCheckResponse.json @@ -3,6 +3,14 @@ "$ref": "#/definitions/ZoneHealthCheckResponse", "definitions": { "ZoneHealthCheckResponse": { + "properties": { + "interval": { + "pattern": "^([0-9]+\\.?[0-9]*|\\.[0-9]+)s$", + "type": "string", + "description": "The the interval that the global control plane expects between health check pings", + "format": "regex" + } + }, "additionalProperties": true, "type": "object", "title": "Zone Health Check Response" diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 65d1d7fb358e..54a35b7a8dc7 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -253,6 +253,8 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second)) Expect(cfg.Multizone.Global.KDS.DisableSOTW).To(BeTrue()) Expect(cfg.Multizone.Global.KDS.ResponseBackoff.Duration).To(Equal(time.Second)) + Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration).To(Equal(11 * time.Second)) + Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration).To(Equal(110 * time.Second)) Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685")) Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1")) Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa")) @@ -568,6 +570,9 @@ multizone: nackBackoff: 11s responseBackoff: 1s disableSOTW: true + zoneHealthCheck: + pollInterval: 11s + timeout: 110s zone: globalAddress: "grpc://1.1.1.1:5685" name: "zone-1" @@ -867,6 +872,8 @@ tracing: "KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s", "KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF": "1s", "KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW": "true", + "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_POLL_INTERVAL": "11s", + "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_TIMEOUT": "110s", "KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685", "KUMA_MULTIZONE_ZONE_NAME": "zone-1", "KUMA_MULTIZONE_ZONE_KDS_ROOT_CA_FILE": "/rootCa", diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index ec39e823bb2b..baa3e1fa6c3f 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -40,6 +40,8 @@ type KdsServerConfig struct { // ResponseBackoff is a time Global CP waits before sending ACK/NACK. // This is a way to slow down Zone CP from sending resources too often. ResponseBackoff config_types.Duration `json:"responseBackoff" envconfig:"kuma_multizone_global_kds_response_backoff"` + // ZoneHealthCheck holds config for ensuring zones are online + ZoneHealthCheck ZoneHealthCheckConfig `json:"zoneHealthCheck"` } var _ config.Config = &KdsServerConfig{} @@ -73,6 +75,9 @@ func (c *KdsServerConfig) Validate() error { if _, err := config_types.TLSCiphers(c.TlsCipherSuites); err != nil { errs = multierr.Append(errs, errors.New(".TlsCipherSuites"+err.Error())) } + if err := c.ZoneHealthCheck.Validate(); err != nil { + errs = multierr.Append(errs, errors.Wrap(err, "invalid zoneHealthCheck config")) + } return errs } @@ -104,3 +109,22 @@ func (k KdsClientConfig) Sanitize() { func (k KdsClientConfig) Validate() error { return nil } + +type ZoneHealthCheckConfig struct { + // PollInterval is the interval between the global CP checking ZoneInsight for + // health check pings and interval between zone CP sending health check pings + PollInterval config_types.Duration `json:"pollInterval" envconfig:"kuma_multizone_global_kds_zone_health_check_poll_interval"` + // Timeout is the time after the last health check that a zone counts as + // no longer online + Timeout config_types.Duration `json:"timeout" envconfig:"kuma_multizone_global_kds_zone_health_check_timeout"` +} + +func (c ZoneHealthCheckConfig) Sanitize() { +} + +func (c ZoneHealthCheckConfig) Validate() error { + if (c.Timeout.Duration > 0) != (c.PollInterval.Duration > 0) { + return errors.New("timeout and pollInterval must both be either set or unset") + } + return nil +} diff --git a/pkg/events/interfaces.go b/pkg/events/interfaces.go index fd21603c8056..d4666e91b913 100644 --- a/pkg/events/interfaces.go +++ b/pkg/events/interfaces.go @@ -34,6 +34,19 @@ type Listener interface { Close() } +func NewNeverListener() Listener { + return &neverRecvListener{} +} + +type neverRecvListener struct{} + +func (*neverRecvListener) Recv() <-chan Event { + return nil +} + +func (*neverRecvListener) Close() { +} + type Predicate = func(event Event) bool type Emitter interface { diff --git a/pkg/kds/features.go b/pkg/kds/features.go index 0a7b0e93fd77..4a5d3b53c890 100644 --- a/pkg/kds/features.go +++ b/pkg/kds/features.go @@ -9,5 +9,11 @@ func (f Features) HasFeature(feature string) bool { return f[feature] } +const FeaturesMetadataKey string = "features" + // FeatureZoneToken means that the zone control plane can handle incoming Zone Token from global control plane. const FeatureZoneToken string = "zone-token" + +// FeatureZonePingHealth means that the zone control plane sends pings to the +// global control plane to indicate it's still running. +const FeatureZonePingHealth string = "zone-ping-health" diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index a659163f15fa..4dacdbcba211 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "strings" + "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -159,6 +160,24 @@ func Setup(rt runtime.Runtime) error { for _, filter := range rt.KDSContext().GlobalServerFiltersV2 { streamInterceptors = append(streamInterceptors, filter) } + + if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) { + zwLog := kdsGlobalLog.WithName("zone-watch") + zw, err := mux.NewZoneWatch( + zwLog, + rt.Config().Multizone.Global.KDS.ZoneHealthCheck, + rt.Metrics(), + rt.EventBus(), + rt.ReadOnlyResourceManager(), + rt.Extensions(), + ) + if err != nil { + return errors.Wrap(err, "couldn't create ZoneWatch") + } + if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil { + return err + } + } return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer( onSessionStarted, rt.KDSContext().GlobalServerFilters, @@ -172,12 +191,15 @@ func Setup(rt runtime.Runtime) error { streamInterceptors, rt.Extensions(), rt.Config().Store.Upsert, + rt.EventBus(), + rt.Config().Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration, ), mux.NewKDSSyncServiceServer( onGlobalToZoneSyncConnect, onZoneToGlobalSyncConnect, rt.KDSContext().GlobalServerFiltersV2, rt.Extensions(), + rt.EventBus(), ), ))) } diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index 231cc0e52461..f9a86b0a3fb9 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -6,6 +6,7 @@ import ( "crypto/x509" "net/url" "os" + "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -23,6 +24,7 @@ import ( "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/registry" "github.com/kumahq/kuma/pkg/core/runtime/component" + "github.com/kumahq/kuma/pkg/kds" "github.com/kumahq/kuma/pkg/kds/service" "github.com/kumahq/kuma/pkg/metrics" "github.com/kumahq/kuma/pkg/version" @@ -98,12 +100,15 @@ func (c *client) Start(stop <-chan struct{}) (errs error) { withKDSCtx, cancel := context.WithCancel(metadata.AppendToOutgoingContext(c.ctx, "client-id", c.clientID, KDSVersionHeaderKey, KDSVersionV3, + kds.FeaturesMetadataKey, kds.FeatureZonePingHealth, )) defer cancel() log := muxClientLog.WithValues("client-id", c.clientID) errorCh := make(chan error) + c.startHealthCheck(withKDSCtx, log, conn, stop, errorCh) + go c.startXDSConfigs(withKDSCtx, log, conn, stop, errorCh) go c.startStats(withKDSCtx, log, conn, stop, errorCh) go c.startClusters(withKDSCtx, log, conn, stop, errorCh) @@ -282,6 +287,46 @@ func (c *client) startClusters( c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh) } +func (c *client) startHealthCheck( + ctx context.Context, + log logr.Logger, + conn *grpc.ClientConn, + stop <-chan struct{}, + errorCh chan error, +) { + client := mesh_proto.NewGlobalKDSServiceClient(conn) + log = log.WithValues("rpc", "healthcheck") + log.Info("starting") + + go func() { + prevInterval := 5 * time.Minute + ticker := time.NewTicker(prevInterval) + defer ticker.Stop() + for { + log.Info("sending health check") + resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) + if err != nil && !errors.Is(err, context.Canceled) { + log.Error(err, "health check failed") + errorCh <- errors.Wrap(err, "zone health check request failed") + } else if interval := resp.Interval.AsDuration(); interval > 0 { + if prevInterval != interval { + prevInterval = interval + log.Info("Global CP requested new healthcheck interval", "interval", interval) + } + ticker.Reset(interval) + } + + select { + case <-ticker.C: + continue + case <-stop: + log.Info("stopping") + return + } + } + }() +} + func (c *client) handleProcessingErrors( stream grpc.ClientStream, log logr.Logger, diff --git a/pkg/kds/mux/zone_sync.go b/pkg/kds/mux/zone_sync.go index 1fb8cf1f4e9e..fc25b76f121f 100644 --- a/pkg/kds/mux/zone_sync.go +++ b/pkg/kds/mux/zone_sync.go @@ -2,16 +2,22 @@ package mux import ( "context" + "slices" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds" + "github.com/kumahq/kuma/pkg/kds/service" "github.com/kumahq/kuma/pkg/kds/util" "github.com/kumahq/kuma/pkg/log" + "github.com/kumahq/kuma/pkg/multitenant" ) type FilterV2 interface { @@ -38,6 +44,7 @@ type KDSSyncServiceServer struct { zoneToGlobalCb OnZoneToGlobalSyncConnectFunc filters []FilterV2 extensions context.Context + eventBus events.EventBus mesh_proto.UnimplementedKDSSyncServiceServer } @@ -46,12 +53,14 @@ func NewKDSSyncServiceServer( zoneToGlobalCb OnZoneToGlobalSyncConnectFunc, filters []FilterV2, extensions context.Context, + eventBus events.EventBus, ) *KDSSyncServiceServer { return &KDSSyncServiceServer{ globalToZoneCb: globalToZoneCb, zoneToGlobalCb: zoneToGlobalCb, filters: filters, extensions: extensions, + eventBus: eventBus, } } @@ -59,19 +68,26 @@ var _ mesh_proto.KDSSyncServiceServer = &KDSSyncServiceServer{} func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error { logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions) - clientID, err := util.ClientIDFromIncomingCtx(stream.Context()) + zone, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { return err } - logger = logger.WithValues("clientID", clientID) + logger = logger.WithValues("clientID", zone) for _, filter := range g.filters { if err := filter.InterceptServerStream(stream); err != nil { return errors.Wrap(err, "closing KDS stream following a callback error") } } + + shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone) + defer shouldDisconnectStream.Close() + processingErrorsCh := make(chan error) go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh) select { + case <-shouldDisconnectStream.Recv(): + logger.Info("ending stream, zone health check failed") + return nil case <-stream.Context().Done(): logger.Info("GlobalToZoneSync rpc stream stopped") return nil @@ -86,19 +102,26 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error { logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions) - clientID, err := util.ClientIDFromIncomingCtx(stream.Context()) + zone, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { return err } - logger = logger.WithValues("clientID", clientID) + logger = logger.WithValues("clientID", zone) for _, filter := range g.filters { if err := filter.InterceptServerStream(stream); err != nil { return errors.Wrap(err, "closing KDS stream following a callback error") } } + + shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone) + defer shouldDisconnectStream.Close() + processingErrorsCh := make(chan error) go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh) select { + case <-shouldDisconnectStream.Recv(): + logger.Info("ending stream, zone health check failed") + return nil case <-stream.Context().Done(): logger.Info("ZoneToGlobalSync rpc stream stopped") return nil @@ -110,3 +133,21 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService return status.Error(codes.Internal, "stream failed") } } + +func (g *KDSSyncServiceServer) watchZoneHealthCheck(streamContext context.Context, zone string) events.Listener { + tenantID, _ := multitenant.TenantFromCtx(streamContext) + md, _ := metadata.FromIncomingContext(streamContext) + + shouldDisconnectStream := events.NewNeverListener() + + features := md.Get(kds.FeaturesMetadataKey) + if slices.Contains(features, kds.FeatureZonePingHealth) { + shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool { + disconnectEvent, ok := e.(service.ZoneWentOffline) + return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone + }) + g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID}) + } + + return shouldDisconnectStream +} diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go new file mode 100644 index 000000000000..43ede2a9ce49 --- /dev/null +++ b/pkg/kds/mux/zone_watch.go @@ -0,0 +1,143 @@ +package mux + +import ( + "context" + "time" + + "github.com/go-logr/logr" + "github.com/prometheus/client_golang/prometheus" + + "github.com/kumahq/kuma/pkg/config/multizone" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/manager" + "github.com/kumahq/kuma/pkg/core/resources/model" + "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds/service" + kuma_log "github.com/kumahq/kuma/pkg/log" + core_metrics "github.com/kumahq/kuma/pkg/metrics" + "github.com/kumahq/kuma/pkg/multitenant" +) + +type zoneTenant struct { + zone string + tenantID string +} + +type ZoneWatch struct { + log logr.Logger + poll time.Duration + timeout time.Duration + bus events.EventBus + extensions context.Context + rm manager.ReadOnlyResourceManager + summary prometheus.Summary + zones map[zoneTenant]time.Time +} + +func NewZoneWatch( + log logr.Logger, + cfg multizone.ZoneHealthCheckConfig, + metrics prometheus.Registerer, + bus events.EventBus, + rm manager.ReadOnlyResourceManager, + extensions context.Context, +) (*ZoneWatch, error) { + summary := prometheus.NewSummary(prometheus.SummaryOpts{ + Name: "component_zone_watch", + Help: "Summary of ZoneWatch component interval", + Objectives: core_metrics.DefaultObjectives, + }) + if err := metrics.Register(summary); err != nil { + return nil, err + } + + return &ZoneWatch{ + log: log, + poll: cfg.PollInterval.Duration, + timeout: cfg.Timeout.Duration, + bus: bus, + extensions: extensions, + rm: rm, + summary: summary, + zones: map[zoneTenant]time.Time{}, + }, nil +} + +func (zw *ZoneWatch) Start(stop <-chan struct{}) error { + timer := time.NewTicker(zw.poll) + defer timer.Stop() + + connectionWatch := zw.bus.Subscribe(func(e events.Event) bool { + _, ok := e.(service.ZoneOpenedStream) + return ok + }) + defer connectionWatch.Close() + + for { + select { + case <-timer.C: + start := core.Now() + for zone, firstSeen := range zw.zones { + ctx := multitenant.WithTenant(context.TODO(), zone.tenantID) + zoneInsight := system.NewZoneInsightResource() + + log := kuma_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions) + if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(zone.zone, model.NoMesh)); err != nil { + log.Info("error getting ZoneInsight", "zone", zone.zone, "error", err) + continue + } + + // It may be that we don't have a health check yet so we use the + // lastSeen time because we know the zone was connected at that + // point at least + lastHealthCheck := zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() + if firstSeen.After(lastHealthCheck) { + lastHealthCheck = firstSeen + } + if time.Since(lastHealthCheck) > zw.timeout { + zw.bus.Send(service.ZoneWentOffline{ + Zone: zone.zone, + TenantID: zone.tenantID, + }) + delete(zw.zones, zone) + } + } + zw.summary.Observe(float64(core.Now().Sub(start).Milliseconds())) + case e := <-connectionWatch.Recv(): + newStream := e.(service.ZoneOpenedStream) + + ctx := multitenant.WithTenant(context.TODO(), newStream.TenantID) + zoneInsight := system.NewZoneInsightResource() + + log := kuma_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions) + if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(newStream.Zone, model.NoMesh)); err != nil { + log.Info("error getting ZoneInsight", "zone", newStream.Zone) + continue + } + + // We keep a record of the time we see in the ZoneInsight when the + // stream is opened. + // This is to prevent the zone from timing out on a poll + // where the last health check is still from a previous connect, so: + // a long time ago: zone CP disconnects, no more health checks are sent + // now: + // zone CP opens streams + // global CP gets ZoneOpenedStream + // global CP runs poll and see the last health check from "a long time ago" + // BAD: global CP kills streams + // zone CP health check arrives + zw.zones[zoneTenant{ + tenantID: newStream.TenantID, + zone: newStream.Zone, + }] = zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() + case <-stop: + return nil + } + } +} + +func (zw *ZoneWatch) NeedLeaderElection() bool { + return false +} diff --git a/pkg/kds/mux/zone_watch_test.go b/pkg/kds/mux/zone_watch_test.go new file mode 100644 index 000000000000..56afe64c4f8c --- /dev/null +++ b/pkg/kds/mux/zone_watch_test.go @@ -0,0 +1,167 @@ +package mux_test + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "google.golang.org/protobuf/types/known/timestamppb" + + system_proto "github.com/kumahq/kuma/api/system/v1alpha1" + "github.com/kumahq/kuma/pkg/config/multizone" + "github.com/kumahq/kuma/pkg/config/types" + "github.com/kumahq/kuma/pkg/core" + "github.com/kumahq/kuma/pkg/core/resources/apis/system" + "github.com/kumahq/kuma/pkg/core/resources/manager" + core_model "github.com/kumahq/kuma/pkg/core/resources/model" + "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds/mux" + "github.com/kumahq/kuma/pkg/kds/service" + core_metrics "github.com/kumahq/kuma/pkg/metrics" + "github.com/kumahq/kuma/pkg/plugins/resources/memory" +) + +func sendHealthCheckPing(rm manager.ResourceManager, name string) { + zoneInsight := system.NewZoneInsightResource() + Expect(rm.Get( + context.Background(), + zoneInsight, + store.GetByKey(name, core_model.NoMesh), + )).To(Succeed()) + + zoneInsight.Spec.HealthCheck = &system_proto.HealthCheck{ + Time: timestamppb.New(time.Now()), + } + Expect(rm.Update( + context.Background(), + zoneInsight, + )).To(Succeed()) +} + +const zone = "zone-1" + +var _ = Describe("ZoneWatch", func() { + var errCh chan error + + var rm manager.ResourceManager + var eventBus events.EventBus + var stop chan struct{} + var zoneWatch *mux.ZoneWatch + var timeouts events.Listener + + pollInterval := 100 * time.Millisecond + timeout := 5 * pollInterval + + BeforeEach(func() { + metrics, err := core_metrics.NewMetrics("") + Expect(err).ToNot(HaveOccurred()) + eventBus, err = events.NewEventBus(10, metrics) + Expect(err).NotTo(HaveOccurred()) + + cfg := multizone.ZoneHealthCheckConfig{ + PollInterval: types.Duration{Duration: pollInterval}, + Timeout: types.Duration{Duration: timeout}, + } + + zoneInsight := system.NewZoneInsightResource() + zoneInsight.Spec.HealthCheck = &system_proto.HealthCheck{ + Time: timestamppb.New(time.Now()), + } + rm = manager.NewResourceManager(memory.NewStore()) + Expect(rm.Create( + context.Background(), + zoneInsight, + store.CreateByKey(zone, core_model.NoMesh), + )).To(Succeed()) + + log := core.Log.WithName("test") + zoneWatch, err = mux.NewZoneWatch( + log, + cfg, + metrics, + eventBus, + rm, + context.Background(), + ) + Expect(err).NotTo(HaveOccurred()) + + stop = make(chan struct{}) + + timeouts = eventBus.Subscribe(func(event events.Event) bool { + _, ok := event.(service.ZoneWentOffline) + return ok + }) + + errCh = make(chan error, 1) + + go func() { + errCh <- zoneWatch.Start(stop) + }() + + // wait for ZoneWatch to have subscribed to new zone events + time.Sleep(pollInterval) + }) + + AfterEach(func() { + select { + case <-errCh: + Fail("zone watch should not have stopped") + default: + } + close(stop) + timeouts.Close() + Eventually(errCh).Should(Receive(Succeed())) + }) + + // We know _best case_ the zone will register offline + // in timeout + pollInterval + zoneWentOfflineCheckTimeout := timeout + 2*pollInterval + + It("should timeout zones that stop sending a health check", func() { + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, + }) + + // wait for opened stream to be registered + // in real conditions the interval will be large enough + // that these events will almost certainly be handled + // by the ZoneWatch loop between polls and before the timeout + time.Sleep(pollInterval) + + Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.ZoneWentOffline{ + TenantID: "", + Zone: zone, + }))) + + Consistently(timeouts.Recv(), zoneWentOfflineCheckTimeout).ShouldNot(Receive()) + }) + It("shouldn't timeout as long as ZoneInsight is updated", func() { + eventBus.Send(service.ZoneOpenedStream{ + TenantID: "", + Zone: zone, + }) + + // wait for opened stream to be registered + // in real conditions the interval will be large enough + // that these events will almost certainly be handled + // by the ZoneWatch loop between polls and before the timeout + time.Sleep(pollInterval) + + // Send a health check and block for a poll interval and make sure + // nothing has been received + // do this until we know the timeout would have come if weren't sending + // health checks + Consistently(func(g Gomega) { + sendHealthCheckPing(rm, zone) + g.Consistently(timeouts.Recv(), pollInterval).ShouldNot(Receive()) + }, 3*timeout).Should(Succeed()) + + Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.ZoneWentOffline{ + TenantID: "", + Zone: zone, + }))) + }) +}) diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index 58aa15e3fa0d..58270d98bd0e 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -5,12 +5,15 @@ import ( "fmt" "io" "math/rand" + "slices" "time" "github.com/sethvargo/go-retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -22,6 +25,8 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/model" core_store "github.com/kumahq/kuma/pkg/core/resources/store" + "github.com/kumahq/kuma/pkg/events" + "github.com/kumahq/kuma/pkg/kds" "github.com/kumahq/kuma/pkg/kds/util" kuma_log "github.com/kumahq/kuma/pkg/log" "github.com/kumahq/kuma/pkg/multitenant" @@ -35,12 +40,14 @@ type StreamInterceptor interface { } type GlobalKDSServiceServer struct { - envoyAdminRPCs EnvoyAdminRPCs - resManager manager.ResourceManager - instanceID string - filters []StreamInterceptor - extensions context.Context - upsertCfg config_store.UpsertConfig + envoyAdminRPCs EnvoyAdminRPCs + resManager manager.ResourceManager + instanceID string + filters []StreamInterceptor + extensions context.Context + upsertCfg config_store.UpsertConfig + eventBus events.EventBus + zoneHealthCheckInterval time.Duration mesh_proto.UnimplementedGlobalKDSServiceServer } @@ -51,14 +58,18 @@ func NewGlobalKDSServiceServer( filters []StreamInterceptor, extensions context.Context, upsertCfg config_store.UpsertConfig, + eventBus events.EventBus, + zoneHealthCheckInterval time.Duration, ) *GlobalKDSServiceServer { return &GlobalKDSServiceServer{ - envoyAdminRPCs: envoyAdminRPCs, - resManager: resManager, - instanceID: instanceID, - filters: filters, - extensions: extensions, - upsertCfg: upsertCfg, + envoyAdminRPCs: envoyAdminRPCs, + resManager: resManager, + instanceID: instanceID, + filters: filters, + extensions: extensions, + upsertCfg: upsertCfg, + eventBus: eventBus, + zoneHealthCheckInterval: zoneHealthCheckInterval, } } @@ -103,7 +114,18 @@ func (g *GlobalKDSServiceServer) HealthCheck(ctx context.Context, _ *mesh_proto. log.Error(err, "couldn't update zone insight", "zone", zone) } - return &mesh_proto.ZoneHealthCheckResponse{}, nil + return &mesh_proto.ZoneHealthCheckResponse{ + Interval: durationpb.New(g.zoneHealthCheckInterval), + }, nil +} + +type ZoneWentOffline struct { + TenantID string + Zone string +} +type ZoneOpenedStream struct { + TenantID string + Zone string } func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( @@ -117,6 +139,23 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( return status.Error(codes.InvalidArgument, err.Error()) } clientID := ClientID(stream.Context(), zone) + tenantID, _ := multitenant.TenantFromCtx(stream.Context()) + + shouldDisconnectStream := events.NewNeverListener() + + md, _ := metadata.FromIncomingContext(stream.Context()) + features := md.Get(kds.FeaturesMetadataKey) + + if slices.Contains(features, kds.FeatureZonePingHealth) { + shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool { + disconnectEvent, ok := e.(ZoneWentOffline) + return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone + }) + g.eventBus.Send(ZoneOpenedStream{Zone: zone, TenantID: tenantID}) + } + + defer shouldDisconnectStream.Close() + logger := log.WithValues("rpc", rpcName, "clientID", clientID) logger = kuma_log.AddFieldsFromCtx(logger, stream.Context(), g.extensions) for _, filter := range g.filters { @@ -136,25 +175,39 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( return status.Error(codes.Internal, "could not store stream connection") } logger.Info("stored stream connection") - for { - resp, err := recv() - if err == io.EOF { - logger.Info("stream stopped") - return nil - } - if status.Code(err) == codes.Canceled { - logger.Info("stream cancelled") - return nil - } - if err != nil { - logger.Error(err, "could not receive a message") - return status.Error(codes.Internal, "could not receive a message") - } - logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId()) - if err := rpc.ResponseReceived(clientID, resp); err != nil { - logger.Error(err, "could not mark the response as received") - return status.Error(codes.InvalidArgument, "could not mark the response as received") + streamResult := make(chan error, 1) + go func() { + for { + resp, err := recv() + if err == io.EOF { + logger.Info("stream stopped") + streamResult <- nil + return + } + if status.Code(err) == codes.Canceled { + logger.Info("stream cancelled") + streamResult <- nil + return + } + if err != nil { + logger.Error(err, "could not receive a message") + streamResult <- status.Error(codes.Internal, "could not receive a message") + return + } + logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId()) + if err := rpc.ResponseReceived(clientID, resp); err != nil { + logger.Error(err, "could not mark the response as received") + streamResult <- status.Error(codes.InvalidArgument, "could not mark the response as received") + return + } } + }() + select { + case <-shouldDisconnectStream.Recv(): + logger.Info("ending stream, zone health check failed") + return nil + case res := <-streamResult: + return res } }