Skip to content

Commit

Permalink
Merge pull request #1020 from marquiz/devel/worker-refactor
Browse files Browse the repository at this point in the history
worker: move code
  • Loading branch information
k8s-ci-robot authored Dec 27, 2022
2 parents e97b2c1 + 1026d91 commit 8eb6640
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 178 deletions.
2 changes: 1 addition & 1 deletion cmd/nfd-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

"k8s.io/klog/v2"

"sigs.k8s.io/node-feature-discovery/pkg/nfd-client/worker"
worker "sigs.k8s.io/node-feature-discovery/pkg/nfd-worker"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
)
Expand Down
136 changes: 0 additions & 136 deletions pkg/nfd-client/base.go

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package worker
package nfdworker

import (
"os"
Expand Down
107 changes: 84 additions & 23 deletions pkg/nfd-client/worker/nfd-worker.go → pkg/nfd-worker/nfd-worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package worker
package nfdworker

import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"os"
Expand All @@ -27,6 +29,9 @@ import (
"time"

"golang.org/x/net/context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/validation"
"k8s.io/klog/v2"
Expand All @@ -38,7 +43,6 @@ import (
nfdv1alpha1 "sigs.k8s.io/node-feature-discovery/pkg/apis/nfd/v1alpha1"
nfdclient "sigs.k8s.io/node-feature-discovery/pkg/generated/clientset/versioned"
pb "sigs.k8s.io/node-feature-discovery/pkg/labeler"
clientcommon "sigs.k8s.io/node-feature-discovery/pkg/nfd-client"
"sigs.k8s.io/node-feature-discovery/pkg/utils"
"sigs.k8s.io/node-feature-discovery/pkg/version"
"sigs.k8s.io/node-feature-discovery/source"
Expand All @@ -57,6 +61,12 @@ import (
_ "sigs.k8s.io/node-feature-discovery/source/usb"
)

// NfdWorker is the interface for nfd-worker daemon
type NfdWorker interface {
Run() error
Stop()
}

// NFDConfig contains the configuration settings of NfdWorker.
type NFDConfig struct {
Core coreConfig
Expand All @@ -80,14 +90,18 @@ type Labels map[string]string

// Args are the command line arguments of NfdWorker.
type Args struct {
clientcommon.Args

CaFile string
CertFile string
ConfigFile string
EnableNodeFeatureApi bool
KeyFile string
Klog map[string]*utils.KlogFlagVal
Kubeconfig string
Oneshot bool
Options string
Server string
ServerNameOverride string

Klog map[string]*utils.KlogFlagVal
Overrides ConfigOverrideArgs
}

Expand All @@ -100,10 +114,9 @@ type ConfigOverrideArgs struct {
}

type nfdWorker struct {
clientcommon.NfdBaseClient

args Args
certWatch *utils.FsWatcher
clientConn *grpc.ClientConn
configFilePath string
config *NFDConfig
kubernetesNamespace string
Expand All @@ -119,21 +132,27 @@ type duration struct {
}

// NewNfdWorker creates new NfdWorker instance.
func NewNfdWorker(args *Args) (clientcommon.NfdClient, error) {
base, err := clientcommon.NewNfdBaseClient(&args.Args)
if err != nil {
return nil, err
}

func NewNfdWorker(args *Args) (NfdWorker, error) {
nfd := &nfdWorker{
NfdBaseClient: base,

args: *args,
config: &NFDConfig{},
kubernetesNamespace: utils.GetKubernetesNamespace(),
stop: make(chan struct{}, 1),
}

// Check TLS related args
if args.CertFile != "" || args.KeyFile != "" || args.CaFile != "" {
if args.CertFile == "" {
return nfd, fmt.Errorf("-cert-file needs to be specified alongside -key-file and -ca-file")
}
if args.KeyFile == "" {
return nfd, fmt.Errorf("-key-file needs to be specified alongside -cert-file and -ca-file")
}
if args.CaFile == "" {
return nfd, fmt.Errorf("-ca-file needs to be specified alongside -cert-file and -key-file")
}
}

if args.ConfigFile != "" {
nfd.configFilePath = filepath.Clean(args.ConfigFile)
}
Expand Down Expand Up @@ -175,7 +194,7 @@ func (w *nfdWorker) Run() error {
return err
}

defer w.GrpcDisconnect()
defer w.grpcDisconnect()

labelTrigger := time.After(0)
for {
Expand Down Expand Up @@ -214,7 +233,7 @@ func (w *nfdWorker) Run() error {
}
// Manage connection to master
if w.config.Core.NoPublish || !w.args.EnableNodeFeatureApi {
w.GrpcDisconnect()
w.grpcDisconnect()
}

// Always re-label after a re-config event. This way the new config
Expand All @@ -223,7 +242,7 @@ func (w *nfdWorker) Run() error {

case <-w.certWatch.Events:
klog.Infof("TLS certificate update, renewing connection to nfd-master")
w.GrpcDisconnect()
w.grpcDisconnect()

case <-w.stop:
klog.Infof("shutting down nfd-worker")
Expand All @@ -249,18 +268,60 @@ func (w *nfdWorker) getGrpcClient() (pb.LabelerClient, error) {
return w.grpcClient, nil
}

if err := w.NfdBaseClient.Connect(); err != nil {
// Check that if a connection already exists
if w.clientConn != nil {
return nil, fmt.Errorf("client connection already exists")
}

// Dial and create a client
dialCtx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
dialOpts := []grpc.DialOption{grpc.WithBlock()}
if w.args.CaFile != "" || w.args.CertFile != "" || w.args.KeyFile != "" {
// Load client cert for client authentication
cert, err := tls.LoadX509KeyPair(w.args.CertFile, w.args.KeyFile)
if err != nil {
return nil, fmt.Errorf("failed to load client certificate: %v", err)
}
// Load CA cert for server cert verification
caCert, err := os.ReadFile(w.args.CaFile)
if err != nil {
return nil, fmt.Errorf("failed to read root certificate file: %v", err)
}
caPool := x509.NewCertPool()
if ok := caPool.AppendCertsFromPEM(caCert); !ok {
return nil, fmt.Errorf("failed to add certificate from '%s'", w.args.CaFile)
}
// Create TLS config
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caPool,
ServerName: w.args.ServerNameOverride,
MinVersion: tls.VersionTLS13,
}
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
klog.Infof("connecting to nfd-master at %s ...", w.args.Server)
conn, err := grpc.DialContext(dialCtx, w.args.Server, dialOpts...)
if err != nil {
return nil, err
}
w.clientConn = conn

w.grpcClient = pb.NewLabelerClient(w.ClientConn())
w.grpcClient = pb.NewLabelerClient(w.clientConn)

return w.grpcClient, nil
}

// GrpcDisconnect closes the gRPC connection to NFD master
func (w *nfdWorker) GrpcDisconnect() {
w.NfdBaseClient.Disconnect()
// grpcDisconnect closes the gRPC connection to NFD master
func (w *nfdWorker) grpcDisconnect() {
if w.clientConn != nil {
klog.Infof("closing connection to nfd-master ...")
w.clientConn.Close()
}
w.clientConn = nil
w.grpcClient = nil
}
func (c *coreConfig) sanitize() {
Expand Down
Loading

0 comments on commit 8eb6640

Please sign in to comment.