From 6a82e03c472e474697f2d708b51ed5872e68dda9 Mon Sep 17 00:00:00 2001 From: Damiano Albani Date: Fri, 27 May 2016 17:02:27 +0200 Subject: [PATCH 1/2] Import implementation for dynamic metadata Source: https://github.com/coreos/fleet/pull/1077 --- Documentation/api-v1.md | 24 ++++ Documentation/unit-files-and-scheduling.md | 3 +- agent/reconcile.go | 8 +- agent/reconcile_test.go | 1 + api/machines.go | 74 +++++++++++- api/machines_test.go | 134 ++++++++++++++++++++- client/api.go | 2 + registry/fake.go | 31 +++++ registry/interface.go | 3 + registry/machine.go | 94 +++++++++++++-- 10 files changed, 356 insertions(+), 18 deletions(-) diff --git a/Documentation/api-v1.md b/Documentation/api-v1.md index 0151a4aee..9ad269460 100644 --- a/Documentation/api-v1.md +++ b/Documentation/api-v1.md @@ -202,6 +202,30 @@ The request must not have a body. A successful response will contain a page of zero or more Machine entities. +### Edit Machine Metadata + +Add, change, or remove metadata from one or more machines. + +#### Request + +``` +PATCH /machines HTTP/1.1 + +[ + { "op": "add", "path": "/<machine_id>/metadata/<name>", "value": "<value>" }, + { "op": "remove", "path": "/<machine_id>/metadata/<name>" }, + { "op": "replace", "path": "/<machine_id>/metadata/<name>", "value": "<value>" } +] +``` + +The request body must contain a JSON document in [JSONPatch](http://jsonpatch.com) format. Supported operations are "add", "remove" and "replace". Any number of operations for any number of machines, including machines not currently registered with the cluster, may be included in a single request. All operations will be processed in-order, top to bottom after validation. Modified metadata will persist across a machine leaving and rejoining the cluster. + + +#### Response + +A success is indicated by a `204 No Content`. +Invalid operations, missing values, or improperly formatted paths will result in a `400 Bad Request`. + ## Capability Discovery The v1 fleet API is described by a [discovery document][disco].
Users should generate their client bindings from this document using the appropriate language generator. diff --git a/Documentation/unit-files-and-scheduling.md b/Documentation/unit-files-and-scheduling.md index 0ccf17ea0..302c06b66 100644 --- a/Documentation/unit-files-and-scheduling.md +++ b/Documentation/unit-files-and-scheduling.md @@ -210,7 +210,7 @@ app.service fd1d3e94.../10.0.0.1 active running ``` A machine is not automatically configured with metadata. -A deployer may define machine metadata using the `metadata` [config option][config-option]. +A deployer may define machine metadata using the `metadata` [config option][config-option] or via the [HTTP api][http-api]. ## Schedule unit next to another unit @@ -244,6 +244,7 @@ MachineOf=%p.socket would result in an effective `MachineOf` of `foo.socket`. Using the same unit snippet with a Unit called `bar.service`, on the other hand, would result in an effective `MachineOf` of `bar.socket`. [config-option]: deployment-and-configuration.md#metadata +[http-api]: api-v1.md#edit-machine-metadata [systemd-guide]: https://github.com/coreos/docs/blob/master/os/getting-started-with-systemd.md [systemd instances]: http://0pointer.de/blog/projects/instances.html [systemd specifiers]: http://www.freedesktop.org/software/systemd/man/systemd.unit.html#Specifiers diff --git a/agent/reconcile.go b/agent/reconcile.go index 06b37b55d..ab439bc7e 100644 --- a/agent/reconcile.go +++ b/agent/reconcile.go @@ -127,7 +127,13 @@ func desiredAgentState(a *Agent, reg registry.Registry) (*AgentState, error) { return nil, err } - ms := a.Machine.State() + // fetch full machine state from registry instead of + // using the local version to allow for dynamic metadata + ms, err := reg.MachineState(a.Machine.State().ID) + if err != nil { + log.Errorf("Failed fetching machine state from Registry: %v", err) + return nil, err + } as := AgentState{ MState: &ms, Units: make(map[string]*job.Unit), diff --git a/agent/reconcile_test.go 
b/agent/reconcile_test.go index 9d4fb38cc..8ee79ac45 100644 --- a/agent/reconcile_test.go +++ b/agent/reconcile_test.go @@ -254,6 +254,7 @@ MachineMetadata=dog=woof`), reg := registry.NewFakeRegistry() reg.SetJobs(tt.regJobs) a := makeAgentWithMetadata(tt.metadata) + reg.SetMachines([]machine.MachineState{a.Machine.State()}) as, err := desiredAgentState(a, reg) if err != nil { t.Errorf("case %d: unexpected error: %v", i, err) diff --git a/api/machines.go b/api/machines.go index d598c84c1..93f8b61fa 100644 --- a/api/machines.go +++ b/api/machines.go @@ -15,9 +15,11 @@ package api import ( - "fmt" + "encoding/json" + "errors" "net/http" "path" + "regexp" "github.com/coreos/fleet/client" "github.com/coreos/fleet/log" @@ -25,6 +27,10 @@ import ( "github.com/coreos/fleet/schema" ) +var ( + metadataPathRegex = regexp.MustCompile("^/([^/]+)/metadata/([A-Za-z0-9_.-]+$)") +) + func wireUpMachinesResource(mux *http.ServeMux, prefix string, tokenLimit int, cAPI client.API) { res := path.Join(prefix, "machines") mr := machinesResource{cAPI, uint16(tokenLimit)} @@ -36,12 +42,24 @@ type machinesResource struct { tokenLimit uint16 } +type machineMetadataOp struct { + Operation string `json:"op"` + Path string + Value string +} + func (mr *machinesResource) ServeHTTP(rw http.ResponseWriter, req *http.Request) { - if req.Method != "GET" { - sendError(rw, http.StatusBadRequest, fmt.Errorf("only HTTP GET supported against this resource")) - return + switch req.Method { + case "GET": + mr.list(rw, req) + case "PATCH": + mr.patch(rw, req) + default: + sendError(rw, http.StatusMethodNotAllowed, errors.New("only GET and PATCH supported against this resource")) } +} +func (mr *machinesResource) list(rw http.ResponseWriter, req *http.Request) { token, err := findNextPageToken(req.URL, mr.tokenLimit) if err != nil { sendError(rw, http.StatusBadRequest, err) @@ -63,6 +81,54 @@ func (mr *machinesResource) ServeHTTP(rw http.ResponseWriter, req *http.Request) sendResponse(rw, http.StatusOK, 
page) } +func (mr *machinesResource) patch(rw http.ResponseWriter, req *http.Request) { + var ops []machineMetadataOp + dec := json.NewDecoder(req.Body) + if err := dec.Decode(&ops); err != nil { + sendError(rw, http.StatusBadRequest, err) + return + } + + for _, op := range ops { + if op.Operation != "add" && op.Operation != "remove" && op.Operation != "replace" { + sendError(rw, http.StatusBadRequest, errors.New("invalid op: expect add, remove, or replace")) + return + } + + if metadataPathRegex.FindStringSubmatch(op.Path) == nil { + sendError(rw, http.StatusBadRequest, errors.New("machine metadata path invalid")) + return + } + + if op.Operation != "remove" && len(op.Value) == 0 { + sendError(rw, http.StatusBadRequest, errors.New("invalid value: add and replace require a value")) + return + } + } + + for _, op := range ops { + // regex already validated above + s := metadataPathRegex.FindStringSubmatch(op.Path) + machID := s[1] + key := s[2] + + if op.Operation == "remove" { + err := mr.cAPI.DeleteMachineMetadata(machID, key) + if err != nil { + sendError(rw, http.StatusInternalServerError, err) + return + } + } else { + err := mr.cAPI.SetMachineMetadata(machID, key, op.Value) + if err != nil { + sendError(rw, http.StatusInternalServerError, err) + return + } + } + } + sendResponse(rw, http.StatusNoContent, nil) +} + func getMachinePage(cAPI client.API, tok PageToken) (*schema.MachinePage, error) { all, err := cAPI.Machines() if err != nil { diff --git a/api/machines_test.go b/api/machines_test.go index 349cda87c..c08e5b51f 100644 --- a/api/machines_test.go +++ b/api/machines_test.go @@ -19,6 +19,7 @@ import ( "net/http/httptest" "reflect" "strconv" + "strings" "testing" "github.com/coreos/fleet/client" @@ -26,15 +27,21 @@ import ( "github.com/coreos/fleet/registry" ) -func TestMachinesList(t *testing.T) { +func fakeMachinesSetup() (*machinesResource, *httptest.ResponseRecorder) { fr := registry.NewFakeRegistry() fr.SetMachines([]machine.MachineState{ - {ID: 
"XXX", PublicIP: "", Metadata: nil}, + {ID: "XXX", PublicIP: "", Metadata: map[string]string{}}, {ID: "YYY", PublicIP: "1.2.3.4", Metadata: map[string]string{"ping": "pong"}}, }) fAPI := &client.RegistryClient{Registry: fr} resource := &machinesResource{cAPI: fAPI, tokenLimit: testTokenLimit} rw := httptest.NewRecorder() + + return resource, rw +} + +func TestMachinesList(t *testing.T) { + resource, rw := fakeMachinesSetup() req, err := http.NewRequest("GET", "http://example.com", nil) if err != nil { t.Fatalf("Failed creating http.Request: %v", err) @@ -136,3 +143,126 @@ func TestExtractMachinePage(t *testing.T) { } } } + +func TestMachinesPatchAddModify(t *testing.T) { + reqBody := ` + [{"op": "add", "path": "/XXX/metadata/foo", "value": "bar"}, + {"op": "replace", "path": "/YYY/metadata/ping", "value": "splat"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusNoContent { + t.Errorf("Expected 204, got %d", rw.Code) + } + + // fetch machine to make sure data has been added + req, err = http.NewRequest("GET", "http://example.com/machines", nil) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + rw.Body.Reset() + resource.ServeHTTP(rw, req) + + if rw.Body == nil { + t.Error("Received nil response body") + } else { + body := rw.Body.String() + expected := `{"machines":[{"id":"XXX","metadata":{"foo":"bar"}},{"id":"YYY","metadata":{"ping":"splat"},"primaryIP":"1.2.3.4"}]}` + if body != expected { + t.Errorf("Expected body:\n%s\n\nReceived body:\n%s\n", expected, body) + } + } +} + +func TestMachinesPatchDelete(t *testing.T) { + reqBody := ` + [{"op": "remove", "path": "/XXX/metadata/foo"}, + {"op": "remove", "path": "/YYY/metadata/ping"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", 
"http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusNoContent { + t.Errorf("Expected 204, got %d", rw.Code) + } + + // fetch machine to make sure data has been added + req, err = http.NewRequest("GET", "http://example.com/machines", nil) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + rw.Body.Reset() + resource.ServeHTTP(rw, req) + + if rw.Body == nil { + t.Error("Received nil response body") + } else { + body := rw.Body.String() + expected := `{"machines":[{"id":"XXX"},{"id":"YYY","primaryIP":"1.2.3.4"}]}` + if body != expected { + t.Errorf("Expected body:\n%s\n\nReceived body:\n%s\n", expected, body) + } + } +} + +func TestMachinesPatchBadOp(t *testing.T) { + reqBody := ` + [{"op": "noop", "path": "/XXX/metadata/foo", "value": "bar"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusBadRequest { + t.Errorf("Expected 400, got %d", rw.Code) + } +} + +func TestMachinesPatchBadPath(t *testing.T) { + reqBody := ` + [{"op": "add", "path": "/XXX/foo", "value": "bar"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusBadRequest { + t.Errorf("Expected 400, got %d", rw.Code) + } +} + +func TestMachinesPatchBadValue(t *testing.T) { + reqBody := ` + [{"op": "add", "path": "/XXX/foo"}] + ` + + resource, rw := fakeMachinesSetup() + req, err := http.NewRequest("PATCH", "http://example.com/machines", strings.NewReader(reqBody)) + if err != nil { + t.Fatalf("Failed 
creating http.Request: %v", err) + } + + resource.ServeHTTP(rw, req) + if rw.Code != http.StatusBadRequest { + t.Errorf("Expected 400, got %d", rw.Code) + } +} diff --git a/client/api.go b/client/api.go index 8d28d2ff3..aa6ff3dfd 100644 --- a/client/api.go +++ b/client/api.go @@ -21,6 +21,8 @@ import ( type API interface { Machines() ([]machine.MachineState, error) + SetMachineMetadata(machID, key, value string) error + DeleteMachineMetadata(machID, key string) error Unit(string) (*schema.Unit, error) Units() ([]*schema.Unit, error) diff --git a/registry/fake.go b/registry/fake.go index 52be56eed..7738ddd8a 100644 --- a/registry/fake.go +++ b/registry/fake.go @@ -314,6 +314,37 @@ func (f *FakeRegistry) UnitHeartbeat(name, machID string, ttl time.Duration) err func (f *FakeRegistry) ClearUnitHeartbeat(string) {} +func (f *FakeRegistry) SetMachineMetadata(machID string, key string, value string) error { + for _, mach := range f.machines { + if mach.ID == machID { + mach.Metadata[key] = value + } + } + return nil +} + +func (f *FakeRegistry) DeleteMachineMetadata(machID string, key string) error { + for _, mach := range f.machines { + if mach.ID == machID { + delete(mach.Metadata, key) + } + } + return nil +} + +func (f *FakeRegistry) MachineState(machID string) (machine.MachineState, error) { + f.RLock() + defer f.RUnlock() + + for _, mach := range f.machines { + if mach.ID == machID { + return mach, nil + } + } + + return machine.MachineState{}, errors.New("Machine state not found") +} + func NewFakeClusterRegistry(dVersion *semver.Version, eVersion int) *FakeClusterRegistry { return &FakeClusterRegistry{ dVersion: dVersion, diff --git a/registry/interface.go b/registry/interface.go index 70508efc4..58d1cd300 100644 --- a/registry/interface.go +++ b/registry/interface.go @@ -37,7 +37,10 @@ type Registry interface { ScheduleUnit(name, machID string) error SetUnitTargetState(name string, state job.JobState) error SetMachineState(ms machine.MachineState, ttl 
time.Duration) (uint64, error) + MachineState(machID string) (machine.MachineState, error) UnscheduleUnit(name, machID string) error + SetMachineMetadata(machID, key, value string) error + DeleteMachineMetadata(machID, key string) error IsRegistryReady() bool UseEtcdRegistry() bool diff --git a/registry/machine.go b/registry/machine.go index baaade63c..d62728a94 100644 --- a/registry/machine.go +++ b/registry/machine.go @@ -22,6 +22,7 @@ import ( "golang.org/x/net/context" "github.com/coreos/fleet/machine" + "path" ) const ( @@ -44,17 +45,13 @@ func (r *EtcdRegistry) Machines() (machines []machine.MachineState, err error) { } for _, node := range resp.Node.Nodes { - for _, obj := range node.Nodes { - if !strings.HasSuffix(obj.Key, "/object") { - continue - } - - var mach machine.MachineState - err = unmarshal(obj.Value, &mach) - if err != nil { - return - } + var mach machine.MachineState + mach, err = readMachineState(node) + if err != nil { + return + } + if mach.ID != "" { machines = append(machines, mach) } } @@ -109,6 +106,35 @@ func (r *EtcdRegistry) SetMachineState(ms machine.MachineState, ttl time.Duratio return resp.Node.ModifiedIndex, nil } +func (r *EtcdRegistry) MachineState(machID string) (machine.MachineState, error) { + key := path.Join(r.keyPrefix, machinePrefix, machID) + opts := &etcd.GetOptions{ + Recursive: true, + Sort: true, + } + + resp, err := r.kAPI.Get(context.Background(), key, opts) + if err != nil { + return machine.MachineState{}, err + } + + return readMachineState(resp.Node) +} + +func (r *EtcdRegistry) SetMachineMetadata(machID string, key string, value string) error { + key = path.Join(r.keyPrefix, machinePrefix, machID, "metadata", key) + opts := &etcd.SetOptions{} + _, err := r.kAPI.Set(context.Background(), key, value, opts) + return err +} + +func (r *EtcdRegistry) DeleteMachineMetadata(machID string, key string) error { + // Deleting a key sets its value to "" to allow for intelligent merging + // between the machine-defined 
metadata and the dynamic metadata. + // See mergeMetadata for more detail. + return r.SetMachineMetadata(machID, key, "") +} + func (r *EtcdRegistry) RemoveMachineState(machID string) error { key := r.prefixed(machinePrefix, machID, "object") _, err := r.kAPI.Delete(context.Background(), key, nil) @@ -117,3 +143,51 @@ func (r *EtcdRegistry) RemoveMachineState(machID string) error { } return err } + +// mergeMetadata merges the machine-set metadata with the dynamic metadata to better facilitate +// machines leaving and rejoining a cluster. +// Merging metadata uses the following rules: +// - Any keys that are only in one collection are added as-is +// - Any keys that exist in both, the dynamic value takes precedence +// - Any keys that have a zero-value string in the dynamic metadata are considered deleted +// and are not included in the final collection +func mergeMetadata(machineMetadata, dynamicMetadata map[string]string) map[string]string { + if dynamicMetadata == nil { + return machineMetadata + } + finalMetadata := make(map[string]string, len(dynamicMetadata)) + for k, v := range machineMetadata { + finalMetadata[k] = v + } + for k, v := range dynamicMetadata { + if v == "" { + delete(finalMetadata, k) + } else { + finalMetadata[k] = v + } + } + return finalMetadata +} + +// readMachineState reads machine state from an etcd node +func readMachineState(node *etcd.Node) (mach machine.MachineState, err error) { + var metadata map[string]string + + for _, obj := range node.Nodes { + if strings.HasSuffix(obj.Key, "/object") { + err = unmarshal(obj.Value, &mach) + if err != nil { + return + } + } else if strings.HasSuffix(obj.Key, "/metadata") { + // Load metadata into a separate map to avoid stepping on it when deserializing the object key + metadata = make(map[string]string, len(obj.Nodes)) + for _, mdnode := range obj.Nodes { + metadata[path.Base(mdnode.Key)] = mdnode.Value + } + } + } + + mach.Metadata = mergeMetadata(mach.Metadata, metadata) + return +} From
5378558abc1690b37fd6fe7d28da95acbbf16d74 Mon Sep 17 00:00:00 2001 From: Damiano Albani Date: Mon, 29 Aug 2016 22:09:04 +0200 Subject: [PATCH 2/2] Implement RPC-related methods --- registry/interface.go | 4 ++-- registry/rpc/registrymux.go | 12 ++++++++++++ registry/rpc/rpcregistry.go | 12 ++++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/registry/interface.go b/registry/interface.go index 58d1cd300..ca40a05fd 100644 --- a/registry/interface.go +++ b/registry/interface.go @@ -39,8 +39,8 @@ type Registry interface { SetMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) MachineState(machID string) (machine.MachineState, error) UnscheduleUnit(name, machID string) error - SetMachineMetadata(machID, key, value string) error - DeleteMachineMetadata(machID, key string) error + SetMachineMetadata(machID string, key string, value string) error + DeleteMachineMetadata(machID string, key string) error IsRegistryReady() bool UseEtcdRegistry() bool diff --git a/registry/rpc/registrymux.go b/registry/rpc/registrymux.go index dc84635a5..156c89e11 100644 --- a/registry/rpc/registrymux.go +++ b/registry/rpc/registrymux.go @@ -290,6 +290,10 @@ func (r *RegistryMux) SetUnitTargetState(name string, state job.JobState) error return r.getRegistry().SetUnitTargetState(name, state) } +func (r *RegistryMux) MachineState(machID string) (machine.MachineState, error) { + return r.getRegistry().MachineState(machID) +} + func (r *RegistryMux) SetMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) { return r.etcdRegistry.SetMachineState(ms, ttl) } @@ -333,3 +337,11 @@ func (r *RegistryMux) EngineVersion() (int, error) { func (r *RegistryMux) UpdateEngineVersion(from int, to int) error { return r.etcdRegistry.UpdateEngineVersion(from, to) } + +func (r *RegistryMux) SetMachineMetadata(machID string, key string, value string) error { + return r.etcdRegistry.SetMachineMetadata(machID, key, value) +} + +func (r *RegistryMux) 
DeleteMachineMetadata(machID string, key string) error { + return r.etcdRegistry.DeleteMachineMetadata(machID, key) +} diff --git a/registry/rpc/rpcregistry.go b/registry/rpc/rpcregistry.go index fc05893b6..3938897fb 100644 --- a/registry/rpc/rpcregistry.go +++ b/registry/rpc/rpcregistry.go @@ -244,6 +244,14 @@ func (r *RPCRegistry) UnscheduleUnit(unitName, machID string) error { return err } +func (r *RPCRegistry) SetMachineMetadata(machID string, key string, value string) error { + panic("Set machine metadata function not implemented") +} + +func (r *RPCRegistry) DeleteMachineMetadata(machID string, key string) error { + panic("Delete machine metadata function not implemented") +} + func (r *RPCRegistry) Machines() ([]machine.MachineState, error) { panic("Machines function not implemented") } @@ -252,6 +260,10 @@ func (r *RPCRegistry) SetMachineState(ms machine.MachineState, ttl time.Duration panic("Set machine state function not implemented") } +func (r *RPCRegistry) MachineState(machID string) (machine.MachineState, error) { + panic("Machine state function not implemented") +} + func (r *RPCRegistry) CreateMachineState(ms machine.MachineState, ttl time.Duration) (uint64, error) { panic("Set machine state function not implemented") }