Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions agent/build/agent.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
agent-id = "agent1"

[log]
level = 5
type = "stdout"

[server]
address = "0.0.0.0:9008"
tls-cert-file = "/etc/beegfs/cert.pem"
tls-key-file = "/etc/beegfs/key.pem"

[reconciler]
deployment-strategy = "default"
manifest-path = "/etc/beegfs/manifest.yaml"
active-manifest-path = "/etc/beegfs/.active.manifest.yaml"
122 changes: 122 additions & 0 deletions agent/cmd/beegfs-agent/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package main

import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/spf13/pflag"
"github.com/thinkparq/beegfs-go/agent/internal/config"
"github.com/thinkparq/beegfs-go/agent/internal/server"
"github.com/thinkparq/beegfs-go/agent/pkg/reconciler"
"github.com/thinkparq/beegfs-go/common/configmgr"
"github.com/thinkparq/beegfs-go/common/logger"
"go.uber.org/zap"
)

// envVarPrefix is the prefix expected on environment variables that carry configuration for
// this application. It is passed to the configuration manager and is substituted into the
// usage text printed by main.
const (
envVarPrefix = "BEEAGENT_"
)

// Build metadata printed by the --version flag. Set by the build process using ldflags; every
// value stays "unknown" when the build does not override it.
var (
binaryName = "unknown"
version = "unknown"
commit = "unknown"
buildTime = "unknown"
)

// main is the entry point of the BeeGFS agent. It:
//
//  1. defines the command line flags and the custom usage text,
//  2. builds the initial configuration through the configuration manager,
//  3. initializes the logger, the reconciler, and the gRPC server,
//  4. blocks until a component reports an error or a shutdown signal arrives, and
//  5. stops the server and the reconciler before returning.
func main() {
	pflag.Bool("version", false, "Print the version then exit.")
	pflag.String("cfg-file", "/etc/beegfs/agent.toml", "The path to a configuration file (can be omitted to set all configuration using flags and/or environment variables).")
	pflag.String("agent-id", "0", "A unique ID used to identify what services from the manifest this agent is responsible for. Should not change after initially starting the agent.")
	pflag.String("log.type", "stderr", "Where log messages should be sent ('stderr', 'stdout', 'syslog', 'logfile').")
	// NOTE(review): the default file name below still says "beegfs-remote"; it looks like it was
	// carried over from another service. Left unchanged because it alters where logs are written.
	pflag.String("log.file", "/var/log/beegfs/beegfs-remote.log", "The path to the desired log file when log.type is 'logfile' (if needed the directory and all parent directories will be created).")
	pflag.Int8("log.level", 3, "Adjust the logging level (0=Fatal, 1=Error, 2=Warn, 3=Info, 4+5=Debug).")
	pflag.Int("log.max-size", 1000, "When log.type is 'logfile' the maximum size of the log.file in megabytes before it is rotated.")
	pflag.Int("log.num-rotated-files", 5, "When log.type is 'logfile' the maximum number of old log.file(s) to keep when log.max-size is reached and the log is rotated.")
	pflag.Bool("log.developer", false, "Enable developer logging including stack traces and setting the equivalent of log.level=5 and log.type=stdout (all other log settings are ignored).")
	pflag.String("server.address", "0.0.0.0:9008", "The hostname:port where this Agent should listen for requests from the BeeGFS CTL tool.")
	pflag.String("server.tls-cert-file", "/etc/beegfs/cert.pem", "Path to a certificate file that provides the identity of this Agent's gRPC server.")
	pflag.String("server.tls-key-file", "/etc/beegfs/key.pem", "Path to the key file belonging to the certificate for this Agent's gRPC server.")
	pflag.Bool("server.tls-disable", false, "Disable TLS entirely for gRPC communication to this Agent's gRPC server.")
	pflag.String("reconciler.manifest-path", "/etc/beegfs/manifest.yaml", "The path to the BeeGFS manifest this agent should apply. The manifest will be identical to the active manifest if applied successfully.")
	pflag.String("reconciler.active-manifest-path", "/etc/beegfs/.active.manifest.yaml", "The path to the last BeeGFS manifest successfully applied by this agent.")
	pflag.String("reconciler.deployment-strategy", "default", "The deployment strategy used by the reconciler.")
	pflag.Bool("developer.dump-config", false, "Dump the full configuration and immediately exit.")
	// The flag was defined on the line above, so hiding it cannot fail with "flag does not exist".
	_ = pflag.CommandLine.MarkHidden("developer.dump-config")
	pflag.CommandLine.SortFlags = false
	pflag.Usage = func() {
		fmt.Fprintf(os.Stderr, "Usage of %s:\n", os.Args[0])
		pflag.PrintDefaults()
		helpText := `
Further info:
Configuration may be set using a mix of flags, environment variables, and values from a TOML configuration file.
Configuration will be merged using the following precedence order (highest->lowest): (1) flags (2) environment variables (3) configuration file (4) defaults.
Using environment variables:
To specify configuration using environment variables specify %sKEY=VALUE where KEY is the flag name you want to specify in all capitals replacing dots (.) with a double underscore (__) and hyphens (-) with an underscore (_).
Examples:
export %sLOG__DEVELOPER=true
`
		fmt.Fprintf(os.Stderr, helpText, envVarPrefix, envVarPrefix)
		// Printing the usage text always terminates the process with status 0.
		os.Exit(0)
	}
	pflag.Parse()

	if printVersion, _ := pflag.CommandLine.GetBool("version"); printVersion {
		fmt.Printf("%s %s (commit: %s, built: %s)\n", binaryName, version, commit, buildTime)
		os.Exit(0)
	}

	cfgMgr, err := configmgr.New(pflag.CommandLine, envVarPrefix, &config.AppConfig{})
	if err != nil {
		log.Fatalf("unable to get initial configuration: %s", err)
	}
	initialCfg, ok := cfgMgr.Get().(*config.AppConfig)
	if !ok {
		log.Fatalf("configuration manager returned invalid configuration (expected Agent application configuration)")
	}
	if initialCfg.Developer.DumpConfig {
		fmt.Printf("Dumping AppConfig and exiting...\n\n")
		fmt.Printf("%+v\n", initialCfg)
		os.Exit(0)
	}

	// Local names deliberately differ from the imported logger and reconciler packages so the
	// packages stay reachable for the rest of the function.
	appLog, err := logger.New(initialCfg.Log)
	if err != nil {
		log.Fatalf("unable to initialize logger: %s", err)
	}
	defer appLog.Sync()

	// os.Interrupt is SIGINT, so it does not need to be listed a second time.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)

	agentReconciler, err := reconciler.New(ctx, initialCfg.AgentID, appLog.Logger, initialCfg.Reconciler)
	if err != nil {
		appLog.Fatal("unable to initialize reconciler", zap.Error(err))
	}
	cfgMgr.AddListener(agentReconciler)
	agentServer, err := server.New(appLog.Logger, initialCfg.Server, agentReconciler)
	if err != nil {
		appLog.Fatal("unable to initialize gRPC server", zap.Error(err))
	}

	// Buffered so a component can report an error without blocking even when main is no longer
	// receiving from the channel.
	errChan := make(chan error, 2)
	agentServer.ListenAndServe(errChan)
	go cfgMgr.Manage(ctx, appLog.Logger)

	select {
	case err := <-errChan:
		appLog.Error("component terminated unexpectedly", zap.Error(err))
	case <-ctx.Done():
		appLog.Info("shutdown signal received")
	}
	cancel()
	agentServer.Stop()
	if err := agentReconciler.Stop(); err != nil {
		appLog.Error("error stopping reconciler", zap.Error(err))
	}
	appLog.Info("shutdown all components, exiting")
}
35 changes: 35 additions & 0 deletions agent/internal/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package config

