Skip to content

Commit

Permalink
channelz: refactor to move proto API to a separate package (#7065)
Browse files Browse the repository at this point in the history
  • Loading branch information
dfawley authored Mar 26, 2024
1 parent b78c0eb commit fc3f327
Show file tree
Hide file tree
Showing 9 changed files with 475 additions and 274 deletions.
136 changes: 136 additions & 0 deletions channelz/internal/protoconv/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package protoconv

import (
"time"

"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/status"

channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
)

func connectivityStateToProto(s *connectivity.State) *channelzpb.ChannelConnectivityState {
if s == nil {
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
}
switch *s {
case connectivity.Idle:
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_IDLE}
case connectivity.Connecting:
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_CONNECTING}
case connectivity.Ready:
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_READY}
case connectivity.TransientFailure:
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_TRANSIENT_FAILURE}
case connectivity.Shutdown:
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_SHUTDOWN}
default:
return &channelzpb.ChannelConnectivityState{State: channelzpb.ChannelConnectivityState_UNKNOWN}
}
}

func channelTraceToProto(ct *channelz.ChannelTrace) *channelzpb.ChannelTrace {
pbt := &channelzpb.ChannelTrace{}
if ct == nil {
return pbt
}
pbt.NumEventsLogged = ct.EventNum
if ts, err := ptypes.TimestampProto(ct.CreationTime); err == nil {
pbt.CreationTimestamp = ts
}
events := make([]*channelzpb.ChannelTraceEvent, 0, len(ct.Events))
for _, e := range ct.Events {
cte := &channelzpb.ChannelTraceEvent{
Description: e.Desc,
Severity: channelzpb.ChannelTraceEvent_Severity(e.Severity),
}
if ts, err := ptypes.TimestampProto(e.Timestamp); err == nil {
cte.Timestamp = ts
}
if e.RefID != 0 {
switch e.RefType {
case channelz.RefChannel:
cte.ChildRef = &channelzpb.ChannelTraceEvent_ChannelRef{ChannelRef: &channelzpb.ChannelRef{ChannelId: e.RefID, Name: e.RefName}}
case channelz.RefSubChannel:
cte.ChildRef = &channelzpb.ChannelTraceEvent_SubchannelRef{SubchannelRef: &channelzpb.SubchannelRef{SubchannelId: e.RefID, Name: e.RefName}}
}
}
events = append(events, cte)
}
pbt.Events = events
return pbt
}

func channelToProto(cm *channelz.Channel) *channelzpb.Channel {
c := &channelzpb.Channel{}
c.Ref = &channelzpb.ChannelRef{ChannelId: cm.ID, Name: cm.RefName}

c.Data = &channelzpb.ChannelData{
State: connectivityStateToProto(cm.ChannelMetrics.State.Load()),
Target: strFromPointer(cm.ChannelMetrics.Target.Load()),
CallsStarted: cm.ChannelMetrics.CallsStarted.Load(),
CallsSucceeded: cm.ChannelMetrics.CallsSucceeded.Load(),
CallsFailed: cm.ChannelMetrics.CallsFailed.Load(),
}
if ts, err := ptypes.TimestampProto(time.Unix(0, cm.ChannelMetrics.LastCallStartedTimestamp.Load())); err == nil {
c.Data.LastCallStartedTimestamp = ts
}
ncs := cm.NestedChans()
nestedChans := make([]*channelzpb.ChannelRef, 0, len(ncs))
for id, ref := range ncs {
nestedChans = append(nestedChans, &channelzpb.ChannelRef{ChannelId: id, Name: ref})
}
c.ChannelRef = nestedChans

scs := cm.SubChans()
subChans := make([]*channelzpb.SubchannelRef, 0, len(scs))
for id, ref := range scs {
subChans = append(subChans, &channelzpb.SubchannelRef{SubchannelId: id, Name: ref})
}
c.SubchannelRef = subChans

c.Data.Trace = channelTraceToProto(cm.Trace())
return c
}

