Skip to content

Commit

Permalink
Merge pull request #414 from fosterseth/feat_signed_work_requests
Browse files Browse the repository at this point in the history
Signed work submissions
  • Loading branch information
fosterseth authored Sep 22, 2021
2 parents fe7035b + 9589480 commit 156e6e2
Show file tree
Hide file tree
Showing 23 changed files with 485 additions and 112 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ TESTCMD = -run $(RUNTEST)
endif

test:
@go test ./... -p 1 -parallel=16 $(TESTCMD) -count=1 -v
@go test ./... -p 1 -parallel=16 $(TESTCMD) -count=1

testloop: receptor
@i=1; while echo "------ $$i" && \
Expand Down
6 changes: 3 additions & 3 deletions docs/source/tls.rst
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ foo.yml
- tls-server:
name: myserver
cert: foo.crt
key: foo.key
cert: /full/path/foo.crt
key: /full/path/foo.key
requireclientcert: true
clientcas: ca.crt
clientcas: /full/path/ca.crt
- tcp-listener:
port: 2222
Expand Down
45 changes: 45 additions & 0 deletions docs/source/workceptor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,51 @@ For local work, transitioning from Pending to Running occurs the moment the ``co

For remote work, transitioning from Pending to Running occurs when the status reported from the remote node has a Running state.

Signed work
^^^^^^^^^^^^^^^^^^

Remote work submissions can be digitally signed by the sender. The target node will verify the signature of the work command before starting the work unit.

A pair of RSA public and private keys are created offline and distributed to the nodes. The public key should be on the node receiving work (PKIX format). The private key should be on the node submitting work (PKCS1 format).

The following commands can be used to create keys for signing work:

.. code::
openssl genrsa -out signworkprivate.pem 2048
openssl rsa -in signworkprivate.pem -pubout -out signworkpublic.pem
in `bar.yml`

.. code-block:: yaml
# PKIX
- work-verification:
publickey: /full/path/signworkpublic.pem
- work-command:
workType: echopayload
command: bash
params: "-c \"while read -r line; do echo ${line^^}; sleep 5; done\""
verifysignature: true
in `foo.yml`

.. code-block:: yaml
# PKCS1
- work-signing:
privatekey: /full/path/signworkprivate.pem
tokenexpiration: 30m
Tokenexpiration determines how long a the signature is valid for. This expiration directly corresponds to the "expiresAt" field in the generated JSON web token. Valid units include "h" and "m", e.g. 1h30m for one hour and 30 minutes.

Use the "--signwork" parameter to sign the work.