import (
"github.com/thinkparq/beegfs-go/agent/internal/server"
"github.com/thinkparq/beegfs-go/agent/pkg/reconciler"
"github.com/thinkparq/beegfs-go/common/configmgr"
"github.com/thinkparq/beegfs-go/common/logger"
)

// AppConfig is the top-level configuration of the Agent application. The mapstructure tags
// correspond to the flag names and configuration file keys (for example "log" groups the
// log.* settings).
type AppConfig struct {
AgentID string `mapstructure:"agent-id"`
Log logger.Config `mapstructure:"log"`
Reconciler reconciler.Config `mapstructure:"reconciler"`
Server server.Config `mapstructure:"server"`
// NOTE(review): unlike the fields above, Developer carries no mapstructure tag; presumably
// it is matched to the developer.* keys by field name - confirm.
Developer struct {
DumpConfig bool `mapstructure:"dump-config"`
}
}

// NewEmptyInstance returns a pointer to a freshly allocated, zero-valued AppConfig. The receiver
// is not consulted.
func (c *AppConfig) NewEmptyInstance() configmgr.Configurable {
	return &AppConfig{}
}

// UpdateAllowed currently accepts every new configuration: it never inspects newConfig and
// always returns nil.
func (c *AppConfig) UpdateAllowed(newConfig configmgr.Configurable) error {
return nil
}

// ValidateConfig currently performs no validation and always returns nil.
func (c *AppConfig) ValidateConfig() error {
return nil
}