// GetTopChannels returns the protobuf representation of the channels starting
// at startID (max of len), and returns end=true if no top channels exist with
// higher IDs.
func GetTopChannels(startID int64, len int) (channels []*channelzpb.Channel, end bool) {
chans, end := channelz.GetTopChannels(startID, len)
for _, ch := range chans {
channels = append(channels, channelToProto(ch))
}
return channels, end
}

// GetChannel returns the protobuf representation of the channel with the given
// ID.
func GetChannel(id int64) (*channelzpb.Channel, error) {
ch := channelz.GetChannel(id)
if ch == nil {
return nil, status.Errorf(codes.NotFound, "requested channel %d not found", id)
}
return channelToProto(ch), nil
}
73 changes: 73 additions & 0 deletions channelz/internal/protoconv/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package protoconv

import (
"time"

"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/status"

channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
)

func serverToProto(sm *channelz.Server) *channelzpb.Server {
s := &channelzpb.Server{}
s.Ref = &channelzpb.ServerRef{ServerId: sm.ID, Name: sm.RefName}

s.Data = &channelzpb.ServerData{
CallsStarted: sm.ServerMetrics.CallsStarted.Load(),
CallsSucceeded: sm.ServerMetrics.CallsSucceeded.Load(),
CallsFailed: sm.ServerMetrics.CallsFailed.Load(),
}

if ts, err := ptypes.TimestampProto(time.Unix(0, sm.ServerMetrics.LastCallStartedTimestamp.Load())); err == nil {
s.Data.LastCallStartedTimestamp = ts
}
lss := sm.ListenSockets()
sockets := make([]*channelzpb.SocketRef, 0, len(lss))
for id, ref := range lss {
sockets = append(sockets, &channelzpb.SocketRef{SocketId: id, Name: ref})
}
s.ListenSocket = sockets
return s
}

// GetServers returns the protobuf representation of the servers starting at
// startID (max of len), and returns end=true if no servers exist with higher
// IDs.
func GetServers(startID int64, len int) (servers []*channelzpb.Server, end bool) {
srvs, end := channelz.GetServers(startID, len)
for _, srv := range srvs {
servers = append(servers, serverToProto(srv))
}
return servers, end
}

