diff --git a/channelz/internal/protoconv/channel.go b/channelz/internal/protoconv/channel.go new file mode 100644 index 000000000000..6bca5dd9be89 --- /dev/null +++ b/channelz/internal/protoconv/channel.go @@ -0,0 +1,136 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package protoconv + +import ( + "time" + + "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/status" + + channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" +) + +func connectivityStateToProto(s *connectivity.State) *channelzpb.ChannelConnectivityState { + if s == nil { + return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN} + } + switch *s { + case connectivity.Idle: + return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE} + case connectivity.Connecting: + return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING} + case connectivity.Ready: + return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY} + case connectivity.TransientFailure: + return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE} + case connectivity.Shutdown: + return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN} + default: + return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN} + } +} + +func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace { + pbt := &channelzpb.ChannelTrace{} + if ct == nil { + return pbt + } + pbt.NumEventsLogged = ct.EventNum + if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil { + pbt.CreationTimestamp = ts + } + events := make([]*channelzpb.ChannelTraceEvent, 0, len(ct.Events)) + for _, e := range ct.Events { + cte := &channelzpb.ChannelTraceEvent{ + Description: e.Desc, + Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity), + } + if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil { + cte.Timestamp = ts + } + if e.RefID != 0 { + switch e.RefType { + case channelz.RefChannel: + cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}} + case channelz.RefSubChannel: + cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}} + } + } + events = append(events, cte) + } + pbt.Events = events + return pbt +} + +func channelToProto(cm *channelz.Channel) *channelzpb.Channel { + c := &channelzpb.Channel{} + c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName} + + c.Data = &channelzpb.ChannelData{ + State: connectivityStateToProto(cm.ChannelMetrics.State.Load()), + Target: strFromPointer(cm.ChannelMetrics.Target.Load()), + CallsStarted: cm.ChannelMetrics.CallsStarted.Load(), + CallsSucceeded: cm.ChannelMetrics.CallsSucceeded.Load(), + CallsFailed: cm.ChannelMetrics.CallsFailed.Load(), + } + if ts, err := ptypes.TimestampProto(time.Unix(0, cm.ChannelMetrics.LastCallStartedTimestamp.Load())); err == nil { + c.Data.LastCallStartedTimestamp = ts + } + ncs := cm.NestedChans() + nestedChans := make([]*channelzpb.ChannelRef, 0, len(ncs)) + for id, ref := range ncs { + nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) + } + c.ChannelRef = nestedChans + + scs := cm.SubChans() + subChans := make([]*channelzpb.SubchannelRef, 0, len(scs)) + for id, ref := range scs { + subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) + } + c.SubchannelRef = subChans + + c.Data.Trace = channelTraceToProto(cm.Trace()) + return c +} + +// GetTopChannels returns the protobuf representation of the channels starting +// at startID (max of len), and returns end=true if no top channels exist with +// higher IDs. +func GetTopChannels(startID int64, len int) (channels []*channelzpb.Channel, end bool) { + chans, end := channelz.GetTopChannels(startID, len) + for _, ch := range chans { + channels = append(channels, channelToProto(ch)) + } + return channels, end +} + +// GetChannel returns the protobuf representation of the channel with the given +// ID. +func GetChannel(id int64) (*channelzpb.Channel, error) { + ch := channelz.GetChannel(id) + if ch == nil { + return nil, status.Errorf(codes.NotFound, "requested channel %d not found", id) + } + return channelToProto(ch), nil +} diff --git a/channelz/internal/protoconv/server.go b/channelz/internal/protoconv/server.go new file mode 100644 index 000000000000..112de445d798 --- /dev/null +++ b/channelz/internal/protoconv/server.go @@ -0,0 +1,73 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package protoconv + +import ( + "time" + + "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/status" + + channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" +) + +func serverToProto(sm *channelz.Server) *channelzpb.Server { + s := &channelzpb.Server{} + s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName} + + s.Data = &channelzpb.ServerData{ + CallsStarted: sm.ServerMetrics.CallsStarted.Load(), + CallsSucceeded: sm.ServerMetrics.CallsSucceeded.Load(), + CallsFailed: sm.ServerMetrics.CallsFailed.Load(), + } + + if ts, err := ptypes.TimestampProto(time.Unix(0, sm.ServerMetrics.LastCallStartedTimestamp.Load())); err == nil { + s.Data.LastCallStartedTimestamp = ts + } + lss := sm.ListenSockets() + sockets := make([]*channelzpb.SocketRef, 0, len(lss)) + for id, ref := range lss { + sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) + } + s.ListenSocket = sockets + return s +} + +// GetServers returns the protobuf representation of the servers starting at +// startID (max of len), and returns end=true if no servers exist with higher +// IDs. +func GetServers(startID int64, len int) (servers []*channelzpb.Server, end bool) { + srvs, end := channelz.GetServers(startID, len) + for _, srv := range srvs { + servers = append(servers, serverToProto(srv)) + } + return servers, end +} + +// GetServer returns the protobuf representation of the server with the given +// ID. +func GetServer(id int64) (*channelzpb.Server, error) { + srv := channelz.GetServer(id) + if srv == nil { + return nil, status.Errorf(codes.NotFound, "requested server %d not found", id) + } + return serverToProto(srv), nil +} diff --git a/channelz/internal/protoconv/socket.go b/channelz/internal/protoconv/socket.go new file mode 100644 index 000000000000..cc06b1c8b32c --- /dev/null +++ b/channelz/internal/protoconv/socket.go @@ -0,0 +1,137 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package protoconv + +import ( + "net" + "time" + + "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/protoadapt" + "google.golang.org/protobuf/types/known/anypb" + + wrpb "github.com/golang/protobuf/ptypes/wrappers" + channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" +) + +func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security { + switch v := se.(type) { + case *credentials.TLSChannelzSecurityValue: + return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{ + CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName}, + LocalCertificate: v.LocalCertificate, + RemoteCertificate: v.RemoteCertificate, + }}} + case *credentials.OtherChannelzSecurityValue: + otherSecurity := &channelzpb.Security_OtherSecurity{ + Name: v.Name, + } + if anyval, err := anypb.New(protoadapt.MessageV2Of(v.Value)); err == nil { + otherSecurity.Value = anyval + } + return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}} + } + return nil +} + +func addrToProto(a net.Addr) *channelzpb.Address { + if a == nil { + return nil + } + switch a.Network() { + case "udp": + // TODO: Address_OtherAddress{}. Need proto def for Value. + case "ip": + // Note zone info is discarded through the conversion. + return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}} + case "ip+net": + // Note mask info is discarded through the conversion. + return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}} + case "tcp": + // Note zone info is discarded through the conversion. + return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}} + case "unix", "unixgram", "unixpacket": + return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}} + default: + } + return &channelzpb.Address{} +} + +func socketToProto(skt *channelz.Socket) *channelzpb.Socket { + s := &channelzpb.Socket{} + s.Ref = &channelzpb.SocketRef{SocketId: skt.ID, Name: skt.RefName} + + s.Data = &channelzpb.SocketData{ + StreamsStarted: skt.SocketMetrics.StreamsStarted.Load(), + StreamsSucceeded: skt.SocketMetrics.StreamsSucceeded.Load(), + StreamsFailed: skt.SocketMetrics.StreamsFailed.Load(), + MessagesSent: skt.SocketMetrics.MessagesSent.Load(), + MessagesReceived: skt.SocketMetrics.MessagesReceived.Load(), + KeepAlivesSent: skt.SocketMetrics.KeepAlivesSent.Load(), + } + if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastLocalStreamCreatedTimestamp.Load())); err == nil { + s.Data.LastLocalStreamCreatedTimestamp = ts + } + if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastRemoteStreamCreatedTimestamp.Load())); err == nil { + s.Data.LastRemoteStreamCreatedTimestamp = ts + } + if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastMessageSentTimestamp.Load())); err == nil { + s.Data.LastMessageSentTimestamp = ts + } + if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastMessageReceivedTimestamp.Load())); err == nil { + s.Data.LastMessageReceivedTimestamp = ts + } + if skt.EphemeralMetrics != nil { + e := skt.EphemeralMetrics() + s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: e.LocalFlowControlWindow} + s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: e.RemoteFlowControlWindow} + } + + s.Data.Option = sockoptToProto(skt.SocketOptions) + s.Security = securityToProto(skt.Security) + s.Local = addrToProto(skt.LocalAddr) + s.Remote = addrToProto(skt.RemoteAddr) + s.RemoteName = skt.RemoteName + return s +} + +// GetServerSockets returns the protobuf representation of the server (listen) +// sockets starting at startID (max of len), and returns end=true if no server +// sockets exist with higher IDs. +func GetServerSockets(serverID, startID int64, len int) (sockets []*channelzpb.SocketRef, end bool) { + skts, end := channelz.GetServerSockets(serverID, startID, len) + for _, m := range skts { + sockets = append(sockets, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName}) + } + return sockets, end +} + +// GetSocket returns the protobuf representation of the socket with the given +// ID. +func GetSocket(id int64) (*channelzpb.Socket, error) { + skt := channelz.GetSocket(id) + if skt == nil { + return nil, status.Errorf(codes.NotFound, "requested socket %d not found", id) + } + return socketToProto(skt), nil +} diff --git a/channelz/service/func_linux.go b/channelz/internal/protoconv/sockopt_linux.go similarity index 99% rename from channelz/service/func_linux.go rename to channelz/internal/protoconv/sockopt_linux.go index 7523a52813dc..a8e9f3ccf57c 100644 --- a/channelz/service/func_linux.go +++ b/channelz/internal/protoconv/sockopt_linux.go @@ -16,7 +16,7 @@ * */ -package service +package protoconv import ( "time" diff --git a/channelz/service/func_nonlinux.go b/channelz/internal/protoconv/sockopt_nonlinux.go similarity index 97% rename from channelz/service/func_nonlinux.go rename to channelz/internal/protoconv/sockopt_nonlinux.go index 473495d6655e..2c303047bf3a 100644 --- a/channelz/service/func_nonlinux.go +++ b/channelz/internal/protoconv/sockopt_nonlinux.go @@ -19,7 +19,7 @@ * */ -package service +package protoconv import ( channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" diff --git a/channelz/internal/protoconv/subchannel.go b/channelz/internal/protoconv/subchannel.go new file mode 100644 index 000000000000..4bae7f2fd8b0 --- /dev/null +++ b/channelz/internal/protoconv/subchannel.go @@ -0,0 +1,65 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package protoconv + +import ( + "time" + + "github.com/golang/protobuf/ptypes" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal/channelz" + "google.golang.org/grpc/status" + + channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" +) + +func subChannelToProto(cm *channelz.SubChannel) *channelzpb.Subchannel { + sc := &channelzpb.Subchannel{} + sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName} + + sc.Data = &channelzpb.ChannelData{ + State: connectivityStateToProto(cm.ChannelMetrics.State.Load()), + Target: strFromPointer(cm.ChannelMetrics.Target.Load()), + CallsStarted: cm.ChannelMetrics.CallsStarted.Load(), + CallsSucceeded: cm.ChannelMetrics.CallsSucceeded.Load(), + CallsFailed: cm.ChannelMetrics.CallsFailed.Load(), + } + if ts, err := ptypes.TimestampProto(time.Unix(0, cm.ChannelMetrics.LastCallStartedTimestamp.Load())); err == nil { + sc.Data.LastCallStartedTimestamp = ts + } + + skts := cm.Sockets() + sockets := make([]*channelzpb.SocketRef, 0, len(skts)) + for id, ref := range skts { + sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) + } + sc.SocketRef = sockets + sc.Data.Trace = channelTraceToProto(cm.Trace()) + return sc +} + +// GetSubChannel returns the protobuf representation of the subchannel with the +// given ID. +func GetSubChannel(id int64) (*channelzpb.Subchannel, error) { + subChan := channelz.GetSubChannel(id) + if subChan == nil { + return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", id) + } + return subChannelToProto(subChan), nil +} diff --git a/channelz/internal/protoconv/util.go b/channelz/internal/protoconv/util.go new file mode 100644 index 000000000000..b41afe0cdce0 --- /dev/null +++ b/channelz/internal/protoconv/util.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package protoconv supports converting between the internal channelz +// implementation and the protobuf representation of all the entities. +package protoconv + +import ( + "google.golang.org/grpc/grpclog" +) + +var logger = grpclog.Component("channelz") + +func strFromPointer(s *string) string { + if s == nil { + return "" + } + return *s +} diff --git a/channelz/service/service.go b/channelz/service/service.go index 51a0a33f433a..0e23c96a4e4f 100644 --- a/channelz/service/service.go +++ b/channelz/service/service.go @@ -21,31 +21,19 @@ package service import ( "context" - "net" - "time" - "github.com/golang/protobuf/ptypes" - wrpb "github.com/golang/protobuf/ptypes/wrappers" channelzgrpc "google.golang.org/grpc/channelz/grpc_channelz_v1" channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1" "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/channelz/internal/protoconv" "google.golang.org/grpc/internal/channelz" - "google.golang.org/grpc/status" - "google.golang.org/protobuf/protoadapt" - "google.golang.org/protobuf/types/known/anypb" ) func init() { channelz.TurnOn() } -var logger = grpclog.Component("channelz") - // RegisterChannelzServiceToServer registers the channelz service to the given server. // // Note: it is preferred to use the admin API @@ -63,287 +51,52 @@ type serverImpl struct { channelzgrpc.UnimplementedChannelzServer } -func connectivityStateToProto(s *connectivity.State) *channelzpb.ChannelConnectivityState { - if s == nil { - return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN} - } - switch *s { - case connectivity.Idle: - return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE} - case connectivity.Connecting: - return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING} - case connectivity.Ready: - return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY} - case connectivity.TransientFailure: - return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE} - case connectivity.Shutdown: - return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN} - default: - return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN} - } -} - -func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace { - pbt := &channelzpb.ChannelTrace{} - if ct == nil { - return pbt - } - pbt.NumEventsLogged = ct.EventNum - if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil { - pbt.CreationTimestamp = ts - } - events := make([]*channelzpb.ChannelTraceEvent, 0, len(ct.Events)) - for _, e := range ct.Events { - cte := &channelzpb.ChannelTraceEvent{ - Description: e.Desc, - Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity), - } - if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil { - cte.Timestamp = ts - } - if e.RefID != 0 { - switch e.RefType { - case channelz.RefChannel: - cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}} - case channelz.RefSubChannel: - cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}} - } - } - events = append(events, cte) - } - pbt.Events = events - return pbt -} - -func strFromPointer(s *string) string { - if s == nil { - return "" - } - return *s -} - -func channelMetricToProto(cm *channelz.Channel) *channelzpb.Channel { - c := &channelzpb.Channel{} - c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName} - - c.Data = &channelzpb.ChannelData{ - State: connectivityStateToProto(cm.ChannelMetrics.State.Load()), - Target: strFromPointer(cm.ChannelMetrics.Target.Load()), - CallsStarted: cm.ChannelMetrics.CallsStarted.Load(), - CallsSucceeded: cm.ChannelMetrics.CallsSucceeded.Load(), - CallsFailed: cm.ChannelMetrics.CallsFailed.Load(), - } - if ts, err := ptypes.TimestampProto(time.Unix(0, cm.ChannelMetrics.LastCallStartedTimestamp.Load())); err == nil { - c.Data.LastCallStartedTimestamp = ts - } - ncs := cm.NestedChans() - nestedChans := make([]*channelzpb.ChannelRef, 0, len(ncs)) - for id, ref := range ncs { - nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref}) - } - c.ChannelRef = nestedChans - - scs := cm.SubChans() - subChans := make([]*channelzpb.SubchannelRef, 0, len(scs)) - for id, ref := range scs { - subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref}) - } - c.SubchannelRef = subChans - - c.Data.Trace = channelTraceToProto(cm.Trace()) - return c -} - -func subChannelMetricToProto(cm *channelz.SubChannel) *channelzpb.Subchannel { - sc := &channelzpb.Subchannel{} - sc.Ref = &channelzpb.SubchannelRef{SubchannelId: cm.ID, Name: cm.RefName} - - sc.Data = &channelzpb.ChannelData{ - State: connectivityStateToProto(cm.ChannelMetrics.State.Load()), - Target: strFromPointer(cm.ChannelMetrics.Target.Load()), - CallsStarted: cm.ChannelMetrics.CallsStarted.Load(), - CallsSucceeded: cm.ChannelMetrics.CallsSucceeded.Load(), - CallsFailed: cm.ChannelMetrics.CallsFailed.Load(), - } - if ts, err := ptypes.TimestampProto(time.Unix(0, cm.ChannelMetrics.LastCallStartedTimestamp.Load())); err == nil { - sc.Data.LastCallStartedTimestamp = ts - } - - skts := cm.Sockets() - sockets := make([]*channelzpb.SocketRef, 0, len(skts)) - for id, ref := range skts { - sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) - } - sc.SocketRef = sockets - sc.Data.Trace = channelTraceToProto(cm.Trace()) - return sc -} - -func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security { - switch v := se.(type) { - case *credentials.TLSChannelzSecurityValue: - return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{ - CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName}, - LocalCertificate: v.LocalCertificate, - RemoteCertificate: v.RemoteCertificate, - }}} - case *credentials.OtherChannelzSecurityValue: - otherSecurity := &channelzpb.Security_OtherSecurity{ - Name: v.Name, - } - if anyval, err := anypb.New(protoadapt.MessageV2Of(v.Value)); err == nil { - otherSecurity.Value = anyval - } - return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}} - } - return nil -} - -func addrToProto(a net.Addr) *channelzpb.Address { - if a == nil { - return nil - } - switch a.Network() { - case "udp": - // TODO: Address_OtherAddress{}. Need proto def for Value. - case "ip": - // Note zone info is discarded through the conversion. - return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}} - case "ip+net": - // Note mask info is discarded through the conversion. - return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}} - case "tcp": - // Note zone info is discarded through the conversion. - return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}} - case "unix", "unixgram", "unixpacket": - return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}} - default: - } - return &channelzpb.Address{} -} - -func socketMetricToProto(skt *channelz.Socket) *channelzpb.Socket { - s := &channelzpb.Socket{} - s.Ref = &channelzpb.SocketRef{SocketId: skt.ID, Name: skt.RefName} - - s.Data = &channelzpb.SocketData{ - StreamsStarted: skt.SocketMetrics.StreamsStarted.Load(), - StreamsSucceeded: skt.SocketMetrics.StreamsSucceeded.Load(), - StreamsFailed: skt.SocketMetrics.StreamsFailed.Load(), - MessagesSent: skt.SocketMetrics.MessagesSent.Load(), - MessagesReceived: skt.SocketMetrics.MessagesReceived.Load(), - KeepAlivesSent: skt.SocketMetrics.KeepAlivesSent.Load(), - } - if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastLocalStreamCreatedTimestamp.Load())); err == nil { - s.Data.LastLocalStreamCreatedTimestamp = ts - } - if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastRemoteStreamCreatedTimestamp.Load())); err == nil { - s.Data.LastRemoteStreamCreatedTimestamp = ts - } - if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastMessageSentTimestamp.Load())); err == nil { - s.Data.LastMessageSentTimestamp = ts - } - if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastMessageReceivedTimestamp.Load())); err == nil { - s.Data.LastMessageReceivedTimestamp = ts - } - if skt.EphemeralMetrics != nil { - e := skt.EphemeralMetrics() - s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: e.LocalFlowControlWindow} - s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: e.RemoteFlowControlWindow} +func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) { + ch, err := protoconv.GetChannel(req.GetChannelId()) + if err != nil { + return nil, err } - - s.Data.Option = sockoptToProto(skt.SocketOptions) - s.Security = securityToProto(skt.Security) - s.Local = addrToProto(skt.LocalAddr) - s.Remote = addrToProto(skt.RemoteAddr) - s.RemoteName = skt.RemoteName - return s + return &channelzpb.GetChannelResponse{Channel: ch}, nil } func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) { - chans, end := channelz.GetTopChannels(req.GetStartChannelId(), int(req.GetMaxResults())) resp := &channelzpb.GetTopChannelsResponse{} - for _, ch := range chans { - resp.Channel = append(resp.Channel, channelMetricToProto(ch)) - } - resp.End = end + resp.Channel, resp.End = protoconv.GetTopChannels(req.GetStartChannelId(), int(req.GetMaxResults())) return resp, nil } -func serverMetricToProto(sm *channelz.Server) *channelzpb.Server { - s := &channelzpb.Server{} - s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName} - - s.Data = &channelzpb.ServerData{ - CallsStarted: sm.ServerMetrics.CallsStarted.Load(), - CallsSucceeded: sm.ServerMetrics.CallsSucceeded.Load(), - CallsFailed: sm.ServerMetrics.CallsFailed.Load(), - } - - if ts, err := ptypes.TimestampProto(time.Unix(0, sm.ServerMetrics.LastCallStartedTimestamp.Load())); err == nil { - s.Data.LastCallStartedTimestamp = ts - } - lss := sm.ListenSockets() - sockets := make([]*channelzpb.SocketRef, 0, len(lss)) - for id, ref := range lss { - sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref}) +func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) { + srv, err := protoconv.GetServer(req.GetServerId()) + if err != nil { + return nil, err } - s.ListenSocket = sockets - return s + return &channelzpb.GetServerResponse{Server: srv}, nil } func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) { - metrics, end := channelz.GetServers(req.GetStartServerId(), int(req.GetMaxResults())) resp := &channelzpb.GetServersResponse{} - for _, m := range metrics { - resp.Server = append(resp.Server, serverMetricToProto(m)) - } - resp.End = end - return resp, nil -} - -func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) { - skts, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), int(req.GetMaxResults())) - resp := &channelzpb.GetServerSocketsResponse{} - for _, m := range skts { - resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName}) - } - resp.End = end - return resp, nil -} - -func (s *serverImpl) GetChannel(ctx context.Context, req *channelzpb.GetChannelRequest) (*channelzpb.GetChannelResponse, error) { - ch := channelz.GetChannel(req.GetChannelId()) - if ch == nil { - return nil, status.Errorf(codes.NotFound, "requested channel %d not found", req.GetChannelId()) - } - resp := &channelzpb.GetChannelResponse{Channel: channelMetricToProto(ch)} + resp.Server, resp.End = protoconv.GetServers(req.GetStartServerId(), int(req.GetMaxResults())) return resp, nil } func (s *serverImpl) GetSubchannel(ctx context.Context, req *channelzpb.GetSubchannelRequest) (*channelzpb.GetSubchannelResponse, error) { - subChan := channelz.GetSubChannel(req.GetSubchannelId()) - if subChan == nil { - return nil, status.Errorf(codes.NotFound, "requested sub channel %d not found", req.GetSubchannelId()) + subChan, err := protoconv.GetSubChannel(req.GetSubchannelId()) + if err != nil { + return nil, err } - resp := &channelzpb.GetSubchannelResponse{Subchannel: subChannelMetricToProto(subChan)} - return resp, nil + return &channelzpb.GetSubchannelResponse{Subchannel: subChan}, nil } -func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) { - var metric *channelz.Socket - if metric = channelz.GetSocket(req.GetSocketId()); metric == nil { - return nil, status.Errorf(codes.NotFound, "requested socket %d not found", req.GetSocketId()) - } - resp := &channelzpb.GetSocketResponse{Socket: socketMetricToProto(metric)} +func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) { + resp := &channelzpb.GetServerSocketsResponse{} + resp.SocketRef, resp.End = protoconv.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), int(req.GetMaxResults())) return resp, nil } -func (s *serverImpl) GetServer(ctx context.Context, req *channelzpb.GetServerRequest) (*channelzpb.GetServerResponse, error) { - metric := channelz.GetServer(req.GetServerId()) - if metric == nil { - return nil, status.Errorf(codes.NotFound, "requested server %d not found", req.GetServerId()) +func (s *serverImpl) GetSocket(ctx context.Context, req *channelzpb.GetSocketRequest) (*channelzpb.GetSocketResponse, error) { + socket, err := protoconv.GetSocket(req.GetSocketId()) + if err != nil { + return nil, err } - resp := &channelzpb.GetServerResponse{Server: serverMetricToProto(metric)} - return resp, nil + return &channelzpb.GetSocketResponse{Socket: socket}, nil } diff --git a/channelz/service/service_test.go b/channelz/service/service_test.go index 4e4154226bbb..9e9729d10ca4 100644 --- a/channelz/service/service_test.go +++ b/channelz/service/service_test.go @@ -32,6 +32,7 @@ import ( "github.com/google/go-cmp/cmp" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal/channelz" "google.golang.org/grpc/internal/grpctest" "google.golang.org/protobuf/testing/protocmp" @@ -315,6 +316,8 @@ func (s) TestGetServerSocketsNonZeroStartID(t *testing.T) { } } +var logger = grpclog.Component("channelz") + func (s) TestGetChannel(t *testing.T) { refNames := []string{"top channel 1", "nested channel 1", "sub channel 2", "nested channel 3"} cids := make([]*channelz.Channel, 3)