Skip to content

Commit

Permalink
Backport of fix(txn): validate verbs into release/1.19.x (#21520)
Browse files Browse the repository at this point in the history
* backport of commit 5807c2c

* backport of commit 8ef60b8

---------

Co-authored-by: DanStough <dan.stough@hashicorp.com>
  • Loading branch information
hc-github-team-consul-core and DanStough authored Jul 5, 2024
1 parent 3c7555c commit ad6931c
Show file tree
Hide file tree
Showing 6 changed files with 350 additions and 30 deletions.
3 changes: 3 additions & 0 deletions .changelog/21519.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
txn: Fix a bug where mismatched Consul server versions could result in undetected data loss for when using newer Transaction verbs.
```
7 changes: 5 additions & 2 deletions agent/consul/kvs_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz resolver.Result, op api
return false, fmt.Errorf("Must provide key")
}

// Apply the ACL policy if any.
// Apply the ACL policy if any, and validate operation.
// enumcover:api.KVOp
switch op {
case api.KVDeleteTree:
var authzContext acl.AuthorizerContext
Expand All @@ -66,13 +67,15 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz resolver.Result, op api
return false, err
}

default:
case api.KVCheckNotExists, api.KVUnlock, api.KVLock, api.KVCAS, api.KVDeleteCAS, api.KVDelete, api.KVSet:
var authzContext acl.AuthorizerContext
dirEnt.FillAuthzContext(&authzContext)

if err := authz.ToAllowAuthorizer().KeyWriteAllowed(dirEnt.Key, &authzContext); err != nil {
return false, err
}
default:
return false, fmt.Errorf("unknown KV operation: %s", op)
}

// If this is a lock, we must check for a lock-delay. Since lock-delay
Expand Down
56 changes: 43 additions & 13 deletions agent/consul/state/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,31 @@
package state

import (
"errors"
"fmt"

"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
)

type UnsupportedFSMApplyPanicError struct {
Wrapped error
}

func (e *UnsupportedFSMApplyPanicError) Unwrap() error {
return e.Wrapped
}

func (e *UnsupportedFSMApplyPanicError) Error() string {
return e.Wrapped.Error()
}

// txnKVS handles all KV-related operations.
func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.TxnResults, error) {
var entry *structs.DirEntry
var err error

// enumcover: api.KVOp
switch op.Verb {
case api.KVSet:
entry = &op.DirEnt
Expand Down Expand Up @@ -95,7 +109,7 @@ func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.Tx
}

default:
err = fmt.Errorf("unknown KV verb %q", op.Verb)
err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown KV verb %q", op.Verb)}
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -123,11 +137,12 @@ func (s *Store) txnKVS(tx WriteTxn, idx uint64, op *structs.TxnKVOp) (structs.Tx
func txnSession(tx WriteTxn, idx uint64, op *structs.TxnSessionOp) error {
var err error

// enumcover: api.SessionOp
switch op.Verb {
case api.SessionDelete:
err = sessionDeleteWithSession(tx, &op.Session, idx)
default:
err = fmt.Errorf("unknown Session verb %q", op.Verb)
return &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown session verb %q", op.Verb)}
}
if err != nil {
return fmt.Errorf("failed to delete session: %v", err)
Expand All @@ -146,11 +161,17 @@ func txnLegacyIntention(tx WriteTxn, idx uint64, op *structs.TxnIntentionOp) err
case structs.IntentionOpDelete:
return legacyIntentionDeleteTxn(tx, idx, op.Intention.ID)
case structs.IntentionOpDeleteAll:
fallthrough // deliberately not available via this api
// deliberately not available via this api
return fmt.Errorf("Intention op not supported %q", op.Op)
case structs.IntentionOpUpsert:
fallthrough // deliberately not available via this api
// deliberately not available via this api
return fmt.Errorf("Intention op not supported %q", op.Op)
default:
return fmt.Errorf("unknown Intention op %q", op.Op)
// If we've gotten to this point, the unknown verb has slipped by
// endpoint validation. This means it could be a mismatch in Server versions
// that are sending known verbs as part of Raft logs. We panic rather than silently
// swallowing the error during Raft Apply.
panic(fmt.Sprintf("unknown Intention op %q", op.Op))
}
}

Expand Down Expand Up @@ -202,7 +223,7 @@ func (s *Store) txnNode(tx WriteTxn, idx uint64, op *structs.TxnNodeOp) (structs
}

default:
err = fmt.Errorf("unknown Node verb %q", op.Verb)
err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown Node verb %q", op.Verb)}
}
if err != nil {
return nil, err
Expand Down Expand Up @@ -271,7 +292,7 @@ func (s *Store) txnService(tx WriteTxn, idx uint64, op *structs.TxnServiceOp) (s
return nil, err

default:
return nil, fmt.Errorf("unknown Service verb %q", op.Verb)
return nil, &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown Service verb %q", op.Verb)}
}
}

Expand Down Expand Up @@ -326,7 +347,7 @@ func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (struc
}

default:
err = fmt.Errorf("unknown Check verb %q", op.Verb)
err = &UnsupportedFSMApplyPanicError{fmt.Errorf("unknown check verb %q", op.Verb)}
}
if err != nil {
return nil, err
Expand All @@ -352,7 +373,7 @@ func (s *Store) txnCheck(tx WriteTxn, idx uint64, op *structs.TxnCheckOp) (struc
// txnDispatch runs the given operations inside the state store transaction.
func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (structs.TxnResults, structs.TxnErrors) {
results := make(structs.TxnResults, 0, len(ops))
errors := make(structs.TxnErrors, 0, len(ops))
errs := make(structs.TxnErrors, 0, len(ops))
for i, op := range ops {
var ret structs.TxnResults
var err error
Expand All @@ -374,24 +395,33 @@ func (s *Store) txnDispatch(tx WriteTxn, idx uint64, ops structs.TxnOps) (struct
// compatibility with pre-1.9.0 raft logs and during upgrades.
err = txnLegacyIntention(tx, idx, op.Intention)
default:
err = fmt.Errorf("no operation specified")
panic("no operation specified")
}

// Accumulate the results.
results = append(results, ret...)

var panicErr *UnsupportedFSMApplyPanicError
if errors.As(err, &panicErr) {
// If we've gotten to this point, the unknown verb has slipped by
// endpoint validation. This means it could be a mismatch in Server versions
// that are sending known verbs as part of Raft logs. We panic rather than silently
// swallowing the error during Raft Apply. See NET-9016 for historical context.
panic(panicErr.Wrapped)
}

// Capture any error along with the index of the operation that
// failed.
if err != nil {
errors = append(errors, &structs.TxnError{
errs = append(errs, &structs.TxnError{
OpIndex: i,
What: err.Error(),
})
}
}

if len(errors) > 0 {
return nil, errors
if len(errs) > 0 {
return nil, errs
}

return results, nil
Expand Down
70 changes: 61 additions & 9 deletions agent/consul/state/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,14 +1058,6 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
},
},
},
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: "nope",
DirEnt: structs.DirEntry{
Key: "foo/delete",
},
},
},
}
results, errors := s.TxnRW(7, ops)
if len(errors) != len(ops) {
Expand All @@ -1086,7 +1078,6 @@ func TestStateStore_Txn_KVS_Rollback(t *testing.T) {
`key "nope" doesn't exist`,
"current modify index",
`key "nope" doesn't exist`,
"unknown KV verb",
}
if len(errors) != len(expected) {
t.Fatalf("bad len: %d != %d", len(errors), len(expected))
Expand Down Expand Up @@ -1415,3 +1406,64 @@ func TestStateStore_Txn_KVS_ModifyIndexes(t *testing.T) {
}
}
}