.. code::
$ receptorctl --socket /tmp/foo.sock work submit echoint --node bar --no-payload --signwork
Units on disk
^^^^^^^^^^^^^^^^^^

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/fortytw2/leaktest v1.3.0
github.com/fsnotify/fsnotify v1.4.9
github.com/ghjm/cmdline v0.1.0
github.com/golang-jwt/jwt/v4 v4.0.0
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/gorilla/websocket v1.4.2
github.com/jupp0r/go-priority-queue v0.0.0-20160601094913-ab1073853bde
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-jwt/jwt/v4 v4.0.0 h1:RAqyYixv1p7uEnocuy8P1nru5wprCh/MH2BIlW5z5/o=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
Expand Down
51 changes: 48 additions & 3 deletions pkg/certificates/ca.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ func LoadFromPEMFile(filename string) ([]interface{}, error) {
var block *pem.Block
for len(content) > 0 {
block, content = pem.Decode(content)
if block == nil {
return nil, fmt.Errorf("failed to decode PEM block")
}
switch block.Type {
case "CERTIFICATE":
var cert *x509.Certificate
Expand All @@ -68,6 +71,12 @@ func LoadFromPEMFile(filename string) ([]interface{}, error) {
return nil, err
}
results = append(results, key)
case "PUBLIC KEY":
key, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
return nil, err
}
results = append(results, key)
default:
return nil, fmt.Errorf("unknown block type %s", block.Type)
}
Expand Down Expand Up @@ -112,13 +121,32 @@ func SaveToPEMFile(filename string, data []interface{}) error {

continue
}
var key *rsa.PrivateKey
key, ok = elem.(*rsa.PrivateKey)
var keyPrivate *rsa.PrivateKey
keyPrivate, ok = elem.(*rsa.PrivateKey)
if ok {
keyPEM := new(bytes.Buffer)
err = pem.Encode(keyPEM, &pem.Block{
Type: "RSA PRIVATE KEY",
Bytes: x509.MarshalPKCS1PrivateKey(key),
Bytes: x509.MarshalPKCS1PrivateKey(keyPrivate),
})
if err != nil {
return err
}
content = append(content, keyPEM.String())

continue
}
var keyPublic *rsa.PublicKey
keyPublic, ok = elem.(*rsa.PublicKey)
if ok {
keyPEM := new(bytes.Buffer)
keyPublicBytes, err := x509.MarshalPKIXPublicKey(keyPublic)
if err != nil {
return err
}
err = pem.Encode(keyPEM, &pem.Block{
Type: "PUBLIC KEY",
Bytes: keyPublicBytes,
})
if err != nil {
return err
Expand Down Expand Up @@ -183,6 +211,23 @@ func LoadPrivateKey(filename string) (*rsa.PrivateKey, error) {
return key, nil
}

// LoadPublicKey loads a single RSA public key from a file.
func LoadPublicKey(filename string) (*rsa.PublicKey, error) {
data, err := LoadFromPEMFile(filename)
if err != nil {
return nil, err
}
if len(data) != 1 {
return nil, fmt.Errorf("public key file should contain exactly one item")
}
key, ok := data[0].(*rsa.PublicKey)
if !ok {
return nil, fmt.Errorf("public key file does not contain public key data")
}

return key, nil
}

// CA contains internal data for a certificate authority.
type CA struct {
Certificate *x509.Certificate
Expand Down
1 change: 1 addition & 0 deletions pkg/controlsvc/controlsvc.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ func (s *Server) RunControlSession(conn net.Conn) {
cfr, err = cc.ControlFunc(s.nc, cfo)
}
if err != nil {
logger.Error(err.Error())
_, err = conn.Write([]byte(fmt.Sprintf("ERROR: %s\n", err)))
if err != nil {
logger.Error("Write error in control service: %s\n", err)
Expand Down
24 changes: 18 additions & 6 deletions pkg/netceptor/netceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type Netceptor struct {
seenUpdateExpireTime time.Duration
maxForwardingHops byte
maxConnectionIdleTime time.Duration
workCommands []string
workCommands []WorkCommand
epoch uint64
sequence uint64
connLock *sync.RWMutex
Expand Down Expand Up @@ -221,14 +221,21 @@ const (
ConnTypeStreamTLS = 2
)

// WorkCommand tracks available work types and whether they verify work submissions.
type WorkCommand struct {
WorkType string
// Secure true means receptor will verify the signature of the work submit payload
Secure bool
}

// ServiceAdvertisement is the data associated with a service advertisement.
type ServiceAdvertisement struct {
NodeID string
Service string
Time time.Time
ConnType byte
Tags map[string]string
WorkCommands []string
WorkCommands []WorkCommand
}

// serviceAdvertisementFull is the whole message from the network.
Expand Down Expand Up @@ -563,7 +570,9 @@ func (s *Netceptor) Status() Status {
adCopy := *ad
if adCopy.NodeID == s.nodeID {
adCopy.Time = time.Now()
adCopy.WorkCommands = s.workCommands
if len(s.workCommands) > 0 {
adCopy.WorkCommands = s.workCommands
}
}
serviceAds = append(serviceAds, &adCopy)
}
Expand Down Expand Up @@ -688,7 +697,9 @@ func (s *Netceptor) sendServiceAds() {
}
if svcType, ok := sa.Tags["type"]; ok {
if svcType == "Control Service" {
sa.WorkCommands = s.workCommands
if len(s.workCommands) > 0 {
sa.WorkCommands = s.workCommands
}
}
}
ads = append(ads, sa)
Expand Down Expand Up @@ -867,11 +878,12 @@ func (s *Netceptor) GetServerTLSConfig(name string) (*tls.Config, error) {
}

// AddWorkCommand records a work command so it can be included in service announcements.
func (s *Netceptor) AddWorkCommand(command string) error {
func (s *Netceptor) AddWorkCommand(command string, secure bool) error {
if command == "" {
return fmt.Errorf("must provide a name")
}
s.workCommands = append(s.workCommands, command)
wC := WorkCommand{WorkType: command, Secure: secure}
s.workCommands = append(s.workCommands, wC)

return nil
}
Expand Down
60 changes: 58 additions & 2 deletions pkg/workceptor/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,7 @@ type commandCfg struct {
Command string `required:"true" description:"Command to run to process units of work"`
Params string `description:"Command-line parameters"`
AllowRuntimeParams bool `description:"Allow users to add more parameters" default:"false"`
VerifySignature bool `description:"Verify a signed work submission" default:"false"`
}

func (cfg commandCfg) newWorker(w *Workceptor, unitID string, workType string) WorkUnit {
Expand All @@ -309,7 +310,10 @@ func (cfg commandCfg) newWorker(w *Workceptor, unitID string, workType string) W

// Run runs the action.
func (cfg commandCfg) Run() error {
err := MainInstance.RegisterWorker(cfg.WorkType, cfg.newWorker)
if cfg.VerifySignature && MainInstance.verifyingkey == "" {
return fmt.Errorf("VerifySignature for work command '%s' is true, but the work verification public key is not specified", cfg.WorkType)
}
err := MainInstance.RegisterWorker(cfg.WorkType, cfg.newWorker, cfg.VerifySignature)

return err
}
Expand Down Expand Up @@ -339,7 +343,59 @@ func (cfg commandRunnerCfg) Run() error {
return nil
}

type signingKeyPrivateCfg struct {
PrivateKey string `description:"Private key to sign work submissions" barevalue:"yes" default:""`
TokenExpiration string `description:"Expiration of the signed json web token, e.g. 3h or 3h30m" default:""`
}

type verifyingKeyPublicCfg struct {
PublicKey string `description:"Public key to verify signed work submissions" barevalue:"yes" default:""`
}

func filenameExists(filename string) error {
if _, err := os.Stat(filename); err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("%s does not exist", filename)
}

return err
}

return nil
}

func (cfg signingKeyPrivateCfg) Prepare() error {
err := filenameExists(cfg.PrivateKey)
if err != nil {
return err
}
if cfg.TokenExpiration != "" {
duration, err := time.ParseDuration(cfg.TokenExpiration)
if err != nil {
return fmt.Errorf("failed to parse TokenExpiration -- valid examples include '1.5h', '30m', '30m10s'")
}
MainInstance.signingexpiration = duration
}
MainInstance.signingkey = cfg.PrivateKey

return nil
}

func (cfg verifyingKeyPublicCfg) Prepare() error {
err := filenameExists(cfg.PublicKey)
if err != nil {
return err
}
MainInstance.verifyingkey = cfg.PublicKey

return nil
}

func init() {
cmdline.RegisterConfigTypeForApp("receptor-workers",
"work-signing", "Private key to sign work submissions", signingKeyPrivateCfg{}, cmdline.Singleton, cmdline.Section(workersSection))
cmdline.RegisterConfigTypeForApp("receptor-workers",
"work-verification", "Public key to verify work submissions", verifyingKeyPublicCfg{}, cmdline.Singleton, cmdline.Section(workersSection))
cmdline.RegisterConfigTypeForApp("receptor-workers",
"work-command", "Run a worker using an external command", commandCfg{}, cmdline.Section(workersSection))
cmdline.RegisterConfigTypeForApp("receptor-workers",
Expand Down Expand Up @@ -371,5 +427,5 @@ func (c Command) setup(wc *Workceptor) error {
return cw
}

return wc.RegisterWorker(c.WorkType, factory)
return wc.RegisterWorker(c.WorkType, factory, false)
}
Loading

0 comments on commit 156e6e2

Please sign in to comment.