Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

*: consolidate excessive Hash/Unit requests #1376

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 22 additions & 6 deletions registry/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,25 @@ func (r *EtcdRegistry) Units() ([]job.Unit, error) {
return nil, err
}

// Fetch all units by hash recursively to avoid sending N requests to Etcd.
hashToUnit, err := r.getAllUnitsHashMap()
if err != nil {
log.Errorf("failed fetching all Units from etcd: %v", err)
return nil, err
}
unitHashLookupFunc := func(hash unit.Hash) *unit.UnitFile {
stringHash := hash.String()
unit, ok := hashToUnit[stringHash]
if !ok {
log.Errorf("did not find Unit %v in list of all units", stringHash)
return nil
}
return unit
}

uMap := make(map[string]*job.Unit)
for _, dir := range res.Node.Nodes {
u, err := r.dirToUnit(dir)
u, err := r.dirToUnit(dir, unitHashLookupFunc)
if err != nil {
log.Errorf("Failed to parse Unit from etcd: %v", err)
continue
Expand Down Expand Up @@ -143,12 +159,12 @@ func (r *EtcdRegistry) Unit(name string) (*job.Unit, error) {
return nil, err
}

return r.dirToUnit(res.Node)
return r.dirToUnit(res.Node, r.getUnitByHash)
}

// dirToUnit takes a Node containing a Job's constituent objects (in child
// nodes) and returns a *job.Unit, or any error encountered
func (r *EtcdRegistry) dirToUnit(dir *etcd.Node) (*job.Unit, error) {
func (r *EtcdRegistry) dirToUnit(dir *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) {
objKey := path.Join(dir.Key, "object")
var objNode *etcd.Node
for _, node := range dir.Nodes {
Expand All @@ -160,7 +176,7 @@ func (r *EtcdRegistry) dirToUnit(dir *etcd.Node) (*job.Unit, error) {
if objNode == nil {
return nil, nil
}
u, err := r.getUnitFromObjectNode(objNode)
u, err := r.getUnitFromObjectNode(objNode, unitHashLookupFunc)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -254,7 +270,7 @@ func dirToHeartbeat(dir *etcd.Node) (heartbeat string) {
// getUnitFromObject takes a *etcd.Node containing a Unit's jobModel, and
// instantiates and returns a representative *job.Unit, transitively fetching the
// associated UnitFile as necessary
func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node) (*job.Unit, error) {
func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node, unitHashLookupFunc func(unit.Hash) *unit.UnitFile) (*job.Unit, error) {
var err error
var jm jobModel
if err = unmarshal(node.Value, &jm); err != nil {
Expand All @@ -263,7 +279,7 @@ func (r *EtcdRegistry) getUnitFromObjectNode(node *etcd.Node) (*job.Unit, error)

var unit *unit.UnitFile

unit = r.getUnitByHash(jm.UnitHash)
unit = unitHashLookupFunc(jm.UnitHash)
if unit == nil {
log.Warningf("No Unit found in Registry for Job(%s)", jm.Name)
return nil, nil
Expand Down
43 changes: 42 additions & 1 deletion registry/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package registry

import (
"strings"

etcd "github.com/coreos/fleet/Godeps/_workspace/src/github.com/coreos/etcd/client"

"github.com/coreos/fleet/log"
Expand Down Expand Up @@ -61,8 +63,47 @@ func (r *EtcdRegistry) getUnitByHash(hash unit.Hash) *unit.UnitFile {
}
return nil
}
return r.unitFromEtcdNode(hash, resp.Node)
}

// getAllUnitsHashMap retrieves from the Registry all Units and returns a map of hash to UnitFile
func (r *EtcdRegistry) getAllUnitsHashMap() (map[string]*unit.UnitFile, error) {
key := r.prefixed(unitPrefix)
opts := &etcd.GetOptions{
Recursive: true,
Quorum: true,
}
hashToUnit := map[string]*unit.UnitFile{}
resp, err := r.kAPI.Get(r.ctx(), key, opts)
if err != nil {
return nil, err
}

for _, node := range resp.Node.Nodes {
parts := strings.Split(node.Key, "/")
if len(parts) == 0 {
log.Errorf("key '%v' doesn't have enough parts", node.Key)
continue
}
stringHash := parts[len(parts)-1]
hash, err := unit.HashFromHexString(stringHash)
if err != nil {
log.Errorf("failed to get Hash for key '%v' with stringHash '%v': %v", node.Key, stringHash, err)
continue
}
unit := r.unitFromEtcdNode(hash, node)
if unit == nil {
continue
}
hashToUnit[stringHash] = unit
}

return hashToUnit, nil
}

func (r *EtcdRegistry) unitFromEtcdNode(hash unit.Hash, etcdNode *etcd.Node) *unit.UnitFile {
var um unitModel
if err := unmarshal(resp.Node.Value, &um); err != nil {
if err := unmarshal(etcdNode.Value, &um); err != nil {
log.Errorf("error unmarshaling Unit(%s): %v", hash, err)
return nil
}
Expand Down
14 changes: 14 additions & 0 deletions unit/unit.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package unit
import (
"bytes"
"crypto/sha1"
"encoding/hex"
"fmt"
"io/ioutil"
"strings"
Expand Down Expand Up @@ -169,6 +170,19 @@ func (h *Hash) Empty() bool {
return *h == Hash{}
}

func HashFromHexString(key string) (Hash, error) {
h := Hash{}
out, err := hex.DecodeString(key)
if err != nil {
return h, err
}
if len(out) != sha1.Size {
return h, fmt.Errorf("size of key %q (%d) differs from SHA1 size (%d)", out, len(out), sha1.Size)
}
copy(h[:], out[:sha1.Size])
return h, nil
}

// UnitState encodes the current state of a unit loaded into a fleet agent
type UnitState struct {
LoadState string
Expand Down
17 changes: 17 additions & 0 deletions unit/unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ func TestUnitHash(t *testing.T) {
}
}

func TestHashFromHexString(t *testing.T) {
u, err := NewUnitFile("[Service]\nExecStart=/bin/sleep 100\n")
if err != nil {
t.Fatalf("Unexpected error encountered creating unit: %v", err)
}
gotHash := u.Hash()

expectHashString := "1c6fb6f3684bafb0c173d8b8b957ceff031180c1"
rehashed, err := HashFromHexString(expectHashString)
if err != nil {
t.Fatalf("HashFromHexString failed with: %v", err)
}
if rehashed != gotHash {
t.Fatalf("HashFromHexString not equal to original hash")
}
}

func TestRecognizedUnitTypes(t *testing.T) {
tts := []struct {
name string
Expand Down