// TestStateStore_UnknownTxnOperationsPanic validates that unknown txn operations panic.
// If we error in this case this is from an FSM Apply, the state store of this agent could potentially be out of
// sync with other agents that applied the operation. In the case of responding to a local endpoint, we require
// that the operation type be validated prior to being sent to the state store.
// See NET-9016 for historical context.
func TestStateStore_UnknownTxnOperationsPanic(t *testing.T) {
s := testStateStore(t)

testCases := []structs.TxnOps{
{
&structs.TxnOp{
KV: &structs.TxnKVOp{
Verb: "sand-the-floor",
DirEnt: structs.DirEntry{
Key: "foo/a",
},
},
},
},
{
&structs.TxnOp{
Node: &structs.TxnNodeOp{
Verb: "wax-the-car",
},
},
},
{
&structs.TxnOp{
Service: &structs.TxnServiceOp{
Verb: "paint-the-house",
},
},
},
{
&structs.TxnOp{
Check: &structs.TxnCheckOp{
Verb: "paint-the-fence",
},
},
},
{
&structs.TxnOp{
Session: &structs.TxnSessionOp{
Verb: "sweep-the-knee",
},
},
},
{
&structs.TxnOp{
Intention: &structs.TxnIntentionOp{ // nolint:staticcheck // SA1019 intentional use of deprecated field
Op: "flying-crane-kick",
},
},
},
}

for _, tc := range testCases {
require.Panics(t, func() { s.TxnRW(3, tc) })
}
}
Loading

0 comments on commit ad6931c

Please sign in to comment.