// GetReconcilerConfig returns only the part of an AppConfig expected by the reconciler, that is
// a copy of the Reconciler field.
func (c *AppConfig) GetReconcilerConfig() reconciler.Config {
return c.Reconciler
}
143 changes: 143 additions & 0 deletions agent/internal/server/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package server

import (
"context"
"errors"
"fmt"
"net"
"path"
"reflect"
"sync"

"github.com/thinkparq/beegfs-go/agent/pkg/manifest"
"github.com/thinkparq/beegfs-go/agent/pkg/reconciler"
pb "github.com/thinkparq/protobuf/go/agent"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/status"
)

// Config holds the settings for the Agent's gRPC server.
type Config struct {
// Address is the address the server listens on using TCP.
Address string `mapstructure:"address"`
// TlsCertFile and TlsKeyFile are the certificate and key files loaded when TLS is in use.
TlsCertFile string `mapstructure:"tls-cert-file"`
TlsKeyFile string `mapstructure:"tls-key-file"`
// TlsDisable turns TLS off. TLS is also not used when either file path above is empty.
TlsDisable bool `mapstructure:"tls-disable"`
}

// AgentServer serves the BeeAgent gRPC API and forwards requests to a reconciler.
type AgentServer struct {
pb.UnimplementedBeeAgentServer
log *zap.Logger
// wg counts in-flight RPC handlers so Stop can wait for them to return.
wg *sync.WaitGroup
Config
grpcServer *grpc.Server
reconciler reconciler.Reconciler
}

// New builds an AgentServer that serves the BeeAgent gRPC API on behalf of the given reconciler.
//
// TLS is used when it is not explicitly disabled and both a certificate and a key file are
// configured; otherwise a warning is logged and the gRPC server is created without transport
// credentials. The returned server does not listen until ListenAndServe is called.
//
// An error is returned when the TLS certificate/key pair cannot be loaded.
func New(log *zap.Logger, config Config, reconciler reconciler.Reconciler) (*AgentServer, error) {
	// Tag every log entry with the name of this package as the component.
	log = log.With(zap.String("component", path.Base(reflect.TypeOf(AgentServer{}).PkgPath())))

	s := &AgentServer{
		log:        log,
		Config:     config,
		wg:         new(sync.WaitGroup),
		reconciler: reconciler,
	}

	var grpcServerOpts []grpc.ServerOption
	if !s.TlsDisable && s.TlsCertFile != "" && s.TlsKeyFile != "" {
		creds, err := credentials.NewServerTLSFromFile(s.TlsCertFile, s.TlsKeyFile)
		if err != nil {
			// Name the files involved so the caller can tell which path is at fault.
			return nil, fmt.Errorf("loading TLS certificate %q and key %q: %w", s.TlsCertFile, s.TlsKeyFile, err)
		}
		grpcServerOpts = append(grpcServerOpts, grpc.Creds(creds))
	} else {
		s.log.Warn("not using TLS because it was explicitly disabled or a certificate and/or key were not specified")
	}

	s.grpcServer = grpc.NewServer(grpcServerOpts...)
	pb.RegisterBeeAgentServer(s.grpcServer, s)
	return s, nil
}

// ListenAndServe starts a goroutine that listens on the configured TCP address and serves gRPC
// requests on it, then returns immediately. If listening fails, or serving ends with an error,
// that error is sent on errChan. The caller should make sure a send on errChan cannot block
// forever (for example by using a buffered channel).
func (s *AgentServer) ListenAndServe(errChan chan<- error) {
	go func() {
		s.log.Info("listening on local network address", zap.String("address", s.Address))
		lis, err := net.Listen("tcp", s.Address)
		if err != nil {
			errChan <- fmt.Errorf("agent server: error listening on the specified address %s: %w", s.Address, err)
			return
		}
		s.log.Info("serving gRPC requests")
		if err := s.grpcServer.Serve(lis); err != nil {
			errChan <- fmt.Errorf("agent server: error serving gRPC requests: %w", err)
		}
	}()
}

// Stop stops the gRPC server and then blocks until every RPC handler that registered itself
// with the wait group has returned.
func (s *AgentServer) Stop() {
s.log.Info("attempting to stop gRPC server")
s.grpcServer.Stop()
s.wg.Wait()
}

