Skip to content
This repository has been archived by the owner on Jan 30, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1642 from dalbani/dynamic-metadata
Browse files Browse the repository at this point in the history
fleetd: support dynamic metadata
  • Loading branch information
Dongsu Park authored Nov 11, 2016
2 parents 3aaa1ab + 5378558 commit 868a18f
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 18 deletions.
24 changes: 24 additions & 0 deletions Documentation/api-v1.md
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,30 @@ The request must not have a body.

A successful response will contain a page of zero or more Machine entities.

### Edit Machine Metadata

Add, change, or remove metadata from one or more machines.

#### Request

```
PATCH /machines HTTP/1.1
[
{ "op": "add", "path": "/<machine_id>/metadata/<name>", "value": <new value> },
{ "op": "remove", "path": "/<machine_id>/metadata/<name>" },
{ "op": "replace", "path": "/<machine_id>/metadata/<name>", "value": <new value> }
]
```

The request body must contain a JSON document in [JSONPatch](http://jsonpatch.com) format. Supported operations are "add", "remove" and "replace". Any number of operations for any number of machines, including machines not currently registered with the cluster, may be included in a single request. All operations will be processed in-order, top to bottom after validation. Modified metadata will persist across a machine leaving and rejoining the cluster.


#### Response

A success in indicated by a `204 No Content`.
Invalid operations, missing values, or improperly formatted paths will result in a `400 Bad Request`.

## Capability Discovery

The v1 fleet API is described by a [discovery document][disco]. Users should generate their client bindings from this document using the appropriate language generator.
Expand Down
3 changes: 2 additions & 1 deletion Documentation/unit-files-and-scheduling.md
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ app.service fd1d3e94.../10.0.0.1 active running
```

A machine is not automatically configured with metadata.
A deployer may define machine metadata using the `metadata` [config option][config-option].
A deployer may define machine metadata using the `metadata` [config option][config-option] or via the [HTTP api][http-api].

## Schedule unit next to another unit

Expand Down Expand Up @@ -244,6 +244,7 @@ MachineOf=%p.socket
would result in an effective `MachineOf` of `foo.socket`. Using the same unit snippet with a Unit called `bar.service`, on the other hand, would result in an effective `MachineOf` of `bar.socket`.

[config-option]: deployment-and-configuration.md#metadata
[http-api]: api-v1.md#edit-machine-metadata
[systemd-guide]: https://github.com/coreos/docs/blob/master/os/getting-started-with-systemd.md
[systemd instances]: http://0pointer.de/blog/projects/instances.html
[systemd specifiers]: http://www.freedesktop.org/software/systemd/man/systemd.unit.html#Specifiers
Expand Down
8 changes: 7 additions & 1 deletion agent/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,13 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) {
return nil, err
}

ms := a.Machine.State()
// fetch full machine state from registry instead of
// using the local version to allow for dynamic metadata
ms, err := reg.MachineState(a.Machine.State().ID)
if err != nil {
log.Errorf("Failed fetching machine state from Registry: %v", err)
return nil, err
}
as := AgentState{
MState: &ms,
Units: make(map[string]*job.Unit),
Expand Down
1 change: 1 addition & 0 deletions agent/reconcile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ MachineMetadata=dog=woof`),
reg := registry.NewFakeRegistry()
reg.SetJobs(tt.regJobs)
a := makeAgentWithMetadata(tt.metadata)
reg.SetMachines([]machine.MachineState{a.Machine.State()})
as, err := desiredAgentState(a, reg)
if err != nil {
t.Errorf("case %d: unexpected error: %v", i, err)
Expand Down
74 changes: 70 additions & 4 deletions api/machines.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,22 @@
package api

import (
"fmt"
"encoding/json"
"errors"
"net/http"
"path"
"regexp"

"github.com/coreos/fleet/client"
"github.com/coreos/fleet/log"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/schema"
)

var (
metadataPathRegex = regexp.MustCompile("^/([^/]+)/metadata/([A-Za-z0-9_.-]+$)")
)

func wireUpMachinesResource(mux *http.ServeMux, prefix string, tokenLimit int, cAPI client.API) {
res := path.Join(prefix, "machines")
mr := machinesResource{cAPI, uint16(tokenLimit)}
Expand All @@ -36,12 +42,24 @@ type machinesResource struct {
tokenLimit uint16
}

type machineMetadataOp struct {
Operation string `json:"op"`
Path string
Value string
}

func (mr *machinesResource) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if req.Method != "GET" {
sendError(rw, http.StatusBadRequest, fmt.Errorf("only HTTP GET supported against this resource"))
return
switch req.Method {
case "GET":
mr.list(rw, req)
case "PATCH":
mr.patch(rw, req)
default:
sendError(rw, http.StatusMethodNotAllowed, errors.New("only GET and PATCH supported against this resource"))
}
}

func (mr *machinesResource) list(rw http.ResponseWriter, req *http.Request) {
token, err := findNextPageToken(req.URL, mr.tokenLimit)
if err != nil {
sendError(rw, http.StatusBadRequest, err)
Expand All @@ -63,6 +81,54 @@ func (mr *machinesResource) ServeHTTP(rw http.ResponseWriter, req *http.Request)
sendResponse(rw, http.StatusOK, page)
}

func (mr *machinesResource) patch(rw http.ResponseWriter, req *http.Request) {
var ops []machineMetadataOp
dec := json.NewDecoder(req.Body)
if err := dec.Decode(&ops); err != nil {
sendError(rw, http.StatusBadRequest, err)
return
}

for _, op := range ops {
if op.Operation != "add" && op.Operation != "remove" && op.Operation != "replace" {
sendError(rw, http.StatusBadRequest, errors.New("invalid op: expect add, remove, or replace"))
return
}

if metadataPathRegex.FindStringSubmatch(op.Path) == nil {
sendError(rw, http.StatusBadRequest, errors.New("machine metadata path invalid"))
return
}

if op.Operation != "remove" && len(op.Value) == 0 {
sendError(rw, http.StatusBadRequest, errors.New("invalid value: add and replace require a value"))
return
}
}

for _, op := range ops {
// regex already validated above
s := metadataPathRegex.FindStringSubmatch(op.Path)
machID := s[1]
key := s[2]

if op.Operation == "remove" {
err := mr.cAPI.DeleteMachineMetadata(machID, key)
if err != nil {
sendError(rw, http.StatusInternalServerError, err)
return
}
} else {
err := mr.cAPI.SetMachineMetadata(machID, key, op.Value)
if err != nil {
sendError(rw, http.StatusInternalServerError, err)
return
}
}
}
sendResponse(rw, http.StatusNoContent, nil)
}

func getMachinePage(cAPI client.API, tok PageToken) (*schema.MachinePage, error) {
all, err := cAPI.Machines()
if err != nil {
Expand Down
134 changes: 132 additions & 2 deletions api/machines_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,29 @@ import (
"net/http/httptest"
"reflect"
"strconv"
"strings"
"testing"

"github.com/coreos/fleet/client"
"github.com/coreos/fleet/machine"
"github.com/coreos/fleet/registry"
)

func TestMachinesList(t *testing.T) {
func fakeMachinesSetup() (*machinesResource, *httptest.ResponseRecorder) {
fr := registry.NewFakeRegistry()
fr.SetMachines([]machine.MachineState{
{ID: "XXX", PublicIP: "", Metadata: nil},
{ID: "XXX", PublicIP: "", Metadata: map[string]string{}},
{ID: "YYY", PublicIP: "1.2.3.4", Metadata: map[string]string{"ping": "pong"}},
})
fAPI := &client.RegistryClient{Registry: fr}
resource := &machinesResource{cAPI: fAPI, tokenLimit: testTokenLimit}
rw := httptest.NewRecorder()

return resource, rw
}

func TestMachinesList(t *testing.T) {
resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("GET", "http://example.com", nil)
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
Expand Down Expand Up @@ -136,3 +143,126 @@ func TestExtractMachinePage(t *testing.T) {
}
}
}

func TestMachinesPatchAddModify(t *testing.T) {
reqBody := `
[{"op": "add", "path": "/XXX/metadata/foo", "value": "bar"},
{"op": "replace", "path": "/YYY/metadata/ping", "value": "splat"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusNoContent {
t.Errorf("Expected 204, got %d", rw.Code)
}

// fetch machine to make sure data has been added
req, err = http.NewRequest("GET", "http://example.com/machines", nil)
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}
rw.Body.Reset()
resource.ServeHTTP(rw, req)

if rw.Body == nil {
t.Error("Received nil response body")
} else {
body := rw.Body.String()
expected := `{"machines":[{"id":"XXX","metadata":{"foo":"bar"}},{"id":"YYY","metadata":{"ping":"splat"},"primaryIP":"1.2.3.4"}]}`
if body != expected {
t.Errorf("Expected body:\n%s\n\nReceived body:\n%s\n", expected, body)
}
}
}

func TestMachinesPatchDelete(t *testing.T) {
reqBody := `
[{"op": "remove", "path": "/XXX/metadata/foo"},
{"op": "remove", "path": "/YYY/metadata/ping"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusNoContent {
t.Errorf("Expected 204, got %d", rw.Code)
}

// fetch machine to make sure data has been added
req, err = http.NewRequest("GET", "http://example.com/machines", nil)
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}
rw.Body.Reset()
resource.ServeHTTP(rw, req)

if rw.Body == nil {
t.Error("Received nil response body")
} else {
body := rw.Body.String()
expected := `{"machines":[{"id":"XXX"},{"id":"YYY","primaryIP":"1.2.3.4"}]}`
if body != expected {
t.Errorf("Expected body:\n%s\n\nReceived body:\n%s\n", expected, body)
}
}
}

func TestMachinesPatchBadOp(t *testing.T) {
reqBody := `
[{"op": "noop", "path": "/XXX/metadata/foo", "value": "bar"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusBadRequest {
t.Errorf("Expected 400, got %d", rw.Code)
}
}

func TestMachinesPatchBadPath(t *testing.T) {
reqBody := `
[{"op": "add", "path": "/XXX/foo", "value": "bar"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusBadRequest {
t.Errorf("Expected 400, got %d", rw.Code)
}
}

func TestMachinesPatchBadValue(t *testing.T) {
reqBody := `
[{"op": "add", "path": "/XXX/foo"}]
`

resource, rw := fakeMachinesSetup()
req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody))
if err != nil {
t.Fatalf("Failed creating http.Request: %v", err)
}

resource.ServeHTTP(rw, req)
if rw.Code != http.StatusBadRequest {
t.Errorf("Expected 400, got %d", rw.Code)
}
}
2 changes: 2 additions & 0 deletions client/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (

type API interface {
Machines() ([]machine.MachineState, error)
SetMachineMetadata(machID, key, value string) error
DeleteMachineMetadata(machID, key string) error

Unit(string) (*schema.Unit, error)
Units() ([]*schema.Unit, error)
Expand Down
31 changes: 31 additions & 0 deletions registry/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,37 @@ func (f *FakeRegistry) UnitHeartbeat(name, machID string, ttl time.Duration) err

func (f *FakeRegistry) ClearUnitHeartbeat(string) {}

func (f *FakeRegistry) SetMachineMetadata(machID string, key string, value string) error {
for _, mach := range f.machines {
if mach.ID == machID {
mach.Metadata[key] = value
}
}
return nil
}

func (f *FakeRegistry) DeleteMachineMetadata(machID string, key string) error {
for _, mach := range f.machines {
if mach.ID == machID {
delete(mach.Metadata, key)
}
}
return nil
}

func (f *FakeRegistry) MachineState(machID string) (machine.MachineState, error) {
f.RLock()
defer f.RUnlock()

for _, mach := range f.machines {
if mach.ID == machID {
return mach, nil
}
}

return machine.MachineState{}, errors.New("Machine state not found")
}

func NewFakeClusterRegistry(dVersion *semver.Version, eVersion int) *FakeClusterRegistry {
return &FakeClusterRegistry{
dVersion: dVersion,
Expand Down
3 changes: 3 additions & 0 deletions registry/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ type Registry interface {
ScheduleUnit(name, machID string) error
SetUnitTargetState(name string, state job.JobState) error
SetMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error)
MachineState(machID string) (machine.MachineState, error)
UnscheduleUnit(name, machID string) error
SetMachineMetadata(machID string, key string, value string) error
DeleteMachineMetadata(machID string, key string) error

IsRegistryReady() bool
UseEtcdRegistry() bool
Expand Down
Loading

0 comments on commit 868a18f

Please sign in to comment.