Skip to content

Commit

Permalink
api: add support for new txn operations
Browse files Browse the repository at this point in the history
  • Loading branch information
kyhavlov committed Dec 12, 2018
1 parent de4dbf5 commit 67bac7a
Show file tree
Hide file tree
Showing 14 changed files with 567 additions and 63 deletions.
15 changes: 0 additions & 15 deletions agent/consul/acl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1361,11 +1361,6 @@ func vetNodeTxnOp(op *structs.TxnNodeOp, rule acl.Authorizer) error {

node := op.Node

// Filtering for GETs is done on the output side.
if op.Verb == api.NodeGet {
return nil
}

n := &api.Node{
Node: node.Node,
ID: string(node.ID),
Expand Down Expand Up @@ -1399,11 +1394,6 @@ func vetServiceTxnOp(op *structs.TxnServiceOp, rule acl.Authorizer) error {

service := op.Service

// Filtering for GETs is done on the output side.
if op.Verb == api.ServiceGet {
return nil
}

n := &api.Node{Node: op.Node}
svc := &api.AgentService{
ID: service.ID,
Expand Down Expand Up @@ -1431,11 +1421,6 @@ func vetCheckTxnOp(op *structs.TxnCheckOp, rule acl.Authorizer) error {
return nil
}

// Filtering for GETs is done on the output side.
if op.Verb == api.CheckGet {
return nil
}

n := &api.Node{Node: op.Check.Node}
svc := &api.AgentService{
ID: op.Check.ServiceID,
Expand Down
14 changes: 11 additions & 3 deletions agent/consul/state/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,14 +491,22 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
idx := maxIndexTxn(tx, "nodes")

// Retrieve the node from the state store
node, err := tx.First("nodes", "id", id)
node, err := getNodeTxn(tx, id)
if err != nil {
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
}
return idx, node, nil
}

func getNodeTxn(tx *memdb.Txn, nodeName string) (*structs.Node, error) {
node, err := tx.First("nodes", "id", nodeName)
if err != nil {
return nil, fmt.Errorf("node lookup failed: %s", err)
}
if node != nil {
return idx, node.(*structs.Node), nil
return node.(*structs.Node), nil
}
return idx, nil, nil
return nil, nil
}

func getNodeIDTxn(tx *memdb.Txn, id types.NodeID) (*structs.Node, error) {
Expand Down
12 changes: 11 additions & 1 deletion agent/consul/state/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,14 @@ func (s *Store) txnNode(tx *memdb.Txn, idx uint64, op *structs.TxnNodeOp) (struc

switch op.Verb {
case api.NodeGet:
entry, err = getNodeIDTxn(tx, op.Node.ID)
if op.Node.ID != "" {
entry, err = getNodeIDTxn(tx, op.Node.ID)
} else {
entry, err = getNodeTxn(tx, op.Node.Node)
}
if entry == nil && err == nil {
err = fmt.Errorf("node %q doesn't exist", op.Node.Node)
}

case api.NodeSet:
err = s.ensureNodeTxn(tx, idx, &op.Node)
Expand Down Expand Up @@ -188,6 +195,9 @@ func (s *Store) txnService(tx *memdb.Txn, idx uint64, op *structs.TxnServiceOp)
switch op.Verb {
case api.ServiceGet:
entry, err = s.nodeServiceTxn(tx, op.Node, op.Service.ID)
if entry == nil && err == nil {
err = fmt.Errorf("service %q on node %q doesn't exist", op.Service.ID, op.Node)
}

case api.ServiceSet:
err = s.ensureServiceTxn(tx, idx, op.Node, &op.Service)
Expand Down
15 changes: 10 additions & 5 deletions agent/consul/state/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,27 +279,32 @@ func TestStateStore_Txn_Service(t *testing.T) {
Service: "svc1",
Address: "1.1.1.1",
Port: 1111,
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 2,
ModifyIndex: 2,
},
Weights: &structs.Weights{Passing: 1, Warning: 1},
},
},
&structs.TxnResult{
Service: &structs.NodeService{
ID: "svc5",
ID: "svc5",
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 6,
ModifyIndex: 6,
},
},
},
&structs.TxnResult{
Service: &structs.NodeService{
ID: "svc2",
Tags: []string{"modified"},
ID: "svc2",
Tags: []string{"modified"},
Weights: &structs.Weights{Passing: 1, Warning: 1},
RaftIndex: structs.RaftIndex{
CreateIndex: 3,
ModifyIndex: 6,
},
Weights: &structs.Weights{Passing: 1, Warning: 1},
},
},
}
Expand Down
31 changes: 31 additions & 0 deletions agent/consul/txn_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)

// Txn endpoint is used to perform multi-object atomic transactions.
Expand Down Expand Up @@ -37,6 +38,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
})
}
case op.Node != nil:
// Skip the pre-apply checks if this is a GET.
if op.Node.Verb == api.NodeGet {
break
}

