From 386ab531cbfcdf2793b40dc174843207fffa469b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Krzysztof=20S=C5=82onka?= Date: Wed, 11 Oct 2023 15:15:17 +0200 Subject: [PATCH] chore(revert): revert "feat(kuma-cp): introduce zone health checks" (#8017) Revert "feat(kuma-cp): introduce zone health checks (#7821)" This reverts commit baa72b6aa477507388ad7122c19ff0e03ebc77fe. --- Signed-off-by: slonka --- api/mesh/v1alpha1/kds.pb.go | 282 ++++++++---------- api/mesh/v1alpha1/kds.proto | 7 +- .../raw/protos/ZoneHealthCheckResponse.json | 8 - pkg/config/loader_test.go | 7 - pkg/config/multizone/kds.go | 24 -- pkg/events/interfaces.go | 13 - pkg/kds/features.go | 6 - pkg/kds/global/components.go | 22 -- pkg/kds/mux/client.go | 45 --- pkg/kds/mux/zone_sync.go | 49 +-- pkg/kds/mux/zone_watch.go | 143 --------- pkg/kds/mux/zone_watch_test.go | 167 ----------- pkg/kds/service/server.go | 115 ++----- 13 files changed, 168 insertions(+), 720 deletions(-) delete mode 100644 pkg/kds/mux/zone_watch.go delete mode 100644 pkg/kds/mux/zone_watch_test.go diff --git a/api/mesh/v1alpha1/kds.pb.go b/api/mesh/v1alpha1/kds.pb.go index 2f9842b24340..3d6ec34f29d5 100644 --- a/api/mesh/v1alpha1/kds.pb.go +++ b/api/mesh/v1alpha1/kds.pb.go @@ -11,7 +11,6 @@ import ( protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" anypb "google.golang.org/protobuf/types/known/anypb" - durationpb "google.golang.org/protobuf/types/known/durationpb" reflect "reflect" sync "sync" ) @@ -120,10 +119,6 @@ type ZoneHealthCheckResponse struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - - // The the interval that the global control plane - // expects between health check pings - Interval *durationpb.Duration `protobuf:"bytes,1,opt,name=interval,proto3" json:"interval,omitempty"` } func (x *ZoneHealthCheckResponse) Reset() { @@ -158,13 +153,6 @@ func (*ZoneHealthCheckResponse) Descriptor() ([]byte, []int) { return file_api_mesh_v1alpha1_kds_proto_rawDescGZIP(), []int{2} } -func (x *ZoneHealthCheckResponse) GetInterval() *durationpb.Duration { - if x != nil { - return x.Interval - } - return nil -} - // XDSConfigRequest is a request for XDS Config Dump that is executed on Zone // CP. type XDSConfigRequest struct { @@ -759,9 +747,7 @@ var file_api_mesh_v1alpha1_kds_proto_rawDesc = []byte{ 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2f, 0x76, 0x33, 0x2f, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, - 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, - 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x75, 0x72, 0x61, 0x74, 0x69, - 0x6f, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xbd, 0x01, 0x0a, 0x0c, 0x4b, 0x75, 0x6d, + 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xbd, 0x01, 0x0a, 0x0c, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x39, 0x0a, 0x04, 0x6d, 0x65, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x4b, 0x75, 0x6d, @@ -775,118 +761,114 @@ var file_api_mesh_v1alpha1_kds_proto_rawDesc = []byte{ 0x0a, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x22, 0x18, 0x0a, 0x16, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x22, 0x50, 0x0a, 0x17, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, - 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x35, 0x0a, - 0x08, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x19, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, - 0x66, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x08, 0x69, 0x6e, 0x74, 0x65, - 0x72, 0x76, 0x61, 0x6c, 0x22, 0xa0, 0x01, 0x0a, 0x10, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, - 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, - 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, - 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, - 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, - 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x6e, 0x0a, 0x11, 0x58, 0x44, 0x53, 0x43, 0x6f, - 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, - 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, - 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, - 0x72, 0x6f, 0x72, 0x12, 0x18, 0x0a, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, - 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x9c, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, - 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, - 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, - 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, - 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, - 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, - 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x68, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, - 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x16, - 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, - 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, - 0x22, 0x9f, 0x01, 0x0a, 0x0f, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, - 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, - 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, - 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, - 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, - 0x73, 0x68, 0x22, 0x71, 0x0a, 0x10, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, - 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1c, 0x0a, - 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, - 0x00, 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, - 0x65, 0x73, 0x75, 0x6c, 0x74, 0x32, 0x8e, 0x01, 0x0a, 0x14, 0x4b, 0x75, 0x6d, 0x61, 0x44, 0x69, - 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x76, - 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, - 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x2c, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, - 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, - 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, - 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, - 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, 0x98, 0x03, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, - 0x6c, 0x4b, 0x44, 0x53, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x63, 0x0a, 0x10, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, - 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, + 0x73, 0x74, 0x22, 0x19, 0x0a, 0x17, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, + 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0xa0, 0x01, + 0x0a, 0x10, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, + 0x64, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, + 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, + 0x22, 0x6e, 0x0a, 0x11, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x18, 0x0a, 0x06, + 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x06, + 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, + 0x22, 0x9c, 0x01, 0x0a, 0x0c, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, + 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, + 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, + 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x54, 0x79, 0x70, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, + 0x68, 0x0a, 0x0d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, + 0x16, 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, + 0x52, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x16, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, + 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x42, + 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x22, 0x9f, 0x01, 0x0a, 0x0f, 0x43, 0x6c, + 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x1d, 0x0a, + 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x23, 0x0a, 0x0d, + 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x54, 0x79, 0x70, + 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x6e, 0x61, + 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, + 0x63, 0x65, 0x5f, 0x6d, 0x65, 0x73, 0x68, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x72, + 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x4d, 0x65, 0x73, 0x68, 0x22, 0x71, 0x0a, 0x10, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x1d, 0x0a, 0x0a, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x09, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x12, 0x16, + 0x0a, 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, + 0x05, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x12, 0x1c, 0x0a, 0x08, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x73, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x08, 0x63, 0x6c, 0x75, 0x73, + 0x74, 0x65, 0x72, 0x73, 0x42, 0x08, 0x0a, 0x06, 0x72, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x32, 0x8e, + 0x01, 0x0a, 0x14, 0x4b, 0x75, 0x6d, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, + 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x76, 0x0a, 0x13, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x4b, 0x75, 0x6d, 0x61, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x73, 0x12, 0x2c, + 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, + 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, + 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x65, + 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, + 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, + 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x32, + 0x98, 0x03, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x4b, 0x44, 0x53, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x12, 0x63, 0x0a, 0x10, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x58, 0x44, + 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x73, 0x12, 0x25, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, + 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, + 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, + 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, - 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x58, 0x44, 0x53, 0x43, - 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, - 0x12, 0x56, 0x0a, 0x0b, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, - 0x21, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, - 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x1a, 0x20, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, - 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x5f, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x6b, 0x75, 0x6d, - 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, - 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x1a, 0x23, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, - 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x66, 0x0a, 0x0b, 0x48, 0x65, 0x61, - 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x12, 0x2a, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, - 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, - 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, - 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, - 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x4b, 0x44, 0x53, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x54, 0x6f, - 0x5a, 0x6f, 0x6e, 0x65, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, - 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, - 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, - 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x32, 0x2e, 0x65, 0x6e, - 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, - 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, - 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, - 0x01, 0x30, 0x01, 0x12, 0x7d, 0x0a, 0x10, 0x5a, 0x6f, 0x6e, 0x65, 0x54, 0x6f, 0x47, 0x6c, 0x6f, - 0x62, 0x61, 0x6c, 0x53, 0x79, 0x6e, 0x63, 0x12, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, - 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, - 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, - 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x31, 0x2e, 0x65, 0x6e, - 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, - 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, - 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, - 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, - 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x68, 0x71, 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x2f, 0x61, 0x70, 0x69, - 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x12, 0x56, 0x0a, 0x0b, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, 0x21, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, + 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x53, 0x74, + 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x20, 0x2e, 0x6b, 0x75, + 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, + 0x2e, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, + 0x01, 0x12, 0x5f, 0x0a, 0x0e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x43, 0x6c, 0x75, 0x73, 0x74, + 0x65, 0x72, 0x73, 0x12, 0x24, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, + 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, + 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x23, 0x2e, 0x6b, 0x75, 0x6d, 0x61, + 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x43, + 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, + 0x30, 0x01, 0x12, 0x66, 0x0a, 0x0b, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, 0x63, + 0x6b, 0x12, 0x2a, 0x2e, 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, + 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, + 0x68, 0x43, 0x68, 0x65, 0x63, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2b, 0x2e, + 0x6b, 0x75, 0x6d, 0x61, 0x2e, 0x6d, 0x65, 0x73, 0x68, 0x2e, 0x76, 0x31, 0x61, 0x6c, 0x70, 0x68, + 0x61, 0x31, 0x2e, 0x5a, 0x6f, 0x6e, 0x65, 0x48, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x43, 0x68, 0x65, + 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x4b, + 0x44, 0x53, 0x53, 0x79, 0x6e, 0x63, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x7d, 0x0a, + 0x10, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x54, 0x6f, 0x5a, 0x6f, 0x6e, 0x65, 0x53, 0x79, 0x6e, + 0x63, 0x12, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, + 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, + 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x28, 0x01, 0x30, 0x01, 0x12, 0x7d, 0x0a, 0x10, + 0x5a, 0x6f, 0x6e, 0x65, 0x54, 0x6f, 0x47, 0x6c, 0x6f, 0x62, 0x61, 0x6c, 0x53, 0x79, 0x6e, 0x63, + 0x12, 0x32, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, + 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, 0x33, 0x2e, 0x44, 0x65, + 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x1a, 0x31, 0x2e, 0x65, 0x6e, 0x76, 0x6f, 0x79, 0x2e, 0x73, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x2e, 0x64, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x2e, 0x76, + 0x33, 0x2e, 0x44, 0x65, 0x6c, 0x74, 0x61, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x28, 0x01, 0x30, 0x01, 0x42, 0x2a, 0x5a, 0x28, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x68, 0x71, + 0x2f, 0x6b, 0x75, 0x6d, 0x61, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x6d, 0x65, 0x73, 0x68, 0x2f, 0x76, + 0x31, 0x61, 0x6c, 0x70, 0x68, 0x61, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -914,35 +896,33 @@ var file_api_mesh_v1alpha1_kds_proto_goTypes = []interface{}{ (*ClustersResponse)(nil), // 8: kuma.mesh.v1alpha1.ClustersResponse (*KumaResource_Meta)(nil), // 9: kuma.mesh.v1alpha1.KumaResource.Meta (*anypb.Any)(nil), // 10: google.protobuf.Any - (*durationpb.Duration)(nil), // 11: google.protobuf.Duration - (*v3.DiscoveryRequest)(nil), // 12: envoy.service.discovery.v3.DiscoveryRequest - (*v3.DeltaDiscoveryRequest)(nil), // 13: envoy.service.discovery.v3.DeltaDiscoveryRequest - (*v3.DeltaDiscoveryResponse)(nil), // 14: envoy.service.discovery.v3.DeltaDiscoveryResponse - (*v3.DiscoveryResponse)(nil), // 15: envoy.service.discovery.v3.DiscoveryResponse + (*v3.DiscoveryRequest)(nil), // 11: envoy.service.discovery.v3.DiscoveryRequest + (*v3.DeltaDiscoveryRequest)(nil), // 12: envoy.service.discovery.v3.DeltaDiscoveryRequest + (*v3.DeltaDiscoveryResponse)(nil), // 13: envoy.service.discovery.v3.DeltaDiscoveryResponse + (*v3.DiscoveryResponse)(nil), // 14: envoy.service.discovery.v3.DiscoveryResponse } var file_api_mesh_v1alpha1_kds_proto_depIdxs = []int32{ 9, // 0: kuma.mesh.v1alpha1.KumaResource.meta:type_name -> kuma.mesh.v1alpha1.KumaResource.Meta 10, // 1: kuma.mesh.v1alpha1.KumaResource.spec:type_name -> google.protobuf.Any - 11, // 2: kuma.mesh.v1alpha1.ZoneHealthCheckResponse.interval:type_name -> google.protobuf.Duration - 12, // 3: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:input_type -> envoy.service.discovery.v3.DiscoveryRequest - 4, // 4: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:input_type -> kuma.mesh.v1alpha1.XDSConfigResponse - 6, // 5: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:input_type -> kuma.mesh.v1alpha1.StatsResponse - 8, // 6: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:input_type -> kuma.mesh.v1alpha1.ClustersResponse - 1, // 7: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:input_type -> kuma.mesh.v1alpha1.ZoneHealthCheckRequest - 13, // 8: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest - 14, // 9: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse - 15, // 10: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:output_type -> envoy.service.discovery.v3.DiscoveryResponse - 3, // 11: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:output_type -> kuma.mesh.v1alpha1.XDSConfigRequest - 5, // 12: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:output_type -> kuma.mesh.v1alpha1.StatsRequest - 7, // 13: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:output_type -> kuma.mesh.v1alpha1.ClustersRequest - 2, // 14: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:output_type -> kuma.mesh.v1alpha1.ZoneHealthCheckResponse - 14, // 15: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse - 13, // 16: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest - 10, // [10:17] is the sub-list for method output_type - 3, // [3:10] is the sub-list for method input_type - 3, // [3:3] is the sub-list for extension type_name - 3, // [3:3] is the sub-list for extension extendee - 0, // [0:3] is the sub-list for field type_name + 11, // 2: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:input_type -> envoy.service.discovery.v3.DiscoveryRequest + 4, // 3: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:input_type -> kuma.mesh.v1alpha1.XDSConfigResponse + 6, // 4: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:input_type -> kuma.mesh.v1alpha1.StatsResponse + 8, // 5: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:input_type -> kuma.mesh.v1alpha1.ClustersResponse + 1, // 6: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:input_type -> kuma.mesh.v1alpha1.ZoneHealthCheckRequest + 12, // 7: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest + 13, // 8: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:input_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse + 14, // 9: kuma.mesh.v1alpha1.KumaDiscoveryService.StreamKumaResources:output_type -> envoy.service.discovery.v3.DiscoveryResponse + 3, // 10: kuma.mesh.v1alpha1.GlobalKDSService.StreamXDSConfigs:output_type -> kuma.mesh.v1alpha1.XDSConfigRequest + 5, // 11: kuma.mesh.v1alpha1.GlobalKDSService.StreamStats:output_type -> kuma.mesh.v1alpha1.StatsRequest + 7, // 12: kuma.mesh.v1alpha1.GlobalKDSService.StreamClusters:output_type -> kuma.mesh.v1alpha1.ClustersRequest + 2, // 13: kuma.mesh.v1alpha1.GlobalKDSService.HealthCheck:output_type -> kuma.mesh.v1alpha1.ZoneHealthCheckResponse + 13, // 14: kuma.mesh.v1alpha1.KDSSyncService.GlobalToZoneSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryResponse + 12, // 15: kuma.mesh.v1alpha1.KDSSyncService.ZoneToGlobalSync:output_type -> envoy.service.discovery.v3.DeltaDiscoveryRequest + 9, // [9:16] is the sub-list for method output_type + 2, // [2:9] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name } func init() { file_api_mesh_v1alpha1_kds_proto_init() } diff --git a/api/mesh/v1alpha1/kds.proto b/api/mesh/v1alpha1/kds.proto index 0aceeacbe167..afa93ade3f60 100644 --- a/api/mesh/v1alpha1/kds.proto +++ b/api/mesh/v1alpha1/kds.proto @@ -6,7 +6,6 @@ option go_package = "github.com/kumahq/kuma/api/mesh/v1alpha1"; import "envoy/service/discovery/v3/discovery.proto"; import "google/protobuf/any.proto"; -import "google/protobuf/duration.proto"; service KumaDiscoveryService { rpc StreamKumaResources(stream envoy.service.discovery.v3.DiscoveryRequest) @@ -23,12 +22,10 @@ message KumaResource { google.protobuf.Any spec = 2; } -message ZoneHealthCheckRequest {} +message ZoneHealthCheckRequest { +} message ZoneHealthCheckResponse { - // The the interval that the global control plane - // expects between health check pings - google.protobuf.Duration interval = 1; } service GlobalKDSService { diff --git a/docs/generated/raw/protos/ZoneHealthCheckResponse.json b/docs/generated/raw/protos/ZoneHealthCheckResponse.json index d523b0a8c0fc..20ba797786ab 100644 --- a/docs/generated/raw/protos/ZoneHealthCheckResponse.json +++ b/docs/generated/raw/protos/ZoneHealthCheckResponse.json @@ -3,14 +3,6 @@ "$ref": "#/definitions/ZoneHealthCheckResponse", "definitions": { "ZoneHealthCheckResponse": { - "properties": { - "interval": { - "pattern": "^([0-9]+\\.?[0-9]*|\\.[0-9]+)s$", - "type": "string", - "description": "The the interval that the global control plane expects between health check pings", - "format": "regex" - } - }, "additionalProperties": true, "type": "object", "title": "Zone Health Check Response" diff --git a/pkg/config/loader_test.go b/pkg/config/loader_test.go index 54a35b7a8dc7..65d1d7fb358e 100644 --- a/pkg/config/loader_test.go +++ b/pkg/config/loader_test.go @@ -253,8 +253,6 @@ var _ = Describe("Config loader", func() { Expect(cfg.Multizone.Global.KDS.NackBackoff.Duration).To(Equal(11 * time.Second)) Expect(cfg.Multizone.Global.KDS.DisableSOTW).To(BeTrue()) Expect(cfg.Multizone.Global.KDS.ResponseBackoff.Duration).To(Equal(time.Second)) - Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration).To(Equal(11 * time.Second)) - Expect(cfg.Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration).To(Equal(110 * time.Second)) Expect(cfg.Multizone.Zone.GlobalAddress).To(Equal("grpc://1.1.1.1:5685")) Expect(cfg.Multizone.Zone.Name).To(Equal("zone-1")) Expect(cfg.Multizone.Zone.KDS.RootCAFile).To(Equal("/rootCa")) @@ -570,9 +568,6 @@ multizone: nackBackoff: 11s responseBackoff: 1s disableSOTW: true - zoneHealthCheck: - pollInterval: 11s - timeout: 110s zone: globalAddress: "grpc://1.1.1.1:5685" name: "zone-1" @@ -872,8 +867,6 @@ tracing: "KUMA_MULTIZONE_GLOBAL_KDS_NACK_BACKOFF": "11s", "KUMA_MULTIZONE_GLOBAL_KDS_RESPONSE_BACKOFF": "1s", "KUMA_MULTIZONE_GLOBAL_KDS_DISABLE_SOTW": "true", - "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_POLL_INTERVAL": "11s", - "KUMA_MULTIZONE_GLOBAL_KDS_ZONE_HEALTH_CHECK_TIMEOUT": "110s", "KUMA_MULTIZONE_ZONE_GLOBAL_ADDRESS": "grpc://1.1.1.1:5685", "KUMA_MULTIZONE_ZONE_NAME": "zone-1", "KUMA_MULTIZONE_ZONE_KDS_ROOT_CA_FILE": "/rootCa", diff --git a/pkg/config/multizone/kds.go b/pkg/config/multizone/kds.go index baa3e1fa6c3f..ec39e823bb2b 100644 --- a/pkg/config/multizone/kds.go +++ b/pkg/config/multizone/kds.go @@ -40,8 +40,6 @@ type KdsServerConfig struct { // ResponseBackoff is a time Global CP waits before sending ACK/NACK. // This is a way to slow down Zone CP from sending resources too often. ResponseBackoff config_types.Duration `json:"responseBackoff" envconfig:"kuma_multizone_global_kds_response_backoff"` - // ZoneHealthCheck holds config for ensuring zones are online - ZoneHealthCheck ZoneHealthCheckConfig `json:"zoneHealthCheck"` } var _ config.Config = &KdsServerConfig{} @@ -75,9 +73,6 @@ func (c *KdsServerConfig) Validate() error { if _, err := config_types.TLSCiphers(c.TlsCipherSuites); err != nil { errs = multierr.Append(errs, errors.New(".TlsCipherSuites"+err.Error())) } - if err := c.ZoneHealthCheck.Validate(); err != nil { - errs = multierr.Append(errs, errors.Wrap(err, "invalid zoneHealthCheck config")) - } return errs } @@ -109,22 +104,3 @@ func (k KdsClientConfig) Sanitize() { func (k KdsClientConfig) Validate() error { return nil } - -type ZoneHealthCheckConfig struct { - // PollInterval is the interval between the global CP checking ZoneInsight for - // health check pings and interval between zone CP sending health check pings - PollInterval config_types.Duration `json:"pollInterval" envconfig:"kuma_multizone_global_kds_zone_health_check_poll_interval"` - // Timeout is the time after the last health check that a zone counts as - // no longer online - Timeout config_types.Duration `json:"timeout" envconfig:"kuma_multizone_global_kds_zone_health_check_timeout"` -} - -func (c ZoneHealthCheckConfig) Sanitize() { -} - -func (c ZoneHealthCheckConfig) Validate() error { - if (c.Timeout.Duration > 0) != (c.PollInterval.Duration > 0) { - return errors.New("timeout and pollInterval must both be either set or unset") - } - return nil -} diff --git a/pkg/events/interfaces.go b/pkg/events/interfaces.go index d4666e91b913..fd21603c8056 100644 --- a/pkg/events/interfaces.go +++ b/pkg/events/interfaces.go @@ -34,19 +34,6 @@ type Listener interface { Close() } -func NewNeverListener() Listener { - return &neverRecvListener{} -} - -type neverRecvListener struct{} - -func (*neverRecvListener) Recv() <-chan Event { - return nil -} - -func (*neverRecvListener) Close() { -} - type Predicate = func(event Event) bool type Emitter interface { diff --git a/pkg/kds/features.go b/pkg/kds/features.go index 4a5d3b53c890..0a7b0e93fd77 100644 --- a/pkg/kds/features.go +++ b/pkg/kds/features.go @@ -9,11 +9,5 @@ func (f Features) HasFeature(feature string) bool { return f[feature] } -const FeaturesMetadataKey string = "features" - // FeatureZoneToken means that the zone control plane can handle incoming Zone Token from global control plane. const FeatureZoneToken string = "zone-token" - -// FeatureZonePingHealth means that the zone control plane sends pings to the -// global control plane to indicate it's still running. -const FeatureZonePingHealth string = "zone-ping-health" diff --git a/pkg/kds/global/components.go b/pkg/kds/global/components.go index 4dacdbcba211..a659163f15fa 100644 --- a/pkg/kds/global/components.go +++ b/pkg/kds/global/components.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -160,24 +159,6 @@ func Setup(rt runtime.Runtime) error { for _, filter := range rt.KDSContext().GlobalServerFiltersV2 { streamInterceptors = append(streamInterceptors, filter) } - - if rt.Config().Multizone.Global.KDS.ZoneHealthCheck.Timeout.Duration > time.Duration(0) { - zwLog := kdsGlobalLog.WithName("zone-watch") - zw, err := mux.NewZoneWatch( - zwLog, - rt.Config().Multizone.Global.KDS.ZoneHealthCheck, - rt.Metrics(), - rt.EventBus(), - rt.ReadOnlyResourceManager(), - rt.Extensions(), - ) - if err != nil { - return errors.Wrap(err, "couldn't create ZoneWatch") - } - if err := rt.Add(component.NewResilientComponent(zwLog, zw)); err != nil { - return err - } - } return rt.Add(component.NewResilientComponent(kdsGlobalLog.WithName("kds-mux-client"), mux.NewServer( onSessionStarted, rt.KDSContext().GlobalServerFilters, @@ -191,15 +172,12 @@ func Setup(rt runtime.Runtime) error { streamInterceptors, rt.Extensions(), rt.Config().Store.Upsert, - rt.EventBus(), - rt.Config().Multizone.Global.KDS.ZoneHealthCheck.PollInterval.Duration, ), mux.NewKDSSyncServiceServer( onGlobalToZoneSyncConnect, onZoneToGlobalSyncConnect, rt.KDSContext().GlobalServerFiltersV2, rt.Extensions(), - rt.EventBus(), ), ))) } diff --git a/pkg/kds/mux/client.go b/pkg/kds/mux/client.go index f9a86b0a3fb9..231cc0e52461 100644 --- a/pkg/kds/mux/client.go +++ b/pkg/kds/mux/client.go @@ -6,7 +6,6 @@ import ( "crypto/x509" "net/url" "os" - "time" "github.com/go-logr/logr" "github.com/pkg/errors" @@ -24,7 +23,6 @@ import ( "github.com/kumahq/kuma/pkg/core" "github.com/kumahq/kuma/pkg/core/resources/registry" "github.com/kumahq/kuma/pkg/core/runtime/component" - "github.com/kumahq/kuma/pkg/kds" "github.com/kumahq/kuma/pkg/kds/service" "github.com/kumahq/kuma/pkg/metrics" "github.com/kumahq/kuma/pkg/version" @@ -100,15 +98,12 @@ func (c *client) Start(stop <-chan struct{}) (errs error) { withKDSCtx, cancel := context.WithCancel(metadata.AppendToOutgoingContext(c.ctx, "client-id", c.clientID, KDSVersionHeaderKey, KDSVersionV3, - kds.FeaturesMetadataKey, kds.FeatureZonePingHealth, )) defer cancel() log := muxClientLog.WithValues("client-id", c.clientID) errorCh := make(chan error) - c.startHealthCheck(withKDSCtx, log, conn, stop, errorCh) - go c.startXDSConfigs(withKDSCtx, log, conn, stop, errorCh) go c.startStats(withKDSCtx, log, conn, stop, errorCh) go c.startClusters(withKDSCtx, log, conn, stop, errorCh) @@ -287,46 +282,6 @@ func (c *client) startClusters( c.handleProcessingErrors(stream, log, stop, processingErrorsCh, errorCh) } -func (c *client) startHealthCheck( - ctx context.Context, - log logr.Logger, - conn *grpc.ClientConn, - stop <-chan struct{}, - errorCh chan error, -) { - client := mesh_proto.NewGlobalKDSServiceClient(conn) - log = log.WithValues("rpc", "healthcheck") - log.Info("starting") - - go func() { - prevInterval := 5 * time.Minute - ticker := time.NewTicker(prevInterval) - defer ticker.Stop() - for { - log.Info("sending health check") - resp, err := client.HealthCheck(ctx, &mesh_proto.ZoneHealthCheckRequest{}) - if err != nil && !errors.Is(err, context.Canceled) { - log.Error(err, "health check failed") - errorCh <- errors.Wrap(err, "zone health check request failed") - } else if interval := resp.Interval.AsDuration(); interval > 0 { - if prevInterval != interval { - prevInterval = interval - log.Info("Global CP requested new healthcheck interval", "interval", interval) - } - ticker.Reset(interval) - } - - select { - case <-ticker.C: - continue - case <-stop: - log.Info("stopping") - return - } - } - }() -} - func (c *client) handleProcessingErrors( stream grpc.ClientStream, log logr.Logger, diff --git a/pkg/kds/mux/zone_sync.go b/pkg/kds/mux/zone_sync.go index fc25b76f121f..1fb8cf1f4e9e 100644 --- a/pkg/kds/mux/zone_sync.go +++ b/pkg/kds/mux/zone_sync.go @@ -2,22 +2,16 @@ package mux import ( "context" - "slices" "github.com/pkg/errors" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" "github.com/kumahq/kuma/pkg/core" - "github.com/kumahq/kuma/pkg/events" - "github.com/kumahq/kuma/pkg/kds" - "github.com/kumahq/kuma/pkg/kds/service" "github.com/kumahq/kuma/pkg/kds/util" "github.com/kumahq/kuma/pkg/log" - "github.com/kumahq/kuma/pkg/multitenant" ) type FilterV2 interface { @@ -44,7 +38,6 @@ type KDSSyncServiceServer struct { zoneToGlobalCb OnZoneToGlobalSyncConnectFunc filters []FilterV2 extensions context.Context - eventBus events.EventBus mesh_proto.UnimplementedKDSSyncServiceServer } @@ -53,14 +46,12 @@ func NewKDSSyncServiceServer( zoneToGlobalCb OnZoneToGlobalSyncConnectFunc, filters []FilterV2, extensions context.Context, - eventBus events.EventBus, ) *KDSSyncServiceServer { return &KDSSyncServiceServer{ globalToZoneCb: globalToZoneCb, zoneToGlobalCb: zoneToGlobalCb, filters: filters, extensions: extensions, - eventBus: eventBus, } } @@ -68,26 +59,19 @@ var _ mesh_proto.KDSSyncServiceServer = &KDSSyncServiceServer{} func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService_GlobalToZoneSyncServer) error { logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions) - zone, err := util.ClientIDFromIncomingCtx(stream.Context()) + clientID, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { return err } - logger = logger.WithValues("clientID", zone) + logger = logger.WithValues("clientID", clientID) for _, filter := range g.filters { if err := filter.InterceptServerStream(stream); err != nil { return errors.Wrap(err, "closing KDS stream following a callback error") } } - - shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone) - defer shouldDisconnectStream.Close() - processingErrorsCh := make(chan error) go g.globalToZoneCb.OnGlobalToZoneSyncConnect(stream, processingErrorsCh) select { - case <-shouldDisconnectStream.Recv(): - logger.Info("ending stream, zone health check failed") - return nil case <-stream.Context().Done(): logger.Info("GlobalToZoneSync rpc stream stopped") return nil @@ -102,26 +86,19 @@ func (g *KDSSyncServiceServer) GlobalToZoneSync(stream mesh_proto.KDSSyncService func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService_ZoneToGlobalSyncServer) error { logger := log.AddFieldsFromCtx(clientLog, stream.Context(), g.extensions) - zone, err := util.ClientIDFromIncomingCtx(stream.Context()) + clientID, err := util.ClientIDFromIncomingCtx(stream.Context()) if err != nil { return err } - logger = logger.WithValues("clientID", zone) + logger = logger.WithValues("clientID", clientID) for _, filter := range g.filters { if err := filter.InterceptServerStream(stream); err != nil { return errors.Wrap(err, "closing KDS stream following a callback error") } } - - shouldDisconnectStream := g.watchZoneHealthCheck(stream.Context(), zone) - defer shouldDisconnectStream.Close() - processingErrorsCh := make(chan error) go g.zoneToGlobalCb.OnZoneToGlobalSyncConnect(stream, processingErrorsCh) select { - case <-shouldDisconnectStream.Recv(): - logger.Info("ending stream, zone health check failed") - return nil case <-stream.Context().Done(): logger.Info("ZoneToGlobalSync rpc stream stopped") return nil @@ -133,21 +110,3 @@ func (g *KDSSyncServiceServer) ZoneToGlobalSync(stream mesh_proto.KDSSyncService return status.Error(codes.Internal, "stream failed") } } - -func (g *KDSSyncServiceServer) watchZoneHealthCheck(streamContext context.Context, zone string) events.Listener { - tenantID, _ := multitenant.TenantFromCtx(streamContext) - md, _ := metadata.FromIncomingContext(streamContext) - - shouldDisconnectStream := events.NewNeverListener() - - features := md.Get(kds.FeaturesMetadataKey) - if slices.Contains(features, kds.FeatureZonePingHealth) { - shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool { - disconnectEvent, ok := e.(service.ZoneWentOffline) - return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone - }) - g.eventBus.Send(service.ZoneOpenedStream{Zone: zone, TenantID: tenantID}) - } - - return shouldDisconnectStream -} diff --git a/pkg/kds/mux/zone_watch.go b/pkg/kds/mux/zone_watch.go deleted file mode 100644 index 43ede2a9ce49..000000000000 --- a/pkg/kds/mux/zone_watch.go +++ /dev/null @@ -1,143 +0,0 @@ -package mux - -import ( - "context" - "time" - - "github.com/go-logr/logr" - "github.com/prometheus/client_golang/prometheus" - - "github.com/kumahq/kuma/pkg/config/multizone" - "github.com/kumahq/kuma/pkg/core" - "github.com/kumahq/kuma/pkg/core/resources/apis/system" - "github.com/kumahq/kuma/pkg/core/resources/manager" - "github.com/kumahq/kuma/pkg/core/resources/model" - "github.com/kumahq/kuma/pkg/core/resources/store" - "github.com/kumahq/kuma/pkg/events" - "github.com/kumahq/kuma/pkg/kds/service" - kuma_log "github.com/kumahq/kuma/pkg/log" - core_metrics "github.com/kumahq/kuma/pkg/metrics" - "github.com/kumahq/kuma/pkg/multitenant" -) - -type zoneTenant struct { - zone string - tenantID string -} - -type ZoneWatch struct { - log logr.Logger - poll time.Duration - timeout time.Duration - bus events.EventBus - extensions context.Context - rm manager.ReadOnlyResourceManager - summary prometheus.Summary - zones map[zoneTenant]time.Time -} - -func NewZoneWatch( - log logr.Logger, - cfg multizone.ZoneHealthCheckConfig, - metrics prometheus.Registerer, - bus events.EventBus, - rm manager.ReadOnlyResourceManager, - extensions context.Context, -) (*ZoneWatch, error) { - summary := prometheus.NewSummary(prometheus.SummaryOpts{ - Name: "component_zone_watch", - Help: "Summary of ZoneWatch component interval", - Objectives: core_metrics.DefaultObjectives, - }) - if err := metrics.Register(summary); err != nil { - return nil, err - } - - return &ZoneWatch{ - log: log, - poll: cfg.PollInterval.Duration, - timeout: cfg.Timeout.Duration, - bus: bus, - extensions: extensions, - rm: rm, - summary: summary, - zones: map[zoneTenant]time.Time{}, - }, nil -} - -func (zw *ZoneWatch) Start(stop <-chan struct{}) error { - timer := time.NewTicker(zw.poll) - defer timer.Stop() - - connectionWatch := zw.bus.Subscribe(func(e events.Event) bool { - _, ok := e.(service.ZoneOpenedStream) - return ok - }) - defer connectionWatch.Close() - - for { - select { - case <-timer.C: - start := core.Now() - for zone, firstSeen := range zw.zones { - ctx := multitenant.WithTenant(context.TODO(), zone.tenantID) - zoneInsight := system.NewZoneInsightResource() - - log := kuma_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions) - if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(zone.zone, model.NoMesh)); err != nil { - log.Info("error getting ZoneInsight", "zone", zone.zone, "error", err) - continue - } - - // It may be that we don't have a health check yet so we use the - // lastSeen time because we know the zone was connected at that - // point at least - lastHealthCheck := zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() - if firstSeen.After(lastHealthCheck) { - lastHealthCheck = firstSeen - } - if time.Since(lastHealthCheck) > zw.timeout { - zw.bus.Send(service.ZoneWentOffline{ - Zone: zone.zone, - TenantID: zone.tenantID, - }) - delete(zw.zones, zone) - } - } - zw.summary.Observe(float64(core.Now().Sub(start).Milliseconds())) - case e := <-connectionWatch.Recv(): - newStream := e.(service.ZoneOpenedStream) - - ctx := multitenant.WithTenant(context.TODO(), newStream.TenantID) - zoneInsight := system.NewZoneInsightResource() - - log := kuma_log.AddFieldsFromCtx(zw.log, ctx, zw.extensions) - if err := zw.rm.Get(ctx, zoneInsight, store.GetByKey(newStream.Zone, model.NoMesh)); err != nil { - log.Info("error getting ZoneInsight", "zone", newStream.Zone) - continue - } - - // We keep a record of the time we see in the ZoneInsight when the - // stream is opened. - // This is to prevent the zone from timing out on a poll - // where the last health check is still from a previous connect, so: - // a long time ago: zone CP disconnects, no more health checks are sent - // now: - // zone CP opens streams - // global CP gets ZoneOpenedStream - // global CP runs poll and see the last health check from "a long time ago" - // BAD: global CP kills streams - // zone CP health check arrives - zw.zones[zoneTenant{ - tenantID: newStream.TenantID, - zone: newStream.Zone, - }] = zoneInsight.Spec.GetHealthCheck().GetTime().AsTime() - case <-stop: - return nil - } - } -} - -func (zw *ZoneWatch) NeedLeaderElection() bool { - return false -} diff --git a/pkg/kds/mux/zone_watch_test.go b/pkg/kds/mux/zone_watch_test.go deleted file mode 100644 index 56afe64c4f8c..000000000000 --- a/pkg/kds/mux/zone_watch_test.go +++ /dev/null @@ -1,167 +0,0 @@ -package mux_test - -import ( - "context" - "time" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" - "google.golang.org/protobuf/types/known/timestamppb" - - system_proto "github.com/kumahq/kuma/api/system/v1alpha1" - "github.com/kumahq/kuma/pkg/config/multizone" - "github.com/kumahq/kuma/pkg/config/types" - "github.com/kumahq/kuma/pkg/core" - "github.com/kumahq/kuma/pkg/core/resources/apis/system" - "github.com/kumahq/kuma/pkg/core/resources/manager" - core_model "github.com/kumahq/kuma/pkg/core/resources/model" - "github.com/kumahq/kuma/pkg/core/resources/store" - "github.com/kumahq/kuma/pkg/events" - "github.com/kumahq/kuma/pkg/kds/mux" - "github.com/kumahq/kuma/pkg/kds/service" - core_metrics "github.com/kumahq/kuma/pkg/metrics" - "github.com/kumahq/kuma/pkg/plugins/resources/memory" -) - -func sendHealthCheckPing(rm manager.ResourceManager, name string) { - zoneInsight := system.NewZoneInsightResource() - Expect(rm.Get( - context.Background(), - zoneInsight, - store.GetByKey(name, core_model.NoMesh), - )).To(Succeed()) - - zoneInsight.Spec.HealthCheck = &system_proto.HealthCheck{ - Time: timestamppb.New(time.Now()), - } - Expect(rm.Update( - context.Background(), - zoneInsight, - )).To(Succeed()) -} - -const zone = "zone-1" - -var _ = Describe("ZoneWatch", func() { - var errCh chan error - - var rm manager.ResourceManager - var eventBus events.EventBus - var stop chan struct{} - var zoneWatch *mux.ZoneWatch - var timeouts events.Listener - - pollInterval := 100 * time.Millisecond - timeout := 5 * pollInterval - - BeforeEach(func() { - metrics, err := core_metrics.NewMetrics("") - Expect(err).ToNot(HaveOccurred()) - eventBus, err = events.NewEventBus(10, metrics) - Expect(err).NotTo(HaveOccurred()) - - cfg := multizone.ZoneHealthCheckConfig{ - PollInterval: types.Duration{Duration: pollInterval}, - Timeout: types.Duration{Duration: timeout}, - } - - zoneInsight := system.NewZoneInsightResource() - zoneInsight.Spec.HealthCheck = &system_proto.HealthCheck{ - Time: timestamppb.New(time.Now()), - } - rm = manager.NewResourceManager(memory.NewStore()) - Expect(rm.Create( - context.Background(), - zoneInsight, - store.CreateByKey(zone, core_model.NoMesh), - )).To(Succeed()) - - log := core.Log.WithName("test") - zoneWatch, err = mux.NewZoneWatch( - log, - cfg, - metrics, - eventBus, - rm, - context.Background(), - ) - Expect(err).NotTo(HaveOccurred()) - - stop = make(chan struct{}) - - timeouts = eventBus.Subscribe(func(event events.Event) bool { - _, ok := event.(service.ZoneWentOffline) - return ok - }) - - errCh = make(chan error, 1) - - go func() { - errCh <- zoneWatch.Start(stop) - }() - - // wait for ZoneWatch to have subscribed to new zone events - time.Sleep(pollInterval) - }) - - AfterEach(func() { - select { - case <-errCh: - Fail("zone watch should not have stopped") - default: - } - close(stop) - timeouts.Close() - Eventually(errCh).Should(Receive(Succeed())) - }) - - // We know _best case_ the zone will register offline - // in timeout + pollInterval - zoneWentOfflineCheckTimeout := timeout + 2*pollInterval - - It("should timeout zones that stop sending a health check", func() { - eventBus.Send(service.ZoneOpenedStream{ - TenantID: "", - Zone: zone, - }) - - // wait for opened stream to be registered - // in real conditions the interval will be large enough - // that these events will almost certainly be handled - // by the ZoneWatch loop between polls and before the timeout - time.Sleep(pollInterval) - - Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.ZoneWentOffline{ - TenantID: "", - Zone: zone, - }))) - - Consistently(timeouts.Recv(), zoneWentOfflineCheckTimeout).ShouldNot(Receive()) - }) - It("shouldn't timeout as long as ZoneInsight is updated", func() { - eventBus.Send(service.ZoneOpenedStream{ - TenantID: "", - Zone: zone, - }) - - // wait for opened stream to be registered - // in real conditions the interval will be large enough - // that these events will almost certainly be handled - // by the ZoneWatch loop between polls and before the timeout - time.Sleep(pollInterval) - - // Send a health check and block for a poll interval and make sure - // nothing has been received - // do this until we know the timeout would have come if weren't sending - // health checks - Consistently(func(g Gomega) { - sendHealthCheckPing(rm, zone) - g.Consistently(timeouts.Recv(), pollInterval).ShouldNot(Receive()) - }, 3*timeout).Should(Succeed()) - - Eventually(timeouts.Recv(), zoneWentOfflineCheckTimeout).Should(Receive(Equal(service.ZoneWentOffline{ - TenantID: "", - Zone: zone, - }))) - }) -}) diff --git a/pkg/kds/service/server.go b/pkg/kds/service/server.go index 58270d98bd0e..58aa15e3fa0d 100644 --- a/pkg/kds/service/server.go +++ b/pkg/kds/service/server.go @@ -5,15 +5,12 @@ import ( "fmt" "io" "math/rand" - "slices" "time" "github.com/sethvargo/go-retry" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" mesh_proto "github.com/kumahq/kuma/api/mesh/v1alpha1" @@ -25,8 +22,6 @@ import ( "github.com/kumahq/kuma/pkg/core/resources/manager" "github.com/kumahq/kuma/pkg/core/resources/model" core_store "github.com/kumahq/kuma/pkg/core/resources/store" - "github.com/kumahq/kuma/pkg/events" - "github.com/kumahq/kuma/pkg/kds" "github.com/kumahq/kuma/pkg/kds/util" kuma_log "github.com/kumahq/kuma/pkg/log" "github.com/kumahq/kuma/pkg/multitenant" @@ -40,14 +35,12 @@ type StreamInterceptor interface { } type GlobalKDSServiceServer struct { - envoyAdminRPCs EnvoyAdminRPCs - resManager manager.ResourceManager - instanceID string - filters []StreamInterceptor - extensions context.Context - upsertCfg config_store.UpsertConfig - eventBus events.EventBus - zoneHealthCheckInterval time.Duration + envoyAdminRPCs EnvoyAdminRPCs + resManager manager.ResourceManager + instanceID string + filters []StreamInterceptor + extensions context.Context + upsertCfg config_store.UpsertConfig mesh_proto.UnimplementedGlobalKDSServiceServer } @@ -58,18 +51,14 @@ func NewGlobalKDSServiceServer( filters []StreamInterceptor, extensions context.Context, upsertCfg config_store.UpsertConfig, - eventBus events.EventBus, - zoneHealthCheckInterval time.Duration, ) *GlobalKDSServiceServer { return &GlobalKDSServiceServer{ - envoyAdminRPCs: envoyAdminRPCs, - resManager: resManager, - instanceID: instanceID, - filters: filters, - extensions: extensions, - upsertCfg: upsertCfg, - eventBus: eventBus, - zoneHealthCheckInterval: zoneHealthCheckInterval, + envoyAdminRPCs: envoyAdminRPCs, + resManager: resManager, + instanceID: instanceID, + filters: filters, + extensions: extensions, + upsertCfg: upsertCfg, } } @@ -114,18 +103,7 @@ func (g *GlobalKDSServiceServer) HealthCheck(ctx context.Context, _ *mesh_proto. log.Error(err, "couldn't update zone insight", "zone", zone) } - return &mesh_proto.ZoneHealthCheckResponse{ - Interval: durationpb.New(g.zoneHealthCheckInterval), - }, nil -} - -type ZoneWentOffline struct { - TenantID string - Zone string -} -type ZoneOpenedStream struct { - TenantID string - Zone string + return &mesh_proto.ZoneHealthCheckResponse{}, nil } func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( @@ -139,23 +117,6 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( return status.Error(codes.InvalidArgument, err.Error()) } clientID := ClientID(stream.Context(), zone) - tenantID, _ := multitenant.TenantFromCtx(stream.Context()) - - shouldDisconnectStream := events.NewNeverListener() - - md, _ := metadata.FromIncomingContext(stream.Context()) - features := md.Get(kds.FeaturesMetadataKey) - - if slices.Contains(features, kds.FeatureZonePingHealth) { - shouldDisconnectStream = g.eventBus.Subscribe(func(e events.Event) bool { - disconnectEvent, ok := e.(ZoneWentOffline) - return ok && disconnectEvent.TenantID == tenantID && disconnectEvent.Zone == zone - }) - g.eventBus.Send(ZoneOpenedStream{Zone: zone, TenantID: tenantID}) - } - - defer shouldDisconnectStream.Close() - logger := log.WithValues("rpc", rpcName, "clientID", clientID) logger = kuma_log.AddFieldsFromCtx(logger, stream.Context(), g.extensions) for _, filter := range g.filters { @@ -175,39 +136,25 @@ func (g *GlobalKDSServiceServer) streamEnvoyAdminRPC( return status.Error(codes.Internal, "could not store stream connection") } logger.Info("stored stream connection") - streamResult := make(chan error, 1) - go func() { - for { - resp, err := recv() - if err == io.EOF { - logger.Info("stream stopped") - streamResult <- nil - return - } - if status.Code(err) == codes.Canceled { - logger.Info("stream cancelled") - streamResult <- nil - return - } - if err != nil { - logger.Error(err, "could not receive a message") - streamResult <- status.Error(codes.Internal, "could not receive a message") - return - } - logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId()) - if err := rpc.ResponseReceived(clientID, resp); err != nil { - logger.Error(err, "could not mark the response as received") - streamResult <- status.Error(codes.InvalidArgument, "could not mark the response as received") - return - } + for { + resp, err := recv() + if err == io.EOF { + logger.Info("stream stopped") + return nil + } + if status.Code(err) == codes.Canceled { + logger.Info("stream cancelled") + return nil + } + if err != nil { + logger.Error(err, "could not receive a message") + return status.Error(codes.Internal, "could not receive a message") + } + logger.V(1).Info("Envoy Admin RPC response received", "requestId", resp.GetRequestId()) + if err := rpc.ResponseReceived(clientID, resp); err != nil { + logger.Error(err, "could not mark the response as received") + return status.Error(codes.InvalidArgument, "could not mark the response as received") } - }() - select { - case <-shouldDisconnectStream.Recv(): - logger.Info("ending stream, zone health check failed") - return nil - case res := <-streamResult: - return res } }