Skip to content

Commit

Permalink
chore: redo FilterMessages as generic function
Browse files Browse the repository at this point in the history
Simple change:

1. `FilterMessages` became `filterMessages`.
2. `FilterMessages` is now typed generic function which calls `filterMessages` internally.
3. Adjusted callsites of `FilterMessages`.

Most of the reflection can be avoided in `filterMessages` body (aside from setting `Messages` field
which can be done using proto reflection mechanisms). But for now, lets stick to the simple change.

Signed-off-by: Dmitriy Matrenichev <dmitry.matrenichev@siderolabs.com>
  • Loading branch information
DmitriyMV committed Jul 9, 2024
1 parent fbde9c5 commit ce4c404
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 145 deletions.
160 changes: 27 additions & 133 deletions pkg/machinery/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,33 +244,21 @@ func (c *Client) Kubeconfig(ctx context.Context) ([]byte, error) {
func (c *Client) ApplyConfiguration(ctx context.Context, req *machineapi.ApplyConfigurationRequest, callOptions ...grpc.CallOption) (resp *machineapi.ApplyConfigurationResponse, err error) {
resp, err = c.MachineClient.ApplyConfiguration(ctx, req, callOptions...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.ApplyConfigurationResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// GenerateConfiguration implements proto.MachineServiceClient interface.
func (c *Client) GenerateConfiguration(ctx context.Context, req *machineapi.GenerateConfigurationRequest, callOptions ...grpc.CallOption) (resp *machineapi.GenerateConfigurationResponse, err error) {
resp, err = c.MachineClient.GenerateConfiguration(ctx, req, callOptions...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.GenerateConfigurationResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Disks returns the list of block devices.
func (c *Client) Disks(ctx context.Context, callOptions ...grpc.CallOption) (resp *storageapi.DisksResponse, err error) {
resp, err = c.StorageClient.Disks(ctx, &emptypb.Empty{}, callOptions...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*storageapi.DisksResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Stats implements the proto.MachineServiceClient interface.
Expand All @@ -283,11 +271,7 @@ func (c *Client) Stats(ctx context.Context, namespace string, driver common.Cont
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.StatsResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Containers implements the proto.MachineServiceClient interface.
Expand All @@ -301,11 +285,7 @@ func (c *Client) Containers(ctx context.Context, namespace string, driver common
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.ContainersResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Restart implements the proto.MachineServiceClient interface.
Expand Down Expand Up @@ -468,11 +448,7 @@ func (c *Client) LogsContainers(ctx context.Context, callOptions ...grpc.CallOpt
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.LogsContainersResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Version implements the proto.MachineServiceClient interface.
Expand All @@ -483,11 +459,7 @@ func (c *Client) Version(ctx context.Context, callOptions ...grpc.CallOption) (r
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.VersionResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Processes implements the proto.MachineServiceClient interface.
Expand All @@ -498,11 +470,7 @@ func (c *Client) Processes(ctx context.Context, callOptions ...grpc.CallOption)
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.ProcessesResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Memory implements the proto.MachineServiceClient interface.
Expand All @@ -513,11 +481,7 @@ func (c *Client) Memory(ctx context.Context, callOptions ...grpc.CallOption) (re
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.MemoryResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Mounts implements the proto.MachineServiceClient interface.
Expand All @@ -528,11 +492,7 @@ func (c *Client) Mounts(ctx context.Context, callOptions ...grpc.CallOption) (re
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.MountsResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// LS implements the proto.MachineServiceClient interface.
Expand Down Expand Up @@ -634,11 +594,7 @@ func (c *Client) UpgradeWithOptions(ctx context.Context, opts ...UpgradeOption)

resp, err := c.MachineClient.Upgrade(ctx, &options.Request, options.GRPCCallOptions...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.UpgradeResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// ServiceList returns list of services with their state.
Expand All @@ -649,11 +605,7 @@ func (c *Client) ServiceList(ctx context.Context, callOptions ...grpc.CallOption
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.ServiceListResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// ServiceInfo provides info about a service and node metadata.
Expand All @@ -678,9 +630,7 @@ func (c *Client) ServiceInfo(ctx context.Context, id string, callOptions ...grpc
return services, err
}

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.ServiceListResponse) //nolint:errcheck
resp, err = FilterMessages(resp, err)

// FilterMessages might remove responses if they actually contain errors,
// errors will be merged into `resp`.
Expand Down Expand Up @@ -710,11 +660,7 @@ func (c *Client) ServiceStart(ctx context.Context, id string, callOptions ...grp
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.ServiceStartResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// ServiceStop stops a service.
Expand All @@ -725,11 +671,7 @@ func (c *Client) ServiceStop(ctx context.Context, id string, callOptions ...grpc
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.ServiceStopResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// ServiceRestart restarts a service.
Expand All @@ -740,11 +682,7 @@ func (c *Client) ServiceRestart(ctx context.Context, id string, callOptions ...g
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.ServiceRestartResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Time returns the time.
Expand All @@ -755,11 +693,7 @@ func (c *Client) Time(ctx context.Context, callOptions ...grpc.CallOption) (resp
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*timeapi.TimeResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// TimeCheck returns the time compared to the specified ntp server.
Expand All @@ -770,11 +704,7 @@ func (c *Client) TimeCheck(ctx context.Context, server string, callOptions ...gr
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*timeapi.TimeResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// Read reads a file.
Expand Down Expand Up @@ -824,22 +754,14 @@ func (c *Client) EtcdLeaveCluster(ctx context.Context, req *machineapi.EtcdLeave
func (c *Client) EtcdForfeitLeadership(ctx context.Context, req *machineapi.EtcdForfeitLeadershipRequest, callOptions ...grpc.CallOption) (*machineapi.EtcdForfeitLeadershipResponse, error) {
resp, err := c.MachineClient.EtcdForfeitLeadership(ctx, req, callOptions...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.EtcdForfeitLeadershipResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// EtcdMemberList lists etcd members of the cluster.
func (c *Client) EtcdMemberList(ctx context.Context, req *machineapi.EtcdMemberListRequest, callOptions ...grpc.CallOption) (*machineapi.EtcdMemberListResponse, error) {
resp, err := c.MachineClient.EtcdMemberList(ctx, req, callOptions...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.EtcdMemberListResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// EtcdSnapshot receives a snapshot of the etcd from the node.
Expand Down Expand Up @@ -895,11 +817,7 @@ func (c *Client) EtcdRecover(ctx context.Context, snapshot io.Reader, callOption

resp, err := cli.CloseAndRecv()

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.EtcdRecoverResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// EtcdAlarmList lists etcd alarms for the current node.
Expand All @@ -908,11 +826,7 @@ func (c *Client) EtcdRecover(ctx context.Context, snapshot io.Reader, callOption
func (c *Client) EtcdAlarmList(ctx context.Context, opts ...grpc.CallOption) (*machineapi.EtcdAlarmListResponse, error) {
resp, err := c.MachineClient.EtcdAlarmList(ctx, &emptypb.Empty{}, opts...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.EtcdAlarmListResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// EtcdAlarmDisarm disarms etcd alarms for the current node.
Expand All @@ -921,11 +835,7 @@ func (c *Client) EtcdAlarmList(ctx context.Context, opts ...grpc.CallOption) (*m
func (c *Client) EtcdAlarmDisarm(ctx context.Context, opts ...grpc.CallOption) (*machineapi.EtcdAlarmDisarmResponse, error) {
resp, err := c.MachineClient.EtcdAlarmDisarm(ctx, &emptypb.Empty{}, opts...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.EtcdAlarmDisarmResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// EtcdDefragment defragments etcd data directory for the current node.
Expand All @@ -937,11 +847,7 @@ func (c *Client) EtcdAlarmDisarm(ctx context.Context, opts ...grpc.CallOption) (
func (c *Client) EtcdDefragment(ctx context.Context, opts ...grpc.CallOption) (*machineapi.EtcdDefragmentResponse, error) {
resp, err := c.MachineClient.EtcdDefragment(ctx, &emptypb.Empty{}, opts...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.EtcdDefragmentResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// EtcdStatus returns etcd status for the current member.
Expand All @@ -950,22 +856,14 @@ func (c *Client) EtcdDefragment(ctx context.Context, opts ...grpc.CallOption) (*
func (c *Client) EtcdStatus(ctx context.Context, opts ...grpc.CallOption) (*machineapi.EtcdStatusResponse, error) {
resp, err := c.MachineClient.EtcdStatus(ctx, &emptypb.Empty{}, opts...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.EtcdStatusResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// GenerateClientConfiguration implements proto.MachineServiceClient interface.
func (c *Client) GenerateClientConfiguration(ctx context.Context, req *machineapi.GenerateClientConfigurationRequest, callOptions ...grpc.CallOption) (resp *machineapi.GenerateClientConfigurationResponse, err error) { //nolint:lll
resp, err = c.MachineClient.GenerateClientConfiguration(ctx, req, callOptions...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.GenerateClientConfigurationResponse) //nolint:errcheck

return
return FilterMessages(resp, err)
}

// PacketCapture implements the proto.MachineServiceClient interface.
Expand Down Expand Up @@ -1046,11 +944,7 @@ func (c *Client) Netstat(ctx context.Context, req *machineapi.NetstatRequest, ca
callOptions...,
)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*machineapi.NetstatResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}

// MetaWrite writes a key to META storage.
Expand Down
6 changes: 1 addition & 5 deletions pkg/machinery/client/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,5 @@ type InspectClient struct {
func (c *InspectClient) ControllerRuntimeDependencies(ctx context.Context, callOptions ...grpc.CallOption) (*inspectapi.ControllerRuntimeDependenciesResponse, error) {
resp, err := c.client.ControllerRuntimeDependencies(ctx, &emptypb.Empty{}, callOptions...)

var filtered any
filtered, err = FilterMessages(resp, err)
resp, _ = filtered.(*inspectapi.ControllerRuntimeDependenciesResponse) //nolint:errcheck

return resp, err
return FilterMessages(resp, err)
}
27 changes: 25 additions & 2 deletions pkg/machinery/client/reply.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hashicorp/go-multierror"
rpcstatus "google.golang.org/genproto/googleapis/rpc/status"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
)

// NodeError is RPC error from some node.
Expand All @@ -29,10 +30,32 @@ func (ne *NodeError) Unwrap() error {
return ne.Err
}

// Message is a generic interface for Messages.
type Message[T any] interface {
*T
proto.Message
}

// MessageResponse is a generic interface for response with Messages.
type MessageResponse[V any, T Message[V]] interface {
proto.Message
GetMessages() []T
}

// FilterMessages removes error Messages from resp and builds multierror.
//
func FilterMessages[V any, T Message[V], MR MessageResponse[V, T]](resp MR, err error) (MR, error) {
var zero MR

res, filteredErr := filterMessages(resp, err)
if res == nil {
return zero, filteredErr
}

return resp, filteredErr
}

//nolint:gocyclo,cyclop
func FilterMessages(resp any, err error) (any, error) {
func filterMessages(resp any, err error) (any, error) {
if resp == nil {
return nil, err
}
Expand Down
Loading

0 comments on commit ce4c404

Please sign in to comment.