// GetServer returns the protobuf representation of the server with the given
// ID.
func GetServer(id int64) (*channelzpb.Server, error) {
srv := channelz.GetServer(id)
if srv == nil {
return nil, status.Errorf(codes.NotFound, "requested server %d not found", id)
}
return serverToProto(srv), nil
}
137 changes: 137 additions & 0 deletions channelz/internal/protoconv/socket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/*
*
* Copyright 2024 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package protoconv

import (
"net"
"time"

"github.com/golang/protobuf/ptypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/protoadapt"
"google.golang.org/protobuf/types/known/anypb"

wrpb "github.com/golang/protobuf/ptypes/wrappers"
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
)

func securityToProto(se credentials.ChannelzSecurityValue) *channelzpb.Security {
switch v := se.(type) {
case *credentials.TLSChannelzSecurityValue:
return &channelzpb.Security{Model: &channelzpb.Security_Tls_{Tls: &channelzpb.Security_Tls{
CipherSuite: &channelzpb.Security_Tls_StandardName{StandardName: v.StandardName},
LocalCertificate: v.LocalCertificate,
RemoteCertificate: v.RemoteCertificate,
}}}
case *credentials.OtherChannelzSecurityValue:
otherSecurity := &channelzpb.Security_OtherSecurity{
Name: v.Name,
}
if anyval, err := anypb.New(protoadapt.MessageV2Of(v.Value)); err == nil {
otherSecurity.Value = anyval
}
return &channelzpb.Security{Model: &channelzpb.Security_Other{Other: otherSecurity}}
}
return nil
}

func addrToProto(a net.Addr) *channelzpb.Address {
if a == nil {
return nil
}
switch a.Network() {
case "udp":
// TODO: Address_OtherAddress{}. Need proto def for Value.
case "ip":
// Note zone info is discarded through the conversion.
return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPAddr).IP}}}
case "ip+net":
// Note mask info is discarded through the conversion.
return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.IPNet).IP}}}
case "tcp":
// Note zone info is discarded through the conversion.
return &channelzpb.Address{Address: &channelzpb.Address_TcpipAddress{TcpipAddress: &channelzpb.Address_TcpIpAddress{IpAddress: a.(*net.TCPAddr).IP, Port: int32(a.(*net.TCPAddr).Port)}}}
case "unix", "unixgram", "unixpacket":
return &channelzpb.Address{Address: &channelzpb.Address_UdsAddress_{UdsAddress: &channelzpb.Address_UdsAddress{Filename: a.String()}}}
default:
}
return &channelzpb.Address{}
}

func socketToProto(skt *channelz.Socket) *channelzpb.Socket {
s := &channelzpb.Socket{}
s.Ref = &channelzpb.SocketRef{SocketId: skt.ID, Name: skt.RefName}

s.Data = &channelzpb.SocketData{
StreamsStarted: skt.SocketMetrics.StreamsStarted.Load(),
StreamsSucceeded: skt.SocketMetrics.StreamsSucceeded.Load(),
StreamsFailed: skt.SocketMetrics.StreamsFailed.Load(),
MessagesSent: skt.SocketMetrics.MessagesSent.Load(),
MessagesReceived: skt.SocketMetrics.MessagesReceived.Load(),
KeepAlivesSent: skt.SocketMetrics.KeepAlivesSent.Load(),
}
if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastLocalStreamCreatedTimestamp.Load())); err == nil {
s.Data.LastLocalStreamCreatedTimestamp = ts
}
if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastRemoteStreamCreatedTimestamp.Load())); err == nil {
s.Data.LastRemoteStreamCreatedTimestamp = ts
}
if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastMessageSentTimestamp.Load())); err == nil {
s.Data.LastMessageSentTimestamp = ts
}
if ts, err := ptypes.TimestampProto(time.Unix(0, skt.SocketMetrics.LastMessageReceivedTimestamp.Load())); err == nil {
s.Data.LastMessageReceivedTimestamp = ts
}
if skt.EphemeralMetrics != nil {
e := skt.EphemeralMetrics()
s.Data.LocalFlowControlWindow = &wrpb.Int64Value{Value: e.LocalFlowControlWindow}
s.Data.RemoteFlowControlWindow = &wrpb.Int64Value{Value: e.RemoteFlowControlWindow}
}

s.Data.Option = sockoptToProto(skt.SocketOptions)
s.Security = securityToProto(skt.Security)
s.Local = addrToProto(skt.LocalAddr)
s.Remote = addrToProto(skt.RemoteAddr)
s.RemoteName = skt.RemoteName
return s
}

// GetServerSockets returns the protobuf representation of the server (listen)
// sockets starting at startID (max of len), and returns end=true if no server
// sockets exist with higher IDs.
func GetServerSockets(serverID, startID int64, len int) (sockets []*channelzpb.SocketRef, end bool) {
skts, end := channelz.GetServerSockets(serverID, startID, len)
for _, m := range skts {
sockets = append(sockets, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})
}
return sockets, end
}

// GetSocket returns the protobuf representation of the socket with the given
// ID.
func GetSocket(id int64) (*channelzpb.Socket, error) {
skt := channelz.GetSocket(id)
if skt == nil {
return nil, status.Errorf(codes.NotFound, "requested socket %d not found", id)
}
return socketToProto(skt), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
*
*/

package service
package protoconv

import (
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
*
*/

package service
package protoconv

import (
channelzpb "google.golang.org/grpc/channelz/grpc_channelz_v1"
Expand Down
Loading

0 comments on commit fc3f327

Please sign in to comment.