// UpdateManifest converts the per-file-system configuration carried by the request into a
// manifest and hands that manifest to the reconciler.
//
// It returns an InvalidArgument status when any file system entry in the request is nil, the
// reconciler's error translated by grpcStatusFrom when the update is rejected, and otherwise a
// response carrying the reconciler's agent ID.
func (s *AgentServer) UpdateManifest(ctx context.Context, request *pb.UpdateManifestRequest) (*pb.UpdateManifestResponse, error) {
	// Register with the wait group so Stop waits for this handler.
	s.wg.Add(1)
	defer s.wg.Done()

	protoConfig := request.GetConfig()
	newManifest := manifest.Manifest{
		Filesystems: make(map[string]manifest.Filesystem, len(protoConfig)),
	}
	for uuid, fs := range protoConfig {
		if fs == nil {
			return nil, status.Error(codes.InvalidArgument, "file system configuration was unexpectedly nil for fsUUID "+uuid)
		}
		newManifest.Filesystems[uuid] = manifest.FromProto(fs)
	}

	err := s.reconciler.UpdateConfiguration(newManifest)
	if err != nil {
		return nil, grpcStatusFrom(err)
	}

	resp := &pb.UpdateManifestResponse{AgentId: s.reconciler.GetAgentID()}
	return resp, nil
}

// ReconciliationStatus asks the reconciler for its current status and returns it together with
// the reconciler's agent ID. A reconciler error is translated by grpcStatusFrom.
func (s *AgentServer) ReconciliationStatus(ctx context.Context, request *pb.ReconciliationStatusRequest) (*pb.ReconciliationStatusResponse, error) {
	// Register with the wait group so Stop waits for this handler.
	s.wg.Add(1)
	defer s.wg.Done()

	result, err := s.reconciler.Status()
	if err != nil {
		return nil, grpcStatusFrom(err)
	}

	resp := &pb.ReconciliationStatusResponse{
		Status:  result.Status,
		AgentId: s.reconciler.GetAgentID(),
	}
	return resp, nil
}

// CancelReconciliation forwards the reason from the request to the reconciler's Cancel method
// and returns the resulting status together with the reconciler's agent ID. A reconciler error
// is translated by grpcStatusFrom.
func (s *AgentServer) CancelReconciliation(ctx context.Context, request *pb.CancelReconciliationRequest) (*pb.CancelReconciliationResponse, error) {
	// Register with the wait group so Stop waits for this handler.
	s.wg.Add(1)
	defer s.wg.Done()

	result, err := s.reconciler.Cancel(request.GetReason())
	if err != nil {
		return nil, grpcStatusFrom(err)
	}

	resp := &pb.CancelReconciliationResponse{
		Status:  result.Status,
		AgentId: s.reconciler.GetAgentID(),
	}
	return resp, nil
}

func grpcStatusFrom(err error) error {
var grpcErr error
switch {
case errors.Is(err, reconciler.ErrSavingManifest):
grpcErr = status.Error(codes.FailedPrecondition, err.Error())
case errors.Is(err, reconciler.ErrBadManifest):
grpcErr = status.Error(codes.InvalidArgument, err.Error())
default:
grpcErr = status.Error(codes.Unknown, err.Error())
}
return grpcErr
}
44 changes: 44 additions & 0 deletions agent/pkg/deploy/deploy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package deploy

import "context"

// Deployer is responsible for carrying out the steps needed to manage a BeeGFS "service" and
// handles starting/modifying/stopping various system resources. It combines the Installer,
// Networker, Mounter, and Servicer interfaces and adds Cleanup.
type Deployer interface {
Installer
Networker
Mounter
Servicer
// Cleanup should be called once the deployer is no longer needed to cleanup any long lived
// resources created by setting up a particular deployment strategy.
Cleanup() error
}

// NewDefaultStrategy assembles the default Deployer from the detected package manager, the IP
// and Mount helpers, and a systemd handle created with ctx. It returns the first error reported
// by DetectPackageManager or NewSystemd.
func NewDefaultStrategy(ctx context.Context) (Deployer, error) {
	pkgMgr, err := DetectPackageManager()
	if err != nil {
		return nil, err
	}

	svcMgr, err := NewSystemd(ctx)
	if err != nil {
		return nil, err
	}

	strategy := &defaultStrategy{
		IP:    IP{},
		Mount: Mount{},
	}
	strategy.Package = pkgMgr
	strategy.Systemd = svcMgr
	return strategy, nil
}

// defaultStrategy is the Deployer returned by NewDefaultStrategy. Its behavior comes from the
// embedded types below.
type defaultStrategy struct {
Package // presumably implements Installer (this comment used to say Sourcerer) - TODO confirm
IP // implements Networker
Mount // implements Mounter
Systemd // implements Servicer
}

// Cleanup delegates to the embedded Systemd's Cleanup and returns its error. The other
// embedded types are not touched.
func (s *defaultStrategy) Cleanup() error {
return s.Systemd.Cleanup()
}
Loading
Loading