node := op.Node.Node
if err := nodePreApply(node.Node, string(node.ID)); err != nil {
errors = append(errors, &structs.TxnError{
Expand All @@ -54,6 +60,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
})
}
case op.Service != nil:
// Skip the pre-apply checks if this is a GET.
if op.Service.Verb == api.ServiceGet {
break
}

service := &op.Service.Service
if err := servicePreApply(service, nil); err != nil {
errors = append(errors, &structs.TxnError{
Expand All @@ -71,6 +82,11 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx
})
}
case op.Check != nil:
// Skip the pre-apply checks if this is a GET.
if op.Check.Verb == api.CheckGet {
break
}

checkPreApply(&op.Check.Check)

// Check that the token has permissions for the given operation.
Expand Down Expand Up @@ -103,6 +119,21 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error
return nil
}

str := ""
for _, op := range args.Ops {
switch {
case op.KV != nil:
str += fmt.Sprintf("%#v\n", op.KV)
case op.Node != nil:
str += fmt.Sprintf("%#v\n", op.Node)
case op.Service != nil:
str += fmt.Sprintf("%#v\n", op.Service)
case op.Check != nil:
str += fmt.Sprintf("%#v\n", op.Check)
}
}
//return fmt.Errorf("%s", str)

// Apply the update.
resp, err := t.srv.raftApply(structs.TxnRequestType, args)
if err != nil {
Expand Down
13 changes: 12 additions & 1 deletion agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,18 @@ func decodeBody(req *http.Request, out interface{}, cb func(interface{}) error)
return err
}
}
return mapstructure.Decode(raw, out)

decodeConf := &mapstructure.DecoderConfig{
DecodeHook: mapstructure.StringToTimeDurationHookFunc(),
Result: &out,
}

decoder, err := mapstructure.NewDecoder(decodeConf)
if err != nil {
return err
}

return decoder.Decode(raw)
}

// setTranslateAddr is used to set the address translation header. This is only
Expand Down
64 changes: 56 additions & 8 deletions agent/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package structs

import (
"bytes"
"encoding/json"
"fmt"
"math/rand"
"reflect"
Expand Down Expand Up @@ -893,14 +894,61 @@ type HealthCheck struct {
}

type HealthCheckDefinition struct {
HTTP string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
Header map[string][]string `json:",omitempty"`
Method string `json:",omitempty"`
TCP string `json:",omitempty"`
Interval api.ReadableDuration `json:",omitempty"`
Timeout api.ReadableDuration `json:",omitempty"`
DeregisterCriticalServiceAfter api.ReadableDuration `json:",omitempty"`
HTTP string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
Header map[string][]string `json:",omitempty"`
Method string `json:",omitempty"`
TCP string `json:",omitempty"`
Interval time.Duration `json:",omitempty"`
Timeout time.Duration `json:",omitempty"`
DeregisterCriticalServiceAfter time.Duration `json:",omitempty"`
}

func (d *HealthCheckDefinition) MarshalJSON() ([]byte, error) {
type Alias HealthCheckDefinition
return json.Marshal(&struct {
Interval string
Timeout string
DeregisterCriticalServiceAfter string
*Alias
}{
Interval: d.Interval.String(),
Timeout: d.Timeout.String(),
DeregisterCriticalServiceAfter: d.DeregisterCriticalServiceAfter.String(),
Alias: (*Alias)(d),
})
}

func (d *HealthCheckDefinition) UnmarshalJSON(data []byte) error {
type Alias HealthCheckDefinition
aux := &struct {
Interval string
Timeout string
DeregisterCriticalServiceAfter string
*Alias
}{
Alias: (*Alias)(d),
}
if err := json.Unmarshal(data, &aux); err != nil {
return err
}
var err error
if aux.Interval != "" {
if d.Interval, err = time.ParseDuration(aux.Interval); err != nil {
return err
}
}
if aux.Timeout != "" {
if d.Timeout, err = time.ParseDuration(aux.Timeout); err != nil {
return err
}
}
if aux.DeregisterCriticalServiceAfter != "" {
if d.DeregisterCriticalServiceAfter, err = time.ParseDuration(aux.DeregisterCriticalServiceAfter); err != nil {
return err
}
}
return nil
}

// IsSame checks if one HealthCheck is the same as another, without looking
Expand Down
Loading

0 comments on commit 67bac7a

Please sign in to comment.