diff --git a/agent/consul/acl.go b/agent/consul/acl.go index 075940b62718..d438a5d8935b 100644 --- a/agent/consul/acl.go +++ b/agent/consul/acl.go @@ -1351,3 +1351,107 @@ func vetDeregisterWithACL(rule acl.Authorizer, subj *structs.DeregisterRequest, return nil } + +// vetNodeTxnOp applies the given ACL policy to a node transaction operation. +func vetNodeTxnOp(op *structs.TxnNodeOp, rule acl.Authorizer) error { + // Fast path if ACLs are not enabled. + if rule == nil { + return nil + } + + node := op.Node + + n := &api.Node{ + Node: node.Node, + ID: string(node.ID), + Address: node.Address, + Datacenter: node.Datacenter, + TaggedAddresses: node.TaggedAddresses, + Meta: node.Meta, + } + + // Sentinel doesn't apply to deletes, only creates/updates, so we don't need the scopeFn. + var scope func() map[string]interface{} + if op.Verb != api.NodeDelete && op.Verb != api.NodeDeleteCAS { + scope = func() map[string]interface{} { + return sentinel.ScopeCatalogUpsert(n, nil) + } + } + + if rule != nil && !rule.NodeWrite(node.Node, scope) { + return acl.ErrPermissionDenied + } + + return nil +} + +// vetServiceTxnOp applies the given ACL policy to a service transaction operation. +func vetServiceTxnOp(op *structs.TxnServiceOp, rule acl.Authorizer) error { + // Fast path if ACLs are not enabled. + if rule == nil { + return nil + } + + service := op.Service + + n := &api.Node{Node: op.Node} + svc := &api.AgentService{ + ID: service.ID, + Service: service.Service, + Tags: service.Tags, + Meta: service.Meta, + Address: service.Address, + Port: service.Port, + EnableTagOverride: service.EnableTagOverride, + } + var scope func() map[string]interface{} + if op.Verb != api.ServiceDelete && op.Verb != api.ServiceDeleteCAS { + scope = func() map[string]interface{} { + return sentinel.ScopeCatalogUpsert(n, svc) + } + } + if !rule.ServiceWrite(service.Service, scope) { + return acl.ErrPermissionDenied + } + + return nil +} + +// vetCheckTxnOp applies the given ACL policy to a check transaction operation. +func vetCheckTxnOp(op *structs.TxnCheckOp, rule acl.Authorizer) error { + // Fast path if ACLs are not enabled. + if rule == nil { + return nil + } + + n := &api.Node{Node: op.Check.Node} + svc := &api.AgentService{ + ID: op.Check.ServiceID, + Service: op.Check.ServiceID, + Tags: op.Check.ServiceTags, + } + var scope func() map[string]interface{} + if op.Check.ServiceID == "" { + // Node-level check. + if op.Verb == api.CheckDelete || op.Verb == api.CheckDeleteCAS { + scope = func() map[string]interface{} { + return sentinel.ScopeCatalogUpsert(n, svc) + } + } + if !rule.NodeWrite(op.Check.Node, scope) { + return acl.ErrPermissionDenied + } + } else { + // Service-level check. + if op.Verb == api.CheckDelete || op.Verb == api.CheckDeleteCAS { + scope = func() map[string]interface{} { + return sentinel.ScopeCatalogUpsert(n, svc) + } + } + if !rule.ServiceWrite(op.Check.ServiceName, scope) { + return acl.ErrPermissionDenied + } + } + + return nil +} diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index da13abf6cb74..7ef2defb072c 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -20,74 +20,98 @@ type Catalog struct { srv *Server } -// Register is used register that a node is providing a given service. -func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Register", args, args, reply); done { +// nodePreApply does the verification of a node before it is applied to Raft. +func nodePreApply(nodeName, nodeID string) error { + if nodeName == "" { + return fmt.Errorf("Must provide node") + } + if nodeID != "" { + if _, err := uuid.ParseUUID(nodeID); err != nil { + return fmt.Errorf("Bad node ID: %v", err) + } + } + + return nil +} + +func servicePreApply(service *structs.NodeService, rule acl.Authorizer) error { + // Validate the service. This is in addition to the below since + // the above just hasn't been moved over yet. We should move it over + // in time. + if err := service.Validate(); err != nil { return err } - defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now()) - // Verify the args. - if args.Node == "" { - return fmt.Errorf("Must provide node") + // If no service id, but service name, use default + if service.ID == "" && service.Service != "" { + service.ID = service.Service } - if args.Address == "" && !args.SkipNodeUpdate { - return fmt.Errorf("Must provide address if SkipNodeUpdate is not set") + + // Verify ServiceName provided if ID. + if service.ID != "" && service.Service == "" { + return fmt.Errorf("Must provide service name with ID") } - if args.ID != "" { - if _, err := uuid.ParseUUID(string(args.ID)); err != nil { - return fmt.Errorf("Bad node ID: %v", err) + + // Check the service address here and in the agent endpoint + // since service registration isn't synchronous. + if ipaddr.IsAny(service.Address) { + return fmt.Errorf("Invalid service address") + } + + // Apply the ACL policy if any. The 'consul' service is excluded + // since it is managed automatically internally (that behavior + // is going away after version 0.8). We check this same policy + // later if version 0.8 is enabled, so we can eventually just + // delete this and do all the ACL checks down there. + if service.Service != structs.ConsulServiceName { + if rule != nil && !rule.ServiceWrite(service.Service, nil) { + return acl.ErrPermissionDenied } } + // Proxies must have write permission on their destination + if service.Kind == structs.ServiceKindConnectProxy { + if rule != nil && !rule.ServiceWrite(service.Proxy.DestinationServiceName, nil) { + return acl.ErrPermissionDenied + } + } + + return nil +} + +// checkPreApply does the verification of a check before it is applied to Raft. +func checkPreApply(check *structs.HealthCheck) { + if check.CheckID == "" && check.Name != "" { + check.CheckID = types.CheckID(check.Name) + } +} + +// Register is used register that a node is providing a given service. +func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { + if done, err := c.srv.forward("Catalog.Register", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now()) + // Fetch the ACL token, if any. rule, err := c.srv.ResolveToken(args.Token) if err != nil { return err } + // Verify the args. + if err := nodePreApply(args.Node, string(args.ID)); err != nil { + return err + } + if args.Address == "" && !args.SkipNodeUpdate { + return fmt.Errorf("Must provide address if SkipNodeUpdate is not set") + } + // Handle a service registration. if args.Service != nil { - // Validate the service. This is in addition to the below since - // the above just hasn't been moved over yet. We should move it over - // in time. - if err := args.Service.Validate(); err != nil { + if err := servicePreApply(args.Service, rule); err != nil { return err } - - // If no service id, but service name, use default - if args.Service.ID == "" && args.Service.Service != "" { - args.Service.ID = args.Service.Service - } - - // Verify ServiceName provided if ID. - if args.Service.ID != "" && args.Service.Service == "" { - return fmt.Errorf("Must provide service name with ID") - } - - // Check the service address here and in the agent endpoint - // since service registration isn't synchronous. - if ipaddr.IsAny(args.Service.Address) { - return fmt.Errorf("Invalid service address") - } - - // Apply the ACL policy if any. The 'consul' service is excluded - // since it is managed automatically internally (that behavior - // is going away after version 0.8). We check this same policy - // later if version 0.8 is enabled, so we can eventually just - // delete this and do all the ACL checks down there. - if args.Service.Service != structs.ConsulServiceName { - if rule != nil && !rule.ServiceWrite(args.Service.Service, nil) { - return acl.ErrPermissionDenied - } - } - - // Proxies must have write permission on their destination - if args.Service.Kind == structs.ServiceKindConnectProxy { - if rule != nil && !rule.ServiceWrite(args.Service.Proxy.DestinationServiceName, nil) { - return acl.ErrPermissionDenied - } - } } // Move the old format single check into the slice, and fixup IDs. @@ -96,12 +120,10 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error args.Check = nil } for _, check := range args.Checks { - if check.CheckID == "" && check.Name != "" { - check.CheckID = types.CheckID(check.Name) - } if check.Node == "" { check.Node = args.Node } + checkPreApply(check) } // Check the complete register request against the given ACL policy. diff --git a/agent/consul/filter.go b/agent/consul/filter.go index ea4d938dcb76..68b3238ac8dd 100644 --- a/agent/consul/filter.go +++ b/agent/consul/filter.go @@ -61,8 +61,18 @@ func (t *txnResultsFilter) Len() int { func (t *txnResultsFilter) Filter(i int) bool { result := t.results[i] - if result.KV != nil { + switch { + case result.KV != nil: return !t.authorizer.KeyRead(result.KV.Key) + case result.Node != nil: + return !t.authorizer.NodeRead(result.Node.Node) + case result.Service != nil: + return !t.authorizer.ServiceRead(result.Service.Service) + case result.Check != nil: + if result.Check.ServiceName != "" { + return !t.authorizer.ServiceRead(result.Check.ServiceName) + } + return !t.authorizer.NodeRead(result.Check.Node) } return false } diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 162eed79cf66..b1c38768a7ad 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -373,6 +373,35 @@ func (s *Store) ensureNoNodeWithSimilarNameTxn(tx *memdb.Txn, node *structs.Node return nil } +// ensureNodeCASTxn updates a node only if the existing index matches the given index. +// Returns a bool indicating if a write happened and any error. +func (s *Store) ensureNodeCASTxn(tx *memdb.Txn, idx uint64, node *structs.Node) (bool, error) { + // Retrieve the existing entry. + existing, err := getNodeTxn(tx, node.Node) + if err != nil { + return false, err + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + if node.ModifyIndex == 0 && existing != nil { + return false, nil + } + if node.ModifyIndex != 0 && existing == nil { + return false, nil + } + if existing != nil && node.ModifyIndex != 0 && node.ModifyIndex != existing.ModifyIndex { + return false, nil + } + + // Perform the update. + if err := s.ensureNodeTxn(tx, idx, node); err != nil { + return false, err + } + + return true, nil +} + // ensureNodeTxn is the inner function called to actually create a node // registration or modify an existing one in the state store. It allows // passing in a memdb transaction so it may be part of a larger txn. @@ -461,14 +490,22 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) { idx := maxIndexTxn(tx, "nodes") // Retrieve the node from the state store - node, err := tx.First("nodes", "id", id) + node, err := getNodeTxn(tx, id) if err != nil { return 0, nil, fmt.Errorf("node lookup failed: %s", err) } + return idx, node, nil +} + +func getNodeTxn(tx *memdb.Txn, nodeName string) (*structs.Node, error) { + node, err := tx.First("nodes", "id", nodeName) + if err != nil { + return nil, fmt.Errorf("node lookup failed: %s", err) + } if node != nil { - return idx, node.(*structs.Node), nil + return node.(*structs.Node), nil } - return idx, nil, nil + return nil, nil } func getNodeIDTxn(tx *memdb.Txn, id types.NodeID) (*structs.Node, error) { @@ -569,6 +606,34 @@ func (s *Store) DeleteNode(idx uint64, nodeName string) error { return nil } +// deleteNodeCASTxn is used to try doing a node delete operation with a given +// raft index. If the CAS index specified is not equal to the last observed index for +// the given check, then the call is a noop, otherwise a normal check delete is invoked. +func (s *Store) deleteNodeCASTxn(tx *memdb.Txn, idx, cidx uint64, nodeName string) (bool, error) { + // Look up the node. + node, err := getNodeTxn(tx, nodeName) + if err != nil { + return false, err + } + if node == nil { + return false, nil + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + if node.ModifyIndex != cidx { + return false, nil + } + + // Call the actual deletion if the above passed. + if err := s.deleteNodeTxn(tx, idx, nodeName); err != nil { + return false, err + } + + return true, nil +} + // deleteNodeTxn is the inner method used for removing a node from // the store within a given transaction. func (s *Store) deleteNodeTxn(tx *memdb.Txn, idx uint64, nodeName string) error { @@ -676,6 +741,36 @@ func (s *Store) EnsureService(idx uint64, node string, svc *structs.NodeService) return nil } +// ensureServiceCASTxn updates a service only if the existing index matches the given index. +// Returns a bool indicating if a write happened and any error. +func (s *Store) ensureServiceCASTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) (bool, error) { + // Retrieve the existing service. + existing, err := tx.First("services", "id", node, svc.ID) + if err != nil { + return false, fmt.Errorf("failed service lookup: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + if svc.ModifyIndex == 0 && existing != nil { + return false, nil + } + if svc.ModifyIndex != 0 && existing == nil { + return false, nil + } + e, ok := existing.(*structs.Node) + if ok && svc.ModifyIndex != 0 && svc.ModifyIndex != e.ModifyIndex { + return false, nil + } + + // Perform the update. + if err := s.ensureServiceTxn(tx, idx, node, svc); err != nil { + return false, err + } + + return true, nil +} + // ensureServiceTxn is used to upsert a service registration within an // existing memdb transaction. func (s *Store) ensureServiceTxn(tx *memdb.Txn, idx uint64, node string, svc *structs.NodeService) error { @@ -1052,15 +1147,26 @@ func (s *Store) NodeService(nodeName string, serviceID string) (uint64, *structs idx := maxIndexTxn(tx, "services") // Query the service - service, err := tx.First("services", "id", nodeName, serviceID) + service, err := s.getNodeServiceTxn(tx, nodeName, serviceID) if err != nil { return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) } + return idx, service, nil +} + +func (s *Store) getNodeServiceTxn(tx *memdb.Txn, nodeName, serviceID string) (*structs.NodeService, error) { + // Query the service + service, err := tx.First("services", "id", nodeName, serviceID) + if err != nil { + return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err) + } + if service != nil { - return idx, service.(*structs.ServiceNode).ToNodeService(), nil + return service.(*structs.ServiceNode).ToNodeService(), nil } - return idx, nil, nil + + return nil, nil } // NodeServices is used to query service registrations by node name or UUID. @@ -1155,6 +1261,34 @@ func serviceIndexName(name string) string { return fmt.Sprintf("service.%s", name) } +// deleteServiceCASTxn is used to try doing a service delete operation with a given +// raft index. If the CAS index specified is not equal to the last observed index for +// the given service, then the call is a noop, otherwise a normal delete is invoked. +func (s *Store) deleteServiceCASTxn(tx *memdb.Txn, idx, cidx uint64, nodeName, serviceID string) (bool, error) { + // Look up the service. + service, err := s.getNodeServiceTxn(tx, nodeName, serviceID) + if err != nil { + return false, fmt.Errorf("service lookup failed: %s", err) + } + if service == nil { + return false, nil + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + if service.ModifyIndex != cidx { + return false, nil + } + + // Call the actual deletion if the above passed. + if err := s.deleteServiceTxn(tx, idx, nodeName, serviceID); err != nil { + return false, err + } + + return true, nil +} + // deleteServiceTxn is the inner method called to remove a service // registration within an existing transaction. func (s *Store) deleteServiceTxn(tx *memdb.Txn, idx uint64, nodeName, serviceID string) error { @@ -1250,6 +1384,35 @@ func (s *Store) updateAllServiceIndexesOfNode(tx *memdb.Txn, idx uint64, nodeID return nil } +// ensureCheckCASTxn updates a check only if the existing index matches the given index. +// Returns a bool indicating if a write happened and any error. +func (s *Store) ensureCheckCASTxn(tx *memdb.Txn, idx uint64, hc *structs.HealthCheck) (bool, error) { + // Retrieve the existing entry. + _, existing, err := s.getNodeCheckTxn(tx, hc.Node, hc.CheckID) + if err != nil { + return false, fmt.Errorf("failed health check lookup: %s", err) + } + + // Check if the we should do the set. A ModifyIndex of 0 means that + // we are doing a set-if-not-exists. + if hc.ModifyIndex == 0 && existing != nil { + return false, nil + } + if hc.ModifyIndex != 0 && existing == nil { + return false, nil + } + if existing != nil && hc.ModifyIndex != 0 && hc.ModifyIndex != existing.ModifyIndex { + return false, nil + } + + // Perform the update. + if err := s.ensureCheckTxn(tx, idx, hc); err != nil { + return false, err + } + + return true, nil +} + // ensureCheckTransaction is used as the inner method to handle inserting // a health check into the state store. It ensures safety against inserting // checks with no matching node or service. @@ -1366,6 +1529,12 @@ func (s *Store) NodeCheck(nodeName string, checkID types.CheckID) (uint64, *stru tx := s.db.Txn(false) defer tx.Abort() + return s.getNodeCheckTxn(tx, nodeName, checkID) +} + +// nodeCheckTxn is used as the inner method to handle reading a health check +// from the state store. +func (s *Store) getNodeCheckTxn(tx *memdb.Txn, nodeName string, checkID types.CheckID) (uint64, *structs.HealthCheck, error) { // Get the table index. idx := maxIndexTxn(tx, "checks") @@ -1555,6 +1724,34 @@ func (s *Store) DeleteCheck(idx uint64, node string, checkID types.CheckID) erro return nil } +// deleteCheckCASTxn is used to try doing a check delete operation with a given +// raft index. If the CAS index specified is not equal to the last observed index for +// the given check, then the call is a noop, otherwise a normal check delete is invoked. +func (s *Store) deleteCheckCASTxn(tx *memdb.Txn, idx, cidx uint64, node string, checkID types.CheckID) (bool, error) { + // Try to retrieve the existing health check. + _, hc, err := s.getNodeCheckTxn(tx, node, checkID) + if err != nil { + return false, fmt.Errorf("check lookup failed: %s", err) + } + if hc == nil { + return false, nil + } + + // If the existing index does not match the provided CAS + // index arg, then we shouldn't update anything and can safely + // return early here. + if hc.ModifyIndex != cidx { + return false, nil + } + + // Call the actual deletion if the above passed. + if err := s.deleteCheckTxn(tx, idx, node, checkID); err != nil { + return false, err + } + + return true, nil +} + // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. func (s *Store) deleteCheckTxn(tx *memdb.Txn, idx uint64, node string, checkID types.CheckID) error { diff --git a/agent/consul/state/txn.go b/agent/consul/state/txn.go index c90f216343c3..941cae411801 100644 --- a/agent/consul/state/txn.go +++ b/agent/consul/state/txn.go @@ -118,10 +118,200 @@ func (s *Store) txnIntention(tx *memdb.Txn, idx uint64, op *structs.TxnIntention case structs.IntentionOpDelete: return s.intentionDeleteTxn(tx, idx, op.Intention.ID) default: - return fmt.Errorf("unknown Intention verb %q", op.Op) + return fmt.Errorf("unknown Intention op %q", op.Op) } } +// txnNode handles all Node-related operations. +func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (structs.TxnResults, error) { + var entry *structs.Node + var err error + + getNode := func() (*structs.Node, error) { + if op.Node.ID != "" { + return getNodeIDTxn(tx, op.Node.ID) + } else { + return getNodeTxn(tx, op.Node.Node) + } + } + + switch op.Verb { + case api.NodeGet: + entry, err = getNode() + if entry == nil && err == nil { + err = fmt.Errorf("node %q doesn't exist", op.Node.Node) + } + + case api.NodeSet: + err = s.ensureNodeTxn(tx, idx, &op.Node) + if err == nil { + entry, err = getNode() + } + + case api.NodeCAS: + var ok bool + ok, err = s.ensureNodeCASTxn(tx, idx, &op.Node) + if !ok && err == nil { + err = fmt.Errorf("failed to set node %q, index is stale", op.Node.Node) + break + } + entry, err = getNode() + + case api.NodeDelete: + err = s.deleteNodeTxn(tx, idx, op.Node.Node) + + case api.NodeDeleteCAS: + var ok bool + ok, err = s.deleteNodeCASTxn(tx, idx, op.Node.ModifyIndex, op.Node.Node) + if !ok && err == nil { + err = fmt.Errorf("failed to delete node %q, index is stale", op.Node.Node) + } + + default: + err = fmt.Errorf("unknown Node verb %q", op.Verb) + } + if err != nil { + return nil, err + } + + // For a GET we keep the value, otherwise we clone and blank out the + // value (we have to clone so we don't modify the entry being used by + // the state store). + if entry != nil { + if op.Verb == api.NodeGet { + result := structs.TxnResult{Node: entry} + return structs.TxnResults{&result}, nil + } + + clone := *entry + result := structs.TxnResult{Node: &clone} + return structs.TxnResults{&result}, nil + } + + return nil, nil +} + +// txnService handles all Service-related operations. +func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp) (structs.TxnResults, error) { + var entry *structs.NodeService + var err error + + switch op.Verb { + case api.ServiceGet: + entry, err = s.getNodeServiceTxn(tx, op.Node, op.Service.ID) + if entry == nil && err == nil { + err = fmt.Errorf("service %q on node %q doesn't exist", op.Service.ID, op.Node) + } + + case api.ServiceSet: + err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service) + entry, err = s.getNodeServiceTxn(tx, op.Node, op.Service.ID) + + case api.ServiceCAS: + var ok bool + ok, err = s.ensureServiceCASTxn(tx, idx, op.Node, &op.Service) + if !ok && err == nil { + err = fmt.Errorf("failed to set service %q on node %q, index is stale", op.Service.ID, op.Node) + break + } + entry, err = s.getNodeServiceTxn(tx, op.Node, op.Service.ID) + + case api.ServiceDelete: + err = s.deleteServiceTxn(tx, idx, op.Node, op.Service.ID) + + case api.ServiceDeleteCAS: + var ok bool + ok, err = s.deleteServiceCASTxn(tx, idx, op.Service.ModifyIndex, op.Node, op.Service.ID) + if !ok && err == nil { + err = fmt.Errorf("failed to delete service %q on node %q, index is stale", op.Service.ID, op.Node) + } + + default: + err = fmt.Errorf("unknown Service verb %q", op.Verb) + } + if err != nil { + return nil, err + } + + // For a GET we keep the value, otherwise we clone and blank out the + // value (we have to clone so we don't modify the entry being used by + // the state store). + if entry != nil { + if op.Verb == api.ServiceGet { + result := structs.TxnResult{Service: entry} + return structs.TxnResults{&result}, nil + } + + clone := *entry + result := structs.TxnResult{Service: &clone} + return structs.TxnResults{&result}, nil + } + + return nil, nil +} + +// txnCheck handles all Check-related operations. +func (s *Store) txnCheck(tx *memdb.Txn, idx uint64, op *structs.TxnCheckOp) (structs.TxnResults, error) { + var entry *structs.HealthCheck + var err error + + switch op.Verb { + case api.CheckGet: + _, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID) + if entry == nil && err == nil { + err = fmt.Errorf("check %q on node %q doesn't exist", op.Check.CheckID, op.Check.Node) + } + + case api.CheckSet: + err = s.ensureCheckTxn(tx, idx, &op.Check) + if err == nil { + _, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID) + } + + case api.CheckCAS: + var ok bool + entry = &op.Check + ok, err = s.ensureCheckCASTxn(tx, idx, entry) + if !ok && err == nil { + err = fmt.Errorf("failed to set check %q on node %q, index is stale", entry.CheckID, entry.Node) + break + } + _, entry, err = s.getNodeCheckTxn(tx, op.Check.Node, op.Check.CheckID) + + case api.CheckDelete: + err = s.deleteCheckTxn(tx, idx, op.Check.Node, op.Check.CheckID) + + case api.CheckDeleteCAS: + var ok bool + ok, err = s.deleteCheckCASTxn(tx, idx, op.Check.ModifyIndex, op.Check.Node, op.Check.CheckID) + if !ok && err == nil { + err = fmt.Errorf("failed to delete check %q on node %q, index is stale", op.Check.CheckID, op.Check.Node) + } + + default: + err = fmt.Errorf("unknown Check verb %q", op.Verb) + } + if err != nil { + return nil, err + } + + // For a GET we keep the value, otherwise we clone and blank out the + // value (we have to clone so we don't modify the entry being used by + // the state store). + if entry != nil { + if op.Verb == api.CheckGet { + result := structs.TxnResult{Check: entry} + return structs.TxnResults{&result}, nil + } + + clone := entry.Clone() + result := structs.TxnResult{Check: clone} + return structs.TxnResults{&result}, nil + } + + return nil, nil +} + // txnDispatch runs the given operations inside the state store transaction. func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) { results := make(structs.TxnResults, 0, len(ops)) @@ -136,6 +326,12 @@ func (s *Store) txnDispatch(tx *memdb.Txn, idx uint64, ops structs.TxnOps) (stru ret, err = s.txnKVS(tx, idx, op.KV) case op.Intention != nil: err = s.txnIntention(tx, idx, op.Intention) + case op.Node != nil: + ret, err = s.txnNode(tx, idx, op.Node) + case op.Service != nil: + ret, err = s.txnService(tx, idx, op.Service) + case op.Check != nil: + ret, err = s.txnCheck(tx, idx, op.Check) default: err = fmt.Errorf("no operation specified") } diff --git a/agent/consul/state/txn_test.go b/agent/consul/state/txn_test.go index 5e89e8bad080..c21e3a0ea9f7 100644 --- a/agent/consul/state/txn_test.go +++ b/agent/consul/state/txn_test.go @@ -1,12 +1,14 @@ package state import ( + "fmt" "reflect" "strings" "testing" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" "github.com/pascaldekloe/goe/verify" "github.com/stretchr/testify/require" ) @@ -116,6 +118,384 @@ func TestStateStore_Txn_Intention(t *testing.T) { verify.Values(t, "", actual, intentions) } +func TestStateStore_Txn_Node(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + // Create some nodes. + var nodes [5]structs.Node + for i := 0; i < len(nodes); i++ { + nodes[i] = structs.Node{ + Node: fmt.Sprintf("node%d", i+1), + ID: types.NodeID(testUUID()), + } + + // Leave node5 to be created by an operation. + if i < 5 { + s.EnsureNode(uint64(i+1), &nodes[i]) + } + } + + // Set up a transaction that hits every operation. + ops := structs.TxnOps{ + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeGet, + Node: nodes[0], + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeSet, + Node: nodes[4], + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeCAS, + Node: structs.Node{ + Node: "node2", + ID: nodes[1].ID, + Datacenter: "dc2", + RaftIndex: structs.RaftIndex{ModifyIndex: 2}, + }, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeDelete, + Node: structs.Node{Node: "node3"}, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeDeleteCAS, + Node: structs.Node{ + Node: "node4", + RaftIndex: structs.RaftIndex{ModifyIndex: 4}, + }, + }, + }, + } + results, errors := s.TxnRW(8, ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + + // Make sure the response looks as expected. + nodes[1].Datacenter = "dc2" + nodes[1].ModifyIndex = 8 + expected := structs.TxnResults{ + &structs.TxnResult{ + Node: &nodes[0], + }, + &structs.TxnResult{ + Node: &nodes[4], + }, + &structs.TxnResult{ + Node: &nodes[1], + }, + } + verify.Values(t, "", results, expected) + + // Pull the resulting state store contents. + idx, actual, err := s.Nodes(nil) + require.NoError(err) + if idx != 8 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it looks as expected. + expectedNodes := structs.Nodes{&nodes[0], &nodes[1], &nodes[4]} + verify.Values(t, "", actual, expectedNodes) +} + +func TestStateStore_Txn_Service(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + testRegisterNode(t, s, 1, "node1") + + // Create some services. + for i := 1; i <= 4; i++ { + testRegisterService(t, s, uint64(i+1), "node1", fmt.Sprintf("svc%d", i)) + } + + // Set up a transaction that hits every operation. + ops := structs.TxnOps{ + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceGet, + Node: "node1", + Service: structs.NodeService{ID: "svc1"}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceSet, + Node: "node1", + Service: structs.NodeService{ID: "svc5"}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceCAS, + Node: "node1", + Service: structs.NodeService{ + ID: "svc2", + Tags: []string{"modified"}, + RaftIndex: structs.RaftIndex{ModifyIndex: 3}, + }, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceDelete, + Node: "node1", + Service: structs.NodeService{ID: "svc3"}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceDeleteCAS, + Node: "node1", + Service: structs.NodeService{ + ID: "svc4", + RaftIndex: structs.RaftIndex{ModifyIndex: 5}, + }, + }, + }, + } + results, errors := s.TxnRW(6, ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + + // Make sure the response looks as expected. + expected := structs.TxnResults{ + &structs.TxnResult{ + Service: &structs.NodeService{ + ID: "svc1", + Service: "svc1", + Address: "1.1.1.1", + Port: 1111, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + }, + }, + &structs.TxnResult{ + Service: &structs.NodeService{ + ID: "svc5", + Weights: &structs.Weights{Passing: 1, Warning: 1}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, + }, + }, + &structs.TxnResult{ + Service: &structs.NodeService{ + ID: "svc2", + Tags: []string{"modified"}, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 6, + }, + }, + }, + } + verify.Values(t, "", results, expected) + + // Pull the resulting state store contents. + idx, actual, err := s.NodeServices(nil, "node1") + require.NoError(err) + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it looks as expected. + expectedServices := &structs.NodeServices{ + Node: &structs.Node{ + Node: "node1", + RaftIndex: structs.RaftIndex{ + CreateIndex: 1, + ModifyIndex: 1, + }, + }, + Services: map[string]*structs.NodeService{ + "svc1": &structs.NodeService{ + ID: "svc1", + Service: "svc1", + Address: "1.1.1.1", + Port: 1111, + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + }, + "svc5": &structs.NodeService{ + ID: "svc5", + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + }, + "svc2": &structs.NodeService{ + ID: "svc2", + Tags: []string{"modified"}, + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 6, + }, + Weights: &structs.Weights{Passing: 1, Warning: 1}, + }, + }, + } + verify.Values(t, "", actual, expectedServices) +} + +func TestStateStore_Txn_Checks(t *testing.T) { + require := require.New(t) + s := testStateStore(t) + + testRegisterNode(t, s, 1, "node1") + + // Create some checks. + for i := 1; i <= 4; i++ { + testRegisterCheck(t, s, uint64(i+1), "node1", "", types.CheckID(fmt.Sprintf("check%d", i)), "failing") + } + + // Set up a transaction that hits every operation. + ops := structs.TxnOps{ + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckGet, + Check: structs.HealthCheck{Node: "node1", CheckID: "check1"}, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckSet, + Check: structs.HealthCheck{Node: "node1", CheckID: "check5", Status: "passing"}, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckCAS, + Check: structs.HealthCheck{ + Node: "node1", + CheckID: "check2", + Status: "warning", + RaftIndex: structs.RaftIndex{ModifyIndex: 3}, + }, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckDelete, + Check: structs.HealthCheck{Node: "node1", CheckID: "check3"}, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckDeleteCAS, + Check: structs.HealthCheck{ + Node: "node1", + CheckID: "check4", + RaftIndex: structs.RaftIndex{ModifyIndex: 5}, + }, + }, + }, + } + results, errors := s.TxnRW(6, ops) + if len(errors) > 0 { + t.Fatalf("err: %v", errors) + } + + // Make sure the response looks as expected. + expected := structs.TxnResults{ + &structs.TxnResult{ + Check: &structs.HealthCheck{ + Node: "node1", + CheckID: "check1", + Status: "failing", + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + }, + }, + &structs.TxnResult{ + Check: &structs.HealthCheck{ + Node: "node1", + CheckID: "check5", + Status: "passing", + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, + }, + }, + &structs.TxnResult{ + Check: &structs.HealthCheck{ + Node: "node1", + CheckID: "check2", + Status: "warning", + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 6, + }, + }, + }, + } + verify.Values(t, "", results, expected) + + // Pull the resulting state store contents. + idx, actual, err := s.NodeChecks(nil, "node1") + require.NoError(err) + if idx != 6 { + t.Fatalf("bad index: %d", idx) + } + + // Make sure it looks as expected. + expectedChecks := structs.HealthChecks{ + &structs.HealthCheck{ + Node: "node1", + CheckID: "check1", + Status: "failing", + RaftIndex: structs.RaftIndex{ + CreateIndex: 2, + ModifyIndex: 2, + }, + }, + &structs.HealthCheck{ + Node: "node1", + CheckID: "check2", + Status: "warning", + RaftIndex: structs.RaftIndex{ + CreateIndex: 3, + ModifyIndex: 6, + }, + }, + &structs.HealthCheck{ + Node: "node1", + CheckID: "check5", + Status: "passing", + RaftIndex: structs.RaftIndex{ + CreateIndex: 6, + ModifyIndex: 6, + }, + }, + } + verify.Values(t, "", actual, expectedChecks) +} + func TestStateStore_Txn_KVS(t *testing.T) { s := testStateStore(t) diff --git a/agent/consul/txn_endpoint.go b/agent/consul/txn_endpoint.go index 236d4da9b0f1..ac27dd348daf 100644 --- a/agent/consul/txn_endpoint.go +++ b/agent/consul/txn_endpoint.go @@ -7,6 +7,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" ) // Txn endpoint is used to perform multi-object atomic transactions. @@ -21,7 +22,8 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx // Perform the pre-apply checks for any KV operations. for i, op := range ops { - if op.KV != nil { + switch { + case op.KV != nil: ok, err := kvsPreApply(t.srv, authorizer, op.KV.Verb, &op.KV.DirEnt) if err != nil { errors = append(errors, &structs.TxnError{ @@ -35,6 +37,65 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx What: err.Error(), }) } + case op.Node != nil: + // Skip the pre-apply checks if this is a GET. + if op.Node.Verb == api.NodeGet { + break + } + + node := op.Node.Node + if err := nodePreApply(node.Node, string(node.ID)); err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + break + } + + // Check that the token has permissions for the given operation. + if err := vetNodeTxnOp(op.Node, authorizer); err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + } + case op.Service != nil: + // Skip the pre-apply checks if this is a GET. + if op.Service.Verb == api.ServiceGet { + break + } + + service := &op.Service.Service + if err := servicePreApply(service, nil); err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + break + } + + // Check that the token has permissions for the given operation. + if err := vetServiceTxnOp(op.Service, authorizer); err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + } + case op.Check != nil: + // Skip the pre-apply checks if this is a GET. + if op.Check.Verb == api.CheckGet { + break + } + + checkPreApply(&op.Check.Check) + + // Check that the token has permissions for the given operation. + if err := vetCheckTxnOp(op.Check, authorizer); err != nil { + errors = append(errors, &structs.TxnError{ + OpIndex: i, + What: err.Error(), + }) + } } } diff --git a/agent/consul/txn_endpoint_test.go b/agent/consul/txn_endpoint_test.go index c1447cf99d2e..5cbfb56f2fbd 100644 --- a/agent/consul/txn_endpoint_test.go +++ b/agent/consul/txn_endpoint_test.go @@ -12,9 +12,49 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/testrpc" + "github.com/hashicorp/consul/types" "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/pascaldekloe/goe/verify" + "github.com/stretchr/testify/require" ) +var testTxnRules = ` +key "" { + policy = "deny" +} +key "foo" { + policy = "read" +} +key "test" { + policy = "write" +} +key "test/priv" { + policy = "read" +} + +service "" { + policy = "deny" +} +service "foo-svc" { + policy = "read" +} +service "test-svc" { + policy = "write" +} + +node "" { + policy = "deny" +} +node "foo-node" { + policy = "read" +} +node "test-node" { + policy = "write" +} +` + +var testNodeID = "9749a7df-fac5-46b4-8078-32a3d96c59f3" + func TestTxn_CheckNotExists(t *testing.T) { t.Parallel() dir1, s1 := testServer(t) @@ -101,12 +141,76 @@ func TestTxn_Apply(t *testing.T) { }, }, }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeSet, + Node: structs.Node{ + ID: types.NodeID(testNodeID), + Node: "foo", + Address: "127.0.0.1", + }, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeGet, + Node: structs.Node{ + ID: types.NodeID(testNodeID), + Node: "foo", + }, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceSet, + Node: "foo", + Service: structs.NodeService{ + ID: "svc-foo", + Service: "svc-foo", + Address: "1.1.1.1", + }, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceGet, + Node: "foo", + Service: structs.NodeService{ + ID: "svc-foo", + Service: "svc-foo", + }, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckSet, + Check: structs.HealthCheck{ + Node: "foo", + CheckID: types.CheckID("check-foo"), + Name: "test", + Status: "passing", + }, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckGet, + Check: structs.HealthCheck{ + Node: "foo", + CheckID: types.CheckID("check-foo"), + Name: "test", + }, + }, + }, }, } var out structs.TxnResponse if err := msgpackrpc.CallWithCodec(codec, "Txn.Apply", &arg, &out); err != nil { t.Fatalf("err: %v", err) } + if len(out.Errors) != 0 { + t.Fatalf("errs: %v", out.Errors) + } // Verify the state store directly. state := s1.fsm.State() @@ -122,6 +226,30 @@ func TestTxn_Apply(t *testing.T) { t.Fatalf("bad: %v", d) } + _, n, err := state.GetNode("foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if n.Node != "foo" || n.Address != "127.0.0.1" { + t.Fatalf("bad: %v", err) + } + + _, s, err := state.NodeService("foo", "svc-foo") + if err != nil { + t.Fatalf("err: %v", err) + } + if s.ID != "svc-foo" || s.Address != "1.1.1.1" { + t.Fatalf("bad: %v", err) + } + + _, c, err := state.NodeCheck("foo", types.CheckID("check-foo")) + if err != nil { + t.Fatalf("err: %v", err) + } + if c.CheckID != "check-foo" || c.Status != "passing" || c.Name != "test" { + t.Fatalf("bad: %v", err) + } + // Verify the transaction's return value. expected := structs.TxnResponse{ Results: structs.TxnResults{ @@ -147,15 +275,34 @@ func TestTxn_Apply(t *testing.T) { }, }, }, + &structs.TxnResult{ + Node: n, + }, + &structs.TxnResult{ + Node: n, + }, + &structs.TxnResult{ + Service: s, + }, + &structs.TxnResult{ + Service: s, + }, + &structs.TxnResult{ + Check: c, + }, + &structs.TxnResult{ + Check: c, + }, }, } - if !reflect.DeepEqual(out, expected) { - t.Fatalf("bad %v", out) - } + verify.Values(t, "", out, expected) } func TestTxn_Apply_ACLDeny(t *testing.T) { t.Parallel() + + require := require.New(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { c.ACLDatacenter = "dc1" c.ACLsEnabled = true @@ -167,15 +314,25 @@ func TestTxn_Apply_ACLDeny(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") - // Put in a key to read back. + // Set up some state to read back. state := s1.fsm.State() d := &structs.DirEntry{ Key: "nope", Value: []byte("hello"), } - if err := state.KVSSet(1, d); err != nil { - t.Fatalf("err: %v", err) + require.NoError(state.KVSSet(1, d)) + + node := &structs.Node{ + ID: types.NodeID(testNodeID), + Node: "nope", } + require.NoError(state.EnsureNode(2, node)) + + svc := structs.NodeService{ID: "nope", Service: "nope", Address: "127.0.0.1"} + require.NoError(state.EnsureService(3, "nope", &svc)) + + check := structs.HealthCheck{Node: "nope", CheckID: types.CheckID("nope")} + state.EnsureCheck(4, &check) // Create the ACL. var id string @@ -186,7 +343,7 @@ func TestTxn_Apply_ACLDeny(t *testing.T) { ACL: structs.ACL{ Name: "User token", Type: structs.ACLTokenTypeClient, - Rules: testListRules, + Rules: testTxnRules, }, WriteRequest: structs.WriteRequest{Token: "root"}, } @@ -296,6 +453,101 @@ func TestTxn_Apply_ACLDeny(t *testing.T) { }, }, }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeGet, + Node: structs.Node{ID: node.ID, Node: node.Node}, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeSet, + Node: structs.Node{ID: node.ID, Node: node.Node}, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeCAS, + Node: structs.Node{ID: node.ID, Node: node.Node}, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeDelete, + Node: structs.Node{ID: node.ID, Node: node.Node}, + }, + }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeDeleteCAS, + Node: structs.Node{ID: node.ID, Node: node.Node}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceGet, + Node: "foo-node", + Service: svc, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceSet, + Node: "foo-node", + Service: svc, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceCAS, + Node: "foo-node", + Service: svc, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceDelete, + Node: "foo-node", + Service: svc, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceDeleteCAS, + Node: "foo-node", + Service: svc, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckGet, + Check: check, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckSet, + Check: check, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckCAS, + Check: check, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckDelete, + Check: check, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckDeleteCAS, + Check: check, + }, + }, }, WriteRequest: structs.WriteRequest{ Token: id, @@ -309,20 +561,55 @@ func TestTxn_Apply_ACLDeny(t *testing.T) { // Verify the transaction's return value. var expected structs.TxnResponse for i, op := range arg.Ops { - switch op.KV.Verb { - case api.KVGet, api.KVGetTree: - // These get filtered but won't result in an error. - - default: - expected.Errors = append(expected.Errors, &structs.TxnError{ - OpIndex: i, - What: acl.ErrPermissionDenied.Error(), - }) + switch { + case op.KV != nil: + switch op.KV.Verb { + case api.KVGet, api.KVGetTree: + // These get filtered but won't result in an error. + + default: + expected.Errors = append(expected.Errors, &structs.TxnError{ + OpIndex: i, + What: acl.ErrPermissionDenied.Error(), + }) + } + case op.Node != nil: + switch op.Node.Verb { + case api.NodeGet: + // These get filtered but won't result in an error. + + default: + expected.Errors = append(expected.Errors, &structs.TxnError{ + OpIndex: i, + What: acl.ErrPermissionDenied.Error(), + }) + } + case op.Service != nil: + switch op.Service.Verb { + case api.ServiceGet: + // These get filtered but won't result in an error. + + default: + expected.Errors = append(expected.Errors, &structs.TxnError{ + OpIndex: i, + What: acl.ErrPermissionDenied.Error(), + }) + } + case op.Check != nil: + switch op.Check.Verb { + case api.CheckGet: + // These get filtered but won't result in an error. + + default: + expected.Errors = append(expected.Errors, &structs.TxnError{ + OpIndex: i, + What: acl.ErrPermissionDenied.Error(), + }) + } } } - if !reflect.DeepEqual(out, expected) { - t.Fatalf("bad %v", out) - } + + verify.Values(t, "", out, expected) } func TestTxn_Apply_LockDelay(t *testing.T) { @@ -413,6 +700,9 @@ func TestTxn_Apply_LockDelay(t *testing.T) { func TestTxn_Read(t *testing.T) { t.Parallel() + + require := require.New(t) + dir1, s1 := testServer(t) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -431,6 +721,19 @@ func TestTxn_Read(t *testing.T) { t.Fatalf("err: %v", err) } + // Put in a node/check/service to read back. + node := &structs.Node{ + ID: types.NodeID(testNodeID), + Node: "foo", + } + require.NoError(state.EnsureNode(2, node)) + + svc := structs.NodeService{ID: "svc-foo", Service: "svc-foo", Address: "127.0.0.1"} + require.NoError(state.EnsureService(3, "foo", &svc)) + + check := structs.HealthCheck{Node: "foo", CheckID: types.CheckID("check-foo")} + state.EnsureCheck(4, &check) + // Do a super basic request. The state store test covers the details so // we just need to be sure that the transaction is sent correctly and // the results are converted appropriately. @@ -445,6 +748,25 @@ func TestTxn_Read(t *testing.T) { }, }, }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeGet, + Node: structs.Node{ID: node.ID, Node: node.Node}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceGet, + Node: "foo", + Service: svc, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckGet, + Check: check, + }, + }, }, } var out structs.TxnReadResponse @@ -453,6 +775,8 @@ func TestTxn_Read(t *testing.T) { } // Verify the transaction's return value. + svc.Weights = &structs.Weights{Passing: 1, Warning: 1} + svc.RaftIndex = structs.RaftIndex{CreateIndex: 3, ModifyIndex: 3} expected := structs.TxnReadResponse{ TxnResponse: structs.TxnResponse{ Results: structs.TxnResults{ @@ -466,19 +790,29 @@ func TestTxn_Read(t *testing.T) { }, }, }, + &structs.TxnResult{ + Node: node, + }, + &structs.TxnResult{ + Service: &svc, + }, + &structs.TxnResult{ + Check: &check, + }, }, }, QueryMeta: structs.QueryMeta{ KnownLeader: true, }, } - if !reflect.DeepEqual(out, expected) { - t.Fatalf("bad %v", out) - } + verify.Values(t, "", out, expected) } func TestTxn_Read_ACLDeny(t *testing.T) { t.Parallel() + + require := require.New(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { c.ACLDatacenter = "dc1" c.ACLsEnabled = true @@ -502,6 +836,19 @@ func TestTxn_Read_ACLDeny(t *testing.T) { t.Fatalf("err: %v", err) } + // Put in a node/check/service to read back. + node := &structs.Node{ + ID: types.NodeID(testNodeID), + Node: "nope", + } + require.NoError(state.EnsureNode(2, node)) + + svc := structs.NodeService{ID: "nope", Service: "nope", Address: "127.0.0.1"} + require.NoError(state.EnsureService(3, "nope", &svc)) + + check := structs.HealthCheck{Node: "nope", CheckID: types.CheckID("nope")} + state.EnsureCheck(4, &check) + // Create the ACL. var id string { @@ -511,7 +858,7 @@ func TestTxn_Read_ACLDeny(t *testing.T) { ACL: structs.ACL{ Name: "User token", Type: structs.ACLTokenTypeClient, - Rules: testListRules, + Rules: testTxnRules, }, WriteRequest: structs.WriteRequest{Token: "root"}, } @@ -557,6 +904,25 @@ func TestTxn_Read_ACLDeny(t *testing.T) { }, }, }, + &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: api.NodeGet, + Node: structs.Node{ID: node.ID, Node: node.Node}, + }, + }, + &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: api.ServiceGet, + Node: "foo", + Service: svc, + }, + }, + &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: api.CheckGet, + Check: check, + }, + }, }, QueryOptions: structs.QueryOptions{ Token: id, @@ -574,15 +940,51 @@ func TestTxn_Read_ACLDeny(t *testing.T) { }, } for i, op := range arg.Ops { - switch op.KV.Verb { - case api.KVGet, api.KVGetTree: - // These get filtered but won't result in an error. - - default: - expected.Errors = append(expected.Errors, &structs.TxnError{ - OpIndex: i, - What: acl.ErrPermissionDenied.Error(), - }) + switch { + case op.KV != nil: + switch op.KV.Verb { + case api.KVGet, api.KVGetTree: + // These get filtered but won't result in an error. + + default: + expected.Errors = append(expected.Errors, &structs.TxnError{ + OpIndex: i, + What: acl.ErrPermissionDenied.Error(), + }) + } + case op.Node != nil: + switch op.Node.Verb { + case api.NodeGet: + // These get filtered but won't result in an error. + + default: + expected.Errors = append(expected.Errors, &structs.TxnError{ + OpIndex: i, + What: acl.ErrPermissionDenied.Error(), + }) + } + case op.Service != nil: + switch op.Service.Verb { + case api.ServiceGet: + // These get filtered but won't result in an error. + + default: + expected.Errors = append(expected.Errors, &structs.TxnError{ + OpIndex: i, + What: acl.ErrPermissionDenied.Error(), + }) + } + case op.Check != nil: + switch op.Check.Verb { + case api.CheckGet: + // These get filtered but won't result in an error. + + default: + expected.Errors = append(expected.Errors, &structs.TxnError{ + OpIndex: i, + What: acl.ErrPermissionDenied.Error(), + }) + } } } if !reflect.DeepEqual(out, expected) { diff --git a/agent/http.go b/agent/http.go index 73e9925a2ca1..b8a80f580cdd 100644 --- a/agent/http.go +++ b/agent/http.go @@ -446,7 +446,18 @@ func decodeBody(req *http.Request, out interface{}, cb func(interface{}) error) return err } } - return mapstructure.Decode(raw, out) + + decodeConf := &mapstructure.DecoderConfig{ + DecodeHook: mapstructure.StringToTimeDurationHookFunc(), + Result: &out, + } + + decoder, err := mapstructure.NewDecoder(decodeConf) + if err != nil { + return err + } + + return decoder.Decode(raw) } // setTranslateAddr is used to set the address translation header. This is only diff --git a/agent/structs/structs.go b/agent/structs/structs.go index c1bd34a9f95a..e1d9c0cdf494 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -2,6 +2,7 @@ package structs import ( "bytes" + "encoding/json" "fmt" "math/rand" "reflect" @@ -893,14 +894,72 @@ type HealthCheck struct { } type HealthCheckDefinition struct { - HTTP string `json:",omitempty"` - TLSSkipVerify bool `json:",omitempty"` - Header map[string][]string `json:",omitempty"` - Method string `json:",omitempty"` - TCP string `json:",omitempty"` - Interval api.ReadableDuration `json:",omitempty"` - Timeout api.ReadableDuration `json:",omitempty"` - DeregisterCriticalServiceAfter api.ReadableDuration `json:",omitempty"` + HTTP string `json:",omitempty"` + TLSSkipVerify bool `json:",omitempty"` + Header map[string][]string `json:",omitempty"` + Method string `json:",omitempty"` + TCP string `json:",omitempty"` + Interval time.Duration `json:",omitempty"` + Timeout time.Duration `json:",omitempty"` + DeregisterCriticalServiceAfter time.Duration `json:",omitempty"` +} + +func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) { + type Alias HealthCheckDefinition + exported := &struct { + Interval string + Timeout string + DeregisterCriticalServiceAfter string + *Alias + }{ + Interval: d.Interval.String(), + Timeout: d.Timeout.String(), + DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(), + Alias: (*Alias)(d), + } + if d.Interval == 0 { + exported.Interval = "" + } + if d.Timeout == 0 { + exported.Timeout = "" + } + if d.DeregisterCriticalServiceAfter == 0 { + exported.DeregisterCriticalServiceAfter = "" + } + + return json.Marshal(exported) +} + +func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error { + type Alias HealthCheckDefinition + aux := &struct { + Interval string + Timeout string + DeregisterCriticalServiceAfter string + *Alias + }{ + Alias: (*Alias)(d), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + var err error + if aux.Interval != "" { + if d.Interval, err = time.ParseDuration(aux.Interval); err != nil { + return err + } + } + if aux.Timeout != "" { + if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil { + return err + } + } + if aux.DeregisterCriticalServiceAfter != "" { + if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil { + return err + } + } + return nil } // IsSame checks if one HealthCheck is the same as another, without looking @@ -916,7 +975,8 @@ func (c *HealthCheck) IsSame(other *HealthCheck) bool { c.Output != other.Output || c.ServiceID != other.ServiceID || c.ServiceName != other.ServiceName || - !reflect.DeepEqual(c.ServiceTags, other.ServiceTags) { + !reflect.DeepEqual(c.ServiceTags, other.ServiceTags) || + !reflect.DeepEqual(c.Definition, other.Definition) { return false } diff --git a/agent/structs/txn.go b/agent/structs/txn.go index c3b814b636c2..87d247bd02a3 100644 --- a/agent/structs/txn.go +++ b/agent/structs/txn.go @@ -9,7 +9,7 @@ import ( ) // TxnKVOp is used to define a single operation on the KVS inside a -// transaction +// transaction. type TxnKVOp struct { Verb api.KVOp DirEnt DirEntry @@ -19,6 +19,40 @@ type TxnKVOp struct { // inside a transaction. type TxnKVResult *DirEntry +// TxnNodeOp is used to define a single operation on a node in the catalog inside +// a transaction. +type TxnNodeOp struct { + Verb api.NodeOp + Node Node +} + +// TxnNodeResult is used to define the result of a single operation on a node +// in the catalog inside a transaction. +type TxnNodeResult *Node + +// TxnServiceOp is used to define a single operation on a service in the catalog inside +// a transaction. +type TxnServiceOp struct { + Verb api.ServiceOp + Node string + Service NodeService +} + +// TxnServiceResult is used to define the result of a single operation on a service +// in the catalog inside a transaction. +type TxnServiceResult *NodeService + +// TxnCheckOp is used to define a single operation on a health check inside a +// transaction. +type TxnCheckOp struct { + Verb api.CheckOp + Check HealthCheck +} + +// TxnCheckResult is used to define the result of a single operation on a health +// check inside a transaction. +type TxnCheckResult *HealthCheck + // TxnKVOp is used to define a single operation on an Intention inside a // transaction. type TxnIntentionOp IntentionRequest @@ -28,6 +62,9 @@ type TxnIntentionOp IntentionRequest type TxnOp struct { KV *TxnKVOp Intention *TxnIntentionOp + Node *TxnNodeOp + Service *TxnServiceOp + Check *TxnCheckOp } // TxnOps is a list of operations within a transaction. @@ -75,7 +112,10 @@ type TxnErrors []*TxnError // TxnResult is used to define the result of a given operation inside a // transaction. Only one of the types should be filled out per entry. type TxnResult struct { - KV TxnKVResult + KV TxnKVResult `json:",omitempty"` + Node TxnNodeResult `json:",omitempty"` + Service TxnServiceResult `json:",omitempty"` + Check TxnCheckResult `json:",omitempty"` } // TxnResults is a list of TxnResult entries. diff --git a/agent/txn_endpoint.go b/agent/txn_endpoint.go index 4870b0327aca..d11958ebd8ff 100644 --- a/agent/txn_endpoint.go +++ b/agent/txn_endpoint.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/types" ) const ( @@ -48,9 +49,9 @@ func decodeValue(rawKV interface{}) error { return nil } -// fixupKVOp looks for non-nil KV operations and passes them on for +// fixupTxnOp looks for non-nil Txn operations and passes them on for // value conversion. -func fixupKVOp(rawOp interface{}) error { +func fixupTxnOp(rawOp interface{}) error { rawMap, ok := rawOp.(map[string]interface{}) if !ok { return fmt.Errorf("unexpected raw op type: %T", rawOp) @@ -67,15 +68,15 @@ func fixupKVOp(rawOp interface{}) error { return nil } -// fixupKVOps takes the raw decoded JSON and base64 decodes values in KV ops, +// fixupTxnOps takes the raw decoded JSON and base64 decodes values in Txn ops, // replacing them with byte arrays. -func fixupKVOps(raw interface{}) error { +func fixupTxnOps(raw interface{}) error { rawSlice, ok := raw.([]interface{}) if !ok { return fmt.Errorf("unexpected raw type: %t", raw) } for _, rawOp := range rawSlice { - if err := fixupKVOp(rawOp); err != nil { + if err := fixupTxnOp(rawOp); err != nil { return err } } @@ -100,7 +101,7 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st // decode it, we will return a 400 since we don't have enough context to // associate the error with a given operation. var ops api.TxnOps - if err := decodeBody(req, &ops, fixupKVOps); err != nil { + if err := decodeBody(req, &ops, fixupTxnOps); err != nil { resp.WriteHeader(http.StatusBadRequest) fmt.Fprintf(resp, "Failed to parse body: %v", err) return nil, 0, false @@ -123,7 +124,8 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st var writes int var netKVSize int for _, in := range ops { - if in.KV != nil { + switch { + case in.KV != nil: size := len(in.KV.Value) if size > maxKVSize { resp.WriteHeader(http.StatusRequestEntityTooLarge) @@ -152,6 +154,102 @@ func (s *HTTPServer) convertOps(resp http.ResponseWriter, req *http.Request) (st }, } opsRPC = append(opsRPC, out) + + case in.Node != nil: + if in.Node.Verb != api.NodeGet { + writes++ + } + + // Setup the default DC if not provided + if in.Node.Node.Datacenter == "" { + in.Node.Node.Datacenter = s.agent.config.Datacenter + } + + node := in.Node.Node + out := &structs.TxnOp{ + Node: &structs.TxnNodeOp{ + Verb: in.Node.Verb, + Node: structs.Node{ + ID: types.NodeID(node.ID), + Node: node.Node, + Address: node.Address, + Datacenter: node.Datacenter, + TaggedAddresses: node.TaggedAddresses, + Meta: node.Meta, + RaftIndex: structs.RaftIndex{ + ModifyIndex: node.ModifyIndex, + }, + }, + }, + } + opsRPC = append(opsRPC, out) + + case in.Service != nil: + if in.Service.Verb != api.ServiceGet { + writes++ + } + + svc := in.Service.Service + out := &structs.TxnOp{ + Service: &structs.TxnServiceOp{ + Verb: in.Service.Verb, + Node: in.Service.Node, + Service: structs.NodeService{ + ID: svc.ID, + Service: svc.Service, + Tags: svc.Tags, + Address: svc.Address, + Meta: svc.Meta, + Port: svc.Port, + Weights: &structs.Weights{ + Passing: svc.Weights.Passing, + Warning: svc.Weights.Warning, + }, + EnableTagOverride: svc.EnableTagOverride, + RaftIndex: structs.RaftIndex{ + ModifyIndex: svc.ModifyIndex, + }, + }, + }, + } + opsRPC = append(opsRPC, out) + + case in.Check != nil: + if in.Check.Verb != api.CheckGet { + writes++ + } + + check := in.Check.Check + out := &structs.TxnOp{ + Check: &structs.TxnCheckOp{ + Verb: in.Check.Verb, + Check: structs.HealthCheck{ + Node: check.Node, + CheckID: types.CheckID(check.CheckID), + Name: check.Name, + Status: check.Status, + Notes: check.Notes, + Output: check.Output, + ServiceID: check.ServiceID, + ServiceName: check.ServiceName, + ServiceTags: check.ServiceTags, + Definition: structs.HealthCheckDefinition{ + HTTP: check.Definition.HTTP, + TLSSkipVerify: check.Definition.TLSSkipVerify, + Header: check.Definition.Header, + Method: check.Definition.Method, + TCP: check.Definition.TCP, + Interval: check.Definition.Interval, + Timeout: check.Definition.Timeout, + DeregisterCriticalServiceAfter: check.Definition.DeregisterCriticalServiceAfter, + }, + RaftIndex: structs.RaftIndex{ + ModifyIndex: check.ModifyIndex, + }, + }, + }, + } + opsRPC = append(opsRPC, out) } } diff --git a/api/health.go b/api/health.go index eae6a01a8682..b3f6b41cf8a4 100644 --- a/api/health.go +++ b/api/health.go @@ -1,8 +1,10 @@ package api import ( + "encoding/json" "fmt" "strings" + "time" ) const ( @@ -36,6 +38,9 @@ type HealthCheck struct { ServiceTags []string Definition HealthCheckDefinition + + CreateIndex uint64 + ModifyIndex uint64 } // HealthCheckDefinition is used to store the details about @@ -46,9 +51,56 @@ type HealthCheckDefinition struct { Method string TLSSkipVerify bool TCP string - Interval ReadableDuration - Timeout ReadableDuration - DeregisterCriticalServiceAfter ReadableDuration + Interval time.Duration + Timeout time.Duration + DeregisterCriticalServiceAfter time.Duration +} + +func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) { + type Alias HealthCheckDefinition + return json.Marshal(&struct { + Interval string + Timeout string + DeregisterCriticalServiceAfter string + *Alias + }{ + Interval: d.Interval.String(), + Timeout: d.Timeout.String(), + DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(), + Alias: (*Alias)(d), + }) +} + +func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error { + type Alias HealthCheckDefinition + aux := &struct { + Interval string + Timeout string + DeregisterCriticalServiceAfter string + *Alias + }{ + Alias: (*Alias)(d), + } + if err := json.Unmarshal(data, &aux); err != nil { + return err + } + var err error + if aux.Interval != "" { + if d.Interval, err = time.ParseDuration(aux.Interval); err != nil { + return err + } + } + if aux.Timeout != "" { + if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil { + return err + } + } + if aux.DeregisterCriticalServiceAfter != "" { + if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil { + return err + } + } + return nil } // HealthChecks is a collection of HealthCheck structs. diff --git a/api/health_test.go b/api/health_test.go index 7a9147350e8e..9b25b0f3c67f 100644 --- a/api/health_test.go +++ b/api/health_test.go @@ -213,9 +213,9 @@ func TestAPI_HealthChecks(t *testing.T) { if meta.LastIndex == 0 { r.Fatalf("bad: %v", meta) } - if got, want := out, checks; !verify.Values(t, "checks", got, want) { - r.Fatal("health.Checks failed") - } + checks[0].CreateIndex = out[0].CreateIndex + checks[0].ModifyIndex = out[0].ModifyIndex + verify.Values(r, "checks", out, checks) }) } diff --git a/api/kv.go b/api/kv.go index 97f51568559b..bd45a067c908 100644 --- a/api/kv.go +++ b/api/kv.go @@ -45,44 +45,6 @@ type KVPair struct { // KVPairs is a list of KVPair objects type KVPairs []*KVPair -// KVOp constants give possible operations available in a KVTxn. -type KVOp string - -const ( - KVSet KVOp = "set" - KVDelete KVOp = "delete" - KVDeleteCAS KVOp = "delete-cas" - KVDeleteTree KVOp = "delete-tree" - KVCAS KVOp = "cas" - KVLock KVOp = "lock" - KVUnlock KVOp = "unlock" - KVGet KVOp = "get" - KVGetTree KVOp = "get-tree" - KVCheckSession KVOp = "check-session" - KVCheckIndex KVOp = "check-index" - KVCheckNotExists KVOp = "check-not-exists" -) - -// KVTxnOp defines a single operation inside a transaction. -type KVTxnOp struct { - Verb KVOp - Key string - Value []byte - Flags uint64 - Index uint64 - Session string -} - -// KVTxnOps defines a set of operations to be performed inside a single -// transaction. -type KVTxnOps []*KVTxnOp - -// KVTxnResponse has the outcome of a transaction. -type KVTxnResponse struct { - Results []*KVPair - Errors TxnErrors -} - // KV is used to manipulate the K/V API type KV struct { c *Client @@ -300,121 +262,25 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption return res, qm, nil } -// TxnOp is the internal format we send to Consul. It's not specific to KV, -// though currently only KV operations are supported. -type TxnOp struct { - KV *KVTxnOp -} - -// TxnOps is a list of transaction operations. -type TxnOps []*TxnOp - -// TxnResult is the internal format we receive from Consul. -type TxnResult struct { - KV *KVPair -} - -// TxnResults is a list of TxnResult objects. -type TxnResults []*TxnResult - -// TxnError is used to return information about an operation in a transaction. -type TxnError struct { - OpIndex int - What string -} - -// TxnErrors is a list of TxnError objects. -type TxnErrors []*TxnError - -// TxnResponse is the internal format we receive from Consul. -type TxnResponse struct { - Results TxnResults - Errors TxnErrors -} - -// Txn is used to apply multiple KV operations in a single, atomic transaction. -// -// Note that Go will perform the required base64 encoding on the values -// automatically because the type is a byte slice. Transactions are defined as a -// list of operations to perform, using the KVOp constants and KVTxnOp structure -// to define operations. If any operation fails, none of the changes are applied -// to the state store. Note that this hides the internal raw transaction interface -// and munges the input and output types into KV-specific ones for ease of use. -// If there are more non-KV operations in the future we may break out a new -// transaction API client, but it will be easy to keep this KV-specific variant -// supported. -// -// Even though this is generally a write operation, we take a QueryOptions input -// and return a QueryMeta output. If the transaction contains only read ops, then -// Consul will fast-path it to a different endpoint internally which supports -// consistency controls, but not blocking. If there are write operations then -// the request will always be routed through raft and any consistency settings -// will be ignored. -// -// Here's an example: -// -// ops := KVTxnOps{ -// &KVTxnOp{ -// Verb: KVLock, -// Key: "test/lock", -// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e", -// Value: []byte("hello"), -// }, -// &KVTxnOp{ -// Verb: KVGet, -// Key: "another/key", -// }, -// } -// ok, response, _, err := kv.Txn(&ops, nil) -// -// If there is a problem making the transaction request then an error will be -// returned. Otherwise, the ok value will be true if the transaction succeeded -// or false if it was rolled back. The response is a structured return value which -// will have the outcome of the transaction. Its Results member will have entries -// for each operation. Deleted keys will have a nil entry in the, and to save -// space, the Value of each key in the Results will be nil unless the operation -// is a KVGet. If the transaction was rolled back, the Errors member will have -// entries referencing the index of the operation that failed along with an error -// message. +// The Txn function has been deprecated from the KV object; please see the Txn +// object for more information about Transactions. func (k *KV) Txn(txn KVTxnOps, q *QueryOptions) (bool, *KVTxnResponse, *QueryMeta, error) { - r := k.c.newRequest("PUT", "/v1/txn") - r.setQueryOptions(q) - - // Convert into the internal format since this is an all-KV txn. - ops := make(TxnOps, 0, len(txn)) - for _, kvOp := range txn { - ops = append(ops, &TxnOp{KV: kvOp}) + var ops TxnOps + for _, op := range txn { + ops = append(ops, &TxnOp{KV: op}) } - r.obj = ops - rtt, resp, err := k.c.doRequest(r) + + respOk, txnResp, qm, err := k.c.txn(ops, q) if err != nil { return false, nil, nil, err } - defer resp.Body.Close() - qm := &QueryMeta{} - parseQueryMeta(resp, qm) - qm.RequestTime = rtt - - if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { - var txnResp TxnResponse - if err := decodeBody(resp, &txnResp); err != nil { - return false, nil, nil, err - } - - // Convert from the internal format. - kvResp := KVTxnResponse{ - Errors: txnResp.Errors, - } - for _, result := range txnResp.Results { - kvResp.Results = append(kvResp.Results, result.KV) - } - return resp.StatusCode == http.StatusOK, &kvResp, qm, nil + // Convert from the internal format. + kvResp := KVTxnResponse{ + Errors: txnResp.Errors, } - - var buf bytes.Buffer - if _, err := io.Copy(&buf, resp.Body); err != nil { - return false, nil, nil, fmt.Errorf("Failed to read response: %v", err) + for _, result := range txnResp.Results { + kvResp.Results = append(kvResp.Results, result.KV) } - return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String()) + return respOk, &kvResp, qm, nil } diff --git a/api/kv_test.go b/api/kv_test.go index 1ca6cd794a6e..f19cfc1a78ea 100644 --- a/api/kv_test.go +++ b/api/kv_test.go @@ -456,7 +456,7 @@ func TestAPI_ClientAcquireRelease(t *testing.T) { } } -func TestAPI_ClientTxn(t *testing.T) { +func TestAPI_KVClientTxn(t *testing.T) { t.Parallel() c, s := makeClient(t) defer s.Stop() diff --git a/api/txn.go b/api/txn.go new file mode 100644 index 000000000000..65d7a16ea040 --- /dev/null +++ b/api/txn.go @@ -0,0 +1,230 @@ +package api + +import ( + "bytes" + "fmt" + "io" + "net/http" +) + +// Txn is used to manipulate the Txn API +type Txn struct { + c *Client +} + +// Txn is used to return a handle to the K/V apis +func (c *Client) Txn() *Txn { + return &Txn{c} +} + +// TxnOp is the internal format we send to Consul. Currently only K/V and +// check operations are supported. +type TxnOp struct { + KV *KVTxnOp + Node *NodeTxnOp + Service *ServiceTxnOp + Check *CheckTxnOp +} + +// TxnOps is a list of transaction operations. +type TxnOps []*TxnOp + +// TxnResult is the internal format we receive from Consul. +type TxnResult struct { + KV *KVPair + Node *Node + Service *CatalogService + Check *HealthCheck +} + +// TxnResults is a list of TxnResult objects. +type TxnResults []*TxnResult + +// TxnError is used to return information about an operation in a transaction. +type TxnError struct { + OpIndex int + What string +} + +// TxnErrors is a list of TxnError objects. +type TxnErrors []*TxnError + +// TxnResponse is the internal format we receive from Consul. +type TxnResponse struct { + Results TxnResults + Errors TxnErrors +} + +// KVOp constants give possible operations available in a transaction. +type KVOp string + +const ( + KVSet KVOp = "set" + KVDelete KVOp = "delete" + KVDeleteCAS KVOp = "delete-cas" + KVDeleteTree KVOp = "delete-tree" + KVCAS KVOp = "cas" + KVLock KVOp = "lock" + KVUnlock KVOp = "unlock" + KVGet KVOp = "get" + KVGetTree KVOp = "get-tree" + KVCheckSession KVOp = "check-session" + KVCheckIndex KVOp = "check-index" + KVCheckNotExists KVOp = "check-not-exists" +) + +// KVTxnOp defines a single operation inside a transaction. +type KVTxnOp struct { + Verb KVOp + Key string + Value []byte + Flags uint64 + Index uint64 + Session string +} + +// KVTxnOps defines a set of operations to be performed inside a single +// transaction. +type KVTxnOps []*KVTxnOp + +// KVTxnResponse has the outcome of a transaction. +type KVTxnResponse struct { + Results []*KVPair + Errors TxnErrors +} + +// NodeOp constants give possible operations available in a transaction. +type NodeOp string + +const ( + NodeGet NodeOp = "get" + NodeSet NodeOp = "set" + NodeCAS NodeOp = "cas" + NodeDelete NodeOp = "delete" + NodeDeleteCAS NodeOp = "delete-cas" +) + +// NodeTxnOp defines a single operation inside a transaction. +type NodeTxnOp struct { + Verb NodeOp + Node Node +} + +// ServiceOp constants give possible operations available in a transaction. +type ServiceOp string + +const ( + ServiceGet ServiceOp = "get" + ServiceSet ServiceOp = "set" + ServiceCAS ServiceOp = "cas" + ServiceDelete ServiceOp = "delete" + ServiceDeleteCAS ServiceOp = "delete-cas" +) + +// ServiceTxnOp defines a single operation inside a transaction. +type ServiceTxnOp struct { + Verb ServiceOp + Node string + Service AgentService +} + +// CheckOp constants give possible operations available in a transaction. +type CheckOp string + +const ( + CheckGet CheckOp = "get" + CheckSet CheckOp = "set" + CheckCAS CheckOp = "cas" + CheckDelete CheckOp = "delete" + CheckDeleteCAS CheckOp = "delete-cas" +) + +// CheckTxnOp defines a single operation inside a transaction. +type CheckTxnOp struct { + Verb CheckOp + Check HealthCheck +} + +// Txn is used to apply multiple Consul operations in a single, atomic transaction. +// +// Note that Go will perform the required base64 encoding on the values +// automatically because the type is a byte slice. Transactions are defined as a +// list of operations to perform, using the different fields in the TxnOp structure +// to define operations. If any operation fails, none of the changes are applied +// to the state store. +// +// Even though this is generally a write operation, we take a QueryOptions input +// and return a QueryMeta output. If the transaction contains only read ops, then +// Consul will fast-path it to a different endpoint internally which supports +// consistency controls, but not blocking. If there are write operations then +// the request will always be routed through raft and any consistency settings +// will be ignored. +// +// Here's an example: +// +// ops := KVTxnOps{ +// &KVTxnOp{ +// Verb: KVLock, +// Key: "test/lock", +// Session: "adf4238a-882b-9ddc-4a9d-5b6758e4159e", +// Value: []byte("hello"), +// }, +// &KVTxnOp{ +// Verb: KVGet, +// Key: "another/key", +// }, +// &CheckTxnOp{ +// Verb: CheckSet, +// HealthCheck: HealthCheck{ +// Node: "foo", +// CheckID: "redis:a", +// Name: "Redis Health Check", +// Status: "passing", +// }, +// } +// } +// ok, response, _, err := kv.Txn(&ops, nil) +// +// If there is a problem making the transaction request then an error will be +// returned. Otherwise, the ok value will be true if the transaction succeeded +// or false if it was rolled back. The response is a structured return value which +// will have the outcome of the transaction. Its Results member will have entries +// for each operation. For KV operations, Deleted keys will have a nil entry in the +// results, and to save space, the Value of each key in the Results will be nil +// unless the operation is a KVGet. If the transaction was rolled back, the Errors +// member will have entries referencing the index of the operation that failed +// along with an error message. +func (t *Txn) Txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) { + return t.c.txn(txn, q) +} + +func (c *Client) txn(txn TxnOps, q *QueryOptions) (bool, *TxnResponse, *QueryMeta, error) { + r := c.newRequest("PUT", "/v1/txn") + r.setQueryOptions(q) + + r.obj = txn + rtt, resp, err := c.doRequest(r) + if err != nil { + return false, nil, nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusConflict { + var txnResp TxnResponse + if err := decodeBody(resp, &txnResp); err != nil { + return false, nil, nil, err + } + + return resp.StatusCode == http.StatusOK, &txnResp, qm, nil + } + + var buf bytes.Buffer + if _, err := io.Copy(&buf, resp.Body); err != nil { + return false, nil, nil, fmt.Errorf("Failed to read response: %v", err) + } + return false, nil, nil, fmt.Errorf("Failed request: %s", buf.String()) +} diff --git a/api/txn_test.go b/api/txn_test.go new file mode 100644 index 000000000000..c164ebe92b50 --- /dev/null +++ b/api/txn_test.go @@ -0,0 +1,247 @@ +package api + +import ( + "strings" + "testing" + "time" + + "github.com/hashicorp/go-uuid" + + "github.com/pascaldekloe/goe/verify" + "github.com/stretchr/testify/require" +) + +func TestAPI_ClientTxn(t *testing.T) { + t.Parallel() + require := require.New(t) + c, s := makeClient(t) + defer s.Stop() + + session := c.Session() + txn := c.Txn() + + // Set up a test service and health check. + nodeID, err := uuid.GenerateUUID() + require.NoError(err) + + catalog := c.Catalog() + reg := &CatalogRegistration{ + ID: nodeID, + Node: "foo", + Address: "2.2.2.2", + Service: &AgentService{ + ID: "foo1", + Service: "foo", + }, + Check: &AgentCheck{ + CheckID: "bar", + Status: "critical", + Definition: HealthCheckDefinition{ + TCP: "1.1.1.1", + Interval: 5 * time.Second, + }, + }, + } + _, err = catalog.Register(reg, nil) + require.NoError(err) + + node, _, err := catalog.Node("foo", nil) + require.NoError(err) + require.Equal(nodeID, node.Node.ID) + + // Make a session. + id, _, err := session.CreateNoChecks(nil, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + defer session.Destroy(id, nil) + + // Acquire and get the key via a transaction, but don't supply a valid + // session. + key := testKey() + value := []byte("test") + ops := TxnOps{ + &TxnOp{ + KV: &KVTxnOp{ + Verb: KVLock, + Key: key, + Value: value, + }, + }, + &TxnOp{ + KV: &KVTxnOp{ + Verb: KVGet, + Key: key, + }, + }, + &TxnOp{ + Node: &NodeTxnOp{ + Verb: NodeGet, + Node: Node{Node: "foo"}, + }, + }, + &TxnOp{ + Service: &ServiceTxnOp{ + Verb: ServiceGet, + Node: "foo", + Service: AgentService{ID: "foo1"}, + }, + }, + &TxnOp{ + Check: &CheckTxnOp{ + Verb: CheckGet, + Check: HealthCheck{Node: "foo", CheckID: "bar"}, + }, + }, + } + ok, ret, _, err := txn.Txn(ops, nil) + if err != nil { + t.Fatalf("err: %v", err) + } else if ok { + t.Fatalf("transaction should have failed") + } + + if ret == nil || len(ret.Errors) != 2 || len(ret.Results) != 0 { + t.Fatalf("bad: %v", ret.Errors[2]) + } + if ret.Errors[0].OpIndex != 0 || + !strings.Contains(ret.Errors[0].What, "missing session") || + !strings.Contains(ret.Errors[1].What, "doesn't exist") { + t.Fatalf("bad: %v", ret.Errors[0]) + } + + // Now poke in a real session and try again. + ops[0].KV.Session = id + ok, ret, _, err = txn.Txn(ops, nil) + if err != nil { + t.Fatalf("err: %v", err) + } else if !ok { + t.Fatalf("transaction failure") + } + + if ret == nil || len(ret.Errors) != 0 || len(ret.Results) != 5 { + t.Fatalf("bad: %v", ret) + } + expected := TxnResults{ + &TxnResult{ + KV: &KVPair{ + Key: key, + Session: id, + LockIndex: 1, + CreateIndex: ret.Results[0].KV.CreateIndex, + ModifyIndex: ret.Results[0].KV.ModifyIndex, + }, + }, + &TxnResult{ + KV: &KVPair{ + Key: key, + Session: id, + Value: []byte("test"), + LockIndex: 1, + CreateIndex: ret.Results[1].KV.CreateIndex, + ModifyIndex: ret.Results[1].KV.ModifyIndex, + }, + }, + &TxnResult{ + Node: &Node{ + ID: nodeID, + Node: "foo", + Address: "2.2.2.2", + Datacenter: "dc1", + CreateIndex: ret.Results[2].Node.CreateIndex, + ModifyIndex: ret.Results[2].Node.CreateIndex, + }, + }, + &TxnResult{ + Service: &CatalogService{ + ID: "foo1", + CreateIndex: ret.Results[3].Service.CreateIndex, + ModifyIndex: ret.Results[3].Service.CreateIndex, + }, + }, + &TxnResult{ + Check: &HealthCheck{ + Node: "foo", + CheckID: "bar", + Status: "critical", + Definition: HealthCheckDefinition{ + TCP: "1.1.1.1", + Interval: 5 * time.Second, + }, + CreateIndex: ret.Results[4].Check.CreateIndex, + ModifyIndex: ret.Results[4].Check.CreateIndex, + }, + }, + } + verify.Values(t, "", ret.Results, expected) + + // Run a read-only transaction. + ops = TxnOps{ + &TxnOp{ + KV: &KVTxnOp{ + Verb: KVGet, + Key: key, + }, + }, + &TxnOp{ + Node: &NodeTxnOp{ + Verb: NodeGet, + Node: Node{ID: s.Config.NodeID, Node: s.Config.NodeName}, + }, + }, + } + ok, ret, _, err = txn.Txn(ops, nil) + if err != nil { + t.Fatalf("err: %v", err) + } else if !ok { + t.Fatalf("transaction failure") + } + + expected = TxnResults{ + &TxnResult{ + KV: &KVPair{ + Key: key, + Session: id, + Value: []byte("test"), + LockIndex: 1, + CreateIndex: ret.Results[0].KV.CreateIndex, + ModifyIndex: ret.Results[0].KV.ModifyIndex, + }, + }, + &TxnResult{ + Node: &Node{ + ID: s.Config.NodeID, + Node: s.Config.NodeName, + Address: "127.0.0.1", + Datacenter: "dc1", + TaggedAddresses: map[string]string{ + "lan": s.Config.Bind, + "wan": s.Config.Bind, + }, + Meta: map[string]string{"consul-network-segment": ""}, + CreateIndex: ret.Results[1].Node.CreateIndex, + ModifyIndex: ret.Results[1].Node.ModifyIndex, + }, + }, + } + verify.Values(t, "", ret.Results, expected) + + // Sanity check using the regular GET API. + kv := c.KV() + pair, meta, err := kv.Get(key, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + if pair == nil { + t.Fatalf("expected value: %#v", pair) + } + if pair.LockIndex != 1 { + t.Fatalf("Expected lock: %v", pair) + } + if pair.Session != id { + t.Fatalf("Expected lock: %v", pair) + } + if meta.LastIndex == 0 { + t.Fatalf("unexpected value: %#v", meta) + } +} diff --git a/website/source/api/txn.html.md b/website/source/api/txn.html.md index 17bce978f57b..23e28b0d20f6 100644 --- a/website/source/api/txn.html.md +++ b/website/source/api/txn.html.md @@ -3,19 +3,18 @@ layout: api page_title: Transaction - HTTP API sidebar_current: api-txn description: |- - The /txn endpoints manage updates or fetches of multiple keys inside a single, - atomic transaction. + The /txn endpoint manages multiple operations in Consul, including catalog updates and fetches of multiple KV entries inside a single, atomic transaction. --- # Transactions HTTP API -The `/txn` endpoints manage updates or fetches of multiple keys inside a single, -atomic transaction. It is important to note that each datacenter has its own KV -store, and there is no built-in replication between datacenters. +The `/txn` endpoint manages multiple operations in Consul, including catalog +updates and fetches of multiple KV entries inside a single, atomic +transaction. ## Create Transaction -This endpoint permits submitting a list of operations to apply to the KV store +This endpoint permits submitting a list of operations to apply to Consul inside of a transaction. If any operation fails, the transaction is rolled back and none of the changes are applied. @@ -43,7 +42,7 @@ The table below shows this endpoint's support for | Blocking Queries | Consistency Modes | Agent Caching | ACL Required | | ---------------- | ----------------- | ------------- | ------------ | -| `NO` | `all`1 | `none` | `key:read,key:write`2 | +| `NO` | `all`1 | `none` | `key:read,key:write`
`node:read,node:write`
`service:read,service:write`2 1 For read-only transactions
@@ -55,7 +54,7 @@ The table below shows this endpoint's support for to the datacenter of the agent being queried. This is specified as part of the URL as a query parameter. -- `KV` is the only available operation type, though other types may be added in the future. +- `KV` operations have the following fields: - `Verb` `(string: )` - Specifies the type of operation to perform. Please see the table below for available verbs. @@ -74,7 +73,32 @@ The table below shows this endpoint's support for - `Session` `(string: "")` - Specifies a session. See the table below for more information. + +- `Node` operations have the following fields: + - `Verb` `(string: )` - Specifies the type of operation to perform. + + - `Node` `(Node: )` - Specifies the node information to use + for the operation. See the [catalog endpoint](/api/catalog.html#parameters) for the fields in this object. Note the only the node can be specified here, not any services or checks - separate service or check operations must be used for those. + +- `Service` operations have the following fields: + + - `Verb` `(string: )` - Specifies the type of operation to perform. + + - `Node` `(string: )` = Specifies the name of the node to use for + this service operation. + + - `Service` `(Service: )` - Specifies the service instance information to use + for the operation. See the [catalog endpoint](/api/catalog.html#parameters) for the fields in this object. + +- `Check` operations have the following fields: + + - `Verb` `(string: )` - Specifies the type of operation to perform. + + - `Service` `(Service: )` - Specifies the check to use + for the operation. See the [catalog endpoint](/api/catalog.html#parameters) for the fields in this object. + + Please see the table below for available verbs. ### Sample Payload The body of the request should be a list of operations to perform inside the @@ -91,6 +115,48 @@ atomic transaction. Up to 64 operations may be present in a single transaction. "Index": , "Session": "" } + }, + { + "Node": { + "Verb": "set", + "Node": { + "ID": "67539c9d-b948-ba67-edd4-d07a676d6673", + "Node": "bar", + "Address": "192.168.0.1", + "Datacenter": "dc1", + "Meta": { + "instance_type": "m2.large" + } + } + } + }, + { + "Service": { + "Verb": "delete", + "Node": "foo", + "Service": { + "ID": "db1" + } + } + }, + { + "Check": { + "Verb": "cas", + "Check": { + "Node": "bar", + "CheckID": "service:web1", + "Name": "Web HTTP Check", + "Status": "critical", + "ServiceID": "web1", + "ServiceName": "web", + "ServiceTags": null, + "Definition": { + "HTTP": "http://localhost:8080", + "Interval": "10s" + }, + "ModifyIndex": 22 + } + } } ] ``` @@ -123,6 +189,39 @@ look like this: "CreateIndex": , "ModifyIndex": } + }, + { + "Node": { + "ID": "67539c9d-b948-ba67-edd4-d07a676d6673", + "Node": "bar", + "Address": "192.168.0.1", + "Datacenter": "dc1", + "TaggedAddresses": null, + "Meta": { + "instance_type": "m2.large" + }, + "CreateIndex": 32, + "ModifyIndex": 32 + } + }, + { + "Check": { + "Node": "bar", + "CheckID": "service:web1", + "Name": "Web HTTP Check", + "Status": "critical", + "Notes": "", + "Output": "", + "ServiceID": "web1", + "ServiceName": "web", + "ServiceTags": null, + "Definition": { + "HTTP": "http://localhost:8080", + "Interval": "10s" + }, + "CreateIndex": 22, + "ModifyIndex": 35 + } } ], "Errors": [ @@ -130,12 +229,13 @@ look like this: "OpIndex": , "What": "" }, + ... ] } ``` - `Results` has entries for some operations if the transaction was successful. - To save space, the `Value` will be `null` for any `Verb` other than "get" or + To save space, the `Value` for KV results will be `null` for any `Verb` other than "get" or "get-tree". Like the `/v1/kv/` endpoint, `Value` will be Base64-encoded if it is present. Also, no result entries will be added for verbs that delete keys. @@ -145,10 +245,12 @@ look like this: transaction, and `What` is a string with an error message about why that operation failed. -### Table of Operations +### Tables of Operations + +#### KV Operations -The following table summarizes the available verbs and the fields that apply to -that operation ("X" means a field is required and "O" means it is optional): +The following tables summarize the available verbs and the fields that apply to +those operations ("X" means a field is required and "O" means it is optional): | Verb | Operation | Key | Value | Flags | Index | Session | | ------------------ | -------------------------------------------- | :--: | :---: | :---: | :---: | :-----: | @@ -164,3 +266,42 @@ that operation ("X" means a field is required and "O" means it is optional): | `delete` | Delete the key | `x` | | | | | | `delete-tree` | Delete all keys with a prefix | `x` | | | | | | `delete-cas` | Delete, but with CAS semantics | `x` | | | `x` | | + +#### Node Operations + +Node operations act on an individual node and require either a Node ID or name, giving precedence +to the ID if both are set. Delete operations will not return a result on success. + +| Verb | Operation | +| ------------------ | -------------------------------------------- | +| `set` | Sets the node to the given state | +| `cas` | Sets, but with CAS semantics using the given ModifyIndex | +| `get` | Get the node, fails if it does not exist | +| `delete` | Delete the node | +| `delete-cas` | Delete, but with CAS semantics | + +#### Service Operations + +Service operations act on an individual service instance on the given node name. Both a node name +and valid service name are required. Delete operations will not return a result on success. + +| Verb | Operation | +| ------------------ | -------------------------------------------- | +| `set` | Sets the service to the given state | +| `cas` | Sets, but with CAS semantics using the given ModifyIndex | +| `get` | Get the service, fails if it does not exist | +| `delete` | Delete the service | +| `delete-cas` | Delete, but with CAS semantics | + +#### Check Operations + +Check operations act on an individual health check instance on the given node name. Both a node name +and valid check ID are required. Delete operations will not return a result on success. + +| Verb | Operation | +| ------------------ | -------------------------------------------- | +| `set` | Sets the health check to the given state | +| `cas` | Sets, but with CAS semantics using the given ModifyIndex | +| `get` | Get the check, fails if it does not exist | +| `delete` | Delete the check | +| `delete-cas` | Delete, but with CAS semantics | \ No newline at end of file